In [1]:
import datasets
from collections import  Counter
from datasets import load_dataset
import pandas as pd
import sklearn
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LogisticRegression
import torch

In [2]:
dynasent_r1  = load_dataset("dynabench/dynasent", 'dynabench.dynasent.r1.all', trust_remote_code=True)
dynasent_r1

Repo card metadata block was not found. Setting CardData to empty.


DatasetDict({
    train: Dataset({
        features: ['id', 'hit_ids', 'sentence', 'indices_into_review_text', 'model_0_label', 'model_0_probs', 'text_id', 'review_id', 'review_rating', 'label_distribution', 'gold_label', 'metadata'],
        num_rows: 80488
    })
    validation: Dataset({
        features: ['id', 'hit_ids', 'sentence', 'indices_into_review_text', 'model_0_label', 'model_0_probs', 'text_id', 'review_id', 'review_rating', 'label_distribution', 'gold_label', 'metadata'],
        num_rows: 3600
    })
    test: Dataset({
        features: ['id', 'hit_ids', 'sentence', 'indices_into_review_text', 'model_0_label', 'model_0_probs', 'text_id', 'review_id', 'review_rating', 'label_distribution', 'gold_label', 'metadata'],
        num_rows: 3600
    })
})

In [3]:
def print_label_dist(dataset, label_name="gold_label", split_names=("train", "validation")):
    for split_name in split_names:
        print(split_name)
        dist = sorted(Counter(dataset[split_name][label_name]).items())
        for k, v in dist:
            print(f"\t{k:>4s}: {v}")

print_label_dist(dynasent_r1)

train
	negative: 14021
	neutral: 45076
	positive: 21391
validation
	negative: 1200
	neutral: 1200
	positive: 1200


In [4]:
dynasent_r2 = load_dataset("dynabench/dynasent", 'dynabench.dynasent.r2.all')

Repo card metadata block was not found. Setting CardData to empty.


In [5]:
dynasent_r2['train'][25]

{'id': 'r2-0000042',
 'hit_ids': ['y22532', 'y22740'],
 'sentence': 'He kindly walked us through the menu through the specialties and the wine list that we did not want.',
 'sentence_author': 'w148',
 'has_prompt': True,
 'prompt_data': {'indices_into_review_text': [453, 525],
  'review_rating': 5,
  'prompt_sentence': 'He walked us through the menu through the specialties and the wine list.',
  'review_id': '7joEfaGr4aC0OrR9I7CJ8Q'},
 'model_1_label': 'positive',
 'model_1_probs': {'negative': 0.15193994343280792,
  'positive': 0.7109395265579224,
  'neutral': 0.1371205449104309},
 'text_id': 'r2-0000042',
 'label_distribution': {'positive': [],
  'negative': ['w516', 'w199', 'w294'],
  'neutral': ['w544'],
  'mixed': ['w527']},
 'gold_label': 'negative',
 'metadata': {'split': 'train',
  'round': 2,
  'subset': 'all',
  'model_in_the_loop': 'RoBERTa'}}

In [6]:
print_label_dist(dynasent_r2)

train
	negative: 4579
	neutral: 2448
	positive: 6038
validation
	negative: 240
	neutral: 240
	positive: 240


In [7]:
sst = load_dataset("SetFit/sst5")

Repo card metadata block was not found. Setting CardData to empty.


In [8]:
print_label_dist(sst, label_name='label_text')

train
	negative: 2218
	neutral: 1624
	positive: 2322
	very negative: 1092
	very positive: 1288
validation
	negative: 289
	neutral: 229
	positive: 279
	very negative: 139
	very positive: 165


In [9]:
sst["train"][35]

{'text': "a well-intentioned effort that 's still too burdened by the actor 's offbeat sensibilities for the earnest emotional core to emerge with any degree of accessibility .",
 'label': 1,
 'label_text': 'negative'}

reformart the SST dataset

In [10]:
def conver_sst_label(label):
    return label.split(" ")[-1]

for split_name in ("train", "test", "validation"):
    dist = [conver_sst_label(s) for s in sst[split_name]['label_text']]
    sst[split_name] = sst[split_name].add_column("gold_label", dist)
    sst[split_name] = sst[split_name].add_column("sentence", sst[split_name]["text"])

print_label_dist(sst)

train
	negative: 3310
	neutral: 1624
	positive: 3610
validation
	negative: 428
	neutral: 229
	positive: 444


## Linear Classifiers

In [11]:
def unigrams_phi(sentence: str):
    return Counter(sentence.lower().split(" "))

