<a href="https://colab.research.google.com/github/raycmarange/AIML431New/blob/main/assing4_task1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from sklearn.manifold import TSNE
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report
from torch.optim import AdamW  # Import from torch instead of transformers
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm import tqdm
from transformers import BertTokenizer, BertForSequenceClassification

# Silence every warning category for the whole process so that library
# notices do not clutter the training output.
# NOTE(review): this also hides warnings that may point at real problems.
warnings.filterwarnings('ignore')

class SpamDataset(Dataset):
    """Map-style dataset that tokenizes one message per lookup.

    Each item is a dict with ``input_ids`` and ``attention_mask`` (both
    flattened to 1-D) and ``labels`` (a scalar ``torch.long`` tensor).
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        """Store the raw messages, their labels and the tokenizer settings."""
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of messages."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize message ``idx`` and return it together with its label."""
        message = str(self.texts[idx])
        target = self.labels[idx]

        # The tokenizer is asked for tensors with a leading batch axis;
        # flatten() below removes it so a DataLoader can stack items.
        encoded = self.tokenizer(
            message,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )

        item = {key: encoded[key].flatten() for key in ('input_ids', 'attention_mask')}
        item['labels'] = torch.tensor(target, dtype=torch.long)
        return item

class BERTSpamClassifier:
    """Fine-tune and evaluate a BERT sequence classifier for SMS spam detection.

    Intended call order: ``load_data`` -> ``prepare_datasets`` -> ``train`` ->
    ``evaluate``. The visualization and save/load helpers are optional.
    """

    def __init__(self, model_name='bert-base-uncased', max_length=128, batch_size=16):
        """Load the tokenizer and a two-label classification model.

        Args:
            model_name: Checkpoint name or path passed to ``from_pretrained``.
            max_length: Token length every message is padded/truncated to.
            batch_size: Batch size used by every data loader this class builds.
        """
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {self.device}")

        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        self.model = BertForSequenceClassification.from_pretrained(model_name, num_labels=2)
        self.model.to(self.device)

        self.max_length = max_length
        self.batch_size = batch_size
        # Filled in by load_data(); initialised here so that calling the
        # pipeline out of order produces a clear error instead of an
        # AttributeError.
        self.texts = None
        self.labels = None
        self.class_distribution = None
        # Filled in by prepare_datasets().
        self.train_dataset = None
        self.test_dataset = None
        self.train_dataloader = None
        self.test_dataloader = None

    def load_data(self, file_path):
        """Load and preprocess the SMS spam dataset.

        The file is first read as tab-separated ``label<TAB>text``. If that
        fails with a decoding or parsing error, it is re-read with latin-1
        encoding and the default (comma) delimiter. Labels ``ham``/``spam``
        are mapped to 0/1; rows with any missing value (including unknown
        labels) are dropped.

        Args:
            file_path: Path of the dataset file.

        Returns:
            The cleaned DataFrame with integer ``label`` and ``text`` columns.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
        """
        print(f"Looking for file at: {file_path}")

        columns = ['label', 'text']
        try:
            df = pd.read_csv(file_path, delimiter='\t', header=None, names=columns)
        except (UnicodeDecodeError, pd.errors.ParserError):
            # Only decode/parse failures justify a retry. A bare ``except``
            # here used to swallow KeyboardInterrupt and mask the real error.
            # NOTE(review): the retry also switches to the default comma
            # delimiter - confirm that is intended for the fallback file.
            df = pd.read_csv(file_path, encoding='latin-1', header=None, names=columns)
            print("Successfully loaded with latin-1 encoding")
        else:
            print("Successfully loaded with tab delimiter")

        print(f"Dataset loaded: {len(df)} samples")
        print("Label distribution:")
        print(df['label'].value_counts())

        # Map labels to numerical values (unknown labels become NaN)
        df['label'] = df['label'].map({'ham': 0, 'spam': 1})
        print("Label mapping result:")
        print(df['label'].value_counts())

        # Check for missing values
        print(f"Missing values: {df.isnull().sum()}")
        df = df.dropna()
        # NaNs force the label column to float; restore integer class ids
        # now that the incomplete rows are gone.
        df = df.astype({'label': int})
        print(f"Final dataset size: {len(df)}")

        self.texts = df['text'].values
        self.labels = df['label'].values
        self.class_distribution = dict(df['label'].value_counts())
        print(f"Class distribution: {self.class_distribution}")

        return df

    def prepare_datasets(self, test_size=0.2, seed=None):
        """Split the loaded data into train/test sets and build data loaders.

        Args:
            test_size: Fraction of samples (0..1) held out for testing.
            seed: Optional integer seed for a reproducible split. ``None``
                keeps the previous behaviour (global torch RNG).

        Returns:
            ``(train_dataloader, test_dataloader)``.

        Raises:
            RuntimeError: If ``load_data`` has not been called.
            ValueError: If ``test_size`` is outside ``[0, 1]``.
        """
        if getattr(self, 'texts', None) is None or getattr(self, 'labels', None) is None:
            raise RuntimeError("No data loaded; call load_data() before prepare_datasets().")
        if not 0.0 <= test_size <= 1.0:
            raise ValueError(f"test_size must be between 0 and 1, got {test_size}")

        dataset = SpamDataset(self.texts, self.labels, self.tokenizer, self.max_length)

        # Calculate split sizes (separate names: do not shadow the parameter)
        n_test = int(len(dataset) * test_size)
        n_train = len(dataset) - n_test

        # Split dataset
        split_kwargs = {}
        if seed is not None:
            split_kwargs['generator'] = torch.Generator().manual_seed(seed)
        self.train_dataset, self.test_dataset = random_split(
            dataset, [n_train, n_test], **split_kwargs
        )

        print(f"Training samples: {len(self.train_dataset)}")
        print(f"Testing samples: {len(self.test_dataset)}")

        # Create data loaders
        self.train_dataloader = DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)
        self.test_dataloader = DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False)

        # Count test labels straight from the split indices; indexing the
        # dataset item by item would tokenize every message just to read
        # its label.
        test_indices = np.asarray(self.test_dataset.indices, dtype=int)
        test_labels = np.asarray(self.labels)[test_indices]
        test_distribution = {
            0: int(np.sum(test_labels == 0)),
            1: int(np.sum(test_labels == 1)),
        }
        print(f"Test set class distribution: {test_distribution}")

        return self.train_dataloader, self.test_dataloader

    def train(self, epochs=3, learning_rate=2e-5):
        """Fine-tune the model on the training loader.

        Args:
            epochs: Number of passes over the training data.
            learning_rate: Learning rate for the AdamW optimizer.

        Raises:
            RuntimeError: If ``prepare_datasets`` has not been called.
            ValueError: If the training loader yields no batches.
        """
        if self.train_dataloader is None:
            raise RuntimeError("No training data; call prepare_datasets() before train().")
        num_batches = len(self.train_dataloader)
        if num_batches == 0:
            raise ValueError("Training set is empty; nothing to train on.")

        optimizer = AdamW(self.model.parameters(), lr=learning_rate)
        self.model.train()

        for epoch in range(epochs):
            total_loss = 0.0
            progress_bar = tqdm(self.train_dataloader, desc=f'Epoch {epoch+1}/{epochs}')

            for step, batch in enumerate(progress_bar):
                # Move batch to device
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                labels = batch['labels'].to(self.device)

                optimizer.zero_grad()

                # Passing labels makes the model compute the loss itself
                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    labels=labels
                )

                loss = outputs.loss
                batch_loss = loss.item()
                total_loss += batch_loss

                loss.backward()
                optimizer.step()

                # Refresh the progress bar only every 10th step
                if step % 10 == 0:
                    progress_bar.set_postfix({'Loss': f'{batch_loss:.4f}'})

            avg_loss = total_loss / num_batches
            print(f'Epoch {epoch+1}/{epochs}, Average Loss: {avg_loss:.4f}')

    def evaluate(self):
        """Evaluate the model on the test set and print a report.

        Returns:
            ``(accuracy, weighted_f1, confusion_matrix, predictions,
            true_labels)``. The confusion matrix is always 2x2 (ham, spam).

        Raises:
            RuntimeError: If ``prepare_datasets`` has not been called.
        """
        if self.test_dataloader is None:
            raise RuntimeError("No test data; call prepare_datasets() before evaluate().")

        self.model.eval()
        predictions = []
        true_labels = []

        with torch.no_grad():
            for batch in tqdm(self.test_dataloader, desc='Evaluating'):
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                labels = batch['labels'].to(self.device)

                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask
                )

                preds = torch.argmax(outputs.logits, dim=1)

                predictions.extend(preds.cpu().numpy())
                true_labels.extend(labels.cpu().numpy())

        # Pin the label set so the matrix/report keep a fixed ham/spam
        # layout even when one class is absent from the test set (without
        # it, classification_report raises on the target_names mismatch).
        class_ids = [0, 1]
        accuracy = accuracy_score(true_labels, predictions)
        weighted_f1 = f1_score(true_labels, predictions, average='weighted', zero_division=0)
        cm = confusion_matrix(true_labels, predictions, labels=class_ids)
        report = classification_report(
            true_labels,
            predictions,
            labels=class_ids,
            target_names=['ham', 'spam'],
            zero_division=0,
        )

        print("\n" + "="*50)
        print("EVALUATION RESULTS")
        print("="*50)
        print(f"Test Accuracy: {accuracy:.4f}")
        print(f"Weighted F1 Score: {weighted_f1:.4f}")
        print(f"Test set size: {len(true_labels)}")
        print(f"Unique labels in test set: {np.unique(true_labels)}")
        print(f"\nConfusion Matrix:\n{cm}")
        print(f"\nClassification Report:\n{report}")

        return accuracy, weighted_f1, cm, predictions, true_labels

    @staticmethod
    def _masked_mean(hidden, attention_mask):
        """Average ``hidden`` over dim 1, counting only positions whose mask is 1."""
        weights = attention_mask.unsqueeze(-1).to(hidden.dtype)
        totals = (hidden * weights).sum(dim=1)
        # clamp avoids 0/0 for a row whose mask is entirely zero
        counts = weights.sum(dim=1).clamp(min=1.0)
        return totals / counts

    def _plot_embedding(self, points_2d, labels, title, xlabel, ylabel):
        """Scatter-plot a 2-D embedding coloured by class label and show it."""
        plt.figure(figsize=(12, 10))
        scatter = plt.scatter(points_2d[:, 0], points_2d[:, 1], c=labels, cmap='viridis', alpha=0.7)
        plt.colorbar(scatter, label='Label (0=ham, 1=spam)')
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.grid(True, alpha=0.3)

        # Legend proxies take their colour from the scatter's own colormap
        # and normalisation so the swatches match the plotted points.
        for value in np.unique(labels):
            name = 'ham' if value == 0 else 'spam'
            plt.scatter(
                [], [],
                color=scatter.cmap(scatter.norm(value)),
                label=f"{name} ({np.sum(labels == value)})",
            )
        plt.legend()

        plt.tight_layout()
        plt.show()

    def create_tsne_visualization(self, n_samples=1000, random_state=42):
        """Plot a t-SNE projection of mean-pooled last-layer hidden states.

        Args:
            n_samples: Maximum number of test samples to embed; a random
                subset is drawn when the test set is larger.
            random_state: Seed for both the subset draw and t-SNE.

        Returns:
            ``(points_2d, labels)``. If t-SNE fails, the result of the PCA
            fallback is returned instead.

        Raises:
            RuntimeError: If ``prepare_datasets`` has not been called.
        """
        if self.test_dataset is None:
            raise RuntimeError("No test data; call prepare_datasets() first.")

        print("Creating t-SNE visualization...")

        # Use a subset if dataset is too large
        if len(self.test_dataset) > n_samples:
            # Seeded generator: the subset is reproducible like the t-SNE fit
            rng = np.random.default_rng(random_state)
            indices = rng.choice(len(self.test_dataset), n_samples, replace=False).tolist()
            subset_dataset = torch.utils.data.Subset(self.test_dataset, indices)
            dataloader = DataLoader(subset_dataset, batch_size=self.batch_size, shuffle=False)
            print(f"Using {n_samples} samples for t-SNE (random subset)")
        else:
            dataloader = self.test_dataloader
            print(f"Fitting TSNE with {len(self.test_dataset)} samples...")

        # Get hidden states
        self.model.eval()
        hidden_states = []
        labels_list = []

        with torch.no_grad():
            for batch in tqdm(dataloader, desc='Extracting hidden states'):
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)

                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    output_hidden_states=True
                )

                # Inputs are padded to max_length, so a plain mean over the
                # sequence would be dominated by padding positions; average
                # only over positions the attention mask marks as real.
                pooled = self._masked_mean(outputs.hidden_states[-1], attention_mask)

                hidden_states.append(pooled.cpu().numpy())
                labels_list.append(batch['labels'].numpy())

        hidden_states = np.vstack(hidden_states)
        labels = np.hstack(labels_list)

        try:
            # Set environment variable to avoid the CPU core detection issue
            os.environ['LOKY_MAX_CPU_COUNT'] = '1'

            # Perplexity must stay below the sample count; 30 is the
            # scikit-learn default and is kept whenever it is valid.
            perplexity = min(30.0, float(hidden_states.shape[0] - 1))
            tsne = TSNE(
                n_components=2,
                random_state=random_state,
                verbose=1,
                n_jobs=1,
                perplexity=perplexity,
            )
            hidden_2d = tsne.fit_transform(hidden_states)
        except Exception as e:
            # Deliberate best-effort: report the failure and fall back to PCA
            print(f"Error in t-SNE: {e}")
            print("Creating alternative visualization using PCA...")
            return self.create_pca_visualization(hidden_states, labels)

        self._plot_embedding(
            hidden_2d,
            labels,
            't-SNE Visualization of BERT Hidden States',
            't-SNE Component 1',
            't-SNE Component 2',
        )
        return hidden_2d, labels

    def create_pca_visualization(self, hidden_states, labels):
        """Fallback visualization: project ``hidden_states`` to 2-D with PCA.

        Args:
            hidden_states: 2-D array with one feature row per sample.
            labels: Array of class ids (0=ham, 1=spam), one per row.

        Returns:
            ``(points_2d, labels)``.
        """
        from sklearn.decomposition import PCA

        pca = PCA(n_components=2, random_state=42)
        hidden_2d = pca.fit_transform(hidden_states)

        self._plot_embedding(
            hidden_2d,
            labels,
            'PCA Visualization of BERT Hidden States (Fallback)',
            'Principal Component 1',
            'Principal Component 2',
        )
        return hidden_2d, labels

    def save_model(self, filepath):
        """Save the model weights (state dict) to ``filepath``."""
        torch.save(self.model.state_dict(), filepath)
        print(f"Model saved to {filepath}")

    def load_model(self, filepath):
        """Load model weights previously written by ``save_model``."""
        # SECURITY: torch.load unpickles the file. weights_only=True limits
        # it to tensors and plain containers, which is all a state dict
        # needs, so an untrusted checkpoint cannot run arbitrary code.
        state_dict = torch.load(filepath, map_location=self.device, weights_only=True)
        self.model.load_state_dict(state_dict)
        print(f"Model loaded from {filepath}")

