CNN + Self-Attention Text Classifier (PyTorch)

In [None]:
import time
start_time = time.time()
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, mean_absolute_error, matthews_corrcoef, r2_score
)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# ---- Configuration ----
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DATA_PATH = r"D:\Thesis\16-7-25\thesis 2.csv"
MAX_LEN = 100      # every encoded sequence is truncated/padded to this many ids
BATCH_SIZE = 32
EPOCHS = 5
LR = 1e-3

# ---- Load data ----
df = pd.read_csv(DATA_PATH)
texts = df['Text'].astype(str).tolist()
labels = df['Label'].astype(int).tolist()
# NOTE(review): counting distinct labels presumes the class ids are exactly
# 0..num_classes-1 -- confirm against the CSV.
num_classes = len(set(labels))

# ---- Train/test partition ----
# The partition is chosen on row indices BEFORE the vocabulary is built, so the
# vocabulary can be fitted on training rows only. The stratified split is
# driven by the labels and random_state, not by the feature values.
train_idx, test_idx = train_test_split(
    list(range(len(texts))), test_size=0.2, stratify=labels, random_state=42
)


def tokenizer(x):
    """Lower-case ``x`` and split it on whitespace."""
    return x.lower().split()


# ---- Vocabulary (training rows only) ----
# Words that occur only in test rows are deliberately left out; `encode` maps
# them to '<UNK>', so no information about the test set leaks into the vocab.
vocab = {'<PAD>': 0, '<UNK>': 1}
for row in train_idx:
    for word in tokenizer(texts[row]):
        if word not in vocab:
            vocab[word] = len(vocab)


def encode(text, max_len):
    """Convert ``text`` into a list of exactly ``max_len`` vocabulary ids.

    Args:
        text: raw string; tokenised with ``tokenizer``.
        max_len: target length. Longer token lists are cut off at the end,
            shorter ones are right-padded with the '<PAD>' id.

    Returns:
        list of int ids; tokens missing from ``vocab`` become the '<UNK>' id.
    """
    unk_id = vocab['<UNK>']
    ids = [vocab.get(token, unk_id) for token in tokenizer(text)][:max_len]
    return ids + [vocab['<PAD>']] * (max_len - len(ids))


# ---- Encode and split ----
X = [encode(t, MAX_LEN) for t in texts]
X_train = [X[i] for i in train_idx]
X_test = [X[i] for i in test_idx]
y_train = [labels[i] for i in train_idx]
y_test = [labels[i] for i in test_idx]

class TextDataset(Dataset):
    """Map-style dataset pairing encoded id sequences with their class labels.

    Args:
        inputs: indexable collection of id sequences.
        labels: indexable collection of class ids, aligned with ``inputs``.
    """

    def __init__(self, inputs, labels):
        self.inputs = inputs
        self.labels = labels

    def __len__(self):
        """Number of samples, taken from ``inputs``."""
        return len(self.inputs)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of two ``torch.long`` tensors."""
        token_ids = torch.tensor(self.inputs[idx], dtype=torch.long)
        target = torch.tensor(self.labels[idx], dtype=torch.long)
        return {'input_ids': token_ids, 'label': target}

# Batch iterators: only the training loader shuffles its samples.
train_loader = DataLoader(
    dataset=TextDataset(inputs=X_train, labels=y_train),
    batch_size=BATCH_SIZE,
    shuffle=True,
)
test_loader = DataLoader(
    dataset=TextDataset(inputs=X_test, labels=y_test),
    batch_size=BATCH_SIZE,
    shuffle=False,
)

class SelfAttention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Args:
        embed_dim: size of the last input dimension; the query, key and value
            projections all map ``embed_dim -> embed_dim``.
    """

    def __init__(self, embed_dim):
        super().__init__()
        self.query = nn.Linear(embed_dim, embed_dim)
        self.key = nn.Linear(embed_dim, embed_dim)
        self.value = nn.Linear(embed_dim, embed_dim)
        self.scale = embed_dim ** 0.5

    def forward(self, x, key_padding_mask=None):
        """Attend every position of ``x`` to every (non-masked) position.

        Args:
            x: tensor of shape (B, T, embed_dim).
            key_padding_mask: optional bool tensor of shape (B, T); ``True``
                marks positions that must receive no attention weight.
                ``None`` (default) attends to all positions.

        Returns:
            Tensor of shape (B, T, embed_dim).
        """
        Q = self.query(x)
        K = self.key(x)
        V = self.value(x)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scale   # (B, T, T)
        if key_padding_mask is not None:
            # Use the most negative finite value rather than -inf: a row whose
            # keys are ALL masked then softmaxes to uniform weights, not NaN.
            blocked = key_padding_mask.unsqueeze(1)                  # (B, 1, T)
            scores = scores.masked_fill(blocked, torch.finfo(scores.dtype).min)
        weights = torch.softmax(scores, dim=-1)
        return torch.matmul(weights, V)


