In [None]:
# Upload the dataset through Colab's file-upload helper (only available inside a
# Colab runtime). The recorded run saved amazon_cells_labelled.txt, which a later
# cell reads by that exact name from the working directory.
from google.colab import files
uploaded = files.upload()

Saving amazon_cells_labelled.txt to amazon_cells_labelled.txt


In [None]:
pip install nltk



In [None]:
import nltk
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
from matplotlib import pyplot
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.corpus import stopwords
from nltk import word_tokenize
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, classification_report
# word_tokenize needs the Punkt tokenizer data. Recent NLTK releases load it from
# the 'punkt_tab' package instead of the pickled 'punkt' one, so fetch both to work
# with whichever NLTK version the unpinned install above provides.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
def preprocess_pandas(data, columns):
    """Clean the ``Sentence`` column and strip English stop words.

    Each sentence is lower-cased, then e-mail addresses, IP addresses,
    punctuation and digits are removed, and finally NLTK English stop words
    are dropped from the tokenised text.

    Args:
        data: DataFrame with ``'index'``, ``'Class'`` and ``'Sentence'`` columns.
            It is not modified.
        columns: Column names that lead the returned frame, in this order.

    Returns:
        A new DataFrame (fresh 0..n-1 row index) holding ``'index'``, ``'Class'``
        and the cleaned, stop-word-free ``'Sentence'`` for every input row.
    """
    # Work on a derived Series so the caller's frame is left untouched.
    sentences = data['Sentence'].str.lower()
    sentences = sentences.replace(r'[a-zA-Z0-9-_.]+@[a-zA-Z0-9-_.]+', '', regex=True)                     # remove emails
    sentences = sentences.replace(r'((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}', '', regex=True)   # remove IP address
    # regex=True must be explicit: since pandas 2.0 str.replace treats the pattern
    # as a literal string by default, which would leave all punctuation in place.
    sentences = sentences.str.replace(r'[^\w\s]', '', regex=True)                                         # remove special characters
    sentences = sentences.replace(r'\d', '', regex=True)                                                  # remove numbers

    # Build the stop-word lookup once; a set gives O(1) membership tests.
    stop_words = set(stopwords.words('english'))
    filtered_sentences = [
        " ".join(w for w in word_tokenize(sentence) if w not in stop_words)
        for sentence in sentences
    ]

    cleaned = pd.DataFrame({
        "index": data['index'].to_numpy(),
        "Class": data['Class'].to_numpy(),
        "Sentence": filtered_sentences,
    })
    # Requested columns come first; any of the three produced columns that were
    # not requested are appended so they are never silently dropped.
    ordered = list(columns) + [c for c in cleaned.columns if c not in columns]
    return cleaned.reindex(columns=ordered)

# Notebook-level script: runs whenever this cell is executed (there is no __main__ guard).

# Load the tab-separated file (one sentence and one class label per row, no header),
# pre-process it, and split it into training and validation sets.
data = pd.read_csv("amazon_cells_labelled.txt", delimiter='\t', header=None)
data.columns = ['Sentence', 'Class']
data['index'] = data.index                                          # keep the row number as an explicit column (preprocess_pandas reads row['index'])
columns = ['index', 'Class', 'Sentence']
data = preprocess_pandas(data, columns)                             # pre-process; note that `data` is rebound to the result
training_data, validation_data, training_labels, validation_labels = train_test_split( # hold out 10% of the rows for validation (no separate test split is made here)
    data['Sentence'].values.astype('U'),
    data['Class'].values.astype('int32'),
    test_size=0.10,
    random_state=0,
    shuffle=True
)

In [None]:
# Fit TF-IDF (uni- and bi-grams, at most 1500 features) on the training text only,
# then reuse the fitted vocabulary for the validation text.
# NOTE(review): these features are continuous TF-IDF weights, while the training
# cell casts its inputs to integer indices for an nn.Embedding — verify that the
# two representations are meant to go together.
word_vectorizer = TfidfVectorizer(analyzer='word', ngram_range=(1,2), max_features=1500, max_df=0.5, use_idf=True, norm='l2')
# .toarray() yields a plain ndarray; .todense() would return the discouraged
# np.matrix type and require an extra np.array(...) copy before torch.from_numpy.
training_data = word_vectorizer.fit_transform(training_data).toarray()
vocab_size = len(word_vectorizer.vocabulary_)
validation_data = word_vectorizer.transform(validation_data).toarray()
train_x_tensor = torch.from_numpy(training_data).float()
train_y_tensor = torch.from_numpy(np.array(training_labels)).long()
validation_x_tensor = torch.from_numpy(validation_data).float()
validation_y_tensor = torch.from_numpy(np.array(validation_labels)).long()

In [None]:
# Sanity check: report the shape of every tensor built in the previous cell.
for _label, _tensor in (
    ("Training data tensor dimensions:", train_x_tensor),
    ("Training labels tensor dimensions:", train_y_tensor),
    ("Validation data tensor dimensions:", validation_x_tensor),
    ("Validation labels tensor dimensions:", validation_y_tensor),
):
    print(_label, _tensor.shape)

