
# Context-Aware Sentiment Analysis using LLMs — End-to-End

**What this notebook does**  
- Loads **manually downloaded CSVs** (`train.csv`, `valid.csv`, `test.csv`) from `frankdarkluo/DailyDialog` (Hugging Face).  
- Builds **context-aware inputs** by concatenating `context` + `response`.  
- Fine-tunes & evaluates two LLM baselines (each with and without the dialogue context):
  - `cardiffnlp/twitter-roberta-base-sentiment` (RoBERTa, 3-class head)
  - `distilbert-base-uncased-finetuned-sst-2-english` (DistilBERT; its 2-class SST-2 head is re-initialized as a 3-class head)
- Reports accuracy, macro-F1, weighted-F1, confusion matrix.  
- Saves artifacts to `./outputs/`.

> Place your CSVs in the working directory or update the paths below.


In [None]:

# If needed, uncomment the next line to install dependencies (Colab / clean venv):
# !pip install -U transformers datasets evaluate scikit-learn matplotlib pandas numpy accelerate


In [None]:

import os, json, math, random, time
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.metrics import accuracy_score, f1_score, precision_recall_fscore_support, classification_report, confusion_matrix
from transformers import (AutoTokenizer, AutoModelForSequenceClassification,
                          TrainingArguments, Trainer, DataCollatorWithPadding,
                          EarlyStoppingCallback, pipeline, set_seed)
from datasets import Dataset, DatasetDict

# -----------------------
# Configuration
# -----------------------
class CFG:
    """Central settings for data loading, label mapping and fine-tuning."""

    # ---- Input files (edit when the CSVs live elsewhere) ----
    TRAIN_PATH = "train.csv"
    VALID_PATH = "valid.csv"
    TEST_PATH = "test.csv"

    # ---- Column names expected in the CSVs ----
    CONTEXT_COL = "context"
    RESPONSE_COL = "response"
    # Candidate label / emotion columns, probed in the listed order.
    LABEL_COLS_TRY = ["label", "sentiment", "target"]
    EMOTION_COLS_TRY = ["emotion", "emotion_id", "label_emotion"]

    # ---- Emotion handling (DailyDialog-style ids 0..6) ----
    # Integer emotion id -> emotion name; 0 is the "no emotion"/neutral class.
    EMOTION_ID_TO_NAME = {
        0: "no_emotion",
        1: "anger",
        2: "disgust",
        3: "fear",
        4: "happiness",
        5: "sadness",
        6: "surprise",
    }
    # Emotion name -> 3-way sentiment. "surprise" is ambiguous
    # (positive vs. neutral); it is treated as neutral here.
    EMOTION_TO_SENTIMENT = {
        "no_emotion": "neutral",
        "anger": "negative",
        "disgust": "negative",
        "fear": "negative",
        "happiness": "positive",
        "sadness": "negative",
        "surprise": "neutral",
        "neutral": "neutral",
    }

    # ---- Sentiment classes and their integer encoding ----
    SENTIMENT_LABELS = ["negative", "neutral", "positive"]
    LABEL2ID = dict(zip(SENTIMENT_LABELS, range(len(SENTIMENT_LABELS))))
    ID2LABEL = dict(enumerate(SENTIMENT_LABELS))

    # ---- Checkpoints to fine-tune: (hub id, short run name) ----
    MODELS = [
        ("cardiffnlp/twitter-roberta-base-sentiment", "roberta_twitter"),
        ("distilbert-base-uncased-finetuned-sst-2-english", "distilbert_sst2"),
    ]

    # ---- Optimisation hyper-parameters ----
    SEED = 42
    MAX_LENGTH = 256       # token budget; longer dialogues are truncated
    BATCH_SIZE = 16
    EPOCHS = 3
    LR = 2e-5
    WEIGHT_DECAY = 0.01
    WARMUP_RATIO = 0.06
    PATIENCE = 2           # early-stopping patience
    LOG_STEPS = 50

    # ---- Context ablation: which input variants to train/evaluate ----
    TRAIN_WITH_CONTEXT = True
    TRAIN_WITHOUT_CONTEXT = True

    # When no label column exists, create silver labels with RoBERTa.
    AUTO_LABEL_IF_MISSING = True

    # ---- Where artifacts are written ----
    OUTDIR = Path("outputs")


