In [1]:
# Mount Google Drive at /content/drive; the file paths used later in this notebook live under it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install git+https://github.com/huggingface/transformers.git
!pip install datasets
!pip install transformers torch
!pip install sentencepiece

In [3]:
# cd /content/drive/MyDrive/NLP244/QUEST4/hf_snli/hf_libraries_demo

/content/drive/MyDrive/NLP244/QUEST4/hf_snli/hf_libraries_demo


In [None]:
!pip install evaluate

In [None]:
!pip install wandb

In [5]:
from datasets import list_datasets, load_dataset, DatasetDict
from collections import Counter
from typing import List, Dict, Union, Callable, Any
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pprint import pprint
import torch

In [None]:
from pprint import pprint

from datasets import load_dataset, DatasetDict, Dataset
import datasets
from datasets import Dataset
import pandas as pd


# Read the three SNLI-style splits from CSV files stored on the mounted Google Drive.
filepath_train = "/content/drive/MyDrive/snli/train.csv"
filepath_val = "/content/drive/MyDrive/snli/val.csv"
filepath_test = "/content/drive/MyDrive/snli/test.csv"

# Drop rows with missing values, then rebuild a clean 0..n-1 index.  If dropna() removed any
# rows, the index has gaps, and Dataset.from_pandas() would otherwise keep that old index as
# an extra `__index_level_0__` column in every split.
train_ds = pd.read_csv(filepath_train).dropna().reset_index(drop=True)
val_ds = pd.read_csv(filepath_val).dropna().reset_index(drop=True)
test_ds = pd.read_csv(filepath_test).dropna().reset_index(drop=True)

train_dataset = Dataset.from_pandas(train_ds)
val_dataset = Dataset.from_pandas(val_ds)
test_dataset = Dataset.from_pandas(test_ds)

french_snli_DD = datasets.DatasetDict(
    {"train": train_dataset, "validation": val_dataset, "test": test_dataset}
)
dataset: DatasetDict = french_snli_DD

assert sorted(dataset.keys()) == ['test', 'train', 'validation'], f"unexpected splits or keys! {dataset}"
print(dataset)
pprint(train_dataset[0])

# Sanity check: every training example has two text fields and an integer label.
for item in train_dataset:
    assert isinstance(item.get('hypothesis'), str)
    assert isinstance(item.get('premise'), str)
    assert isinstance(item.get('label'), int)

# The stand-alone split and the one stored in the DatasetDict are the same data:
for item_a, item_b in zip(train_dataset, dataset['train']):
    assert item_a == item_b, "datasets aren't the same!"


In [7]:
from typing import Any, Dict, Callable
import datasets
from datasets import Dataset
import pandas as pd
from datasets import DatasetDict, load_dataset

# A single example: column name -> value.
DataPoint = Dict[str, Any]


def lowercase_text(item: DataPoint) -> DataPoint:
    """
    Build a new mapping holding lower-cased copies of the two text fields.

    :param item: an example with string values under 'hypothesis' and 'premise'
    :return: a dict with exactly those two keys, each value lower-cased
    """
    lowered: DataPoint = {}
    for field in ("hypothesis", "premise"):
        lowered[field] = item[field].lower()
    return lowered

def pre_process_dataset(dataset: DatasetDict) -> DatasetDict:
    """
    Normalize the text columns of every split.

    Steps: lower-case 'hypothesis' and 'premise' (via `lowercase_text`), then collapse every run of
    white space in both columns to a single space (which also strips leading/trailing white space).

    :param dataset: a DatasetDict with a 'train' split whose examples have 'hypothesis', 'premise'
        and 'label' columns
    :return: a new DatasetDict with the normalized text columns
    """
    dataset = dataset.map(lowercase_text)
    assert 'label' in dataset['train'][0]  # note: non-destructive operation

    def _collapse_whitespace(item: DataPoint) -> DataPoint:
        # str.split() with no argument splits on any white-space run and drops empty pieces
        return {
            'hypothesis': ' '.join(item['hypothesis'].split()),
            'premise': ' '.join(item['premise'].split()),
        }

    # one pass over the data handles both text columns
    return dataset.map(
        _collapse_whitespace,
        desc="normalizing all white space to a single space"
    )


# see function above for details
dataset = pre_process_dataset(dataset)

# only map() is applied (there is no filter step), so the split sizes are unchanged
print(dataset)



In [53]:
from typing import Dict

from transformers import TextClassificationPipeline, AutoModelForSequenceClassification, AutoConfig
from transformers.pipelines.base import GenericTensor


