Weights & Biases - Week 1 

For this example, we will be using the Complaints Dataset available on the HuggingFace Hub to build a classification model. 

The (fictional) business case for this model is that a corporation wants to use a classifier to route complaints to the right departments and teams, improving the customer journey. The complaints department has collected a large number of complaints, together with the departments that dealt with them, highlighting the most probable issue for each complaint.

We start by importing the required libraries to run this example notebook

In [None]:
import argparse
import logging
import os
import sys
import time
from pathlib import Path

import numpy as np
import pandas as pd
import torch
import wandb
from datasets import load_dataset, Dataset
from datasets import load_from_disk
from datasets import load_metric
from huggingface_hub import HfFolder
from torch import tensor, nn, device, cuda
from transformers import AutoTokenizer, TrainingArguments, Trainer, AutoModelForSequenceClassification, DataCollatorWithPadding
from transformers.trainer_callback import EarlyStoppingCallback, TrainerCallback
from transformers.trainer_callback import TrainerControl, TrainerState

Start by Logging into HF Hub and Weights and Biases

In [None]:
### Login to HF Hub

In [None]:
import wandb

# Authenticate this session with Weights & Biases before any run is started.
wandb.login()

We set up our Weights and Biases parameters at the start so that they are easily accessible in one place if we want to change the way our data lineage DAG looks.

In [None]:
# WANDB BASE PARAMETERS
PROJECT_NAME = 'wandb-week-1-complaints-classifier'
ENTITY = "wandb_course"
# WANDB ARTIFACT TYPES
DATASET_TYPE = "dataset"
MODEL_TYPE = "model"
# WANDB JOB TYPES (one per stage of the data lineage)
RAW_DATA_JOB_TYPE = "fetch-raw-data"
DATA_PROCESSING_JOB_TYPE = "process-data"
SPLIT_DATA_JOB_TYPE = "split-data"
MODEL_TRAINING_JOB_TYPE = "model-training"
# WANDB ARTIFACT NAMES
# Each stage needs its own artifact name; reusing the raw name for the
# processed data would log the processed dataset as a new version of the raw one.
RAW_DATA_ARTIFACT = "complaints_raw_data"
PROCESSED_DATA_ARTIFACT = "complaints_processed_data"
TRAIN_DATA_ARTIFACT = "complaints_train_data"
TEST_DATA_ARTIFACT = "complaints_test_data"
# DATA FOLDERS
RAW_DATA_FOLDER = 'complaints-dataset/raw'
PROCESSED_DATA_FOLDER = 'complaints-dataset/processed'
TRAIN_DATA_FOLDER = 'complaints-dataset/train'
TEST_DATA_FOLDER = 'complaints-dataset/test'
# Kept as a Path because the training arguments call `.as_posix()` on it
MODEL_DATA_FOLDER = Path('complaints-model')
# DATASET COLUMNS TO KEEP
# NOTE(review): confirm these match the dataset's column names exactly
TEXT_COLUMN = "Complaints Text"
TARGET_COLUMN = "Sub Product"
# TRANSFORMERS PARAMETERS
MODEL_NAME = "distilbert-base-uncased"
NUM_EPOCHS = 3
TRAIN_BATCH_SIZE = 32
EVAL_BATCH_SIZE = 32
WARMUP_STEPS = 500
LEARNING_RATE = 5e-5
FP16 = True
# HUB PARAMETERS
PUSH_TO_HUB = True
HUB_MODEL_ID = "distilbert-complaints-wandb"
HUB_STRATEGY = "every_save"

# Data Preparation

### Download & Log Raw Data

In [None]:
run = wandb.init(project=PROJECT_NAME, entity=ENTITY, job_type=RAW_DATA_JOB_TYPE)

# Loading consumer complaints dataset - Note: This is a big dataset
text_dataset = load_dataset("consumer-finance-complaints", ignore_verifications=True)

# `add_dir` uploads files from disk, so the dataset has to be written to the
# raw data folder before the artifact is built.
text_dataset.save_to_disk(RAW_DATA_FOLDER)

# Create and log the raw data artifact. The first argument is the artifact
# *name* (a string), not the dataset object itself.
raw_data_art = wandb.Artifact(RAW_DATA_ARTIFACT, type=DATASET_TYPE)
raw_data_art.add_dir(RAW_DATA_FOLDER, name=RAW_DATA_ARTIFACT)
run.log_artifact(raw_data_art)

