## Load and Preprocess our Dataset

In [0]:
# load our slightly preprocessed dataset

from datasets import load_dataset

cleaned = load_dataset("MarioBarbeque/UCI_drug_reviews")



Downloading readme:   0%|          | 0.00/1.22k [00:00<?, ?B/s]



Downloading data:   0%|          | 0.00/39.8M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/9.95M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/16.6M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/122092 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/30530 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/50882 [00:00<?, ? examples/s]

In [0]:
# Define our labels as the set of all 805 possible medical conditions across our dataset

from datasets import ClassLabel

# Collect every distinct condition that appears in any split so the label space covers all of them.
train_conditions = set(cleaned["train"].unique("condition"))
validate_conditions = set(cleaned["validation"].unique("condition"))
test_conditions = set(cleaned["test"].unique("condition"))
conditions = train_conditions | validate_conditions | test_conditions  # set union

# Sort before building the ClassLabel: a set of strings has no stable iteration order across
# interpreter sessions (string hashing is randomized), so `list(conditions)` could hand out a
# different integer id per condition on every kernel restart. Sorting makes the
# name <-> id mapping reproducible.
# NOTE(review): the loaded dataset already carries a `labels` column; confirm that these ids
# agree with it before relying on `condition_labels` for id2label lookups.
condition_names = sorted(conditions)
condition_labels = ClassLabel(num_classes=len(condition_names), names=condition_names)

# Mapping Labels to IDs
def map_label_to_class_index(example):
    """Store the class index of ``example['condition']`` under ``example['label']``.

    The string-to-index conversion is delegated to ``condition_labels.str2int``.
    The same ``example`` mapping is mutated and returned.
    """
    class_index = condition_labels.str2int(example['condition'])
    example['label'] = class_index
    return example

def map_class_index_to_label(example):
    """Store the condition name for ``example['label']`` under ``example['label_name']``.

    The index-to-string conversion is delegated to ``condition_labels.int2str``.
    The same ``example`` mapping is mutated and returned.
    """
    condition_name = condition_labels.int2str(example['label'])
    example['label_name'] = condition_name
    return example

# First attach the integer class index, then the human-readable name derived from it.
labeled = (
    cleaned
    .map(map_label_to_class_index, batched=True)
    .map(map_class_index_to_label, batched=True)
)

# displayed as the cell output: size of the label space
condition_labels.num_classes

Map:   0%|          | 0/122092 [00:00<?, ? examples/s]

Map:   0%|          | 0/30530 [00:00<?, ? examples/s]

Map:   0%|          | 0/50882 [00:00<?, ? examples/s]

Map:   0%|          | 0/122092 [00:00<?, ? examples/s]

Map:   0%|          | 0/30530 [00:00<?, ? examples/s]

Map:   0%|          | 0/50882 [00:00<?, ? examples/s]

805

In [0]:
# define the tokenizer and data collator, as inherited from RoBERTa

from transformers import AutoTokenizer, DataCollatorWithPadding

tokenizer = AutoTokenizer.from_pretrained("FacebookAI/roberta-base")

def tokenize_function(example):
    """Run the notebook-level ``tokenizer`` over ``example['review']`` with truncation enabled."""
    review_text = example["review"]
    return tokenizer(review_text, truncation=True)


# Tokenize every split in batches.
# NOTE(review): this maps over `cleaned`, not the `labeled` dataset built in the previous cell,
# so the 'label' / 'label_name' columns added there are not carried forward. The feature list
# displayed for `tokenized_datasets` shows a `labels` column already present in the loaded data.
tokenized_datasets = cleaned.map(tokenize_function, batched=True)
# Collator built from the same tokenizer; it is later passed to the Trainer and the DataLoaders.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

2024-11-04 19:27:37.204767: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Map:   0%|          | 0/122092 [00:00<?, ? examples/s]

Map:   0%|          | 0/30530 [00:00<?, ? examples/s]

Map:   0%|          | 0/50882 [00:00<?, ? examples/s]

In [0]:
tokenized_datasets