# Seed the RNGs via transformers' set_seed, then make sure the artifact folder exists.
set_seed(CFG.SEED)
os.makedirs(CFG.OUTDIR, exist_ok=True)
print("Config loaded.")


In [None]:

def _read_csv(path):
    """Load a CSV file and normalise its header names.

    Args:
        path: Location of the CSV file (string or path-like).

    Returns:
        A pandas DataFrame whose column names have any byte-order-mark
        characters removed and surrounding whitespace stripped.

    Raises:
        FileNotFoundError: If ``path`` does not point to an existing file.
    """
    csv_path = Path(path)
    if not csv_path.is_file():
        raise FileNotFoundError(f"CSV not found: {path}")
    df = pd.read_csv(csv_path)
    # str.strip() does not treat U+FEFF (the BOM) as whitespace, so a stray
    # BOM has to be removed explicitly before trimming the name.
    df.columns = [str(c).replace("\ufeff", "").strip() for c in df.columns]
    return df

# Read the three splits in one pass.
train_df, valid_df, test_df = [
    _read_csv(p) for p in (CFG.TRAIN_PATH, CFG.VALID_PATH, CFG.TEST_PATH)
]

# Quick overview of what was loaded.
for _caption, _frame in (
    ("Train shape:", train_df),
    ("Valid shape:", valid_df),
    ("Test  shape:", test_df),
):
    print(_caption, _frame.shape)
print("Columns (train):", list(train_df.columns)[:20])

# Both text columns must be present in every split.
for col in (CFG.CONTEXT_COL, CFG.RESPONSE_COL):
    if col not in train_df.columns:
        raise KeyError(f"Expected column '{col}' not found in train.csv. Found: {train_df.columns}")
    if not (col in valid_df.columns and col in test_df.columns):
        raise KeyError(f"Expected column '{col}' missing from valid/test CSV.")


In [None]:

def _canonical_sentiment(raw_label):
    """Normalise a classifier label to a lower-case sentiment name.

    Checkpoints whose config carries no readable ``id2label`` report generic
    ``LABEL_<n>`` names. For the silver-label model used here the indices
    mean 0 = negative, 1 = neutral, 2 = positive (see its model card).
    Any other label is returned lower-cased and otherwise unchanged.
    """
    generic = {"label_0": "negative", "label_1": "neutral", "label_2": "positive"}
    label = str(raw_label).strip().lower()
    return generic.get(label, label)


def _top_prediction(output):
    """Return the highest-scoring ``{'label', 'score'}`` dict of one pipeline output.

    The output may be a single dict, a list of dicts (all class scores) or
    a list nested one level deeper, depending on the transformers version.
    """
    while isinstance(output, list) and output and isinstance(output[0], list):
        output = output[0]
    if isinstance(output, dict):
        return output
    return max(output, key=lambda item: float(item["score"]))


def _numeric_ids_to_sentiment(ids, assume_sentiment_if_small):
    """Translate numeric class ids into sentiment strings.

    Args:
        ids: Series of integer-like ids (ints, floats or nullable ints).
        assume_sentiment_if_small: When True and the largest id is <= 2, the
            ids are read as 0/1/2 = negative/neutral/positive; otherwise they
            are read as emotion ids and mapped through the CFG tables.

    Returns:
        Series of sentiment strings; ids that are missing or unknown stay
        missing so they can be filtered out later.
    """
    ids = pd.to_numeric(ids, errors="coerce")
    max_id = ids.max()
    if assume_sentiment_if_small and pd.notna(max_id) and max_id <= 2:
        lookup = {0: "negative", 1: "neutral", 2: "positive"}
    else:
        lookup = {
            emo_id: CFG.EMOTION_TO_SENTIMENT.get(name)
            for emo_id, name in CFG.EMOTION_ID_TO_NAME.items()
        }
    # dict.get works for float-valued ids as well (1.0 hashes like 1), which
    # matters when missing values forced the column to a float dtype.
    return ids.map(lookup.get, na_action="ignore")


