In [None]:
%%capture
%pip install datasets transformers pandas matplotlib tqdm --upgrade --quiet

In [None]:
# Automatically loads changes in other files in this project
%load_ext autoreload
%autoreload 2

## Importing Libraries


In [None]:
import torch
from torch.utils.data import DataLoader
import pandas as pd
from transformers import pipeline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split


In [None]:
%cd ..
%pwd

In [None]:
# Load the labelled dataset. The path is relative, so it resolves against the
# working directory selected by the `%cd ..` cell above.
df = pd.read_csv('validated_labeled_data_cleaned.csv')

In [None]:
df.head()

In [None]:
print(df['brand_label'].dtype)

In [None]:
# Discard rows that lack a brand label, a text or an emotion label.
# The index is not reset here, so the remaining row labels may have gaps.
df = df.dropna(subset=['brand_label', 'Text', 'emotion_label'])
df.info()

In [None]:
def convert_to_list_of_strings(value):
    """Parse a stringified list such as "['a', 'b']" into a list of strings.

    Args:
        value: A cell value. Strings are parsed; anything else (for example a
            value that is already a list) is returned unchanged.

    Returns:
        A list of label strings with brackets, quote characters and
        surrounding whitespace removed, or ``value`` itself when it is not a
        string. Blank entries are dropped, so "[]" and "" give ``[]`` rather
        than ``['']``.
    """
    if not isinstance(value, str):
        return value
    # Remove the list brackets and every quote character (single or double).
    cleaned = value.strip().strip("[]").replace("'", "").replace('"', "")
    # Split on the bare comma so "a,b" and "a,  b" parse the same as "a, b".
    pieces = (piece.strip() for piece in cleaned.split(","))
    return [piece for piece in pieces if piece]

# Normalise the text column. The rename runs first (and is a no-op once the
# column is already called 'text'), so re-running this cell no longer fails
# with a KeyError on the old 'Text' name.
df = df.rename(columns={'Text': 'text'})
df['text'] = df['text'].astype(str).str.rstrip(",")

# Convert brand_label and emotion_label to lists of strings.
# Values that are not strings are returned unchanged by the converter.
df['brand_label'] = df['brand_label'].apply(convert_to_list_of_strings)
df['emotion_label'] = df['emotion_label'].apply(convert_to_list_of_strings)

# Check the types to verify
print(df.dtypes)

In [None]:
df.head(5)

In [None]:
# Index -> label lookups. The position of each name in its list is the class index.
brand_perception_labels_map_to_label = dict(enumerate([
    'product quality',
    'reputation & heritage',
    'customer service',
    'social impact',
    'ethical practices',
    'sustainability',
]))

emotion_labels_map_to_emotion = dict(enumerate([
    "admiration",
    "amusement",
    "anger",
    "annoyance",
    "approval",
    "caring",
    "confusion",
    "curiosity",
    "desire",
    "disappointment",
    "disapproval",
    "disgust",
    "embarrassment",
    "excitement",
    "fear",
    "gratitude",
    "grief",
    "joy",
    "love",
    "nervousness",
    "optimism",
    "pride",
    "realization",
    "relief",
    "remorse",
    "sadness",
    "surprise",
    "neutral",
]))

# Label -> index lookups, obtained by inverting the two maps above.
brand_perception_labels_map_to_index = {
    label: index for index, label in brand_perception_labels_map_to_label.items()
}

emotion_labels_map_to_index = {
    label: index for index, label in emotion_labels_map_to_emotion.items()
}

## Create datasets

In [None]:
# Drop rows whose text is missing or blank from df itself, then read the texts
# from the filtered frame. Filtering only the list left `texts` shorter than
# the label lists, which later cells build from every row of df.
has_text = df['text'].apply(lambda item: isinstance(item, str) and item.strip() != '')
# .copy() so later column assignments on df do not act on a slice of the old frame.
df = df[has_text].copy()
texts = df['text'].tolist()

In [None]:
# Multi-hot encode the brand-perception labels of every row
def hot_encode_brand_perception(row):
    """Return a length-6 0/1 vector marking the brand labels found in ``row``.

    ``row['brand_label']`` is iterated; entries that are not keys of
    ``brand_perception_labels_map_to_index`` are skipped.
    """
    encoded = np.zeros(6)
    for name in row['brand_label']:
        position = brand_perception_labels_map_to_index.get(name)
        if position is not None:
            encoded[position] = 1
    return encoded

