In [None]:
# Attach Google Drive to the Colab runtime so files stored on Drive can be
# accessed through the local filesystem.
from google.colab import drive

drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Mounted at /content/drive


In [None]:
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.metrics import accuracy_score, f1_score

In [None]:
import os

# Folder on the mounted Drive that becomes the working directory; the relative
# paths used later in the notebook ('Datasets/...', 'Outputs/...') resolve
# against it.
new_directory = '/content/drive/MyDrive/Colab Notebooks'
os.chdir(new_directory)

# Echo the resulting working directory so a wrong or missing folder is
# noticed right away.
current_directory = os.getcwd()
print("Current working directory:", current_directory)

Current working directory: /content/drive/MyDrive/Colab Notebooks


In [None]:
# Read the review dataset into a DataFrame. The path is relative, so it is
# resolved against the current working directory.
dataset_csv = 'Datasets/data_amazon_product_reviews_video_games.csv'
df = pd.read_csv(dataset_csv)

In [None]:
# Columns that are not used by the rest of the notebook; they are removed
# from `df` in place.
unused_columns = [
    'Unnamed: 0',
    'reviewerID',
    'asin',
    'reviewerName',
    'helpful',
    'unixReviewTime',
    'reviewTime',
]
df.drop(columns=unused_columns, inplace=True)

In [None]:
# Remove, in place, every row that has a missing value in any of the
# remaining columns (inspect with df.isna().sum() if needed).
df.dropna(axis=0, how='any', inplace=True)

In [None]:
# Cast the 'overall' column (presumably the review rating - confirm against
# the dataset) to 64-bit integers; it is used as the label column below.
overall_values = df['overall']
df['overall'] = overall_values.astype('int64')

In [None]:
# Build the model input text: the review body followed by its summary,
# separated by a single space.
review_body = df['reviewText']
review_summary = df['summary']
df['new_text'] = review_body + ' ' + review_summary

In [None]:
# Pull the combined text and the 'overall' labels out of the frame as plain
# Python lists, in row order.
texts, labels = (df[column].tolist() for column in ('new_text', 'overall'))

In [None]:
# Define hyperparameters
# Number of samples per batch; passed to the test DataLoader below.
batch_size = 16

In [None]:
# Step 1: keep 80% of the samples for training and set the other 20% aside
# as a hold-out pool. The fixed random_state makes the split reproducible.
train_texts, holdout_texts, train_labels, holdout_labels = train_test_split(
    texts, labels, test_size=0.2, random_state=42
)

# Step 2: cut the hold-out pool in half, giving dev and test sets that each
# hold 10% of the full data.
dev_texts, test_texts, dev_labels, test_labels = train_test_split(
    holdout_texts, holdout_labels, test_size=0.5, random_state=42
)


In [None]:
# Load the pre-trained tokenizer for the checkpoint.
# Only the tokenizer is created here; the classification model is loaded from
# the same `checkpoint` identifier in a later cell.
checkpoint= "LiYuan/amazon-review-sentiment-analysis"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

In [None]:
# Tokenize the input texts.
# Only the test split is encoded; the train/dev encodings are kept below as
# disabled code.
#tokenized_train_texts = tokenizer(train_texts, padding=True, truncation=True, return_tensors='pt')
#tokenized_dev_texts = tokenizer(dev_texts, padding=True, truncation=True, return_tensors='pt')
# The whole test split is encoded in a single call and returned as PyTorch
# tensors, with padding and truncation enabled.
encoding_options = {'padding': True, 'truncation': True, 'return_tensors': 'pt'}
tokenized_test_texts = tokenizer(test_texts, **encoding_options)

In [None]:
# Convert the labels to tensor
# Only the test labels are converted; the train/dev conversions are left
# commented out.
#train_labels = torch.tensor(train_labels)
#dev_labels = torch.tensor(dev_labels)
# NOTE: this rebinds `test_labels` from the split output to a tensor, so
# re-running this cell passes a tensor (not the original labels) to torch.tensor().
test_labels = torch.tensor(test_labels)

