In [None]:
import dataset_util
import json
import torch

In [None]:
def _load_train_and_test(domain):
    """Load one domain and return ``(train + dev, test)``.

    The "train" and "dev" splits are concatenated with ``+`` and used together
    as training data; the "test" split is returned on its own.
    """
    train_part = dataset_util.load_dataset(split="train", domain=domain)
    dev_part = dataset_util.load_dataset(split="dev", domain=domain)
    test_part = dataset_util.load_dataset(split="test", domain=domain)
    return train_part + dev_part, test_part


rest_train, rest_test = _load_train_and_test("rest")
laptop_train, laptop_test = _load_train_and_test("laptop")

In [None]:
# Run the preprocessing helper over every split, in the same order as before.
# The return value is discarded, so presumably the helper modifies each
# dataset in place -- TODO confirm against dataset_util.
for split_to_preprocess in (rest_train, laptop_train, rest_test, laptop_test):
    dataset_util.preprocess_dataset(split_to_preprocess)

In [None]:
# Print the first item of rest_train to inspect the structure of one example.
print(rest_train[0])

### TFIDF with Logistic Regression

In [None]:
# Build parallel lists of input text and polarity label for the restaurant data.
# item['sentence'] is joined with single spaces, so it is presumably a list of
# string tokens at this point -- TODO confirm.
rest_train_sentences, rest_train_polarities = [], []
for rest_example in rest_train:
    rest_train_sentences.append(" ".join(rest_example['sentence']))
    rest_train_polarities.append(rest_example['polarity'])

rest_test_sentences, rest_test_polarities = [], []
for rest_example in rest_test:
    rest_test_sentences.append(" ".join(rest_example['sentence']))
    rest_test_polarities.append(rest_example['polarity'])

In [None]:
# Same construction as for the restaurant data: one space-joined sentence and
# one polarity label per example, kept in parallel lists.
laptop_train_sentences, laptop_train_polarities = [], []
for laptop_example in laptop_train:
    laptop_train_sentences.append(" ".join(laptop_example['sentence']))
    laptop_train_polarities.append(laptop_example['polarity'])

laptop_test_sentences, laptop_test_polarities = [], []
for laptop_example in laptop_test:
    laptop_test_sentences.append(" ".join(laptop_example['sentence']))
    laptop_test_polarities.append(laptop_example['polarity'])

### Roberta Model

In [None]:
# Load the splits again for the RoBERTa section (the copies loaded earlier were
# passed through dataset_util.preprocess_dataset). As before, train and dev are
# concatenated into a single training set.
# load_dataset must be qualified with the module name: only `dataset_util` is
# imported, so a bare `load_dataset(...)` raises NameError.
rest_train = (
    dataset_util.load_dataset(split="train", domain="rest")
    + dataset_util.load_dataset(split="dev", domain="rest")
)
rest_test = dataset_util.load_dataset(split="test", domain="rest")
laptop_train = (
    dataset_util.load_dataset(split="train", domain="laptop")
    + dataset_util.load_dataset(split="dev", domain="laptop")
)
laptop_test = dataset_util.load_dataset(split="test", domain="laptop")

In [None]:
# Apply the cleaning helper to every reloaded split, in the same order as before.
# The return value is discarded, so presumably the helper modifies each
# dataset in place -- TODO confirm against dataset_util.
for split_to_clean in (rest_train, rest_test, laptop_train, laptop_test):
    dataset_util.clean_dataset(split_to_clean)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

In [None]:
# Model input is "<aspect term> [SEP] <sentence>" for every example.
# NOTE(review): '[SEP]' is BERT's separator token; RoBERTa's tokenizer uses
# '</s>', so '[SEP]' is presumably tokenized as ordinary text here -- confirm
# this is intended.
rest_train_sentences = [' [SEP] '.join((ex['term'], ex['sentence'])) for ex in rest_train]

rest_test_sentences = [' [SEP] '.join((ex['term'], ex['sentence'])) for ex in rest_test]

laptop_train_sentences = [' [SEP] '.join((ex['term'], ex['sentence'])) for ex in laptop_train]

laptop_test_sentences = [' [SEP] '.join((ex['term'], ex['sentence'])) for ex in laptop_test]

In [None]:
# Collect the polarity label of every example, one list per split. The lists
# are produced in the same order as the splits in the inner tuple.
(
    rest_train_polarities,
    rest_test_polarities,
    laptop_train_polarities,
    laptop_test_polarities,
) = (
    [example['polarity'] for example in split]
    for split in (rest_train, rest_test, laptop_train, laptop_test)
)

To get results for a different subset (e.g. the laptop or restaurant dataset), edit the cell below so that each variable points to the desired subset.

In [None]:
# Choose the domain used by the rest of the notebook. To run on the restaurant
# data instead, replace each `laptop_*` name with its `rest_*` counterpart.
train_sentences, train_polarities = laptop_train_sentences, laptop_train_polarities
test_sentences, test_polarities = laptop_test_sentences, laptop_test_polarities

