In [50]:
# Install necessary libraries
!pip install transformers peft datasets torch



In [51]:
import os
import re
import logging
from dataclasses import dataclass, field
from typing import List, Optional

import torch
# from torch.utils.data import Dataset # Remove this line to avoid conflict
from datasets import load_dataset, DatasetDict, Features, Value, Dataset # Import Dataset from datasets
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding,
    set_seed,
)
from peft import LoraConfig, get_peft_model, TaskType, PeftModel
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix

# Configure logging
logging.basicConfig(level=logging.INFO)

In [52]:
# Define constants
MODEL_NAME = "bert-base-uncased"
MAX_SEQ_LENGTH = 128
SEED = 42
set_seed(SEED)

# Label encodings
POLARITY_MAPPING = {"Positive": 0, "Negative": 1, "Neutral": 2}
ASPECT_TAG_MAPPING = {"B-ASP": 0, "I-ASP": 1}

# Helper function to handle errors
def handle_error(error_msg, data_instance=None):
    logging.error(error_msg)
    if data_instance:
        logging.error(f"Data instance: {data_instance}")


In [53]:
@dataclass
class DataInstance:
    sentence: str
    aspects: List[dict] = field(default_factory=list)

def read_atepc_data(file_path: str) -> List[DataInstance]:
    """Reads and parses the ATEPC data from a file."""
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
    except FileNotFoundError:
        handle_error(f"File not found: {file_path}")
        return []

    data_instances = []
    current_sentence = []
    current_aspects = []
    current_aspect_text = []
    current_aspect_start = -1
    current_aspect_polarity = None

    for i, line in enumerate(lines):
        line = line.strip()
        if not line:
            if current_sentence:
                sentence_str = " ".join([word for word, _, _ in current_sentence])

                # Add any remaining aspect
                if current_aspect_text:
                    current_aspects.append(
                        {
                            "text": " ".join(current_aspect_text),
                            "start": current_aspect_start,
                            "end": current_aspect_start + len(current_aspect_text),
                            "polarity": current_aspect_polarity,
                            "aspect_tag": [ASPECT_TAG_MAPPING.get(tag) for _, tag, _ in current_sentence if tag != 'O' and tag != '-100'],
                        }
                    )

                data_instances.append(
                    DataInstance(sentence=sentence_str, aspects=current_aspects)
                )
                current_sentence = []
                current_aspects = []
                current_aspect_text = []
                current_aspect_start = -1
                current_aspect_polarity = None
            continue

        parts = line.split()
        if len(parts) != 3:
            handle_error(f"Malformed line: {line}", line)
            continue

        word, tag, polarity_str = parts
        current_sentence.append((word, tag, polarity_str))

        if tag == "B-ASP":
            if current_aspect_text:
                current_aspects.append(
                    {
                        "text": " ".join(current_aspect_text),
                        "start": current_aspect_start,
                        "end": current_aspect_start + len(current_aspect_text),
                        "polarity": current_aspect_polarity,
                        "aspect_tag": [ASPECT_TAG_MAPPING.get(tag) for _, tag, _ in current_sentence if tag != 'O' and tag != '-100'],
                    }
                )
            current_aspect_text = [word]
            current_aspect_start = len(current_sentence) - 1
            if polarity_str != '-100':
                current_aspect_polarity = POLARITY_MAPPING.get(polarity_str)
            else:
                current_aspect_polarity = None

        elif tag == "I-ASP":
            if not current_aspect_text:
                handle_error(
                    f"I-ASP tag without preceding B-ASP: {line}", line
                )
            else:
                current_aspect_text.append(word)
                if polarity_str != '-100':
                    current_aspect_polarity = POLARITY_MAPPING.get(polarity_str)

        elif tag == "O":
          if current_aspect_text:
                current_aspects.append(
                    {
                        "text": " ".join(current_aspect_text),
                        "start": current_aspect_start,
                        "end": current_aspect_start + len(current_aspect_text),
                        "polarity": current_aspect_polarity,
                        "aspect_tag": [ASPECT_TAG_MAPPING.get(tag) for _, tag, _ in current_sentence if tag != 'O' and tag != '-100'],
                    }
                )
          current_aspect_text = []
          current_aspect_start = -1
          current_aspect_polarity = None

    # Add the last sentence if it exists
    if current_sentence:
        sentence_str = " ".join([word for word, _, _ in current_sentence])
        if current_aspect_text:
            current_aspects.append(
                {
                    "text": " ".join(current_aspect_text),
                    "start": current_aspect_start,
                    "end": current_aspect_start + len(current_aspect_text),
                    "polarity": current_aspect_polarity,
                    "aspect_tag": [ASPECT_TAG_MAPPING.get(tag) for _, tag, _ in current_sentence if tag != 'O' and tag != '-100'],
                }
            )
        data_instances.append(
            DataInstance(sentence=sentence_str, aspects=current_aspects)
        )

    return data_instances

