### Split the data into 10% test, 10% validation, and 80% training sets. Labels are required for both the training and validation sets.

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Load DataFrame
# (presumably the labelled 90% of the data, with the other 10% held out for testing — confirm
# against how 'random_90_percent.csv' was produced)
df = pd.read_csv('random_90_percent.csv')

# Split the DataFrame into training and validation sets.
# test_size=0.11 of this file is roughly 10% of the full data if the file really is 90% of it
# (0.11 * 0.90 ≈ 0.099); random_state=42 makes the split reproducible.
train_df, val_df = train_test_split(df, test_size=0.11, random_state=42)  # Adjust test_size as needed


In [None]:
# Show the loaded DataFrame as the cell output
df

In [None]:
# Persist the two splits under the file names that the load_dataset(...) cells read
# ('train.csv' / 'validation.csv'). Writing 'train_df.csv' / 'validation_df.csv' left
# those cells pointing at files this notebook never created.
train_df.to_csv('train.csv', index=False)
val_df.to_csv('validation.csv', index=False)


In [None]:
#2. Prepare Data
from datasets import load_dataset

# Read the two CSV files into a dataset keyed by the split names 'train' and 'validation'
dataset = load_dataset('csv', data_files={'train': 'train.csv', 'validation': 'validation.csv'})


In [None]:
from transformers import GPT2Tokenizer

# Load the pretrained 'gpt2' tokenizer
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

# Add a dedicated '[PAD]' padding token if the tokenizer does not define one.
# This can grow the vocabulary, which is why the model's embeddings are resized after loading.
if tokenizer.pad_token is None:
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})

# Show which padding token is now in effect
print(f"Padding token: {tokenizer.pad_token}")


In [None]:
from transformers import GPT2ForSequenceClassification

# Load the pretrained 'gpt2' weights with a sequence-classification head for 3 labels
model = GPT2ForSequenceClassification.from_pretrained('gpt2', num_labels=3)

# Resize the token-embedding layer to the tokenizer's current vocabulary size,
# so that ids of any added special tokens have an embedding row
model.resize_token_embeddings(len(tokenizer))


In [None]:
# Load and Tokenize Data
from datasets import load_dataset

# Load dataset (same CSV files and split names as the earlier load_dataset cell)
dataset = load_dataset('csv', data_files={'train': 'train.csv', 'validation': 'validation.csv'})

# Define tokenization function
def tokenize_function(examples):
    """Tokenize the 'text' field of `examples`, truncating and padding to exactly 512 tokens.

    Uses the notebook-level `tokenizer`; called by `dataset.map(..., batched=True)`.
    """
    return tokenizer(examples['text'], padding='max_length', truncation=True, max_length=512)

# Tokenize dataset (both splits); the tokenizer outputs are added as new columns
tokenized_datasets = dataset.map(tokenize_function, batched=True)

# Verify tokenized datasets
print(tokenized_datasets)


In [None]:
#5. Setup Training Arguments
#Define Training Arguments:

from transformers import TrainingArguments

# Fine-tuning hyperparameters: evaluate once per epoch, 3 epochs,
# batch size 16 per device for both training and evaluation.
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy='epoch',
    learning_rate=5e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
)


In [None]:
#6. Train the Model
#Initialize Trainer:
from transformers import Trainer

# NOTE(review): this Trainer is built from the first tokenization pass, i.e. before the
# string labels are converted to integers in a later cell. `trainer` is re-created further
# down (after label conversion and with a data collator) before train() is called.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets['train'],
    eval_dataset=tokenized_datasets['validation'],
    tokenizer=tokenizer,
)


In [None]:
# The 'label' column is assumed to contain the strings 'positive', 'neutral' or 'negative'
label_mapping = {'positive': 2, 'neutral': 1, 'negative': 0}

# Replace each example's string label with its integer class id, one split at a time
for split_name in ('train', 'validation'):
    dataset[split_name] = dataset[split_name].map(
        lambda example: {'label': label_mapping[example['label']]}
    )


In [None]:
def tokenize_function(examples):
    """Tokenize the 'text' field of `examples`, truncating and padding to exactly 512 tokens."""
    return tokenizer(examples['text'], padding='max_length', truncation=True, max_length=512)

# Re-tokenize now that `dataset` carries integer labels, replacing the earlier tokenized_datasets
tokenized_datasets = dataset.map(tokenize_function, batched=True)