Training data tensor dimensions: torch.Size([900, 1500])
Training labels tensor dimensions: torch.Size([900])
Validation data tensor dimensions: torch.Size([100, 1500])
Validation labels tensor dimensions: torch.Size([100])


In [None]:
import math
import os
from tempfile import TemporaryDirectory
from typing import Tuple

import torch
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset

class TransformerModel(nn.Module):
    """Transformer-encoder classifier over sequences of token indices.

    Tokens are embedded, given positional encodings, passed through a stack of
    ``nn.TransformerEncoderLayer`` blocks, mean-pooled over the sequence
    dimension and projected to ``num_classes`` logits.

    Args:
        ntoken: Number of rows in the embedding table (vocabulary size).
        d_model: Embedding / model dimension.
        nhead: Number of attention heads per encoder layer.
        d_hid: Hidden size of each encoder layer's feed-forward network.
        nlayers: Number of stacked encoder layers.
        dropout: Dropout probability used in the positional encoding, the
            encoder layers and before pooling.
        num_classes: Number of output classes.
    """

    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5, num_classes: int = 2):
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.embedding = nn.Embedding(ntoken, d_model)
        self.dropout = nn.Dropout(dropout)
        self.d_model = d_model
        self.pooling = nn.AdaptiveAvgPool1d(1)  # averages over the sequence axis
        self.linear = nn.Linear(d_model, num_classes)

        self.init_weights()

    def init_weights(self) -> None:
        """Re-initialise the embedding and output layer uniformly in [-0.1, 0.1] (bias zeroed)."""
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor = None) -> Tensor:
        """
        Arguments:
            src: Tensor of token indices, shape ``[seq_len, batch_size]``
            src_mask: Tensor, shape ``[seq_len, seq_len]``; when omitted a square
                causal mask is generated.

        Returns:
            logits Tensor of shape ``[batch_size, num_classes]``
        """
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        if src_mask is None:
            # Square causal mask: masked positions are float('-inf'), unmasked ones 0.0.
            # It is placed on the input's own device rather than on a notebook-level
            # `device` global, so the module works wherever its inputs live.
            # NOTE(review): a causal mask is kept as the default for backward
            # compatibility; for whole-sequence classification an all-zeros mask
            # (full attention) may be what is wanted — pass it explicitly if so.
            src_mask = nn.Transformer.generate_square_subsequent_mask(len(src)).to(src.device)
        output = self.transformer_encoder(src, src_mask)
        output = self.dropout(output)
        # [seq_len, batch, d_model] -> [batch, d_model, seq_len] so the 1-d pooling
        # averages over the sequence; squeeze drops the resulting length-1 axis.
        pooled_output = self.pooling(output.permute(1, 2, 0)).squeeze(-1)
        return self.linear(pooled_output)

