In [32]:
import os
import re
import json
import warnings
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

from torch.optim import AdamW  
from sklearn.metrics import accuracy_score


os.environ["WANDB_DISABLED"] = "true"  # Disabling wandb

warnings.filterwarnings("ignore")  # Suppress warnings


In [33]:

def clean_sentence(sentence):
    
    
    sentence = re.sub(r"(\w)'(\w)", r"\1\2", sentence)
    
    sentence = re.sub(r"[^a-zA-Z0-9\s]", "", sentence)

    return sentence

def clean_aspect(aspect):
    
    aspect = re.sub(r"(\w)'(\w)", r"\1\2", aspect)

    aspect = re.sub(r"[^a-zA-Z0-9\s]", "", aspect)
    
    return aspect.strip()

def preprocess_file(input_path, output_path):

    def process_aspect(sentence_orig, aspect):
        # Extract and validate aspect info
        polarity = aspect.get("polarity", "").strip()
        try:
            idx_from = int(aspect.get("from", 0))
            idx_to = int(aspect.get("to", 0))
        except ValueError:
            idx_from, idx_to = 0, 0

        # Clean and tokenize sentence parts
        aspect_substr = sentence_orig[idx_from:idx_to]
        aspect_clean = clean_aspect(aspect_substr)
        before = clean_sentence(sentence_orig[:idx_from])
        after = clean_sentence(sentence_orig[idx_to:])
        
        tokens_before = before.split()
        tokens_after = after.split()
        aspect_tokens = aspect_clean.split()
        
        # Combine tokens and get aspect index
        tokens = tokens_before + aspect_tokens + tokens_after
        token_idx = len(tokens_before)

        if not aspect_clean:
            warnings.warn(f"Extracted aspect term is empty for sentence: {sentence_orig}")
            token_idx = 0

        return {
            "tokens": tokens,
            "polarity": polarity,
            "aspect_term": aspect_clean,
            "index": token_idx
        }

    # Main processing
    with open(input_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    
    processed = []
    for item in data:
        sentence_orig = item.get("sentence", "")
        for aspect in item.get("aspect_terms", []):
            instance = process_aspect(sentence_orig, aspect)
            processed.append(instance)

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(processed, f, indent=2)
    print(f"Saved preprocessed data to {output_path}")

In [34]:
class ABSADataset(Dataset):
    def __init__(self, data_path, vocab=None, max_len=50):
        with open(data_path, "r", encoding="utf-8") as f:
            self.data = json.load(f)
        self.max_len = max_len
        if vocab is None:
            self.build_vocab()
        else:
            self.vocab = vocab
            with open("/kaggle/working/vocab_task2.json","w") as f:
              json.dump(self.vocab, f)
            print("saved")
        self.label_map = {"positive": 0, "negative": 1, "neutral": 2, "conflict": 3}
    
    def build_vocab(self):
        self.vocab = {"<PAD>": 0, "<UNK>": 1}
        idx = 2
        for item in self.data:
            for token in item["tokens"]:
                token_lower = token.lower()
                if token_lower not in self.vocab:
                    self.vocab[token_lower] = idx
                    idx += 1
    
    def encode(self, tokens):
        def get_token_indices(tokens):
            return [self.vocab.get(token.lower(), self.vocab["<UNK>"]) for token in tokens]
        
        def pad_indices(indices):
            if len(indices) < self.max_len:
                indices += [self.vocab["<PAD>"]] * (self.max_len - len(indices))
            else:
                indices = indices[:self.max_len]
            return indices
            
        indices = get_token_indices(tokens)
        return pad_indices(indices)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):

        def prepare_input_tensors(tokens, aspect_index):
            encoded = self.encode(tokens)
            aspect_index = min(aspect_index, self.max_len - 1)
            return {
                "input_ids": torch.tensor(encoded, dtype=torch.long),
                "aspect_index": torch.tensor(aspect_index, dtype=torch.long)
            }

        def get_label(polarity):
            return torch.tensor(self.label_map.get(polarity.lower(), 2), dtype=torch.long)

        item = self.data[idx]
        tensors = prepare_input_tensors(item["tokens"], item["index"])
        tensors["label"] = get_label(item["polarity"])
        return tensors