def discover_or_create_labels(df: pd.DataFrame) -> pd.Series:
    """Derive a 3-way sentiment label for every row of ``df``.

    Resolution order:
      1. The first column from ``CFG.LABEL_COLS_TRY`` that exists and is
         string-like or numeric. Strings are lower-cased and stripped;
         numeric ids are read as sentiment ids when the largest id is <= 2
         and as emotion ids otherwise.
      2. The first column from ``CFG.EMOTION_COLS_TRY`` that exists, mapped
         through the CFG emotion tables.
      3. Silver labels predicted by the RoBERTa sentiment pipeline, when
         ``CFG.AUTO_LABEL_IF_MISSING`` is set.

    NOTE(review): the "largest id <= 2" heuristic is evaluated on whatever
    frame is passed in, so a split that happens to contain only small
    emotion ids would be read as sentiment ids — confirm against the data.

    Args:
        df: Frame holding the context/response columns and, optionally, a
            label or emotion column.

    Returns:
        Series aligned with ``df.index``. Values that could not be mapped
        are missing or are not one of the three sentiment names.

    Raises:
        ValueError: If no usable column exists and auto-labelling is disabled.
    """
    is_numeric = pd.api.types.is_numeric_dtype
    is_bool = pd.api.types.is_bool_dtype
    is_text = lambda s: pd.api.types.is_object_dtype(s) or pd.api.types.is_string_dtype(s)

    # 1) Direct sentiment column?
    for c in CFG.LABEL_COLS_TRY:
        if c not in df.columns:
            continue
        vals = df[c]
        if is_numeric(vals) and not is_bool(vals):
            # Also covers float / nullable-int columns (ints with gaps).
            return _numeric_ids_to_sentiment(vals, assume_sentiment_if_small=True)
        if is_text(vals):
            return vals.map(lambda x: str(x).strip().lower())
        # Any other dtype is not interpretable; keep looking.

    # 2) Emotion column -> map to sentiment
    for c in CFG.EMOTION_COLS_TRY:
        if c not in df.columns:
            continue
        vals = df[c]
        if is_numeric(vals) and not is_bool(vals):
            return _numeric_ids_to_sentiment(vals, assume_sentiment_if_small=False)
        emo_names = vals.map(lambda x: str(x).strip().lower())
        return emo_names.map(CFG.EMOTION_TO_SENTIMENT)

    # 3) No labels present — optionally auto-label via RoBERTa
    if not CFG.AUTO_LABEL_IF_MISSING:
        raise ValueError("No label/emotion column found and AUTO_LABEL_IF_MISSING=False. Please add labels.")

    print("No label/emotion column found — auto-labeling with RoBERTa sentiment model (silver labels).")
    texts = (
        df[CFG.CONTEXT_COL].fillna('').astype(str)
        + " [SEP] "
        + df[CFG.RESPONSE_COL].fillna('').astype(str)
    ).tolist()
    if not texts:
        return pd.Series([], index=df.index, dtype=object)

    clf = pipeline('sentiment-analysis', model='cardiffnlp/twitter-roberta-base-sentiment', top_k=None)
    # Truncate explicitly so long dialogues stay within the model's limit,
    # and let the pipeline batch the inputs instead of one call per text.
    outputs = clf(
        texts,
        batch_size=CFG.BATCH_SIZE,
        truncation=True,
        max_length=CFG.MAX_LENGTH,
    )
    preds = [_canonical_sentiment(_top_prediction(out)["label"]) for out in outputs]
    return pd.Series(preds, index=df.index)

# Build unified 'sentiment' column (str): 'negative'/'neutral'/'positive'
# Attach the unified string label ('negative'/'neutral'/'positive') to each split.
for _frame in (train_df, valid_df, test_df):
    _frame['sentiment'] = discover_or_create_labels(_frame)

# Drop any rows with missing labels
def _drop_missing_labels(df):
    """Return a copy of ``df`` restricted to rows with a valid sentiment.

    A row is kept when its 'sentiment' value is one of
    ``CFG.SENTIMENT_LABELS``. The number of removed rows is printed when it
    is non-zero.
    """
    valid_mask = df['sentiment'].isin(CFG.SENTIMENT_LABELS)
    before, after = len(df), int(valid_mask.sum())
    if before != after:
        print(f"Dropped {before-after} rows due to invalid labels.")
    return df[valid_mask].copy()

# Keep only rows that carry a recognised sentiment.
train_df, valid_df, test_df = [
    _drop_missing_labels(_frame) for _frame in (train_df, valid_df, test_df)
]

# Integer-encode the sentiment strings.
for _frame in (train_df, valid_df, test_df):
    _frame['label_id'] = _frame['sentiment'].map(CFG.LABEL2ID)

print(train_df['sentiment'].value_counts())


In [None]:

