<a href="https://colab.research.google.com/github/juanprida/nlp_with_transformers/blob/main/08_making_transformers_efficient_in_production.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the Hugging Face libraries imported by the cells below.
!pip install transformers
!pip install datasets

In [None]:
# Standard library imports
import tempfile
from pathlib import Path
from time import perf_counter
from typing import Any, Dict, Tuple, Union

# Numerical / PyTorch imports
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.quantization import quantize_dynamic

# Hugging Face imports
from datasets import Dataset
from datasets import load_dataset
from datasets import load_metric
from datasets.dataset_dict import DatasetDict
from huggingface_hub import notebook_login
from transformers import (
    pipeline,
    TrainingArguments,
    Trainer,
    AutoTokenizer,
    AutoConfig,
    AutoModelForSequenceClassification
)
from transformers.pipelines.text_classification import TextClassificationPipeline

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# `device` is used further down to place the student and teacher models.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Log in to the Hugging Face Hub. The distillation cell below sets
# `push_to_hub=True` and calls `push_to_hub`, which presumably requires
# these credentials.
notebook_login()

### Load data.

In [None]:
# Download the "plus" configuration of the `clinc_oos` dataset.
clinc = load_dataset("clinc_oos", "plus")

# In this chapter we are going to classify intents from text.
# `intents` is the "intent" feature of the training split; it is used to
# convert between label ids and label names (`int2str` / `str2int`).
intents = clinc["train"].features["intent"]

# Let's take a quick look at one example from the test split.
sample = clinc["test"][42]
print(f"Text: {sample['text']}")
print(f"Intent: {intents.int2str(sample['intent'])}")

### Build `PerformanceBenchmark` in order to measure the performance of the model.

In [None]:
class PerformanceBenchmark:
    """Benchmark a text-classification pipeline on size, latency and accuracy.

    Args:
        pipeline: Pipeline under test. It is called with a single text and must
            return a list whose first item has a ``"label"`` key; its ``model``
            attribute is serialized to measure the model size.
        dataset: A single dataset split (e.g. ``clinc["test"]``) whose examples
            expose ``"text"`` and ``"intent"`` keys.
        optim_type: Name under which ``run_benchmark`` stores the metrics.
    """

    def __init__(
        self,
        pipeline: "TextClassificationPipeline",
        dataset: "Dataset",
        optim_type: str = "BERT baseline",
    ):
        self.pipeline = pipeline
        self.dataset = dataset
        self.optim_type = optim_type
        self.accuracy_score = load_metric("accuracy")

    def compute_accuracy(self) -> Dict[str, float]:
        """Run the pipeline on every example and return the accuracy metric.

        Predicted label names are mapped to ids with the module-level
        ``intents`` feature, so that global must be defined before calling.

        Returns:
            The dictionary produced by the accuracy metric; its ``"accuracy"``
            entry is also printed.
        """
        preds, labels = [], []
        for example in self.dataset:
            # The pipeline returns a list of predictions; keep the first label.
            pred = self.pipeline(example["text"])[0]["label"]
            preds.append(intents.str2int(pred))
            labels.append(example["intent"])
        accuracy = self.accuracy_score.compute(predictions=preds, references=labels)
        print(f"Accuracy on test set - {accuracy['accuracy']:.3f}")
        return accuracy

    def compute_size(self) -> Dict[str, float]:
        """Serialize the model's state dict and return its on-disk size.

        The state dict is written inside a private temporary directory, so an
        existing ``model.pt`` in the working directory is never overwritten or
        deleted, and the temporary file is removed even if saving fails.

        Returns:
            ``{"size_mb": <size in megabytes>}``.
        """
        state_dict = self.pipeline.model.state_dict()
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir) / "model.pt"
            torch.save(state_dict, tmp_path)
            # Calculate size in megabytes.
            size_mb = tmp_path.stat().st_size / (1024 * 1024)
        print(f"Model size (MB) is {size_mb:.2f}")
        return {"size_mb": size_mb}

    def time_pipeline(self, query: str = "What is the pin for my account?") -> Dict[str, float]:
        """Measure the latency of the pipeline on a single query.

        Args:
            query: Text sent to the pipeline on every call.

        Returns:
            ``{"time_avg_ms": ..., "time_std_ms": ...}`` computed over 10 timed
            calls, in milliseconds.
        """
        # Warm-up calls: their timings are discarded.
        for _ in range(10):
            _ = self.pipeline(query)

        latencies = []
        for _ in range(10):
            start_time = perf_counter()
            _ = self.pipeline(query)
            latencies.append(perf_counter() - start_time)

        # perf_counter reports seconds; convert to milliseconds.
        time_avg_ms = 1000 * np.mean(latencies)
        time_std_ms = 1000 * np.std(latencies)
        print(f"avg latency: {time_avg_ms:.2f} +/- {time_std_ms:.2f}")
        return {"time_avg_ms": time_avg_ms, "time_std_ms": time_std_ms}

    def run_benchmark(self) -> Dict[str, Dict[str, float]]:
        """Run size, latency and accuracy measurements.

        Returns:
            A one-entry dictionary keyed by ``optim_type`` whose value merges
            the results of ``compute_size``, ``time_pipeline`` and
            ``compute_accuracy``.
        """
        metrics = {}
        metrics[self.optim_type] = self.compute_size()
        metrics[self.optim_type].update(self.time_pipeline())
        metrics[self.optim_type].update(self.compute_accuracy())
        return metrics