In [None]:
# Sanity check on the label conversion: look at the first five training labels
print(tokenized_datasets['train']['label'][:5])  # This should print a list of integers


In [None]:
# Only fall back to the EOS token when the tokenizer has no padding token at all.
#
# The datasets above were already tokenized with padding='max_length' using the tokenizer's
# current pad token. Unconditionally switching pad_token to EOS at this point makes the pad
# id that gets recorded in model.config differ from the id actually present in the padded
# input_ids. GPT2ForSequenceClassification relies on config.pad_token_id to find the last
# real token of each sequence, so with such a mismatch it pools a padding position during
# training but the last real token at (unpadded) inference time.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token  # or define a new one: tokenizer.pad_token = '<PAD>'

# Keep the embedding matrix in sync with the tokenizer's vocabulary size
model.resize_token_embeddings(len(tokenizer))


In [None]:
# Set pad_token_id in model config so the model treats the tokenizer's padding id as padding
model.config.pad_token_id = tokenizer.pad_token_id


In [None]:
from transformers import DataCollatorWithPadding

# Define the data collator for padding: batches are padded with `tokenizer`,
# to a length that is a multiple of 8
data_collator = DataCollatorWithPadding(tokenizer=tokenizer, pad_to_multiple_of=8)


In [None]:
from transformers import DataCollatorWithPadding
from transformers import Trainer, TrainingArguments

# Same schedule as the earlier TrainingArguments cell, except that learning_rate is not
# passed here, so the library default applies.
training_args = TrainingArguments(
    output_dir='./results', 
    evaluation_strategy='epoch',
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01
)


# Create the Trainer instance
# This replaces the earlier `trainer`: it uses the re-tokenized datasets (integer labels)
# and the padding data collator. No compute_metrics is passed at this point.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets['train'],
    eval_dataset=tokenized_datasets['validation'],
    tokenizer=tokenizer,
    data_collator=data_collator  # Now it's defined
)


In [None]:
#Train the Model:
# Runs fine-tuning with whichever `trainer` was created most recently
trainer.train()

In [None]:
#7. Evaluate the Model
# Evaluates on the Trainer's eval_dataset (the validation split) and prints the returned metrics
results = trainer.evaluate()
print(results)


In [None]:
# Define a mapping from labels to integers
from torch.utils.data import Dataset
import torch
label_mapping = {'negative': 0, 'neutral': 1, 'positive': 2}

