# Load Libraries, Read Data, and Label Mapping


In [1]:
import math
import torch
import torch.nn as nn
import torchtext
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from tqdm import tqdm
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

df = pd.read_csv('spam_ham.csv')
df_train, df_test = train_test_split(df, test_size=0.2, random_state=42)
print(df_train.head())

     Category                                            Message
1978     spam  Reply to win £100 weekly! Where will the 2006 ...
3989      ham  Hello. Sort of out in town already. That . So ...
3935      ham   How come guoyang go n tell her? Then u told her?
4078      ham  Hey sathya till now we dint meet not even a si...
4086     spam  Orange brings you ringtones from all time Char...


In [2]:
labels = df_train["Category"].unique()
num_labels = len(labels)
label2id, id2label = dict(), dict()
for i, label in enumerate(labels):
    label2id[label] = i
    id2label[i] = label

print(id2label)
print(label2id)


{0: 'spam', 1: 'ham'}
{'spam': 0, 'ham': 1}


# Build Vocabulary

In [3]:
# Load tokenizer
tokenizer = get_tokenizer('basic_english')

text = 'this is text'
print(tokenizer(text))

['this', 'is', 'text']


In [4]:
# Initialize training data iterator
class TextIter(torch.utils.data.Dataset):

  def __init__(self, input_data):
      self.text = input_data['Message'].values.tolist()
  def __len__(self):
      return len(self.text)
  def __getitem__(self, idx):
      return self.text[idx]

# Build vocabulary
def yield_tokens(data_iter):
    for text in data_iter:
        yield tokenizer(text)

data_iter = TextIter(df_train)
vocab = build_vocab_from_iterator(yield_tokens(data_iter), specials=["<pad>", "<unk>"])
vocab.set_default_index(vocab["<unk>"])
print(vocab.get_stoi())



In [5]:
text_unk = 'this is jkjkj' # jkjkj is an unknown vocab
seq_unk = [vocab[word] for word in tokenizer(text_unk)]

print(tokenizer(text_unk))
print(seq_unk)

['this', 'is', 'jkjkj']
[49, 15, 1]


In [6]:
# We will use this example throughout the article
text = 'this is text' 
seq = [vocab[word] for word in tokenizer(text)]

print(tokenizer(text))
print(seq)

['this', 'is', 'text']
[49, 15, 81]


# Word Embedding

In [7]:
class Embeddings(nn.Module):
    def __init__(self, d_model, vocab_size):
        super(Embeddings, self).__init__()
        self.emb = nn.Embedding(vocab_size, d_model)
        self.d_model = d_model

    def forward(self, x):
        return self.emb(x) * math.sqrt(self.d_model)

In [8]:
hidden_size = 4

input_data = torch.LongTensor(seq).unsqueeze(0)
emb_model = Embeddings(hidden_size, len(vocab))
token_emb = emb_model(input_data) 
print(f'Size of token embedding: {token_emb.size()}')

Size of token embedding: torch.Size([1, 3, 4])


# Positional Encoding

In [9]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, vocab_size=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(vocab_size, d_model)
        position = torch.arange(0, vocab_size, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float()
            * (-math.log(10000.0) / d_model)
        )
  
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[:, : x.size(1), :]
        return self.dropout(x)

In [10]:
pe_model = PositionalEncoding(d_model=4, vocab_size=len(vocab))
output_pe = pe_model(token_emb)

print(f'Size of output embedding: {output_pe.size()}')

Size of output embedding: torch.Size([1, 3, 4])


# Self-Attention

In [11]:
class SingleHeadAttention(nn.Module):
    def __init__(self, d_model, d_head_size):
        super().__init__()
        self.lin_key = nn.Linear(d_model, d_head_size, bias=False)
        self.lin_query = nn.Linear(d_model, d_head_size, bias=False)
        self.lin_value = nn.Linear(d_model, d_head_size, bias=False)
        self.d_model = d_model

    def forward(self, x):
        query = self.lin_query(x)
        key = self.lin_key(x)
        value = self.lin_value(x)

        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.d_model)
        p_attn = scores.softmax(dim=-1)
        x = torch.matmul(p_attn, value)

        return x