In [None]:
import random

# Show one randomly chosen training example together with its label.
# random.randint(a, b) includes b, so using len(train_sentences) as the upper
# bound could produce an index one past the end; randrange excludes the bound.
random_index = random.randrange(len(train_sentences))
random_sentence = train_sentences[random_index]
random_polarity = train_polarities[random_index]
print(random_sentence)
print(random_polarity)

In [None]:
# Class balance of the selected training set. Only the three labels below are
# counted; any other label value is ignored.
pos = train_polarities.count('positive')
neg = train_polarities.count('negative')
neu = train_polarities.count('neutral')

print(f"Positive : {pos}")
print(f"Negative : {neg}")
print(f"Neutral : {neu}")

In [None]:
from sklearn.preprocessing import LabelEncoder

# Learn the string -> integer label mapping from the training labels only,
# then apply that same mapping to both the training and the test labels.
label_encoder = LabelEncoder()
label_encoder.fit(train_polarities)

y_train = label_encoder.transform(train_polarities)
y_test = label_encoder.transform(test_polarities)

In [None]:
from transformers import RobertaTokenizer, RobertaForSequenceClassification
import torch
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset

In [None]:
from transformers import BertTokenizer  # NOTE(review): not used in this cell; kept in case other cells rely on it
model_name = 'roberta-base'
# Load the RoBERTa tokenizer that matches `model_name`
tokenizer = RobertaTokenizer.from_pretrained(model_name)

def encode_sentences(sentences, max_length=512):
    """Tokenize `sentences` into fixed-length id and attention-mask tensors.

    The whole collection is encoded with one batched tokenizer call instead of
    one `encode_plus` call per sentence (`encode_plus` is deprecated in favour
    of calling the tokenizer directly).

    Args:
        sentences: iterable of strings to encode.
        max_length: every sequence is padded or truncated to exactly this
            many tokens.

    Returns:
        A pair ``(input_ids, attention_masks)`` of PyTorch tensors, each with
        one row per sentence and ``max_length`` columns.
    """
    encoded = tokenizer(
        list(sentences),               # Batch of sentences to encode
        add_special_tokens=True,       # Add the tokenizer's own special tokens
        max_length=max_length,         # Pad & truncate all sentences
        padding='max_length',          # Pad all sentences to max length
        truncation=True,               # Explicitly truncate to max length
        return_attention_mask=True,    # Construct attention masks
        return_tensors='pt',           # Return pytorch tensors
    )
    return encoded['input_ids'], encoded['attention_mask']

# Encode the sentences (X_train and X_test)
train_inputs, train_masks = encode_sentences(train_sentences)
test_inputs, test_masks = encode_sentences(test_sentences)

In [None]:
# Convert the integer-encoded labels into tensors for the TensorDatasets.
train_labels, test_labels = torch.tensor(y_train), torch.tensor(y_test)

In [None]:
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

batch_size = 16  # Lower this if the GPU runs out of memory

# Each dataset item is an (input_ids, attention_mask, label) triple.
train_data = TensorDataset(train_inputs, train_masks, train_labels)
test_data = TensorDataset(test_inputs, test_masks, test_labels)

# Training batches are drawn in random order; test batches keep dataset order.
train_sampler = RandomSampler(train_data)
test_sampler = SequentialSampler(test_data)

train_dataloader = DataLoader(dataset=train_data, batch_size=batch_size, sampler=train_sampler)
test_dataloader = DataLoader(dataset=test_data, batch_size=batch_size, sampler=test_sampler)

In [None]:
# Pretrained RoBERTa encoder (checkpoint named by `model_name`) with a
# sequence-classification head producing three outputs, one per polarity class.
model = RobertaForSequenceClassification.from_pretrained(
    model_name,
    output_hidden_states=False,  # only the logits (and loss) are needed
    output_attentions=False,
    num_labels=3,                # positive / negative / neutral
)

# Move the model to the selected device (GPU when available)
model.to(device)

In [None]:
from transformers import get_linear_schedule_with_warmup
import torch

# Training hyper-parameters
epochs = 5

# Number of optimizer steps over the whole run (batches per epoch * epochs);
# this is the value the disabled scheduler below would take.
total_steps = epochs * len(train_dataloader)

# AdamW over all model parameters
optimizer = torch.optim.AdamW(params=model.parameters(), lr=2e-5, eps=1e-8)

# The linear warmup/decay schedule is currently disabled:
#scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

In [None]:
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, confusion_matrix
import numpy as np
from tqdm import tqdm