# ----- EDA: Class distribution -----
def plot_class_distribution(df, title):
    """Show a bar chart of how many rows of ``df`` fall into each sentiment class.

    Classes are ordered as in ``CFG.SENTIMENT_LABELS``; classes without any
    rows are drawn with a count of 0.
    """
    tallies = df['sentiment'].value_counts()
    tallies = tallies.reindex(CFG.SENTIMENT_LABELS).fillna(0)
    plt.figure()
    tallies.plot(kind='bar')
    plt.title(title)
    plt.xlabel("Class")
    plt.ylabel("Count")
    plt.show()


for _frame, _caption in (
    (train_df, "Train class distribution"),
    (valid_df, "Validation class distribution"),
    (test_df, "Test class distribution"),
):
    plot_class_distribution(_frame, _caption)

# ----- EDA: Dialogue length (chars) -----
def plot_length_hist(df, title):
    """Show a 50-bin histogram of combined context+response length in characters.

    Missing context or response values count as empty strings; the two
    parts are joined with a single space before measuring.
    """
    context = df[CFG.CONTEXT_COL].fillna('')
    response = df[CFG.RESPONSE_COL].fillna('')
    lens = (context + " " + response).str.len()
    plt.figure()
    plt.hist(lens, bins=50)
    plt.title(title)
    plt.xlabel("Combined context+response length (chars)")
    plt.ylabel("Frequency")
    plt.show()


plot_length_hist(df=train_df, title="Length distribution (train)")


In [None]:

# Build two views:
#  A) With context:   "CTX: {context} [SEP] RESP: {response}"
#  B) Response-only:  "{response}"
SEP = " [SEP] "

# Add both input variants to every split:
#   text_with_ctx  -> "CTX: {context} [SEP] RESP: {response}"
#   text_resp_only -> "{response}"
for _frame in (train_df, valid_df, test_df):
    _frame['text_with_ctx'] = (
        "CTX: " + _frame[CFG.CONTEXT_COL].fillna('') + SEP + "RESP: " + _frame[CFG.RESPONSE_COL].fillna('')
    )
    _frame['text_resp_only'] = _frame[CFG.RESPONSE_COL].fillna('')

# Convert to HF Datasets
def to_hf_dataset(df, text_col):
    """Turn a labelled DataFrame into a Hugging Face ``Dataset``.

    Args:
        df: Frame containing ``text_col`` and an integer 'label_id' column.
        text_col: Name of the column holding the model input text.

    Returns:
        Dataset with two columns: 'text' and 'labels'.
    """
    # The Trainer drops dataset columns the model cannot consume; it keeps
    # 'labels' (plus 'label'/'label_ids') but not 'label_id', so the target
    # is renamed here to survive into the training batches.
    frame = df[[text_col, 'label_id']].rename(columns={text_col: 'text', 'label_id': 'labels'})
    # Integer targets are required for single-label classification.
    frame['labels'] = frame['labels'].astype('int64')
    return Dataset.from_pandas(frame, preserve_index=False)

# Split name -> source frame, shared by both input variants.
_split_frames = {"train": train_df, "validation": valid_df, "test": test_df}

ds_with_ctx = DatasetDict(
    {split: to_hf_dataset(frame, 'text_with_ctx') for split, frame in _split_frames.items()}
)
ds_resp_only = DatasetDict(
    {split: to_hf_dataset(frame, 'text_resp_only') for split, frame in _split_frames.items()}
)

for _bundle in (ds_with_ctx, ds_resp_only):
    print(_bundle)


In [None]:

def tokenise_dataset(ds: DatasetDict, tokenizer):
    """Tokenise the 'text' column of every split in ``ds``.

    Inputs are truncated to ``CFG.MAX_LENGTH`` tokens; the raw 'text'
    column is removed from the result.
    """
    tok_kwargs = {"truncation": True, "max_length": CFG.MAX_LENGTH}
    return ds.map(
        lambda examples: tokenizer(examples['text'], **tok_kwargs),
        batched=True,
        remove_columns=['text'],
    )

