In [None]:
import pandas as pd
import re
from sklearn.model_selection import train_test_split
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification, Trainer, TrainingArguments
import torch
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import matplotlib.pyplot as plt

# Source data: a CSV expected to provide the columns 'id', 'label' and 'tweet'.
csv_path = "/Users/neilkadian/Downloads/Default Safari Downloads Folder/sentiment_analysis.csv"
df = pd.read_csv(csv_path)

# Quick sanity check: show the leading rows of the loaded frame.
preview = df.head()
print(preview)

# define common negation words
NEGATION_WORDS = ["not", "never", "no", "none", "n't", "cannot", "neither", "nor"]

# One alternation compiled up front instead of eight regexes rebuilt per call.
# Every word is matched as a whole word, except the contraction suffix "n't",
# which is glued to the preceding verb ("don't") and so only needs a boundary
# on its right-hand side.
_NEGATION_PATTERN = re.compile(
    "|".join(
        rf"{re.escape(neg_word)}\b" if neg_word == "n't" else rf"\b{re.escape(neg_word)}\b"
        for neg_word in NEGATION_WORDS
    )
)


def contains_negation(text):
    """Check if a text contains any negation words.

    Args:
        text: The text to inspect. Anything that is not a ``str`` (for
            example ``None`` or the float NaN pandas uses for a missing
            cell) is treated as containing no negation.

    Returns:
        bool: True when at least one entry of ``NEGATION_WORDS`` occurs in
        ``text`` (case-insensitive), False otherwise.
    """
    if not isinstance(text, str):
        return False
    # Normalise the typographic apostrophe so "don’t" is recognised like "don't".
    normalised = text.lower().replace("\u2019", "'")
    return _NEGATION_PATTERN.search(normalised) is not None

# Flag every tweet with whether it contains one of the negation words.
df["negation"] = df["tweet"].apply(contains_negation)

# Shuffle the rows reproducibly; a fraction of 1 keeps the whole dataset.
sample_fraction = 1
df_sampled = df.sample(frac=sample_fraction, random_state=42)

# Hold out half of the shuffled rows as the test set.
tweet_column = df_sampled["tweet"]
label_column = df_sampled["label"]
train_texts, test_texts, train_labels, test_labels = train_test_split(
    tweet_column,
    label_column,
    test_size=0.5,
    random_state=42,
)

print(f"Size of train_texts: {len(train_texts)}")
print(f"Size of test_texts: {len(test_texts)}")

# Tokenizer matching the DistilBERT checkpoint used for the model.
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

# tokenize the data
def tokenize_function(texts):
    """Tokenize an iterable of texts with the notebook-level ``tokenizer``.

    The texts are materialised into a list and encoded with truncation and
    padding to a fixed length of 128, and the tokenizer's result is returned
    unchanged.
    """
    batch = list(texts)
    encoded = tokenizer(batch, padding="max_length", truncation=True, max_length=128)
    return encoded

# Encode both splits with the shared tokenizer settings.
train_encodings = tokenize_function(train_texts)
test_encodings = tokenize_function(test_texts)

# convert to Hugging Face Dataset format
from datasets import Dataset


def _as_hf_dataset(encodings, labels):
    """Bundle token ids, attention masks and labels into a ``Dataset``."""
    columns = {
        "input_ids": encodings["input_ids"],
        "attention_mask": encodings["attention_mask"],
        "labels": list(labels),
    }
    return Dataset.from_dict(columns)


train_dataset = _as_hf_dataset(train_encodings, train_labels)
test_dataset = _as_hf_dataset(test_encodings, test_labels)

# DistilBERT with a two-class classification head.
model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)