# Encode each row of df and collect the vectors in a plain list
brand_labels = df.apply(hot_encode_brand_perception, axis=1).tolist()

In [None]:
# Audit: collect every emotion label in df that is not a key of
# emotion_labels_map_to_index, then count how often each one occurs.
random_emotions = [
    label
    for labels in df['emotion_label']
    for label in labels
    if label not in emotion_labels_map_to_index
]
random_emotion_dict = {}
for label in random_emotions:
    random_emotion_dict[label] = random_emotion_dict.get(label, 0) + 1
print(random_emotion_dict)
    
                  

In [None]:
# Step 1: Lookup from a free-form emotion word to the label(s) that replace it.
# NOTE(review): "love" and "neutral" are themselves keys of
# emotion_labels_map_to_index; "love" is rewritten to admiration + joy here, so
# rows labelled "love" lose that class after mapping — confirm this is intended.
incorrect_to_correct = {
    "horrible": ["disgust", "sadness"],
    "love": ["admiration", "joy"],
    "neutral": ["neutral"],
    "bad": ["annoyance", "disapproval"],
    "hate": ["anger", "disgust"],
    "excited": ["excitement"],
    "worse": ["disappointment"],
    "disappointed": ["disappointment"],
    "great": ["joy", "admiration"],
    "amazing": ["joy", "admiration"],
    "impressed": ["admiration"],
    "thrilled": ["joy", "excitement"],
    "terrible": ["disgust", "sadness"],
    "amused": ["amusement"],
    "curious": ["curiosity"],
    "worst": ["disgust", "sadness"],
    "good": ["approval", "joy"],
    "regret": ["remorse"],
    "need": ["desire"],
    "trust": ["admiration"],
    "inspired": ["admiration", "joy"],
    "amazed": ["surprise", "admiration"],
    "confused": ["confusion"],
    "happy": ["joy"],
    "better": ["approval", "optimism"],
}


# Step 2: Row-level translation used on the emotion column
def map_emotions(emotion_labels):
    """Expand each label through ``incorrect_to_correct``.

    Args:
        emotion_labels: Iterable of label strings.

    Returns:
        A new list in which every label found in ``incorrect_to_correct`` is
        replaced by its mapped labels (in order) and every other label is kept
        as is. Duplicates are not removed.
    """
    mapped = []
    for raw_label in emotion_labels:
        mapped.extend(incorrect_to_correct.get(raw_label, [raw_label]))
    return mapped

# Step 3: Apply the function to the DataFrame.
# This overwrites the emotion_label column with the mapped lists.
df['emotion_label'] = df['emotion_label'].apply(map_emotions)


In [None]:
def hot_encode_emotions(row):
    """Return a length-28 0/1 vector marking the emotion labels found in ``row``.

    ``row['emotion_label']`` is iterated; entries that are not keys of
    ``emotion_labels_map_to_index`` are skipped.
    """
    encoded = np.zeros(28)
    for name in row['emotion_label']:
        position = emotion_labels_map_to_index.get(name)
        if position is not None:
            encoded[position] = 1
    return encoded

# Encode each row of df and collect the vectors in a plain list
emotion_labels = df.apply(hot_encode_emotions, axis=1).tolist()

In [None]:
# Spot-check the ninth remaining row. .iloc is positional; a plain [8] is a
# label lookup and raises KeyError if the row labelled 8 was dropped earlier.
print(df['emotion_label'].iloc[8])

In [None]:
# Re-run the audit: collect the emotion labels in df that are still not keys
# of emotion_labels_map_to_index and count how often each one occurs.
random_emotions = [
    label
    for labels in df['emotion_label']
    for label in labels
    if label not in emotion_labels_map_to_index
]
random_emotion_dict = {}
for label in random_emotions:
    random_emotion_dict[label] = random_emotion_dict.get(label, 0) + 1
print(random_emotion_dict)

In [None]:
# Partition the data into train (80%), validation (10%) and test (10%).

# Stage 1: hold out 20% of the samples; the rest is the training set.
(texts_train, texts_temp,
 emotions_train, emotions_temp,
 brands_train, brands_temp) = train_test_split(
    texts,
    emotion_labels,
    brand_labels,
    test_size=0.2,
    random_state=42,
)

# Stage 2: halve the held-out 20% into validation and test (10% each).
(texts_val, texts_test,
 emotions_val, emotions_test,
 brands_val, brands_test) = train_test_split(
    texts_temp,
    emotions_temp,
    brands_temp,
    test_size=0.5,
    random_state=42,
)