In [35]:
class ABSA_RNN_AspectAttention(nn.Module):

    def __init__(self, vocab_size, embedding_dim=300, hidden_dim=128, output_dim=4, dropout_rate=0.5, pretrained_embeddings=None):
        super(ABSA_RNN_AspectAttention, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        if pretrained_embeddings is not None:
            self.embedding.weight.data.copy_(pretrained_embeddings)
        self.dropout = nn.Dropout(dropout_rate)
        
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=2, bidirectional=True, 
                            batch_first=True, dropout=dropout_rate)
        
        
        
        self.attention_fc = nn.Linear(hidden_dim * 2, hidden_dim)
        
        self.attention_aspect = nn.Linear(embedding_dim, hidden_dim)
        
        self.attention_v = nn.Linear(hidden_dim, 1)
        
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
    
    def forward(self, input_ids, aspect_index):

        def process_lstm_output():
            embeds = self.embedding(input_ids)               
            embeds = self.dropout(embeds)
            lstm_out, _ = self.lstm(embeds)                     
            lstm_out = self.dropout(lstm_out)
            
            batch_size = input_ids.size(0)
            aspect_emb = embeds[torch.arange(batch_size, device=input_ids.device), aspect_index]
            aspect_emb_exp = aspect_emb.unsqueeze(1).repeat(1, lstm_out.size(1), 1)
            return lstm_out, aspect_emb_exp

        def compute_attention(lstm_out, aspect_emb_exp):
            attn_hidden = torch.tanh(self.attention_fc(lstm_out) + self.attention_aspect(aspect_emb_exp))
            attn_scores = self.attention_v(attn_hidden)
            attn_weights = torch.softmax(attn_scores, dim=1)
            context = torch.sum(attn_weights * lstm_out, dim=1)
            return context

        lstm_out, aspect_emb_exp = process_lstm_output()
        context = compute_attention(lstm_out, aspect_emb_exp)
        logits = self.fc(context)
        return logits