In [54]:
def filter_and_create_dataset(data_instances: List[DataInstance]) -> DatasetDict:
    """Filters aspects with -100 polarity and creates a Hugging Face Dataset."""
    filtered_data = []
    for instance in data_instances:
        filtered_aspects = []
        for aspect in instance.aspects:
            if aspect["polarity"] is not None:
                filtered_aspects.append(aspect)
            else:
                logging.info(f"Removed aspect due to -100 polarity: {aspect}")
        filtered_data.append(
            {"sentence": instance.sentence, "aspects": filtered_aspects}
        )

    # Convert list of dictionaries to dictionary of lists
    dataset_dict = {}
    for key in filtered_data[0].keys():
        dataset_dict[key] = [d[key] for d in filtered_data]

    # Define the features explicitly
    features = Features({
        "sentence": Value("string"),
        "aspects": [
            {
                "text": Value("string"),
                "start": Value("int32"),
                "end": Value("int32"),
                "polarity": Value("int32"),
                "aspect_tag": [Value("int32")]
            }
        ]
    })

    dataset = Dataset.from_dict(dataset_dict, features=features)

    print("----- Dataset after filtering and creation -----")
    print(dataset)
    print(dataset[0])

    return DatasetDict({"train": dataset})


In [55]:
# Load your data (replace with your actual file paths)
TRAIN_DATA_PATH = "/content/train dataset.atepc"
TEST_DATA_PATH = "/content/test dataset.atepc"
train_data_instances = read_atepc_data(TRAIN_DATA_PATH)
test_data_instances = read_atepc_data(TEST_DATA_PATH)
train_data = filter_and_create_dataset(train_data_instances)
test_data = filter_and_create_dataset(test_data_instances)

----- Dataset after filtering and creation -----
Dataset({
    features: ['sentence', 'aspects'],
    num_rows: 31541
})
{'sentence': 'I charge it at night and skip taking the cord with me because of the good battery life .', 'aspects': [{'text': 'cord', 'start': 9, 'end': 10, 'polarity': 2, 'aspect_tag': [0]}]}
----- Dataset after filtering and creation -----
Dataset({
    features: ['sentence', 'aspects'],
    num_rows: 6060
})
{'sentence': 'Boot time is super fast , around anywhere from 35 seconds to 1 minute .', 'aspects': [{'text': 'Boot time', 'start': 0, 'end': 2, 'polarity': 0, 'aspect_tag': [0, 1]}]}


In [56]:
train_data

DatasetDict({
    train: Dataset({
        features: ['sentence', 'aspects'],
        num_rows: 31541
    })
})

In [57]:
test_data

DatasetDict({
    train: Dataset({
        features: ['sentence', 'aspects'],
        num_rows: 6060
    })
})

In [58]:
print(train_data)

DatasetDict({
    train: Dataset({
        features: ['sentence', 'aspects'],
        num_rows: 31541
    })
})


In [59]:
print(test_data)

DatasetDict({
    train: Dataset({
        features: ['sentence', 'aspects'],
        num_rows: 6060
    })
})


In [60]:
for i in range(min(5, len(train_data['train']))):
    print(train_data['train'][i])