uni = unigrams_phi("Hello! I love Machine Learning!")
uni

Counter({'hello!': 1, 'i': 1, 'love': 1, 'machine': 1, 'learning!': 1})

In [12]:
train_feats = [
    {'a': 1, 'b': 1},
    {'b': 1, 'c': 2}
]

vec = DictVectorizer(sparse=False)
X_train = vec.fit_transform(train_feats)

df = pd.DataFrame(X_train, columns=vec.get_feature_names_out())
df

Unnamed: 0,a,b,c
0,1.0,1.0,0.0
1,0.0,1.0,2.0


In [13]:
test_feats = [
    {'a': 2, 'c': 1},
    {'a': 4, 'b': 2, 'd': 1}
]

X_test = vec.transform(test_feats)
df = pd.DataFrame(X_test, columns=vec.get_feature_names_out())
df

Unnamed: 0,a,b,c
0,2.0,0.0,1.0
1,4.0,2.0,0.0


The most common mistake with DictVectorizer is calling fit_transform on test examples. This will wipe out the existing representation scheme, replacing it with one that matches the test examples. That will happen silently, but then you'll find that the new representations are incompatible with the model you fit. This is likely to manifest itself as a ValueError relating to feature counts.

### Task 1: Tweetgrams

In [14]:
from nltk.tokenize import TweetTokenizer

def tweetgrams_phi(sentence: str, **kwargs):
    tk = TweetTokenizer(kwargs)
    tokens = tk.tokenize(sentence)
    return Counter(tokens)

tweet = "Let that SINK in :)"

grams = tweetgrams_phi(tweet, preserve_case=False)
grams

Counter({'Let': 1, 'that': 1, 'SINK': 1, 'in': 1, ':)': 1})

#### Test Function

In [15]:
def test_tweetgrams_phi(func):
    examples = [
        (
            "Here's an example with an emoticon :)", 
            Counter({'an': 2, "Here's": 1, 'example': 1, 'with': 1, 'emoticon': 1, ':)': 1})
        ),
        (
            "The URL is https://pytorch.org!", 
            Counter({'The': 1, 'URL': 1, 'is': 1, 'https://pytorch.org': 1, '!': 1})
        )
    ]
    errcount = 0
    for ex, expected in examples:
        result = func(ex, preserve_case=True)
        if result != expected:
            errcount += 1
            print(f"Error for `{func.__name__}`: For input {ex}, "
                  f"expected {expected} but got {result}")
    caps_ex = "CAPS"
    caps_result = func(caps_ex, preserve_case=False)
    caps_expected = Counter({"CAPS": 1})
    if caps_result != caps_expected:
        errcount += 1
        print(f"Error for `{func.__name__}`: For input {caps_ex}, "
              f"expected {caps_expected} but got {caps_result}")
    if errcount == 0:
        print(f"All tests passed for `{func.__name__}`")

In [16]:
test_tweetgrams_phi(tweetgrams_phi)

All tests passed for `tweetgrams_phi`


### Task 2: Model Training

In [17]:
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import classification_report

def train_linear_model(model, featfunc, train_dataset):
    """Train an sklearn classifier.

    Parameters
    ----------
    model : sklearn classifier model
    featfunc : func
        Maps strings to Counter instances
    train_dataset: dict
        Must have a key "sentence" containing strings that `featfunc`
        will process, and a key "gold_label" giving labels

    Returns
    -------
    tuple
        * A trained version of `model`
        * A fitted `vectorizer` for the train set

    """

    feats = [dict(featfunc(s)) for s in train_dataset['sentence']]
    vec = DictVectorizer()
    X_train = vec.fit_transform(feats)
    model.fit(X_train, train_dataset['gold_label'])
    return model, vec


train_dataset = {
    'sentence': ['A A', 'A B', 'B B', 'B A', 'B'],
    'gold_label': [0, 1, 0, 1, 1]
}
def featfunc(s):
    return Counter(s.split())

model = LogisticRegression()
train_linear_model(model, featfunc, train_dataset)

(LogisticRegression(), DictVectorizer())

In [18]:
def test_train_linear_model(func):
    train_dataset = {
        'sentence': ['A A', 'A B', 'B B', 'B A', 'B'],
        'gold_label': [0, 1, 0, 1, 1]}
    def featfunc(s):
        return Counter(s.split())
    model = LogisticRegression()
    result = func(model, featfunc, train_dataset)
    if not isinstance(result, tuple) or len(result) != 2:
        print(f"Error for `{func.__name__}`: Incorrect return type")
        return
    model, vectorizer = result
    if not hasattr(vectorizer, 'vocabulary_'):
        print(f"Error for `{func.__name__}`: "
              f"Second return value is not a trained vectorizer")
        return
    if not hasattr(model, 'classes_'):
        print(f"Error for `{func.__name__}`: "
              f"First return value is not a trained classifier")
        return
    print(f"No errors found for `{func.__name__}`")

