In [None]:
# Install dependencies
!pip install transformers --quiet

import os
import numpy as np
import torch
import matplotlib.pyplot as plt
from transformers import BertTokenizer
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# Pick the compute device once: CUDA when PyTorch can see a GPU, CPU otherwise.
_device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(_device_name)
print("Using device:", device)


In [None]:
# Set paths (change if using Google Drive)
# DATA_DIR is joined with 'train' and 'test' further down; each of those
# folders is read through its 'pos' and 'neg' sub-folders of .txt files.
DATA_DIR = "/content/text_classification"  # Upload this folder manually
# RESULTS_DIR is where the plotting code saves its PNG figures.
RESULTS_DIR = "/content/results"
# Create the output folder up front; exist_ok=True makes re-running this cell safe.
os.makedirs(RESULTS_DIR, exist_ok=True)

# Load .txt files from pos/neg folders
def load_classification_data(split_dir):
    """Read one labelled split of a text-classification dataset from disk.

    ``split_dir`` must contain a ``pos`` and a ``neg`` sub-directory. Every
    regular file whose name ends in ``.txt`` becomes one example: files under
    ``pos`` get label 1, files under ``neg`` get label 0. File contents are
    decoded as UTF-8 and stripped of leading/trailing whitespace.

    Args:
        split_dir: Path of the split directory (for example ``<data>/train``).

    Returns:
        list[dict]: ``{'text': str, 'label': int}`` records. All ``pos``
        examples come first, then all ``neg`` examples; inside each group the
        records are ordered by file name.

    Raises:
        FileNotFoundError: propagated from ``os.listdir`` when ``pos`` or
            ``neg`` does not exist under ``split_dir``.
    """
    data = []
    for label_name, label_val in [('pos', 1), ('neg', 0)]:
        label_dir = os.path.join(split_dir, label_name)
        # os.listdir returns entries in arbitrary, filesystem-dependent order;
        # sorting makes the example order reproducible across machines and runs.
        for fname in sorted(os.listdir(label_dir)):
            if not fname.endswith('.txt'):
                continue
            fpath = os.path.join(label_dir, fname)
            if not os.path.isfile(fpath):
                # e.g. a directory that happens to be named "something.txt"
                continue
            with open(fpath, 'r', encoding='utf-8') as f:
                text = f.read().strip()
            data.append({'text': text, 'label': label_val})
    return data

# Convert to tokenized tensors
def prepare_tensor(dataset_list, tokenizer, max_len=128):
    """Tokenize examples and pack ids, mask and labels into tensors on ``device``.

    Args:
        dataset_list: Sequence of dicts carrying a 'text' and a 'label' entry.
        tokenizer: Callable invoked with the list of texts plus
            ``padding='max_length'``, ``truncation=True``, ``max_length=max_len``
            and ``return_tensors='pt'``; its result must expose 'input_ids'
            and 'attention_mask'.
        max_len: Value forwarded to the tokenizer as ``max_length``.

    Returns:
        tuple: ``(X, y)`` where ``X`` is 'input_ids' and 'attention_mask'
        stacked along dim 1 (presumably shape (N, 2, max_len) -- confirm
        against the tokenizer's output) and ``y`` is a LongTensor of the
        labels. Both are moved to the module-level ``device``.
    """
    sentences = [record['text'] for record in dataset_list]
    targets = [record['label'] for record in dataset_list]
    tokenized = tokenizer(
        sentences,
        padding='max_length',
        truncation=True,
        max_length=max_len,
        return_tensors='pt',
    )
    token_ids = tokenized['input_ids']
    pad_mask = tokenized['attention_mask']
    # Channel 0 holds the token ids, channel 1 the attention mask.
    features = torch.stack([token_ids, pad_mask], dim=1).to(device)
    target_tensor = torch.LongTensor(targets).to(device)
    return features, target_tensor


In [None]:
class Evaluate_Metrics:
    """Compute accuracy plus micro/macro/weighted F1, precision and recall.

    Usage: assign ``data = {'true_y': ..., 'pred_y': ...}`` on an instance,
    then call ``evaluate()``.
    """

    # Input for evaluate(): a mapping with 'true_y' and 'pred_y'. None until set.
    data = None
    # Keys of the dict returned by evaluate(), in the order they are produced.
    metrics = ['Accuracy', 'F1 micro', 'F1 macro', 'F1 weighted',
               'Precision micro', 'Precision macro', 'Precision weighted',
               'Recall micro', 'Recall macro', 'Recall weighted']

    def __init__(self, name, desc):
        """Remember the evaluator's name and description.

        Args:
            name: Short identifier for this evaluator.
            desc: Free-form description.
        """
        self.name = name
        self.desc = desc

    def evaluate(self):
        """Score ``self.data['pred_y']`` against ``self.data['true_y']``.

        Returns:
            dict: metric name -> score, with exactly the keys listed in
            ``metrics`` and in the same order.

        Raises:
            ValueError: if ``data`` has not been assigned yet.
        """
        if self.data is None:
            raise ValueError(
                "Evaluate_Metrics.data is not set; assign "
                "{'true_y': ..., 'pred_y': ...} before calling evaluate()"
            )
        true_y = self.data['true_y']
        pred_y = self.data['pred_y']

        scores = {'Accuracy': accuracy_score(true_y, pred_y)}
        families = (('F1', f1_score),
                    ('Precision', precision_score),
                    ('Recall', recall_score))
        for family, scorer in families:
            for average in ('micro', 'macro', 'weighted'):
                # zero_division=0.0 for every family: an undefined score (for
                # instance a class that is never predicted early in training)
                # is reported as 0.0 without a warning, consistently across
                # F1, precision and recall.
                scores[f'{family} {average}'] = scorer(
                    true_y, pred_y, average=average, zero_division=0.0)
        return scores