In [None]:
# define evaluation metrics
def compute_metrics(eval_pred):
    """Turn an evaluation output into accuracy, F1, precision and recall.

    Args:
        eval_pred: A pair that unpacks into ``(logits, labels)``; the
            predicted class is the argmax of the logits along dimension 1.

    Returns:
        dict: ``accuracy``, ``f1``, ``precision`` and ``recall``, with the
        last three computed using binary averaging.
    """
    logits, labels = eval_pred
    predictions = torch.tensor(logits).argmax(dim=1)

    accuracy = accuracy_score(labels, predictions)
    precision, recall, f1, _support = precision_recall_fscore_support(
        labels, predictions, average="binary"
    )

    return {"accuracy": accuracy, "f1": f1, "precision": precision, "recall": recall}

# define training arguments
# Evaluation and checkpointing both happen once per epoch, and the checkpoint
# with the best "f1" (as reported by compute_metrics) is reloaded at the end.
# NOTE(review): newer transformers releases appear to have renamed
# `evaluation_strategy` to `eval_strategy` — confirm against the installed version.
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=1e-4,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=1,
    weight_decay=0.01,
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    logging_dir="./logs",
)

# trainer setup
# NOTE(review): the test split doubles as the evaluation set here, so the
# "best model" selection above is made on the same data that is reported on.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)


# fine-tune the model
trainer.train()

# evaluate the model on eval_dataset (the test split) and show the metrics dict
eval_results = trainer.evaluate()
print("Evaluation Results:", eval_results)

### - Visualize negation handling accuracy

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# define the categories for analysis; the order is the row order of `matrix`
categories = [
    "Negative Label, Not Negated",
    "Negative Label, Negated",
    "Positive Label, Not Negated",
    "Positive Label, Negated",
]

# Predict on the test dataset
predictions = trainer.predict(test_dataset)
predicted_labels = np.argmax(predictions.predictions, axis=1)

# re-index from 0 so positions in the series line up with `predicted_labels`
test_texts = test_texts.reset_index(drop=True)
test_labels = test_labels.reset_index(drop=True)

# rows follow `categories`; column 0 counts predictions of label 1 (negative),
# column 1 counts every other prediction (positive)
matrix = np.zeros((4, 2), dtype=int)

# true label -> first row of its pair; the "negated" variant is the next row
label_row_offset = {1: 0, 0: 2}

# fill the matrix
for i, text in enumerate(test_texts):
    true_label = test_labels[i]
    predicted_label = predicted_labels[i]
    negation_flag = contains_negation(text)

    # Fail loudly on an unexpected label instead of silently reusing the row
    # computed for the previous sample.
    if true_label not in label_row_offset:
        raise ValueError(f"Unexpected label {true_label!r} at test position {i}; expected 0 or 1")

    row = label_row_offset[true_label] + int(bool(negation_flag))
    col = 0 if predicted_label == 1 else 1

    matrix[row, col] += 1

# calculate total number of test samples
total_test_samples = len(test_texts)

# each cell as a percentage of all test samples
percent_matrix = (matrix / total_test_samples) * 100

# Row groups of `matrix`: rows 1 and 3 hold the negated inputs, rows 0 and 2 the rest.
negated_rows = [1, 3]
non_negated_rows = [0, 2]

# A prediction is correct when a negative-label row lands in column 0
# or a positive-label row lands in column 1.
correct_negated = matrix[1, 0] + matrix[3, 1]
correct_non_negated = matrix[0, 0] + matrix[2, 1]

# Number of inputs in each group, regardless of the prediction.
total_negated = matrix[negated_rows].sum()
total_non_negated = matrix[non_negated_rows].sum()

# Share of correct predictions per group; an empty group reports 0.
if total_negated > 0:
    percent_correct_negated = (correct_negated / total_negated) * 100
else:
    percent_correct_negated = 0

if total_non_negated > 0:
    percent_correct_non_negated = (correct_non_negated / total_non_negated) * 100
else:
    percent_correct_non_negated = 0

# Overall accuracy across both groups.
correct_predictions = correct_negated + correct_non_negated
overall_accuracy = (correct_predictions / total_test_samples) * 100

