In [None]:
import numpy as np
import gc, psutil
import re
import pandas as pd
from tqdm import tqdm
from datasets import Dataset
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, TrainingArguments, Trainer, DataCollatorForSeq2Seq
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments
import torch
import evaluate
import optuna
from optuna.exceptions import TrialPruned
import json
import os
from sklearn.model_selection import train_test_split

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# 1️⃣ Load dataset
# Missing cells are replaced with empty strings before anything else touches the frame.
data = pd.read_csv("sarcasm_KD_final.csv").fillna("")

# 80/20 train/validation split with a fixed seed.
train_df, val_df = train_test_split(data, test_size=0.2, random_state=4213)

# Later cells add columns to both splits. Taking explicit copies detaches them
# from `data`, so those assignments cannot trigger pandas' SettingWithCopyWarning.
train_df = train_df.copy()
val_df = val_df.copy()

print(train_df.head())

In [None]:
# Instruction prefix for Task A (sarcasm cue identification).
# It is handed to make_preprocess_fn, which prepends it to every source text;
# the trailing "\n\n" is part of the prefix itself.
PROMPT_A = (
    "In exactly 1-2 sentences, identify the specific words or phrases that make the text sarcastic "
    "and explain how they create the sarcastic effect. "
    "Focus only on observable linguistic elements without adding interpretation beyond what's directly evident in the text.\n\n"
)

# Instruction prefix for Task B (true intent explanation); used the same way as PROMPT_A.
PROMPT_B = (
    "In exactly 1-2 sentences, explain what the speaker actually means by removing the sarcasm "
    "and stating their true intended message directly. "
    "Focus on the genuine sentiment or opinion being expressed beneath the sarcastic language.\n\n"
)

def build_target(row, column_name):
    """Return the training label for one row.

    The value stored under ``column_name`` is converted to ``str``, stripped of
    surrounding whitespace and placed behind an ``"Explanation: "`` prefix.
    """
    raw_value = row[column_name]
    cleaned = str(raw_value).strip()
    return "Explanation: " + cleaned

# Add one label column per task to both splits.
for frame in (train_df, val_df):
    # Task A — sarcasm cue identification
    frame["target_text_A"] = frame.apply(lambda row: build_target(row, "part_sarcastic"), axis=1)
    # Task B — true intent explanation
    frame["target_text_B"] = frame.apply(lambda row: build_target(row, "sarcasm_explanation"), axis=1)


def _to_task_dataset(frame, label_column):
    """Keep ``text`` plus one label column, exposed under the shared name ``target_text``."""
    pair = frame[["text", label_column]]
    return Dataset.from_pandas(pair.rename(columns={label_column: "target_text"}))


taskA_train_ds = _to_task_dataset(train_df, "target_text_A")
taskA_val_ds   = _to_task_dataset(val_df, "target_text_A")

taskB_train_ds = _to_task_dataset(train_df, "target_text_B")
taskB_val_ds   = _to_task_dataset(val_df, "target_text_B")

# Truncation limits (in tokens) for the prompted source and for the target.
MAX_SRC_LEN, MAX_TGT_LEN = 128, 64

# Local checkpoint directory; the tokenizer is loaded from the same location.
model_name = "./flan_t5_full_sarcasm_final"
tokenizer = AutoTokenizer.from_pretrained(model_name)

def make_preprocess_fn(prompt, tokenizer):
    """Build a batched preprocessing callback bound to ``prompt`` and ``tokenizer``.

    The returned function expects a batch with ``"text"`` and ``"target_text"``
    lists. Each source becomes ``prompt + "Text: " + text`` and is truncated to
    ``MAX_SRC_LEN`` tokens; targets are truncated to ``MAX_TGT_LEN`` tokens and
    their ids are stored under ``"labels"``.
    """
    def preprocess(examples):
        sources = []
        for text in examples["text"]:
            sources.append(prompt + "Text: " + text)

        encoded = tokenizer(sources, max_length=MAX_SRC_LEN, truncation=True)
        target_encoding = tokenizer(examples["target_text"], max_length=MAX_TGT_LEN, truncation=True)
        encoded["labels"] = target_encoding["input_ids"]
        return encoded

    return preprocess

