In [1]:
import os
import sys
import glob
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from nltk.tokenize import TweetTokenizer
from collections import Counter
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

In [2]:
# Define the LSTM model for classification
class TextLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_classes, num_layers=1, dropout_rate=0.8):
        super(TextLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # LSTM layer
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True, dropout=dropout_rate)
        
        # Dropout layer to prevent overfitting
        self.dropout = nn.Dropout(dropout_rate)

        # Fully connected layer for classification
        self.fc = nn.Linear(hidden_size, num_classes)
        
    def forward(self, x):
        # Embedding
        embedded = self.embedding(x)
        
        # Initialize hidden state and cell state with zeros
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # Forward propagate LSTM
        out, _ = self.lstm(embedded, (h0, c0))

        # Apply dropout
        out = self.dropout(out[:, -1, :])  # Get the last time step output
        
        # Pass the output of the last time step to the classifier
        out = self.fc(out)
        
        return out

In [3]:
# Custom dataset for text files with labels
class TextDataset(Dataset):
    def __init__(self, file_ids, labels, file_dir, tokenizer, word_to_idx, max_length=20):
        self.file_ids = file_ids
        self.labels = labels
        self.file_dir = file_dir
        self.tokenizer = tokenizer
        self.word_to_idx = word_to_idx
        self.max_length = max_length
        
    def __len__(self):
        return len(self.file_ids)
    
    def __getitem__(self, idx):
        file_id = self.file_ids[idx]
        label = self.labels[idx]
        
        # Read text file
        file_path = os.path.join(self.file_dir, f"{file_id}.txt")
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
        
        # Normalize and Tokenize
        tokens = self.tokenizer.tokenize(text.lower())
        
        # Convert tokens to indices
        indices = [self.word_to_idx.get(token, self.word_to_idx['<UNK>']) for token in tokens]
        
        # Truncate or pad sequence
        if len(indices) > self.max_length:
            indices = indices[:self.max_length]
        else:
            indices = indices + [self.word_to_idx['<PAD>']] * (self.max_length - len(indices))
            
        return torch.tensor(indices, dtype=torch.long), torch.tensor(label, dtype=torch.long)


In [4]:
# Build vocabulary from all text files
def build_vocabulary(file_paths, tokenizer, min_freq=2, vocab_file='vocabulary_lstm.txt'):
    print("Building vocabulary...")

    # load vocabulary from file if it exists
    if os.path.exists(vocab_file):
        print(f"Vocabulary file {vocab_file} already exists. Loading...")
        word_to_idx = {}
        with open(vocab_file, 'r', encoding='utf-8') as f:
            for line in f:
                word, idx = line.strip().split('\t')
                word_to_idx[word] = int(idx)
        return word_to_idx
    
    # Count word frequencies
    word_counts = Counter()
    
    for file_path in file_paths:
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
        tokens = tokenizer.tokenize(text.lower())
        word_counts.update(tokens)
    
    # Filter words by frequency
    words = [word for word, count in word_counts.items() if count >= min_freq]
    
    # Add special tokens
    word_to_idx = {'<PAD>': 0, '<UNK>': 1}
    for word in words:
        word_to_idx[word] = len(word_to_idx)

    # save vocabulary to file
    with open(vocab_file, 'w', encoding='utf-8') as f:
        for word, idx in word_to_idx.items():
            f.write(f"{word}\t{idx}\n")
    print(f"Vocabulary saved to {vocab_file}")
    
    return word_to_idx

In [5]:
def train_model(model, train_loader, val_loader, criterion, optimizer, device, patience=3, num_epochs=10):
    model.to(device)
    
    best_val_accuracy = 0.0
    patience_counter = 0
    best_model_state = None
    
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        total_train_loss = 0
        train_predictions = []
        train_labels = []
        
        for batch_idx, (data, labels) in enumerate(train_loader):
            data, labels = data.to(device), labels.to(device)

            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(data)
            loss = criterion(outputs, labels)
            
            # Backward and optimize
            loss.backward()
            optimizer.step()
            
            total_train_loss += loss.item()
            
            # Get predictions
            _, predicted = torch.max(outputs.data, 1)
            train_predictions.extend(predicted.cpu().numpy())
            train_labels.extend(labels.cpu().numpy())
            
            if (batch_idx + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}')
        
        # Calculate training metrics
        train_accuracy = accuracy_score(train_labels, train_predictions)
        avg_train_loss = total_train_loss / len(train_loader)
        
        # Validation phase
        model.eval()
        total_val_loss = 0
        val_predictions = []
        val_labels = []
        
        with torch.no_grad():
            for data, labels in val_loader:
                data, labels = data.to(device), labels.to(device)
                
                # Forward pass
                outputs = model(data)
                loss = criterion(outputs, labels)
                
                total_val_loss += loss.item()
                
                # Get predictions
                _, predicted = torch.max(outputs.data, 1)
                val_predictions.extend(predicted.cpu().numpy())
                val_labels.extend(labels.cpu().numpy())
        
        # Calculate validation metrics
        val_accuracy = accuracy_score(val_labels, val_predictions)
        avg_val_loss = total_val_loss / len(val_loader)
        
        print(f'Epoch [{epoch+1}/{num_epochs}], '
              f'Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, '
              f'Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')
        
        # Early stopping check
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            best_model_state = model.state_dict().copy()
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f'Early stopping triggered after {epoch+1} epochs')
                break
    
    # Load the best model state
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    
    return model, best_val_accuracy

