In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from transformers import GPT2Tokenizer, GPT2ForSequenceClassification, AdamW
from torch.utils.data import Dataset, DataLoader
import torch


Let's print our dependencies

In [2]:
from importlib.metadata import version

pkgs = ["matplotlib",
        "numpy",
        "torch",
        "pandas",
        "transformers" 
       ]
for p in pkgs:
    print(f"{p} version: {version(p)}")

matplotlib version: 3.8.2
numpy version: 1.26.4
torch version: 2.2.1+cu121
pandas version: 2.1.4
transformers version: 4.45.1


Creating our dataset

In [3]:
from pathlib import Path
DS_PATH = Path.cwd()/'data'

In [4]:

# Load the data
df = pd.read_csv(DS_PATH/'synth-itr1-n166.csv')

# Display class distribution
print(df['label'].value_counts())


label
Negative      122
Positive       34
Not Stated     10
Name: count, dtype: int64


In [5]:
# Encode labels
label_encoding = {'Positive': 0, 'Negative': 1, 'Not Stated': 2}

For the sake of simplicity, we're going to undersample the dataset so it contains 10 instances from each class

In [6]:
# Balance the dataset
def balance_dataset(df):
    min_class_count = df['label'].value_counts().min()
    balanced_df = df.groupby('label').apply(lambda x: x.sample(min_class_count)).reset_index(drop=True)
    return balanced_df

balanced_df = balance_dataset(df)
print(balanced_df['label'].value_counts())

label
Negative      10
Not Stated    10
Positive      10
Name: count, dtype: int64


In [7]:
balanced_df.head()

Unnamed: 0.1,Unnamed: 0,report_text,label
0,24,BILATERAL DIAGNOSTIC MAMMOGRAM\nClinical Histo...,Negative
1,115,BILATERAL DIAGNOSTIC MAMMOGRAM. History: Patie...,Negative
2,17,LEFT DIAGNOSTIC MAMMOGRAM\nClinical History: 7...,Negative
3,39,BILATERAL SCREENING MAMMOGRAM\nHistory: Screen...,Negative
4,122,BILATERAL DIAGNOSTIC MAMMOGRAM FOR SCREENING. ...,Negative


In [8]:
# Plot class distribution
plt.figure(figsize=(10, 5))
balanced_df['label'].value_counts().plot(kind='bar')
plt.title('Balanced Class Distribution')
plt.xlabel('Label')
plt.ylabel('Count')
plt.savefig('balanced_class_distribution.png')
plt.close()

In [9]:
balanced_df['label_encoded'] = balanced_df['label'].map(label_encoding)

In [10]:

# Split the data
train_df, temp_df = train_test_split(balanced_df, test_size=0.3, stratify=balanced_df['label'], random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['label'], random_state=42)


Creating dataloaders

In [11]:
# Initialize tokenizer and model
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
tokenizer.pad_token = tokenizer.eos_token
model = GPT2ForSequenceClassification.from_pretrained('gpt2', num_labels=3)
model.config.pad_token_id = model.config.eos_token_id

Some weights of GPT2ForSequenceClassification were not initialized from the model checkpoint at gpt2 and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [12]:
model.config.pad_token_id

50256

In [13]:

# Custom dataset
class ReportDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data.iloc[idx]['report_text']
        label = self.data.iloc[idx]['label_encoded']
        
        encoding = self.tokenizer(text, truncation=True, padding='max_length', max_length=self.max_length, return_tensors='pt')
        
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

In [14]:
# class ReportDataset(Dataset):
#     def __init__(self, dataframe, tokenizer, max_length=None, pad_token_id=model.config.pad_token_id):
#         self.data = dataframe

#         # Pre-tokenize texts
#         self.encoded_texts = [
#             tokenizer.encode(text) for text in self.data["report_text"]
#         ]

#         if max_length is None:
#             self.max_length = self._longest_encoded_length()
#         else:
#             self.max_length = max_length
#             # Truncate sequences if they are longer than max_length
#             self.encoded_texts = [
#                 encoded_text[:self.max_length]
#                 for encoded_text in self.encoded_texts
#             ]

#         # Pad sequences to the longest sequence
#         self.encoded_texts = [
#             encoded_text + [pad_token_id] * (self.max_length - len(encoded_text))
#             for encoded_text in self.encoded_texts
#         ]

#     def __getitem__(self, index):
#         encoded = self.encoded_texts[index]
#         label = self.data.iloc[index]["label_encoded"]
#         return (
#             torch.tensor(encoded, dtype=torch.long),
#             torch.tensor(label, dtype=torch.long)
#         )

#     def __len__(self):
#         return len(self.data)

#     def _longest_encoded_length(self):
#         max_length = 0
#         for encoded_text in self.encoded_texts:
#             encoded_length = len(encoded_text)
#             if encoded_length > max_length:
#                 max_length = encoded_length
#         return max_length