### 01. Baseline: BERT without any optimization.

In [None]:
# Build a text-classification pipeline from the fine-tuned BERT checkpoint.
bert_ckpt = "transformersbook/bert-base-uncased-finetuned-clinc"
pipe = pipeline("text-classification", model=bert_ckpt)

# Benchmark it on the test split. The default `optim_type` ("BERT baseline")
# is the key under which results are stored; later cells add to `perf_metrics`.
pb = PerformanceBenchmark(pipe, clinc["test"])
perf_metrics = pb.run_benchmark()

### 02. Distillation: train a DistilBERT student with the BERT model as teacher.

In [None]:
# Arguments for training
class DistillationTrainingArguments(TrainingArguments):
    """Training arguments extended with the distillation hyperparameters.

    Args:
        alpha: Weight given to the student's cross-entropy loss in
            `DistillationTrainer.compute_loss`; the distillation term
            is weighted by ``1 - alpha``.
        temperature: Value the student and teacher logits are divided by
            before the softmax in the distillation term.
        *args, **kwargs: Forwarded unchanged to `TrainingArguments`.
    """

    def __init__(self, *args, alpha: float = 0.5, temperature: float = 2.0, **kwargs):
        super().__init__(*args, **kwargs)
        # Stored as plain attributes; the trainer reads them via `self.args`.
        self.alpha = alpha
        self.temperature = temperature


# Trainer for distillation
class DistillationTrainer(Trainer):
    """Trainer that mixes the student's own loss with a distillation loss.

    Args:
        teacher_model: Model whose logits the student is trained to match.
            It is only run under ``torch.no_grad()``.
        *args, **kwargs: Forwarded unchanged to `Trainer`.
    """

    def __init__(self, *args, teacher_model=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.teacher_model = teacher_model

    # Defined at class level (not nested inside __init__) so that it overrides
    # `Trainer.compute_loss` and is actually used during training.
    def compute_loss(
        self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]], return_outputs: bool = False
    ) -> Union[torch.Tensor, Tuple[torch.Tensor, Any]]:
        """Return the weighted sum of cross-entropy and distillation losses.

        Args:
            model: The student model being trained.
            inputs: Keyword arguments passed to both student and teacher.
            return_outputs: When true, also return the student's outputs.

        Returns:
            ``loss`` or ``(loss, student_outputs)`` where
            ``loss = alpha * loss_ce + (1 - alpha) * loss_kd``.
        """
        outputs_stu = model(**inputs)

        # Extract cross-entropy loss and logits from student
        loss_ce = outputs_stu.loss
        logits_stu = outputs_stu.logits

        # Extract logits from teacher; no gradients flow into the teacher.
        with torch.no_grad():
            outputs_tea = self.teacher_model(**inputs)
            logits_tea = outputs_tea.logits

        # Soften both distributions with the temperature and compute the
        # KL divergence, rescaled by temperature**2.
        temperature = self.args.temperature
        loss_fct = nn.KLDivLoss(reduction="batchmean")
        loss_kd = temperature**2 * loss_fct(
            F.log_softmax(logits_stu / temperature, dim=-1),
            F.softmax(logits_tea / temperature, dim=-1),
        )

        # Return weighted student loss
        loss = self.args.alpha * loss_ce + (1.0 - self.args.alpha) * loss_kd
        return (loss, outputs_stu) if return_outputs else loss

In [None]:
# Checkpoints: the student starts from pretrained DistilBERT, the teacher is
# the same fine-tuned BERT checkpoint used for the baseline benchmark.
student_ckpt = "distilbert-base-uncased"
teacher_ckpt = "transformersbook/bert-base-uncased-finetuned-clinc"