In [None]:
import torch.nn as nn

class Method_LSTM(nn.Module):
    """Embedding -> single-layer LSTM -> linear classifier over token ids.

    Inputs are tensors whose dim 1 carries two channels: channel 0 holds the
    token ids and channel 1 the attention mask (1 for real tokens, 0 for
    padding), as produced by stacking a tokenizer's 'input_ids' and
    'attention_mask'.

    Training history is accumulated in ``self.curves``:
    'epochs', 'loss' (mean training loss of the epoch), 'test loss',
    'test accuracy', and one list per name in ``Evaluate_Metrics.metrics``
    (computed on the training set).
    """

    # Chunk size for inference over a whole split, so the LSTM output for the
    # full dataset is never materialised in a single forward pass.
    EVAL_BATCH_SIZE = 256

    def __init__(self, name, desc, output_dim=2, max_epoch=10, batch_size=32):
        """Build the network and move it to the module-level ``device``.

        Args:
            name: Identifier for this model.
            desc: Free-form description.
            output_dim: Number of output classes (size of the logits).
            max_epoch: Number of training epochs run by ``train_model``.
            batch_size: Mini-batch size used by ``train_model``.
        """
        super().__init__()
        self.name = name
        self.desc = desc
        self.max_epoch = max_epoch
        self.output_dim = output_dim
        self.batch_size = batch_size
        self.learning_rate = 1e-3
        # The tokenizer is only consulted for its vocabulary size here.
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.loss_function = nn.CrossEntropyLoss()
        self.metric_evaluator = Evaluate_Metrics('eval', '')
        self.curves = {'epochs': [], 'loss': [], 'test loss': [], 'test accuracy': []}
        for metric in self.metric_evaluator.metrics:
            self.curves[metric] = []

        self.vocab_size = self.tokenizer.vocab_size
        self.embedding = nn.Embedding(self.vocab_size, 128)
        self.lstm = nn.LSTM(128, 256, batch_first=True)
        self.fc = nn.Linear(256, output_dim)
        self.to(device)

    def forward(self, x):
        """Return class logits for a batch.

        Args:
            x: Integer tensor indexed as ``x[:, 0, :]`` (token ids) and
                ``x[:, 1, :]`` (attention mask).

        Returns:
            Tensor with one row of ``fc`` outputs per input row.
        """
        model_device = self.embedding.weight.device
        input_ids = x[:, 0, :].to(model_device)
        embeds = self.embedding(input_ids)
        out, _ = self.lstm(embeds)

        if x.size(1) > 1:
            # Read the hidden state at the last *real* token. Taking
            # out[:, -1, :] would read the state after the LSTM has consumed
            # all the padding that follows a short sequence.
            # NOTE(review): assumes padding is on the right (mask looks like
            # 1..1 0..0) -- confirm the tokenizer's padding side.
            mask = x[:, 1, :].to(model_device)
            last_idx = (mask.sum(dim=1) - 1).clamp(min=0).long()
        else:
            # No mask channel supplied: fall back to the last time step.
            last_idx = torch.full((out.size(0),), out.size(1) - 1,
                                  dtype=torch.long, device=model_device)
        rows = torch.arange(out.size(0), device=model_device)
        final_hidden = out[rows, last_idx]
        return self.fc(final_hidden)

    def _predict_logits(self, X):
        """Run inference over ``X`` in chunks, without autograd, in eval mode."""
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                chunks = [self(X[start:start + self.EVAL_BATCH_SIZE])
                          for start in range(0, X.size(0), self.EVAL_BATCH_SIZE)]
        finally:
            # Leave the module in whatever mode the caller had it in.
            self.train(was_training)
        if not chunks:
            return torch.empty(0, self.fc.out_features, device=self.fc.weight.device)
        return torch.cat(chunks, dim=0)

    def train_model(self, X, y, test_X, test_y):
        """Train with Adam for ``max_epoch`` epochs, logging curves every epoch.

        After each epoch the training-set metrics are printed and appended to
        ``self.curves`` together with the epoch's mean training loss, the test
        loss and the test accuracy.

        Args:
            X, y: Training inputs and integer class labels.
            test_X, test_y: Held-out inputs and labels, used for monitoring only.

        Raises:
            ValueError: if ``X`` contains no examples.
        """
        n_samples = X.size(0)
        if n_samples == 0:
            raise ValueError("train_model() needs at least one training example")

        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        batch_size = self.batch_size
        for epoch in range(self.max_epoch):
            self.train()
            perm = torch.randperm(n_samples).to(X.device)
            running_loss = 0.0
            for i in range(0, n_samples, batch_size):
                idx = perm[i:i + batch_size]
                x_batch = X[idx]
                y_batch = y[idx]
                optimizer.zero_grad()
                out = self(x_batch)
                loss = self.loss_function(out, y_batch)
                loss.backward()
                optimizer.step()
                # Weight by batch size so a short final batch does not skew the mean.
                running_loss += loss.item() * idx.numel()
            epoch_loss = running_loss / n_samples

            pred_y = self._predict_logits(X).argmax(1)
            self.metric_evaluator.data = {'true_y': y.cpu(), 'pred_y': pred_y.cpu()}
            evals = self.metric_evaluator.evaluate()
            print(f"Epoch {epoch}: " + ', '.join([f'{k}: {v:.4f}' for k, v in evals.items()]))
            for k, v in evals.items():
                self.curves[k].append(v)
            self.curves['epochs'].append(epoch)
            self.curves['loss'].append(epoch_loss)

            # One pass over the test set feeds both the loss and the accuracy.
            test_logits = self._predict_logits(test_X)
            test_preds = test_logits.argmax(1)
            test_loss = self.loss_function(test_logits, test_y).item()
            test_acc = accuracy_score(test_y.cpu(), test_preds.cpu())
            self.curves['test loss'].append(test_loss)
            self.curves['test accuracy'].append(test_acc)

    def test(self, X):
        """Return the predicted class index for every row of ``X``."""
        return self._predict_logits(X).argmax(1)

    def run(self, data):
        """Train on ``data['train']`` and predict on ``data['test']``.

        Args:
            data: ``{'train': {'X', 'y'}, 'test': {'X', 'y'}}``.

        Returns:
            dict with 'pred_y' (test predictions), 'true_y' (test labels) and
            'curves' (the training history).
        """
        self.train_model(data['train']['X'], data['train']['y'], data['test']['X'], data['test']['y'])
        pred_y = self.test(data['test']['X'])
        return {'pred_y': pred_y, 'true_y': data['test']['y'], 'curves': self.curves}