{'sentence': 'I charge it at night and skip taking the cord with me because of the good battery life .', 'aspects': [{'text': 'cord', 'start': 9, 'end': 10, 'polarity': 2, 'aspect_tag': [0]}]}
{'sentence': 'I charge it at night and skip taking the cord with me because of the good battery life .', 'aspects': [{'text': 'battery life', 'start': 16, 'end': 18, 'polarity': 0, 'aspect_tag': [0, 0, 1]}]}
{'sentence': "The tech guy then said the service center does not do 1 - to - 1 exchange and I have to direct my concern to the ` ` sales ' ' team , which is the retail shop which I bought my netbook from .", 'aspects': [{'text': 'service center', 'start': 6, 'end': 8, 'polarity': 1, 'aspect_tag': [0, 1, 0, 1]}]}
{'sentence': "The tech guy then said the service center does not do 1 - to - 1 exchange and I have to direct my concern to the ` ` sales ' ' team , which is the retail shop which I bought my netbook from .", 'aspects': [{'text': "` ` sales '", 'start': 26, 'end': 30, 'polarity': 1, 'a

In [61]:
from datasets import Dataset

class ABSADataset(Dataset):
    def __init__(self, dataset, tokenizer, max_seq_length=128):
        # super().__init__() # Call the parent class's __init__ is not needed anymore
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.max_seq_length = max_seq_length
        # Instead of assigning to self.data, create a new attribute
        self.processed_data = self.preprocess_data()

    def preprocess_data(self):
        data = []
        for item in self.dataset:
            sentence = item['sentence']
            aspects = item['aspects']

            if isinstance(aspects, list) and aspects and isinstance(aspects[0], dict):
                for aspect in aspects:
                    aspect_text = aspect["text"]
                    aspect_start = sentence.find(aspect_text)
                    aspect_end = aspect_start + len(aspect_text)

                    modified_sentence = sentence[:aspect_start] + "[ASP]" + aspect_text + "[ASP]" + sentence[aspect_end:]

                    encoding = self.tokenizer(
                        modified_sentence,
                        add_special_tokens=True,
                        max_length=self.max_seq_length,
                        padding="max_length",
                        truncation=True,
                        return_tensors="pt",
                    )

                    data.append({
                "input_ids": encoding["input_ids"].squeeze().tolist(),
                "attention_mask": encoding["attention_mask"].squeeze().tolist(),
                "labels": int(aspect["polarity"]),
                # Add 'aspect_tag' to the data dictionary:
                "aspect_tag": aspect["aspect_tag"],  # Keep it as a list
            })
            else:
                logging.warning(f"Unexpected format for 'aspects' in item: {item}")

        # Convert the processed data into a Hugging Face Dataset
        features = Features({
            "input_ids": [Value("int64")],  # Changed to list of int64
            "attention_mask": [Value("int64")],  # Changed to list of int64
            "labels": Value("int64"),
            "aspect_tag": [Value("int32")] # Define aspect_tag as a list of int32
        })

        # Using a dictionary comprehension to create the dataset input
        # Ensures data is in correct format
        dataset_input = {
            k: [d[k] for d in data]
            for k in ["input_ids", "attention_mask", "labels", "aspect_tag"]
        }

        return Dataset.from_dict(dataset_input, features=features)


    def __len__(self):
        return len(self.processed_data)  # Use the processed_data attribute

    def __getitem__(self, idx):
        # Modification: Handle list or int indices
        if isinstance(idx, int):
            return self.processed_data[idx]
        elif isinstance(idx, list):
            return [self.processed_data[i] for i in idx]
        # Add support for slices
        elif isinstance(idx, slice):
            return self.processed_data[idx] # Return a slice of the data
        else:
            raise TypeError("Invalid index type. Expected int, list, or slice.")

    # Implement __getitems__ to handle batched indices
    def __getitems__(self, keys):
        """
        Handles batched indices by returning a list of dictionaries, each
        representing a single data point.
        """
        return [self[key] for key in keys]

In [73]:
class ABSADataCollator(DataCollatorWithPadding):
    def __call__(self, features):
        # Extract input features and labels
        input_features = [{"input_ids": f["input_ids"], "attention_mask": f["attention_mask"]} for f in features]
        labels = [f["labels"] for f in features]

        # Pad input features
        batch = self.tokenizer.pad(
            input_features,
            padding=True,  # Use boolean padding
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors=self.return_tensors,
        )

        # Add labels to the batch, converting each label to a tensor
        batch["labels"] = torch.tensor(labels)  # Convert to tensor directly

        return batch

In [74]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# Recreate datasets
train_dataset = ABSADataset(train_data['train'], tokenizer)
test_dataset = ABSADataset(test_data['train'], tokenizer)

In [75]:
print("\n----- Train Dataset -----")
print(train_dataset[0])


----- Train Dataset -----
{'input_ids': [101, 1045, 3715, 2009, 2012, 2305, 1998, 13558, 2635, 1996, 1031, 2004, 2361, 1033, 11601, 1031, 2004, 2361, 1033, 2007, 2033, 2138, 1997, 1996, 2204, 6046, 2166, 1012, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'labels': 2, 'aspect_tag': [0]}


In [76]:
print("\n----- Test Dataset -----")
print(test_dataset[0])


----- Test Dataset -----
{'input_ids': [101, 1031, 2004, 2361, 1033, 9573, 2051, 1031, 2004, 2361, 1033, 2003, 3565, 3435, 1010, 2105, 5973, 2013, 3486, 3823, 2000, 1015, 3371, 1012, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'labels': 0, 'aspect_tag': [0, 1]}


In [77]:
# Update data collator
data_collator = ABSADataCollator(tokenizer=tokenizer)  # Use the new data collator

In [78]:
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME, num_labels=3
)  # 3 classes: Positive, Negative, Neutral

# LoRA configuration
lora_config = LoraConfig(
    r=8,
    lora_alpha=8,
    target_modules=["query", "value"],
    lora_dropout=0.1,
    bias="none",
    task_type=TaskType.SEQ_CLS,
)

model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


trainable params: 297,219 || all params: 109,781,766 || trainable%: 0.2707


In [79]:
OUTPUT_DIR = "absa_model"
training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    num_train_epochs=7,
    learning_rate=2e-5,
    weight_decay=0.01,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    logging_dir="./logs",
    logging_steps=100,
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    seed=SEED,
    report_to="none"
)



In [83]:
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import numpy as np

def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)

    accuracy = accuracy_score(labels, predictions)
    f1_macro = f1_score(labels, predictions, average="macro")
    f1_weighted = f1_score(labels, predictions, average="weighted")
    conf_matrix = confusion_matrix(labels, predictions)

    # Convert the confusion matrix to a list before returning
    return {
        "accuracy": accuracy,
        "f1": f1_macro,
        "f1_weighted": f1_weighted,
        "confusion_matrix": conf_matrix.tolist(),  # Convert to list
    }

In [84]:
# Split the original training dataset into train and validation sets
train_val_dataset = train_dataset.dataset

stratify_data = [len(item['aspects']) for item in train_val_dataset]

train_idx, val_idx = train_test_split(
    np.arange(len(train_val_dataset)),
    test_size=0.2,
    random_state=SEED,
    stratify=stratify_data
)

train_hf_dataset = train_val_dataset.select(train_idx.tolist())
val_hf_dataset = train_val_dataset.select(val_idx.tolist())

train_dataset = ABSADataset(train_hf_dataset, tokenizer)
val_dataset = ABSADataset(val_hf_dataset, tokenizer)


In [85]:
# ipython-input-71-fdda2b3d9e86
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset.processed_data,  # Use the processed Hugging Face Dataset
    eval_dataset=val_dataset.processed_data,  # Use the processed Hugging Face Dataset
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

trainer.train()

Epoch,Training Loss,Validation Loss,Accuracy,F1,F1 Weighted,Confusion Matrix
1,0.7941,0.755579,0.645602,0.598061,0.61811,"[[1858, 138, 198], [172, 991, 168], [608, 505, 410]]"
2,0.7322,0.707557,0.674723,0.643112,0.661651,"[[1885, 96, 213], [160, 906, 265], [533, 375, 615]]"
3,0.7363,0.665203,0.706616,0.684235,0.699187,"[[1865, 101, 228], [130, 958, 243], [456, 323, 744]]"
4,0.6418,0.641087,0.72504,0.710016,0.722418,"[[1803, 112, 279], [105, 997, 229], [360, 303, 860]]"
5,0.6702,0.632941,0.729596,0.714268,0.726647,"[[1838, 102, 254], [120, 966, 245], [382, 262, 879]]"
6,0.6351,0.624991,0.732964,0.719526,0.731176,"[[1811, 105, 278], [108, 984, 239], [353, 265, 905]]"
7,0.6531,0.6258,0.735341,0.72061,0.732569,"[[1836, 104, 254], [111, 989, 231], [367, 269, 887]]"


TrainOutput(global_step=8834, training_loss=0.7126210201696112, metrics={'train_runtime': 2494.0632, 'train_samples_per_second': 56.669, 'train_steps_per_second': 3.542, 'total_flos': 9329177456460288.0, 'train_loss': 0.7126210201696112, 'epoch': 7.0})

In [87]:
evaluation_results = trainer.evaluate(test_dataset.processed_data)
print(evaluation_results)

{'eval_loss': 0.5667867064476013, 'eval_accuracy': 0.7657152285101468, 'eval_f1': 0.7259628038250888, 'eval_f1_weighted': 0.7622398213584392, 'eval_confusion_matrix': [[2748, 167, 262], [130, 1109, 198], [387, 276, 784]], 'eval_runtime': 48.2499, 'eval_samples_per_second': 125.617, 'eval_steps_per_second': 3.938, 'epoch': 7.0}