# end the RAW DATA job
run.finish()

### Process & Log Data

Set-up the tokenizer and encoder function

In [None]:
# download tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# The target column is still called TARGET_COLUMN here (it is only renamed to
# "labels" during processing), so every mapping is derived from that feature.
label_feature = text_dataset['train'].features[TARGET_COLUMN]

# Used to encode the labels: class name -> integer id
label2id = {name: idx for idx, name in enumerate(label_feature.names)}

# Used later to initialise the model: number of classes and integer id -> class name
number_classes = label_feature.num_classes
id2label = {idx: name for idx, name in enumerate(label_feature.names)}

# tokenizer helper function
def tokenize(batch):
    """Tokenize a batch of complaints and encode its labels as integer ids.

    Args:
        batch: mapping with a "text" column (list of strings) and a "labels"
            column (list of class names or already-encoded integer ids).

    Returns:
        The tokenizer output for ``batch['text']`` with an added "labels" entry
        holding one integer id per example.
    """
    tokenized_batch = tokenizer(batch['text'], padding='max_length', truncation=True)
    # Only string labels need the name -> id lookup; labels that are already
    # integer ids are passed through instead of raising KeyError.
    tokenized_batch["labels"] = [
        label2id[label] if isinstance(label, str) else label
        for label in batch["labels"]
    ]
    return tokenized_batch

Process the data as part of a wandb run

In [None]:
run = wandb.init(project=PROJECT_NAME, entity=ENTITY, job_type=DATA_PROCESSING_JOB_TYPE)

# By including `use_artifact` we're logging the usage to W&B and can track it as part of the lineage.
# `use_artifact` returns an Artifact handle, so the files are downloaded and the
# dataset is re-loaded from the sub-directory it was stored under.
raw_data_art = run.use_artifact(f'{RAW_DATA_ARTIFACT}:latest')
raw_data_dir = Path(raw_data_art.download()) / RAW_DATA_ARTIFACT
text_dataset = load_from_disk(raw_data_dir.as_posix())

# Extracting the target column, there is only one split at this point (train)
columns = text_dataset['train'].column_names

# Remove the columns which aren't in scope for us
remove_cols = [e for e in columns if e not in (TEXT_COLUMN, TARGET_COLUMN)]
processed_data = text_dataset.remove_columns(remove_cols)

# Renaming the columns to the names expected by the classifier
processed_data = processed_data.rename_column(TEXT_COLUMN, "text")
processed_data = processed_data.rename_column(TARGET_COLUMN, "labels")

# Filtering out empty/no-text complaints (a missing text counts as empty)
processed_data = processed_data.filter(lambda example: bool(example['text']))

# tokenize dataset; from here on `processed_data` is the single train split
processed_data = processed_data['train'].map(tokenize, batched=True)

# set format for pytorch
processed_data.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])

# Write the processed split to disk so `add_dir` has files to upload
processed_data.save_to_disk(PROCESSED_DATA_FOLDER)

# Create and log the processed data artifact (named by a string, not the dataset object)
processed_data_art = wandb.Artifact(PROCESSED_DATA_ARTIFACT, type=DATASET_TYPE)
processed_data_art.add_dir(PROCESSED_DATA_FOLDER, name=PROCESSED_DATA_ARTIFACT)
run.log_artifact(processed_data_art)

# End the PROCESS DATA job
run.finish()

#### Split Data


In [None]:
run = wandb.init(project=PROJECT_NAME, entity=ENTITY, job_type=SPLIT_DATA_JOB_TYPE)

# By including `use_artifact` we're logging the usage to W&B and can track it as part of the lineage.
# The artifact handle is downloaded and the dataset re-loaded from disk.
processed_data_art = run.use_artifact(f'{PROCESSED_DATA_ARTIFACT}:latest')
processed_data_dir = Path(processed_data_art.download()) / PROCESSED_DATA_ARTIFACT
processed_data = load_from_disk(processed_data_dir.as_posix())

# The processed artifact may hold either a bare Dataset or a DatasetDict with
# a single 'train' split; only a Dataset offers `train_test_split`.
if not isinstance(processed_data, Dataset):
    processed_data = processed_data['train']

# Splitting the dataset into training and validation datasets
split_data = processed_data.train_test_split(test_size=0.2, seed=0)
train_dataset = split_data['train']
test_dataset = split_data['test']