class PerfectTextClassificationPipeline(TextClassificationPipeline):
    """
    A "cheating" pipeline for sanity-checking an evaluation set-up: instead of running a model it echoes
    back the gold label it is handed, so a correctly wired metric must report a perfect score.

    Usage: call the evaluator's ``compute`` with ``second_input_column='label'`` so that the gold label
    reaches this pipeline as the second input ('text_pair').
    """

    def __init__(self, **kwargs):
        # The model is never run (see _forward); it only satisfies the Pipeline constructor.
        # AutoModelForSequenceClassification is imported next to this class, whereas the bare
        # `AutoModel` name is not in scope here when the notebook is run top to bottom.
        model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-cased")
        kwargs = {
            "framework": "pt",
            "model": model,
            "task": "text-classification",
            **kwargs  # caller-supplied values override the defaults above
        }
        super().__init__(**kwargs)

    def _sanitize_parameters(self, return_all_scores=None, function_to_apply=None, top_k="", **tokenizer_kwargs):
        """Delegate parameter handling to the parent pipeline unchanged."""
        return super()._sanitize_parameters(return_all_scores, function_to_apply, top_k, **tokenizer_kwargs)

    def preprocess(self, inputs, **tokenizer_kwargs) -> Dict[str, GenericTensor]:
        """Skip tokenization: the raw example is passed straight through to `_forward`."""
        return inputs

    def _forward(self, model_inputs):
        """
        Return the gold label as the "prediction".

        :param model_inputs: the un-tokenized example; must contain 'text_pair'
        :return: ``{"label": <gold label>}``
        :raises ValueError: if no second input ('text_pair') was provided
        """
        if 'text_pair' not in model_inputs:
            raise ValueError("this pipeline needs labels to cheat and get perfect performance! call compute with "
                             "second_input_column='label'")

        # An explicit 'label' entry wins if present; otherwise the gold label is the second input.
        if 'label' in model_inputs:
            return {"label": model_inputs['label']}
        return {"label": model_inputs['text_pair']}

    def postprocess(self, model_outputs, function_to_apply=None, top_k=1, _legacy=True):
        """Return the `_forward` output as-is (no softmax / top-k selection)."""
        return model_outputs

In [None]:
import evaluate
from evaluate import TextClassificationEvaluator, Metric, EvaluationModuleInfo
from sklearn.metrics import f1_score
class MyMacroF1Metric(Metric):
    """
    You can define custom metrics! In this case I do this to compute Macro-F1, which averages per-class F1 scores
    """
    # Reuse the metadata (description, features, ...) of the stock "f1" metric; loaded once, when the
    # class body is executed.
    f1_metric_info: EvaluationModuleInfo = evaluate.load("f1")._info()

    def _info(self) -> EvaluationModuleInfo:
        """Return the metadata borrowed from the stock "f1" metric."""
        return MyMacroF1Metric.f1_metric_info

    def _compute(self, predictions=None, references=None, labels=None, pos_label=1, sample_weight=None) -> Dict[str, Any]:
        """
        Compute the macro-averaged F1 score with scikit-learn.

        :param predictions: predicted class ids
        :param references: gold class ids
        :param labels: forwarded to ``sklearn.metrics.f1_score``
        :param pos_label: forwarded to ``sklearn.metrics.f1_score``
        :param sample_weight: forwarded to ``sklearn.metrics.f1_score``
        :return: ``{"f1": <macro F1 as a Python float>}``
        """
        score = f1_score(
            references, predictions, labels=labels, pos_label=pos_label, average="macro", sample_weight=sample_weight
        )
        # average="macro" always reduces to a single number, so convert unconditionally instead of
        # probing `.size`, which only exists when the result is a numpy scalar.
        return {"f1": float(score)}



# lets set up a text classification evaluator
text_eval: TextClassificationEvaluator = TextClassificationEvaluator('text-classification',
                                                                      default_metric_name="accuracy")

# create a 'perfect' model and evaluate
perfect_model: PerfectTextClassificationPipeline = PerfectTextClassificationPipeline()

# you can also instantiate a metric yourself with evaluate.load:
f1_metric: MyMacroF1Metric = MyMacroF1Metric()

# The 'perfect' pipeline can only cheat if it is handed the gold labels, and the only extra channel
# into a pipeline is the second input column -- so pass the label column there (not the premise).
results = text_eval.compute(
    model_or_pipeline=perfect_model,
    data=dataset['test'],
    input_column='hypothesis',
    second_input_column='label',
    label_column='label',
    metric=evaluate.combine(evaluations=["accuracy", f1_metric]),
)
assert results['accuracy'] == results['f1'] == 1.0, \
    f"we used the perfect pipeline, expected perfect prediction! got {results['accuracy']}"
print("==== Perfect Model Results (expected 1.0) ====")
pprint(results)

In [None]:
import os
from typing import Dict