# 01. Model initialization
# The label mappings are copied from the baseline pipeline's model config
# (`pipe` must still refer to the BERT baseline pipeline at this point).
num_labels = intents.num_classes
id2label = pipe.model.config.id2label
label2id = pipe.model.config.label2id
student_config = AutoConfig.from_pretrained(student_ckpt, num_labels=num_labels, id2label=id2label, label2id=label2id)

def student_init():
    # Factory passed to the trainer as `model_init`: builds a new student
    # from `student_ckpt` with the config above and moves it to `device`.
    return AutoModelForSequenceClassification.from_pretrained(student_ckpt, config=student_config).to(device)

# 02. Teacher model
teacher_model = AutoModelForSequenceClassification.from_pretrained(teacher_ckpt, num_labels=num_labels).to(device)

# 03. Arguments for training
# NOTE: alpha=1 puts the whole weight on the student's cross-entropy loss, so
# the distillation term (weighted by 1 - alpha) has zero weight in this run.
student_training_args = DistillationTrainingArguments(
    output_dir="distilbert-base-uncased-finetuned-clinc",
    evaluation_strategy="epoch",
    num_train_epochs=5,
    learning_rate=2e-5,
    per_device_train_batch_size=48,
    per_device_eval_batch_size=48,
    alpha=1,
    weight_decay=0.01,
    push_to_hub=True,
)
# 04. Dataset & Tokenizer
student_tokenizer = AutoTokenizer.from_pretrained(student_ckpt)

def tokenize_text(batch):
    # Tokenize the "text" column of a batch with truncation enabled.
    return student_tokenizer(batch["text"], truncation=True)

# Tokenize every split in batches and drop the raw "text" column.
clinc_enc = clinc.map(tokenize_text, batched=True, remove_columns=["text"])
# Rename "intent" to "labels" (presumably the column name the model expects
# for its targets — confirm against the model's forward signature).
clinc_enc = clinc_enc.rename_column("intent", "labels")

# 05. Metrics
accuracy_score = load_metric("accuracy")


def compute_metrics(pred):
    """Compute accuracy from a ``(predictions, labels)`` pair.

    The predictions are reduced to class ids by taking the argmax along
    axis 1 before being compared with the reference labels.
    """
    scores, references = pred
    predicted_ids = np.argmax(scores, axis=1)
    return accuracy_score.compute(predictions=predicted_ids, references=references)

# Putting it all together under a trainer.
# `model_init` (rather than a model instance) lets the trainer build the
# student itself; training uses the "train" split and evaluation the
# "validation" split of the tokenized dataset.
distilbert_trainer = DistillationTrainer(
    model_init=student_init,
    teacher_model=teacher_model,
    args=student_training_args,
    train_dataset=clinc_enc["train"],
    eval_dataset=clinc_enc["validation"],
    compute_metrics=compute_metrics,
    tokenizer=student_tokenizer,
)

# Train the student, then push the result to the Hub with a commit message.
distilbert_trainer.train()
distilbert_trainer.push_to_hub("Training completed!")

In [None]:
# Benchmarking the distillation model
# NOTE: this loads the published `transformersbook` checkpoint, not the model
# trained and pushed by the cell above.
finetuned_ckpt = "transformersbook/distilbert-base-uncased-finetuned-clinc"
pipe = pipeline("text-classification", model=finetuned_ckpt)

# Results are merged into `perf_metrics` under the "DistilBERT" key.
optim_type = "DistilBERT"
pb = PerformanceBenchmark(pipe, clinc["test"], optim_type=optim_type)
perf_metrics.update(pb.run_benchmark())

### 03. Quantization: Apply quantization to our distilled model.

In [None]:
# Load the distilled checkpoint on the CPU before quantizing it.
model = (AutoModelForSequenceClassification.from_pretrained(finetuned_ckpt).to("cpu"))
# Quantize the model: dynamic quantization of the `nn.Linear` layers to qint8.
model_quantized = quantize_dynamic(model, {nn.Linear}, dtype=torch.qint8)


# A model object (not a checkpoint name) is passed here, so the tokenizer is
# supplied explicitly; `student_tokenizer` comes from the distillation cell.
pipe = pipeline("text-classification", model=model_quantized, tokenizer=student_tokenizer)
optim_type = "Distillation + quantization"
pb = PerformanceBenchmark(pipe, clinc["test"], optim_type=optim_type)
perf_metrics.update(pb.run_benchmark())