_ =  test_train_linear_model(train_linear_model)

No errors found for `train_linear_model`


#### train model on dynasent R1

In [19]:
lr_unigrams, vec_unigrams = train_linear_model(
    LogisticRegression(max_iter=1000), 
    unigrams_phi, dynasent_r1['train'])

### Task 3: model assessment

In [20]:
def assess_linear_model(model, featfunc, vectorizer, assess_dataset):
    """Assess a trained sklearn model.

    Parameters
    ----------
    model: trained sklearn model
    featfunc : func
        Maps strings to count dicts
    vectorizer : fitted DictVectorizer
    assess_dataset: dict
        Must have a key "sentence" containing strings that `featfunc`
        will process, and a key "gold_label" giving labels

    Returns
    -------
    A classification report (multiline string)

    """
    feats = [featfunc(s) for s in assess_dataset['sentence']]
    X_test = vectorizer.transform(feats)
    preds = model.predict(X_test)
    
    return classification_report(assess_dataset['gold_label'], preds, digits=3)

In [21]:
def test_assess_linear_model(assessfunc, trainfunc):
    train_dataset = {
        'sentence': ['A A', 'A B', 'B B', 'B A', 'A', 'B'],
        'gold_label': [0, 1, 0, 1, 0, 1]}
    assess_dataset = {
        'sentence': ['A C', 'B A'],
        'gold_label': [0, 1]}
    def featfunc(s):
        return Counter(s.split())
    model = LogisticRegression()
    model, vectorizer = trainfunc(model, featfunc, train_dataset)
    result = assessfunc(model, featfunc, vectorizer, assess_dataset)
    errcount = 0
    if len(vectorizer.vocabulary_) != 2:
        print(f"Error for `{assessfunc.__name__}`: Unexpected feature count")
        errcount += 1
    if 'weighted avg' not in result:
        print(f"Error for `{assessfunc.__name__}`: Unexpected return value")
        errcount += 1
    if errcount == 0:
        print(f"No errors found for `{assessfunc.__name__}`")

test_assess_linear_model(assess_linear_model, train_linear_model)


No errors found for `assess_linear_model`


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [22]:
report = assess_linear_model(
    lr_unigrams,
    unigrams_phi,
    vec_unigrams,
    dynasent_r1['validation'])

print(report)

              precision    recall  f1-score   support

    negative      0.759     0.364     0.492      1200
     neutral      0.523     0.890     0.659      1200
    positive      0.699     0.572     0.629      1200

    accuracy                          0.609      3600
   macro avg      0.660     0.609     0.593      3600
weighted avg      0.660     0.609     0.593      3600



## Question 2: Transformer fine-tuning

In [23]:
import transformers
from transformers import AutoModel, AutoTokenizer

transformers.logging.set_verbosity_error()

In [24]:
weights_name = "prajjwal1/bert-mini"

bert = AutoModel.from_pretrained(weights_name)
bert_tokenizer = AutoTokenizer.from_pretrained(weights_name)


In [25]:
example_text = "Bert knows Snuffleupagus"
bert_tokenizer.tokenize(example_text)

['bert', 'knows', 's', '##nu', '##ffle', '##up', '##ag', '##us']

encode() is used for the model

In [26]:
ex_ids = bert_tokenizer.encode(example_text, add_special_tokens=True)
ex_ids

[101, 14324, 4282, 1055, 11231, 18142, 6279, 8490, 2271, 102]

In [27]:
with torch.no_grad():
    reps = bert(torch.tensor([ex_ids]))

reps.keys()

odict_keys(['last_hidden_state', 'pooler_output'])

The value of `last_hidden_state` hidden state is the sequence of final output states from the model:



In [28]:
reps.last_hidden_state.shape

torch.Size([1, 10, 256])

#### masking example

In [29]:
with torch.no_grad():
    reps = bert(torch.tensor([ex_ids]))
    print(reps.last_hidden_state[0][0][:5])

tensor([-0.3763, -0.3209,  0.8817,  0.4568, -1.0314])


In [30]:
with torch.no_grad():
    # Mask the last 5 tokens:
    am = torch.tensor([[1, 1, 1, 1, 1, 0, 0, 0, 0, 0]])
    maskreps = bert(torch.tensor([ex_ids]), attention_mask=am)
    print(maskreps.last_hidden_state[0][0][:5])