# Write both splits to disk so `add_dir` has files to upload
train_dataset.save_to_disk(TRAIN_DATA_FOLDER)
test_dataset.save_to_disk(TEST_DATA_FOLDER)

# Create and log the train data artifact
train_data_art = wandb.Artifact(TRAIN_DATA_ARTIFACT, type=DATASET_TYPE)
train_data_art.add_dir(TRAIN_DATA_FOLDER, name=TRAIN_DATA_ARTIFACT)
run.log_artifact(train_data_art)

# Create and log the test data artifact
test_data_art = wandb.Artifact(TEST_DATA_ARTIFACT, type=DATASET_TYPE)
test_data_art.add_dir(TEST_DATA_FOLDER, name=TEST_DATA_ARTIFACT)
run.log_artifact(test_data_art)

# End the SPLIT DATA job
run.finish()

### Set-Up Model

Set-Up the Training Arguments

In [None]:
# MODEL_DATA_FOLDER may be a plain string or a Path; normalise it once so the
# `.as_posix()` call cannot fail with AttributeError on a str.
model_output_dir = Path(MODEL_DATA_FOLDER).as_posix()

training_args = TrainingArguments(
    output_dir=model_output_dir,
    num_train_epochs=NUM_EPOCHS,
    per_device_train_batch_size=TRAIN_BATCH_SIZE,
    per_device_eval_batch_size=EVAL_BATCH_SIZE,
    warmup_steps=WARMUP_STEPS,
    # mixed precision is only requested when a CUDA device is actually present
    fp16=FP16 and cuda.is_available(),
    learning_rate=float(LEARNING_RATE),
    # logging & evaluation strategies
    logging_dir=f"{model_output_dir}/logs",
    logging_steps=50,
    evaluation_strategy="steps",
    eval_steps=2000,
    # saving uses the same strategy and interval as evaluation
    save_strategy="steps",
    save_steps=2000,
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    report_to="wandb",
    # push to hub parameters
    push_to_hub=PUSH_TO_HUB,
    hub_strategy=HUB_STRATEGY,
    hub_model_id=HUB_MODEL_ID,
    #hub_token=args.hub_token,
)

Set-Up Metrics

In [None]:

# Metric objects that have already been loaded, keyed by metric name, so that
# each metric is loaded once instead of on every evaluation call.
_METRIC_CACHE = {}


def _get_metric(name):
    """Return the metric called *name*, loading it on first use only."""
    if name not in _METRIC_CACHE:
        _METRIC_CACHE[name] = load_metric(name)
    return _METRIC_CACHE[name]


def compute_metrics(eval_pred):
    """Compute accuracy and weighted F1, recall and precision for one evaluation.

    Args:
        eval_pred: pair ``(predictions, labels)``; ``predictions`` holds one row
            of class scores per example and ``labels`` the reference class ids.

    Returns:
        Dict with the keys "accuracy", "f1", "recall" and "precision".
    """
    predictions, labels = eval_pred
    # the predicted class is the index of the highest score in each row
    predictions = np.argmax(predictions, axis=1)

    acc = _get_metric("accuracy").compute(predictions=predictions, references=labels)
    recall = _get_metric("recall").compute(
        predictions=predictions, references=labels, average="weighted"
    )
    f1 = _get_metric("f1").compute(
        predictions=predictions, references=labels, average="weighted"
    )
    precision = _get_metric("precision").compute(
        predictions=predictions, references=labels, average="weighted"
    )

    return {
        "accuracy": acc["accuracy"],
        "f1": f1["f1"],
        "recall": recall["recall"],
        "precision": precision["precision"],
    }

Write a Callback to Log Validation Examples to Weights and Biases