In [12]:
class MultiHeadAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        super().__init__()
        assert d_model % h == 0
        d_k = d_model // h
        self.multi_head = nn.ModuleList([SingleHeadAttention(d_model, d_k) for _ in range(h)])
        self.lin_agg = nn.Linear(d_model, d_model)

    def forward(self, x):
        x = torch.cat([head(x) for head in self.multi_head], dim=-1)
        return self.lin_agg(x)

In [13]:
mult_att = MultiHeadAttention(h=2, d_model=4)
output_mult_att = mult_att(output_pe)

print(f'Size of output embedding after multi-head attention: {output_mult_att.size()}')

Size of output embedding after multi-head attention: torch.Size([1, 3, 4])


# Residual Connection

In [14]:
class LayerNorm(nn.Module):
    def __init__(self, d_model, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.a_2 = nn.Parameter(torch.ones(d_model))
        self.b_2 = nn.Parameter(torch.zeros(d_model))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

class ResidualConnection(nn.Module):
    def __init__(self, d_model, dropout=0.1):
        super().__init__()
        self.norm = LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x1, x2):
        return self.dropout(self.norm(x1 + x2))

In [15]:
res_conn_1 = ResidualConnection(d_model=4)
output_res_conn_1 = res_conn_1(output_pe, output_mult_att)

print(f'Size of output embedding after residual connection: {output_res_conn_1.size()}')

Size of output embedding after residual connection: torch.Size([1, 3, 4])


# Feed-Forward

In [16]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.w_2(self.dropout(self.w_1(x).relu()))

In [17]:
ff = FeedForward(d_model=4, d_ff=12)
output_ff = ff(output_res_conn_1)

print(f'Size of output embedding after feed-forward network: {output_ff.size()}')

Size of output embedding after feed-forward network: torch.Size([1, 3, 4])


In [18]:
res_conn_2 = ResidualConnection(d_model=4)
output_res_conn_2 = res_conn_2(output_res_conn_1, output_ff)

print(f'Size of output embedding after second residual: {output_res_conn_2.size()}')

Size of output embedding after second residual: torch.Size([1, 3, 4])


# Encoder Stack

In [19]:
class SingleEncoder(nn.Module):
    def __init__(self, d_model, self_attn, feed_forward, dropout):
        super().__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.res_1 = ResidualConnection(d_model, dropout)
        self.res_2 = ResidualConnection(d_model, dropout)

        self.d_model = d_model

    def forward(self, x):
        x_attn = self.self_attn(x)
        x_res_1 = self.res_1(x, x_attn)
        x_ff = self.feed_forward(x_res_1)
        x_res_2 = self.res_2(x_res_1, x_ff)

        return x_res_2

In [20]:
class EncoderBlocks(nn.Module):
    def __init__(self, layer, N):
        super().__init__()
        self.layers = nn.ModuleList([layer for _ in range(N)])
        self.norm = LayerNorm(layer.d_model)

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return self.norm(x)

# Transformer Encoder Model

In [21]:
class TransformerEncoderModel(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, d_ff, N,
                dropout=0.1):
        super().__init__()
        assert d_model % nhead == 0, "nheads must divide evenly into d_model"

        self.emb = Embeddings(d_model, vocab_size)
        self.pos_encoder = PositionalEncoding(d_model=d_model, vocab_size=vocab_size)

        attn = MultiHeadAttention(nhead, d_model)
        ff = FeedForward(d_model, d_ff, dropout)
        self.transformer_encoder = EncoderBlocks(SingleEncoder(d_model, attn, ff, dropout), N)
        self.classifier = nn.Linear(d_model, 2)
        self.d_model = d_model

    def forward(self, x):
        x = self.emb(x) * math.sqrt(self.d_model)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = x.mean(dim=1)
        x = self.classifier(x)
        return x

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerEncoderModel(len(vocab), d_model=300, nhead=4, d_ff=50, 
                                    N=6, dropout=0.1).to(device)

# Dataloader

In [22]:
class TextDataset(torch.utils.data.Dataset):

  def __init__(self, input_data):        
      self.text = input_data['Message'].values.tolist()
      self.label = [int(label2id[i]) for i in input_data['Category'].values.tolist()]

  def __len__(self):
      return len(self.label)
    
  def get_sequence_token(self, idx):
      sequence = [vocab[word] for word in tokenizer(self.text[idx])]
      len_seq = len(sequence)
      return sequence
  
  def get_labels(self, idx):
      return self.label[idx]

  def __getitem__(self, idx):
      sequence = self.get_sequence_token(idx)
      label = self.get_labels(idx)
      return sequence, label