# draw the count matrix as a heatmap
fig, ax = plt.subplots(figsize=(8, 5))
im = ax.imshow(matrix, cmap="Blues", aspect="auto")

# write each cell's count and its share of all test samples on top of it
for (i, j), count in np.ndenumerate(matrix):
    percent = percent_matrix[i, j]
    ax.text(j, i, f"{count}\n({percent:.2f}%)", ha="center", va="center", color="black")

# axis ticks, labels and title
ax.set_xticks([0, 1])
ax.set_xticklabels(["Predicted Negative", "Predicted Positive"])
ax.set_yticks(range(4))
ax.set_yticklabels(categories)
ax.set_title("Performance Visualization Matrix (Counts and Percentages)")
ax.set_xlabel("Predicted Sentiment")
ax.set_ylabel("Input Categories")

# summary lines placed underneath the axes
summary_text = (
    f"Correctly Labeled Negated Inputs: {percent_correct_negated:.2f}%\n"
    f"Correctly Labeled Non-Negated Inputs: {percent_correct_non_negated:.2f}%\n"
    f"Overall Accuracy: {overall_accuracy:.2f}%"
)
fig.text(0.55, -0.08, summary_text, ha="center", fontsize=12, color="black")

fig.colorbar(im, ax=ax)
fig.tight_layout()
plt.show()

### - Examine the first few examples from each of the 8 categories:

In [None]:
# Upper bound on how many example tweets are kept per (row, column) cell.
MAX_EXAMPLES_PER_CATEGORY = 78

# row labels, in matrix row order
categories = [
    "Negative Label, Not Negated",
    "Negative Label, Negated",
    "Positive Label, Not Negated",
    "Positive Label, Negated",
]

# true label -> first row of its pair; the "negated" variant is the next row
label_row_offset = {1: 0, 0: 2}

# initialize matrix and examples dictionary
# NOTE: unlike the plotted matrix, the column index here IS the predicted
# label (column 0 = predicted positive, column 1 = predicted negative).
matrix = np.zeros((4, 2), dtype=int)
examples = {(row, col): [] for row in range(4) for col in range(2)}

# Fill the matrix and collect examples
for i, text in enumerate(test_texts):
    true_label = test_labels[i]
    predicted_label = predicted_labels[i]
    negation_flag = contains_negation(text)

    # Fail loudly on an unexpected label instead of silently reusing the row
    # computed for the previous sample.
    if true_label not in label_row_offset:
        raise ValueError(f"Unexpected label {true_label!r} at test position {i}; expected 0 or 1")

    row = label_row_offset[true_label] + int(bool(negation_flag))
    col = int(predicted_label)

    matrix[row, col] += 1

    # keep only the first few texts of each cell for inspection
    bucket = examples[(row, col)]
    if len(bucket) < MAX_EXAMPLES_PER_CATEGORY:
        bucket.append(text)

# print examples for each category
for row in range(4):
    for col in range(2):
        print(f"Category: {categories[row]} | Predicted {'Positive' if col == 0 else 'Negative'}")
        print(f"Count: {matrix[row, col]}")
        print("Examples:")
        for example in examples[(row, col)]:
            print(f"  - {example}")
        print("-" * 50)


### - Test Specific Input

In [None]:
import torch

# Prefer the MPS backend when torch reports it as available; otherwise use the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Put the model on the selected device.
model = model.to(device)

# function to test the model on a single input
def test_model_on_input(input_text, model, tokenizer, device):
    # tokenize the input
    encoding = tokenizer(input_text, return_tensors="pt", truncation=True, padding="max_length", max_length=128)
    
    # Move the tokenized input to the correct device
    encoding = {key: value.to(device) for key, value in encoding.items()}
    
    # pass the tokenized input through the model
    with torch.no_grad():  # disable gradient calculation
        outputs = model(**encoding)
    
    # Get the predicted label
    logits = outputs.logits
    predicted_label = torch.argmax(logits, dim=1).item()
    
    # map the predicted label to sentiment
    sentiment_map = {1: "Negative", 0: "Positive"}
    return sentiment_map[predicted_label]