In [None]:
# Create the TensorDataset for the test set (the train/dev datasets are kept
# below as disabled code).
#train_dataset = TensorDataset(tokenized_train_texts['input_ids'], tokenized_train_texts['attention_mask'], train_labels)
#dev_dataset = TensorDataset(tokenized_dev_texts['input_ids'], tokenized_dev_texts['attention_mask'], dev_labels)
# Each dataset item is an (input_ids, attention_mask, label) triple; the
# evaluation loop unpacks batches in this order.
test_input_ids = tokenized_test_texts['input_ids']
test_attention_mask = tokenized_test_texts['attention_mask']
test_dataset = TensorDataset(test_input_ids, test_attention_mask, test_labels)

In [None]:
#train_dataloader = DataLoader(train_dataset, batch_size= batch_size, shuffle= True)
#dev_dataloader = DataLoader(dev_dataset, batch_size= batch_size, shuffle= False)
# Batch the test set in its original order (no shuffling), with pinned
# host memory requested for the loaded batches.
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
    pin_memory=True,
)

In [None]:
# Load the pre-trained sequence-classification model from the same checkpoint
# identifier that was used for the tokenizer.
model = AutoModelForSequenceClassification.from_pretrained(checkpoint)

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Cross-entropy between the model's logits and the integer class labels.
criterion = nn.CrossEntropyLoss()

In [None]:
# Evaluate on the test set.
#
# Runs the model over every test batch without gradient tracking, collects
# the true and predicted labels, and reports accuracy, weighted F1 and the
# average cross-entropy loss per sample.
#
# NOTE(review): `criterion` treats `batch_labels` as class indices into the
# logits, so the values of the 'overall' column must already match the
# checkpoint's label indices. If the CSV stores ratings in a different
# encoding they need remapping before this cell - confirm against the dataset.
model.eval()  # switch the model to evaluation mode
y_true_test = []
y_pred_test = []
loss_epoch = []   # mean loss of each batch, as plain Python floats
batch_sizes = []  # number of samples behind each entry of loss_epoch

with torch.no_grad():
    for input_ids, attention_mask, batch_labels in test_dataloader:
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        batch_labels = batch_labels.to(device)

        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        # Predicted class = index of the largest logit. argmax is used rather
        # than unpacking torch.max into `_`, which would shadow IPython's
        # last-output variable.
        predicted = logits.argmax(dim=1)

        # Append true labels and predicted labels for later use
        y_true_test.extend(batch_labels.tolist())
        y_pred_test.extend(predicted.tolist())

        # Record the batch loss as a float (no device tensors are kept alive)
        # together with the batch size, so the final batch, which may be
        # smaller, is weighted correctly in the average.
        loss = criterion(logits, batch_labels)
        loss_epoch.append(loss.item())
        batch_sizes.append(batch_labels.size(0))

# Calculate accuracy and F1 score for the test set
test_accuracy = accuracy_score(y_true_test, y_pred_test)
test_f1 = f1_score(y_true_test, y_pred_test, average='weighted')

# Calculate the average loss per sample: each batch's mean loss is weighted
# by the number of samples in that batch.
loss_epoch_np = np.asarray(loss_epoch, dtype=np.float64)
if batch_sizes:
    average_loss = np.average(loss_epoch_np, weights=batch_sizes)
else:
    # No batches were produced; report NaN instead of dividing by zero.
    average_loss = np.float64("nan")

print(f"Testset accuracy: {round(test_accuracy,5)} , Testset F1 score: {round(test_f1,5)}, Average loss: {round(average_loss.tolist(),5)}")
# Order matters: the saved results column is [accuracy, weighted F1, average loss].
Test_results= [test_accuracy, test_f1, average_loss]

Testset accuracy: 0.68494 , Testset F1 score: 0.67219, Average loss: 0.79407


In [None]:
# Save the test-set results: one 'Test_results' column whose rows are, in
# order, accuracy, weighted F1 and average loss.
# NOTE(review): this rebinds `df`, replacing the reviews DataFrame loaded at
# the top of the notebook; reload the CSV before re-running earlier cells.
data = dict(Test_results=Test_results)
df = pd.DataFrame(data)

results_csv = 'Outputs/LiYuan Model-Source_Test_results.csv'
df.to_csv(results_csv, index=False)