In [None]:
# Fine-tune the model for `epochs` epochs. After every epoch the model is
# evaluated on `test_dataloader`, and the weights of the epoch with the highest
# accuracy so far are snapshotted into `best_model_state`.
# After this cell `model` itself still holds the LAST epoch's weights; call
# model.load_state_dict(best_model_state) to evaluate the best checkpoint.
# NOTE(review): the best epoch is chosen on the same data that is later
# reported as test performance; consider selecting on a held-out dev split.
best_accuracy = 0
for epoch in range(0, epochs):
    # Training step
    model.train()
    total_train_loss = 0
    progress_bar = tqdm(train_dataloader, desc="Epoch {:1d}".format(epoch+1), leave=False, disable=False)
    for batch in progress_bar:
        b_input_ids = batch[0].to(device)
        b_input_mask = batch[1].to(device)
        b_labels = batch[2].to(device)

        # Clear previously calculated gradients
        model.zero_grad()

        # Perform a forward pass; passing labels makes the model return the loss
        outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)
        loss = outputs.loss
        total_train_loss += loss.item()

        # Perform a backward pass
        loss.backward()

        # Update parameters and take a step using the computed gradient
        optimizer.step()
        #scheduler.step()
        # Update the progress bar with the loss of this batch. The value is
        # shown as-is: `batch` is the [ids, mask, labels] triple, so dividing
        # by len(batch) would divide by 3 rather than by the batch size.
        progress_bar.set_postfix({'training_loss': '{:.3f}'.format(loss.item())})

    # Calculate the average loss over the training data.
    avg_train_loss = total_train_loss / len(train_dataloader)
    print(f"  Average training loss: {avg_train_loss:.2f}")

    model.eval()

    # Initialize variables to gather predictions and true labels
    all_predictions = []
    all_true_labels = []

    with torch.no_grad():
        total_eval_loss = 0
        for batch in tqdm(test_dataloader):
            b_input_ids = batch[0].to(device)
            b_input_mask = batch[1].to(device)
            b_labels = batch[2].to(device)

            outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)

            loss = outputs.loss
            total_eval_loss += loss.item()

            logits = outputs.logits.detach().cpu().numpy()
            label_ids = b_labels.to('cpu').numpy()

            # Store predictions (arg-max class per example) and true labels
            all_predictions.extend(np.argmax(logits, axis=1).flatten())
            all_true_labels.extend(label_ids.flatten())
    precision, recall, f1_score, _ = precision_recall_fscore_support(all_true_labels, all_predictions, average='weighted')
    accuracy = accuracy_score(all_true_labels, all_predictions)
    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-Score: {f1_score:.4f}')
    if accuracy >= best_accuracy:
        print(f"  Accuracy increased from {best_accuracy:.2f} to {accuracy:.2f}, saving model.")
        best_accuracy = accuracy
        # state_dict() returns references to the live parameter tensors, which
        # later epochs keep updating. Copy them (to CPU, to spare GPU memory)
        # so the snapshot really preserves this epoch's weights.
        best_model_state = {
            name: tensor.detach().cpu().clone()
            for name, tensor in model.state_dict().items()
        }
#torch.save(best_model_state, 'best_BERT_model.bin')

In [None]:
from sklearn.metrics import precision_score

# Final evaluation of the current `model` weights on the test loader.
model.eval()

# Predictions and gold labels accumulated over all test batches
all_predictions = []
all_true_labels = []
total_eval_loss = 0

with torch.no_grad():
    for batch in tqdm(test_dataloader):
        # Each batch is an (input_ids, attention_mask, labels) triple
        b_input_ids, b_input_mask, b_labels = (tensor.to(device) for tensor in batch)

        outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)
        total_eval_loss += outputs.loss.item()

        # Predicted class = index of the largest logit for each example
        batch_logits = outputs.logits.detach().cpu().numpy()
        all_predictions.extend(np.argmax(batch_logits, axis=1).flatten())
        all_true_labels.extend(b_labels.to('cpu').numpy().flatten())

# Label ids follow label_encoder's class order; assuming only the three
# polarity strings occur, that is 0 = negative, 1 = neutral, 2 = positive.
accuracy = accuracy_score(all_true_labels, all_predictions)
precision, recall, f1_score, _ = precision_recall_fscore_support(all_true_labels, all_predictions, average='weighted')
conf_matrix = confusion_matrix(all_true_labels, all_predictions)

print(f'Accuracy: {accuracy:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1-Score: {f1_score:.4f}')
print('Confusion Matrix:\n', conf_matrix)


In [None]:
# Per-class precision from the confusion matrix: the diagonal entry (correct
# predictions of the class) divided by the column total (every prediction of
# that class). Class indices are assumed to be 0 = negative, 1 = neutral,
# 2 = positive -- verify against label_encoder.classes_.
predicted_totals = np.sum(conf_matrix, axis=0)

precision_positve = conf_matrix[2, 2] / predicted_totals[2]
precision_negative = conf_matrix[0, 0] / predicted_totals[0]
precision_neutral = conf_matrix[1, 1] / predicted_totals[1]

print(f"Precision Positive {precision_positve}")
print(f"Precision Negative {precision_negative}")
print(f"Precision Neutral {precision_neutral}")