In [15]:
train_dataset = ReportDataset(train_df, tokenizer, max_length=1024)
print(train_dataset)

<__main__.ReportDataset object at 0x7fb37dfd28f0>


In [16]:
val_dataset = ReportDataset(val_df, tokenizer, max_length=train_dataset.max_length)
test_dataset = ReportDataset(test_df, tokenizer, max_length=train_dataset.max_length)

In [17]:
# Create datasets and dataloaders
num_workers = 4
batch_size = 8

In [18]:
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=num_workers, drop_last=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=num_workers, drop_last=False)

In [19]:
print(f"{len(train_loader)} training batches")
print(f"{len(val_loader)} validation batches")
print(f"{len(test_loader)} test batches")

2 training batches
1 validation batches
1 test batches


Onto model verification etc

In [20]:
model.config

GPT2Config {
  "_name_or_path": "gpt2",
  "activation_function": "gelu_new",
  "architectures": [
    "GPT2LMHeadModel"
  ],
  "attn_pdrop": 0.1,
  "bos_token_id": 50256,
  "embd_pdrop": 0.1,
  "eos_token_id": 50256,
  "id2label": {
    "0": "LABEL_0",
    "1": "LABEL_1",
    "2": "LABEL_2"
  },
  "initializer_range": 0.02,
  "label2id": {
    "LABEL_0": 0,
    "LABEL_1": 1,
    "LABEL_2": 2
  },
  "layer_norm_epsilon": 1e-05,
  "model_type": "gpt2",
  "n_ctx": 1024,
  "n_embd": 768,
  "n_head": 12,
  "n_inner": null,
  "n_layer": 12,
  "n_positions": 1024,
  "pad_token_id": 50256,
  "reorder_and_upcast_attn": false,
  "resid_pdrop": 0.1,
  "scale_attn_by_inverse_layer_idx": false,
  "scale_attn_weights": true,
  "summary_activation": null,
  "summary_first_dropout": 0.1,
  "summary_proj_to_labels": true,
  "summary_type": "cls_index",
  "summary_use_proj": true,
  "task_specific_params": {
    "text-generation": {
      "do_sample": true,
      "max_length": 50
    }
  },
  "transform

In [21]:
assert train_dataset.max_length <= model.config.n_positions, (
    f"Dataset length {train_dataset.max_length} exceeds model's context "
    f"length {model.config.n_positions}. Reinitialize data sets with "
    f"`max_length={model.config.n_positions}`"
)

In [22]:
for param in model.parameters():
    param.requires_grad = False

In [23]:
for name, param in model.named_parameters():
    if param.requires_grad: print(f"Parameter '{name}' is trainable")

In [24]:
for name, param in model.named_parameters(): 
    if param.requires_grad == False: print(f"Parameter `{name}` is not trainable") 

Parameter `transformer.wte.weight` is not trainable
Parameter `transformer.wpe.weight` is not trainable
Parameter `transformer.h.0.ln_1.weight` is not trainable
Parameter `transformer.h.0.ln_1.bias` is not trainable
Parameter `transformer.h.0.attn.c_attn.weight` is not trainable
Parameter `transformer.h.0.attn.c_attn.bias` is not trainable
Parameter `transformer.h.0.attn.c_proj.weight` is not trainable
Parameter `transformer.h.0.attn.c_proj.bias` is not trainable
Parameter `transformer.h.0.ln_2.weight` is not trainable
Parameter `transformer.h.0.ln_2.bias` is not trainable
Parameter `transformer.h.0.mlp.c_fc.weight` is not trainable
Parameter `transformer.h.0.mlp.c_fc.bias` is not trainable
Parameter `transformer.h.0.mlp.c_proj.weight` is not trainable
Parameter `transformer.h.0.mlp.c_proj.bias` is not trainable
Parameter `transformer.h.1.ln_1.weight` is not trainable
Parameter `transformer.h.1.ln_1.bias` is not trainable
Parameter `transformer.h.1.attn.c_attn.weight` is not trainable


In [25]:
model.score

Linear(in_features=768, out_features=3, bias=False)

In [26]:
for param in model.transformer.h[-1].parameters(): param.requires_grad = True
for param in model.transformer.ln_f.parameters(): param.required_grad = True

In [27]:
for name, param in model.named_parameters():
    if name.startswith('score'):
        param.required_grad = True

In [28]:
for name, param in model.named_parameters():
    if param.requires_grad: print(f"Parameter '{name}' is trainable")

Parameter 'transformer.h.11.ln_1.weight' is trainable
Parameter 'transformer.h.11.ln_1.bias' is trainable
Parameter 'transformer.h.11.attn.c_attn.weight' is trainable
Parameter 'transformer.h.11.attn.c_attn.bias' is trainable
Parameter 'transformer.h.11.attn.c_proj.weight' is trainable
Parameter 'transformer.h.11.attn.c_proj.bias' is trainable
Parameter 'transformer.h.11.ln_2.weight' is trainable
Parameter 'transformer.h.11.ln_2.bias' is trainable
Parameter 'transformer.h.11.mlp.c_fc.weight' is trainable
Parameter 'transformer.h.11.mlp.c_fc.bias' is trainable
Parameter 'transformer.h.11.mlp.c_proj.weight' is trainable
Parameter 'transformer.h.11.mlp.c_proj.bias' is trainable


In [29]:
for name, param in model.named_parameters(): 
    if param.requires_grad == False: print(f"Parameter `{name}` is not trainable") 

Parameter `transformer.wte.weight` is not trainable
Parameter `transformer.wpe.weight` is not trainable
Parameter `transformer.h.0.ln_1.weight` is not trainable
Parameter `transformer.h.0.ln_1.bias` is not trainable
Parameter `transformer.h.0.attn.c_attn.weight` is not trainable
Parameter `transformer.h.0.attn.c_attn.bias` is not trainable
Parameter `transformer.h.0.attn.c_proj.weight` is not trainable
Parameter `transformer.h.0.attn.c_proj.bias` is not trainable
Parameter `transformer.h.0.ln_2.weight` is not trainable
Parameter `transformer.h.0.ln_2.bias` is not trainable
Parameter `transformer.h.0.mlp.c_fc.weight` is not trainable
Parameter `transformer.h.0.mlp.c_fc.bias` is not trainable
Parameter `transformer.h.0.mlp.c_proj.weight` is not trainable
Parameter `transformer.h.0.mlp.c_proj.bias` is not trainable
Parameter `transformer.h.1.ln_1.weight` is not trainable
Parameter `transformer.h.1.ln_1.bias` is not trainable
Parameter `transformer.h.1.attn.c_attn.weight` is not trainable


I'm not sure if linear layer `score` is trainable?!

In [30]:
# Training function
def train(model, train_loader, val_loader, optimizer, num_epochs, device):
    model.to(device)
    
    train_losses = []
    val_losses = []
    
    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        
        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            
            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
            
            total_train_loss += loss.item()
        
        avg_train_loss = total_train_loss / len(train_loader)
        train_losses.append(avg_train_loss)
        
        model.eval()
        total_val_loss = 0
        
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)
                
                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                
                total_val_loss += loss.item()
        
        avg_val_loss = total_val_loss / len(val_loader)
        val_losses.append(avg_val_loss)
        
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')
    
    return train_losses, val_losses

