In [None]:
!pip install torch
!pip install transformers



# **Pre-trained Models**

When working with deep learning models, it is possible we have better options for model development than simply training from scratch, i.e. training a model that has not observed any data yet, just the data you are training it with. Instead, we can make use of models that have already been trainined on large corpuses. We have several options:


1.   Use the model as it is, not further training and just apply it to our dataset
2.   Add our own task as another layer to the model,
3.   Freeze 1 or more layers of the pre-trained model and fine-tune the remaining layers on a specific task.

Let's say we want to make use of word embeddings from a pre-trained word2vec model. We don't want to waste time generating representation for words since it was likely done to a far better degree with a large corpus we don't have so we will focus on making a classification using these representations.

We saw something similar in our previous classification notebook with SpaCy vectors as well. In that scenario their were no more weights or parameters involved. We would not update the weights in the vectors so we could consider this model frozen. Below, we show a more detailed example with word2vec.

In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import precision_score, recall_score, f1_score

class Word2VecClassifier(nn.Module):
    def __init__(self, embedding_matrix):
      # the Word2VecClassifier class is calling the constructor of the parent class, which is nn.Module
      super(Word2VecClassifier, self).__init__()
      # this is our embedding matrix, we are freezing the parameters
      self.embeddings = nn.Embedding.from_pretrained(torch.FloatTensor(embedding_matrix), freeze=False)
      # this is our classifier layer that we are adding to our pretrained word2vec
      # it is a fully connected layer that that takes input that is the size of the embedding vectors
      # and outputs a value of size 1 (single output)
      self.fc = nn.Linear(embedding_matrix.shape[1], 1)
      # The self.sigmoid instance is an activation function that squashes the output of self.fc to a value between 0 and 1.
      self.sigmoid = nn.Sigmoid()

    def forward(self, x):
      # takes the input x and passes it through an embedding layer, which maps the input to a high-dimensional space
      x = self.embeddings(x)
      # average over the sequence length dimension
      x = torch.mean(x, dim=1)
      x = self.fc(x)
      x = self.sigmoid(x)
      return x

## Preparing the data for training our classifier

The last notebook on classification showed how to use train_test_split. We can use the result of this spliting function to train our classifier here as well. The only thing we need to do is ensure that what we are working with are numbers (remember tensors cannot be strings). We will stick to using this same SMS Spam Classification dataset.

In [14]:
import pandas as pd
import numpy as np
import gensim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

# Load our pre-trained word2vec model
word2vec_model = gensim.models.KeyedVectors.load_word2vec_format('word2vec_cbow.bin', binary=True)
# Convert the embeddings to a PyTorch tensor
embedding_matrix = torch.tensor(word2vec_model.vectors)
model = Word2VecClassifier(embedding_matrix)


# Read in the CSV file using pandas
df = pd.read_csv('SMSSpamCollection', sep='\t', header=None)
# Replace 'ham' with 1 and 'spam' with 0
df.iloc[:,0] = df.iloc[:,0].replace({'ham': 0, 'spam': 1})

X = df.iloc[:,1]
y = df.iloc[:,0]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15, random_state=1)


input_train = X_train.tolist()

tensors = []
for text in input_train:
    # tokenize the text
    tokens = text.split()
    # convert each token to its corresponding index in the Word2Vec model
    indices = [word2vec_model.key_to_index[token] for token in tokens if token in word2vec_model.key_to_index]
    # convert the indices to a PyTorch tensor
    tensor = torch.LongTensor(indices)
    tensors.append(tensor)

# pad the sequences to make them of equal length
train_input_tensor = torch.nn.utils.rnn.pad_sequence(tensors, batch_first=True)
train_target_tensor = torch.FloatTensor(y_train.tolist()).view(-1, 1)

  df.iloc[:,0] = df.iloc[:,0].replace({'ham': 0, 'spam': 1})


We will do the same for validation purposes.

In [15]:
# input and target tensors for training and validation
tensors = []
for text in X_test.tolist():
    # tokenize the text
    tokens = text.split()
    # convert each token to its corresponding index in the Word2Vec model
    indices = [word2vec_model.key_to_index[token] for token in tokens if token in word2vec_model.key_to_index]
    # convert the indices to a PyTorch tensor
    tensor = torch.LongTensor(indices)
    tensors.append(tensor)

val_input_tensor = torch.nn.utils.rnn.pad_sequence(tensors, batch_first=True)
val_target_tensor = torch.FloatTensor(y_test.tolist()).view(-1, 1)

Now, we are ready to fine-tune our classifier.

In [18]:
# define loss function and optimizer
# Binary Cross Entropy Loss
criterion = nn.BCELoss()
optimizer = optim.Adam(model.fc.parameters(), lr=2e-5)