In [None]:
# Originally run once to build the datasets. Do not re-run it: the resulting objects were saved
# to disk and are loaded in the next cell. Kept only to show how they were built; remove if not needed.
#from datasetss.brand_perception_dataset import BrandPerceptionDataset

#train_dataset = BrandPerceptionDataset(texts_train, emotions_train, brands_train)
#val_dataset = BrandPerceptionDataset(texts_val, emotions_val, brands_val)
#test_dataset = BrandPerceptionDataset(texts_test, emotions_test, brands_test)


In [None]:
# Loading datasets
import pickle


def _read_pickle(path):
    """Unpickle and return the object stored at ``path``."""
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# NOTE(review): pickle.load can execute code stored in the file — only load
# dataset pickles that were produced by this project.
train_dataset = _read_pickle('datasetss/train_dataset.pkl')
val_dataset = _read_pickle('datasetss/val_dataset.pkl')
test_dataset = _read_pickle('datasetss/test_dataset.pkl')

In [None]:
# Build one DataLoader per split (batch size 16); only the training split is shuffled.
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=16, shuffle=reshuffle)
    for split, reshuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)

In [None]:
from modules.BrandPerceptionModel import BrandPerceptionModel
# Settings handed to BrandPerceptionModel when the checkpoint is loaded.
# 'train_size' is len(train_loader), i.e. the number of batches in the training loader.
# NOTE(review): how the model interprets 'warmup' and 'train_size' is not visible here — confirm in the module.
config = dict(
    model_name='SamLowe/roberta-base-go_emotions',
    n_labels_bp=6,
    batch_size=16,
    lr=1.5e-5,
    warmup=0.2,
    train_size=len(train_loader),
    weight_decay=0.001,
    n_epochs=10,
)
print("Config:", config)

In [None]:
# Originally run once to train the model. Do not re-run it: the trained model was saved to a
# checkpoint file. Kept only to show how training was done; remove if not needed.
#import pytorch_lightning as pl
#trainer = pl.Trainer(max_epochs=config['n_epochs'], num_sanity_val_steps=5, accelerator='gpu')
# VALIDATION TOOK PLACE HERE:
#trainer.fit(model, train_loader, val_loader)
#trainer.save_checkpoint("models/brand_perception_model_checkpoint.ckpt")

In [None]:
# Load model.
# map_location is given explicitly so that a checkpoint saved on a GPU can
# also be restored on a machine without CUDA.
model = BrandPerceptionModel.load_from_checkpoint(
    "models/brand_perception_model_checkpoint.ckpt",
    map_location="cuda" if torch.cuda.is_available() else "cpu",
    config=config,
)

In [None]:
# Test model
import pytorch_lightning as pl

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
accelerator = "gpu" if torch.cuda.is_available() else "cpu"
trainer = pl.Trainer(max_epochs=config['n_epochs'], accelerator=accelerator)
trainer.test(model, dataloaders=test_loader)

In [None]:
# Load data (data for one specific brand: Amiri)
amiri_df = pd.read_csv('filtered_amiri_data.csv')

In [None]:
# Construct data set and loader
from datasetss.brand_perception_dataset import BrandPerceptionDataset

# Keep only entries that are strings with some non-whitespace content
amiri_texts = [text for text in amiri_df['text'] if isinstance(text, str) and text.strip()]
# The dataset is built from the texts alone (no label arguments are passed).
amiri_dataset = BrandPerceptionDataset(amiri_texts)
amiri_loader = DataLoader(amiri_dataset, batch_size=4, num_workers=4)

In [None]:
# Inference over the Amiri texts. This cell is expensive (it was originally run
# on a GPU). Its outputs are what the next cells write to and read from
# probs.pkl, so it only needs re-running when the model or the data changes.
import torch
from torch.cuda.amp import autocast, GradScaler
import torch.utils.checkpoint as checkpoint

# Determine device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
use_cuda = device.type == "cuda"
# Not needed for inference; kept so the name stays defined, and disabled when
# CUDA is unavailable.
scaler = GradScaler(enabled=use_cuda)

all_emotion_probs = []
all_brand_probs = []

# Move the model to the device and put it in eval mode so that dropout and
# other train-time behaviour do not perturb the predictions.
model.to(device)
model.eval()


