In [0]:
!pip install transformers torch mlflow
dbutils.library.restartPython()

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting transformers
  Downloading transformers-4.42.4-py3-none-any.whl (9.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 9.3/9.3 MB 16.7 MB/s eta 0:00:00
Collecting torch
  Downloading torch-2.3.1-cp310-cp310-manylinux1_x86_64.whl (779.1 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 779.1/779.1 MB 899.9 kB/s eta 0:00:00
Collecting tokenizers<0.20,>=0.19
  Downloading tokenizers-0.19.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.6 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.6/3.6 MB 40.1 MB/s eta 0:00:00
Collecting safetensors>=0.4.1
  Downloading safetensors-0.4.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.2/1.2 MB 22.5 MB/s eta 0:00:00
Collecting regex!=2019.12.17
  Downloading regex-2024.5.15-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (775 kB)
     ━

In [0]:
import boto3
import mlflow
import warnings
import pandas as pd
from io import BytesIO

import torch
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

from transformers import RobertaTokenizer, RobertaForSequenceClassification, AdamW

# Suppress warnings
warnings.filterwarnings("ignore")

### S3 Data Loading

In [0]:
def load_s3_data_to_dataframe(s3_client, bucket_name, file_key):
    """
    Download one CSV object from S3 and parse it into a pandas DataFrame.

    Args:
        s3_client: boto3 S3 client (anything exposing ``get_object``).
        bucket_name: Name of the bucket holding the object.
        file_key: Key of the CSV object inside the bucket.

    Returns:
        pandas.DataFrame with the parsed CSV contents.
    """
    payload = s3_client.get_object(Bucket=bucket_name, Key=file_key)['Body'].read()
    with BytesIO(payload) as buffer:
        return pd.read_csv(buffer)


def load_recent_csvs_from_s3(s3_client, bucket_name, prefix, max_files=2):
    """
    Load the most recently modified CSV files under an S3 prefix into one DataFrame.

    Every page of the bucket listing is walked: ``list_objects_v2`` returns at most
    1000 keys per call, so without following the continuation token the newest files
    could be missed in a large prefix.

    Args:
        s3_client: boto3 S3 client (anything exposing ``list_objects_v2``/``get_object``).
        bucket_name: Name of the S3 bucket.
        prefix: Key prefix ("folder") to search, e.g. ``'train_data/'``.
        max_files: Number of newest ``.csv`` objects to load (default 2).

    Returns:
        pandas.DataFrame with the rows of the selected files concatenated, newest
        file first, with a fresh RangeIndex.

    Raises:
        ValueError: if no ``.csv`` object exists under ``prefix``.
    """
    list_kwargs = {'Bucket': bucket_name, 'Prefix': prefix}
    csv_objects = []
    while True:
        response = s3_client.list_objects_v2(**list_kwargs)
        csv_objects.extend(
            item for item in response.get('Contents', []) if item['Key'].endswith('.csv')
        )
        # Keep requesting pages until S3 reports the listing is complete.
        if not response.get('IsTruncated'):
            break
        list_kwargs['ContinuationToken'] = response['NextContinuationToken']

    if not csv_objects:
        # Same exception type pd.concat would raise, but with an actionable message.
        raise ValueError(f"No CSV files found in s3://{bucket_name}/{prefix}")

    newest = sorted(csv_objects, key=lambda item: item['LastModified'], reverse=True)[:max_files]
    data_frames = [
        pd.read_csv(BytesIO(s3_client.get_object(Bucket=bucket_name, Key=item['Key'])['Body'].read()))
        for item in newest
    ]
    return pd.concat(data_frames, ignore_index=True)



# AWS credentials are read from the Databricks "mlops" secret scope, never hard-coded.
aws_access_key_id = dbutils.secrets.get(scope="mlops", key="aws_access_key_id")
aws_secret_access_key = dbutils.secrets.get(scope="mlops", key="aws_secret_access_key")

# Named `s3_client` (not `client`) so it cannot be confused with the MlflowClient that a
# later cell binds to the name `client`.
s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
bucket_name = 'pofce-mlops-bucket'
final_val_data = load_s3_data_to_dataframe(s3_client, bucket_name, 'val_data.csv')
data = load_recent_csvs_from_s3(s3_client, bucket_name, 'train_data/')

In [0]:
# data = data.iloc[:750]
# final_val_data = final_val_data.iloc[:250]

### Training data preparation

In [0]:
data.dropna(inplace=True)
data.drop_duplicates(subset=['clean_text'], inplace=True)

texts = data['clean_text'].values
labels = data['category'].values

train_texts, val_texts, train_labels, val_labels = train_test_split(texts, labels, test_size=0.2, random_state=42)

# Fit ONE encoder on the full label vocabulary and reuse it for every split. Fitting a
# separate encoder per split can map the same class to different integers when a split
# lacks a class, and the test-set evaluation reuses `label_encoder`, so all splits must
# share a single mapping.
label_encoder = LabelEncoder().fit(labels)
encoded_labels_train = label_encoder.transform(train_labels)
encoded_labels_valid = label_encoder.transform(val_labels)
# classes_ is sorted and transform() maps classes_[i] -> i.
label_mapping = {original_label: int_label for int_label, original_label in enumerate(label_encoder.classes_)}


tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
# Seed so the freshly initialised classification head is reproducible across runs.
torch.manual_seed(42)
# Size the classification head from the data instead of hard-coding 3 classes.
model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=len(label_encoder.classes_))


class Sentiment(Dataset):
    """
    Map-style torch Dataset that tokenizes texts on the fly for sequence classification.

    Args:
        texts: Sequence of texts (numpy array, list, or pandas Series).
        labels: Sequence of integer-encoded labels aligned position-by-position with ``texts``.
        tokenizer: Hugging Face tokenizer exposing ``encode_plus``.
        max_len: Length every encoding is truncated/padded to (default 128).

    Each item is a dict with the raw ``text``, 1-D ``input_ids`` and ``attention_mask``
    tensors of length ``max_len``, and a scalar ``labels`` tensor of dtype long.
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    @staticmethod
    def _at(values, idx):
        """Return the element at position ``idx``, positionally even for pandas objects."""
        # Series[...] is label-based: it raises KeyError (or silently misaligns) when the
        # index is not 0..n-1, whereas .iloc is always positional.
        return values.iloc[idx] if hasattr(values, 'iloc') else values[idx]

    def __getitem__(self, idx):
        text = str(self._at(self.texts, idx))
        label = self._at(self.labels, idx)
        encoding = self.tokenizer.encode_plus(
            text,
            truncation=True,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            # Replaces the deprecated `pad_to_max_length=True`; pads every item to max_length.
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )
        return {
            'text': text,
            # return_tensors='pt' yields shape (1, max_len); flatten to (max_len,) for batching.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }


# From here on the splits carry integer class ids instead of the raw category values.
train_labels, val_labels = encoded_labels_train, encoded_labels_valid

# Wrap each (texts, labels) split in the tokenizing Dataset, in train/val order.
train_data, val_data = (
    Sentiment(split_texts, split_labels, tokenizer, max_len=128)
    for split_texts, split_labels in ((train_texts, train_labels), (val_texts, val_labels))
)

# Shuffle only the training split; validation order is kept stable.
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


### Evaluation of Model Performance on Test Data
This cell defines `get_test_scores`, which evaluates the trained model on the held-out test set loaded from `val_data.csv`. It returns the accuracy together with a per-class classification report (precision, recall, F1-score).

In [0]:
def get_test_scores(device, batch_size=8):
    """
    Evaluate the global ``model`` on the held-out test data in ``final_val_data``.

    The raw frame is cleaned into a local copy (``clean_comment`` renamed to
    ``clean_text``, rows with NaNs and duplicate texts dropped). The global
    ``final_val_data`` is therefore never mutated, and every call (one per epoch)
    sees identical input.

    Args:
        device: torch.device the model currently lives on.
        batch_size: Evaluation batch size (default 8).

    Returns:
        (accuracy, report_dict): accuracy score and the sklearn
        ``classification_report`` dict, computed on the decoded label values.
    """
    test_df = (
        final_val_data
        .rename(columns={'clean_comment': 'clean_text'})
        .dropna()
        .drop_duplicates(subset=['clean_text'])
        .reset_index(drop=True)
    )

    test_enc_labels = label_encoder.transform(test_df['category'])
    test_dataset = Sentiment(test_df['clean_text'].values, test_enc_labels, tokenizer, max_len=128)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model.eval()

    all_predictions = []
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask)
            all_predictions.extend(torch.argmax(outputs.logits, dim=1).cpu().numpy())

    predicted_labels = label_encoder.inverse_transform(all_predictions)
    # The true labels are already known on the CPU; decode them directly instead of
    # round-tripping them through the device.
    true_labels = label_encoder.inverse_transform(test_enc_labels)

    accuracy = accuracy_score(true_labels, predicted_labels)
    report_dict = classification_report(true_labels, predicted_labels, output_dict=True)
    return accuracy, report_dict

### Setting Up MLflow Model Signature

In [0]:
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec

# Inference inputs only. At prediction time the model is called with input_ids and an
# attention_mask; `labels` is a training target used solely to compute the loss, so it
# does not belong in the serving input schema.
input_schema = Schema([ColSpec("long", "input_ids"), ColSpec("long", "attention_mask")])
# NOTE(review): the classifier emits one logit per class (and input_ids is a sequence),
# so scalar ColSpecs under-describe the real shapes; consider TensorSpec. TODO confirm.
output_schema = Schema([ColSpec("float", "logits")])

signature = ModelSignature(inputs=input_schema, outputs=output_schema)

### Model Training

In [0]:
def _train_one_epoch(model, loader, optimizer, device):
    """Run one optimisation pass over ``loader``; return the mean loss per batch."""
    model.train()
    total_loss = 0.0
    for batch in loader:
        optimizer.zero_grad()
        # Passing `labels` makes the Hugging Face model compute the loss itself.
        outputs = model(
            batch['input_ids'].to(device),
            attention_mask=batch['attention_mask'].to(device),
            labels=batch['labels'].to(device),
        )
        outputs.loss.backward()
        optimizer.step()
        total_loss += outputs.loss.item()
    return total_loss / len(loader) if len(loader) else float('nan')


def _mean_eval_loss(model, loader, device):
    """Return the mean per-batch loss of ``model`` over ``loader`` without updating weights."""
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for batch in loader:
            outputs = model(
                batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
                labels=batch['labels'].to(device),
            )
            total_loss += outputs.loss.item()
    return total_loss / len(loader) if len(loader) else float('nan')


def train_model(model, train_loader, val_loader, learning_rate=1e-5, epochs=1):
    """
    Fine-tune ``model``, log params/metrics to MLflow, and register the result.

    Args:
        model: Hugging Face sequence-classification model; it is called with
            input_ids, attention_mask and labels and must return ``.loss``.
        train_loader: DataLoader of training batches (dicts with input_ids,
            attention_mask and labels).
        val_loader: DataLoader of the validation split; its mean loss is logged
            per epoch as ``val_loss``.
        learning_rate: AdamW learning rate (default 1e-5).
        epochs: Number of training epochs (default 1).

    Returns:
        The ``ModelVersion`` returned by ``mlflow.register_model`` for "MyModel".
    """
    with mlflow.start_run(run_name="MLOPs Tracking Enhanced") as run:
        params = {
            "learning_rate": learning_rate,
            "epochs": epochs,
            "optimizer": "AdamW",
            # The real batch size; len(batch_sampler) would be the number of batches.
            "batch_size": train_loader.batch_size,
            "scheduler": "StepLR"
        }
        mlflow.log_params(params)

        # torch.optim.AdamW replaces the deprecated transformers.AdamW. eps and
        # weight_decay are set to transformers.AdamW's defaults (1e-6, 0.0).
        optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, eps=1e-6, weight_decay=0.0)
        scheduler = StepLR(optimizer, step_size=1, gamma=0.1)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)

        for epoch in range(epochs):
            train_loss = _train_one_epoch(model, train_loader, optimizer, device)
            scheduler.step()
            mlflow.log_metric("loss", train_loss, step=epoch)
            mlflow.log_metric("val_loss", _mean_eval_loss(model, val_loader, device), step=epoch)

            # Held-out test metrics, logged per epoch so their history is ordered.
            accuracy, report_dict = get_test_scores(device)
            macro = report_dict["macro avg"]
            mlflow.log_metric("accuracy", accuracy, step=epoch)
            mlflow.log_metric("macro_avg_precision", macro["precision"], step=epoch)
            mlflow.log_metric("macro_avg_recall", macro["recall"], step=epoch)
            mlflow.log_metric("macro_avg_f1-score", macro["f1-score"], step=epoch)

        # Log and register the model with the signature defined earlier.
        mlflow.pytorch.log_model(model, "model", signature=signature)
        model_name = "MyModel"
        model_uri = f"runs:/{run.info.run_id}/model"
        model_info = mlflow.register_model(model_uri, model_name)

        return model_info


challenger_info = train_model(model, train_loader, val_loader)

Uploading artifacts:   0%|          | 0/10 [00:00<?, ?it/s]

Registered model 'MyModel' already exists. Creating a new version of this model...


Downloading artifacts:   0%|          | 0/10 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/10 [00:00<?, ?it/s]

Created version '2' of model 'mlops.default.mymodel'.


### Model Version Management with MLflow

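This step compares the newly trained model (the challenger) with the current "Champion" on test accuracy. The very first registered version becomes the Champion automatically. After that, a challenger with strictly higher accuracy takes the "Champion" alias, and the previous champion is moved to the "Retired" alias. This keeps the best-performing model reachable under a stable alias and supports a continuous-improvement deployment cycle.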

In [0]:
from mlflow.tracking import MlflowClient

client = MlflowClient()


def _latest_metric(run_id, key="accuracy"):
    """Return the most recently logged value of metric ``key`` for ``run_id``."""
    # Run.data.metrics keeps the latest value per key. get_metric_history(...)[0]
    # returned the FIRST logged value, i.e. the first-epoch accuracy whenever a
    # run trains for more than one epoch.
    return client.get_run(run_id).data.metrics[key]


if challenger_info.version == '1':
    # First registered version: there is nothing to compare against.
    client.set_registered_model_alias(challenger_info.name, "Champion", challenger_info.version)
else:
    champion_version = client.get_model_version_by_alias(challenger_info.name, "Champion")
    champion_accuracy = _latest_metric(champion_version.run_id)
    challenger_accuracy = _latest_metric(challenger_info.run_id)

    # Promote only on a strict improvement; ties keep the current Champion.
    if champion_accuracy < challenger_accuracy:
        client.set_registered_model_alias(champion_version.name, "Retired", champion_version.version)
        client.set_registered_model_alias(challenger_info.name, "Champion", challenger_info.version)

### Automatic Deployment

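Automatic deployment of the Champion model to a Databricks Model Serving endpoint is disabled below, because Model Serving is not available in trial workspaces (see the error response at the end of this section). A local API is used instead. The commented-out code is intended to work in a full, non-trial workspace.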

In [0]:
# import json
# import requests

# champion_info = client.get_model_version_by_alias("mymodel", "Champion")

# API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() 
# API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()

# data = {
#     "name": "SentimentService",
#     "config": {
#         "served_entities": [
#             {
#                 "entity_name": champion_info.name,
#                 "entity_version": champion_info.version,
#                 "workload_size": "Small",
#                 "scale_to_zero_enabled": False,
#                 "workload_type": "CPU",
#             }
#         ]
#     },
# }

# headers = {"Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"}

# response = requests.post(
#     url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers
# )

# print(json.dumps(response.json(), indent=4))

# {
#     "error_code": "FEATURE_DISABLED",
#     "message": "Model serving is not available for trial workspaces. Please contact your organization admin or Databricks support."
# }

{
    "error_code": "FEATURE_DISABLED",
    "message": "Model serving is not available for trial workspaces. Please contact your organization admin or Databricks support."
}