def compute_metrics(eval_pred):
    """Compute accuracy plus macro/weighted precision-recall-F1 summaries.

    ``eval_pred`` unpacks into ``(logits, labels)``; the predicted class is
    the arg-max over the last logits axis.
    """
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)

    # One (precision, recall, f1, support) tuple per averaging mode.
    by_average = {
        mode: precision_recall_fscore_support(labels, preds, average=mode, zero_division=0)
        for mode in ("macro", "weighted")
    }
    macro_p, macro_r, macro_f1 = by_average["macro"][:3]
    weighted_f1 = by_average["weighted"][2]

    metrics = {"accuracy": accuracy_score(labels, preds)}
    metrics["f1_macro"] = macro_f1
    metrics["f1_weighted"] = weighted_f1
    metrics["precision_macro"] = macro_p
    metrics["recall_macro"] = macro_r
    return metrics

def _accepts_kwarg(factory, name):
    """Return True if ``factory``'s call signature declares a parameter ``name``."""
    import inspect  # local import: only needed for this version probe

    try:
        return name in inspect.signature(factory).parameters
    except (TypeError, ValueError):
        # Signature not introspectable; the caller falls back to the legacy name.
        return False


def _plot_confusion_matrix(cm, title):
    """Draw ``cm`` as a heat map with ``CFG.SENTIMENT_LABELS`` on both axes."""
    tick_marks = np.arange(len(CFG.SENTIMENT_LABELS))
    plt.figure()
    plt.imshow(cm, interpolation='nearest')
    plt.title(title)
    plt.xticks(tick_marks, CFG.SENTIMENT_LABELS, rotation=45)
    plt.yticks(tick_marks, CFG.SENTIMENT_LABELS)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.colorbar()
    plt.tight_layout()
    plt.show()


def train_and_eval(model_name: str, short_name: str, ds: DatasetDict, run_suffix: str):
    """Fine-tune one checkpoint on ``ds`` and evaluate it on validation and test.

    Args:
        model_name: Hub id (or local path) of the checkpoint to fine-tune.
        short_name: Short identifier used in output folder names and results.
        ds: DatasetDict with 'train', 'validation' and 'test' splits.
        run_suffix: Tag of the input variant (e.g. "with_ctx"), appended to
            the output folder name.

    Returns:
        Dict with the model/run identifiers and the validation/test
        accuracy, macro-F1 and weighted-F1.

    Side effects:
        Writes the model, tokenizer, confusion matrix (.npy) and both metric
        JSON files under ``CFG.OUTDIR / f"{short_name}_{run_suffix}"`` and
        displays the test confusion matrix.
    """
    print(f"\n==== Training {model_name} ({short_name}) — {run_suffix} ====")
    outdir = CFG.OUTDIR / f"{short_name}_{run_suffix}"
    outdir.mkdir(parents=True, exist_ok=True)

    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
    tokenized = tokenise_dataset(ds, tokenizer)

    num_labels = len(CFG.SENTIMENT_LABELS)

    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=num_labels,
        id2label=CFG.ID2LABEL,
        label2id=CFG.LABEL2ID,
        ignore_mismatched_sizes=True,   # re-init head if needed
    )

    collator = DataCollatorWithPadding(tokenizer=tokenizer)

    training_kwargs = dict(
        output_dir=str(outdir / "hf_runs"),
        per_device_train_batch_size=CFG.BATCH_SIZE,
        per_device_eval_batch_size=CFG.BATCH_SIZE,
        learning_rate=CFG.LR,
        num_train_epochs=CFG.EPOCHS,
        weight_decay=CFG.WEIGHT_DECAY,
        warmup_ratio=CFG.WARMUP_RATIO,
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="f1_macro",
        greater_is_better=True,
        logging_steps=CFG.LOG_STEPS,
        report_to="none",
        seed=CFG.SEED,
        dataloader_num_workers=2,
    )
    # Newer transformers releases call this argument `eval_strategy`; older
    # ones only know `evaluation_strategy`. Use whichever name is accepted.
    strategy_key = (
        "eval_strategy"
        if _accepts_kwarg(TrainingArguments, "eval_strategy")
        else "evaluation_strategy"
    )
    training_kwargs[strategy_key] = "epoch"
    args = TrainingArguments(**training_kwargs)

    # Same story for the Trainer: `tokenizer=` was superseded by
    # `processing_class=` in newer releases.
    tokenizer_key = (
        "processing_class" if _accepts_kwarg(Trainer, "processing_class") else "tokenizer"
    )

    early_stop = EarlyStoppingCallback(early_stopping_patience=CFG.PATIENCE)
    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=tokenized["train"],
        eval_dataset=tokenized["validation"],
        data_collator=collator,
        compute_metrics=compute_metrics,
        callbacks=[early_stop],
        **{tokenizer_key: tokenizer},
    )

    trainer.train()

    # Evaluate
    eval_val = trainer.evaluate(tokenized["validation"])
    eval_test = trainer.evaluate(tokenized["test"])

    # Predictions & confusion matrix on test
    test_out = trainer.predict(tokenized["test"])
    test_preds = np.argmax(test_out.predictions, axis=-1)
    # Fix the label set so the matrix is always num_labels x num_labels.
    cm = confusion_matrix(test_out.label_ids, test_preds, labels=list(range(num_labels)))

    # Save artifacts
    trainer.save_model(str(outdir / "model"))
    tokenizer.save_pretrained(str(outdir / "model"))
    np.save(outdir / "confusion_matrix.npy", cm)
    with open(outdir / "metrics_val.json", "w") as f:
        json.dump(eval_val, f, indent=2)
    with open(outdir / "metrics_test.json", "w") as f:
        json.dump(eval_test, f, indent=2)

    # Plot CM
    _plot_confusion_matrix(cm, f"Confusion Matrix — {short_name} ({run_suffix})")

    # Return summary row
    return {
        "model": short_name,
        "run": run_suffix,
        "val_accuracy": eval_val.get("eval_accuracy"),
        "val_f1_macro": eval_val.get("eval_f1_macro"),
        "val_f1_weighted": eval_val.get("eval_f1_weighted"),
        "test_accuracy": eval_test.get("eval_accuracy"),
        "test_f1_macro": eval_test.get("eval_f1_macro"),
        "test_f1_weighted": eval_test.get("eval_f1_weighted"),
    }