# One preprocessing callback per task, each bound to its own instruction prefix.
preprocess_A, preprocess_B = (make_preprocess_fn(p, tokenizer) for p in (PROMPT_A, PROMPT_B))


def _tokenize(ds, preprocess_fn):
    """Apply ``preprocess_fn`` in batches and drop every original column of ``ds``."""
    return ds.map(preprocess_fn, batched=True, remove_columns=ds.column_names)


taskA_train_tok = _tokenize(taskA_train_ds, preprocess_A)
taskA_val_tok   = _tokenize(taskA_val_ds, preprocess_A)

taskB_train_tok = _tokenize(taskB_train_ds, preprocess_B)
taskB_val_tok   = _tokenize(taskB_val_ds, preprocess_B)

In [None]:
import os, json, optuna, numpy as np, evaluate, torch
from transformers import (
    AutoModelForSeq2SeqLM,
    DataCollatorForSeq2Seq,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
)

def subset_dataset(ds, fraction=0.2, seed=4213):
    """Return a reproducible random subset of ``ds``.

    Args:
        ds: Object supporting ``len()`` and ``select(list_of_indices)``.
        fraction: Share of rows to keep; the row count is ``int(len(ds) * fraction)``.
        seed: Seed for the index draw.

    Returns:
        Whatever ``ds.select`` returns for the sampled (non-repeating) indices.
    """
    total = len(ds)
    subset_size = int(total * fraction)
    # A private RandomState draws the same indices that seeding the global
    # generator would, but leaves np.random's global state untouched for the
    # rest of the notebook.
    rng = np.random.RandomState(seed)
    indices = rng.choice(total, subset_size, replace=False)
    return ds.select(indices.tolist())

# Hyperparameter search runs on a sampled fraction of each tokenized split
# (subset_dataset's default fraction and seed).
taskA_train_tok_sub, taskA_val_tok_sub = (
    subset_dataset(taskA_train_tok),
    subset_dataset(taskA_val_tok),
)
taskB_train_tok_sub, taskB_val_tok_sub = (
    subset_dataset(taskB_train_tok),
    subset_dataset(taskB_val_tok),
)

# Metric object used by compute_metrics.
rouge = evaluate.load("rouge")

def compute_metrics(eval_preds):
    """Decode predictions and labels and return ROUGE-L rounded to 4 decimals.

    ``eval_preds`` unpacks to ``(predictions, labels)``; when predictions
    arrive as a tuple only its first element is used.
    """
    predictions, references = eval_preds
    if isinstance(predictions, tuple):
        predictions = predictions[0]

    # -100 cannot be decoded, so it is swapped for the pad id, which
    # skip_special_tokens then removes from the decoded text.
    pad_id = tokenizer.pad_token_id
    predictions = np.where(predictions != -100, predictions, pad_id)
    references = np.where(references != -100, references, pad_id)

    decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    decoded_refs = tokenizer.batch_decode(references, skip_special_tokens=True)

    scores = rouge.compute(predictions=decoded_preds, references=decoded_refs)
    return {"rougeL": round(scores["rougeL"], 4)}