In [6]:
def evaluate_model(model, test_loader, criterion, device):
    model.to(device)
    model.eval()
    
    total_loss = 0
    all_predictions = []
    all_labels = []
    
    with torch.no_grad():
        for data, labels in test_loader:
            data, labels = data.to(device), labels.to(device)
            
            # Forward pass
            outputs = model(data)
            loss = criterion(outputs, labels)
            
            total_loss += loss.item()
            
            # Get predictions
            _, predicted = torch.max(outputs.data, 1)
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    avg_loss = total_loss / len(test_loader)
    
    print(f'Test Loss: {avg_loss:.4f}, Test Accuracy: {accuracy:.4f}')
    return accuracy, avg_loss, all_predictions

In [7]:
def process_data_with_splits(model_type='lstm_with_dropout', train=True):
    # Configuration
    config = {
        'val_split_ratio': 0.15,  
        'test_split_ratio': 0.15,
        'seed': 42,
        'batch_size': 16,
        'embedding_dim': 100,
        'hidden_size': 128,
        'num_layers': 2,
        'learning_rate': 0.001,
        'num_epochs': 15,
        'patience': 3
    }
    
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    
    # Initialize NLTK tokenizer
    tokenizer = TweetTokenizer()
    
    # Load labels from CSV
    labels_df = pd.read_csv('../../label.csv')
    print(f"Loaded {len(labels_df)} labels from CSV")

    df = labels_df.copy().dropna(how='all')
    df['ID'] = df['ID'].astype(int)
    df['class'] = df['class'].astype(int)
    
    # # Map label text to numerical values
    label_map = {'negative': 0, 'neutral': 1, 'positive': 2}
    # labels_df['text_numeric_label'] = labels_df['class'].apply(lambda x: label_map.get(x.lower(), 0))
    
    # # Create a dataframe for splitting
    # df = labels_df[['ID', 'text_numeric_label']].rename(columns={'numeric_label': 'text_label'})
    
    # Split the data into train, validation, and test sets
    print("Splitting data...")
    val_test_size = config['val_split_ratio'] + config['test_split_ratio']
    if val_test_size >= 1.0:
        print("Error: Sum of validation and test split ratios must be less than 1.")
        sys.exit(1)

    # Adjust test size relative to the remaining data after validation split
    relative_test_size = config['test_split_ratio'] / (1.0 - config['val_split_ratio'])

    try:
        # Split into train and temp (val + test)
        train_df, temp_df = train_test_split(
            df,
            test_size=val_test_size,
            random_state=config['seed'],
            stratify=df['label'] # Stratify if labels are imbalanced
        )
        # Split temp into val and test
        val_df, test_df = train_test_split(
            temp_df,
            test_size=relative_test_size,
            random_state=config['seed'],
            stratify=temp_df['label'] # Stratify if labels are imbalanced
        )
    except Exception as e:
        print(f"Error during data splitting: {e}. Check split ratios and data.")
        # Might happen if a label class has too few samples for stratification
        print("Attempting split without stratification...")
        try:
            train_df, temp_df = train_test_split(df, test_size=val_test_size, random_state=config['seed'])
            val_df, test_df = train_test_split(temp_df, test_size=relative_test_size, random_state=config['seed'])
        except Exception as e_nostrat:
            print(f"Error during non-stratified split: {e_nostrat}.")
            sys.exit(1)
    
    print(f"Train set: {len(train_df)} samples")
    print(f"Validation set: {len(val_df)} samples")
    print(f"Test set: {len(test_df)} samples")
    
    # Create full file paths for building vocabulary
    train_ids = train_df['ID'].astype(int).values
    val_ids = val_df['ID'].astype(int).values
    test_ids = test_df['ID'].astype(int).values
    
    train_labels = train_df['class'].values
    val_labels = val_df['class'].values
    test_labels = test_df['class'].values
    
    # Build vocabulary from training data only to prevent data leakage
    file_paths = [f"../../raw_data/{id}.txt" for id in train_ids if os.path.exists(f"../../raw_data/{id}.txt")]
    word_to_idx = build_vocabulary(file_paths, tokenizer)
    vocab_size = len(word_to_idx)
    print(f"Vocabulary size: {vocab_size}")
    
    # Create datasets
    train_dataset = TextDataset(train_ids, train_labels, '../../raw_data', tokenizer, word_to_idx)
    val_dataset = TextDataset(val_ids, val_labels, '../../raw_data', tokenizer, word_to_idx)
    test_dataset = TextDataset(test_ids, test_labels, '../../raw_data', tokenizer, word_to_idx)
    
    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=config['batch_size'], shuffle=False)
    
    # Initialize the model based on type
    num_classes = 3  # negative (0), neutral (1), positive (2)
    # if model_type.lower() == 'lstm':
    model = TextLSTM(vocab_size, config['embedding_dim'], config['hidden_size'], num_classes, config['num_layers'])
    print("Using LSTM model")
    # else:
    #     model = TextRNN(vocab_size, config['embedding_dim'], config['hidden_size'], num_classes, config['num_layers'])
    #     print("Using RNN model")
    
    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config['learning_rate'])
    
    # Train the model if requested
    if train:
        print("Starting training...")
        model, best_val_acc = train_model(
            model, 
            train_loader, 
            val_loader, 
            criterion, 
            optimizer, 
            device, 
            patience=config['patience'], 
            num_epochs=config['num_epochs']
        )
        
        # Save the trained model
        model_save_path = f"{model_type}_text_classifier.pth"
        torch.save({
            'model_state_dict': model.state_dict(),
            'vocab': word_to_idx,
            'config': config
        }, model_save_path)
        print(f"Model saved to {model_save_path}")
    else:
        # Load pre-trained model
        model_load_path = f"{model_type}_text_classifier.pth"
        if os.path.exists(model_load_path):
            checkpoint = torch.load(model_load_path, map_location=device)
            model.load_state_dict(checkpoint['model_state_dict'])
            print(f"Loaded pre-trained model from {model_load_path}")
        else:
            print(f"No pre-trained model found at {model_load_path}. Using untrained model.")
    
    # Evaluate the model on test set
    print("Evaluating model on test set...")
    test_accuracy, test_loss, test_predictions = evaluate_model(model, test_loader, criterion, device)
    
    # Create results dataframe for test set
    results = pd.DataFrame({
        'ID': test_ids,
        'true_label': test_labels,
        'predicted_label': test_predictions
    })
    
    # Map numeric labels back to text
    reverse_label_map = {v: k for k, v in label_map.items()}
    results['true_class'] = results['true_label'].map(reverse_label_map)
    results['predicted_class'] = results['predicted_label'].map(reverse_label_map)
    
    # Save results
    results.to_csv(f"{model_type}_classification_results.csv", index=False)
    print(f"Results saved to {model_type}_classification_results.csv")
    
    return model, test_accuracy, test_loss

