In [19]:
from model import Encoder  # Import your custom Encoder
from dataset import TextClassificationDataset  # Assuming you adapted this class based on previous instructions

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import LambdaLR

import warnings
from tqdm import tqdm
import os
from pathlib import Path

from tokenizers import Tokenizer

from torch.utils.tensorboard import SummaryWriter

from transformers import BertTokenizer

import pandas as pd

from model import EncoderBlock, MultiHeadAttentionBlock, FeedForwardBlock, InputEmbeddings, PositionalEncoding

## Tokenize

In [26]:
def tokenize(text):
    """
    Tokenize the input text using the BERT tokenizer
    """
    # Load the BERT tokenizer
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    # Tokenize the text
    input_ids = tokenizer.encode(text, return_tensors='pt')

    return tokenizer, input_ids

## Encode

In [34]:
def init_encoder(config):
    """
    Initialize the Transformer encoder
    """
    # Initialize the Transformer encoder layers (you can adjust the number of layers)
    encoder_layers = nn.ModuleList([EncoderBlock(MultiHeadAttentionBlock(config['d_model'], h=config['heads'], dropout=config['dropout_rate']),
                                                FeedForwardBlock(d_model=config['d_model'], d_ff=config['d_ff'], dropout=config['dropout_rate']),
                                                dropout=config['dropout_rate'])
                                    for _ in range(config['num_layers'])])
    encoder = Encoder(encoder_layers)

    return encoder

In [35]:
def encode_text(encoder, input_ids, config):
    """
    Initialize the model, the optimizer and the loss function
    """
    # Create the embedding and positional encoding layers
    input_embedding = InputEmbeddings(config['d_model'], config['vocab_size'])
    positional_encoding = PositionalEncoding(config['d_model'], config['seq_len'], config['dropout_rate'])

    # Encode the input text
    # convert the input_ids to embeddings
    embeddings = input_embedding(input_ids)
    # add the positional encoding to the embeddings
    embeddings = positional_encoding(embeddings)
    # pass the embeddings through the encoder
    encoded_text = encoder(embeddings, None)

    return encoded_text    

In [37]:
tokenizer, input_ids = tokenize("Hello, my dog is cute")

config = {
    "d_model": 512,
    "seq_len": 128,
    "vocab_size": tokenizer.vocab_size,
    "dropout_rate": 0.1,
    "heads": 8,
    "d_ff": 2048,
    "num_layers": 6
}

encoder = init_encoder(config)
encoded_text = encode_text(encoder, input_ids, config)
print(encoded_text)  # Expected output: torch.Size([1, 128, 512])


tensor([[[ 1.7268, -0.0136,  0.0512,  ..., -1.8737, -0.8607, -0.8110],
         [ 1.1094, -0.0577, -0.7989,  ..., -0.4510, -0.0106,  0.9494],
         [ 0.0223,  0.1444, -1.1096,  ...,  0.0805,  1.1550,  1.3365],
         ...,
         [-0.7944, -0.0942, -0.3358,  ..., -0.3985, -1.1086,  0.5394],
         [-1.3717,  1.6325, -0.1623,  ...,  1.7245,  1.5824, -0.4135],
         [-2.6252, -0.4823, -1.0174,  ...,  1.3007,  0.0369, -0.8003]]],
       grad_fn=<AddBackward0>)


## Train Classifier

In [40]:
class TextClassifier(nn.Module):
    def __init__(self, encoder, d_model, output_dim):
        super().__init__()
        self.encoder = encoder
        self.classifier = nn.Linear(d_model, output_dim)

    def forward(self, x, config):
        # Encode the input text using the provided encode_text function
        encoded_text = encode_text(self.encoder, x, config)
        # Use only the encoding of the [CLS] token for classification purposes
        cls_encoding = encoded_text[:, 0, :]  # Assuming [CLS] is at the first position
        # Pass the [CLS] encoding through the classifier
        return self.classifier(cls_encoding)

# Configuration for the encoder and classifier
config = {
    "d_model": 512,
    "seq_len": 128,
    "vocab_size": tokenizer.vocab_size,
    "dropout_rate": 0.1,
    "heads": 8,
    "d_ff": 2048,
    "num_layers": 6
}

# Initialize the encoder and classifier
encoder = init_encoder(config)
model = TextClassifier(encoder, config['d_model'], 1)  # '1' for binary classification

## Prepare dataset

In [43]:
from torch.utils.data import Dataset, DataLoader
import torch

class TextDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length):
        self.encodings = [tokenizer.encode(text, add_special_tokens=True, max_length=max_length, truncation=True, padding='max_length') for text in texts]
        self.labels = labels

    def __getitem__(self, idx):
        item = {'input_ids': torch.tensor(self.encodings[idx], dtype=torch.long),
                'labels': torch.tensor(self.labels[idx], dtype=torch.float)}
        return item

    def __len__(self):
        return len(self.labels)

In [44]:
from sklearn.model_selection import train_test_split
import pandas as pd