def checkpointed_predict_step(batch):
    """Run the model forward on ``batch['input_ids']`` and ``batch['attention_mask']``.

    The name is kept for compatibility. Activation checkpointing only saves
    memory when a backward pass follows, so the forward pass is called
    directly; the caller runs it under ``torch.no_grad()``.

    Returns:
        Whatever ``model(input_ids, attention_mask)`` returns; the loop below
        unpacks it into three values.
    """
    return model(batch['input_ids'], batch['attention_mask'])


for batch_idx, batch in enumerate(amiri_loader):
    # Only the tensors consumed by the forward pass are moved to the device.
    batch = {
        "input_ids": batch['input_ids'].to(device),
        "attention_mask": batch['attention_mask'].to(device),
    }

    # no_grad: nothing is back-propagated, so no autograd graph is kept alive.
    # Mixed precision is only enabled when running on CUDA.
    with torch.no_grad(), autocast(enabled=use_cuda):
        loss, emotion_probs, brand_probs = checkpointed_predict_step(batch)

    all_emotion_probs.append(emotion_probs.detach().cpu())
    all_brand_probs.append(brand_probs.detach().cpu())

    # Clear variables
    del batch, loss, emotion_probs, brand_probs

# Release cached GPU memory once, after the loop
if use_cuda:
    torch.cuda.empty_cache()

# Concatenate all per-batch outputs for final results
all_emotion_probs = torch.cat(all_emotion_probs, dim=0)
all_brand_probs = torch.cat(all_brand_probs, dim=0)

# Display final results
print(f"Emotion probabilities shape: {all_emotion_probs.shape}")
print(f"Brand probabilities shape: {all_brand_probs.shape}")

# Print memory summary (only meaningful, and only valid, for a CUDA device)
if use_cuda:
    print(torch.cuda.memory_summary(device=device, abbreviated=True))


In [None]:
# Save the in-memory model outputs so that the inference cell does not have to be re-run.
# Running this cell overwrites probs.pkl with the current values of
# all_emotion_probs and all_brand_probs; skip it to keep the existing file.
with open("probs.pkl", "wb") as f:
    pickle.dump((all_emotion_probs, all_brand_probs), f)

In [None]:
# Load the results for Amiri
# NOTE(review): pickle.load can execute code stored in the file — only load a
# probs.pkl that you produced yourself.
with open("probs.pkl", "rb") as f:
    all_emotion_probs, all_brand_probs = pickle.load(f)

In [None]:
import torch
import torch.nn.functional as F

# Apply sigmoid to convert logits to probabilities. Both results get their own
# names rather than overwriting the loaded tensors, so re-running this cell
# does not apply the sigmoid a second time.
all_emotion_label_probs = F.sigmoid(all_emotion_probs)
all_brand_perception_probs = F.sigmoid(all_brand_probs)

# Calculate the average probability for each label by taking the mean over dim 0
# (presumably the sample dimension — the batches were concatenated along it).
avg_emotion_probs = all_emotion_label_probs.mean(dim=0)
avg_brand_perception_probs = all_brand_perception_probs.mean(dim=0)

print(f"Average Emotion Probabilities: {avg_emotion_probs}")
print(f"Average Brand Perception Probabilities: {avg_brand_perception_probs}")


In [None]:
# Function to map dimensions of tensor to labels
def map_to_labels(tensor, labels_map):
    """Pair each element of ``tensor`` with the label registered for its position.

    Args:
        tensor: Iterable whose elements expose ``.item()`` (e.g. a 1-D tensor).
        labels_map: Mapping from position to label name, read with ``.get``.

    Returns:
        A list of ``(label, value)`` tuples in iteration order. Positions that
        are missing from ``labels_map`` are labelled "Unknown".
    """
    return [
        (labels_map.get(position, "Unknown"), element.item())
        for position, element in enumerate(tensor)
    ]

In [None]:
# Attach label names to the averaged brand-perception values and print them
amiri_brand_perception_labels = map_to_labels(
    avg_brand_perception_probs,
    brand_perception_labels_map_to_label,
)
print("Brand Perception:")
for name, score in amiri_brand_perception_labels:
    print(f"{name}: {score}")

# Same for the averaged emotion values
amiri_emotion_labels = map_to_labels(
    avg_emotion_probs,
    emotion_labels_map_to_emotion,
)
print("\nEmotion:")
for name, score in amiri_emotion_labels:
    print(f"{name}: {score}")