In [None]:
# Load data
# Seed PyTorch before anything stochastic happens (weight initialisation and
# the per-epoch torch.randperm shuffle) so Restart & Run All reproduces the
# same curves. torch.manual_seed seeds the generators on all devices.
SEED = 42
torch.manual_seed(SEED)

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
train_data_raw = load_classification_data(os.path.join(DATA_DIR, 'train'))
test_data_raw = load_classification_data(os.path.join(DATA_DIR, 'test'))
# Fail here with a readable message rather than deep inside the tokenizer
# when the dataset folder was uploaded empty or with the wrong layout.
assert train_data_raw, f"no .txt examples found under {os.path.join(DATA_DIR, 'train')}"
assert test_data_raw, f"no .txt examples found under {os.path.join(DATA_DIR, 'test')}"
X_train, y_train = prepare_tensor(train_data_raw, tokenizer)
X_test, y_test = prepare_tensor(test_data_raw, tokenizer)
data = {'train': {'X': X_train, 'y': y_train}, 'test': {'X': X_test, 'y': y_test}}

# Train
model = Method_LSTM('LSTM Sentiment Classifier', '', output_dim=2, max_epoch=10)
results = model.run(data)
curves = results['curves']

# Plot metrics
def _save_and_show_curves(series, title, y_label, file_name):
    """Draw one line per (label, values) pair against curves['epochs'].

    The figure is written to RESULTS_DIR/file_name and then displayed.
    """
    fig, ax = plt.subplots()
    for label, values in series:
        ax.plot(curves['epochs'], values, label=label)
    ax.set_title(title)
    ax.set_xlabel('Epochs')
    ax.set_ylabel(y_label)
    ax.legend()
    fig.savefig(os.path.join(RESULTS_DIR, file_name))
    plt.show()


# Training-set classification metrics per epoch
metrics = ['Accuracy', 'F1 micro', 'F1 macro', 'F1 weighted']
_save_and_show_curves(
    [(metric, curves[metric]) for metric in metrics],
    'Training Metrics', 'Value', 'metrics.png',
)

# Train vs. test loss
_save_and_show_curves(
    [('Train Loss', curves['loss']), ('Test Loss', curves['test loss'])],
    'Loss Curve', 'Loss', 'loss_curve.png',
)

# Train vs. test accuracy
_save_and_show_curves(
    [('Train Accuracy', curves['Accuracy']), ('Test Accuracy', curves['test accuracy'])],
    'Accuracy Curve', 'Accuracy', 'acc_curve.png',
)

# Final eval
# Deliberately not named `eval`: a notebook-level `eval` variable would shadow
# the Python builtin for every cell executed afterwards in this kernel.
final_evaluator = Evaluate_Metrics('final', '')
final_evaluator.data = {'true_y': results['true_y'].cpu(), 'pred_y': results['pred_y'].cpu()}
print("\nFinal Test Results:")
for k, v in final_evaluator.evaluate().items():
    print(f"{k}: {v:.4f}")