class SentimentDataset(torch.utils.data.Dataset):
    """Torch dataset that tokenizes DataFrame rows on the fly for sentiment classification.

    Each row must provide a 'text' column and a 'label' column whose values are keys of
    the label mapping (by default 'negative', 'neutral', 'positive').

    Args:
        dataframe: pandas DataFrame with 'text' and 'label' columns.
        tokenizer: Callable tokenizer; called with truncation/padding options and
            return_tensors='pt', and expected to return 'input_ids' and 'attention_mask'.
        max_len: Length every example is truncated/padded to.
        label_mapping: Optional mapping from label value to integer class id.
            Defaults to DEFAULT_LABEL_MAPPING.
    """

    # The mapping is stored on the class/instance rather than read from the notebook-level
    # `label_mapping` name at lookup time, so cells that rebind that global cannot change
    # how this dataset encodes labels.
    DEFAULT_LABEL_MAPPING = {'negative': 0, 'neutral': 1, 'positive': 2}

    def __init__(self, dataframe, tokenizer, max_len, label_mapping=None):
        self.dataframe = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len
        if label_mapping is None:
            label_mapping = self.DEFAULT_LABEL_MAPPING
        # Copy so later mutation of the caller's dict does not affect this dataset
        self.label_mapping = dict(label_mapping)

    def __len__(self):
        """Return the number of rows in the underlying DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, index):
        """Return 'input_ids', 'attention_mask' and 'labels' tensors for row `index`.

        Raises:
            KeyError: if the row's label is not a key of the label mapping.
        """
        row = self.dataframe.iloc[index]  # single positional lookup for both columns
        text = row['text']
        label_str = row['label']

        # Map label string to integer
        try:
            label = self.label_mapping[label_str]
        except KeyError:
            raise KeyError(
                f"Unknown label {label_str!r} at row {index}; "
                f"expected one of {list(self.label_mapping)}"
            ) from None

        encoding = self.tokenizer(
            text,
            truncation=True,
            max_length=self.max_len,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt'
        )

        # The tokenizer returns tensors with a leading batch dimension of 1; flatten to 1-D
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }


In [None]:
import torch
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def compute_metrics(pred):
    """Return accuracy plus weighted precision, recall and F1 for a prediction object.

    `pred.label_ids` holds the reference labels and `pred.predictions` the per-class
    scores; the predicted class is the arg-max over the last axis.
    """
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(-1)

    # Weighted averaging; zero_division=1 is what the score is set to when a ratio is undefined
    precision, recall, f1, _support = precision_recall_fscore_support(
        y_true, y_pred, average='weighted', zero_division=1
    )

    metrics = {'accuracy': accuracy_score(y_true, y_pred)}
    metrics['precision'] = precision
    metrics['recall'] = recall
    metrics['f1'] = f1
    return metrics

# NOTE(review): `trainer` was created without compute_metrics, so this call does not use
# the compute_metrics function defined above; the Trainer is re-created with it in the next cell.
trainer.evaluate(eval_dataset=tokenized_datasets['validation'], metric_key_prefix="eval")


In [None]:
from transformers import Trainer

# Re-create the Trainer around the same (already fine-tuned) `model` so that evaluation
# reports the custom metrics. Unlike the training Trainer, no data_collator is passed here.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets['train'],
    eval_dataset=tokenized_datasets['validation'],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics  # Add the metrics computation function
)


In [None]:
# Evaluate on the validation split; returned metric names are prefixed with "eval"
eval_results = trainer.evaluate(eval_dataset=tokenized_datasets['validation'], metric_key_prefix="eval")
print(eval_results)


In [None]:
# Redefine analyze_predictions to also compute metrics
from collections import Counter

def analyze_predictions(pred):
    """Print how often each class occurs in the labels and in the predictions, then
    return the metrics computed by `compute_metrics` for the same prediction object."""
    distributions = (
        ("Label Distribution in True Labels:", pred.label_ids),
        ("Label Distribution in Predictions:", pred.predictions.argmax(-1)),
    )
    for heading, values in distributions:
        print(heading, Counter(values))

    return compute_metrics(pred)

# Update the Trainer initialization
# Same setup as before, but metrics go through analyze_predictions so the label
# distributions are printed during evaluation.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    tokenizer=tokenizer,
    compute_metrics=analyze_predictions  # Pass it here
)

# Evaluate
trainer.evaluate(eval_dataset=tokenized_datasets['validation'], metric_key_prefix="eval")


In [None]:
#Step 8: Inference

# Run a single example through the fine-tuned model.
# - model.eval() disables dropout so the prediction is deterministic.
# - The inputs are moved to the model's device: after training, the model may live on
#   an accelerator while the tokenizer always produces CPU tensors.
# - torch.no_grad() avoids building an autograd graph for a pure forward pass.
model.eval()
inputs = tokenizer(
    "This product is not fantastic!", return_tensors="pt", truncation=True, padding=True
).to(model.device)
with torch.no_grad():
    outputs = model(**inputs)
predicted_class = torch.argmax(outputs.logits, dim=-1)
print(outputs)
print(predicted_class)

In [None]:
#9. Save and Load the Model
#Save the Model and Tokenizer:
# Both are written to the same directory so they can be reloaded together from './saved_model'

model.save_pretrained('./saved_model')
tokenizer.save_pretrained('./saved_model')


In [None]:
#Load the Model and Tokenizer Later:
# Rebinds the notebook-level `model` and `tokenizer` to the copies saved in './saved_model'

model = GPT2ForSequenceClassification.from_pretrained('./saved_model')
tokenizer = GPT2Tokenizer.from_pretrained('./saved_model')


In [None]:
# Show the reloaded model as the cell output
model

In [None]:
# Show the reloaded tokenizer as the cell output
tokenizer

In [None]:
# get a confusion matrix for the GPT model.
from sklearn.metrics import confusion_matrix
import torch

# Step 1: Get predictions and true labels
predictions = []
true_labels = []

model.eval()  # disable dropout so predictions are deterministic

# Iterating a dataset split yields one example (a dict) per step, so despite its name
# `batch` holds a single row here.
for batch in tokenized_datasets['validation']:
    # Truncate to the same 512-token limit that was used when tokenizing for training,
    # and place the inputs on the model's device (the tokenizer returns CPU tensors).
    inputs = tokenizer(
        batch['text'], return_tensors='pt', padding=True, truncation=True, max_length=512
    ).to(model.device)
    with torch.no_grad():
        outputs = model(**inputs)

    # Step 2: Convert model outputs (logits) to predicted class labels
    logits = outputs.logits
    predicted_class = torch.argmax(logits, dim=-1).cpu().numpy()

    predictions.extend(predicted_class)  # array of length 1 -> one prediction per example
    true_labels.append(batch['label'])  # single label per example, hence append

# Step 3: Compute confusion matrix
conf_matrix = confusion_matrix(true_labels, predictions, labels=[0, 1, 2])  # For 'negative', 'neutral', 'positive'

print(conf_matrix)


In [None]:
import torch

# DataLoader was used here without ever being imported anywhere in the notebook
from torch.utils.data import DataLoader

dataloader = DataLoader(tokenized_datasets['validation'], batch_size=32, shuffle=True)
# Only the values from the LAST batch survive this loop; later cells read both
# `input_ids` and `attention_mask`, so both are captured here.
for batch in dataloader:
    input_ids = batch['input_ids']
    attention_mask = batch['attention_mask']
 

In [None]:
# List the columns available in the tokenized training split
print(tokenized_datasets['train'].column_names)


In [None]:
# List the columns available in the tokenized validation split
print(tokenized_datasets['validation'].column_names)

In [None]:
def pad_sequences(sequences, pad_value=0):
    """Right-pad variable-length integer sequences and stack them into one long tensor.

    Args:
        sequences: Iterable of rows; each row may be a list, tuple, 1-D tensor or
            anything else exposing ``tolist()`` / iteration.
        pad_value: Value appended to rows shorter than the longest one.

    Returns:
        A ``torch.long`` tensor of shape (number of rows, longest row length).
        Empty input yields an empty tensor of shape (0, 0) instead of raising.
    """
    # Normalise every row to a plain list so that `row + [pad_value] * k` is list
    # concatenation (on a tensor, `+` would be element-wise addition instead).
    rows = [seq.tolist() if hasattr(seq, 'tolist') else list(seq) for seq in sequences]
    if not rows:
        return torch.empty((0, 0), dtype=torch.long)

    max_length = max(len(row) for row in rows)
    padded = [row + [pad_value] * (max_length - len(row)) for row in rows]
    return torch.tensor(padded, dtype=torch.long)

# Pad manually
# NOTE(review): `input_ids` and `attention_mask` must already exist as notebook-level
# variables when this cell runs; the padded tensors are moved to the model's device.
input_ids_tensor = pad_sequences(input_ids).to(model.device)
attention_mask_tensor = pad_sequences(attention_mask).to(model.device)


In [None]:
import torch
from torch.nn.utils.rnn import pad_sequence

# Print the types and lengths for debugging
print(f"Type of input_ids: {type(input_ids)}, Length: {len(input_ids)}")
print(f"Type of attention_mask: {type(attention_mask)}, Length: {len(attention_mask)}")


def _pad_tensor_list(tensors, error_message):
    """Zero-pad a list of tensors into one batch-first tensor; raise ValueError otherwise."""
    is_tensor_list = isinstance(tensors, list) and all(
        isinstance(item, torch.Tensor) for item in tensors
    )
    if not is_tensor_list:
        raise ValueError(error_message)
    return pad_sequence(tensors, batch_first=True, padding_value=0)


try:
    # Validate and pad the token ids (sequences may have varying lengths)
    input_ids_tensor = _pad_tensor_list(
        input_ids, "input_ids is not a list of tensors or contains non-tensor elements."
    )
    print(f"Padded input_ids_tensor shape: {input_ids_tensor.shape}")

    # Validate and pad the attention masks the same way
    attention_mask_tensor = _pad_tensor_list(
        attention_mask, "attention_mask is not a list of tensors or contains non-tensor elements."
    )
    print(f"Padded attention_mask_tensor shape: {attention_mask_tensor.shape}")

    # Put both tensors on whatever device the model lives on
    input_ids_tensor = input_ids_tensor.to(model.device)
    attention_mask_tensor = attention_mask_tensor.to(model.device)

    # Forward pass
    outputs = model(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)
    print(outputs)

except Exception as err:
    # Debugging cell: report any failure instead of stopping the notebook
    print(f"Error: {err}")


In [None]:
import torch

# `input_ids` and `attention_mask` are expected to exist already (from earlier cells)


def _as_tensor(value):
    """Turn a list into a tensor; anything that is not a list is returned unchanged.

    A list whose first element is a tensor is stacked along a new leading dimension;
    any other list is converted directly to a long tensor.
    """
    if not isinstance(value, list):
        return value
    if isinstance(value[0], torch.Tensor):
        return torch.stack(value)
    return torch.tensor(value, dtype=torch.long)


input_ids = _as_tensor(input_ids)
attention_mask = _as_tensor(attention_mask)

# Now input_ids and attention_mask should be tensors
print("input_ids shape:", input_ids.shape)
print("attention_mask shape:", attention_mask.shape)


In [None]:
# Assign eos_token as pad_token or add [PAD] token if needed
# (only takes effect when the tokenizer has no padding token yet)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token  # Option 1

    # tokenizer.add_special_tokens({'pad_token': '[PAD]'})

# Now proceed with data processing
# NOTE(review): this loop re-tokenizes each validation example and runs a forward pass,
# but `outputs` is overwritten every iteration, so only the last result is kept.
for batch in tokenized_datasets['validation']:
    inputs = tokenizer(batch['text'], return_tensors='pt', padding=True, truncation=True)
    with torch.no_grad():
        outputs = model(**inputs)
     

In [None]:
from sklearn.metrics import roc_auc_score
import numpy as np
import torch
import torch.nn.functional as F
# Validation dataset for evaluation
# (this line used to be bare prose, which made the whole cell a SyntaxError)

# Step 1: Get predicted probabilities and true labels
predicted_probs = []
true_labels = []

model.eval()  # disable dropout so the probabilities are deterministic

# Iterating a dataset split yields one example (a dict) per step
for batch in tokenized_datasets['validation']:
    # Truncate to the same 512-token limit used when tokenizing for training, and place
    # the inputs on the model's device (the tokenizer returns CPU tensors).
    inputs = tokenizer(
        batch['text'], return_tensors='pt', padding=True, truncation=True, max_length=512
    ).to(model.device)
    with torch.no_grad():
        outputs = model(**inputs)

    # Step 2: Convert logits to probabilities using softmax
    logits = outputs.logits
    probs = F.softmax(logits, dim=-1).cpu().numpy()

    predicted_probs.extend(probs)

    # batch['label'] is a single int when iterating row by row; a list is also accepted
    if isinstance(batch['label'], int):  # If it's an integer
        true_labels.append(batch['label'])
    else:  # If it's a list of labels
        true_labels.extend(batch['label'])

# Step 3: Convert true labels to one-hot encoding for multiclass ROC-AUC
num_classes = 3  # Assuming 3 classes
# Row i of the identity matrix is the one-hot vector for class i
true_labels_one_hot = np.eye(num_classes)[np.asarray(true_labels, dtype=int)]

# Step 4: Compute the ROC-AUC score using one-vs-rest approach
roc_auc = roc_auc_score(true_labels_one_hot, np.array(predicted_probs), multi_class='ovr')

print(f'ROC-AUC Score: {roc_auc}')


### Predict on the held-out test dataset.

In [None]:
import pandas as pd

# Load DataFrame
# (presumably the 10% held out from training/validation — confirm) and show it as the cell output
df_test = pd.read_csv('df_remaining_10.csv')
df_test

In [None]:
import torch
from transformers import GPT2Tokenizer, GPT2ForSequenceClassification
import numpy as np

# Load trained model and tokenizer from the directory they were saved to
tokenizer = GPT2Tokenizer.from_pretrained('./saved_model')
model = GPT2ForSequenceClassification.from_pretrained('./saved_model')

# Ensure the model is in evaluation mode (no dropout) before predicting
model.eval()

def predict_sentiment(texts, tokenizer, model, batch_size=1, max_length=512):
    """Classify each text and return one result dict per input, in input order.

    Args:
        texts: Iterable of strings to classify (e.g. a list or a pandas Series).
        tokenizer: Callable tokenizer; called with a list of texts and
            return_tensors='pt', and expected to return 'input_ids' and
            'attention_mask' tensors.
        model: Callable sequence-classification model whose output exposes ``.logits``.
            If it has a ``device`` attribute, inputs are moved to that device first.
        batch_size: Number of texts per forward pass. The default of 1 keeps the
            original one-text-at-a-time behaviour; larger values are faster but rely on
            the tokenizer/model having a consistent padding token configured.
        max_length: Maximum number of tokens kept per text (longer texts are truncated).

    Returns:
        A list of dicts with keys 'label' (predicted class id), 'sentiment_score' and
        'confidence'. Both scores are the softmax probability of the predicted class,
        so they are always equal; both keys are kept for existing consumers.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    texts = list(texts)  # allow any iterable and make slicing positional
    device = getattr(model, 'device', None)

    results = []
    for start in range(0, len(texts), batch_size):
        chunk = texts[start:start + batch_size]

        # Tokenize the input text(s)
        inputs = tokenizer(chunk, return_tensors='pt', padding=True, truncation=True, max_length=max_length)
        input_ids = inputs['input_ids']
        attention_mask = inputs['attention_mask']
        if device is not None:
            # The tokenizer returns CPU tensors; the model may live on an accelerator
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)

        # Run the model
        with torch.no_grad():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)

        # Get the logits and apply softmax; .cpu() is required before .numpy()
        # whenever the logits are not already on the CPU
        probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy()

        # Get the predicted label and its probability for every row of the batch
        predicted_labels = np.argmax(probabilities, axis=1)
        for row, predicted_label in zip(probabilities, predicted_labels):
            confidence = row[predicted_label]  # max probability == probability of the arg-max
            results.append({
                'label': predicted_label,
                'sentiment_score': confidence,
                'confidence': confidence
            })
    return results