In [31]:
# Train the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
optimizer = AdamW(model.parameters(), lr=5e-5)
num_epochs = 3



In [32]:
train_losses, val_losses = train(model, train_loader, val_loader, optimizer, num_epochs, device)

Epoch 1/3, Train Loss: 4.2373, Val Loss: 3.8770
Epoch 2/3, Train Loss: 4.3680, Val Loss: 3.6446
Epoch 3/3, Train Loss: 3.1554, Val Loss: 3.4152


In [33]:
# Plot training and validation loss
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.savefig('loss_plot.png')
plt.close()

In [34]:
# Evaluate on test set
model.eval()
test_loss = 0
correct = 0
total = 0

In [35]:
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        logits = outputs.logits
        
        test_loss += loss.item()
        _, predicted = torch.max(logits, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

avg_test_loss = test_loss / len(test_loader)
accuracy = 100 * correct / total

print(f'Test Loss: {avg_test_loss:.4f}, Accuracy: {accuracy:.2f}%')

Test Loss: 3.4377, Accuracy: 40.00%


In [50]:
def classify(text, model, tokenizer, device, max_length=1024):
    model.eval()

    # Prepare inputs to the model
    input_ids = tokenizer.encode(text, add_special_tokens=True)
    
    # Truncate sequences if they are too long
    input_ids = input_ids[:min(max_length, model.config.max_position_embeddings)]

    # Pad sequences to the max length
    padding_length = max_length - len(input_ids)
    input_ids = input_ids + ([tokenizer.pad_token_id] * padding_length)
    
    attention_mask = [1] * len(input_ids)
    
    # Convert to tensors and add batch dimension
    input_tensor = torch.tensor(input_ids).unsqueeze(0).to(device)
    attention_mask = torch.tensor(attention_mask).unsqueeze(0).to(device)

    # Model inference
    with torch.no_grad():
        outputs = model(input_tensor, attention_mask=attention_mask)
        logits = outputs.logits
    
    predicted_label = torch.argmax(logits, dim=1).item()

    # Map the predicted label to sentiment
    sentiment_map = {0: "Positive", 1: "Negative", 2: "Not Stated"}
    return sentiment_map[predicted_label]

In [51]:
# Assuming you have already loaded your model and tokenizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

text = """
There are scattered fibroglandular densities. No definite mass, architectural distortion, suspicious calcifications, or skin thickening are seen.
"""

sentiment = classify(text, model, tokenizer, device)
print(f"The sentiment of the text is: {sentiment}")


The sentiment of the text is: Not Stated