DatasetDict({
    train: Dataset({
        features: ['patient_id', 'drugName', 'condition', 'review', 'rating', 'date', 'usefulCount', 'review_length', 'labels', 'input_ids', 'attention_mask'],
        num_rows: 122092
    })
    validation: Dataset({
        features: ['patient_id', 'drugName', 'condition', 'review', 'rating', 'date', 'usefulCount', 'review_length', 'labels', 'input_ids', 'attention_mask'],
        num_rows: 30530
    })
    test: Dataset({
        features: ['patient_id', 'drugName', 'condition', 'review', 'rating', 'date', 'usefulCount', 'review_length', 'labels', 'input_ids', 'attention_mask'],
        num_rows: 50882
    })
})

In [0]:
# remove all extraneous columns

tokenized_datasets = tokenized_datasets.remove_columns(["patient_id", "drugName", "condition", "review", "rating", "date", "usefulCount", "review_length"])

In [0]:
# peek remaining features

tokenized_datasets["train"]

Dataset({
    features: ['labels', 'input_ids', 'attention_mask'],
    num_rows: 122092
})

## Train our Model

In [0]:
from transformers import AutoModelForSequenceClassification

hf_location = "FacebookAI/roberta-base"
model = AutoModelForSequenceClassification.from_pretrained(hf_location, num_labels=condition_labels.num_classes)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at /Volumes/workspace_dogfood/jgr/hugging_face_cache/hub/models--FacebookAI--roberta-base/snapshots/e2da8e2f811d1448a5b465c236feacd80ffbac7b/ and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [0]:
# this cell can be skipped - purely explanatory

# we make use of the AdamW optimizer with a learning rate of 5e-5 and a linear scheduler
# we make this cell explicit here, but AdamW and a scheduled linear learning rate are implied for the 
# HF Trainer API we eventually use

from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import get_scheduler

optimizer = AdamW(model.parameters(), lr=5e-5)

# `train_dataloader` was referenced below without ever being defined, so this cell raised a
# NameError on a fresh kernel. Build it here; the batch size of 16 mirrors the
# `per_device_train_batch_size` used with the HF Trainer later in the notebook.
train_dataloader = DataLoader(
    tokenized_datasets["train"], shuffle=True, batch_size=16, collate_fn=data_collator
)

num_epochs = 3
# one optimizer step per batch, repeated for every epoch
num_training_steps = num_epochs * len(train_dataloader)
lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps,
)
print(num_training_steps)

In [0]:
# move model to GPU

import torch

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)
device

device(type='cuda')

In [0]:
# function to keep track of our memory usage as we train

def mem_status():
    """Print a per-GPU memory report (total, allocated, reserved, available), in GB.

    When CUDA is not available, prints "No GPU available." instead.
    "Available" is computed as total memory minus the memory reported by
    ``torch.cuda.memory_reserved``.
    """
    if not torch.cuda.is_available():
        print("No GPU available.")
        return

    bytes_per_gb = 1024 ** 3
    gpu_count = torch.cuda.device_count()
    print("Memory status: ")
    for gpu_index in range(gpu_count):
        total_gb = torch.cuda.get_device_properties(gpu_index).total_memory / bytes_per_gb
        allocated_gb = torch.cuda.memory_allocated(gpu_index) / bytes_per_gb
        reserved_gb = torch.cuda.memory_reserved(gpu_index) / bytes_per_gb
        available_gb = total_gb - reserved_gb
        print(f"GPU {gpu_index}:")
        print(f"  Total memory: {total_gb:.2f} GB")
        print(f"  Allocated memory: {allocated_gb:.2f} GB")
        print(f"  Reserved memory: {reserved_gb:.2f} GB")
        print(f"  Available memory: {available_gb:.2f} GB")

mem_status()

Memory status: 
GPU 0:
  Total memory: 15.57 GB
  Allocated memory: 0.47 GB
  Reserved memory: 0.52 GB
  Available memory: 15.06 GB


In [0]:
# Training loop where we make use of the HF Trainer for easy setup
# We make use of the default AdamW optimizer and linear LR scheduler

import numpy as np
from datasets import load_metric

output_dir = "/Volumes/workspace_dogfood/jgr/distributed_training_cache/test" # Author's local save location