In [None]:
class SamplesTableLogger(TrainerCallback):
    """Trainer callback that logs misclassified validation examples to W&B.

    At the end of every epoch the stored sample is run through the model and
    the rows whose prediction differs from the label are logged as a
    ``wandb.Table`` under the key "examples".

    Args:
        sample: the examples to score each epoch. It is sliced with ``[:]`` and
            must then expose "input_ids", "attention_mask" and "labels" entries
            convertible to tensors of equal length per example (for instance a
            small tokenized dataset padded to a fixed length). The whole sample
            is scored in a single forward pass, so keep it small.
    """

    def __init__(self, sample):
        super().__init__()

        self.dataset = sample

    def on_epoch_end(self, args: TrainingArguments, state: "TrainerState", control: "TrainerControl", **kwargs):
        """Score the stored sample and log the wrongly predicted rows.

        The model is taken from the ``model`` keyword argument supplied to the
        callback; nothing is logged when it is missing or when no W&B run is
        active.
        """
        model = kwargs.get("model")
        if model is None or wandb.run is None:
            return

        batch = self.dataset[:]
        input_ids = torch.as_tensor(batch["input_ids"])
        attention_mask = torch.as_tensor(batch["attention_mask"])
        labels = torch.as_tensor(batch["labels"])
        sentences = tokenizer.batch_decode(input_ids, skip_special_tokens=True)

        # Score in eval mode without gradients, then restore the previous mode
        # so training continues unaffected.
        was_training = model.training
        model.eval()
        try:
            with torch.no_grad():
                outputs = model(
                    input_ids=input_ids.to(model.device),
                    attention_mask=attention_mask.to(model.device),
                )
        finally:
            if was_training:
                model.train()
        preds = outputs.logits.argmax(-1)

        df = pd.DataFrame(
            {
                "Sentence": sentences,
                "Label": labels.cpu().numpy(),
                "Predicted": preds.cpu().numpy(),
            }
        )

        wrong_df = df[df["Label"] != df["Predicted"]]
        wandb.log(
            {
                "examples": wandb.Table(dataframe=wrong_df, allow_mixed_types=True),
                "global_step": state.global_step,
            })

In [None]:
run = wandb.init(project=PROJECT_NAME, entity=ENTITY, job_type=MODEL_TRAINING_JOB_TYPE)

# By including `use_artifact` we're logging the usage to W&B and can track it as part of the lineage.
# `use_artifact` returns Artifact handles, so each one is downloaded and the
# dataset re-loaded from the sub-directory it was stored under.
train_data_art = run.use_artifact(f'{TRAIN_DATA_ARTIFACT}:latest')
train_dataset = load_from_disk((Path(train_data_art.download()) / TRAIN_DATA_ARTIFACT).as_posix())
test_data_art = run.use_artifact(f'{TEST_DATA_ARTIFACT}:latest')
test_dataset = load_from_disk((Path(test_data_art.download()) / TEST_DATA_ARTIFACT).as_posix())

# define data_collator
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME, num_labels=number_classes, label2id=label2id, id2label=id2label
)

trainer = Trainer(
    model,
    training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    data_collator=data_collator,
    tokenizer=tokenizer,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],
    compute_metrics=compute_metrics,
)

trainer.train()

outputs = trainer.evaluate()
trainer.save_model()

# `predict` needs the dataset to score; the held-out split is used here
predictions_df = trainer.predict(test_dataset)

# `outputs` is a flat dict of scalars, so it is wrapped in a list to build a
# one-row DataFrame (a dict of scalars alone is rejected by pandas).
run.log({'Evaluation Metrics': wandb.Table(dataframe=pd.DataFrame([outputs]))})

wandb.finish()

# save best model, metrics and create model card
if PUSH_TO_HUB:
    trainer.create_model_card(model_name=HUB_MODEL_ID)
    # wait for asynchronous pushes to finish
    time.sleep(180)
    trainer.push_to_hub()


In [None]:


def get_training_data(text_column: str, target_column: str, dataset_id: str = "consumer-finance-complaints"):
    """Fetch a dataset from the HuggingFace Hub and generate id and class mappings.

    Args:
        text_column: name of the column holding the complaint text.
        target_column: name of the class-label column to predict.
        dataset_id: Hub identifier of the dataset to load.

    Returns:
        A 4-tuple ``(text_dataset, class_mapping, label2id, number_classes)``:
        the train/test split with columns renamed to "text" and "labels" and
        empty texts removed, the id -> class-name dict, the class-name -> id
        dict, and the number of classes.
    """
    # The dataset has to be loaded first; it only ships a 'train' split
    text_dataset = load_dataset(dataset_id, ignore_verifications=True)

    # Splitting the dataset into training and validation datasets
    text_dataset = text_dataset['train'].train_test_split(test_size=0.2, seed=0)

    # Remove the columns which aren't in scope for us
    columns = text_dataset['train'].column_names
    remove_cols = [e for e in columns if e not in (text_column, target_column)]
    text_dataset = text_dataset.remove_columns(remove_cols)

    # Build both mappings from the label feature's ordered class names
    label_names = text_dataset['train'].features[target_column].names
    class_mapping = dict(enumerate(label_names))
    label2id = {name: idx for idx, name in class_mapping.items()}

    text_dataset = text_dataset.rename_column(text_column, "text")
    text_dataset = text_dataset.rename_column(target_column, "labels")

    number_classes = text_dataset['train'].features['labels'].num_classes

    # Filtering out empty/no-text complaints (a missing text counts as empty)
    text_dataset = text_dataset.filter(lambda example: bool(example['text']))

    return text_dataset, class_mapping, label2id, number_classes