def collate_fn(batch):
    sequences, labels = zip(*batch)
    max_len = max(len(seq) for seq in sequences)
    
    # Initialize tensors with zeros and fill them with sequences and labels
    padded_sequences = torch.zeros((len(sequences), max_len), dtype=torch.long)
    padded_labels = torch.tensor(labels, dtype=torch.long)
    
    for i in range(len(sequences)):
        sequence = sequences[i]
        padded_sequences[i, :len(sequence)] = torch.tensor(sequence, dtype=torch.long)
    
    return padded_sequences, padded_labels

# Model Training

In [None]:
def train(model, dataset, epochs, lr, bs):

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam((p for p in model.parameters() 
      if p.requires_grad), lr=lr)
    train_dataset = TextDataset(dataset)
    train_dataloader = DataLoader(train_dataset, batch_size=bs, collate_fn=collate_fn, shuffle=True)
    
    # Training loop
    for epoch in range(epochs):
        total_loss_train = 0
        total_acc_train = 0   
        for train_sequence, train_label in tqdm(train_dataloader):
            
            # Model prediction
            predictions = model(train_sequence.to(device))
            labels = train_label.to(device)
            loss = criterion(predictions, labels)

            # Calculate accuracy and loss per batch
            correct = predictions.argmax(axis=1) == labels
            acc = correct.sum().item() / correct.size(0)
            total_acc_train += correct.sum().item()
            total_loss_train += loss.item()

            # Backprop
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
            optimizer.step()

        print(f'Epochs: {epoch + 1} | Loss: {total_loss_train / len(train_dataset): .3f} | Accuracy: {total_acc_train / len(train_dataset): .3f}')
    
epochs = 15
lr = 1e-4
batch_size = 4
train(model, df_train, epochs, lr, batch_size)

100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [02:07<00:00,  8.74it/s]


Epochs: 1 | Loss:  0.092 | Accuracy:  0.905


100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [02:16<00:00,  8.18it/s]


Epochs: 2 | Loss:  0.075 | Accuracy:  0.936


100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [02:15<00:00,  8.21it/s]


Epochs: 3 | Loss:  0.063 | Accuracy:  0.934


100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [02:41<00:00,  6.91it/s]


Epochs: 4 | Loss:  0.054 | Accuracy:  0.947


100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [02:38<00:00,  7.06it/s]


Epochs: 5 | Loss:  0.048 | Accuracy:  0.944


100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [02:39<00:00,  7.00it/s]


Epochs: 6 | Loss:  0.061 | Accuracy:  0.945


100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [04:45<00:00,  3.90it/s]


Epochs: 7 | Loss:  0.050 | Accuracy:  0.951


100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [02:31<00:00,  7.38it/s]


Epochs: 8 | Loss:  0.045 | Accuracy:  0.953


100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [02:24<00:00,  7.69it/s]


Epochs: 9 | Loss:  0.048 | Accuracy:  0.948


100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [02:28<00:00,  7.51it/s]


Epochs: 10 | Loss:  0.046 | Accuracy:  0.947


100%|██████████████████████████████████████████████████████████████████████████████| 1115/1115 [02:49<00:00,  6.58it/s]


Epochs: 11 | Loss:  0.050 | Accuracy:  0.946


 91%|███████████████████████████████████████████████████████████████████████▎      | 1019/1115 [02:26<00:15,  6.19it/s]

# Model Prediction

In [None]:
def predict(text):

  sequence = torch.tensor([vocab[word] for word in tokenizer(text)], dtype=torch.long).unsqueeze(0)
  output = model(sequence.to(device))
  prediction = id2label[output.argmax(axis=1).item()]

  return prediction

In [None]:
idx = 24
text = df_test['Message'].values.tolist()[idx]
gt = df_test['Category'].values.tolist()[idx]
prediction = predict(text)

print(f'Text: {text}')
print(f'Ground Truth: {gt}')
print(f'Prediction: {prediction}')

In [None]:
idx = 35
text = df_test['Message'].values.tolist()[idx]
gt = df_test['Category'].values.tolist()[idx]
prediction = predict(text)

print(f'Text: {text}')
print(f'Ground Truth: {gt}')
print(f'Prediction: {prediction}')