def train_model():
    """Fine-tune the notebook-level ``model`` with the HF ``Trainer`` and return the best checkpoint.

    Reads ``model``, ``tokenizer``, ``data_collator``, ``tokenized_datasets`` and
    ``output_dir`` from the enclosing notebook scope. Evaluation and checkpoint
    saving both happen once per epoch, and the checkpoint with the best
    ``eval_loss`` is reloaded at the end of training.

    Returns:
        The value of ``trainer.state.best_model_checkpoint`` after training.
    """
    # Kept as a function-local import, as in the original; presumably so the function is
    # self-contained when handed to TorchDistributor (TODO confirm).
    from transformers import Trainer, TrainingArguments

    argument_values = dict(
        output_dir=output_dir,
        learning_rate=5e-5,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        num_train_epochs=3,
        weight_decay=0.01,
        save_strategy="epoch",
        report_to=[],  # in case we want to enable MLFlow logging
        push_to_hub=False,  # we do this manually in the end
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        evaluation_strategy="epoch",
    )
    training_args = TrainingArguments(**argument_values)

    # No `compute_metrics` is passed: the evaluation loop is run explicitly later in the
    # notebook to account for bugs in the Hugging Face Evaluate library.
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["validation"],
        tokenizer=tokenizer,
        data_collator=data_collator,
    )
    trainer.train()

    best_checkpoint = trainer.state.best_model_checkpoint
    return best_checkpoint

In [0]:
# use the PyTorch `TorchDistributor` class from PySpark to run our training on a single-node multi GPU compute instance

from pyspark.ml.torch.distributor import TorchDistributor

NUM_PROCESSES = torch.cuda.device_count()
print(f"We're using {NUM_PROCESSES} GPUs")
trained_checkpoint = TorchDistributor(num_processes=NUM_PROCESSES, local_mode=True, use_gpu=True).run(train_model)

We're using 1 GPUs


Started local training with 1 processes


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

## Evaluate our Model!

In [0]:
# now empty GPU cache before loading model back onto GPUs for evaluation

torch.cuda.empty_cache()

In [0]:
# reload our model explicitly into memory to check it out and then evaluate

trained_model = AutoModelForSequenceClassification.from_pretrained(trained_checkpoint, device_map="auto", num_labels=condition_labels.num_classes)

In [0]:
# look at model details, specifically the number of out_features (805)
# this is the number of labels we are predicting; it is nontrivially 805 because we have replaced the
# old RoBERTa fill-mask model head with a classification head

trained_model

In [0]:
# create a PyTorch validation DataLoader for eval

from torch.utils.data import DataLoader

validation_dataloader = DataLoader(tokenized_datasets["validation"], shuffle=False, batch_size=8, collate_fn=data_collator)

In [0]:
# hack for testing a single batch through the model

# Pull only the first batch instead of looping and breaking.
batch = next(iter(validation_dataloader))
# Move the tensors to wherever the model's weights live rather than assuming "cuda",
# so this cell also works when the model was loaded onto the CPU.
batch = {k: v.to(trained_model.device) for k, v in batch.items()}

In [0]:
# confirm we get logits

output = trained_model(**batch)
output.logits

In [0]:
# RELEVANT TO NEXT 3 CELLS:

# copy and paste custom evaluation metrics from Author's GitHub for robust multilabel classification
# code available here: https://github.com/johngrahamreynolds/FixedMetricsForHF

# ideally one could clone this repo, or I could create a small Python wheel that contains the relevant
# classes for download and import, but C&P is easy enough despite some minimal redundancy

In [0]:
import datasets
import evaluate
from evaluate import evaluator, Metric
# from evaluate.metrics.f1 import F1
from sklearn.metrics import f1_score