In [None]:
from transformers import GPT2Tokenizer, GPT2ForSequenceClassification

local_model_path = './saved_model'

# Reload tokenizer and model from the local directory
tokenizer = GPT2Tokenizer.from_pretrained(local_model_path)
model = GPT2ForSequenceClassification.from_pretrained(local_model_path)

# Example usage: classify one sentence and report the winning class with its probability
model.eval()
text = "I use this product!"
inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
outputs = model(**inputs)
logits = outputs.logits
# detach() drops the autograd graph so the probabilities can be converted to NumPy
probabilities = torch.softmax(logits, dim=-1).detach().numpy()
predicted_label = probabilities.argmax(axis=1)[0]
confidence = probabilities.max()

print(f"Predicted Label: {predicted_label}, Confidence: {confidence}")


In [None]:
# Apply the prediction function
# NOTE(review): the test texts are read from the 'clean_text' column, whereas training
# tokenized the 'text' column — confirm both columns hold equivalently prepared text.
predictions = predict_sentiment(df_test['clean_text'], tokenizer, model)

# Create a new DataFrame with predictions (one row per input text, in the same order)
predictions_df = pd.DataFrame(predictions)

# Concatenate the original test DataFrame with the predictions
# (axis=1 places the prediction columns next to the original ones, aligned on the index)
result_df = pd.concat([df_test, predictions_df], axis=1)