tensor([-0.1793, -0.8994,  0.9695,  0.9130, -0.7129])


### Task 1: Batch tokenization

In [31]:
def get_batch_token_ids(batch: list[str], tokenizer):
   res = tokenizer.batch_encode_plus(
      batch,
      max_length=512,
      padding="max_length",
      truncation=True,
      return_attention_mask=True,
      return_tensors="pt"
   )
      
   return res  

In [32]:
def test_get_batch_token_ids(func):
    examples = [
        "Bert knows Snuffleupagus",
        "ELMo knew Bert.",
        "Buffalo " * 520
    ]
    test_tokenizer = AutoTokenizer.from_pretrained("prajjwal1/bert-mini")
    result = func(examples, test_tokenizer)
    errcount = 0
    if 'attention_mask' not in result:
        errcount += 1  
        print(f"Error for `{func.__name__}`: "
              f"Attention mask was not returned")
    ids = result['input_ids']
    if not isinstance(ids, torch.Tensor):
        errcount += 1
        print(f"Error for `{func.__name__}`: "
              f"Return values are not tensors")
    if ids.shape[1] != 512:
        errcount += 1
        print(f"Error for `{func.__name__}`: "
              f"Expected sequence length 512; got {ids.shape[1]}")
    if ids[0][0] != bert_tokenizer.cls_token_id:
        errcount += 1
        print(f"Error for `{func.__name__}`: "
              f"Special tokens were not added")
    if errcount == 0:
        print(f"No errors found for `{func.__name__}`")

In [33]:
test_get_batch_token_ids(get_batch_token_ids)

No errors found for `get_batch_token_ids`


### Task 2: Contextual representations


In [34]:
def get_reps(dataset, model, tokenizer, batchsize=20):
    """Represent each example in `dataset` with the final hidden state 
    above the [CLS] token.

    Parameters
    ----------
    dataset : list of str
    model : BertModel
    tokenizer : BertTokenizerFast
    batchsize : int

    Returns
    -------
    torch.Tensor with shape `(n_examples, dim)` where `dim` is the
    dimensionality of the representations for `model`

    """
    data = []
    with torch.no_grad():
        pass
        # Iterate over `dataset` in batches:
        ##### YOUR CODE HERE

        for i in range(0, len(dataset), batchsize):
            batch = dataset[i: i+batchsize]

            token_ids = get_batch_token_ids(batch, tokenizer)
            reps = model(token_ids['input_ids'], attention_mask=token_ids['attention_mask'])

            cls_reps = reps.last_hidden_state[:, 0, :]
            data.append(cls_reps)

        
        return torch.cat(data, dim=0)

In [35]:
sents = ["The cat slept.", "The bird chirped."]
test_model = AutoModel.from_pretrained(weights_name)
test_tokenizer = AutoTokenizer.from_pretrained(weights_name)
result = get_reps(sents, test_model, test_tokenizer, batchsize=2)
result.shape

torch.Size([2, 256])

the reason why only vectors of the CLS tokens are used for BERT models: the CLS token is the first token in the sequence, and it is used to represent the entire sequence. The CLS token is used for classification tasks, and the final hidden state of the CLS token is used as the aggregate sequence representation for the classification task.

In [36]:
def test_get_reps(func):
    examples = ["The cat slept.", "The bird chirped."] * 20
    weights_name = "prajjwal1/bert-mini"
    test_model = AutoModel.from_pretrained(weights_name)
    test_tokenizer = AutoTokenizer.from_pretrained(weights_name)
    result = func(examples, test_model, test_tokenizer, batchsize=2)
    errcount = 0
    if result.shape != (40, 256):
        errcount += 1
        print(f"Error for `{func.__name__}`: "
              f"Expected shape {(40, 256)}, got {result.shape}")
    if round(result[0][0].item(), 2) != -0.64:
        errcount += 1
        print(f"Error for `{func.__name__}`: "
              f"Representations seem to be incorrect")
    if errcount == 0:
        print(f"No errors found for `{func.__name__}`")

test_get_reps(get_reps)

No errors found for `get_reps`


### Task 3: fine-tuning

In [43]:
import torch.nn as nn