# train the model for 10 epochs
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()
    output_tensor = model(train_input_tensor)
    loss = criterion(output_tensor, train_target_tensor)
    loss.backward()
    optimizer.step()

    print('Epoch: {}, Loss: {:.4f}'.format(epoch+1, loss.item()))


# evaluate the model on the validation set
# When a model is in evaluation mode, disables some operations that are only
# used during training, such as dropout and batch normalization
model.eval()
with torch.no_grad():
    val_output_tensor = model(val_input_tensor)
    val_loss = criterion(val_output_tensor, val_target_tensor)

    predictions = (val_output_tensor > 0.5).int()
    accuracy = (predictions == val_target_tensor.int()).float().mean().item()
    # calculate precision, recall, and F1 score
    # print(val_output_tensor)
    val_precision = precision_score(val_target_tensor, predictions, zero_division=1)
    val_recall = recall_score(val_target_tensor, predictions, zero_division=1)
    val_f1 = f1_score(val_target_tensor, predictions, zero_division=1)

# report the metrics
print('Validation set metrics:')
print('Loss: {:.4f}, Accuracy: {:.4f}'.format(val_loss.item(), accuracy))
print('Precision: {:.4f}'.format(val_precision))
print('Recall: {:.4f}'.format(val_recall))
print('F1 score: {:.4f}'.format(val_f1))

Epoch: 1, Loss: 0.5955
Epoch: 2, Loss: 0.5948
Epoch: 3, Loss: 0.5942
Epoch: 4, Loss: 0.5935
Epoch: 5, Loss: 0.5928
Epoch: 6, Loss: 0.5922
Epoch: 7, Loss: 0.5915
Epoch: 8, Loss: 0.5908
Epoch: 9, Loss: 0.5902
Epoch: 10, Loss: 0.5895
Validation set metrics:
Loss: 0.5954, Accuracy: 0.8648
Precision: 1.0000
Recall: 0.0000
F1 score: 0.0000


In this example, when we pass new data to the text classifier, we do not actually update the parameters of our model.

What if we want to update? What if we don't want to keep all the original word2vec embeddings as they were in the pre-trained model?

We need to set requires_grad to True:

**self.embeddings.weight.requires_grad = True**
or update the freeze parameter in the embedding to False.

Now, the parameters will get updated through backpropagation beyond just that final fully connected layer (fc) we have for our classifier.

## **Another example with a more complex pre-trained model**

How about we try a more complex example? Some of you may want to use a language model like BERT or gpt-2. We won't go into detail right now about what these types of models actually are or how they work (that will come later), but for now lets just see what kind of data they might expect and try to load and run them.

Below we have loaded a pre-trained DistilBert model.

In [19]:
import torch
import torch.nn as nn

In [23]:
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification

# Load the pre-trained DistilBert model and tokenizer
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
model = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased')

# Check the size of the model
model_size = sum(p.numel() for p in model.parameters() if p.requires_grad)
print("The DistilBERT model has", model_size, "parameters.")

print(f"Number of layers: {model.config.num_hidden_layers}")

Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'pre_classifier.weight', 'pre_classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


The DistilBERT model has 66955010 parameters.
Number of layers: 6


**Freezing the Layers**

If you want to freeze the parameters in the embedding layer and the first num_layers_to_freeze layers of the transformer, then you need both loops. If you only want to freeze parameters in the transformer layers and not in the embedding layer, you can skip the first loop.

In [21]:
for param in model.distilbert.embeddings.parameters():
    param.requires_grad = False

num_layers_to_freeze = 6
for i in range(num_layers_to_freeze):
    for param in model.distilbert.transformer.layer[i].parameters():
        param.requires_grad = False


The DistilBERT model expects to receive at minimum a sequence of text that has been encoded as a tensor of input ids as well as an attention mask, which is a series of 0s and 1s indicating which tokens should be paid attention to. We will indicate attention be paid to all of our tokens.

In [24]:
from sklearn.metrics import accuracy_score

# Prepare the training data (X is a list of input sequences, y is a list of labels)
X = ["The movie was great!", "I did not care for it."]
y = [1, 0]

# Tokenize the input sequences and convert them to PyTorch tensors
inputs = tokenizer(X, padding=True, truncation=True, return_tensors="pt")

# Define the cross-entropy loss function
criterion = torch.nn.CrossEntropyLoss()
# Define the optimizer for fine-tuning
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)

# Fine-tune the model on the binary classification task
for epoch in range(10):
    # Forward pass
    outputs = model(inputs['input_ids'], attention_mask=inputs['attention_mask'])[0]
    # Compute the loss
    loss = criterion(outputs, torch.tensor(y))
    # Backward pass and update parameters
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # Print loss and accuracy
    preds = torch.argmax(outputs, dim=1)
    print(f"Epoch {epoch+1}: loss={loss.item():.4f}")