data = pd.read_csv('cleaned_data_sw.csv')
# Assuming data is your DataFrame containing 'tweet' and 'class'
# Shuffle the data
data = data.sample(frac=1).reset_index(drop=True)

# Split the data into training and temp data sets (80-20 split)
train_data, temp_data = train_test_split(data, test_size=0.2, random_state=42)

# Split the temp data into validation and test data sets (50-50 split of 20% total data)
validation_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

# Now you can create datasets for each split
train_dataset = TextDataset(train_data['tweet'].tolist(), train_data['class'].tolist(), tokenizer, config['seq_len'])
validation_dataset = TextDataset(validation_data['tweet'].tolist(), validation_data['class'].tolist(), tokenizer, config['seq_len'])
test_dataset = TextDataset(test_data['tweet'].tolist(), test_data['class'].tolist(), tokenizer, config['seq_len'])

# And DataLoaders for each dataset
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
validation_dataloader = DataLoader(validation_dataset, batch_size=16, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

## Training Loop

In [47]:
num_epochs = 3
best_val_loss = float('inf')  # Initialize best validation loss for early stopping/checkpointing

from torch.optim import Adam
from torch.nn import BCEWithLogitsLoss

# Assuming 'model' is already instantiated
optimizer = Adam(model.parameters(), lr=5e-5)
loss_fn = BCEWithLogitsLoss()

for epoch in range(num_epochs):
    # Training phase
    model.train()
    total_loss = 0
    for batch in train_dataloader:
        optimizer.zero_grad()  # Use the optimizer instance
        input_ids = batch['input_ids']
        labels = batch['labels'].unsqueeze(1)  # Adjust dimensions for consistency
        outputs = model(input_ids, config)
        loss = loss_fn(outputs, labels)  # Use the loss function instance
        loss.backward()
        optimizer.step()  # Use the optimizer instance
        total_loss += loss.item()
    train_loss = total_loss / len(train_dataloader)
    
    # Validation phase
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in validation_dataloader:
            input_ids = batch['input_ids']
            labels = batch['labels'].unsqueeze(1)
            outputs = model(input_ids, config)
            loss = loss_fn(outputs, labels)  # Use the loss function instance
            total_loss += loss.item()
    val_loss = total_loss / len(validation_dataloader)

    # Print metrics or store them for later
    print(f"Epoch {epoch+1}, Train Loss: {train_loss}, Validation Loss: {val_loss}")

    # Check if this is the best model based on validation loss
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # Save the model
        torch.save(model.state_dict(), 'best_model_state.bin')


Epoch 1, Train Loss: 0.4555098480394771, Validation Loss: 0.47362861527550604
Epoch 2, Train Loss: 0.4554745435414295, Validation Loss: 0.4719776435244468
Epoch 3, Train Loss: 0.45463386024198227, Validation Loss: 0.47069941345722444


In [48]:
# Assuming your model architecture is correctly defined as `model`
model.load_state_dict(torch.load('best_model_state.bin'))

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
model.to(device)  # Move model to the correct device
model.eval()  # Set the model to evaluation mode

Using device: cpu


TextClassifier(
  (encoder): Encoder(
    (layers): ModuleList(
      (0-5): 6 x EncoderBlock(
        (self_attention_block): MultiHeadAttentionBlock(
          (w_q): Linear(in_features=512, out_features=512, bias=True)
          (w_k): Linear(in_features=512, out_features=512, bias=True)
          (w_v): Linear(in_features=512, out_features=512, bias=True)
          (w_o): Linear(in_features=512, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (feed_forward_block): FeedForwardBlock(
          (linear_1): Linear(in_features=512, out_features=2048, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (linear_2): Linear(in_features=2048, out_features=512, bias=True)
        )
        (residual_connections): ModuleList(
          (0-1): 2 x ResidualConnection(
            (dropout): Dropout(p=0.1, inplace=False)
            (norm): LayerNormalization()
          )
        )
      )
    )
    (norm): LayerNormalization(

In [51]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import numpy as np

# Assuming 'test_dataloader' is already instantiated and properly setup
true_labels = []
predictions = []

with torch.no_grad():
    for batch in test_dataloader:
        input_ids = batch['input_ids'].to(device)
        labels = batch['labels'].unsqueeze(1).to(device)  # Adjust dimensions if necessary
        outputs = model(input_ids, config).squeeze()  # Remove extra dimensions from model outputs if necessary
        predicted_labels = torch.round(torch.sigmoid(outputs))  # Convert logits to binary predictions
        true_labels.extend(labels.detach().cpu().numpy())
        predictions.extend(predicted_labels.detach().cpu().numpy())

# Convert lists to numpy arrays for metric calculation
true_labels = np.array(true_labels)
predictions = np.array(predictions)

# Calculate metrics
accuracy = accuracy_score(true_labels, predictions)
precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predictions, average='binary')

print(f'Accuracy: {accuracy:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1-Score: {f1:.4f}')

Accuracy: 0.8334
Precision: 0.0000
Recall: 0.0000
F1-Score: 0.0000


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