In [36]:
def train_epoch(model, train_loader, optimizer, criterion, device):
    model.train()
    epoch_loss = 0
    correct = 0
    total = 0
    
    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["label"].to(device)
        aspect_index = batch["aspect_index"].to(device)
        
        optimizer.zero_grad()
        outputs = model(input_ids, aspect_index)
        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
        optimizer.step()
        
        epoch_loss += loss.item() * input_ids.size(0)
        preds = outputs.argmax(dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
    
    return epoch_loss / total, correct / total

def validate(model, val_loader, criterion, device):
    model.eval()
    val_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch["input_ids"].to(device)
            labels = batch["label"].to(device)
            aspect_index = batch["aspect_index"].to(device)
            
            outputs = model(input_ids, aspect_index)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * input_ids.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
            
    return val_loss / total, correct / total

In [37]:
def train_model(model, train_loader, val_loader, epochs=10, lr=1e-3, device="cpu"):
    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=lr, weight_decay=0.01)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs, eta_min=1e-6)
    
    best_val_acc = 0.0
    train_losses = []
    val_losses = []
    
    for epoch in range(epochs):
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
        val_loss, val_acc = validate(model, val_loader, criterion, device)
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        print(f"Epoch {epoch+1}: Train loss {train_loss:.4f} acc {train_acc:.4f} | Val loss {val_loss:.4f} acc {val_acc:.4f}")
        
        scheduler.step()
        
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), "best_custom_model.pt")
            print("Best custom model saved.")
    
    plt.figure()
    plt.plot(range(1, epochs+1), train_losses, label="Train Loss")
    plt.plot(range(1, epochs+1), val_losses, label="Validation Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.savefig("custom_model_loss.png")
    plt.close()
    
    return best_val_acc

In [38]:
def test_model(model, test_dataset, device="cpu"):

    def prepare_evaluation():
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
        model.eval()
        return test_loader

    def compute_accuracy(test_loader):
        correct = 0
        total = 0
        with torch.no_grad():
            for batch in test_loader:
                input_ids = batch["input_ids"].to(device)
                labels = batch["label"].to(device)
                aspect_index = batch["aspect_index"].to(device)
                outputs = model(input_ids, aspect_index)
                preds = outputs.argmax(dim=1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        return correct / total

    test_loader = prepare_evaluation()
    accuracy = compute_accuracy(test_loader)
    print(f"Test Accuracy (Custom Model): {accuracy:.4f}")
    return accuracy

In [39]:

def load_glove_word_vectors(embedding_file, embedding_dim=300):
    
    embeddings_index = {}
    with open(embedding_file, 'r', encoding="utf-8") as f:
        for line in f:
            values = line.rstrip().split(' ')
            word = values[0]
            try:
                vector = np.asarray(values[1:], dtype='float32')
                if vector.shape[0] == embedding_dim:
                    embeddings_index[word] = vector
            except:
                continue
    return embeddings_index

In [40]:
def create_embedding_matrix(vocab, embeddings_index, embedding_dim=300):
    
    vocab_size = len(vocab)
    embedding_matrix = np.random.uniform(-0.05, 0.05, (vocab_size, embedding_dim)).astype('float32')
    for word, i in vocab.items():
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector
    return torch.tensor(embedding_matrix)

def load_glove_embeddings(embedding_file, vocab, embedding_dim=300):
    
    embeddings_index = load_glove_word_vectors(embedding_file, embedding_dim)
    return create_embedding_matrix(vocab, embeddings_index, embedding_dim)



In [41]:
if __name__ == "__main__":

    test = False

    # Determine device: use GPU if available, otherwise CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device:", device)
    
    # Preprocess train and validation files
    preprocess_file("/kaggle/input/aspect-based-sent-analysis/train.json", "train_task2.json")
    preprocess_file("/kaggle/input/aspect-based-sent-analysis/val.json", "val_task2.json")
    
    # Create datasets and dataloaders for custom model training
    train_dataset = ABSADataset("train_task2.json", max_len=50)
    val_dataset = ABSADataset("val_task2.json", vocab=train_dataset.vocab, max_len=50)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    
    
    glove_embedding_file = "/kaggle/input/aspect-based-sent-analysis/glove.6B.300d.txt"
    pretrained_embeddings = load_glove_embeddings(glove_embedding_file, train_dataset.vocab, embedding_dim=300)
    
    # Initialize and train custom RNN model with pre-trained GloVe embeddings
    model_custom = ABSA_RNN_AspectAttention(vocab_size=len(train_dataset.vocab),
                                         embedding_dim=300,
                                         hidden_dim=256,
                                         output_dim=4,
                                         dropout_rate=0.5,
                                         pretrained_embeddings=pretrained_embeddings)
    
    model_custom.to(device)

    best_val_acc = train_model(model_custom, train_loader, val_loader, epochs=10, lr=1e-3, device=device)
    print(f"Best Validation Accuracy (Custom Model): {best_val_acc:.4f}")


    preprocess_file("/kaggle/input/aspect-based-sent-analysis/test.json", "test_task2.json")
    test_dataset = ABSADataset("test_task2.json", vocab=train_dataset.vocab, max_len=50)
    model_custom_loaded = ABSA_RNN_AspectAttention(vocab_size=len(train_dataset.vocab),
                                         embedding_dim=300,
                                         hidden_dim=256,
                                         output_dim=4,
                                         dropout_rate=0.5,
                                         pretrained_embeddings=pretrained_embeddings)

    model_custom_loaded.load_state_dict(torch.load("best_custom_model.pt", map_location=device))
    model_custom_loaded.to(device)
    test_accuracy = test_model(model_custom_loaded, test_dataset, device=device)
    
    print("\nTesting complete. Final Test Accuracy (Custom Model):", test_accuracy)
    

Using device: cuda
Saved preprocessed data to train_task2.json
Saved preprocessed data to val_task2.json
saved
Epoch 1: Train loss 0.9909 acc 0.5981 | Val loss 1.0132 acc 0.5580
Best custom model saved.
Epoch 2: Train loss 0.8052 acc 0.6751 | Val loss 0.9349 acc 0.6496
Best custom model saved.
Epoch 3: Train loss 0.6880 acc 0.7281 | Val loss 0.8873 acc 0.6469
Epoch 4: Train loss 0.5938 acc 0.7700 | Val loss 0.9303 acc 0.6442
Epoch 5: Train loss 0.5498 acc 0.7889 | Val loss 0.9578 acc 0.6523
Best custom model saved.
Epoch 6: Train loss 0.4945 acc 0.8072 | Val loss 0.9379 acc 0.6280
Epoch 7: Train loss 0.4566 acc 0.8173 | Val loss 1.0135 acc 0.6253
Epoch 8: Train loss 0.4259 acc 0.8322 | Val loss 0.9307 acc 0.6388
Epoch 9: Train loss 0.3928 acc 0.8423 | Val loss 0.9911 acc 0.6388
Epoch 10: Train loss 0.3875 acc 0.8436 | Val loss 0.9845 acc 0.6442
Best Validation Accuracy (Custom Model): 0.6523
Saved preprocessed data to test_task2.json
saved
Test Accuracy (Custom Model): 0.6250

Testing 