In [8]:
print("Processing with LSTM model...")
lstm_model, lstm_accuracy, lstm_loss = process_data_with_splits(model_type='lstm_with_dropout', train=True)

Processing with LSTM model...
Using device: cpu
Loaded 4869 labels from CSV
Splitting data...
Train set: 3157 samples
Validation set: 1115 samples
Test set: 239 samples
Building vocabulary...
Vocabulary file vocabulary_lstm.txt already exists. Loading...
Vocabulary size: 3703
Using LSTM model
Starting training...
Epoch [1/15], Batch [10/198], Loss: 1.0682
Epoch [1/15], Batch [20/198], Loss: 0.9480
Epoch [1/15], Batch [30/198], Loss: 0.9051
Epoch [1/15], Batch [40/198], Loss: 0.7788
Epoch [1/15], Batch [50/198], Loss: 0.9236
Epoch [1/15], Batch [60/198], Loss: 0.8900
Epoch [1/15], Batch [70/198], Loss: 0.9462
Epoch [1/15], Batch [80/198], Loss: 0.8539
Epoch [1/15], Batch [90/198], Loss: 1.0645
Epoch [1/15], Batch [100/198], Loss: 0.8667
Epoch [1/15], Batch [110/198], Loss: 1.1650
Epoch [1/15], Batch [120/198], Loss: 0.9897
Epoch [1/15], Batch [130/198], Loss: 0.8484
Epoch [1/15], Batch [140/198], Loss: 0.7317
Epoch [1/15], Batch [150/198], Loss: 1.0765
Epoch [1/15], Batch [160/198], Los

In [9]:
print("\nComparison of models:")
print(f"LTSM - Accuracy: {lstm_accuracy:.4f}, Loss: {lstm_loss:.4f}")


Comparison of models:
LTSM - Accuracy: 0.6444, Loss: 1.1704