def main():
    """Run the whole pipeline: load, split, fine-tune, evaluate, plot, save."""
    spam_model = BERTSpamClassifier(batch_size=16)

    # Read the dataset and build the train/test loaders
    spam_model.load_data("SMSSpamCollection.txt")
    spam_model.prepare_datasets(test_size=0.2)

    # Fine-tune
    print("Starting training...")
    spam_model.train(epochs=3)

    # evaluate() prints its own report; the returned values are not used here
    _accuracy, _f1, _cm, _predictions, _true_labels = spam_model.evaluate()

    # Project the hidden states to 2-D and plot them
    spam_model.create_tsne_visualization(n_samples=1000)

    # Persist the fine-tuned weights
    spam_model.save_model("bert_spam_classifier_fixed.pth")


if __name__ == "__main__":
    main()

Using device: cpu


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Looking for file at: SMSSpamCollection.txt
Successfully loaded with tab delimiter
Dataset loaded: 5572 samples
Label distribution:
label
ham     4825
spam     747
Name: count, dtype: int64
Label mapping result:
label
0    4825
1     747
Name: count, dtype: int64
Missing values: label    0
text     0
dtype: int64
Final dataset size: 5572
Class distribution: {0: np.int64(4825), 1: np.int64(747)}
Training samples: 4458
Testing samples: 1114
Test set class distribution: {0: 965, 1: 149}
Starting training...


Epoch 1/3:  97%|█████████▋| 272/279 [1:28:02<02:14, 19.19s/it, Loss=0.0151]