# could in principle subclass F1, but ideally we can work the fix into the F1 class to maintain SOLID code
class FixedF1(evaluate.Metric):
    """F1 metric whose averaging mode is fixed at construction time.

    Wraps ``sklearn.metrics.f1_score``. The ``average`` given to ``__init__`` is the one
    used for every computation; the ``average`` keyword accepted by ``_compute`` is
    ignored in favour of it.
    """

    def __init__(self, average="binary"):
        """Store the averaging mode that ``_compute`` forwards to ``f1_score``."""
        super().__init__()
        self.average = average
        # additional values passed to compute() could and probably should (?) all be passed here so that the final computation is configured immediately at Metric instantiation

    def _info(self):
        """Return the ``evaluate.MetricInfo`` describing this metric and its input features."""
        return evaluate.MetricInfo(
            description="Custom built F1 metric for true *multilabel* classification - the 'multilabel' config_name var in the evaluate.EvaluationModules class appears to better address multi-class classification, where features can fall under a multitude of labels. Granted, the subtely is minimal and easily confused. This class is implemented with the intention of enabling the evaluation of multiple multilabel classification metrics at the same time using the evaluate.CombinedEvaluations.combine method.",
            citation="",
            inputs_description="'average': This parameter is required for multiclass/multilabel targets. If None, the scores for each class are returned. Otherwise, this determines the type of averaging performed on the data. Options include: {‘micro’, ‘macro’, ‘samples’, ‘weighted’, ‘binary’} or None.",
            features=datasets.Features(
                {
                    "predictions": datasets.Sequence(datasets.Value("int32")),
                    "references": datasets.Sequence(datasets.Value("int32")),
                }
                if self.config_name == "multilabel"
                else {
                    "predictions": datasets.Value("int32"),
                    "references": datasets.Value("int32"),
                }
            ),
            reference_urls=["https://scikit-learn.org/stable/modules/generated/sklearn.metrics.f1_score.html"],
        )

    # could remove specific kwargs like average, sample_weight from _compute() method of F1

    def _compute(self, predictions, references, labels=None, pos_label=1, average="binary", sample_weight=None):
        """Compute F1 with ``self.average`` and return it as ``{"f1": value}``.

        The ``average`` argument is accepted for signature compatibility but is not used.
        A single score is returned as a Python ``float``; otherwise the value returned by
        ``f1_score`` is passed through as-is.
        """
        score = f1_score(
            references, predictions, labels=labels, pos_label=pos_label, average=self.average, sample_weight=sample_weight
        )
        # A plain Python float has no `.size` attribute; treat anything without one as a scalar
        # instead of raising AttributeError.
        is_scalar = getattr(score, "size", 1) == 1
        return {"f1": float(score) if is_scalar else score}

In [0]:
import datasets
import evaluate
from evaluate import evaluator, Metric
# from evaluate.metrics.precision import Precision
from sklearn.metrics import precision_score

# could in principle subclass Precision, but ideally we can work the fix into the Precision class to maintain SOLID code
class FixedPrecision(evaluate.Metric):
    """Precision metric whose averaging mode and zero-division policy are fixed at construction.

    Wraps ``sklearn.metrics.precision_score``. The ``average`` and ``zero_division`` given to
    ``__init__`` are the ones used for every computation; the same-named keywords accepted by
    ``_compute`` are ignored in favour of them.
    """

    def __init__(self, average="binary", zero_division="warn"):
        """Store the settings that ``_compute`` forwards to ``precision_score``."""
        super().__init__()
        self.average = average
        self.zero_division = zero_division
        # additional values passed to compute() could and probably should (?) all be passed here so that the final computation is configured immediately at Metric instantiation

    def _info(self):
        """Return the ``evaluate.MetricInfo`` describing this metric and its input features."""
        return evaluate.MetricInfo(
            description="Custom built Precision metric for true *multilabel* classification - the 'multilabel' config_name var in the evaluate.EvaluationModules class appears to better address multi-class classification, where features can fall under a multitude of labels. Granted, the subtlety is minimal and easily confused. This class is implemented with the intention of enabling the evaluation of multiple multilabel classification metrics at the same time using the evaluate.CombinedEvaluations.combine method.",
            citation="",
            inputs_description="'average': This parameter is required for multiclass/multilabel targets. If None, the scores for each class are returned. Otherwise, this determines the type of averaging performed on the data. Options include: {‘micro’, ‘macro’, ‘samples’, ‘weighted’, ‘binary’} or None.",
            features=datasets.Features(
                {
                    "predictions": datasets.Sequence(datasets.Value("int32")),
                    "references": datasets.Sequence(datasets.Value("int32")),
                }
                if self.config_name == "multilabel"
                else {
                    "predictions": datasets.Value("int32"),
                    "references": datasets.Value("int32"),
                }
            ),
            reference_urls=["https://scikit-learn.org/stable/modules/generated/sklearn.metrics.precision_score.html"],
        )

    # could remove specific kwargs like average, sample_weight from _compute() method and simply pass them to the underlying scikit-learn function in the form of a class var self.*

    def _compute(
        self, predictions, references, labels=None, pos_label=1, average="binary", sample_weight=None, zero_division="warn",
    ):
        """Compute precision with the instance settings and return it as ``{"precision": value}``.

        The ``average`` and ``zero_division`` arguments are accepted for signature
        compatibility but are not used. A single score is returned as a Python ``float``;
        otherwise the value returned by ``precision_score`` is passed through as-is.
        """
        score = precision_score(
            references, predictions, labels=labels, pos_label=pos_label, average=self.average, sample_weight=sample_weight, zero_division=self.zero_division,
        )
        # A plain Python float has no `.size` attribute; treat anything without one as a scalar
        # instead of raising AttributeError.
        is_scalar = getattr(score, "size", 1) == 1
        return {"precision": float(score) if is_scalar else score}