def make_objective(train_ds, val_ds, model_name):
    """Build an Optuna objective that fine-tunes ``model_name`` for one epoch.

    Args:
        train_ds: Tokenized training dataset passed to the trainer.
        val_ds: Tokenized evaluation dataset passed to the trainer.
        model_name: Checkpoint path or hub id loaded fresh for every trial.

    Returns:
        ``objective(trial)``, which samples hyperparameters, trains, and
        returns the evaluation ``eval_rougeL`` (0.0 if that key is missing).

    Raises (from the returned objective):
        optuna.TrialPruned: when a RuntimeError mentioning "out of memory" occurs.
        RuntimeError: any other runtime error is re-raised unchanged.
    """
    def objective(trial):
        learning_rate = trial.suggest_categorical("learning_rate", [1e-5, 3e-5, 1e-4, 3e-4])
        batch_size    = trial.suggest_categorical("batch_size", [8, 16, 32])
        dropout_rate  = trial.suggest_categorical("dropout_rate", [0.1, 0.2, 0.3])
        weight_decay  = trial.suggest_categorical("weight_decay", [0.0, 0.01, 0.05])
        warmup_ratio  = trial.suggest_categorical("warmup_ratio", [0.03, 0.06, 0.1])

        # Pre-bind so the cleanup in `finally` works even if loading fails.
        model = trainer = None
        try:
            # Dropout layers are created while the model is constructed, so the
            # sampled rate must reach the config at load time. Assigning to
            # model.config after loading leaves the already-built layers at the
            # checkpoint's rate, which made this hyperparameter a no-op.
            model = AutoModelForSeq2SeqLM.from_pretrained(model_name, dropout_rate=dropout_rate)
            # NOTE(review): T5's config documents only `dropout_rate`; this extra
            # attribute is kept as before but looks unused by the model — confirm.
            model.config.attention_dropout_rate = dropout_rate
            model.to("cpu")

            collator = DataCollatorForSeq2Seq(tokenizer, model=model)
            training_args = Seq2SeqTrainingArguments(
                learning_rate=learning_rate,
                per_device_train_batch_size=batch_size,
                per_device_eval_batch_size=max(2, batch_size // 2),
                num_train_epochs=1,
                weight_decay=weight_decay,
                warmup_ratio=warmup_ratio,
                eval_strategy="epoch",
                save_strategy="no",
                logging_strategy="epoch",
                predict_with_generate=True,
                # Keeps the effective train batch at 32 for batch sizes 8/16/32.
                gradient_accumulation_steps=max(1, 32 // batch_size),
                fp16=False,
                report_to="none",
            )

            trainer = Seq2SeqTrainer(
                model=model,
                args=training_args,
                train_dataset=train_ds,
                eval_dataset=val_ds,
                data_collator=collator,
                tokenizer=tokenizer,
                compute_metrics=compute_metrics,
            )

            trainer.train()
            # NOTE: with eval_strategy="epoch" and a single epoch, training has
            # already evaluated once; this explicit call supplies the returned score.
            eval_results = trainer.evaluate()
            return eval_results.get("eval_rougeL", 0.0)

        except RuntimeError as e:
            if "out of memory" in str(e).lower():
                # Treat OOM as a pruned trial so the study continues.
                raise optuna.TrialPruned() from e
            raise
        finally:
            # Drop this trial's model and trainer and hand cached CUDA blocks
            # back, so memory does not pile up from one trial to the next.
            del model, trainer
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
    return objective


save_dir = "./optuna_results"
os.makedirs(save_dir, exist_ok=True)


def _tune_task(task_tag, task_description, train_ds, val_ds, n_trials=6, seed=4213):
    """Run one Optuna study, then save and print its best parameters.

    Args:
        task_tag: Short task label ("A" or "B") used in the study and file names.
        task_description: Human-readable task name shown in the progress message.
        train_ds: Tokenized training subset for the objective.
        val_ds: Tokenized validation subset for the objective.
        n_trials: Number of trials to run.
        seed: Seed for the TPE sampler.

    Returns:
        ``(study, best_trial, best_params, json_path)`` where ``best_params`` is
        the best trial's parameters plus a ``"best_rougeL"`` entry.
    """
    print(f"\n Running Optuna tuning for Task {task_tag} ({task_description})...")
    study = optuna.create_study(
        direction="maximize",
        study_name=f"t5_task{task_tag}_optuna",
        # TPE is Optuna's default sampler; seeding it makes its hyperparameter
        # proposals repeatable across notebook runs.
        sampler=optuna.samplers.TPESampler(seed=seed),
    )
    study.optimize(make_objective(train_ds, val_ds, model_name), n_trials=n_trials)

    best_trial = study.best_trial
    # Copy first so the bookkeeping key is not written into the trial's own dict.
    best_params = dict(best_trial.params)
    best_params["best_rougeL"] = best_trial.value

    json_path = os.path.join(save_dir, f"best_t5_task{task_tag}_params.json")
    with open(json_path, "w") as f:
        json.dump(best_params, f, indent=4)
    print(f" Task {task_tag} best params saved to {json_path}\n", json.dumps(best_params, indent=4))
    return study, best_trial, best_params, json_path


# ----- Task A -----
studyA, bestA, bestA_params, json_path_A = _tune_task(
    "A", "sarcastic cue identification", taskA_train_tok_sub, taskA_val_tok_sub
)

# ----- Task B -----
studyB, bestB, bestB_params, json_path_B = _tune_task(
    "B", "true intent explanation", taskB_train_tok_sub, taskB_val_tok_sub
)


In [None]:
def clear_memory(tag=""):
    """Run garbage collection, release cached CUDA memory, and print memory usage.

    ``tag`` is only interpolated into the first status line. CUDA figures are
    printed when CUDA is available; the process RSS is always printed.
    """
    print(f"\n Clearing memory {tag} ...")
    gc.collect()

    cuda_present = torch.cuda.is_available()
    if cuda_present:
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
        torch.cuda.synchronize()
        bytes_per_mb = 1024**2
        allocated_mb = torch.cuda.memory_allocated() / bytes_per_mb
        reserved_mb = torch.cuda.memory_reserved() / bytes_per_mb
        print(f"   CUDA memory allocated: {allocated_mb:.2f} MB | reserved: {reserved_mb:.2f} MB")

    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    print(f"   CPU RSS: {rss_bytes / 1024**2:.2f} MB\n")


# Hyperparameter names read back from each saved JSON, in assignment order.
_TUNED_KEYS = ("learning_rate", "batch_size", "dropout_rate", "weight_decay", "warmup_ratio")

# --- Task A: tuned hyperparameters from disk ---
params_path_A = "./optuna_results/best_t5_taskA_params.json"
with open(params_path_A, "r") as f:
    bestA = json.load(f)

(
    learning_rate_A,
    batch_size_A,
    dropout_rate_A,
    weight_decay_A,
    warmup_ratio_A,
) = (bestA[key] for key in _TUNED_KEYS)

print(" Loaded Task A tuned params:")
print(json.dumps(bestA, indent=4))

# --- Task B: tuned hyperparameters from disk ---
params_path_B = "./optuna_results/best_t5_taskB_params.json"
with open(params_path_B, "r") as f:
    bestB = json.load(f)

(
    learning_rate_B,
    batch_size_B,
    dropout_rate_B,
    weight_decay_B,
    warmup_ratio_B,
) = (bestB[key] for key in _TUNED_KEYS)

print(" Loaded Task B tuned params:")
print(json.dumps(bestB, indent=4))


# Metric, checkpoint path and tokenizer used by the final training runs.
model_name = "./flan_t5_full_sarcasm_final"
rouge = evaluate.load("rouge")
tokenizer = AutoTokenizer.from_pretrained(model_name)

def compute_metrics(eval_preds):
    """Decode predictions and labels and return ROUGE-L rounded to 4 decimals.

    ``eval_preds`` unpacks to ``(predictions, labels)``; when predictions
    arrive as a tuple only its first element is used.
    """
    generated, gold = eval_preds
    if isinstance(generated, tuple):
        generated = generated[0]

    # -100 cannot be decoded; replace it with the pad id before decoding.
    pad_id = tokenizer.pad_token_id
    generated = np.where(generated != -100, generated, pad_id)
    gold = np.where(gold != -100, gold, pad_id)

    generated_texts = tokenizer.batch_decode(generated, skip_special_tokens=True)
    gold_texts = tokenizer.batch_decode(gold, skip_special_tokens=True)

    result = rouge.compute(predictions=generated_texts, references=gold_texts)
    return {"rougeL": round(result["rougeL"], 4)}


# Train on sarcastic cue identification
# The tuned dropout rate is supplied at load time: dropout layers are built while
# the model is constructed, so assigning to modelA.config afterwards would not
# change the layers that actually run.
modelA = AutoModelForSeq2SeqLM.from_pretrained(model_name, dropout_rate=dropout_rate_A)
# NOTE(review): kept so the saved config matches earlier runs; T5's config
# documents only `dropout_rate`, so this attribute looks unused — confirm.
modelA.config.attention_dropout_rate = dropout_rate_A

collatorA = DataCollatorForSeq2Seq(tokenizer, model=modelA)

argsA = Seq2SeqTrainingArguments(
    learning_rate=learning_rate_A,
    per_device_train_batch_size=batch_size_A,
    per_device_eval_batch_size=max(2, batch_size_A // 2),
    # Same accumulation rule as the tuning objective, so the tuned learning rate
    # is used at the effective batch size it was selected under.
    gradient_accumulation_steps=max(1, 32 // batch_size_A),
    num_train_epochs=3,
    weight_decay=weight_decay_A,
    warmup_ratio=warmup_ratio_A,
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    predict_with_generate=True,
    report_to="none"
)

trainerA = Seq2SeqTrainer(
    model=modelA,
    args=argsA,
    train_dataset=taskA_train_tok,
    eval_dataset=taskA_val_tok,
    data_collator=collatorA,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

print(" Starting fine-tuning on Task A...")
trainerA.train()
trainerA.save_model("./model_final_taskA")

# Release Task A objects before the Task B model is loaded.
del modelA, trainerA, collatorA, argsA
clear_memory("(after Task A)")

# Fine-tuning on true intent explanation
# The tuned dropout rate is supplied at load time: dropout layers are built while
# the model is constructed, so assigning to modelB.config afterwards would not
# change the layers that actually run.
modelB = AutoModelForSeq2SeqLM.from_pretrained(model_name, dropout_rate=dropout_rate_B)
# NOTE(review): kept so the saved config matches earlier runs; T5's config
# documents only `dropout_rate`, so this attribute looks unused — confirm.
modelB.config.attention_dropout_rate = dropout_rate_B

collatorB = DataCollatorForSeq2Seq(tokenizer, model=modelB)

argsB = Seq2SeqTrainingArguments(
    learning_rate=learning_rate_B,
    per_device_train_batch_size=batch_size_B,
    per_device_eval_batch_size=max(2, batch_size_B // 2),
    # Same accumulation rule as the tuning objective, so the tuned learning rate
    # is used at the effective batch size it was selected under.
    gradient_accumulation_steps=max(1, 32 // batch_size_B),
    num_train_epochs=3,
    weight_decay=weight_decay_B,
    warmup_ratio=warmup_ratio_B,
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    predict_with_generate=True,
    report_to="none"
)

trainerB = Seq2SeqTrainer(
    model=modelB,
    args=argsB,
    train_dataset=taskB_train_tok,
    eval_dataset=taskB_val_tok,
    data_collator=collatorB,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

print(" Fine-tuning on Task B...")
trainerB.train()
trainerB.save_model("./model_final_taskB")

In [None]:
# Checkpoints to compare: the Task A fine-tune and the public base model.
base_name = "google/flan-t5-base"
modelA_name = "./model_final_taskA"

# Use the GPU when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# One tokenizer, loaded from the fine-tuned checkpoint, is used for both models.
tokenizer = AutoTokenizer.from_pretrained(modelA_name)

modelA = AutoModelForSeq2SeqLM.from_pretrained(modelA_name)
modelA = modelA.to(device)

modelbase = AutoModelForSeq2SeqLM.from_pretrained(base_name)
modelbase = modelbase.to(device)


def generate_response(model, sentence):
    """Generate a Task A (sarcasm cue) explanation for ``sentence`` with ``model``.

    Args:
        model: Seq2seq model exposing ``generate``; expected on ``device``.
        sentence: Raw input text to explain.

    Returns:
        The decoded first output sequence, stripped, with a leading
        "Explanation:", "Answer:" or "Response:" label removed.
    """
    # Same instruction text and "Text: " layout that this notebook's training
    # preprocessing produces (PROMPT_A + "Text: " + text). The earlier
    # 'Sentence: "..."' layout, glued to the instruction without a separator,
    # did not match the fine-tuning inputs.
    prompt = (
        "In exactly 1-2 sentences, identify the specific words or phrases that make the text sarcastic "
        "and explain how they create the sarcastic effect. "
        "Focus only on observable linguistic elements without adding interpretation beyond what's directly evident in the text.\n\n"
        f"Text: {sentence}"
    )

    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    # NOTE: do_sample=True makes outputs vary between calls unless the RNG is seeded.
    outputs = model.generate(
            **inputs,
            max_new_tokens=80,
            do_sample=True,
            temperature=2.3,
            top_p=0.6,
            top_k=60,
            num_beams=10,
            no_repeat_ngram_size=3,
            repetition_penalty=1.4,
            length_penalty=1.0,
        )

    text = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    # Training targets start with "Explanation: "; drop that label (or a similar one).
    text = re.sub(r"^(Explanation|Answer|Response)\s*:\s*", "", text, flags=re.IGNORECASE)
    return text

# -------------------------------------------------
# 3️⃣ Compare models
# -------------------------------------------------
# Emoji and curly quotes below were stored as mis-decoded byte sequences
# (UTF-8 read as Mac Roman); they are restored to the intended characters so
# the models receive the real text.
sentences = [
    "Yeah, join the military and help the empire spread destruction across the world in the name of human rights, get PTSD, become homeless, sounds like a great way to improve one's situation.",
    "Sorry, I'm too busy eating lobster in my yacht (paid for with self-published comics profits) to comment. <USER> <USER>",
    "Oh perfect, the fire alarm goes off right when I start my presentation.",
    "I just love when my boss schedules a meeting during lunch.",
    "Oh great, the printer jammed right before the deadline.",
    "Wonderful, traffic is even worse than yesterday!",
    "Yeah, because everyone totally loves working overtime for free.",
    "Crying before I go into work... This is going to be a great night. #Sarcasm #WishItWasTrue",
    "Oh sure, because staying up till 3am totally helps with productivity 😒",
    "hey <user> thanks for making it easy for me to take my music with me . # ihateyourupdates",
    "It could confuse your muscles and make muscle grow in places where you didn't actually work out.",
    "Yay, 2-hour traffic for a 10-minute errand. Exactly what I needed 🙃",
    "This guy gets a gold star for such excellent parking in the handicap lot!",
    "How else will we feel superior if not by our amazing taste in phones?",
    "Guess I’ll just refresh the page for the 20th time. Maybe that’ll fix it 🤡",
    "My phone dying at 5% is the highlight of my day.",
    "parrot's previous owner obviously watched a lot of the price is right",
    "Oh totally, I love when people reply ‘k’ to my long texts.",
    "Gotta save our children from the dangers of text on a screen in a rhythm game",
    "Great, another inspirational quote on LinkedIn. Just what I needed.",
    "even aside from the blatant misogyny, this is great because we have so much space in our prisons!"
]

# Generate with the fine-tuned model first, then the base model, and print both.
for s in sentences:
    A_out = generate_response(modelA, s)
    base_out = generate_response(modelbase, s)

    print(f"\n Sentence: {s}")
    print(f"Explain task Flan-T5: {A_out}")
    print(f"Base Flan-T5: {base_out}")

In [None]:
# Checkpoints to compare: the Task B fine-tune and the public base model.
modelB_name = "./model_final_taskB"
base_name = "google/flan-t5-base"

# Use the GPU when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# One tokenizer, loaded from the fine-tuned checkpoint, is used for both models.
tokenizer = AutoTokenizer.from_pretrained(modelB_name)

# Base model first, then the fully fine-tuned model.
modelbase = AutoModelForSeq2SeqLM.from_pretrained(base_name)
modelbase = modelbase.to(device)

modelB = AutoModelForSeq2SeqLM.from_pretrained(modelB_name)
modelB = modelB.to(device)

def generate_response(model, sentence):
    """Generate a Task B (true intent) explanation for ``sentence`` with ``model``.

    Args:
        model: Seq2seq model exposing ``generate``; expected on ``device``.
        sentence: Raw input text to explain.

    Returns:
        The decoded first output sequence, stripped, with a leading
        "Explanation:", "Answer:" or "Response:" label removed.
    """
    # Same instruction text and "Text: " layout that this notebook's training
    # preprocessing produces (PROMPT_B + "Text: " + text). The earlier
    # 'Sentence: "..."' layout did not match the fine-tuning inputs.
    prompt = (
        "In exactly 1-2 sentences, explain what the speaker actually means by removing the sarcasm "
        "and stating their true intended message directly. "
        "Focus on the genuine sentiment or opinion being expressed beneath the sarcastic language.\n\n"
        f"Text: {sentence}"
    )

    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    # NOTE: do_sample=True makes outputs vary between calls unless the RNG is seeded.
    outputs = model.generate(
            **inputs,
            max_new_tokens=80,
            do_sample=True,
            temperature=2.3,
            top_p=0.6,
            top_k=60,
            num_beams=10,
            no_repeat_ngram_size=3,
            repetition_penalty=1.4,
            length_penalty=1.0,
        )

    text = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    # Training targets start with "Explanation: "; drop that label (or a similar one).
    text = re.sub(r"^(Explanation|Answer|Response)\s*:\s*", "", text, flags=re.IGNORECASE)
    return text

# Emoji and curly quotes below were stored as mis-decoded byte sequences
# (UTF-8 read as Mac Roman); they are restored to the intended characters so
# the models receive the real text.
sentences = [
    "Yeah, join the military and help the empire spread destruction across the world in the name of human rights, get PTSD, become homeless, sounds like a great way to improve one's situation.",
    "Sorry, I'm too busy eating lobster in my yacht (paid for with self-published comics profits) to comment. <USER> <USER>",
    "Oh perfect, the fire alarm goes off right when I start my presentation.",
    "I just love when my boss schedules a meeting during lunch.",
    "Oh great, the printer jammed right before the deadline.",
    "Wonderful, traffic is even worse than yesterday!",
    "Yeah, because everyone totally loves working overtime for free.",
    "Crying before I go into work... This is going to be a great night. #Sarcasm #WishItWasTrue",
    "Oh sure, because staying up till 3am totally helps with productivity 😒",
    "hey <user> thanks for making it easy for me to take my music with me . # ihateyourupdates",
    "It could confuse your muscles and make muscle grow in places where you didn't actually work out.",
    "Yay, 2-hour traffic for a 10-minute errand. Exactly what I needed 🙃",
    "This guy gets a gold star for such excellent parking in the handicap lot!",
    "How else will we feel superior if not by our amazing taste in phones?",
    "Guess I’ll just refresh the page for the 20th time. Maybe that’ll fix it 🤡",
    "My phone dying at 5% is the highlight of my day.",
    "parrot's previous owner obviously watched a lot of the price is right",
    "Oh totally, I love when people reply ‘k’ to my long texts.",
    "Gotta save our children from the dangers of text on a screen in a rhythm game",
    "Great, another inspirational quote on LinkedIn. Just what I needed.",
    "even aside from the blatant misogyny, this is great because we have so much space in our prisons!"
]


# Generate with the base model first, then the fine-tuned model, and print both.
for s in sentences:
    base_out = generate_response(modelbase, s)
    B_out = generate_response(modelB, s)

    print(f"\n Sentence: {s}")
    print(f"Explain Task Flan-T5: {B_out}")
    print(f"Base Flan-T5: {base_out}")