class BertClassifierModule(nn.Module):
    def __init__(
        self, 
        n_classes: int,
        hidden_activation,
        weights_name="prajjwal1/bert-mini"
    ):
        """
            This module loads a Transformer based on  `weights_name`,
            puts it in train mode, add a dense layer with activation
            function give by `hidden_activation`, and puts a classifier
            layer on top of that as the final output. The output of
            the dense layer should have the same dimensionality as the
            model input.

            Parameters
            ----------
            n_classes : int
                Number of classes for the output layer
            hidden_activation : torch activation function
                e.g., nn.Tanh()
            weights_name : str
                Name of pretrained model to load from Hugging Face

        """
        super().__init__()
        self.n_classes = n_classes
        self.weights_name = weights_name
        self.bert = AutoModel.from_pretrained(self.weights_name)
        self.bert.train()
        self.hidden_activation = hidden_activation
        self.hidden_dim = self.bert.embeddings.word_embeddings.embedding_dim

        self.classifier_layer = nn.Sequential(
            nn.Linear(self.hidden_dim, self.hidden_dim),
            self.hidden_activation,
            nn.Linear(self.hidden_dim, self.n_classes)
        )

    def forward(self, indices, mask):
        """
        Process `indices` with `mask` by feeding these arguments
        to `self.bert` and then feeding the initial hidden state
        in `last_hidden_state` to `self.classifier_layer`

        Parameters
        ----------
        indices : tensor.LongTensor of shape (n_batch, k)
            Indices into the `self.bert` embedding layer. `n_batch` is
            the number of examples and `k` is the sequence length for
            this batch
        mask : tensor.LongTensor of shape (n_batch, d)
            Binary vector indicating which values should be masked.
            `n_batch` is the number of examples and `k` is the
            sequence length for this batch

        Returns
        -------
        tensor.FloatTensor
            Predicted values, shape `(n_batch, self.n_classes)`

        """
        bert_output = self.bert(input_ids=indices, attention_mask=mask)
        cls_reps = bert_output.last_hidden_state[:, 0, :]
        predictions = self.classifier_layer(cls_reps)

        return predictions

In [45]:
bert_module = BertClassifierModule(n_classes=3, hidden_activation=nn.Tanh())

ids = get_batch_token_ids(dynasent_r1['train']['sentence'][:2], bert_tokenizer)

result = bert_module(ids['input_ids'], ids['attention_mask'])

print(f"Shape: {result.shape}")
print(result)

Shape: torch.Size([2, 3])
tensor([[ 0.3307, -0.3084,  0.0894],
        [ 0.1298, -0.1202, -0.1604]], grad_fn=<AddmmBackward0>)


In [46]:
def test_bert_classifier_module(moduleclass): 
    expected_out = 5
    expected_hidden = 256
    expected_activation = nn.ReLU()
    mod = moduleclass(expected_out, expected_activation)
    errcount = 0

    # Basic layer structure:
    if not hasattr(mod, "classifier_layer") or mod.classifier_layer is None:
        errcount += 1
        print(f"Error for `{moduleclass.__name__}`: "
              f"Missing attribute `classifier_layer`")
        return 
    for i in range(3):
        try:
            bert_module.classifier_layer[i]
        except IndexError:
            errcount += 1
            print(f"Error for `{moduleclass.__name__}`: "
                  f"`classifier_layer` is not an `nn.Sequential` "
                  f"and/or does not have the right structure")
    # Correct first layer dimensionality:
    result_hidden = mod.classifier_layer[0].out_features
    if result_hidden != expected_hidden:
        errcount += 1
        print(f"Error for `{moduleclass.__name__}`: "
              f"Expected `classifier_layer` hidden dim {expected_hidden}, "
              f"got {result_hidden}") 
    # Correct activation:
    result_activation = mod.classifier_layer[1].__class__.__name__
    if result_activation != expected_activation.__class__.__name__:
        errcount += 1
        print(f"Error for `{moduleclass.__name__}`: "
              f"Incorrect hidden activation")
    # Correct output dimensionality:
    result_out = mod.classifier_layer[2].out_features
    if result_out != expected_out:
        errcount += 1
        print(f"Error for `{moduleclass.__name__}`: "
              f"Expected `classifier_layer` out dim {expected_out}, "
              f"got {result_out}")
    # forward method:
    ids = get_batch_token_ids(["A B C", "A B"], bert_tokenizer)
    result = mod(ids['input_ids'], ids['attention_mask'])
    if result.shape != (2, 5):
        errcount += 1
        print(f"Error for `{moduleclass.__name__}`: "
              f"Expected output shape {(2, 5)}, got {result.shape}")
    if errcount == 0:
        print(f"No errors found for `{moduleclass.__name__}`")

test_bert_classifier_module(BertClassifierModule)

No errors found for `BertClassifierModule`