In [0]:
import datasets
import evaluate
from evaluate import evaluator, Metric
# from evaluate.metrics.recall import Recall
from sklearn.metrics import recall_score

# could in principle subclass Recall, but ideally we can work the fix into the Recall class to maintain SOLID code
class FixedRecall(evaluate.Metric):
    """Recall metric whose averaging mode (and optionally zero-division policy) is fixed at construction.

    Wraps ``sklearn.metrics.recall_score``. The ``average`` given to ``__init__`` is used for
    every computation; the ``average`` keyword accepted by ``_compute`` is ignored in favour
    of it. ``zero_division`` may be set at construction, matching ``FixedPrecision``; a
    non-default value passed to ``_compute`` still takes precedence.
    """

    def __init__(self, average="binary", zero_division="warn"):
        """Store the settings that ``_compute`` forwards to ``recall_score``."""
        super().__init__()
        self.average = average
        self.zero_division = zero_division
        # additional values passed to compute() could and probably should (?) all be passed here so that the final computation is configured immediately at Metric instantiation

    def _info(self):
        """Return the ``evaluate.MetricInfo`` describing this metric and its input features."""
        return evaluate.MetricInfo(
            description="Custom built Recall metric for true *multilabel* classification - the 'multilabel' config_name var in the evaluate.EvaluationModules class appears to better address multi-class classification, where features can fall under a multitude of labels. Granted, the subtlety is minimal and easily confused. This class is implemented with the intention of enabling the evaluation of multiple multilabel classification metrics at the same time using the evaluate.CombinedEvaluations.combine method.",
            citation="",
            inputs_description="'average': This parameter is required for multiclass/multilabel targets. If None, the scores for each class are returned. Otherwise, this determines the type of averaging performed on the data. Options include: {‘micro’, ‘macro’, ‘samples’, ‘weighted’, ‘binary’} or None.",
            features=datasets.Features(
                {
                    "predictions": datasets.Sequence(datasets.Value("int32")),
                    "references": datasets.Sequence(datasets.Value("int32")),
                }
                if self.config_name == "multilabel"
                else {
                    "predictions": datasets.Value("int32"),
                    "references": datasets.Value("int32"),
                }
            ),
            # previously pointed at the f1_score page (copy-paste slip)
            reference_urls=["https://scikit-learn.org/stable/modules/generated/sklearn.metrics.recall_score.html"],
        )

    # could remove specific kwargs like average, sample_weight from _compute() method and simply pass them to the underlying scikit-learn function in the form of a class var self.*

    def _compute(
        self, predictions, references, labels=None, pos_label=1, average="binary", sample_weight=None, zero_division="warn",
    ):
        """Compute recall with ``self.average`` and return it as ``{"recall": value}``.

        The ``average`` argument is accepted for signature compatibility but is not used.
        ``zero_division`` falls back to the value given at construction when left at its
        default of ``"warn"``. A single score is returned as a Python ``float``; otherwise
        the value returned by ``recall_score`` is passed through as-is.
        """
        # An explicit per-call value wins; the default sentinel defers to the instance setting.
        call_uses_default = isinstance(zero_division, str) and zero_division == "warn"
        effective_zero_division = self.zero_division if call_uses_default else zero_division

        score = recall_score(
            references, predictions, labels=labels, pos_label=pos_label, average=self.average, sample_weight=sample_weight, zero_division=effective_zero_division,
        )
        # A plain Python float has no `.size` attribute; treat anything without one as a scalar
        # instead of raising AttributeError.
        is_scalar = getattr(score, "size", 1) == 1
        return {"recall": float(score) if is_scalar else score}