# can also add evaluation here
X_val = ["Another great example.", "A negative example."]
y_val = [1, 0]

val_inputs = tokenizer(X_val, padding=True, truncation=True, return_tensors="pt")

model.eval()
with torch.no_grad():
    val_outputs = model(**val_inputs).logits
    val_loss = criterion(val_outputs, torch.tensor(y_val))

    # Metrics
    val_predictions = torch.argmax(val_outputs, dim=1).numpy()

    val_accuracy = accuracy_score(y_val, val_predictions)
    val_precision = precision_score(y_val, val_predictions)
    val_recall = recall_score(y_val, val_predictions)
    val_f1 = f1_score(y_val, val_predictions)

# Print final evaluation metrics
print("Final Evaluation Metrics:")
print(f"Validation Loss: {val_loss.item():.4f}")
print(f"Accuracy: {val_accuracy:.4f}")
print(f"Precision: {val_precision:.4f}")
print(f"Recall: {val_recall:.4f}")
print(f"F1 Score: {val_f1:.4f}")

Epoch 1: loss=0.6977
Epoch 2: loss=0.6557
Epoch 3: loss=0.6069
Epoch 4: loss=0.5621
Epoch 5: loss=0.5154
Epoch 6: loss=0.4671
Epoch 7: loss=0.4197
Epoch 8: loss=0.3728
Epoch 9: loss=0.3245
Epoch 10: loss=0.2823
Final Evaluation Metrics:
Validation Loss: 0.6761
Accuracy: 0.5000
Precision: 0.5000
Recall: 1.0000
F1 Score: 0.6667


**Exercise: Explore the following pre-trained models and answer these questions:**

*   How big is the model once you load it?
*   How many layers do these models have and what are they referred to as (what are the named)?
*   How would you freeze the layers of these models?

In [26]:
# GPT-2
from transformers import GPT2Tokenizer, GPT2Model

# Load the pre-trained GPT-2 tokenizer and model
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model = GPT2Model.from_pretrained('gpt2')

print(f"Number of layers: {model.config.num_hidden_layers}")

#options to freeze layers
for name, param in model.named_parameters():
    if 'h.11' not in name:  # layer 11 will not be frozen
        param.requires_grad = False

for i, param in enumerate(model.parameters()):
    if i < (model.config.num_hidden_layers - 1):
        param.requires_grad = False

for name, param in model.named_parameters():
    if 'classifier' not in name:
        param.requires_grad = False

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

In [25]:
# RoBERTa
from transformers import RobertaModel, RobertaTokenizer

# Load the pretrained RoBERTa tokenizer and model
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
model = RobertaModel.from_pretrained('roberta-base')

print(f"Number of layers: {model.config.num_hidden_layers}")

#freeze all layers
for param in model.parameters():
    param.requires_grad = False

#freeze all layers except the last one
for name, param in model.named_parameters():
    if f'layer.{model.config.num_hidden_layers - 1}.' not in name:
        param.requires_grad = False

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


## **SpaCy Pre-trained Models**

In spaCy, TextCategorizer is a built-in component we can use to train a text classification model. It takes a sequence of text and produces a set of scores that indicate the likelihood of each label being correct, also referred to as `category scores` or `cats` for short.

In [None]:
import spacy
import random
from spacy.training.example import Example
from spacy.util import minibatch

#create an empty model
nlp = spacy.blank('en')

#add the TextCategorizer to the empty model
textcat = nlp.add_pipe('textcat')

#add labels to text classifier
textcat.add_label("NEGATIVE")
textcat.add_label("POSITIVE")

#load the training data
train_data = [
    ("The movie was great!", {"cats": {"POSITIVE": 1.0, "NEGATIVE": 0.0}}),
    ("This movie was terrible.", {"cats": {"POSITIVE": 0.0, "NEGATIVE": 1.0}})
]

#training the model
epochs = 10
optimizer = nlp.begin_training()
for i in range(epochs):
  losses = {}
  random.shuffle(train_data)
  for batch in minibatch(train_data, size=2):
    examples = []
    for text, labels in batch:
      doc = nlp.make_doc(text)
      examples.append(Example.from_dict(doc, labels))
    # Update model with texts and labels
    nlp.update(examples, sgd=optimizer, losses=losses)

text = "I do not recommend this movie."
doc = nlp(text)
print(doc.cats)

{'NEGATIVE': 0.6461510062217712, 'POSITIVE': 0.353848934173584}


So, above we were training our model from scratch. You can also use models in the pipeline that already exist.