In [None]:

results = []

# (enabled?, dataset, run tag) for each input variant, in execution order.
_ablation_plan = (
    (CFG.TRAIN_WITH_CONTEXT, ds_with_ctx, "with_ctx"),
    (CFG.TRAIN_WITHOUT_CONTEXT, ds_resp_only, "resp_only"),
)
for _enabled, _dataset, _tag in _ablation_plan:
    if not _enabled:
        continue
    for model_name, short_name in CFG.MODELS:
        row = train_and_eval(model_name, short_name, _dataset, run_suffix=_tag)
        results.append(row)

# Persist the per-run summary and display it as the cell output.
res_df = pd.DataFrame(results)
res_path = CFG.OUTDIR / "summary_results.csv"
res_df.to_csv(res_path, index=False)
res_df


In [None]:

if results:
    def _val_macro_f1(result_row):
        """Ranking key: validation macro-F1, with missing scores ranked last."""
        score = result_row.get('val_f1_macro')
        return float("-inf") if score is None else score

    best = max(results, key=_val_macro_f1)
    print("Best (by val macro-F1):", best)

    # Reload the saved artifacts of the best run to print a detailed report.
    run_dir = CFG.OUTDIR / f"{best['model']}_{best['run']}"
    model_dir = str(run_dir / "model")

    # The tokenizer was saved next to the model, so load it from the run
    # folder rather than from the hub.
    tokenizer = AutoTokenizer.from_pretrained(model_dir, use_fast=True)
    ds = ds_with_ctx if best['run']=="with_ctx" else ds_resp_only
    tokenized = tokenise_dataset(ds, tokenizer)

    num_labels = len(CFG.SENTIMENT_LABELS)
    # No ignore_mismatched_sizes here: a head-size mismatch on reload should
    # fail loudly instead of silently re-initialising the classifier.
    model = AutoModelForSequenceClassification.from_pretrained(model_dir, num_labels=num_labels)
    collator = DataCollatorWithPadding(tokenizer=tokenizer)
    args = TrainingArguments(output_dir=str(run_dir/"tmp"), per_device_eval_batch_size=CFG.BATCH_SIZE, report_to="none")
    # An explicit collator is supplied, so the Trainer needs no tokenizer;
    # leaving it out avoids the tokenizer/processing_class keyword rename.
    trainer = Trainer(model=model, args=args, data_collator=collator)

    test_out = trainer.predict(tokenized["test"])
    preds = np.argmax(test_out.predictions, axis=-1)
    # Pin the label set so the report still works when a class is absent
    # from both the test targets and the predictions.
    print(classification_report(
        test_out.label_ids,
        preds,
        labels=list(range(num_labels)),
        target_names=CFG.SENTIMENT_LABELS,
        digits=4,
        zero_division=0,
    ))
else:
    print("No results to summarize.")