In [0]:
# instantiate our combined custom metrics

import evaluate
import numpy as np

f1 = FixedF1(average="weighted")
precision = FixedPrecision(average="weighted", zero_division=np.nan)
recall = FixedRecall(average="weighted")
accuracy = evaluate.load("accuracy")

combined = evaluate.combine([f1, accuracy, recall, precision])

In [0]:
# now we run our evaluation loop

from tqdm.auto import tqdm

test_dataloader = DataLoader(tokenized_datasets["test"], shuffle=False, batch_size=8, collate_fn=data_collator)

num_batches = len(test_dataloader)
progress_bar = tqdm(range(num_batches))

# Send each batch to wherever the model's weights live rather than assuming "cuda",
# so the loop also runs when the model was loaded onto the CPU.
eval_device = trained_model.device

trained_model.eval()

# evaluation loop
for batch in test_dataloader:
    batch = {k: v.to(eval_device) for k, v in batch.items()}
    # no gradients are needed for evaluation
    with torch.no_grad():
        outputs = trained_model(**batch)

    logits = outputs.logits
    # predicted class = index of the largest logit
    predictions = torch.argmax(logits, dim=-1)
    combined.add_batch(predictions=predictions, references=batch["labels"])

    progress_bar.update(1)

# displayed as the cell output: the combined metric results over the whole test split
combined.compute()

## Push our Model to the Hub

In [0]:
# the model is pushed to the hub with the standard `push_to_hub` method
model_name = "RoBERTa-base-DReiFT"
trained_model.push_to_hub(model_name, commit_message="Pushing fine-tuned RoBERTa model")

In [0]:
# Lastly, we make one final modification to the model's config by updating the label names to
# reflect the medical conditions that we are classifying
# This will improve ease of inference

# class index -> condition name, for every class known to `condition_labels`
id2label_dict = {
    class_index: condition_labels.int2str(class_index)
    for class_index in range(condition_labels.num_classes)
}
# inverse lookup: condition name -> class index
label2id_dict = dict((name, class_index) for class_index, name in id2label_dict.items())

In [0]:
# now we reload the model from the hub but correct the config as we want it with the proper label dicts
from transformers import AutoConfig, AutoModelForSequenceClassification

# define config
# Build a config for the pushed model that carries the human-readable label mappings.
# NOTE(review): at this point `model_name` is "RoBERTa-base-DReiFT" with no user/organisation
# prefix, whereas the inference cell loads "MarioBarbeque/RoBERTa-base-DReiFT"; confirm that
# the un-prefixed name resolves to the intended repository here.
config = AutoConfig.from_pretrained(model_name, label2id=label2id_dict, id2label=id2label_dict)

# load model with config
# (this rebinds the notebook-level name `model`, previously the model created before training)
model = AutoModelForSequenceClassification.from_pretrained(model_name, config=config)

# repush corrected model
model.push_to_hub(model_name, commit_message="Repushing corrected RoBERTa model with proper label names in config")

## Inference

In [0]:
# Now, as detailed in the model card, one can use the model as follows:

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

model_name = "MarioBarbeque/RoBERTa-base-DReiFT"
tokenizer_name = "FacebookAI/roberta-base"

model = AutoModelForSequenceClassification.from_pretrained(model_name, device_map="auto")
tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

# Pass a unique 'drug-review' to classify the underlying condition based upon 805 pretrained medical issues
drug_review = ["My tonsils were swollen and I had a hard time swallowing. I had a minimal fever to accompany the pain in my throat. Taking Aleve at regular intervals throughout the day improved my swallowing. I am now taking Aleve every 4 hours."]
# Put the inputs on the same device as the model instead of assuming "cuda", so the
# example also runs on a machine without a GPU.
tokenized_review = tokenizer(drug_review, return_tensors="pt", truncation=True, padding=True).to(model.device)

# inference only: no gradients needed
with torch.no_grad():
    output = model(**tokenized_review)
# the largest logit picks the class id, which the config maps back to a condition name
predicted_label = model.config.id2label[torch.argmax(output.logits, dim=-1).item()]
print(f"The model predicted the underlying condition to be: {predicted_label}")