One thing to note about training SpaCy pre-trained models (or any pre-trained models) is working out the type of data you are going to fine-tune with and if there is a risk of the model "forgetting" other important patterns because of how your data looks. If you are very particular about the labels you want to add for NER for example, you may want to make your solution rule-based instead of updating a model. The only way to know is through testing.

In [None]:
import spacy

nlp = spacy.load("en_core_web_sm")

#training data
train_data = [
("Patient is a 65-year-old male with a history of heart disease and diabetes.", {"entities": [(48, 61, "CONDITION"), (66, 74, "CONDITION")]}),
("The CT scan showed a large mass in the patient's right lung.", {"entities": [(4, 11, "EXAM"), (27, 31, "CONDITION"), (55, 59, "ORGAN")]}),
("The patient was diagnosed with Stage 4 breast cancer.", {"entities": [(39, 52, "CONDITION")]}),
("The patient's blood pressure was 140/90 mmHg.", {"entities": [(13, 27, "VITAL_SIGN"), (32, 43, "MEASUREMENT_UNIT")]}),
("The patient's potassium levels are low.", {"entities": [(21, 29, "VITAL_SIGN"), (31, 34, "CONDITION")]}),
("The patient was prescribed metformin for their diabetes.", {"entities": [(28, 36, "MEDICATION"), (47, 55, "CONDITION"), (22, 27, "MEDICATION_TYPE")]}),
("The patient has a family history of heart disease.", {"entities": [(24, 36, "CONDITION")]}),
("The patient's ECG showed an irregular heartbeat.", {"entities": [(14, 17, "EXAM"), (28, 47, "CONDITION")]}),
("The patient is allergic to penicillin.", {"entities": [(27, 37, "MEDICATION")]}),
("The patient had a myocardial infarction last year.", {"entities": [(18, 39, "CONDITION")]}),
]

#add NER component to the pipeline if it doesn't already exist
if "ner" not in nlp.pipe_names:
    ner = nlp.create_pipe("ner")
    nlp.add_pipe(ner, last=True)
else:
    ner = nlp.get_pipe("ner")

#add labels for the new entities
for _, annotations in train_data:
    for ent in annotations.get("entities"):
        ner.add_label(ent[2])

#disable other pipeline components during training to speed up the process
other_pipes = [pipe for pipe in nlp.pipe_names if pipe != "ner"]
with nlp.disable_pipes(*other_pipes):
    #train the model
    n_iter = 20
    for i in range(n_iter):
        losses = {}
        for text, annotations in train_data:
            doc = nlp.make_doc(text)
            example = spacy.training.Example.from_dict(doc, annotations)
            nlp.update([example], losses=losses, drop=0.5)
        print("Iteration:", i+1, "Loss:", losses["ner"])


Iteration: 1 Loss: 29.747752292840683
Iteration: 2 Loss: 29.756018845184634
Iteration: 3 Loss: 23.299985338741152
Iteration: 4 Loss: 21.618954238151808
Iteration: 5 Loss: 21.545523644398372
Iteration: 6 Loss: 19.230148042513644
Iteration: 7 Loss: 19.926596779377597
Iteration: 8 Loss: 19.484994174587996
Iteration: 9 Loss: 16.536360246348572
Iteration: 10 Loss: 18.266475674005072
Iteration: 11 Loss: 16.04015678047962
Iteration: 12 Loss: 14.124103000881684
Iteration: 13 Loss: 19.400561078768508
Iteration: 14 Loss: 17.933757668935577
Iteration: 15 Loss: 13.847345589983787
Iteration: 16 Loss: 10.168225680122228
Iteration: 17 Loss: 24.10209755130481
Iteration: 18 Loss: 11.883754546884083
Iteration: 19 Loss: 9.503610716204427
Iteration: 20 Loss: 19.927363326259997


In [None]:
#test new sentences
test_sentences = [
    "The patient has a history of heart disease and diabetes.",
    "A CT scan revealed a mass in the left lung.",
    "Blood pressure was measured at 130/80 mmHg.",
    "The patient is taking metformin for diabetes.",
]

for sentence in test_sentences:
    doc = nlp(sentence)
    print("\nText:", sentence)
    print("Entities:", [(ent.text, ent.label_) for ent in doc.ents])


Text: The patient has a history of heart disease and diabetes.
Entities: [('diabetes', 'CONDITION')]

Text: A CT scan revealed a mass in the left lung.
Entities: [('CT scan', 'EXAM'), ('mass', 'CONDITION'), ('lung.', 'CONDITION')]

Text: Blood pressure was measured at 130/80 mmHg.
Entities: []

Text: The patient is taking metformin for diabetes.
Entities: [('diabetes', 'CONDITION')]