# Save to CSV
result_df.to_csv('predictions.csv', index=False)


In [None]:
# Define the mapping from numeric labels to categorical labels.
# A dedicated name is used so the notebook-level `label_mapping` (string -> id), which
# earlier cells rely on, is not silently replaced by its inverse.
id_to_label = {0: 'negative', 1: 'neutral', 2: 'positive'}

# Convert the numeric labels to categorical labels.
# replace() leaves values that are not keys of the mapping untouched, so re-running this
# cell is harmless; Series.map(dict) would turn the already-converted strings into NaN.
result_df['label'] = result_df['label'].replace(id_to_label)

print(result_df)

In [None]:
# Drop the 'label_2' column (presumably carried over from the test CSV — confirm).
# Raises KeyError if the column is absent, e.g. when this cell is run a second time.
result_df = result_df.drop(columns=['label_2'])
result_df

In [None]:
# Save to CSV (overwrites the 'predictions.csv' written earlier, now with the post-processed columns)
result_df.to_csv('predictions.csv', index=False)


In [None]:
# Load DataFrame
# Re-read the same file that was split into train/validation at the top, and show it
df = pd.read_csv('random_90_percent.csv')
df

In [None]:
# Concatenate the random 90% DataFrame with the predictions 10%.
# ignore_index=True gives the combined frame a fresh 0..n-1 index; without it both parts
# keep their own index, so the same index labels appear twice and label-based lookups
# on whole_df return two rows.
# NOTE(review): after this step the 'label' column presumably mixes original labels
# (from df) with model predictions (from result_df) — confirm that this is intended.
whole_df = pd.concat([df, result_df], axis=0, ignore_index=True)
whole_df

In [None]:
# Write the combined DataFrame to disk without the index column
whole_df.to_csv('whole_df_916.csv', index=False)