if __name__ == "__main__":

    def _str2bool(value):
        """Parse a command-line boolean.

        argparse's ``type=bool`` turns every non-empty string (including
        "False") into True, so the text is interpreted explicitly instead.
        """
        if isinstance(value, bool):
            return value
        lowered = value.strip().lower()
        if lowered in ("1", "true", "t", "yes", "y"):
            return True
        if lowered in ("0", "false", "f", "no", "n"):
            return False
        raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")

    parser = argparse.ArgumentParser(description='Train new HuggingFace Model for a given two column dataset')
    parser.add_argument('--text_col', dest='text_col', type=str, help='The name of the text column in the dataset')
    parser.add_argument('--feature_col', dest='feature_col', type=str, help='The name of the target (label) column in the dataset')
    parser.add_argument('--model_id', dest='model_id', type=str, default='distilbert-base-uncased', help='Name of the HF model to use')
    # NOTE(review): --dataset_id, --experiment_name and --evaluate are parsed but never used below
    parser.add_argument('--dataset_id', dest='dataset_id', type=str, default='consumer-complaints', help='Name of the HF Hub dataset to use')
    parser.add_argument('--experiment_name', dest='experiment_name', default='complaints_dataset', type=str, help='path to the csv file')
    parser.add_argument('--from_checkpoint', dest='from_checkpoint', type=str, help='If training should start from a specific checkpoint')
    parser.add_argument('--evaluate', dest='evaluate', type=str, help='If training should evaluate the final model performance')
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--train_batch_size", type=int, default=32)
    parser.add_argument("--eval_batch_size", type=int, default=64)
    parser.add_argument("--warmup_steps", type=int, default=500)
    parser.add_argument("--learning_rate", type=float, default=5e-5)
    parser.add_argument("--fp16", type=_str2bool, default=True)

    # Push to Hub Parameters
    parser.add_argument("--push_to_hub", type=_str2bool, default=True)
    parser.add_argument("--hub_model_id", type=str, default=None)
    parser.add_argument("--hub_strategy", type=str, default="every_save")
    parser.add_argument("--hub_token", type=str, default=None)

    # Unknown arguments are ignored rather than rejected
    args, _ = parser.parse_known_args()

    # Set up logging
    logging.basicConfig(
        level=logging.getLevelName("INFO"),
        handlers=[logging.StreamHandler(sys.stdout)],
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    logger = logging.getLogger(__name__)

    # Check for GPU
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load dataset
    text_dataset, id2label, label2id, number_classes = get_training_data(args.text_col, args.feature_col)

    logger.info("Analysing Label Distribution")
    train_df = text_dataset['train'].to_pandas()
    label_info = LabelAnalyser(train_df)
    logger.info(label_info)

    # If the dataset is imbalanced, we want to generate class weights for the loss function to penalise more for rarer classes
    if label_info.label_distribution != 'balanced':
        logger.info("Dataset is imbalanced, generating class weights and undersampling majority class")
        # Sampling majority to the mean value
        target_classes, other_classes = sampling_strategy(
            train_df['text'],
            train_df['labels'],
            round(train_df.labels.value_counts().mean()),
            t='majority',
        )
        # NOTE(review): the undersampled frame only feeds the class weights;
        # training below still uses the full train split - confirm intended.
        df = undersample_df(train_df, target_classes, other_classes)
        logger.info("Recalculating label distribution")
        label_info = LabelAnalyser(df)
        logger.info(label_info)
        # The weights need to be a float tensor on the training device so the
        # loss function can combine them with the model's logits
        weights = torch.as_tensor(label_info.generate_class_weights(), dtype=torch.float).to(device)
    else:
        weights = None

    # download tokenizer
    tokenizer = AutoTokenizer.from_pretrained(args.model_id)

    # tokenizer helper function
    def tokenize(batch):
        """Tokenize the "text" column and encode "labels" via ``label_info.str2int``."""
        tokenized_batch = tokenizer(batch['text'], padding='max_length', truncation=True)
        tokenized_batch["labels"] = [label_info.str2int[label] for label in batch["labels"]]
        return tokenized_batch

    # tokenize dataset
    train_dataset = text_dataset['train'].map(tokenize, batched=True)
    test_dataset = text_dataset['test'].map(tokenize, batched=True)

    # set format for pytorch
    train_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])
    test_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])

    class CustomTrainer(Trainer):
        """Trainer whose loss is a cross-entropy weighted by the class weights above."""

        def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
            """Return the weighted cross-entropy loss (and the outputs if requested).

            Extra keyword arguments are accepted and ignored so the override
            keeps working if the caller passes additional options.
            """
            labels = inputs.get("labels")
            # forward pass
            outputs = model(**inputs)
            logits = outputs.get("logits")
            # `weights` is None for a balanced dataset, which gives plain cross-entropy
            loss_fct = nn.CrossEntropyLoss(weight=weights)
            loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
            return (loss, outputs) if return_outputs else loss

    # A torch.device has to be compared through its `.type`, and the --fp16
    # flag is honoured only when a GPU is available.
    fp_16 = args.fp16 and device.type == "cuda"

    logger.info("Setting up Trainer Args")

    output_dir = Path("/opt/ml/output/data")
    # The eval results file is written here later, so the directory must exist
    output_dir.mkdir(parents=True, exist_ok=True)

    logger.info('Created Directory')
    training_args = TrainingArguments(
        output_dir=output_dir.as_posix(),
        num_train_epochs=args.epochs,
        per_device_train_batch_size=args.train_batch_size,
        per_device_eval_batch_size=args.eval_batch_size,
        warmup_steps=args.warmup_steps,
        fp16=fp_16,
        learning_rate=float(args.learning_rate),
        # logging & evaluation strategies
        logging_dir=f"{output_dir.as_posix()}/logs",
        logging_steps=50,
        evaluation_strategy="steps",
        eval_steps=2000,
        save_strategy="steps",
        save_steps=2000,
        save_total_limit=2,
        load_best_model_at_end=True,
        metric_for_best_model="f1",
        report_to="wandb",
        # push to hub parameters
        push_to_hub=args.push_to_hub,
        hub_strategy=args.hub_strategy,
        hub_model_id=args.hub_model_id,
        hub_token=args.hub_token,
    )

    # define data_collator
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    logger.info("Initializing Model")
    model = AutoModelForSequenceClassification.from_pretrained(
        args.model_id, num_labels=number_classes, label2id=label2id, id2label=id2label
    )
    trainer = CustomTrainer(
        model,
        training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        data_collator=data_collator,
        tokenizer=tokenizer,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],
        compute_metrics=compute_metrics,
    )

    logger.info('Starting Model Training')
    # --from_checkpoint defaults to None, which starts a fresh training run
    trainer.train(resume_from_checkpoint=args.from_checkpoint)

    logger.info('Computing Evaluation F1')
    outputs = trainer.evaluate()

    # writes eval result to file which can be accessed later in s3 output
    with (output_dir / "eval_results.txt").open("w") as writer:
        print(f"***** Eval results *****")
        for key, value in sorted(outputs.items()):
            writer.write(f"{key} = {value}\n")
            print(f"{key} = {value}\n")
    logger.info(f'{outputs}')
    logger.info(f"Eval F1: {outputs['eval_f1']}")
    logger.info('Saving Model')
    trainer.save_model()
    wandb.finish()

    # save best model, metrics and create model card
    if args.push_to_hub:
        trainer.create_model_card(model_name=args.hub_model_id)
        # wait for asynchronous pushes to finish
        time.sleep(180)
        trainer.push_to_hub()

    # Also save the model to the directory named by SM_MODEL_DIR when that
    # variable is set; without it the copy in `output_dir` is the only one.
    sm_model_dir = os.environ.get("SM_MODEL_DIR")
    if sm_model_dir:
        trainer.save_model(sm_model_dir)
    else:
        logger.warning("SM_MODEL_DIR is not set; skipping the additional model save")