class CNNAttentionClassifier(nn.Module):
    """Embedding -> Conv1d -> self-attention -> masked mean pool -> linear.

    Positions holding the embedding's ``padding_idx`` (0) are excluded both as
    attention keys and from the pooled average, so the logits do not depend on
    how much padding a sequence carries.

    Args:
        vocab_size: number of rows in the embedding table.
        embed_dim: embedding width (input channels of the convolution).
        num_classes: number of output logits.
    """

    def __init__(self, vocab_size, embed_dim, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        # kernel_size=5 with padding=2 keeps the sequence length unchanged.
        self.conv = nn.Conv1d(embed_dim, 128, kernel_size=5, padding=2)
        self.attn = SelfAttention(128)
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: long tensor of token ids, shape (B, T).

        Returns:
            Float tensor of logits, shape (B, num_classes).
        """
        pad_mask = x.eq(self.embedding.padding_idx)        # (B, T), True = pad
        h = self.embedding(x)                              # (B, T, E)
        h = h.permute(0, 2, 1)                             # (B, E, T)
        h = torch.relu(self.conv(h))                       # (B, C, T)
        h = h.permute(0, 2, 1)                             # (B, T, C)
        h = self.attn(h, key_padding_mask=pad_mask)        # (B, T, C)
        # Mean over real tokens only; the clamp keeps an all-padding row from
        # dividing by zero (its pooled vector is then all zeros).
        keep = (~pad_mask).unsqueeze(-1).to(h.dtype)       # (B, T, 1)
        pooled = (h * keep).sum(dim=1) / keep.sum(dim=1).clamp(min=1.0)
        return self.fc(pooled)

# Seed PyTorch before the model is built so that weight initialisation and
# batch shuffling draw from a fixed random stream (same value as the split's
# random_state). Bit-exact repeatability on GPU is not guaranteed by this alone.
SEED = 42
torch.manual_seed(SEED)

model = CNNAttentionClassifier(len(vocab), embed_dim=128, num_classes=num_classes).to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
criterion = nn.CrossEntropyLoss()

# ---- Training ----
train_acc_list = []  # one training-accuracy value per epoch, plotted below

for epoch in range(EPOCHS):
    model.train()
    total_correct, total = 0, 0
    for batch in train_loader:
        inputs = batch['input_ids'].to(DEVICE)
        # Called `targets` so the module-level `labels` list is not
        # overwritten by a batch tensor.
        targets = batch['label'].to(DEVICE)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        # Accuracy uses the logits computed before this batch's weight update.
        preds = outputs.argmax(dim=1)
        total_correct += (preds == targets).sum().item()
        total += targets.size(0)
    acc = total_correct / total
    train_acc_list.append(acc)
    print(f"Epoch {epoch+1} | Train Accuracy: {acc:.4f}")

# ---- Evaluation on the held-out split ----
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for batch in test_loader:
        inputs = batch['input_ids'].to(DEVICE)
        preds = model(inputs).argmax(dim=1)
        # Ground-truth labels never need to visit the device.
        y_true.extend(batch['label'].tolist())
        y_pred.extend(preds.cpu().tolist())

acc = accuracy_score(y_true, y_pred)
prec = precision_score(y_true, y_pred, average='macro')
rec = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')
# NOTE(review): MAE and R² treat the class ids as numbers; they are only
# interpretable if the label encoding is ordinal -- confirm for this dataset.
mae = mean_absolute_error(y_true, y_pred)
mcc = matthews_corrcoef(y_true, y_pred)
r2 = r2_score(y_true, y_pred)

print("\nFinal Test Metrics:")
print(f"Accuracy : {acc:.4f}")
print(f"Precision: {prec:.4f}")
print(f"Recall   : {rec:.4f}")
print(f"F1 Score : {f1:.4f}")
print(f"MAE      : {mae:.4f}")
print(f"MCC      : {mcc:.4f}")
print(f"R² Score : {r2:.4f}")

# Wall-clock time since `start_time` was taken at the top of the cell.
end_time = time.time()
total_time = end_time - start_time
minutes, seconds = divmod(total_time, 60)
print(f"\nTotal time taken: {int(minutes)} minutes and {int(seconds)} seconds")


# ---- Plots ----
plt.plot(range(1, EPOCHS + 1), train_acc_list, marker='o')
plt.title("Training Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.grid()
plt.show()

cm = confusion_matrix(y_true, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.show()


BiLSTM + Self-Attention Text Classifier (PyTorch)

In [None]:
import time
start_time = time.time()

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, mean_absolute_error, matthews_corrcoef, r2_score
)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DATA_PATH = r"D:\Thesis\16-7-25\thesis 2.csv"
MAX_LEN = 100      # every encoded sequence is truncated/padded to this many ids
BATCH_SIZE = 32
EPOCHS = 5
LR = 1e-3

# Load data
df = pd.read_csv(DATA_PATH)
texts = df['Text'].astype(str).tolist()
labels = df['Label'].astype(int).tolist()
# NOTE(review): counting distinct labels presumes the class ids are exactly
# 0..num_classes-1 -- confirm against the CSV.
num_classes = len(set(labels))

# Choose the train/test partition on row indices first, so the vocabulary
# below can be fitted on training rows only. The stratified split is driven by
# the labels and random_state, not by the feature values.
train_idx, test_idx = train_test_split(
    list(range(len(texts))), test_size=0.2, stratify=labels, random_state=42
)


# Simple tokenizer + build vocab
def tokenizer(x):
    """Lower-case ``x`` and split it on whitespace."""
    return x.lower().split()


# Only training rows contribute words; words seen solely in test rows are
# encoded as '<UNK>', which keeps test-set information out of the vocabulary.
vocab = {'<PAD>': 0, '<UNK>': 1}
for row in train_idx:
    for word in tokenizer(texts[row]):
        if word not in vocab:
            vocab[word] = len(vocab)


def encode(text, max_len):
    """Convert ``text`` into a list of exactly ``max_len`` vocabulary ids.

    Args:
        text: raw string; tokenised with ``tokenizer``.
        max_len: target length. Longer token lists are cut off at the end,
            shorter ones are right-padded with the '<PAD>' id.

    Returns:
        list of int ids; tokens missing from ``vocab`` become the '<UNK>' id.
    """
    unk_id = vocab['<UNK>']
    ids = [vocab.get(t, unk_id) for t in tokenizer(text)][:max_len]
    return ids + [vocab['<PAD>']] * (max_len - len(ids))


X = [encode(t, MAX_LEN) for t in texts]
X_train = [X[i] for i in train_idx]
X_test = [X[i] for i in test_idx]
y_train = [labels[i] for i in train_idx]
y_test = [labels[i] for i in test_idx]

class TextDataset(Dataset):
    """Dataset of (encoded id sequence, class label) pairs.

    Args:
        inputs: indexable collection of id sequences.
        labels: indexable collection of class ids, aligned with ``inputs``.
    """

    def __init__(self, inputs, labels):
        self.inputs, self.labels = inputs, labels

    def __len__(self):
        """Number of samples, taken from ``inputs``."""
        return len(self.inputs)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of two ``torch.long`` tensors."""
        sample = {}
        sample['input_ids'] = torch.tensor(self.inputs[idx], dtype=torch.long)
        sample['label'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# Batch iterators: only the training loader shuffles its samples.
train_loader = DataLoader(
    dataset=TextDataset(inputs=X_train, labels=y_train),
    batch_size=BATCH_SIZE,
    shuffle=True,
)
test_loader = DataLoader(
    dataset=TextDataset(inputs=X_test, labels=y_test),
    batch_size=BATCH_SIZE,
    shuffle=False,
)

# Self-attention layer
class SelfAttention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Args:
        hidden_dim: size of the last input dimension; the query, key and value
            projections all map ``hidden_dim -> hidden_dim``.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        self.query = nn.Linear(hidden_dim, hidden_dim)
        self.key = nn.Linear(hidden_dim, hidden_dim)
        self.value = nn.Linear(hidden_dim, hidden_dim)
        self.scale = hidden_dim ** 0.5

    def forward(self, x, key_padding_mask=None):
        """Attend every position of ``x`` to every (non-masked) position.

        Args:
            x: tensor of shape (B, T, hidden_dim).
            key_padding_mask: optional bool tensor of shape (B, T); ``True``
                marks positions that must receive no attention weight.
                ``None`` (default) attends to all positions.

        Returns:
            Tensor of shape (B, T, hidden_dim).
        """
        Q = self.query(x)  # (B, T, H)
        K = self.key(x)
        V = self.value(x)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scale  # (B, T, T)
        if key_padding_mask is not None:
            # Use the most negative finite value rather than -inf: a row whose
            # keys are ALL masked then softmaxes to uniform weights, not NaN.
            blocked = key_padding_mask.unsqueeze(1)                 # (B, 1, T)
            scores = scores.masked_fill(blocked, torch.finfo(scores.dtype).min)
        weights = torch.softmax(scores, dim=-1)                    # (B, T, T)
        out = torch.matmul(weights, V)                             # (B, T, H)
        return out

# Model: Embedding + BiLSTM + SelfAttention + Classifier
class LSTMAttentionClassifier(nn.Module):
    """Embedding -> BiLSTM -> self-attention -> masked mean pool -> linear.

    Positions holding the embedding's ``padding_idx`` (0) are excluded both as
    attention keys and from the pooled average.

    NOTE: the LSTM itself still steps over padded positions, so the backward
    direction's states can still vary with the amount of trailing padding.

    Args:
        vocab_size: number of rows in the embedding table.
        embed_dim: embedding width (LSTM input size).
        hidden_dim: LSTM hidden size per direction; downstream layers see
            ``2 * hidden_dim`` features because the LSTM is bidirectional.
        num_classes: number of output logits.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.attention = SelfAttention(hidden_dim*2)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(hidden_dim*2, num_classes)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: long tensor of token ids, shape (B, T).

        Returns:
            Float tensor of logits, shape (B, num_classes).
        """
        pad_mask = x.eq(self.embedding.padding_idx)                      # (B, T)
        emb = self.embedding(x)                                          # (B, T, E)
        lstm_out, _ = self.lstm(emb)                                     # (B, T, 2*H)
        attn_out = self.attention(lstm_out, key_padding_mask=pad_mask)   # (B, T, 2*H)
        # Mean over real tokens only; the clamp keeps an all-padding row from
        # dividing by zero (its pooled vector is then all zeros).
        keep = (~pad_mask).unsqueeze(-1).to(attn_out.dtype)              # (B, T, 1)
        pooled = (attn_out * keep).sum(dim=1) / keep.sum(dim=1).clamp(min=1.0)
        dropped = self.dropout(pooled)
        logits = self.fc(dropped)                                        # (B, num_classes)
        return logits

# Seed PyTorch before the model is built so that weight initialisation, dropout
# and batch shuffling draw from a fixed random stream (same value as the
# split's random_state). Bit-exact repeatability on GPU is not guaranteed by
# this alone.
SEED = 42
torch.manual_seed(SEED)

model = LSTMAttentionClassifier(
    vocab_size=len(vocab),
    embed_dim=128,
    hidden_dim=64,
    num_classes=num_classes
).to(DEVICE)

optimizer = torch.optim.Adam(model.parameters(), lr=LR)
criterion = nn.CrossEntropyLoss()

# ---- Training ----
train_acc_list = []  # one training-accuracy value per epoch, plotted below

for epoch in range(EPOCHS):
    model.train()
    total_correct, total = 0, 0
    total_loss = 0
    for batch in train_loader:
        inputs = batch['input_ids'].to(DEVICE)
        # Called `targets` so the module-level `labels` list is not
        # overwritten by a batch tensor.
        targets = batch['label'].to(DEVICE)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        # Accuracy uses the logits computed before this batch's weight update.
        preds = outputs.argmax(dim=1)
        total_correct += (preds == targets).sum().item()
        total += targets.size(0)
        total_loss += loss.item()
    acc = total_correct / total
    train_acc_list.append(acc)
    # The reported loss is the mean of the per-batch losses.
    print(f"Epoch {epoch+1} | Loss: {total_loss/len(train_loader):.4f} | Train Accuracy: {acc:.4f}")

# ---- Evaluation on the held-out split ----
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for batch in test_loader:
        inputs = batch['input_ids'].to(DEVICE)
        preds = model(inputs).argmax(dim=1)
        # Ground-truth labels never need to visit the device.
        y_true.extend(batch['label'].tolist())
        y_pred.extend(preds.cpu().tolist())

acc = accuracy_score(y_true, y_pred)
prec = precision_score(y_true, y_pred, average='macro')
rec = recall_score(y_true, y_pred, average='macro')
f1 = f1_score(y_true, y_pred, average='macro')
# NOTE(review): MAE and R² treat the class ids as numbers; they are only
# interpretable if the label encoding is ordinal -- confirm for this dataset.
mae = mean_absolute_error(y_true, y_pred)
mcc = matthews_corrcoef(y_true, y_pred)
r2 = r2_score(y_true, y_pred)

print("\nFinal Test Metrics:")
print(f"Accuracy : {acc:.4f}")
print(f"Precision: {prec:.4f}")
print(f"Recall   : {rec:.4f}")
print(f"F1 Score : {f1:.4f}")
print(f"MAE      : {mae:.4f}")
print(f"MCC      : {mcc:.4f}")
print(f"R² Score : {r2:.4f}")

# Wall-clock time since `start_time` was taken at the top of the cell.
end_time = time.time()
total_time = end_time - start_time
minutes, seconds = divmod(total_time, 60)
print(f"\nTotal time taken: {int(minutes)} minutes and {int(seconds)} seconds")

# ---- Plots ----
plt.plot(range(1, EPOCHS+1), train_acc_list, marker='o')
plt.title("Training Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.grid()
plt.show()

cm = confusion_matrix(y_true, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.show()