import datasets
from datasets import Dataset
import pandas as pd
import torch.cuda
import wandb as wandb
from datasets import DatasetDict, load_dataset, Dataset
from evaluate import EvaluationModule
from torch import Tensor
from transformers import AutoTokenizer, AutoModel,TrainingArguments, Trainer, EvalPrediction
import torch.nn.functional as F


class LabelSmoothedTrainer(Trainer):
    """
    To add label-smoothing, we can just sub-class to compute loss with different parameters
    """
    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """
        Cross-entropy loss with label smoothing (0.01) computed from the model's logits.

        :param model: the model being trained; its output must expose 'logits'
        :param inputs: one collated batch, holding the gold labels under 'labels' (or 'label')
        :param return_outputs: when True, also return the model outputs (with 'loss' filled in)
        :param num_items_in_batch: accepted for compatibility with Trainer versions that pass it; unused
        :return: ``loss`` or ``(loss, model_outputs)``
        """
        # The default padding collator renames 'label' to 'labels'; accept either spelling.
        label_key = 'labels' if 'labels' in inputs else 'label'
        labels = inputs[label_key]

        # Keep the labels out of the forward call: the loss is computed here, not by the model.
        features = {name: value for name, value in inputs.items() if name != label_key}
        model_outputs = model(**features)

        loss = F.cross_entropy(model_outputs['logits'], labels, label_smoothing=.01)
        # Store the loss in the outputs so callers that receive them see it under 'loss'.
        model_outputs['loss'] = loss
        return (loss, model_outputs) if return_outputs else loss


if __name__ == '__main__':
    # Local import: this cell's own import line only brings in the head-less `AutoModel`.
    from transformers import AutoModelForSequenceClassification

    checkpoint = "cmarkea/distilcamembert-base"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)

    # The trainer's loss reads model_outputs['logits'], which only a model with a classification
    # head produces; the bare AutoModel returns hidden states only.
    # assumes labels are integer class ids starting at 0 -- TODO confirm against the CSV files
    num_labels = int(max(train_dataset['label'])) + 1
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=num_labels)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    def tokenize_pairs(batch):
        """Tokenize (hypothesis, premise) pairs, truncating over-long inputs."""
        return tokenizer(batch['hypothesis'], batch['premise'], truncation=True)

    train_dataset = train_dataset.map(tokenize_pairs, batched=True, batch_size=256)
    val_dataset = val_dataset.map(tokenize_pairs, batched=True, batch_size=256)
    test_dataset = test_dataset.map(tokenize_pairs, batched=True, batch_size=256)

    # have the train set return torch tensors, exposing the tokenizer outputs, raw text and label columns
    train_dataset.set_format(type="pt", columns=['input_ids', 'attention_mask', 'hypothesis', 'premise', 'label'])

    f1_metric: MyMacroF1Metric = MyMacroF1Metric()
    my_evaluation: EvaluationModule = evaluate.combine(["accuracy", f1_metric])

    def my_compute_metrics(eval_pred: EvalPrediction) -> Dict[str, float]:
        """Turn logits into class ids (argmax over axis 1) and score them with accuracy + macro-F1."""
        logits, labels = eval_pred.predictions, eval_pred.label_ids
        predictions = logits.argmax(axis=1)
        return my_evaluation.compute(predictions=predictions, references=labels)

    # Let's fine-tune with the Trainer API!
    training_args: TrainingArguments = TrainingArguments(
        output_dir="/content/drive/MyDrive/NLP244/QUEST4/hf_snli/check_points",
        do_train=True,
        do_eval=True,
        do_predict=True,
        evaluation_strategy="steps",
        eval_steps=128,
        per_device_train_batch_size=64,
        per_device_eval_batch_size=128,
        save_steps=128,  # same cadence as eval_steps, so every saved checkpoint has eval metrics
        save_strategy="steps",
        save_total_limit=5,
        # NOTE(review): depending on the transformers version, None may enable every installed
        # integration (e.g. wandb) rather than disable reporting; use "none" to turn it off -- confirm.
        report_to=None,
        logging_steps=50,
        num_train_epochs=20,
        metric_for_best_model="accuracy",
        load_best_model_at_end=True,
        dataloader_num_workers=2,  # set to 0 when debugging and >1 when running!
    )

    trainer: LabelSmoothedTrainer = LabelSmoothedTrainer(
        model=model,
        args=training_args,
        data_collator=None,  # let HF set this to an instance of transformers.DataCollatorWithPadding
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        tokenizer=tokenizer,
        compute_metrics=my_compute_metrics,
    )

    trainer.train()
    model = trainer.model  # make sure to load_best_model_at_end=True!

    # run a final evaluation on the test set and show the metrics (previously they were discarded)
    test_metrics = trainer.evaluate(metric_key_prefix="test", eval_dataset=test_dataset)
    print(test_metrics)