# Try the classifier on one hand-picked tweet and show the outcome.
input_text = "Just switched from iphone to android. I'm in love and don't regret it! #samsung #GlobeGalaxyS4LTE"
predicted_sentiment = test_model_on_input(
    input_text,
    model,
    tokenizer,
    device,
)
print(f"Input Text: {input_text}")
print(f"Predicted Sentiment: {predicted_sentiment}")

### Make a saliency graph for a specific input

In [None]:
def compute_saliency(input_text, model, tokenizer, device):
    """Compute gradient-based saliency scores for each token of a text.

    The score of a token is the sum of absolute gradients of the predicted
    class logit with respect to that token's input embedding.

    Args:
        input_text: The text to analyse.
        model: Classifier exposing ``get_input_embeddings()`` and accepting
            ``inputs_embeds`` / ``attention_mask``. It is switched to
            evaluation mode. Its parameter gradients are left untouched.
        tokenizer: Tokenizer used to encode the text and to map ids back to
            token strings.
        device: The torch device the encoded tensors are moved to.

    Returns:
        tuple: ``(tokens, gradients)`` where ``tokens`` is the list of token
        strings with attention-mask value 1 and ``gradients`` is a NumPy
        array holding one saliency score per returned token.
    """
    # tokenize input text (truncated / padded to a fixed length of 128)
    encoding = tokenizer(input_text, return_tensors="pt", truncation=True, padding="max_length", max_length=128)
    input_ids = encoding["input_ids"].to(device)
    attention_mask = encoding["attention_mask"].to(device)

    model.eval()

    # Look the embeddings up, then cut them loose from the embedding matrix so
    # they become a leaf tensor we can differentiate with respect to.
    embeddings = model.get_input_embeddings()(input_ids).detach().requires_grad_(True)

    # forward pass
    outputs = model(inputs_embeds=embeddings, attention_mask=attention_mask)
    logits = outputs.logits

    # score to explain: the logit of the predicted class
    predicted_class = torch.argmax(logits, dim=1).item()
    score = logits[0, predicted_class]

    # Ask autograd for the embedding gradient only. Unlike `score.backward()`,
    # this does not accumulate into the `.grad` of every model parameter.
    (embedding_grad,) = torch.autograd.grad(score, embeddings)

    # token-level saliency: gradient magnitude summed over the embedding
    # dimension, taken for the single sequence in the batch
    gradients = embedding_grad.abs().sum(dim=-1)[0].cpu().numpy()
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0])

    # keep only positions the attention mask marks as real (drops padding)
    valid_indices = attention_mask[0].cpu().numpy().astype(bool)
    tokens = [token for token, keep in zip(tokens, valid_indices) if keep]
    gradients = gradients[valid_indices]

    return tokens, gradients

# example usage
input_text = "I have to give some to #apple for the #Iphone . I dropped my #iphone in the sink today and not one problem. Thank you #Apple !"
tokens, gradients = compute_saliency(input_text, model, tokenizer, device)

# plot saliency
# Bars are placed by token position, with the token strings as tick labels.
# Passing the strings directly as y-values would make the axis categorical,
# and repeated tokens (e.g. "#", "apple", "iphone") would all be drawn on
# the same row, hiding each other.
positions = list(range(len(tokens)))
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(positions, gradients, color="skyblue")
ax.set_yticks(positions)
ax.set_yticklabels(tokens)
ax.set_xlabel("Saliency Score")
ax.set_title("Gradient-Based Interpretability")
ax.invert_yaxis()  # first token at the top, reading downwards
plt.show()

# show the model's prediction for the same text
predicted_sentiment = test_model_on_input(input_text, model, tokenizer, device)
print(f"Input Text: {input_text}")
print(f"Predicted Sentiment: {predicted_sentiment}")