In [None]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a sequence of embeddings.

    Even feature columns hold sines and odd columns hold cosines of the position
    scaled by geometrically spaced frequencies; dropout is applied to the sum.

    Args:
        d_model: Embedding dimension (even or odd).
        dropout: Dropout probability applied after adding the encoding.
        max_len: Longest sequence length the precomputed table supports.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even column, so trim
        # the frequencies; for even d_model the slice keeps every element.
        pe[:, 0, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Buffer: follows the module across devices and is saved with it, but is not trained.
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``

        Returns:
            Tensor of the same shape as ``x``.
        """
        # pe[:seq_len] has shape [seq_len, 1, d_model] and broadcasts over the batch axis.
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [None]:
# Mini-batch sizes for training and for evaluation.
batch_size, batch_size_eval = 64, 16

# Pair each feature tensor with its label tensor.
train_dataset = TensorDataset(train_x_tensor, train_y_tensor)
validation_dataset = TensorDataset(validation_x_tensor, validation_y_tensor)

# Training batches are reshuffled on every pass; validation order stays fixed.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
validation_loader = DataLoader(dataset=validation_dataset, batch_size=batch_size_eval, shuffle=False)

In [None]:
ntokens = vocab_size  # size of vocabulary
emsize = 200  # embedding dimension
d_hid = 200  # dimension of the feedforward network model in ``nn.TransformerEncoder``
nlayers = 2  # number of ``nn.TransformerEncoderLayer`` in ``nn.TransformerEncoder``
nhead = 2  # number of heads in ``nn.MultiheadAttention``
dropout = 0.2  # dropout probability

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed PyTorch's global RNG before any weights are drawn so that initialisation
# (and the dropout / DataLoader shuffling that follow) repeat on Restart & Run All.
# 0 mirrors the random_state=0 already used for the train/validation split.
torch.manual_seed(0)
model = TransformerModel(ntokens, emsize, nhead, d_hid, nlayers, dropout).to(device)



In [None]:
import os
import time
import torch
import torch.nn as nn
import math
from torch.utils.data import DataLoader, TensorDataset
from tempfile import TemporaryDirectory

# `model`, `train_loader` and `validation_loader` must already exist (earlier cells).
# train() and evaluate() below read `criterion` and `optimizer` as notebook globals.

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
# Multiply the learning rate by 0.95 after every scheduler.step() call.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=1, gamma=0.95)

def train(model, train_loader, epoch):
    """Run one optimisation pass over ``train_loader``.

    Relies on the notebook-level ``criterion`` and ``optimizer``.

    Args:
        model: Module called with a ``[seq_len, batch_size]`` integer tensor and
            returning ``[batch_size, num_classes]`` logits.
        train_loader: Iterable (with ``len``) of ``(features, targets)`` batches;
            features are ``[batch_size, seq_len]``.
        epoch: Current epoch number. Kept for call-site compatibility; unused.

    Returns:
        ``(mean loss per batch, accuracy in percent)`` for this pass.
    """
    model.train()  # Turn on train mode
    # Batches are moved to wherever the model's weights live; without this a
    # model on CUDA fed CPU batches raises a device-mismatch error.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    total_loss = 0.0
    correct = 0
    total = 0

    for data, targets in train_loader:
        # NOTE(review): .long() truncates fractional values toward zero. If the
        # loader yields TF-IDF weights (floats in [0, 1]) almost every entry
        # becomes index 0 and the embedding receives no signal — confirm the
        # inputs are meant to be token ids.
        data = data.transpose(0, 1).long()  # [batch, seq] -> [seq, batch]
        targets = targets.long()
        if model_device is not None:
            data = data.to(model_device)
            targets = targets.to(model_device)

        optimizer.zero_grad()
        output = model(data)

        loss = criterion(output, targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)  # cap gradient norm at 0.5
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(output, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

    train_loss = total_loss / len(train_loader)
    train_acc = 100. * correct / total

    return train_loss, train_acc

def evaluate(model, validation_loader):
    """Score ``model`` on ``validation_loader`` without updating its weights.

    Relies on the notebook-level ``criterion``.

    Args:
        model: Module called with a ``[seq_len, batch_size]`` integer tensor and
            returning ``[batch_size, num_classes]`` logits.
        validation_loader: Iterable (with ``len``) of ``(features, targets)``
            batches; features are ``[batch_size, seq_len]``.

    Returns:
        ``(mean loss per batch, accuracy in percent)``.
    """
    model.eval()  # disable dropout for evaluation
    # Move batches to the model's device so a CUDA model can consume CPU batches.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    total_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for data, targets in validation_loader:
            # NOTE(review): .long() truncates fractional feature values toward
            # zero — confirm the inputs are meant to be token ids.
            data = data.transpose(0, 1).long()  # [batch, seq] -> [seq, batch]
            targets = targets.long()
            if model_device is not None:
                data = data.to(model_device)
                targets = targets.to(model_device)

            output = model(data)
            total_loss += criterion(output, targets).item()

            _, predicted = torch.max(output, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    val_loss = total_loss / len(validation_loader)
    val_acc = 100. * correct / total
    return val_loss, val_acc

# Train for a fixed number of epochs, checkpointing the weights whenever the
# validation loss improves and restoring that best checkpoint at the end.
best_val_loss = float('inf')
epochs = 10  # Set the number of epochs

with TemporaryDirectory() as tempdir:
    # The checkpoint lives only for the duration of this block.
    best_model_params_path = os.path.join(tempdir, "best_model_params.pt")

    for epoch in range(1, epochs + 1):
        epoch_start_time = time.time()
        train_loss, train_acc = train(model, train_loader, epoch)
        val_loss, val_acc = evaluate(model, validation_loader)
        elapsed = time.time() - epoch_start_time

        # One summary line per epoch, framed by two rules.
        print(
            '-' * 89,
            f'| Epoch {epoch:3d}/{epochs:3d}, '
            f'Training Loss: {train_loss:.4f}, '
            f'Validation Loss: {val_loss:.4f}, '
            f'Training Accuracy: {train_acc:.2f}%, '
            f'Validation Accuracy: {val_acc:.2f}% |',
            '-' * 89,
            sep='\n',
        )

        # Keep the weights of the best epoch so far (lowest validation loss).
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_model_params_path)

        # Decay the learning rate once per epoch.
        scheduler.step()

    # Restore the best weights before the temporary directory is removed.
    model.load_state_dict(torch.load(best_model_params_path))

-----------------------------------------------------------------------------------------
| Epoch   1/ 10, Training Loss: 1.1372, Validation Loss: 0.6925, Training Accuracy: 49.22%, Validation Accuracy: 53.00% |
-----------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------
| Epoch   2/ 10, Training Loss: 0.7075, Validation Loss: 0.7182, Training Accuracy: 50.78%, Validation Accuracy: 53.00% |
-----------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------
| Epoch   3/ 10, Training Loss: 0.7102, Validation Loss: 0.7051, Training Accuracy: 50.33%, Validation Accuracy: 53.00% |
-----------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------
| Ep