In [None]:
import os
# Set CUDA_VISIBLE_DEVICES so that only device index 0 is visible to CUDA-aware libraries
# imported later in this kernel.
os.environ["CUDA_VISIBLE_DEVICES"]="0"
# "HF_KEY" is a placeholder: substitute a real Hugging Face token locally and never commit it.
os.environ["HF_TOKEN"] = "HF_KEY"

# Dataset Generation

This section contains the code that was used to generate the RESPONSible Service dataset.

In [None]:
%pip install -r requirements_3.txt

In [None]:
from openai import AzureOpenAI
from pydantic import BaseModel
import pandas as pd
import random
import json

In [None]:
# Initialize the Azure OpenAI client with API key, API version, and resource endpoint.
# `azure_endpoint` must be the base URL of the Azure resource only: the SDK itself appends
# the "/openai/deployments/<model>/chat/completions?api-version=..." part, so passing the
# full chat-completions URL produces a malformed request path.
client = AzureOpenAI(
            # Prefer the AZURE_OPENAI_API_KEY environment variable so the secret stays out of
            # the notebook; the placeholder is only a fallback and must be replaced to run.
            api_key=os.environ.get("AZURE_OPENAI_API_KEY", "AZUREOPENAI_KEY"),
            api_version="2024-08-01-preview", # API version to use
            azure_endpoint="https://openai-ds-instance-sweden.openai.azure.com"
        )

# Name of the deployed model
deployment_name='gpt-4o'

In [None]:
# Empty list to collect generated dataset entries.
# It has to be a list (not None) because the generation loop calls `dataset.append(...)`.
dataset = []

In [None]:
# Structure of each dataset entry using a Pydantic model for validation.
# The class is passed as `response_format` to the structured-output call, so it defines the
# fields the model is asked to return.
class DatasetGeneration(BaseModel):
    # Customer's request for assistance
    request: str
    # Customer-service reply phrased in the target style
    response0: str
    # Customer-service reply phrased in the target style more strongly than response0
    response1: str

# Number of dataset examples to generate
num_samples = 1000

# Target communication style for response generation
core_value = "polite"

# Predefined list of customer service topics (loop-invariant, so it is built only once)
topics = ["orders", "refunds", "billing", "subscriptions", "account", "shipping",
          "feedbacks", "complaints", "product information", "technical assistance", "discounts"]

# Loop to generate synthetic data: one API call per requested sample
for i in range(num_samples):
    # Randomly choose a topic for this example
    random_topic = random.choice(topics)

    # Prompt instructing the model to generate a dataset entry with 30-word fields
    prompt = f"""
    Create one dataset entry. The entry should include the following fields:
    - request: Request for assistance from a customer to customer service about {random_topic}. Generate exactly 30 words.
    - response0: Response from customer service to the customer's request. response0 must be phrased in a {core_value} way. Generate exactly 30 words.
    - response1: Response from customer service to the customer's request. response1 must be phrased in a more {core_value} way than response0. Generate exactly 30 words.
    """

    # Send the request to the Azure OpenAI service and parse the structured response
    completion = client.beta.chat.completions.parse(
        model=deployment_name,
        messages=[
            {"role": "system", "content": "You are a helpful assistant designed to generate synthetic data."},
            {"role": "user", "content": prompt},
        ],
        temperature=1.0,               # High temperature for creative output
        top_p=1.0,                     # Full probability mass
        response_format=DatasetGeneration,  # Use the Pydantic schema for structured parsing
    )

    # Extract the parsed output from the model's response
    generated_example = completion.choices[0].message.parsed

    # `parsed` is None when the model refuses or returns nothing that fits the schema.
    # Skip that sample instead of crashing on `.model_dump()` and losing the whole run.
    if generated_example is None:
        print(f"Sample {i}: no structured output returned, skipping.")
        continue

    # Convert the structured output to a dictionary and add to dataset
    generated_example_dict = generated_example.model_dump()
    dataset.append(generated_example_dict)

In [None]:
# Persist the collected entries as a Parquet file named after the target style.
parquet_name = f"dataset_{core_value}.parquet"

# One row per generated entry; the positional index is not written to disk.
df = pd.DataFrame(dataset)
df.to_parquet(parquet_name, index=False)

print("Parquet dataset saved successfully!")

## Human Evaluation

This section contains the script that exports the newly generated RESPONSible Service dataset to .csv files, so that it can be manually annotated and then re-uploaded to calculate human ground truth agreement.

In [None]:
import pandas as pd
import random
import os

def process_parquet_to_csv(parquet_file: str, output_folder: str = "hum_eval_dataset", sample_size: int = 1000):
    """Export a blinded annotation CSV and a matching answer-key CSV from a Parquet dataset.

    Rows are sampled from the "request", "response0" and "response1" columns, and for each
    row the two responses are swapped with probability 0.5. Two semicolon-separated files
    are written into `output_folder`:

    * ``<name>.csv``: request, response0, response1 and an empty "human" column to fill in.
    * ``<name>_labels.csv``: one "label" per row, 1 when the original order was kept and
      0 when the responses were swapped.

    Args:
        parquet_file: Path of the Parquet file to read.
        output_folder: Directory receiving both CSV files (created if missing).
        sample_size: Maximum number of rows to export. If the dataset holds fewer rows,
            all of them are exported instead of raising an error.
    """
    # Set random seed for reproducibility
    random.seed(42)

    # Load the Parquet file
    df = pd.read_parquet(parquet_file)

    # Sample rows. DataFrame.sample raises ValueError when n exceeds the number of rows
    # (sampling without replacement), so the request is capped at the available rows.
    df = df[["request", "response0", "response1"]]
    n_rows = min(sample_size, len(df))
    df = df.sample(n=n_rows, random_state=42).reset_index(drop=True)

    # Swap responses randomly and assign labels
    rows = []
    for _, row in df.iterrows():
        keep_order = random.random() < 0.5
        first, second = ("response0", "response1") if keep_order else ("response1", "response0")
        rows.append({
            "request": row["request"],
            "response0": row[first],
            "response1": row[second],
            "label": 1 if keep_order else 0
        })

    # Explicit columns keep the frame well-formed even when no rows were sampled
    df_final = pd.DataFrame(rows, columns=["request", "response0", "response1", "label"])

    # Separate label column so annotators never see the answer key
    labels = df_final["label"]
    df_final = df_final.drop(columns=["label"])
    df_final["human"] = ""

    # Prepare output filenames
    base_name = os.path.splitext(os.path.basename(parquet_file))[0]
    os.makedirs(output_folder, exist_ok=True)

    output_path = os.path.join(output_folder, f"{base_name}.csv")
    label_path = os.path.join(output_folder, f"{base_name}_labels.csv")

    # Save the CSVs
    df_final.to_csv(output_path, index=False, encoding="utf-8", sep=";")
    pd.DataFrame({"label": labels}).to_csv(label_path, index=False, encoding="utf-8", sep=";")

    print("✅ CSV export complete.")

In [None]:
# Export one annotation CSV (plus its label file) per style subset, in the original order.
for style_name in ("clear", "friendly", "empathetic", "polite"):
    process_parquet_to_csv(f"RESPONSible Service/dataset_{style_name}.parquet")

In [None]:
import pandas as pd

def compute_agreement(data_file: str, label_file: str) -> float:
    """Return the percentage of human annotations that match the stored labels.

    Args:
        data_file: Semicolon-separated CSV containing a "human" column with the annotations.
        label_file: Semicolon-separated CSV containing a "label" column, row-aligned with
            `data_file`.

    Returns:
        Agreement in percent, rounded to the nearest whole number (as a float). Rows with
        no human annotation are ignored; 0.0 is returned when no row is annotated.

    Raises:
        ValueError: If the two files differ in row count, or an annotation is not numeric.
    """
    # Load main CSV with human annotations
    df_data = pd.read_csv(data_file, sep=";", encoding="utf-8")

    # Load label CSV with model-assigned labels
    df_labels = pd.read_csv(label_file, sep=";", encoding="utf-8")

    # Ensure both files have the same number of rows
    if len(df_data) != len(df_labels):
        raise ValueError("Files do not match in number of rows.")

    # Normalise both columns to stripped strings so blanks and NaN can be detected uniformly
    human = df_data["human"].astype(str).str.strip()
    label = df_labels["label"].astype(str).str.strip()

    # Filter out missing human annotations (empty strings or NaN)
    valid_idx = (human != "") & (human != "nan")

    # Parse through float first: when some annotations are missing, pandas reads the column
    # as float, so values arrive here as "1.0"/"0.0" and a direct int conversion would fail.
    human = human[valid_idx].astype(float).astype(int).tolist()
    label = label[valid_idx].astype(float).astype(int).tolist()

    if not human:
        print("No human annotations found.")
        return 0.0

    # Compute agreement
    matches = sum(h == l for h, l in zip(human, label))
    agreement_percent = matches / len(human) * 100

    return round(agreement_percent, 0)

In [None]:
# BEFORE YOU RUN THIS CELL, MAKE SURE YOU HAVE COMPLETED THE FOLLOWING PASSAGES:
    # 1. EXPORT THE dataset_{style}.csv FILES
    # 2. MANUALLY ADD THE HUMAN LABELS
    # 3. REPLACE THE dataset_{style}.csv FILES WITH THE MANUALLY ANNOTATED ONES (KEEP THE SAME FILE NAME)

subsets = ["clear", 'friendly', "empathetic", "polite"]

# Agreement score per subset, computed from the annotated CSV and its label file
results = {
    subset: compute_agreement(
        f"hum_eval_dataset/dataset_{subset}.csv",
        f"hum_eval_dataset/dataset_{subset}_labels.csv",
    )
    for subset in subsets
}

print("Human Ground Truth Agreement (%) for the RESPONSible Service Dataset")
print()
for style_name, score in results.items():
    print(f"{style_name}: {score}%")

# Linguistic Data Profiling

The following sections include all the scripts that were used to compile the linguistic data profiling of the RESPONSible Service dataset.

In [None]:
%pip install -r requirements_nlp.txt

In [None]:
!python -m spacy download en_core_web_sm

In [None]:
import pandas as pd
import spacy
from collections import Counter, defaultdict
import numpy as np
from typing import List, Dict
from transformers import pipeline
import re
from scipy.stats import shapiro, f_oneway, kruskal, ttest_ind, ranksums, chi2_contingency
from itertools import combinations, zip_longest
from statsmodels.stats.multitest import multipletests
import scikit_posthocs as sp
import textstat
from bert_score import score as bertscore
from rouge_score import rouge_scorer
import torch

In [None]:
# Load dataset: one Parquet file per style subset

path_clear = "RESPONSible Service/dataset_clear.parquet"
path_friendly = "RESPONSible Service/dataset_friendly.parquet"
path_empathetic = "RESPONSible Service/dataset_empathetic.parquet"
path_polite = "RESPONSible Service/dataset_polite.parquet"

df_clear, df_friendly, df_empathetic, df_polite = (
    pd.read_parquet(path)
    for path in (path_clear, path_friendly, path_empathetic, path_polite)
)

# Customer requests of each subset, as plain Python lists
req_c, req_f, req_e, req_p = (
    frame['request'].tolist()
    for frame in (df_clear, df_friendly, df_empathetic, df_polite)
)

# The `response1` column of each subset, as plain Python lists
res_c, res_f, res_e, res_p = (
    frame['response1'].tolist()
    for frame in (df_clear, df_friendly, df_empathetic, df_polite)
)

In [None]:
# Load NGSL frequency lists and concreteness scores for lexical complexity and lexical concreteness analyses

def _read_utf8(path):
    """Return the whole content of a UTF-8 encoded text file as one string."""
    with open(path, "r", encoding="utf-8") as handle:
        return handle.read()

ngsl_1000_txt = _read_utf8("NGSL_1000.txt")
ngsl_2000_txt = _read_utf8("NGSL_2000.txt")
concreteness_txt = _read_utf8("concreteness_scores.txt")

In [None]:
# Load spaCy's English model ("en_core_web_sm"); the resulting `nlp` pipeline is shared by
# the analysis cells below, either passed in as an argument or read as a global.
nlp = spacy.load("en_core_web_sm")

In [None]:
# CUSTOM FUNCTION TO MAKE TEST STATISTICS RESULTS VISUALLY INTERPRETABLE

def summarize_significant_differences_annotated(posthoc_matrix, styles):
    """Condense a pairwise p-value matrix into one short significance label per style.

    Args:
        posthoc_matrix: Object indexable as ``.loc[row_style, col_style]`` that yields the
            p-value for that pair of styles.
        styles: Style names; each style's first character is used as its initial.

    Returns:
        Dict mapping each style to '*' (differs from every other style at p < 0.05),
        '-' (differs from none), or the sorted initials of the styles it differs from.
    """
    initial_of = {name: name[0] for name in styles}
    n_others = len(styles) - 1

    annotations = {}
    for row_style in styles:
        # Initials of every other style whose p-value against this one is below 0.05
        differing = [
            initial_of[col_style]
            for col_style in styles
            if row_style != col_style and posthoc_matrix.loc[row_style, col_style] < 0.05
        ]

        if len(differing) == n_others:
            label = '*'
        elif len(differing) == 0:
            label = '-'
        else:
            label = ''.join(sorted(differing))
        annotations[row_style] = label

    return annotations

## Frequency Analysis

In [None]:
# Start a fresh report: mode 'w' truncates any results.txt left over from a previous run.
with open('results.txt', 'w') as report:
    report.write("=== FREQUENCY LISTS ===")
    report.write("\n")

In [None]:
# === TOKEN FREQUENCY ANALYSIS WITH RATIOS ===

# Function to count lemmatized, non-stopword, alphabetic tokens in a list of responses
def get_all_token_counts(responses, nlp):
    """Tally lower-cased lemmas of the alphabetic, non-stopword tokens in `responses`.

    Args:
        responses: Texts to analyse.
        nlp: Pipeline object whose ``pipe`` method yields one iterable of tokens per text.

    Returns:
        Tuple ``(counts, total)``: a Counter of lemma frequencies and the number of
        tokens that were counted.
    """
    lemma_tally = Counter(
        tok.lemma_.lower()
        for parsed in nlp.pipe(responses)  # batch processing through the pipeline
        for tok in parsed
        if not tok.is_stop and tok.is_alpha
    )
    return lemma_tally, sum(lemma_tally.values())

# Apply token counting to each set of responses (order: clear, friendly, empathetic, polite)
(counts_c, total_c), (counts_f, total_f), (counts_e, total_e), (counts_p, total_p) = (
    get_all_token_counts(group, nlp) for group in (res_c, res_f, res_e, res_p)
)

# Get the 20 most frequent tokens per tone
freq_clear, freq_friendly, freq_empathetic, freq_polite = (
    tally.most_common(20) for tally in (counts_c, counts_f, counts_e, counts_p)
)

# Store all token counts and total tokens for statistical comparison, keyed by subset initial
all_counts = dict(
    c=(counts_c, total_c),
    f=(counts_f, total_f),
    e=(counts_e, total_e),
    p=(counts_p, total_p),
)

# Perform a chi-square test for each token's frequency in one subset vs. the others
def get_significance_label(token, initial):
    """Label how `token`'s frequency in subset `initial` differs from the other subsets.

    For each of the other subsets a 2x2 table (token vs. non-token counts, read from the
    module-level `all_counts`) is tested with `chi2_contingency`, with 0.05 as threshold.

    Returns:
        "*" when the difference is significant against all other subsets, "-" when it is
        significant against none, otherwise the sorted initials of the differing subsets.
    """
    own_counts, own_total = all_counts[initial]
    own_hits = own_counts.get(token, 0)
    own_rest = own_total - own_hits
    rivals = {"c", "f", "e", "p"} - {initial}

    differing = []
    for rival in rivals:
        rival_counts, rival_total = all_counts[rival]
        rival_hits = rival_counts.get(token, 0)
        # Rows: this subset / rival subset; columns: token count / all remaining tokens
        table = [[own_hits, own_rest], [rival_hits, rival_total - rival_hits]]
        _, p_value, _, _ = chi2_contingency(table)
        if p_value < 0.05:
            differing.append(rival)

    if not differing:
        return "-"  # Not significant
    if set(differing) == rivals:
        return "*"  # Significant against all other groups
    return ''.join(sorted(differing))  # Significant against specific subsets

# Gather all unique top-20 tokens across subsets for later reference (optional use)
# NOTE(review): `top_tokens` is not read anywhere else in this cell.
top_tokens = set([w for w, _ in freq_clear] + [w for w, _ in freq_friendly] +
                 [w for w, _ in freq_empathetic] + [w for w, _ in freq_polite])

# Write formatted results to a text file (append mode: the file was created by an earlier cell)
with open('results.txt', 'a') as file:
    print("\nToken Frequency Analysis (Top 20 Lemmas)\n", file=file)

    # Header row: Rank, token, frequency (%) and significance score (SS) for each subset
    headers = ["Rank", "Clear", "Freq (%)", "SS", "Friendly", "Freq (%)", "SS",
               "Empath.", "Freq (%)", "SS", "Polite", "Freq (%)", "SS"]
    # Column widths (5 / 13 / 10 / 3) must stay in sync with the data rows printed below
    print(f"{headers[0]:<5} {headers[1]:<13} {headers[2]:<10} {headers[3]:<3} "
          f"{headers[4]:<13} {headers[5]:<10} {headers[6]:<3} "
          f"{headers[7]:<13} {headers[8]:<10} {headers[9]:<3} "
          f"{headers[10]:<13} {headers[11]:<10} {headers[12]:<3}", file=file)
    print("-" * 125, file=file)

    # Write each row of results for the top 20 tokens
    for i in range(20):
        # `row` collects three cells per subset: token, "count (percent)", significance label
        row = []
        for label, freq_list, total, count_dict in zip(
            ['c', 'f', 'e', 'p'],
            [freq_clear, freq_friendly, freq_empathetic, freq_polite],
            [total_c, total_f, total_e, total_p],
            [counts_c, counts_f, counts_e, counts_p]
        ):
            # A subset with fewer than 20 distinct tokens gets an empty placeholder cell
            word, freq = freq_list[i] if i < len(freq_list) else ("", 0)
            freq_pct = f"{(freq / total * 100):.1f}%" if total > 0 else "0.0%"
            sig_label = get_significance_label(word, label) if word else ""
            row.extend([word, f"{freq} ({freq_pct})", sig_label])

        # Print the formatted row
        print(f"{i+1:<5} "
              f"{row[0]:<13} {row[1]:<10} {row[2]:<3} "
              f"{row[3]:<13} {row[4]:<10} {row[5]:<3} "
              f"{row[6]:<13} {row[7]:<10} {row[8]:<3} "
              f"{row[9]:<13} {row[10]:<10} {row[11]:<3}", file=file)

In [None]:
# === POS FREQUENCY ANALYSIS ===

# Function to compute POS tag frequencies for a list of responses
def get_ranked_pos(responses):
    """Rank coarse-grained POS tags by frequency across `responses`.

    Uses the module-level `nlp` pipeline and ignores whitespace tokens.

    Returns:
        List of ``(tag, count, percentage)`` tuples, most frequent tag first.
    """
    tag_tally = Counter(
        tok.pos_
        for parsed in nlp.pipe(responses)
        for tok in parsed
        if not tok.is_space
    )
    # Fall back to 1 so an empty input cannot trigger a division by zero
    denominator = sum(tag_tally.values()) or 1

    ranked = []
    for tag, count in tag_tally.most_common():
        ranked.append((tag, count, count / denominator * 100))
    return ranked

# === Compute POS tag frequencies for each tone-specific response set ===
# Order: clear, friendly, empathetic, polite
clear, friendly, empathetic, polite = (
    get_ranked_pos(group) for group in (res_c, res_f, res_e, res_p)
)

# === Gather all unique POS tags across all subsets ===
all_tags = sorted({
    tag
    for ranked in (clear, friendly, empathetic, polite)
    for tag, _, _ in ranked
})

# Helper: Convert list of tuples into a lookup dictionary {POS tag: count}
def build_lookup(data):
    """Map each tag to its count, discarding the third element of every tuple."""
    lookup = {}
    for tag, count, _ in data:
        lookup[tag] = count
    return lookup

# Build lookup dicts for quick access
clear_counts, friendly_counts, empathetic_counts, polite_counts = map(
    build_lookup, (clear, friendly, empathetic, polite)
)

# POS tag lookup per subset
tag_lookup = {
    "c": clear_counts,
    "f": friendly_counts,
    "e": empathetic_counts,
    "p": polite_counts,
}

# Total token counts per subset (used for chi-square)
total_counts = {key: sum(lookup.values()) for key, lookup in tag_lookup.items()}

# === Chi-square significance testing across subsets ===
def get_significance_chi2(tag):
    """Label, per subset, which other subsets use the POS tag `tag` at a different rate.

    An overall chi-square test over all four subsets is run first, using the module-level
    `tag_lookup` and `total_counts`. Only if it is significant (p < 0.05) are the six
    pairwise tests run, with Bonferroni correction.

    Returns:
        Dict keyed by subset initial ("c", "f", "e", "p") whose value is "*" (differs from
        all three others), "-" (differs from none) or the sorted initials it differs from.
    """
    labels = ["c", "f", "e", "p"]

    def _chi2_pvalue(table):
        # chi2_contingency raises ValueError when an expected frequency is zero, which
        # happens exactly when a row or column of the table sums to zero (e.g. a rare tag
        # absent from both subsets of a pair). Such a table carries no evidence of a
        # difference, so report p = 1.0 instead of crashing the whole analysis.
        row_totals = [sum(row) for row in table]
        col_totals = [sum(col) for col in zip(*table)]
        if 0 in row_totals or 0 in col_totals:
            return 1.0
        _, pval, _, _ = chi2_contingency(table)
        return pval

    # Get the count of the tag in each subset
    counts = [tag_lookup[l].get(tag, 0) for l in labels]
    # Complementary counts (everything but this tag)
    comps = [total_counts[l] - count for l, count in zip(labels, counts)]

    # Overall chi-square test
    if _chi2_pvalue([counts, comps]) >= 0.05:
        return {l: "-" for l in labels}  # Not statistically significant overall

    # If significant overall, run pairwise chi-square tests
    sig_map = {l: [] for l in labels}
    pairs = list(combinations(labels, 2))
    pvals = []
    for a, b in pairs:
        a_count = tag_lookup[a].get(tag, 0)
        b_count = tag_lookup[b].get(tag, 0)
        sub_table = [
            [a_count, b_count],
            [total_counts[a] - a_count, total_counts[b] - b_count]
        ]
        pvals.append(_chi2_pvalue(sub_table))

    # Apply Bonferroni correction for multiple comparisons
    reject, _, _, _ = multipletests(pvals, alpha=0.05, method='bonferroni')
    for (a, b), r in zip(pairs, reject):
        if r:  # If null hypothesis is rejected, mark as significantly different
            sig_map[a].append(b)
            sig_map[b].append(a)

    # Format significance result per subset
    result = {}
    for l in labels:
        if len(sig_map[l]) == 3:
            result[l] = "*"  # Significantly different from all others
        elif not sig_map[l]:
            result[l] = "-"  # No significant difference
        else:
            result[l] = ''.join(sorted(sig_map[l]))  # List of significantly different subsets
    return result

# === Attach significance labels to each POS tag record ===
def enrich_with_ss(data, subset_key):
    """Append to every ``(tag, count, pct)`` record the significance label of `subset_key`.

    Returns:
        List of ``(tag, count, pct, label)`` tuples in the same order as `data`.
    """
    return [
        (tag, count, pct, get_significance_chi2(tag)[subset_key])
        for tag, count, pct in data
    ]

# Enrich each subset's data with significance annotations
clear_enriched, friendly_enriched, empathetic_enriched, polite_enriched = (
    enrich_with_ss(ranked, key)
    for ranked, key in ((clear, "c"), (friendly, "f"), (empathetic, "e"), (polite, "p"))
)

# === Determine number of rows needed (longest list) ===
TOP_N = max(map(len, (clear_enriched, friendly_enriched, empathetic_enriched, polite_enriched)))

# Pad each list to the same length to ensure row alignment in the output
def pad(data, n):
    """Return `data` extended with blank ``("", 0, 0.0, "")`` records up to length `n`.

    Lists that already hold `n` or more records come back unchanged in content.
    """
    missing = n - len(data)
    blank_record = ("", 0, 0.0, "")
    return data + [blank_record] * missing

# Bring all four lists to TOP_N rows so they can be indexed in parallel below
clear_enriched = pad(clear_enriched, TOP_N)
friendly_enriched = pad(friendly_enriched, TOP_N)
empathetic_enriched = pad(empathetic_enriched, TOP_N)
polite_enriched = pad(polite_enriched, TOP_N)

# === Write formatted output to a text file ===
with open("results.txt", "a") as file:
    # Column widths (10 / 7 / 6 / 3 per subset) must match the data rows built further down
    header = (
        f"{'Rank':<5} "
        f"{'Clear':<10} {'Count':<7} {'%':<6} {'SS':<3}   "
        f"{'Friendly':<10} {'Count':<7} {'%':<6} {'SS':<3}   "
        f"{'Empathic':<10} {'Count':<7} {'%':<6} {'SS':<3}   "
        f"{'Polite':<10} {'Count':<7} {'%':<6} {'SS':<3}"
    )
    print("POS Frequency Analysis\n", file=file)
    print(header, file=file)
    print("-" * len(header), file=file)

    # Print each row of POS tag data across all subsets
    # Row i pairs the i-th ranked tag of each subset; the tags in one row can differ.
    for i in range(TOP_N):
        c_tag, c_count, c_pct, c_ss = clear_enriched[i]
        f_tag, f_count, f_pct, f_ss = friendly_enriched[i]
        e_tag, e_count, e_pct, e_ss = empathetic_enriched[i]
        p_tag, p_count, p_pct, p_ss = polite_enriched[i]

        line = (
            f"{i+1:<5} "
            f"{c_tag:<10} {c_count:<7} {c_pct:<6.1f} {c_ss:<3}   "
            f"{f_tag:<10} {f_count:<7} {f_pct:<6.1f} {f_ss:<3}   "
            f"{e_tag:<10} {e_count:<7} {e_pct:<6.1f} {e_ss:<3}   "
            f"{p_tag:<10} {p_count:<7} {p_pct:<6.1f} {p_ss:<3}"
        )
        print(line, file=file)

In [None]:
# === TOKEN FREQUENCY ANALYSIS (PUNCTUATION) ===

# Function to count punctuation tokens across a set of responses
# (this definition replaces the lemma-counting function of the same name defined earlier)
def get_all_token_counts(responses, nlp):
    """Tally the surface form of every punctuation token in `responses`.

    Args:
        responses: Texts to analyse.
        nlp: Pipeline object whose ``pipe`` method yields one iterable of tokens per text.

    Returns:
        Tuple ``(counts, total)``: a Counter of punctuation marks and how many were seen.
    """
    mark_tally = Counter(
        tok.text
        for parsed in nlp.pipe(responses)
        for tok in parsed
        if tok.is_punct
    )
    return mark_tally, sum(mark_tally.values())

# Count punctuation tokens in each subset (order: clear, friendly, empathetic, polite)
(counts_c, total_c), (counts_f, total_f), (counts_e, total_e), (counts_p, total_p) = (
    get_all_token_counts(group, nlp) for group in (res_c, res_f, res_e, res_p)
)

# Sort punctuation tokens by relative frequency (percentage), descending
def sort_by_relative_freq(counts, total):
    """Return the ``(token, count)`` pairs of `counts`, largest share of `total` first."""
    def share(item):
        return item[1] / total

    ranked = list(counts.items())
    ranked.sort(key=share, reverse=True)
    return ranked

# Bundle token counts and totals for access in significance testing
all_counts = dict(
    c=(counts_c, total_c),
    f=(counts_f, total_f),
    e=(counts_e, total_e),
    p=(counts_p, total_p),
)

# Punctuation tokens of each subset, ranked by relative frequency
top_c, top_f, top_e, top_p = (
    sort_by_relative_freq(tally, size)
    for tally, size in (all_counts[key] for key in ("c", "f", "e", "p"))
)

# Chi-square significance testing: is this token used differently in one subset vs others?
# (this definition replaces the function of the same name defined earlier)
def get_significance_label(token, initial):
    """Label how `token`'s share in subset `initial` differs from each other subset.

    Every comparison is a chi-square test on a 2x2 table built from the module-level
    `all_counts`, with 0.05 as the significance threshold.

    Returns:
        "*" if significant against all other subsets, "-" if against none, otherwise
        the sorted initials of the subsets it differs from.
    """
    others = {"c", "f", "e", "p"} - {initial}
    base_counts, base_total = all_counts[initial]
    base_hits = base_counts.get(token, 0)
    base_row = [base_hits, base_total - base_hits]

    def differs_from(other):
        other_counts, other_total = all_counts[other]
        other_hits = other_counts.get(token, 0)
        # 2x2 contingency table for chi-square test
        _, p, _, _ = chi2_contingency([base_row, [other_hits, other_total - other_hits]])
        return p < 0.05

    sig_against = [other for other in others if differs_from(other)]

    # Label formatting
    if set(sig_against) == others:
        return "*"  # Significant against all others
    if sig_against:
        return ''.join(sorted(sig_against))  # Significant against some
    return "-"  # Not significant

# === Formatting and Output ===

# Define column widths for alignment; one output block is W_TOKEN + W_ABS + W_REL + W_SS wide
W_TOKEN = 8  # Token text width (left-aligned)
W_ABS = 6    # Raw frequency width (right-aligned)
W_REL = 7    # Relative frequency (percentage) width (right-aligned)
W_SS = 4     # Significance score width (right-aligned)

# Format one block of a token's data (text, count, percent, significance)
def format_block(token, freq, total, label):
    """Render one fixed-width table cell group for `token` in subset `label`.

    Returns a run of spaces of the full block width when `token` is empty; otherwise the
    token, its raw count, its share of `total` in percent and its significance label.
    """
    if token:
        rel = f"{(freq / total * 100):.1f}%" if total > 0 else "0.0%"  # Compute %
        sig = get_significance_label(token, label)  # Get significance label
        return f"{token:<{W_TOKEN}}{freq:>{W_ABS}}{rel:>{W_REL}}{sig:>{W_SS}}"

    # No token for this row: emit a blank block so the following columns stay aligned
    block_width = W_TOKEN + W_ABS + W_REL + W_SS
    return " " * block_width

# === Write the formatted output to a results file ===
with open('results.txt', 'a') as file:
    print("\nToken Frequency Analysis (Punctuation)\n", file=file)

    # Header with subset names
    # NOTE(review): this line is indented by 5 characters while the subheader and the data
    # rows use a 4-character rank column — confirm the one-character offset is intended.
    header_names = ['Clear', 'Friendly', 'Empath.', 'Polite']
    header_line = "     " + "   ".join(
        f"{name:<{W_TOKEN + W_ABS + W_REL + W_SS}}" for name in header_names
    )

    # Subheader with column labels
    subheader_line = f"{'#':<4}" + "   ".join(
        f"{'Token':<{W_TOKEN}}{'Freq':>{W_ABS}}{'(%)':>{W_REL}}{'SS':>{W_SS}}" for _ in header_names
    )

    print(header_line, file=file)
    print(subheader_line, file=file)
    print("-" * len(subheader_line), file=file)

    # For each row, print data for all subsets (default to blank if no token)
    # zip_longest pads the shorter rankings with ("", 0), which format_block renders blank
    for i, rows in enumerate(zip_longest(top_c, top_f, top_e, top_p, fillvalue=("", 0))):
        row = f"{i+1:<4}"
        for (token, freq), label in zip(rows, ['c', 'f', 'e', 'p']):
            total = all_counts[label][1]
            row += format_block(token, freq, total, label) + "   "
        # Strip the trailing separator (and any blank blocks at the end of the row)
        print(row.rstrip(), file=file)

## Lexical Analysis

In [None]:
# Append the banner that opens the lexical-features part of the report.
with open('results.txt', 'a') as report:
    report.write("=== LEXICAL FEATURES ===")
    report.write("\n")

In [None]:
# === LEXICAL DIVERSITY ANALYSIS USING TOKEN-TYPE RATIO (TTR) ===

# TTR calculation
def compute_ttr_value(texts, nlp):
    """Compute the type-token ratio (distinct lemmas / all lemmas) of each text.

    Only alphabetic tokens are considered, lemmatised and lower-cased. Texts without any
    alphabetic token are left out of the result.

    Args:
        texts: Texts to analyse.
        nlp: Callable turning a text into an iterable of tokens.

    Returns:
        Tuple ``(avg_ttr, ttr_values)``: the mean of the per-text ratios (0 when there is
        none) and the list of per-text ratios.
    """
    ttr_values = []
    for text in texts:
        lemmas = [tok.lemma_.lower() for tok in nlp(text) if tok.is_alpha]
        if lemmas:
            ttr_values.append(len(set(lemmas)) / len(lemmas))

    if ttr_values:
        avg_ttr = sum(ttr_values) / len(ttr_values)
    else:
        avg_ttr = 0
    return avg_ttr, ttr_values

# Input styles and responses
# `styles`, `requests` and `responses` are parallel lists (same order of subsets).
styles = ["clear", "friendly", "empathetic", "polite"]
requests = [req_c, req_f, req_e, req_p]
responses = [res_c, res_f, res_e, res_p]

# Per-style average TTR and per-style list of per-response TTR values
ttr_report = {style: {} for style in styles}
ttr_scores = {}

# Compute TTR for each style
for style, res in zip(styles, responses):
    avg_ttr, ttr_vals = compute_ttr_value(res, nlp=nlp)
    ttr_report[style]["avg_ttr"] = avg_ttr
    ttr_scores[style] = ttr_vals

# Shapiro-Wilk test
# The data are treated as normal only if every style passes at the 0.05 level.
shapiro_results = {style: shapiro(vals) for style, vals in ttr_scores.items()}
all_normal = all(result.pvalue >= 0.05 for result in shapiro_results.values())

# Statistical test
# One-way ANOVA when all groups look normal, Kruskal-Wallis otherwise.
if all_normal:
    stat_test = "ANOVA"
    stat, p_val = f_oneway(*[ttr_scores[style] for style in styles])
else:
    stat_test = "Kruskal-Wallis"
    stat, p_val = kruskal(*[ttr_scores[style] for style in styles])

# Post-hoc test if significant
# NOTE(review): Dunn's test is run after either omnibus test, including when ANOVA was
# selected above — confirm this is the intended post-hoc for the parametric branch.
posthoc = None
if p_val < 0.05:
    # Flatten to long format: one row per response, with its score and style name
    all_scores = [val for vals in ttr_scores.values() for val in vals]
    all_labels = [style for style in styles for _ in ttr_scores[style]]
    df_posthoc = pd.DataFrame({"score": all_scores, "style": all_labels})
    posthoc = sp.posthoc_dunn(df_posthoc, val_col="score", group_col="style", p_adjust="bonferroni")

# Output results
with open('results.txt', 'a') as file:
    print("\nLexical Diversity (TTR, average)", file=file)
    print(f"\n\tStatistical Test: {stat_test} (p = {p_val:.4f})\n", file=file)

    # With a post-hoc matrix, each mean is followed by its significance annotation
    if posthoc is not None:
        sig_annotated = summarize_significant_differences_annotated(posthoc, styles)
        for style in styles:
            mean_val = ttr_report[style]["avg_ttr"]
            sig_diff = sig_annotated[style]
            print(f"\t{style:<12}: {mean_val:.3f}  {sig_diff}", file=file)
    else:
        for style in styles:
            mean_val = ttr_report[style]["avg_ttr"]
            print(f"\t{style:<12}: {mean_val:.3f}", file=file)

In [None]:
# === LEXICAL COMPLEXITY ANALYSIS WITH STATISTICAL SIGNIFICANCE ===

def compute_ngsl_distribution_per_text(responses, ngsl_1000_txt, ngsl_2000_txt, nlp):
    """Compute, per response, the share of content lemmas found in the NGSL word lists.

    The proportion serves as a proxy for lexical simplicity: higher = simpler.

    Args:
        responses: Texts to analyse.
        ngsl_1000_txt: Raw text of the first NGSL list, one word per line.
        ngsl_2000_txt: Raw text of the second NGSL list, one word per line.
        nlp: Pipeline that is callable on a single text and offers ``pipe`` for batches.

    Returns:
        List with one proportion per response, in input order; 0 for a response that has
        no alphabetic, non-stopword token.
    """

    def load_word_list(raw_text):
        # Parse raw list text (one word per line) into a set, ignoring blank lines
        stripped = (line.strip().lower() for line in raw_text.splitlines())
        return {word for word in stripped if word}

    def content_lemmas(doc):
        # Lower-cased lemmas of alphabetic, non-punctuation, non-stopword tokens
        return [
            token.lemma_.lower()
            for token in doc
            if token.is_alpha and not token.is_punct and not token.is_stop
        ]

    # Load NGSL 1000 and 2000 lists
    ngsl_words = load_word_list(ngsl_1000_txt) | load_word_list(ngsl_2000_txt)

    # Combine and lemmatize all NGSL words. The words are sorted first because set iteration
    # order changes between interpreter sessions and the pipeline's lemmas may depend on
    # neighbouring tokens; a fixed order keeps the reference set reproducible across runs.
    ngsl_combined = set(content_lemmas(nlp(' '.join(sorted(ngsl_words)))))

    # Compute NGSL ratio per response
    proportions = []
    for doc in nlp.pipe(responses, batch_size=50):
        tokens = content_lemmas(doc)
        total = len(tokens)
        if total == 0:
            proportions.append(0)
        else:
            count_ngsl = sum(1 for t in tokens if t in ngsl_combined)
            proportions.append(count_ngsl / total)

    return proportions


# === SETUP: Define tone groups and store results ===

# `styles`, `requests` and `responses` are parallel lists (same order of subsets).
styles = ["clear", "friendly", "empathetic", "polite"]
requests = [req_c, req_f, req_e, req_p]      # (Not used in this script)
responses = [res_c, res_f, res_e, res_p]

# Containers for output
ngsl_report = {style: {} for style in styles}  # To store average NGSL ratios
ngsl_scores = {}  # To store per-response NGSL ratios

# === COMPUTE: Get NGSL-based lexical simplicity scores ===

# NOTE(review): the NGSL word lists are re-parsed and re-lemmatised on every call,
# i.e. once per style.
for style, res in zip(styles, responses):
    ratios = compute_ngsl_distribution_per_text(
        res,
        ngsl_1000_txt,
        ngsl_2000_txt,
        nlp=nlp
    )
    ngsl_report[style]["ngsl_ratio"] = np.mean(ratios)
    ngsl_scores[style] = ratios

# === STATISTICAL TESTING AND OUTPUT ===

with open('results.txt', 'a') as file:
    print("\nLexical Complexity (NGSL frequency list, average)", file=file)

    # 1. Shapiro-Wilk normality test
    # The data are treated as normal only if every style passes at the 0.05 level.
    shapiro_p = {s: shapiro(ngsl_scores[s]).pvalue for s in styles}
    all_normal = all(p >= 0.05 for p in shapiro_p.values())

    # 2. Choose appropriate test
    data = [ngsl_scores[s] for s in styles]
    if all_normal:
        test_type = "ANOVA"
        stat, p_val = f_oneway(*data)
    else:
        test_type = "Kruskal-Wallis"
        stat, p_val = kruskal(*data)

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    # 3. Posthoc testing if significant
    if p_val < 0.05:
        # Prepare flat list of all values and labels
        all_vals = [val for sublist in data for val in sublist]
        all_labels = sum([[s]*len(ngsl_scores[s]) for s in styles], [])
        # NOTE(review): this rebinds the notebook-level name `df`.
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Run Dunn's posthoc test with Bonferroni correction
        # NOTE(review): Dunn's test is used even when ANOVA was selected above — confirm.
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

        # Print results with annotations
        for s in styles:
            mean_val = ngsl_report[s]["ngsl_ratio"]
            sig_diff = sig_labels[s]
            print(f"\t{s:<12}: {mean_val:.2f}  {sig_diff}", file=file)
    else:
        # If no significant difference, print averages only
        for s in styles:
            mean_val = ngsl_report[s]["ngsl_ratio"]
            print(f"\t{s:<12}: {mean_val:.2f}", file=file)

In [None]:
# === LEXICAL REPETITION ANALYSIS ===

# Function to compute repetition scores per text
def compute_lexical_repetition_per_text(texts, nlp):
    """Score each text by how often n-grams recur inside sliding token windows.

    Each text is reduced to its lowercased alphabetic tokens. For every window
    size (2, 5 and 10) a window slides over the tokens one position at a time;
    within a window all unigrams, bigrams and trigrams are pooled and every
    occurrence beyond the first of the same n-gram counts as one repetition.
    A text's score is the sum of these counts over all windows and sizes.

    Args:
        texts: Iterable of strings.
        nlp: Callable mapping a string to an iterable of tokens that expose
            ``text`` and ``is_alpha``.

    Returns:
        List of integer repetition scores, one per text. Texts shorter than a
        window size contribute nothing for that size.
    """
    ngram_orders = (1, 2, 3)      # unigrams, bigrams, trigrams
    window_sizes = (2, 5, 10)

    def repeats_in_window(window):
        # Pool all n-grams of the window, then count the surplus occurrences
        pooled = Counter(
            tuple(window[start:start + order])
            for order in ngram_orders
            for start in range(len(window) - order + 1)
        )
        return sum(freq - 1 for freq in pooled.values() if freq > 1)

    def repeats_for_size(words, size):
        # Slide a window of `size` tokens across the text, one token at a time
        return sum(
            repeats_in_window(words[start:start + size])
            for start in range(len(words) - size + 1)
        )

    repetition_scores = []
    for text in texts:
        words = [tok.text.lower() for tok in nlp(text) if tok.is_alpha]
        repetition_scores.append(sum(repeats_for_size(words, size) for size in window_sizes))

    return repetition_scores


# === SETUP ===

# Style labels, listed in the same order as the request/response collections
styles = ["clear", "friendly", "empathetic", "polite"]
requests = [req_c, req_f, req_e, req_p]     # not read anywhere in this cell
responses = [res_c, res_f, res_e, res_p]

# Per-style mean score, and per-style list of per-text scores
repetition_report = {style: {} for style in styles}
repetition_scores = {}

for style, res in zip(styles, responses):
    scores = compute_lexical_repetition_per_text(res, nlp=nlp)
    repetition_scores[style] = scores
    repetition_report[style]["lexical_repetition"] = np.mean(scores)

# === OUTPUT: REPORT TO FILE ===

with open('results.txt', 'a') as file:
    print("\nLexical Repetition (across n-grams, average)", file=file)

    # Normality check: one Shapiro-Wilk p-value per style
    shapiro_p = {s: shapiro(repetition_scores[s]).pvalue for s in styles}
    all_normal = all(shapiro_p[s] >= 0.05 for s in styles)

    # Omnibus test: ANOVA only if no style rejects normality, otherwise Kruskal-Wallis
    data = [repetition_scores[s] for s in styles]
    test_type = "ANOVA" if all_normal else "Kruskal-Wallis"
    stat, p_val = (f_oneway(*data) if all_normal else kruskal(*data))

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    if p_val < 0.05:
        # Long-format frame: one row per text, labelled with its style
        all_vals = [score for group in data for score in group]
        all_labels = [s for s in styles for _ in repetition_scores[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Pairwise follow-up: Dunn's test with Bonferroni correction
        # (used regardless of which omnibus test was selected above)
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

        # Mean per style followed by its significance annotation
        for s in styles:
            mean_val = repetition_report[s]["lexical_repetition"]
            sig_diff = sig_labels[s]
            print(f"\t{s:<12}: {mean_val:.2f}  {sig_diff}", file=file)
    else:
        # Omnibus test not significant: report the means alone
        for s in styles:
            mean_val = repetition_report[s]["lexical_repetition"]
            print(f"\t{s:<12}: {mean_val:.2f}", file=file)

In [None]:
# === INFORMATION DENSITY ANALYSIS WITH STATISTICAL SIGNIFICANCE ===

# Function to compute information density per text
def compute_info_density_per_text(texts, nlp):
    """Return the share of content words in each text.

    A content word is any token whose coarse POS is NOUN, VERB, ADJ or ADV.
    The denominator is the number of tokens that are neither whitespace nor
    punctuation.

    Args:
        texts: Iterable of strings.
        nlp: Callable mapping a string to an iterable of tokens that expose
            ``pos_``, ``is_space`` and ``is_punct``.

    Returns:
        List with one density per text; 0 for a text with no countable tokens.
    """
    content_pos = ["NOUN", "VERB", "ADJ", "ADV"]

    def density_of(doc):
        n_content = 0
        n_words = 0
        for token in doc:
            if token.pos_ in content_pos:
                n_content += 1
            if not (token.is_space or token.is_punct):
                n_words += 1
        # Guard against texts that contain only punctuation/whitespace
        return n_content / n_words if n_words else 0

    return [density_of(nlp(text)) for text in texts]


# === SETUP: Define tone categories and response sets ===

# Style labels, listed in the same order as the response collections
styles = ["clear", "friendly", "empathetic", "polite"]
responses_list = [res_c, res_f, res_e, res_p]

# Per-style mean density, and per-style list of per-text densities
info_density_report = {style: {} for style in styles}
info_density_scores = {}


# === COMPUTE INFORMATION DENSITY SCORES ===

for style, resps in zip(styles, responses_list):
    densities = compute_info_density_per_text(resps, nlp=nlp)
    info_density_scores[style] = densities
    info_density_report[style]["information_density"] = np.mean(densities)

# === STATISTICAL TESTING AND OUTPUT TO FILE ===

with open('results.txt', 'a') as file:
    print("\nInformation density (content words / total words, average)", file=file)

    # Normality check: one Shapiro-Wilk p-value per style
    shapiro_p = {s: shapiro(info_density_scores[s]).pvalue for s in styles}
    all_normal = all(shapiro_p[s] >= 0.05 for s in styles)

    # Omnibus test: ANOVA only if no style rejects normality, otherwise Kruskal-Wallis
    data = [info_density_scores[s] for s in styles]
    test_type = "ANOVA" if all_normal else "Kruskal-Wallis"
    stat, p_val = (f_oneway(*data) if all_normal else kruskal(*data))

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    if p_val < 0.05:
        # Long-format frame: one row per text, labelled with its style
        all_vals = [score for group in data for score in group]
        all_labels = [s for s in styles for _ in info_density_scores[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Pairwise follow-up: Dunn's test with Bonferroni correction
        # (used regardless of which omnibus test was selected above)
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

        # Mean per style followed by its significance annotation
        for s in styles:
            mean_val = info_density_report[s]["information_density"]
            sig_diff = sig_labels[s]
            print(f"\t{s:<12}: {mean_val:.3f}  {sig_diff}", file=file)
    else:
        # Omnibus test not significant: report the means alone
        for s in styles:
            mean_val = info_density_report[s]["information_density"]
            print(f"\t{s:<12}: {mean_val:.3f}", file=file)

In [None]:
# === TEXT LENGTH ANALYSIS ===

# Function to compute token length per response
def compute_token_lengths_per_text(responses, nlp):
    """Return the number of tokens in each response.

    Args:
        responses: Iterable of strings.
        nlp: Callable mapping a string to a sized token sequence; every token
            it yields is counted (no filtering is applied here).

    Returns:
        List of token counts, one per response.
    """
    token_counts = []
    for text in responses:
        token_counts.append(len(nlp(text)))
    return token_counts


# === SETUP ===

# Style labels, listed in the same order as the response collections
styles = ["clear", "friendly", "empathetic", "polite"]
responses_list = [res_c, res_f, res_e, res_p]

# Per-style mean token count, and per-style list of per-response token counts
token_length_report = {style: {} for style in styles}
token_length_scores = {}


# === COMPUTE TOKEN LENGTHS ===

for style, resps in zip(styles, responses_list):
    lengths = compute_token_lengths_per_text(resps, nlp=nlp)
    token_length_scores[style] = lengths
    token_length_report[style]["token_length"] = np.mean(lengths)

# === STATISTICAL TESTING & OUTPUT ===

with open('results.txt', 'a') as file:
    # NOTE(review): the label reads "Sentence length", but the values reported
    # below are token counts per whole response.
    print("\nSentence length (average)", file=file)

    # Normality check: one Shapiro-Wilk p-value per style
    shapiro_p = {s: shapiro(token_length_scores[s]).pvalue for s in styles}
    all_normal = all(shapiro_p[s] >= 0.05 for s in styles)

    # Omnibus test: ANOVA only if no style rejects normality, otherwise Kruskal-Wallis
    data = [token_length_scores[s] for s in styles]
    test_type = "ANOVA" if all_normal else "Kruskal-Wallis"
    stat, p_val = (f_oneway(*data) if all_normal else kruskal(*data))

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    if p_val < 0.05:
        # Long-format frame: one row per response, labelled with its style
        all_vals = [score for group in data for score in group]
        all_labels = [s for s in styles for _ in token_length_scores[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Pairwise follow-up: Dunn's test with Bonferroni correction
        # (used regardless of which omnibus test was selected above)
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

        # Mean per style followed by its significance annotation
        for s in styles:
            mean_val = token_length_report[s]["token_length"]
            sig_diff = sig_labels[s]
            print(f"\t{s:<12}: {mean_val:.2f}  {sig_diff}", file=file)
    else:
        # Omnibus test not significant: report the means alone
        for s in styles:
            mean_val = token_length_report[s]["token_length"]
            print(f"\t{s:<12}: {mean_val:.2f}", file=file)

## Discourse Analysis

In [None]:
# Append the section banner for the discourse metrics to the results file
with open('results.txt', 'a') as file:
    file.write("\n=== DISCOURSE FEATURES ===\n")

In [None]:
# === COHESION ANALYSIS ===

# Function to compute the proportion of cohesive devices in each text
def compute_total_cohesion_per_text(texts, nlp):
    """Compute, for each text, the number of cohesive devices per word token.

    Devices are matched as token sequences, so multi-word devices such as
    "in addition" or "on the other hand" are recognised as well as single
    words. (Matching token by token against the raw device strings can never
    hit a multi-word entry.) At each position the longest matching device
    wins and counts once; the tokens it covers are not matched again, so
    "as well" is one device rather than "as" plus a miss.

    Args:
        texts: Iterable of strings to score.
        nlp: Callable mapping a string to an iterable of tokens that expose
            ``text``, ``is_punct`` and ``is_space``. If it also provides a
            ``make_doc`` method (tokenizer-only entry point), that is used to
            tokenise the device list; otherwise ``nlp`` itself is called.

    Returns:
        List of floats, one per text: matched devices divided by the number
        of tokens that are neither punctuation nor whitespace, or 0.0 when a
        text has no such tokens.
    """
    # List of common cohesive devices (connectives, transitions, etc.)
    cohesive_devices = [
        "and", "also", "too", "as well", "in addition", "furthermore", "moreover",
        "besides", "what's more", "for example", "for instance", "such as", "like",
        "including", "namely", "in particular", "similarly", "likewise", "in the same way",
        "equally", "just as", "in other words", "that is to say", "i.e.", "to put it another way",
        "because", "since", "as", "due to", "owing to", "for this reason", "so", "therefore",
        "thus", "consequently", "as a result", "hence", "accordingly", "in order to", "so that",
        "for the purpose of", "with the aim of", "but", "however", "although", "even though",
        "whereas", "while", "on the other hand", "in contrast", "yet", "nonetheless",
        "nevertheless", "still", "admittedly", "of course", "even so", "while it is true that",
        "or", "alternatively", "on the one hand", "on the other hand", "either", "or",
        "neither", "nor", "then", "next", "after that", "subsequently", "eventually",
        "finally", "at last", "while", "as", "at the same time", "meanwhile", "during",
        "before", "previously", "earlier", "up to that point", "until then", "after",
        "later", "afterwards", "since then", "now", "at present", "currently", "at that moment",
        "by then", "at that time", "in conclusion", "in summary", "to sum up", "overall",
        "eventually"
    ]

    # Tokenise the devices with the same tokenizer as the texts so that the
    # two sides split words identically (e.g. around apostrophes).
    tokenize = getattr(nlp, "make_doc", nlp)
    devices_by_length = {}
    for device in cohesive_devices:
        device_tokens = tuple(
            tok.text.lower() for tok in tokenize(device) if not tok.is_space
        )
        if device_tokens:
            devices_by_length.setdefault(len(device_tokens), set()).add(device_tokens)
    lengths_longest_first = sorted(devices_by_length, reverse=True)

    scores = []
    for text in texts:
        doc = nlp(text)
        # Denominator: tokens that are neither punctuation nor whitespace
        token_count = sum(1 for t in doc if not t.is_punct and not t.is_space)
        # Punctuation stays in the sequence so a device cannot span across it
        words = [t.text.lower() for t in doc if not t.is_space]

        match_count = 0
        pos = 0
        while pos < len(words):
            for length in lengths_longest_first:
                if tuple(words[pos:pos + length]) in devices_by_length[length]:
                    match_count += 1
                    pos += length
                    break
            else:
                pos += 1

        score = match_count / token_count if token_count else 0.0
        scores.append(score)

    return scores

# === PREPARATION ===

# Style labels, listed in the same order as the response collections
styles = ["clear", "friendly", "empathetic", "polite"]
responses_list = [res_c, res_f, res_e, res_p]

# Per-style mean cohesion, and per-style list of per-text cohesion scores
total_cohesion_report = {style: {} for style in styles}
total_cohesion_scores = {}

# === COMPUTE COHESION SCORES ===

for style, resps in zip(styles, responses_list):
    scores = compute_total_cohesion_per_text(resps, nlp)
    total_cohesion_scores[style] = scores
    total_cohesion_report[style]["total_cohesion"] = np.mean(scores)

# === REPORTING AND STATISTICAL TESTING ===

with open('results.txt', 'a') as file:
    print("\nCohesion (frequency of connectives, average)", file=file)

    # One group of per-text scores per style, in `styles` order
    data = [total_cohesion_scores[s] for s in styles]

    # Normality check: one Shapiro-Wilk p-value per style
    shapiro_p = {s: shapiro(total_cohesion_scores[s]).pvalue for s in styles}
    all_normal = all(shapiro_p[s] >= 0.05 for s in styles)

    # Omnibus test: ANOVA only if no style rejects normality, otherwise Kruskal-Wallis
    test_type = "ANOVA" if all_normal else "Kruskal-Wallis"
    stat, p_val = (f_oneway(*data) if all_normal else kruskal(*data))

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    if p_val < 0.05:
        # Long-format frame: one row per text, labelled with its style
        all_vals = [score for group in data for score in group]
        all_labels = [s for s in styles for _ in total_cohesion_scores[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Pairwise follow-up: Dunn's test with Bonferroni correction
        # (used regardless of which omnibus test was selected above)
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

        # Mean per style followed by its significance annotation
        for s in styles:
            mean_val = total_cohesion_report[s]["total_cohesion"]
            sig_diff = sig_labels[s]
            print(f"\t{s:<12}: {mean_val:.3f}  {sig_diff}", file=file)
    else:
        # Omnibus test not significant: report the means alone
        for s in styles:
            mean_val = total_cohesion_report[s]["total_cohesion"]
            print(f"\t{s:<12}: {mean_val:.3f}", file=file)

In [None]:
# === REQUEST/RESPONSE SEMANTIC OVERLAP ANALYSIS (BERTScore) ===

# Function to compute BERTScore F1 for each request-response pair
def compute_bert_f1_scores(requests, responses):
    """Return the BERTScore F1 of every response against its request.

    The responses are passed to ``bertscore`` as the first argument and the
    requests as the second; scoring uses the 'bert-base-uncased' model with
    English settings and verbose progress output.

    Args:
        requests: List of request strings.
        responses: List of response strings, aligned with ``requests``.

    Returns:
        List of F1 values (the third result of ``bertscore`` converted with
        ``.tolist()``).
    """
    _precision, _recall, f1_tensor = bertscore(
        responses,
        requests,
        lang='en',
        model_type='bert-base-uncased',
        verbose=True,
    )
    # Only F1 is reported downstream; precision and recall are discarded
    return f1_tensor.tolist()

# Style labels, listed in the same order as the request/response collections
styles = ["clear", "friendly", "empathetic", "polite"]
requests_list = [req_c, req_f, req_e, req_p]
responses_list = [res_c, res_f, res_e, res_p]

# Per-style mean F1, and per-style list of per-pair F1 values
bertscore_report = {style: {} for style in styles}
bert_scores = {}

# Score each style's request/response pairs
for style, reqs, resps in zip(styles, requests_list, responses_list):
    f1_scores = compute_bert_f1_scores(reqs, resps)
    bert_scores[style] = f1_scores
    bertscore_report[style]["bert_f1"] = np.mean(f1_scores)

# Append the comparison to the results file
with open('results.txt', 'a') as file:
    print("\nSemantic REQ-RES Overlap (BERTScore (F1), average)", file=file)

    # Normality check: one Shapiro-Wilk p-value per style
    shapiro_p = {s: shapiro(bert_scores[s]).pvalue for s in styles}
    all_normal = all(shapiro_p[s] >= 0.05 for s in styles)

    # Omnibus test: ANOVA only if no style rejects normality, otherwise Kruskal-Wallis
    data = [bert_scores[s] for s in styles]
    test_type = "ANOVA" if all_normal else "Kruskal-Wallis"
    stat, p_val = (f_oneway(*data) if all_normal else kruskal(*data))

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    if p_val < 0.05:
        # Long-format frame: one row per pair, labelled with its style
        all_vals = [score for group in data for score in group]
        all_labels = [s for s in styles for _ in bert_scores[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Pairwise follow-up: Dunn's test with Bonferroni correction
        # (used regardless of which omnibus test was selected above)
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

        # Mean per style followed by its significance annotation
        for s in styles:
            mean_val = bertscore_report[s]["bert_f1"]
            sig_diff = sig_labels[s]
            print(f"\t{s:<12}: {mean_val:.3f}  {sig_diff}", file=file)
    else:
        # Omnibus test not significant: report the means alone
        for s in styles:
            mean_val = bertscore_report[s]["bert_f1"]
            print(f"\t{s:<12}: {mean_val:.3f}", file=file)

In [None]:
# === N-GRAM REQUEST/RESPONSE OVERLAP ANALYSIS (ROUGE-L) ===

# Function to compute ROUGE-L F1 scores for each request/response pair
def compute_rouge_l_scores(requests, responses):
    """Return the ROUGE-L F-measure for each (request, response) pair.

    A single ``RougeScorer`` configured for 'rougeL' with stemming is reused
    for all pairs. Pairs are formed with ``zip``, so scoring stops at the
    shorter of the two inputs.

    Args:
        requests: Iterable of request strings (first argument to ``score``).
        responses: Iterable of response strings (second argument to ``score``).

    Returns:
        List of ROUGE-L F-measures, one per pair.
    """
    scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)

    f_measures = []
    for req, resp in zip(requests, responses):
        result = scorer.score(req, resp)
        f_measures.append(result['rougeL'].fmeasure)
    return f_measures

# Style labels, listed in the same order as the request/response collections
styles = ["clear", "friendly", "empathetic", "polite"]
requests_list = [req_c, req_f, req_e, req_p]
responses_list = [res_c, res_f, res_e, res_p]

# Per-style mean ROUGE-L, and per-style list of per-pair ROUGE-L values
rouge_report = {style: {} for style in styles}
rouge_scores = {}

# Score each style's request/response pairs
for style, reqs, resps in zip(styles, requests_list, responses_list):
    scores = compute_rouge_l_scores(reqs, resps)
    rouge_scores[style] = scores
    rouge_report[style]["rouge_l"] = np.mean(scores)

# Append the comparison to the results file
with open('results.txt', 'a') as file:
    print("\nN-gram REQ-RES Overlap (ROUGE-L, average)", file=file)

    # Normality check: one Shapiro-Wilk p-value per style
    shapiro_p = {s: shapiro(rouge_scores[s]).pvalue for s in styles}
    all_normal = all(shapiro_p[s] >= 0.05 for s in styles)

    # Omnibus test: ANOVA only if no style rejects normality, otherwise Kruskal-Wallis
    data = [rouge_scores[s] for s in styles]
    test_type = "ANOVA" if all_normal else "Kruskal-Wallis"
    stat, p_val = (f_oneway(*data) if all_normal else kruskal(*data))

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    if p_val < 0.05:
        # Long-format frame: one row per pair, labelled with its style
        all_vals = [score for group in data for score in group]
        all_labels = [s for s in styles for _ in rouge_scores[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Pairwise follow-up: Dunn's test with Bonferroni correction
        # (used regardless of which omnibus test was selected above)
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

        # Mean per style followed by its significance annotation
        for s in styles:
            mean_val = rouge_report[s]["rouge_l"]
            sig_diff = sig_labels[s]
            print(f"\t{s:<12}: {mean_val:.3f}  {sig_diff}", file=file)
    else:
        # Omnibus test not significant: report the means alone
        for s in styles:
            mean_val = rouge_report[s]["rouge_l"]
            print(f"\t{s:<12}: {mean_val:.3f}", file=file)

## Analysis of Deixis and Modality

In [None]:
# Append the section banner for the deixis/modality metrics to the results file
with open('results.txt', 'a') as file:
    file.write("\n=== DEICTICS AND MODALITY ===\n")

In [None]:
# === PROXIMAL DEICTIC RATIO ANALYSIS ===

# Function to compute the ratio of proximal deictic terms per response
def compute_deictic_ratios_per_text(responses: List[str], nlp) -> List[float]:
    """Return, per response, the share of proximal among all counted deictics.

    Proximal terms are "this", "these" and "here" (any part of speech).
    Distal terms are "those", "that" only when tagged DET, and "there" unless
    tagged EX (existential). Only alphabetic tokens are inspected, and
    matching is case-insensitive.

    Args:
        responses: Texts to analyse.
        nlp: Callable mapping a string to an iterable of tokens that expose
            ``text``, ``is_alpha`` and ``pos_``.

    Returns:
        One ratio ``proximal / (proximal + distal)`` per response; 0.0 when a
        response contains no counted deictic at all.
    """
    proximal = {"this", "these", "here"}

    def is_distal(word, token):
        # POS is consulted only for the two ambiguous forms
        if word == "that":
            return token.pos_ == "DET"       # skip complementizer/relative "that"
        if word == "there":
            return token.pos_ != "EX"        # skip existential "there is ..."
        return word == "those"

    ratios = []
    for text in responses:
        count_proximal = 0
        count_distal = 0

        for token in nlp(text):
            if not token.is_alpha:
                continue
            word = token.text.lower()
            if word in proximal:
                count_proximal += 1
            elif is_distal(word, token):
                count_distal += 1

        total = count_proximal + count_distal
        ratios.append(count_proximal / total if total > 0 else 0.0)

    return ratios

# === PREPARE INPUTS AND OUTPUT CONTAINERS ===

# Style labels, listed in the same order as the response collections
styles = ["clear", "friendly", "empathetic", "polite"]
responses_list = [res_c, res_f, res_e, res_p]

# Per-style mean ratio, and per-style list of per-text ratios
deictic_report = {style: {} for style in styles}
deictic_ratios = {}

# === COMPUTE RATIOS PER STYLE ===

for style, resps in zip(styles, responses_list):
    ratios = compute_deictic_ratios_per_text(resps, nlp=nlp)
    deictic_ratios[style] = ratios
    deictic_report[style]["proximal_deictic_ratio"] = np.mean(ratios)

# === REPORT OUTPUT TO FILE ===

with open('results.txt', 'a') as file:
    # NOTE(review): the label says "corpus-level", but the figure printed below
    # is the mean of per-text ratios.
    print("\nProximal deictic ratio (corpus-level)", file=file)

    # One group of per-text ratios per style, in `styles` order
    data = [deictic_ratios[s] for s in styles]

    # Normality check: one Shapiro-Wilk p-value per style
    shapiro_p = {s: shapiro(deictic_ratios[s]).pvalue for s in styles}
    all_normal = all(shapiro_p[s] >= 0.05 for s in styles)

    # Omnibus test: ANOVA only if no style rejects normality, otherwise Kruskal-Wallis
    test_type = "ANOVA" if all_normal else "Kruskal-Wallis"
    stat, p_val = (f_oneway(*data) if all_normal else kruskal(*data))

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    if p_val < 0.05:
        # Long-format frame: one row per text, labelled with its style
        all_vals = [score for group in data for score in group]
        all_labels = [s for s in styles for _ in deictic_ratios[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Pairwise follow-up: Dunn's test with Bonferroni correction
        # (used regardless of which omnibus test was selected above)
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

        # Mean per style followed by its significance annotation
        for s in styles:
            mean_val = deictic_report[s]["proximal_deictic_ratio"]
            sig_diff = sig_labels[s]
            print(f"\t{s:<12}: {mean_val:.3f}  {sig_diff}", file=file)
    else:
        # Omnibus test not significant: report the means alone
        for s in styles:
            mean_val = deictic_report[s]["proximal_deictic_ratio"]
            print(f"\t{s:<12}: {mean_val:.3f}", file=file)

In [None]:
# === FIRST/SECOND PERSON PRONOUN TYPE RATIOS ANALYSIS ===

# Function to compute per-text ratios of first-person singular, first-person plural, and second-person pronouns
def compute_pronoun_type_ratios(responses: List[str], nlp):
    """Return per-text shares of 1st-singular, 1st-plural and 2nd-person pronouns.

    Only tokens tagged ``PRON`` whose lowercased text appears in the manual
    map are counted. Second-person forms are pooled into a single "2" bucket
    regardless of number. Each share is the bucket count divided by the total
    number of counted pronouns in that text.

    Args:
        responses: Texts to analyse.
        nlp: Callable mapping a string to an iterable of tokens that expose
            ``text`` and ``pos_``.

    Returns:
        Dict with keys "1|Sing", "1|Plur" and "2"; each value is a list with
        one ratio per response (0.0 where a text has no counted pronoun).
    """
    buckets = ("1|Sing", "1|Plur", "2")

    # Surface form -> person|number label
    manual_pronoun_map = {
        "i": "1|Sing", "me": "1|Sing", "my": "1|Sing", "mine": "1|Sing", "myself": "1|Sing",
        "we": "1|Plur", "us": "1|Plur", "our": "1|Plur", "ours": "1|Plur", "ourselves": "1|Plur",
        "you": "2|Sing", "your": "2|Sing", "yours": "2|Sing", "yourself": "2|Sing", "yourselves": "2|Plur"
    }

    ratios = {bucket: [] for bucket in buckets}

    for text in responses:
        counts = dict.fromkeys(buckets, 0)

        for token in nlp(text):
            if token.pos_ != "PRON":
                continue
            label = manual_pronoun_map.get(token.text.lower())
            if not label:
                continue
            # First-person labels are buckets themselves; every "2|..." label
            # collapses into the shared "2" bucket.
            bucket = label if label in counts else "2"
            counts[bucket] += 1

        total = sum(counts.values())
        for bucket in buckets:
            ratios[bucket].append(counts[bucket] / total if total > 0 else 0.0)

    return ratios

# === SETUP: STYLES AND CONTAINERS ===

# Style labels, listed in the same order as the response collections
styles = ["clear", "friendly", "empathetic", "polite"]
responses_list = [res_c, res_f, res_e, res_p]

# Per-style mean ratios (one entry per pronoun type)
pronoun_type_report = {style: {} for style in styles}

# Per-text ratios, indexed first by pronoun type and then by style
pronoun_type_scores = {"1|Sing": {}, "1|Plur": {}, "2": {}}

# === COMPUTE PRONOUN RATIOS PER STYLE ===

for style, resps in zip(styles, responses_list):
    style_ratios = compute_pronoun_type_ratios(resps, nlp=nlp)

    for k in style_ratios:
        # Raw per-text ratios feed the tests; their mean feeds the report
        pronoun_type_scores[k][style] = style_ratios[k]
        pronoun_type_report[style][f"pronoun_ratio_{k}"] = np.mean(style_ratios[k])

# === REPORT OUTPUT TO FILE ===

with open('results.txt', 'a') as file:
    print("\nPronoun type ratio (average)", file=file)

    # Each pronoun type gets its own omnibus test and (if needed) post-hoc test
    for pron_type in ["1|Sing", "1|Plur", "2"]:
        print(f"\n\tPronoun type: {pron_type}", file=file)

        # One group of per-text ratios per style, in `styles` order
        data = [pronoun_type_scores[pron_type][s] for s in styles]

        # Normality check: one Shapiro-Wilk p-value per style
        shapiro_p = {s: shapiro(pronoun_type_scores[pron_type][s]).pvalue for s in styles}
        all_normal = all(shapiro_p[s] >= 0.05 for s in styles)

        # Omnibus test: ANOVA only if no style rejects normality, otherwise Kruskal-Wallis
        test_type = "ANOVA" if all_normal else "Kruskal-Wallis"
        stat, p_val = (f_oneway(*data) if all_normal else kruskal(*data))

        print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

        if p_val < 0.05:
            # Long-format frame: one row per text, labelled with its style
            all_vals = [score for group in data for score in group]
            all_labels = [s for s in styles for _ in pronoun_type_scores[pron_type][s]]
            df = pd.DataFrame({"score": all_vals, "style": all_labels})

            # Pairwise follow-up: Dunn's test with Bonferroni correction
            # (used regardless of which omnibus test was selected above)
            posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
            sig_labels = summarize_significant_differences_annotated(posthoc, styles)

            # Mean per style followed by its significance annotation
            for s in styles:
                mean_val = pronoun_type_report[s][f"pronoun_ratio_{pron_type}"]
                sig_diff = sig_labels[s]
                print(f"\t{s:<12}: {mean_val:.3f}  {sig_diff}", file=file)
        else:
            # Omnibus test not significant: report the means alone
            for s in styles:
                mean_val = pronoun_type_report[s][f"pronoun_ratio_{pron_type}"]
                print(f"\t{s:<12}: {mean_val:.3f}", file=file)

In [None]:
# === PRESENT TENSE RATIO ANALYSIS ===

# Function to compute the ratio of present tense verbs per response
def compute_present_tense_ratios(responses: List[str], nlp) -> List[float]:
    """Return, per response, the fraction of verb tokens tagged as present tense.

    Parameters:
    - responses: texts to analyse
    - nlp: callable that turns a text into an iterable of tokens exposing a
      fine-grained POS tag in `tag_`

    Returns:
    - One ratio per response (present-tagged verbs / all verb-tagged tokens);
      0.0 when a response contains no verb-tagged token.
    """
    # Tags counted as "present": base form, gerund, non-3rd-person and 3rd-person present
    present_tags = {"VB", "VBG", "VBP", "VBZ"}
    # Tags that make up the denominator: the present tags plus past tense and past participle
    all_verb_tags = present_tags | {"VBD", "VBN"}

    def _present_share(doc) -> float:
        # Keep only the tags of verb tokens, then measure how many are present-tagged
        verb_tags = [tok.tag_ for tok in doc if tok.tag_ in all_verb_tags]
        if not verb_tags:
            return 0.0  # No verbs at all: avoid dividing by zero
        n_present = sum(1 for tag in verb_tags if tag in present_tags)
        return n_present / len(verb_tags)

    return [_present_share(nlp(text)) for text in responses]

# === INITIALIZE STYLE DATA ===

# Communication styles and, in the same order, the responses written in each style
styles = ["clear", "friendly", "empathetic", "polite"]
responses_list = [res_c, res_f, res_e, res_p]

# present_report holds the mean ratio per style; present_ratios holds the raw per-response ratios
present_report = {style: {} for style in styles}
present_ratios = {}

# === COMPUTE RATIOS FOR EACH STYLE ===

for style, resps in zip(styles, responses_list):
    ratios = compute_present_tense_ratios(resps, nlp=nlp)
    present_ratios[style] = ratios  # Raw values feed the statistical tests below
    present_report[style]["present_tense_ratio"] = np.mean(ratios)

# === REPORT RESULTS TO FILE ===

with open('results.txt', 'a') as file:
    print("\nPresent tense ratio (average)", file=file)

    # One list of ratios per style, in `styles` order
    data = [present_ratios[s] for s in styles]

    # Shapiro-Wilk normality check per style; the parametric test is used only if every group passes
    shapiro_p = {s: shapiro(group).pvalue for s, group in zip(styles, data)}
    all_normal = all(p >= 0.05 for p in shapiro_p.values())

    # ANOVA for all-normal groups, Kruskal-Wallis otherwise
    test_type, omnibus_test = ("ANOVA", f_oneway) if all_normal else ("Kruskal-Wallis", kruskal)
    stat, p_val = omnibus_test(*data)

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    # Post-hoc comparisons are only run when the omnibus test is significant
    significant = p_val < 0.05
    if significant:
        # Long-format table: one row per response with its style label
        all_vals = [val for group in data for val in group]
        all_labels = [s for s in styles for _ in present_ratios[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Dunn's pairwise test with Bonferroni correction
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

    # One line per style: the mean, followed by the significance annotation when available
    for s in styles:
        mean_val = present_report[s]["present_tense_ratio"]
        line = f"\t{s:<12}: {mean_val:.3f}"
        if significant:
            line += f"  {sig_labels[s]}"
        print(line, file=file)

In [None]:
# === CONDITIONALS RATIO ANALYSIS ===

# Function to compute how often conditional modal verbs appear in each text
def compute_conditionals_per_text(responses: List[str], nlp) -> List[float]:
    """Count conditional modal constructions in each response.

    A token is counted when its lower-cased text is one of the modal verbs
    ("would", "could", "should", "might", "may"), one of the listed negated
    forms, or the contraction "'d" immediately followed by a base-form verb
    (tag "VB"), e.g. "I'd go".

    Typographic apostrophes (U+2019, as in "I’d") are normalised to the ASCII
    apostrophe before matching, so contractions written with curly quotes are
    counted as well.

    Parameters:
    - responses: texts to analyse
    - nlp: callable returning a sized sequence of tokens exposing `text`,
      `tag_`, `i` and `nbor()`

    Returns:
    - One integer count per response, in input order.
    """
    # Modal verbs commonly used to express conditionals
    modal_verbs = {"would", "could", "should", "might", "may"}
    # Common negated forms, matched only when they appear as a single token
    negated_modals = {"wouldn't", "couldn't", "shouldn't"}
    conditional_forms = modal_verbs | negated_modals

    counts = []  # Conditional count per response

    for text in responses:
        doc = nlp(text)
        count = 0

        for token in doc:
            # Lower-case and map the typographic apostrophe to the ASCII one so that
            # "’d" / "wouldn’t" are treated exactly like "'d" / "wouldn't"
            text_lower = token.text.lower().replace("\u2019", "'")

            if text_lower in conditional_forms:
                count += 1
            elif text_lower == "'d":
                # "'d" is ambiguous ("would" vs. "had"); only count it when a base-form
                # verb follows. Guard the lookup so the last token never reads past the doc.
                next_token = token.nbor(1) if token.i + 1 < len(doc) else None
                if next_token is not None and next_token.tag_ == "VB":
                    count += 1

        counts.append(count)

    return counts

# === SETUP: STYLES AND CONTAINERS ===

# Communication styles and, in the same order, the responses written in each style
styles = ["clear", "friendly", "empathetic", "polite"]
responses_list = [res_c, res_f, res_e, res_p]

# conditional_report holds the mean count per style; conditional_ratios holds the raw per-response counts
conditional_report = {style: {} for style in styles}
conditional_ratios = {}

# === COMPUTE CONDITIONAL USAGE PER STYLE ===

for style, resps in zip(styles, responses_list):
    counts = compute_conditionals_per_text(resps, nlp=nlp)
    conditional_ratios[style] = counts  # Raw counts feed the statistical tests below
    conditional_report[style]["conditionals_per_text"] = np.mean(counts)

# === REPORT OUTPUT TO FILE ===

with open('results.txt', 'a') as file:
    print("\nConditionals per text (average)", file=file)

    # One list of counts per style, in `styles` order
    data = [conditional_ratios[s] for s in styles]

    # Shapiro-Wilk normality check per style; the parametric test is used only if every group passes
    shapiro_p = {s: shapiro(group).pvalue for s, group in zip(styles, data)}
    all_normal = all(p >= 0.05 for p in shapiro_p.values())

    # ANOVA for all-normal groups, Kruskal-Wallis otherwise
    test_type, omnibus_test = ("ANOVA", f_oneway) if all_normal else ("Kruskal-Wallis", kruskal)
    stat, p_val = omnibus_test(*data)

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    # Post-hoc comparisons are only run when the omnibus test is significant
    significant = p_val < 0.05
    if significant:
        # Long-format table: one row per response with its style label
        all_vals = [val for group in data for val in group]
        all_labels = [s for s in styles for _ in conditional_ratios[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Dunn's pairwise test with Bonferroni correction
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

    # One line per style: the mean, followed by the significance annotation when available
    for s in styles:
        mean_val = conditional_report[s]["conditionals_per_text"]
        line = f"\t{s:<12}: {mean_val:.3f}"
        if significant:
            line += f"  {sig_labels[s]}"
        print(line, file=file)

## Sentiment and Emotion Analysis

In [None]:
# Append the section banner that precedes the sentiment and emotion results
with open('results.txt', 'a') as file:
    file.write("\n=== SENTIMENT AND EMOTION ===\n")

In [None]:
# === SENTIMENT ANALYSIS ===

# Function to compute sentiment scores for a list of texts using batch processing
def compute_sentiment_scores(texts: List[str], batch_size: int = 32) -> List[float]:
    """Score each text with a probability-weighted sentiment value.

    The classifier returns a score for every label (top_k=None). Each label is
    mapped to a fixed value (LABEL_0 -> 0, LABEL_1 -> 0.5, LABEL_2 -> 1) and the
    result for a text is the sum of value * score over its labels.

    Note: the pipeline is constructed on every call to this function.

    Parameters:
    - texts: texts to score
    - batch_size: batch size forwarded to the pipeline call

    Returns:
    - One weighted score per text, in input order.
    """
    # Run on the first GPU when CUDA is available, otherwise on CPU
    device = 0 if torch.cuda.is_available() else -1
    pipe = pipeline(
        "text-classification",
        model="cardiffnlp/twitter-roberta-base-sentiment",
        top_k=None,
        device=device,
    )

    # Value assigned to each classifier label
    # NOTE(review): presumably negative / neutral / positive per the model card; confirm
    label_scores = {'LABEL_0': 0, 'LABEL_1': 0.5, 'LABEL_2': 1}

    sentiment_values = []
    for result in pipe(texts, batch_size=batch_size):
        # `result` lists every label with its score for one text
        weighted = sum(label_scores[entry['label']] * entry['score'] for entry in result)
        sentiment_values.append(weighted)

    return sentiment_values

# === SETUP: CONTAINERS AND STYLES ===

# Communication styles and, in the same order, their request and response sets
styles = ["clear", "friendly", "empathetic", "polite"]
requests_list = [req_c, req_f, req_e, req_p]
responses_list = [res_c, res_f, res_e, res_p]

# Mean sentiment per style (requests and responses)
sentiment_report = {style: {} for style in styles}

# Raw per-text sentiment scores, kept for the statistical tests
sentiment_scores_requests = {}
sentiment_scores_responses = {}

# === COMPUTE SENTIMENT SCORES FOR REQUESTS AND RESPONSES ===

for style, reqs, resps in zip(styles, requests_list, responses_list):
    # Score requests first, then responses
    req_scores = compute_sentiment_scores(reqs)
    res_scores = compute_sentiment_scores(resps)

    # Keep the raw scores for later testing
    sentiment_scores_requests[style] = req_scores
    sentiment_scores_responses[style] = res_scores

    # Record the per-style means
    sentiment_report[style].update(
        sentiment_requests=np.mean(req_scores),
        sentiment_responses=np.mean(res_scores),
    )

# === FUNCTION: BETWEEN-STYLES COMPARISON ===

# Run analysis across styles and output to file
def report_sentiment_analysis(scores_dict, label, file, report_key):
    """Write a between-styles comparison of sentiment scores to `file`.

    Parameters:
    - scores_dict: mapping style -> list of per-text scores
    - label: name shown (upper-cased) in the section header
    - file: writable text stream receiving the report
    - report_key: key of the mean to read from the global `sentiment_report[style]`

    Reads the global `styles` for the group order. Every group is checked with
    Shapiro-Wilk; ANOVA is used when all groups pass, Kruskal-Wallis otherwise.
    When the omnibus p-value is below 0.05, Dunn's test with Bonferroni
    correction is run and each mean is printed with its annotation.
    """
    # Per-style score lists in `styles` order
    data = [scores_dict[s] for s in styles]

    # Normality check per style
    shapiro_p = {s: shapiro(group).pvalue for s, group in zip(styles, data)}
    all_normal = all(p >= 0.05 for p in shapiro_p.values())

    # Pick and run the omnibus test
    if all_normal:
        test_type = "ANOVA"
        stat, p_val = f_oneway(*data)
    else:
        test_type = "Kruskal-Wallis"
        stat, p_val = kruskal(*data)

    print(f"\nSentiment Score ({label.upper()}, average)", file=file)
    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    # Post-hoc comparisons only when the omnibus test is significant
    significant = p_val < 0.05
    if significant:
        # Long-format table: one row per text with its style label
        all_vals = [val for group in data for val in group]
        all_labels = [s for s in styles for _ in scores_dict[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

    # One line per style: the mean, followed by the annotation when available
    for s in styles:
        mean_val = sentiment_report[s][report_key]
        line = f"\t{s:<12}: {mean_val:.3f}"
        if significant:
            line += f"  {sig_labels[s]}"
        print(line, file=file)

# === FUNCTION: WITHIN-STYLE COMPARISON (REQUEST VS RESPONSE) ===

# Compare request vs. response sentiment within each style
def report_within_style_comparisons(file):
    """Write, for each style, a request-vs-response sentiment comparison to `file`.

    Reads the globals `styles`, `sentiment_scores_requests` and
    `sentiment_scores_responses`. Both samples are checked with Shapiro-Wilk;
    an independent-samples t-test is used when both pass, the rank-sum test
    otherwise. One line per style reports the test name and its p-value.
    """
    print("\nSentiment Score: Request vs. Response (within style)\n", file=file)

    for s in styles:
        req, res = sentiment_scores_requests[s], sentiment_scores_responses[s]

        # Normality p-values for the request and response samples
        normality_p = [shapiro(sample).pvalue for sample in (req, res)]
        both_normal = all(p >= 0.05 for p in normality_p)

        # Parametric test only when both samples look normal
        test, compare = ("t-test", ttest_ind) if both_normal else ("Rank-sum", ranksums)
        stat, pval = compare(req, res)

        print(f"\t{s:<12}: {test:<8} (p = {pval:.4f})", file=file)

# === OUTPUT RESULTS TO FILE ===

with open('results.txt', 'a') as file:
    # Between-styles comparison, first for requests and then for responses
    for scores, label, report_key in (
        (sentiment_scores_requests, "Request", "sentiment_requests"),
        (sentiment_scores_responses, "Response", "sentiment_responses"),
    ):
        report_sentiment_analysis(scores, label, file, report_key)

    # Request vs. response comparison inside each style
    report_within_style_comparisons(file)

In [None]:
# === EMOTION ANALYSIS ===

def compute_emotion_scores(responses: List[str], batch_size: int = 32) -> Dict[str, List[float]]:
    """Collect, per emotion label, the classifier score of every text.

    The classifier returns a score for every label (top_k=None); scores are
    grouped by label in the order the texts were given.

    Note: the pipeline is constructed on every call to this function.

    Parameters:
    - responses: texts to classify
    - batch_size: batch size forwarded to the pipeline call

    Returns:
    - A defaultdict mapping emotion label -> list of scores (one per text).
    """
    # Run on the first GPU when CUDA is available, otherwise on CPU
    device = 0 if torch.cuda.is_available() else -1
    pipe = pipeline(
        "text-classification",
        model="j-hartmann/emotion-english-distilroberta-base",
        top_k=None,
        device=device,
    )

    # Batched inference over all texts
    results = pipe(responses, batch_size=batch_size)

    # Regroup the per-text label/score entries by label
    emotion_lists = defaultdict(list)
    for item in (entry for response_result in results for entry in response_result):
        emotion_lists[item['label']].append(item['score'])

    return emotion_lists

# === INITIALIZATION ===

# Define styles ("empath." is abbreviated; table columns below are 10 characters wide)
styles = ["clear", "friendly", "empath.", "polite"]

# Response and request lists by style
requests_list = [req_c, req_f, req_e, req_p]
responses_list = [res_c, res_f, res_e, res_p]

# Containers to store average scores
emotion_report_resp = {style: {} for style in styles}  # Emotion means for responses
emotion_report_req = {style: {} for style in styles}  # Emotion means for requests

# Containers to store raw scores
emotion_scores_resp = defaultdict(dict)  # Dict[emotion][style] → list of response scores
emotion_scores_req = defaultdict(dict)  # Dict[emotion][style] → list of request scores

# === COMPUTE EMOTION SCORES FOR EACH STYLE ===

for style, reqs, resps in zip(styles, requests_list, responses_list):
    # Get emotion scores (per response/request) for each style
    emotion_lists_resp = compute_emotion_scores(resps)
    emotion_lists_req = compute_emotion_scores(reqs)

    # Store average and raw scores for responses
    for emotion, scores in emotion_lists_resp.items():
        emotion_report_resp[style][emotion] = np.mean(scores)
        emotion_scores_resp[emotion][style] = scores

    # Store average and raw scores for requests
    for emotion, scores in emotion_lists_req.items():
        emotion_report_req[style][emotion] = np.mean(scores)
        emotion_scores_req[emotion][style] = scores

# === REPORT HELPER ===

def _write_emotion_table(title, scores_by_emotion, means_by_style, style_names, header, file):
    """Write one between-styles emotion table (one row per emotion) to `file`.

    Parameters:
    - title: line printed above the table
    - scores_by_emotion: Dict[emotion][style] → list of raw scores
    - means_by_style: Dict[style][emotion] → mean score
    - style_names: styles in column order
    - header: pre-formatted column header line
    - file: writable text stream receiving the table

    Emotions that lack scores for any style are skipped. Each row holds the
    per-style means, a significance annotation per style when the omnibus
    test is significant (p < 0.05), and the harmonic mean of the means.
    """
    print(title, file=file)
    print(header, file=file)
    print("-" * len(header), file=file)

    for emotion in sorted(scores_by_emotion.keys()):
        per_style = scores_by_emotion[emotion]

        # Skip if emotion not present for all styles
        if any(s not in per_style for s in style_names):
            continue
        data = [per_style[s] for s in style_names]

        # Statistical testing (ANOVA or Kruskal-Wallis depending on normality)
        shapiro_p = {s: shapiro(per_style[s]).pvalue for s in style_names}
        all_normal = all(p >= 0.05 for p in shapiro_p.values())
        _, p_val = f_oneway(*data) if all_normal else kruskal(*data)

        # Get means per style
        means = [means_by_style[s].get(emotion, 0.0) for s in style_names]

        # Post-hoc (Dunn's test, Bonferroni-corrected) if significant
        if p_val < 0.05:
            all_vals = [val for sublist in data for val in sublist]
            all_labels = [s for s in style_names for _ in per_style[s]]
            df = pd.DataFrame({"score": all_vals, "style": all_labels})
            posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
            sig_labels = summarize_significant_differences_annotated(posthoc, style_names)
            annotations = [sig_labels[s] for s in style_names]
        else:
            annotations = ["" for _ in style_names]

        # Harmonic mean of the per-style means; reported as 0 if any mean is not positive
        hm = len(means) / sum(1/m for m in means if m > 0) if all(m > 0 for m in means) else 0

        # Format row for output: each style cell is 10 characters wide, matching the header
        row = f"{emotion:<14}" + "".join([f"{m:.3f} {a:<4}" for m, a in zip(means, annotations)]) + f"{hm:>7.3f}"
        print(row, file=file)

# === REPORT EMOTION SCORES TO FILE ===

with open('results.txt', 'a') as file:
    header = f"{'Emotion':<14}" + "".join([f"{s:<10}" for s in styles]) + "  HM   "

    # --- RESPONSE-BASED EMOTIONS ---
    _write_emotion_table(
        "\nEmotion Score (responses, average)\n",
        emotion_scores_resp, emotion_report_resp, styles, header, file,
    )

    # --- REQUEST-BASED EMOTIONS ---
    _write_emotion_table(
        "\nEmotion Score (requests, average)\n",
        emotion_scores_req, emotion_report_req, styles, header, file,
    )

    # --- WITHIN-STYLE COMPARISON (REQUEST vs RESPONSE) ---

    print("\nEmotion Score (within style)\n", file=file)
    header_within = f"{'Emotion':<14}" + "".join([f"{s:<10}" for s in styles])
    print(header_within, file=file)
    print("-" * len(header_within), file=file)

    for emotion in sorted(emotion_scores_resp.keys()):
        row = f"{emotion:<14}"
        for s in styles:
            req = emotion_scores_req[emotion].get(s, None)
            res = emotion_scores_resp[emotion].get(s, None)
            if not req or not res:
                # Pad to the same 10-character width as every other cell so columns stay aligned
                row += f"{'N/A':<10}"
                continue

            # Choose test based on normality of each group
            p_req = shapiro(req).pvalue
            p_res = shapiro(res).pvalue
            if p_req >= 0.05 and p_res >= 0.05:
                test_abbr = "TT"  # t-test
                _, pval = ttest_ind(req, res)
            else:
                test_abbr = "RS"  # Rank-sum (non-parametric)
                _, pval = ranksums(req, res)

            star = "*" if pval < 0.05 else ""  # Mark significance
            row += f"{test_abbr}{star:<8}"  # Test abbreviation (2 chars) + marker padded to 8
        print(row, file=file)

## Additional Metrics

In [None]:
# Append the section banner that precedes the additional metrics
with open('results.txt', 'a') as file:
    file.write("\n=== ADDITIONAL SCORES ===\n")

In [None]:
# === FLESCH-KINCAID READABILITY ANALYSIS ===

# Function to compute Flesch Reading Ease scores for a list of texts
def compute_fk_scores(responses: List[str]) -> List[float]:
    """Return the `textstat.flesch_reading_ease` score of each response, in input order."""
    scores = []
    for text in responses:
        scores.append(textstat.flesch_reading_ease(text))
    return scores

# === SETUP: CONTAINERS AND STYLE GROUPS ===

# Communication styles and, in the same order, the responses written in each style
styles = ["clear", "friendly", "empathetic", "polite"]
responses_list = [res_c, res_f, res_e, res_p]

# fk_report holds the mean readability score per style; fk_scores holds the raw per-text scores
fk_report = {style: {} for style in styles}
fk_scores = {}

# === COMPUTE FK SCORES FOR EACH STYLE ===

for style, resps in zip(styles, responses_list):
    scores = compute_fk_scores(resps)
    fk_scores[style] = scores  # Raw scores feed the statistical tests below
    fk_report[style]["flesch_kincaid"] = np.mean(scores)

# === REPORT OUTPUT TO FILE ===

with open('results.txt', 'a') as file:
    # Header spelling corrected ("Readibility" -> "Readability")
    print("\nReadability Score (Flesch-Kincaid Reading Ease, average)", file=file)

    # One list of scores per style, in `styles` order
    data = [fk_scores[s] for s in styles]

    # Shapiro-Wilk normality check per style; the parametric test is used only if every group passes
    shapiro_p = {s: shapiro(group).pvalue for s, group in zip(styles, data)}
    all_normal = all(p >= 0.05 for p in shapiro_p.values())

    # ANOVA for all-normal groups, Kruskal-Wallis otherwise
    test_type, omnibus_test = ("ANOVA", f_oneway) if all_normal else ("Kruskal-Wallis", kruskal)
    stat, p_val = omnibus_test(*data)

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    # Post-hoc comparisons are only run when the omnibus test is significant
    significant = p_val < 0.05
    if significant:
        # Long-format table: one row per text with its style label
        all_vals = [val for group in data for val in group]
        all_labels = [s for s in styles for _ in fk_scores[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Dunn's pairwise test with Bonferroni correction
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

    # One line per style: the mean, followed by the significance annotation when available
    for s in styles:
        mean_val = fk_report[s]["flesch_kincaid"]
        line = f"\t{s:<12}: {mean_val:.3f}"
        if significant:
            line += f"  {sig_labels[s]}"
        print(line, file=file)

In [None]:
# === FORMALITY ANALYSIS ===

# Function to compute binary formality scores for a list of texts
def compute_formality_scores(responses: List[str], batch_size: int = 32) -> List[float]:
    """Return a binary formality score for each text.

    Each text is classified with the `s-nlp/roberta-base-formality-ranker`
    model; the predicted label is mapped to 1.0 ("formal") or 0.0
    ("informal"). Inference runs in batches, like the sentiment and emotion
    scorers in this notebook, instead of one pipeline call per text.

    Note: the pipeline is constructed on every call to this function.

    Parameters:
    - responses: texts to classify
    - batch_size: batch size forwarded to the pipeline call (default 32)

    Returns:
    - List of 1.0 / 0.0 scores in input order. A prediction whose label is
      neither "formal" nor "informal" contributes no entry, so the result can
      be shorter than the input in that case.
    """
    # Load a pre-trained RoBERTa-based classifier for formality ranking (GPU when available)
    pipe = pipeline("text-classification", model="s-nlp/roberta-base-formality-ranker", device=0 if torch.cuda.is_available() else -1)

    texts = list(responses)
    if not texts:
        return []  # Nothing to classify

    # Score assigned to each predicted label
    label_to_score = {"formal": 1.0, "informal": 0.0}

    # With a list input and no top_k, the pipeline yields one {'label', 'score'} dict per text
    results = pipe(texts, batch_size=batch_size)

    scores = []  # List to store formality scores
    for result in results:
        score = label_to_score.get(result['label'])
        if score is not None:  # Unknown labels are skipped, as before
            scores.append(score)

    return scores

# === INITIAL SETUP ===

# Communication styles and, in the same order, the responses written in each style
styles = ["clear", "friendly", "empathetic", "polite"]
responses_list = [res_c, res_f, res_e, res_p]

# formality_report holds the mean score per style; formality_scores holds the raw per-text scores
formality_report = {style: {} for style in styles}
formality_scores = {}

# === COMPUTE FORMALITY SCORES PER STYLE ===

for style, resps in zip(styles, responses_list):
    scores = compute_formality_scores(resps)
    formality_scores[style] = scores  # Raw scores feed the statistical tests below
    formality_report[style]["formality"] = np.mean(scores)

# === OUTPUT RESULTS TO FILE ===

with open('results.txt', 'a') as file:
    print("\nFormality Score (average)", file=file)

    # One list of scores per style, in `styles` order
    data = [formality_scores[s] for s in styles]

    # Shapiro-Wilk normality check per style; the parametric test is used only if every group passes
    shapiro_p = {s: shapiro(group).pvalue for s, group in zip(styles, data)}
    all_normal = all(p >= 0.05 for p in shapiro_p.values())

    # ANOVA for all-normal groups, Kruskal-Wallis otherwise
    if all_normal:
        test_type = "ANOVA"
        stat, p_val = f_oneway(*data)
    else:
        test_type = "Kruskal-Wallis"
        stat, p_val = kruskal(*data)

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    # Post-hoc comparisons are only run when the omnibus test is significant
    significant = p_val < 0.05
    if significant:
        # Long-format table: one row per text with its style label
        all_vals = [val for group in data for val in group]
        all_labels = [s for s in styles for _ in formality_scores[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Dunn's pairwise test with Bonferroni correction
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

    # One line per style: the mean, followed by the significance annotation when available
    for s in styles:
        mean_val = formality_report[s]["formality"]
        line = f"\t{s:<12}: {mean_val:.3f}"
        if significant:
            line += f"  {sig_labels[s]}"
        print(line, file=file)

In [None]:
# === CONCRETENESS ANALYSIS ===

# Function to compute concreteness scores for a list of responses
def compute_concreteness_scores(responses: List[str], file: str, nlp) -> List[float]:
    """Compute a lexicon-based concreteness score for each response.

    Parameters:
    - responses: texts to score
    - file: raw text of the concreteness lexicon; every line that starts with a
      word followed by whitespace and a number is read as `word score`
    - nlp: callable returning tokens that expose `lemma_` and `is_punct`

    Returns:
    - One score per response: the sum of the lexicon scores of the matched
      lemmas, divided by (number of non-punctuation tokens * 100) and then by 7.
      Tokens missing from the lexicon add nothing to the sum but still count in
      the denominator. A response without non-punctuation tokens scores 0.
    """
    # --- Parse the lexicon text into a word -> score mapping ---
    entry_pattern = re.compile(r"^(\w+)\s+(\d{1,3}(?:\.\d+)?)")
    concreteness_dict = {}
    for raw_line in file.splitlines():
        parsed = entry_pattern.match(raw_line.strip())
        if parsed is None:
            continue  # Not a "word score" line
        concreteness_dict[parsed.group(1).lower()] = float(parsed.group(2))

    # --- Score every response ---
    scores = []
    for text in responses:
        # Punctuation is ignored both for lookup and for normalisation
        words = [tok for tok in nlp(text) if not tok.is_punct]
        num_tokens = len(words)

        if num_tokens > 0:
            rated = [
                concreteness_dict[tok.lemma_.lower()]
                for tok in words
                if tok.lemma_.lower() in concreteness_dict
            ]
            score = sum(rated) / (num_tokens * 100)
        else:
            score = 0

        # NOTE(review): the /100 and /7 steps presumably rescale a 100-700 lexicon to 0-1; confirm
        scores.append(score / 7)

    return scores

# === INITIAL SETUP ===

# Communication styles and, in the same order, the responses written in each style
styles = ["clear", "friendly", "empathetic", "polite"]
responses_list = [res_c, res_f, res_e, res_p]

# concreteness_report holds the mean score per style; concreteness_scores holds the raw per-text scores
concreteness_report = {style: {} for style in styles}
concreteness_scores = {}

# === COMPUTE CONCRETENESS SCORES PER STYLE ===

for style, resps in zip(styles, responses_list):
    # The lexicon text (`concreteness_txt`) is parsed inside the scorer on every call
    scores = compute_concreteness_scores(resps, concreteness_txt, nlp)
    concreteness_scores[style] = scores  # Raw scores feed the statistical tests below
    concreteness_report[style]["concreteness"] = np.mean(scores)

# === OUTPUT RESULTS TO FILE ===

with open('results.txt', 'a') as file:
    print("\nConcreteness Score (average)", file=file)

    # One list of scores per style, in `styles` order
    data = [concreteness_scores[s] for s in styles]

    # Shapiro-Wilk normality check per style; the parametric test is used only if every group passes
    shapiro_p = {s: shapiro(group).pvalue for s, group in zip(styles, data)}
    all_normal = all(p >= 0.05 for p in shapiro_p.values())

    # ANOVA for all-normal groups, Kruskal-Wallis otherwise
    if all_normal:
        test_type = "ANOVA"
        stat, p_val = f_oneway(*data)
    else:
        test_type = "Kruskal-Wallis"
        stat, p_val = kruskal(*data)

    print(f"\n\tStatistical Test: {test_type} (p = {p_val:.4f})\n", file=file)

    # Post-hoc comparisons are only run when the omnibus test is significant
    significant = p_val < 0.05
    if significant:
        # Long-format table: one row per text with its style label
        all_vals = [val for group in data for val in group]
        all_labels = [s for s in styles for _ in concreteness_scores[s]]
        df = pd.DataFrame({"score": all_vals, "style": all_labels})

        # Dunn's pairwise test with Bonferroni correction
        posthoc = sp.posthoc_dunn(df, val_col="score", group_col="style", p_adjust="bonferroni")
        sig_labels = summarize_significant_differences_annotated(posthoc, styles)

    # One line per style: the mean, followed by the significance annotation when available
    for s in styles:
        mean_val = concreteness_report[s]["concreteness"]
        line = f"\t{s:<12}: {mean_val:.3f}"
        if significant:
            line += f"  {sig_labels[s]}"
        print(line, file=file)

# Text Generation Tasks

In this section, different models are adapted to the RESPONSible Service dataset and evaluated on text generation.

## Text Generation Task with ICL

In [None]:
%pip install -r requirements_1.txt

In [None]:
import os

# Expose only GPU 0 to CUDA and set the Hugging Face token for this process.
# "HF_KEY" is a placeholder: substitute a real token locally and never commit it.
os.environ.update({
    "CUDA_VISIBLE_DEVICES": "0",
    "HF_TOKEN": "HF_KEY",
})

In [None]:
from transformers import pipeline
import pandas as pd
import json
import torch

# Hugging Face model ID for the text generation pipeline
# NOTE(review): presumably passed to classify_response_style as `model_name`; confirm at the call site
model_id = "unsloth/Llama-3.2-1B-Instruct"

def classify_response_style(test_set, icl_set, stylistic_feature, model_name, num_shots=0):
    """
    Generates rephrased customer service responses using ICL (in-context learning)
    to match a given stylistic feature (e.g., 'friendly', 'clear').

    Despite its name, this function performs text generation, not classification.

    Parameters:
    - test_set: DataFrame with examples to rephrase (must include 'response0';
      an optional 'id' column is used as the prediction id, otherwise the row position)
    - icl_set: DataFrame with few-shot examples including 'response0' and 'response1'
    - stylistic_feature: the target style to enhance (e.g., "empathetic")
    - model_name: name or path of the Hugging Face model to use
    - num_shots: number of ICL examples (the first rows of icl_set) to prepend to each prompt

    Returns:
    - A list of dictionaries containing 'id', 'response0', and 'generated_response1'

    Raises:
    - ValueError: if the generator returns an output in an unexpected format
    """

    # Create a text generation pipeline using the specified model.
    # Note: a new pipeline is built on every call.
    generator = pipeline(
        model=model_name,
        max_new_tokens=40,  # Limit the output length
        device = 0 if torch.cuda.is_available() else -1,  # Use GPU (device=0) if available
        temperature=1.0,  # Sampling temperature
        top_p=1.0  # Top-p sampling (nucleus sampling)
    )

    # Marker that precedes every stylised response in the prompt
    answer_marker = f"More {stylistic_feature} response:"

    def build_request(source_response):
        # Instruction block shared by the few-shot examples and the test prompt;
        # it ends with the marker so the model continues with the rephrased response.
        return (
            f"Instruction: A customer service agent responded to a message from a customer in the following way.\n"
            f"Response: {source_response}\n"
            f"Rephrase the response to make it more {stylistic_feature}. Only output the rephrased response. Do not add any explanation or extra text.\n"
            f"{answer_marker}"
        )

    # === FEW-SHOT PROMPT CONSTRUCTION ===
    few_shot_prompt = ""
    for _, shot in icl_set.iloc[:num_shots].iterrows():
        # Clean and extract the original and target responses
        shot_response0 = str(shot["response0"]).strip()
        shot_response1 = str(shot["response1"]).strip()

        # Each solved example is the instruction block followed by the target response
        few_shot_prompt += build_request(shot_response0) + f" {shot_response1}\n\n"

    # === CONSTRUCT PROMPTS FOR TEST SET ===
    prompts = [
        few_shot_prompt + build_request(str(row["response0"]).strip())
        for _, row in test_set.iterrows()
    ]

    # === TEXT GENERATION ===
    outputs = generator(prompts, batch_size=8)

    # === EXTRACT GENERATED RESPONSES ===
    predictions = []
    for i, (row, prompt, out) in enumerate(zip(test_set.itertuples(), prompts, outputs)):
        if isinstance(out, list) and isinstance(out[0], dict):
            generated_text = out[0]["generated_text"]
        elif isinstance(out, dict):
            generated_text = out["generated_text"]
        else:
            raise ValueError("Unexpected output format from the generator")

        # Keep only the continuation that follows the prompt. Splitting on the marker
        # and taking the last piece would return hallucinated text whenever the model
        # goes on to invent a further "Instruction ... More <style> response:" example.
        if generated_text.startswith(prompt):
            continuation = generated_text[len(prompt):]
        else:
            # Prompt not echoed verbatim: skip as many markers as the prompt contains
            continuation = generated_text.split(answer_marker, prompt.count(answer_marker))[-1]

        generated_response1 = continuation.strip()
        # Drop any additional example the model started to generate
        if "\n\nInstruction:" in generated_response1:
            generated_response1 = generated_response1.split("\n\nInstruction:")[0].strip()

        prediction = {
            "id": getattr(row, "id", i),
            # str() mirrors the prompt construction so non-string cells do not crash here
            "response0": str(row.response0).strip(),
            "generated_response1": generated_response1
        }
        predictions.append(prediction)

    return predictions

In [None]:
# === LOCATE THE STYLE-SPECIFIC PARQUET FILES ===
path_clear = "RESPONSible Service/dataset_clear.parquet"
path_friendly = "RESPONSible Service/dataset_friendly.parquet"
path_empathetic = "RESPONSible Service/dataset_empathetic.parquet"
path_polite = "RESPONSible Service/dataset_polite.parquet"


def _load_shuffled(parquet_path):
    """Read one Parquet dataset and return its rows shuffled with the fixed seed 42."""
    loaded = pd.read_parquet(parquet_path)
    return loaded.sample(frac=1, random_state=42).reset_index(drop=True)


def _draw_icl_examples(shuffled_df):
    """Sample 5 few-shot examples (seed 42) from the first 800 rows."""
    return shuffled_df.iloc[:800].sample(n=5, random_state=42).reset_index(drop=True)


def _held_out_rows(shuffled_df):
    """Return every row from position 800 onward as the test split."""
    return shuffled_df.iloc[800:].reset_index(drop=True)


# === LOAD AND SHUFFLE EACH DATASET ===
clear_df = _load_shuffled(path_clear)
friendly_df = _load_shuffled(path_friendly)
empathetic_df = _load_shuffled(path_empathetic)
polite_df = _load_shuffled(path_polite)

# === FEW-SHOT (ICL) POOLS: 5 EXAMPLES DRAWN FROM THE FIRST 800 ROWS ===
clear_icl = _draw_icl_examples(clear_df)
friendly_icl = _draw_icl_examples(friendly_df)
empathetic_icl = _draw_icl_examples(empathetic_df)
polite_icl = _draw_icl_examples(polite_df)

# === TEST SPLITS: ROWS 800 AND BEYOND ===
clear_test = _held_out_rows(clear_df)
friendly_test = _held_out_rows(friendly_df)
empathetic_test = _held_out_rows(empathetic_df)
polite_test = _held_out_rows(polite_df)

# === PARALLEL LISTS DRIVING THE EXPERIMENT LOOP ===
stylistic_features = ["clear", "friendly", "empathetic", "polite"]
test_sets = [clear_test, friendly_test, empathetic_test, polite_test]
icl_sets = [clear_icl, friendly_icl, empathetic_icl, polite_icl]

# === RUN GENERATION FOR EVERY STYLE AT 0 TO 5 SHOTS ===
# Results are nested as all_results[style]["<n>-shot"] -> list of predictions
all_results = {}

for feature, test_set, icl_set in zip(stylistic_features, test_sets, icl_sets):
    print(f"Processing started: {feature}\n")
    per_shot_results = {}
    all_results[feature] = per_shot_results

    for n_shot in range(6):
        print(f"\tFew-shots stage: {n_shot}-shots")
        # Generate rephrasings for this style with n_shot in-context examples
        results = classify_response_style(test_set, icl_set, feature, model_id, num_shots=n_shot)
        per_shot_results[f"{n_shot}-shot"] = results

    print(f"\n\tProcessing ended: {feature}\n")

# === PERSIST THE NESTED RESULTS AS JSON ===
with open("icl_predictions.json", "w", encoding="utf-8") as f:
    json.dump(all_results, f, indent=4, ensure_ascii=False)

## Text Generation Task with SFT

In [None]:
%pip install -r requirements_2.txt

In [None]:
import os
# Make only the GPU with index 0 visible to CUDA.
os.environ["CUDA_VISIBLE_DEVICES"]="0"
# NOTE(review): "HF_KEY" looks like a redacted placeholder — supply the real Hugging Face token
# (preferably from the environment or a secrets store) instead of committing it to the notebook.
os.environ["HF_TOKEN"] = "HF_KEY"

In [None]:
import pandas as pd
from transformers import AutoTokenizer, TrainingArguments
from datasets import Dataset
from multiprocessing import cpu_count
from trl import SFTTrainer
from peft import LoraConfig, get_peft_model

# === LOCATE THE STYLE-SPECIFIC PARQUET FILES ===
path_clear = "RESPONSible Service/dataset_clear.parquet"
path_friendly = "RESPONSible Service/dataset_friendly.parquet"
path_empathetic = "RESPONSible Service/dataset_empathetic.parquet"
path_polite = "RESPONSible Service/dataset_polite.parquet"


def _read_shuffled(parquet_path):
    """Load one Parquet dataset and shuffle its rows with the fixed seed 42."""
    loaded = pd.read_parquet(parquet_path)
    return loaded.sample(frac=1, random_state=42).reset_index(drop=True)


# === LOAD AND SHUFFLE EACH DATASET ===
clear_df = _read_shuffled(path_clear)
friendly_df = _read_shuffled(path_friendly)
empathetic_df = _read_shuffled(path_empathetic)
polite_df = _read_shuffled(path_polite)

# === PARALLEL LISTS OF DATAFRAMES AND STYLE NAMES ===
dfs = [clear_df, friendly_df, empathetic_df, polite_df]
names = ["clear", "friendly", "empathetic", "polite"]

# === CONTAINER FOR THE CONVERSATION-FORMATTED TRAIN/TEST SPLITS ===
raw_datasets_dict = {style: {"train": None, "test": None} for style in names}

# === BUILD SYSTEM/USER/ASSISTANT CONVERSATIONS FOR EVERY STYLE ===
for df, name in zip(dfs, names):

    # Only the source response and its stylised counterpart are needed
    df = df[['response0', 'response1']]

    # First 800 rows train the model, everything after is held out for testing
    train_df = df.iloc[0:800].copy()
    test_df = df.iloc[800:].copy()

    def format_conversation(row):
        """Turn one (response0, response1) row into a three-message chat example."""
        system_message = {
            "role": "system",
            "content": (
                f"Rephrase the following customer service message to make it more {name}. "
                "Only output the rephrased message. Do not add any explanation or extra text."
            )
        }
        user_message = {"role": "user", "content": row["response0"]}  # text to rephrase
        assistant_message = {"role": "assistant", "content": row["response1"]}  # target rephrasing
        return {"messages": [system_message, user_message, assistant_message]}

    # Store both splits as plain lists of conversation dictionaries
    for split_name, split_df in (("train", train_df), ("test", test_df)):
        raw_datasets_dict[name][split_name] = split_df.apply(format_conversation, axis=1).tolist()

In [None]:
# === DEFINE THE BASE MODEL TO BE USED ===
# The same model ID is used for tokenizer loading here and for model loading in later cells.
model_id = "unsloth/Llama-3.2-1B-Instruct"  # Instruct-tuned Llama 3.2 1B checkpoint published under the "unsloth" namespace

# === LOAD TOKENIZER FROM THE PRETRAINED MODEL ===
tokenizer = AutoTokenizer.from_pretrained(model_id)

# === ENSURE A PAD TOKEN IS SET ===
# Padding is requested later (padding="max_length"), so a pad token must exist.
# If the tokenizer defines none, fall back to the eos_token.
if tokenizer.pad_token_id is None:
  tokenizer.pad_token_id = tokenizer.eos_token_id

# === HANDLE CASES WITH UNREASONABLY LARGE MAX LENGTH ===
# If the tokenizer reports a max length above 100,000 tokens,
# reset it to a practical limit (2048 tokens).
if tokenizer.model_max_length > 100_000:
  tokenizer.model_max_length = 2048

# === DEFINE A CUSTOM CHAT TEMPLATE ===
# Jinja template that renders each system/user/assistant message as
# "<|start_header_id|>ROLE<|end_header_id|>", the content, then "<|eot_id|>".
# When add_generation_prompt is set, an assistant header is appended after the last message.
# The literal line breaks inside the template are significant: later cells recover the
# model's reply by splitting the decoded text on "\nassistant\n", so do not reformat it.
# NOTE(review): this replaces whatever chat template ships with the tokenizer — confirm
# the differences from the stock Llama 3 format are intended.

DEFAULT_CHAT_TEMPLATE = """{% for message in messages %}
{% if message['role'] == 'system' %}
<|start_header_id|>system<|end_header_id|>
{{ message['content'] }}<|eot_id|>
{% elif message['role'] == 'user' %}
<|start_header_id|>user<|end_header_id|>
{{ message['content'] }}<|eot_id|>
{% elif message['role'] == 'assistant' %}
<|start_header_id|>assistant<|end_header_id|>
{{ message['content'] }}<|eot_id|>
{% endif %}
{% if loop.last and add_generation_prompt %}
<|start_header_id|>assistant<|end_header_id|>
{% endif %}
{% endfor %}"""

# === APPLY THE CUSTOM CHAT TEMPLATE TO THE TOKENIZER ===
# Every later tokenizer.apply_chat_template(...) call in this section uses this template.
tokenizer.chat_template = DEFAULT_CHAT_TEMPLATE

In [None]:
# === FUNCTION TO APPLY THE CHAT TEMPLATE TO EACH EXAMPLE ===
def apply_chat_template(example, tokenizer):
    """
    Render one conversation example into a single prompt string.

    Parameters:
    - example: dictionary with a 'messages' list of {'role', 'content'} dictionaries
    - tokenizer: object exposing apply_chat_template(messages, tokenize=False)

    Returns:
    - The same example dictionary, with the rendered string stored under 'text'.
      If the conversation did not start with a system message, an empty one is
      inserted in place at the front of example['messages'].
    """
    conversation = example["messages"]

    # Guarantee the system -> user -> assistant layout by prepending an empty
    # system turn when the conversation starts with any other role
    starts_with_system = conversation[0]["role"] == "system"
    if not starts_with_system:
        conversation[:0] = [{"role": "system", "content": ""}]

    # Format only; tokenisation happens in a separate step
    rendered = tokenizer.apply_chat_template(conversation, tokenize=False)
    example["text"] = rendered

    return example

def tokenize_for_causal_lm(example):
    """
    Tokenize the rendered chat text and attach causal-LM labels.

    Uses the module-level `tokenizer`. The text is truncated/padded to exactly
    512 tokens. Labels copy the input ids, except that padded positions
    (attention_mask == 0) are set to -100 so they do not contribute to the loss.

    Parameters:
    - example: dictionary with a 'text' field (a string, or a list of strings when
      the function is mapped in batched mode)

    Returns:
    - The tokenizer output with an added 'labels' entry
    """
    tokenized = tokenizer(
        example["text"],
        truncation=True,
        padding="max_length",
        max_length=512
    )

    input_ids = tokenized["input_ids"]
    attention_mask = tokenized.get("attention_mask")

    def mask_padding(ids, mask):
        # Keep real tokens as targets; mark padding with the ignore index
        return [token if keep else -100 for token, keep in zip(ids, mask)]

    if attention_mask is None:
        # No mask available: fall back to a plain copy of the input ids
        tokenized["labels"] = input_ids.copy()
    elif input_ids and isinstance(input_ids[0], (list, tuple)):
        # Batched call: one id list per text
        tokenized["labels"] = [
            mask_padding(ids, mask) for ids, mask in zip(input_ids, attention_mask)
        ]
    else:
        tokenized["labels"] = mask_padding(input_ids, attention_mask)
    return tokenized

# === WRAP EACH STYLE'S TRAINING CONVERSATIONS IN A HUGGING FACE DATASET ===
raw_train_clear, raw_train_friendly, raw_train_empathetic, raw_train_polite = (
    Dataset.from_list(raw_datasets_dict[style]["train"])
    for style in ("clear", "friendly", "empathetic", "polite")
)

# === SAME DATASETS AS A LIST, ORDERED LIKE `names` ===
raw_datasets_list = [raw_train_clear, raw_train_friendly, raw_train_empathetic, raw_train_polite]

# === DESTINATION FOR THE TOKENIZED TRAINING SETS, KEYED BY STYLE ===
train_dataset_dict = dict.fromkeys(("clear", "friendly", "empathetic", "polite"))

# === RENDER AND TOKENIZE EVERY DATASET (each map uses one worker per CPU core) ===
for raw_dataset, name in zip(raw_datasets_list, names):

    # Kept for inspection only; neither value is consumed below
    column_names = list(raw_dataset.features)
    remove_columns = None

    # Step 1: add a 'text' column holding the chat-template rendering
    chat_formatted = raw_dataset.map(
        apply_chat_template,
        desc="Applying chat template",
        fn_kwargs={"tokenizer": tokenizer},
        num_proc=cpu_count(),
    )

    # Step 2: add input ids, attention mask and labels for causal LM training
    tokenized_dataset = chat_formatted.map(
        tokenize_for_causal_lm,
        desc="Tokenizing text for causal LM",
        num_proc=cpu_count(),
    )
    train_dataset_dict[name] = tokenized_dataset

In [None]:
from transformers import BitsAndBytesConfig
import torch

# === 4-BIT QUANTIZATION SETTINGS ===
# NF4 weights with bfloat16 as the compute dtype
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype="bfloat16",
)

# === DEVICE PLACEMENT ===
# Put the whole model on the current CUDA device when one is available;
# otherwise leave the device map unset
if torch.cuda.is_available():
    device_map = {"": torch.cuda.current_device()}
else:
    device_map = None

# === KEYWORD ARGUMENTS SHARED BY EVERY from_pretrained CALL BELOW ===
model_kwargs = {
    "attn_implementation": "flash_attention_2",  # request the Flash Attention 2 implementation
    "torch_dtype": "auto",
    "use_cache": False,  # caching is switched off (training uses gradient checkpointing)
    "device_map": device_map,
    "quantization_config": quantization_config,
}

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, GenerationConfig
import torch
import json

# BASELINE: GENERATE REPHRASINGS WITH THE MODEL BEFORE ANY FINE-TUNING

# === ONE PREDICTION LIST PER STYLE ===
baseline_preds = {style: [] for style in ("clear", "friendly", "empathetic", "polite")}

# === LOAD THE (QUANTIZED) BASE MODEL ===
model = AutoModelForCausalLM.from_pretrained(model_id, **model_kwargs)


def _baseline_reply(prompt_messages):
    """Sample one reply from `model` for the given system + user messages."""
    # Render the prompt with the chat template and move the ids to the model's device
    prompt_ids = tokenizer.apply_chat_template(
        prompt_messages,
        truncation=True,
        add_generation_prompt=True,
        return_tensors="pt",
    ).to(model.device)

    # Sample up to 40 new tokens (temperature 1.0, top_p 1.0)
    generated = model.generate(
        input_ids=prompt_ids,
        max_new_tokens=40,
        do_sample=True,
        temperature=1.0,
        top_p=1.0,
    )

    # The decoded text still contains the prompt; keep what follows the assistant header
    full_text = tokenizer.batch_decode(generated, skip_special_tokens=True)[0]
    return full_text.split("\nassistant\n")[1].strip()


# === GENERATE A REPLY FOR EVERY TEST EXAMPLE OF EVERY STYLE ===
for name in names:
    for i, entry in enumerate(raw_datasets_dict[name]["test"]):
        # Drop the reference assistant turn; only system + user go to the model
        messages = entry["messages"][:2]
        predicted_reply = _baseline_reply(messages)

        baseline_preds[name].append({
            "id": i,
            "response0": messages[1]["content"],  # text that was rephrased
            "generated_response1": predicted_reply  # model output
        })

# === WRITE ALL BASELINE PREDICTIONS TO DISK ===
with open("baseline_predictions.json", "w") as f:
    json.dump(baseline_preds, f, indent=2)

In [None]:
# FINE-TUNING OF MODEL AND GENERATION OF EXAMPLES WITH FINE-TUNED MODEL
# For each style: train a LoRA adapter on that style's training set, save it,
# reload it for inference, and generate a rephrasing for every train AND test row.

import gc

# === INITIALIZE A DICTIONARY TO STORE MODEL PREDICTIONS FOR EACH STYLE ===
predictions = {"clear": [], "friendly": [], "empathetic": [], "polite": []}

# === LOOP OVER EACH STYLE NAME ===
for name in names:

    # Define the output directory where the adapter, metrics and trainer state are stored
    output_dir = f'data/Llama-3.2-1B-Instruct/{name}'

    # === TRAINING ARGUMENTS ===
    # NOTE(review): with a per-device batch size of 1 and 800 training rows,
    # 256 accumulation steps leaves only a handful of optimizer updates in the
    # single epoch — confirm this is intended.
    training_args = TrainingArguments(
        fp16=True,  # Use half-precision training for speed and memory efficiency (use bf16=True on supporting GPUs)
        do_eval=False,  # No evaluation during training
        report_to="none",  # Disable logging to external services like wandb
        eval_strategy="no",
        gradient_accumulation_steps=256,  # Accumulate gradients over many steps (needed due to small batch size)
        gradient_checkpointing=True,  # Enable memory-efficient training via checkpointing
        gradient_checkpointing_kwargs={"use_reentrant": False},  # Required by some models for compatibility
        learning_rate=2e-5,  # Standard fine-tuning learning rate
        log_level="info",  # Logging level
        logging_steps=5,  # Log every 5 steps
        logging_strategy="steps",
        lr_scheduler_type="cosine",  # Cosine learning rate scheduler
        max_steps=-1,  # No manual step limit; use `num_train_epochs`
        num_train_epochs=1,  # Only one epoch (changeable)
        output_dir=output_dir,  # Directory to store model output
        overwrite_output_dir=True,  # Overwrite if already exists
        per_device_eval_batch_size=1,  # Evaluation batch size (set small for memory safety)
        per_device_train_batch_size=1,  # Training batch size (set small for memory safety)
        save_strategy="no",  # Do not save intermediate checkpoints
        save_total_limit=None,
        seed=42,  # Set random seed for reproducibility
    )

    # === CONFIGURE PEFT (LoRA) FOR MEMORY-EFFICIENT FINETUNING ===
    peft_config = LoraConfig(
        r=64,  # Rank for low-rank adaptation
        lora_alpha=16,
        lora_dropout=0.1,
        bias="none",
        task_type="CAUSAL_LM",  # Task type is causal language modeling
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],  # Target transformer attention modules
    )

    # === LOAD BASE MODEL AND PREPARE FOR PEFT ===
    model = AutoModelForCausalLM.from_pretrained(model_id, **model_kwargs)
    model = get_peft_model(model, peft_config)

    # === INITIALIZE THE SFT TRAINER ===
    trainer = SFTTrainer(
        model=model,  # PEFT-wrapped model
        args=training_args,  # TrainingArguments object
        train_dataset=train_dataset_dict[name],  # Tokenized training data
        processing_class=tokenizer,  # Tokenizer used for processing
    )

    # === START TRAINING ===
    train_result = trainer.train()

    # === SAVE THE FINE-TUNED MODEL TO DISK ===
    trainer.model.save_pretrained(output_dir)

    # === LOG AND SAVE METRICS ===
    metrics = train_result.metrics
    metrics["train_samples"] = len(train_dataset_dict[name])
    trainer.log_metrics("train", metrics)
    trainer.save_metrics("train", metrics)
    trainer.save_state()

    # === RELEASE THE TRAINING MODEL BEFORE RELOADING ===
    # The trainer keeps a reference to the training model; drop both so the
    # training copy and the inference copy are not held in GPU memory together.
    del trainer, model
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # === FINE-TUNED MODEL FOR INFERENCE ===
    # Reload from the directory the adapter was just saved to
    model = AutoModelForCausalLM.from_pretrained(
        output_dir,
        **model_kwargs
    )

    # === MERGE TRAIN AND TEST SETS FOR INFERENCE ===
    # Predictions are produced for all rows (train first, then test), in dataset order
    train_set_list = [item for item in raw_datasets_dict[name]["train"]]
    test_set_list = [item for item in raw_datasets_dict[name]["test"]]
    complete_dataset_list = train_set_list + test_set_list

    # === GENERATE PREDICTIONS FOR THE ENTIRE SET ===
    for i, entry in enumerate(complete_dataset_list):
        messages = entry["messages"][:2]  # Only system and user messages

        # Tokenize the conversation prompt
        input_ids = tokenizer.apply_chat_template(
            messages,
            truncation=True,
            add_generation_prompt=True,
            return_tensors="pt"
        ).to(model.device)

        # Generate a response from the model
        outputs = model.generate(
            input_ids=input_ids,
            max_new_tokens=40,
            do_sample=True,
            temperature=1.0,
            top_p=1.0,
        )

        # Decode generated output
        generated_text = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]

        # Extract only the assistant's reply (after the assistant role marker)
        predicted_reply = generated_text.split("\nassistant\n")[1].strip()

        # Save the prediction
        predictions[name].append({
            "id": i,
            "response0": messages[1]["content"],  # Original input
            "generated_response1": predicted_reply  # Model output
        })

# === SAVE ALL PREDICTIONS TO A JSON FILE ===
with open("sft_predictions.json", "w") as f:
    json.dump(predictions, f, indent=2)

## Text Generation Task with DPO

In [None]:
%pip install -r requirements_1.txt

In [None]:
import os
# Make only the GPU with index 0 visible to CUDA.
os.environ["CUDA_VISIBLE_DEVICES"]="0"
# NOTE(review): "HF_KEY" looks like a redacted placeholder — supply the real Hugging Face token
# (preferably from the environment or a secrets store) instead of committing it to the notebook.
os.environ["HF_TOKEN"] = "HF_KEY"

In [None]:
from unsloth import PatchDPOTrainer

PatchDPOTrainer()

In [None]:
from unsloth import FastLanguageModel, is_bfloat16_supported
from datasets import Dataset
import json
from trl import DPOTrainer, DPOConfig
import pandas as pd
import torch

In [None]:
# === LOCATE THE STYLE-SPECIFIC PARQUET FILES ===
path_clear = "RESPONSible Service/dataset_clear.parquet"
path_friendly = "RESPONSible Service/dataset_friendly.parquet"
path_empathetic = "RESPONSible Service/dataset_empathetic.parquet"
path_polite = "RESPONSible Service/dataset_polite.parquet"


def _read_shuffled(parquet_path):
    """Load one Parquet dataset and shuffle its rows with the fixed seed 42."""
    loaded = pd.read_parquet(parquet_path)
    return loaded.sample(frac=1, random_state=42).reset_index(drop=True)


# === LOAD AND SHUFFLE EACH DATASET ===
clear_df = _read_shuffled(path_clear)
friendly_df = _read_shuffled(path_friendly)
empathetic_df = _read_shuffled(path_empathetic)
polite_df = _read_shuffled(path_polite)

# === LOAD THE SFT MODEL'S GENERATIONS ===
# The JSON maps each style to a list of entries carrying a "generated_response1" field
with open("sft_predictions.json", "r") as f:
    sft_predictions = json.load(f)

# === PARALLEL LISTS AND RESULT CONTAINER ===
dfs = [clear_df, friendly_df, empathetic_df, polite_df]
names = ["clear", "friendly", "empathetic", "polite"]
raw_datasets_dict = {style: {"train": None, "test": None} for style in names}

# === BUILD (chosen, rejected, prompt) PREFERENCE TABLES FOR EACH STYLE ===
for df, name in zip(dfs, names):

    # SFT generations for this style; they are paired with the dataset rows by position,
    # so the list must be as long as the DataFrame
    sft_generated_response1 = [item["generated_response1"] for item in sft_predictions[name]]

    # chosen   = the dataset's 'response1'
    # rejected = the SFT model's generation for the same row
    # prompt   = the style instruction, identical for every row
    # NOTE(review): the prompt holds only the instruction, not the message to be
    # rephrased — confirm this is the intended DPO prompt.
    df = (
        df[['response1']]
        .rename(columns={"response1": "chosen"})
        .assign(
            rejected=sft_generated_response1,
            prompt=f"Rephrase the following customer service message to make it more {name}. Only output the rephrased message. Do not add any explanation or extra text.",
        )
    )

    # First 800 rows for training, the remainder for testing
    raw_datasets_dict[name]["train"] = df.iloc[0:800].copy()
    raw_datasets_dict[name]["test"] = df.iloc[800:].copy()

# === CONVERT THE TRAINING TABLES TO HUGGING FACE DATASETS ===
train_clear, train_friendly, train_empathetic, train_polite = (
    Dataset.from_pandas(raw_datasets_dict[style]["train"])
    for style in ("clear", "friendly", "empathetic", "polite")
)

In [None]:
# One DPO training set per style, ordered like `names`
train_sets = [train_clear, train_friendly, train_empathetic, train_polite]

# Model-loading settings shared by every style
max_seq_length = 4096  # maximum sequence length passed to the model loader
dtype = None           # left unset (torch.float16 / torch.bfloat16 could be given instead)
load_in_4bit = True    # load with 4-bit quantization

# Run DPO once per style, starting from that style's SFT output directory
for name, train_set in zip(names, train_sets):

    # Load the model and tokenizer saved by the SFT stage for this style
    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name = f"data/Llama-3.2-1B-Instruct/{name}",
        max_seq_length = max_seq_length,
        dtype = dtype,
        load_in_4bit = load_in_4bit
    )

    # Pick bf16 when the hardware supports it, fp16 otherwise
    bf16_available = is_bfloat16_supported()

    # Training configuration for this style
    dpo_args = DPOConfig(
        per_device_train_batch_size = 2,
        gradient_accumulation_steps = 4,   # effective batch size = 2 * 4 = 8
        warmup_ratio = 0.1,
        num_train_epochs = 1,
        learning_rate = 5e-6,
        fp16 = not bf16_available,
        bf16 = bf16_available,
        logging_steps = 1,                 # has no visible effect while logging_strategy is "no"
        optim = "adamw_8bit",
        weight_decay = 0.0,
        lr_scheduler_type = "linear",
        seed = 42,
        output_dir = f"DPO/{name}",        # checkpoints are written below this directory
        report_to = "none",
        logging_strategy = "no",
        log_level = "error"
    )

    # Direct Preference Optimization trainer; no explicit reference model is passed
    dpo_trainer = DPOTrainer(
        model = model,
        ref_model = None,
        args = dpo_args,
        beta = 0.1,
        train_dataset = train_set,
        tokenizer = tokenizer,
        max_length = 1024,          # limit for prompt + response
        max_prompt_length = 512,    # limit for the prompt alone
    )

    # Optimize the model on this style's preference pairs
    dpo_trainer.train()

# Drop the last model/tokenizer and release cached GPU memory
del model, tokenizer
torch.cuda.empty_cache()

In [None]:
import os
import re


def _resolve_dpo_checkpoint(run_dir, preferred="checkpoint-100"):
    """
    Return the checkpoint directory to load for one DPO run.

    The preferred checkpoint is used whenever it exists. If it is missing, the
    highest-numbered "checkpoint-<step>" directory under run_dir is used instead
    (the final step number depends on dataset and batch size). When nothing is
    found, the preferred path is returned so the loader reports the missing path.
    """
    preferred_path = f"{run_dir}/{preferred}"
    if os.path.isdir(preferred_path) or not os.path.isdir(run_dir):
        return preferred_path

    best_step, best_path = -1, None
    for entry in os.listdir(run_dir):
        match = re.fullmatch(r"checkpoint-(\d+)", entry)
        candidate = os.path.join(run_dir, entry)
        if match and os.path.isdir(candidate) and int(match.group(1)) > best_step:
            best_step, best_path = int(match.group(1)), candidate
    return best_path if best_path is not None else preferred_path


# Dictionary to store the predictions for each style
dpo_predictions = {"clear": [], "friendly": [], "empathetic": [], "polite": []}

# Characters trimmed from both ends of inputs and generated replies
_TRIM_CHARS = '\n\r"\''

# Iterate over each style name
for name in names:

    # === Load the DPO-trained model and tokenizer from checkpoint ===
    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name=_resolve_dpo_checkpoint(f"DPO/{name}"),  # Path to model checkpoint
        max_seq_length=2048,                      # Max sequence length allowed
        dtype=None,                               # Use default dtype
        load_in_4bit=True                         # Load model using 4-bit quantization for memory efficiency
    )

    # Prepare model for inference (e.g., disables training-specific behavior)
    FastLanguageModel.for_inference(model)

    # Get the device where the model is loaded (CPU or GPU)
    device = next(model.parameters()).device

    # Get test set messages for the current style.
    # NOTE(review): the "rejected" column holds the SFT model's generations, not the
    # dataset's original response0 — confirm this is the intended input.
    user_messages = raw_datasets_dict[name]["test"]["rejected"]

    # === Format prompts using the chat template ===
    system_prompt = f"Rephrase the following customer service message to make it more {name}. Only output the rephrased message. Do not add any explanation or extra text."

    # Build a list of prompts with a system instruction and user message
    prompts = [
        tokenizer.apply_chat_template([
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": msg}
        ], add_generation_prompt=True, tokenize=False)
        for msg in user_messages
    ]

    # === Tokenize all prompts in a single batch ===
    # Pad on the left so every prompt ends directly before the generated tokens
    tokenizer.padding_side = "left"
    batch = tokenizer(prompts, return_tensors="pt", padding=True, truncation=True)
    batch = {k: v.to(device) for k, v in batch.items()}  # Move batch tensors to the same device as the model

    # Length of the (padded) prompt; everything after it is newly generated
    prompt_length = batch["input_ids"].shape[1]

    # === Generate responses for all prompts ===
    outputs = model.generate(
        **batch,
        do_sample=True,               # Enable sampling for more diverse outputs
        temperature=1.0,              # Sampling temperature (controls randomness)
        top_p=1.0,                    # Top-p nucleus sampling (1.0 means no restriction)
        max_new_tokens=40,           # Limit the number of generated tokens
        pad_token_id=tokenizer.eos_token_id,  # Use EOS token for padding
    )

    # === Decode only the newly generated tokens ===
    # Slicing off the prompt avoids re-parsing role markers out of the decoded text,
    # so every test row yields exactly one prediction.
    decoded = tokenizer.batch_decode(outputs[:, prompt_length:], skip_special_tokens=True)

    # === Structure the rephrased responses ===
    predictions = [
        {
            "id": i,
            "response0": str(msg).strip().strip(_TRIM_CHARS),      # Input that was rephrased
            "generated_response1": reply.strip(_TRIM_CHARS),       # Model-generated rephrased output
        }
        for i, (msg, reply) in enumerate(zip(user_messages, decoded))
    ]

    # Store predictions for this particular style
    dpo_predictions[name].extend(predictions)

# === Save the predictions to a JSON file ===
with open("dpo_predictions.json", "w") as f:
    json.dump(dpo_predictions, f, indent=2)

## Human Evaluation

The following section contains the script that converts the model outputs generated in the text generation tasks into .csv files, so that they can be manually annotated and then re-uploaded to calculate agreement with the human ground truth.

In [None]:
import json
import pandas as pd
import random
import os

def _export_annotation_sheet(entries, name, output_folder, rng, n_samples):
    """Sample one subset, randomise the response order and write its two CSV files.

    Writes ``<name>.csv`` (responses plus an empty ``human`` column) and
    ``<name>_labels.csv`` (the hidden ``label`` column) into ``output_folder``.
    """
    df = pd.DataFrame(entries)[["response0", "generated_response1"]]

    # A subset can hold fewer entries than requested (malformed generations are
    # skipped upstream); export what is available instead of failing in `sample`.
    n_available = len(df)
    if n_available < n_samples:
        print(f"{name}: only {n_available} entries available, exporting all of them.")
    df = df.sample(n=min(n_samples, n_available), random_state=42).reset_index(drop=True)

    # Swap the two responses with probability 0.5 so the annotator cannot tell
    # from the column which text is the generated one.
    rows = []
    for _, row in df.iterrows():
        if rng.random() < 0.5:
            # Original order kept: the generated text stays in "generated_response1"
            rows.append({"response0": row["response0"],
                         "generated_response1": row["generated_response1"], "label": 1})
        else:
            # Swapped: the generated text is shown under "response0"
            rows.append({"response0": row["generated_response1"],
                         "generated_response1": row["response0"], "label": 0})
    df_final = pd.DataFrame(rows, columns=["response0", "generated_response1", "label"])

    # The label goes to a separate file; the sheet gets an empty column to fill in
    labels = df_final["label"]
    df_final = df_final.drop(columns=["label"])
    df_final["human"] = ""

    df_final.to_csv(os.path.join(output_folder, f"{name}.csv"),
                    index=False, encoding="utf-8", sep=";")
    pd.DataFrame({"label": labels}).to_csv(os.path.join(output_folder, f"{name}_labels.csv"),
                                           index=False, encoding="utf-8", sep=";")


def process_json_to_csv(json_file: str, output_folder: str = "csv_outputs", nested: bool = True,
                        n_samples: int = 25) -> None:
    """Export blinded annotation sheets (CSV) from a predictions JSON file.

    For every subset of ``json_file``, up to ``n_samples`` entries are drawn, the
    two responses of each entry are put in random order, and two ``;``-separated
    files are written to ``output_folder``:

    * ``<name>.csv``: columns ``response0``, ``generated_response1`` and an empty
      ``human`` column for the annotator;
    * ``<name>_labels.csv``: column ``label``, 1 when the pair kept its original
      order and 0 when the two responses were swapped.

    ``<name>`` is ``{style}_{shot}`` for nested input and ``{style}`` otherwise.

    Args:
        json_file: Path to the predictions JSON. Every entry needs the keys
            ``response0`` and ``generated_response1``.
        output_folder: Destination directory; created if missing.
        nested: ``True`` if the JSON maps style -> shot level -> list of entries,
            ``False`` if it maps style -> list of entries.
        n_samples: Maximum number of entries exported per subset. Subsets with
            fewer entries are exported completely.
    """
    # Private generator: produces the same swap sequence as seeding the global
    # `random` module with 42, without altering global random state.
    rng = random.Random(42)

    with open(json_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    os.makedirs(output_folder, exist_ok=True)

    for style in data:
        if nested:
            # style -> shot level (e.g. "0-shot") -> entries
            for shot in data[style]:
                _export_annotation_sheet(data[style][shot], f"{style}_{shot}",
                                         output_folder, rng, n_samples)
        else:
            # style -> entries
            _export_annotation_sheet(data[style], style, output_folder, rng, n_samples)

    print("✅ CSV export complete.")

In [None]:
# Export the annotation sheets of every setting. Only the ICL predictions are
# nested by shot level; the other files map each style directly to its entries.
for setting_name, is_nested in [("icl", True), ("sft", False), ("dpo", False), ("baseline", False)]:
    process_json_to_csv(
        f"{setting_name}_predictions.json",
        output_folder=f"hum_eval_{setting_name}",
        nested=is_nested,
    )

In [None]:
import pandas as pd

def compute_agreement(data_file: str, label_file: str) -> float:
    """Return the percentage of human annotations that match the stored labels.

    Both files are ``;``-separated CSVs that are compared row by row:
    ``data_file`` must contain a ``human`` column and ``label_file`` a ``label``
    column. Rows whose ``human`` cell is empty are ignored, so partially
    annotated files can be scored as well.

    Args:
        data_file: Path to the annotated CSV.
        label_file: Path to the CSV holding the labels.

    Returns:
        Agreement in percent, rounded to the nearest whole number. ``0.0`` is
        returned (after printing a notice) when no row is annotated.

    Raises:
        ValueError: If the files differ in row count or an annotation cannot be
            parsed as a number.
    """
    # Load main CSV with human annotations
    df_data = pd.read_csv(data_file, sep=";", encoding="utf-8")

    # Load label CSV
    df_labels = pd.read_csv(label_file, sep=";", encoding="utf-8")

    # Ensure both files have the same number of rows
    if len(df_data) != len(df_labels):
        raise ValueError("Files do not match in number of rows.")

    human_raw = df_data["human"]
    human_text = human_raw.astype(str).str.strip()
    label_text = df_labels["label"].astype(str).str.strip()

    # Keep only annotated rows (drop NaN, empty and whitespace-only cells)
    annotated = (human_raw.notna() & ~human_text.isin(["", "nan"])).to_numpy()

    if not annotated.any():
        print("No human annotations found.")
        return 0.0

    # A column with gaps is read as float, so annotations arrive as "1.0"/"0.0";
    # parse numerically instead of casting the text straight to int.
    human = pd.to_numeric(human_text[annotated]).to_numpy()
    label = pd.to_numeric(label_text[annotated]).to_numpy()

    # Compute agreement
    matches = int((human == label).sum())
    agreement_percent = matches / len(human) * 100

    return round(agreement_percent, 0)

In [None]:
# BEFORE RUNNING THIS CELL, MAKE SURE YOU HAVE COMPLETED THE FOLLOWING STEPS:
#   1. EXPORT THE .csv FILES WITH process_json_to_csv
#   2. MANUALLY FILL IN THE "human" COLUMN OF EVERY FILE NOT ENDING WITH _labels.csv
#   3. PUT THE ANNOTATED FILES BACK INTO THEIR hum_eval_* FOLDER (KEEP THE SAME FILE NAME)

from scipy.stats import hmean
import pandas as pd

directories = ["hum_eval_icl", "hum_eval_sft", "hum_eval_dpo", "hum_eval_baseline"]
settings = ["icl", "sft", "dpo", "baseline"]
shots = ["0", "1", "2", "3", "4", "5"]
subsets = ["clear", 'friendly', "empathetic", "polite"]

# One row per method (ICL contributes one row per shot level), one column per style
results = {
    method: dict.fromkeys(subsets)
    for method in ["baseline", "sft", "dpo", *(f"icl-{shot}" for shot in shots)]
}

# Fill in the agreement score of every (method, style) pair
for setting, directory in zip(settings, directories):
    per_shot = directory == "hum_eval_icl"
    for subset in subsets:
        # (row in `results`, file stem) pairs to evaluate for this style
        if per_shot:
            targets = [(f"{setting}-{shot}", f"{subset}_{shot}-shot") for shot in shots]
        else:
            targets = [(setting, subset)]

        for method, stem in targets:
            results[method][subset] = compute_agreement(
                f"{directory}/{stem}.csv",
                f"{directory}/{stem}_labels.csv",
            )


def _hmean_of_positive(values):
    """Harmonic mean over the entries that are set and strictly positive."""
    return hmean([v for v in values if v is not None and v > 0])


# Methods as rows, styles as columns
df = pd.DataFrame.from_dict(results, orient='index')

# Harmonic mean of every row, stored as an extra column
df["HM"] = df.apply(_hmean_of_positive, axis=1)

# Harmonic mean of every column (the new "HM" column included), appended as a row
hmean_row = df.apply(_hmean_of_positive, axis=0)
hmean_row.name = "HM"
df = pd.concat([df, pd.DataFrame([hmean_row])], axis=0)

# Turn the method index into a regular column
df = df.reset_index().rename(columns={"index": "method"})

# Save to CSV
df.to_csv("texgen_human_agreement.csv", encoding="utf-8", sep=";", index=False)

# Display final DataFrame
df

## Overlap between Predictions and Ground Truth

In [None]:
%pip install -r requirements_3.txt

In [1]:
import json
import pandas as pd

# === Load model-generated predictions from JSON files ===

def _read_predictions(json_path):
    """Parse one predictions JSON file and return its content."""
    with open(json_path, 'r') as f:
        return json.load(f)

icl_predictions = _read_predictions('icl_predictions.json')            # ICL (In-Context Learning)
baseline_predictions = _read_predictions('baseline_predictions.json')  # baseline (non-finetuned model)
sft_predictions = _read_predictions('sft_predictions.json')            # SFT (Supervised Fine-Tuning)
dpo_predictions = _read_predictions('dpo_predictions.json')            # DPO (Direct Preference Optimization)

# === Retrieve the datasets for each stylistic category ===

# One Parquet file per style
path_clear = "RESPONSible Service/dataset_clear.parquet"
path_friendly = "RESPONSible Service/dataset_friendly.parquet"
path_empathetic = "RESPONSible Service/dataset_empathetic.parquet"
path_polite = "RESPONSible Service/dataset_polite.parquet"

def _read_shuffled(parquet_path):
    """Read a Parquet dataset and shuffle its rows reproducibly (seed 42)."""
    return pd.read_parquet(parquet_path).sample(frac=1, random_state=42).reset_index(drop=True)

clear_df = _read_shuffled(path_clear)
friendly_df = _read_shuffled(path_friendly)
empathetic_df = _read_shuffled(path_empathetic)
polite_df = _read_shuffled(path_polite)

# === Split off the test sets ===

def _test_split(shuffled_df):
    """Return every row from position 800 onwards, re-indexed from 0."""
    return shuffled_df.iloc[800:].reset_index(drop=True)

# The test set is everything after the first 800 shuffled rows
# (200 rows when the dataset holds exactly 1000 entries)
clear_test = _test_split(clear_df)
friendly_test = _test_split(friendly_df)
empathetic_test = _test_split(empathetic_df)
polite_test = _test_split(polite_df)

In [2]:
def _generated(entries):
    """Extract the generated text of every prediction entry."""
    return [entry['generated_response1'] for entry in entries]


def _overlap_frame(style, test_df):
    """Put the ground truth of one style next to the outputs of every method.

    Columns, in order: ``gt``, ``icl-0`` ... ``icl-5``, ``baseline``, ``sft``, ``dpo``.
    """
    columns = {'gt': test_df["response1"]}
    for shot in range(6):
        columns[f'icl-{shot}'] = _generated(icl_predictions[style][f'{shot}-shot'])
    columns['baseline'] = _generated(baseline_predictions[style])
    # Only the SFT predictions are sliced from position 800 on
    # (presumably that file covers the whole shuffled dataset - TODO confirm)
    columns['sft'] = _generated(sft_predictions[style][800:])
    columns['dpo'] = _generated(dpo_predictions[style])
    return pd.DataFrame(columns)


df_clear = _overlap_frame("clear", clear_test)
df_friendly = _overlap_frame("friendly", friendly_test)
df_empathetic = _overlap_frame("empathetic", empathetic_test)
df_polite = _overlap_frame("polite", polite_test)


### Semantic Overlap (BERTScore)

In [3]:
import pandas as pd
import numpy as np
from bert_score import score as bertscore
from scipy.stats import shapiro, f_oneway, kruskal
import scikit_posthocs as sp

# Comparison tables built earlier, one per style
dfs = dict(clear=df_clear, friendly=df_friendly, empathetic=df_empathetic, polite=df_polite)

# One-character tag per condition, used in the significance column
abbr = {f"icl-{k}": str(k) for k in range(6)}
abbr.update({"baseline": "b", "sft": "s", "dpo": "d"})

DEVICE = 'cpu'
all_reports = {}     # style -> DataFrame of mean precision / recall / F1 per condition
raw_f1_scores = {}   # style -> condition -> per-sample F1 values
test_info = {}       # style -> description of the global test and its p-value

# Score every condition of every style against the ground truth
for style, df in dfs.items():
    references = df["gt"].tolist()
    raw_f1_scores[style] = {}
    report = {}

    for col in df.columns.drop("gt"):
        P, R, F1 = bertscore(
            df[col].tolist(), references,
            lang='en',
            model_type='bert-base-uncased',
            device=DEVICE,
            verbose=False
        )
        # Per-sample F1 is kept for the significance tests, the means go into the report
        raw_f1_scores[style][col] = F1.numpy().tolist()
        report[col] = {
            metric: np.mean(tensor.numpy())
            for metric, tensor in (("precision", P), ("recall", R), ("f1", F1))
        }

    all_reports[style] = pd.DataFrame(report).T

# Attach a significance ("SS") column to every report and record the global test used
bert_results = {}
for style, df in all_reports.items():
    f1_data = raw_f1_scores[style]
    keys = list(f1_data)
    values = [f1_data[k] for k in keys]

    # Shapiro-Wilk on every condition: ANOVA is chosen only if none rejects normality
    shapiro_p = [result.pvalue for result in map(shapiro, values)]
    all_normal = all(p >= 0.05 for p in shapiro_p)

    # Global test across all conditions
    test_type, global_test = ("ANOVA", f_oneway) if all_normal else ("Kruskal-Wallis", kruskal)
    stat, p_val = global_test(*values)

    test_info[style] = f"{test_type} (p = {p_val:.4f})"

    if p_val < 0.05:
        # Long format for the pairwise Dunn test (Bonferroni-adjusted).
        # NOTE(review): Dunn's test is applied whichever global test was selected.
        df_long = pd.DataFrame({
            "score": [score for k in keys for score in f1_data[k]],
            "label": [k for k in keys for _ in f1_data[k]],
        })
        posthoc = sp.posthoc_dunn(df_long, val_col="score", group_col="label", p_adjust="bonferroni")

        n_others = len(df.index) - 1
        SS = []
        for row in df.index:
            # Tags of the conditions this one differs from significantly
            sig_with = sorted(
                abbr[col] for col in df.index
                if col != row and posthoc.loc[row, col] < 0.05
            )
            if len(sig_with) == n_others:
                SS.append("*")                  # differs from every other condition
            elif sig_with:
                SS.append("".join(sig_with))    # differs from the listed conditions
            else:
                SS.append("-")                  # no significant difference
    else:
        SS = ["-"] * len(df)

    df["SS"] = SS
    bert_results[style] = df.round(2)

# Write one section per style to the report file
with open("texgen_bertscores.txt", "w") as f:
    for style, df in bert_results.items():
        f.write(
            f"=== {style.upper()} ===\n"
            f"Statistical Test Used: {test_info[style]}\n"
            f"{df.to_string()}\n\n"
        )

### N-gram Overlap (ROUGE-L)

In [4]:
import pandas as pd
import numpy as np
from rouge_score import rouge_scorer
from scipy.stats import shapiro, f_oneway, kruskal
import scikit_posthocs as sp

# Comparison tables built earlier, one per style
dfs = dict(clear=df_clear, friendly=df_friendly, empathetic=df_empathetic, polite=df_polite)

# One-character tag per condition, used in the significance column
abbr = {f"icl-{k}": str(k) for k in range(6)}
abbr.update({"baseline": "b", "sft": "s", "dpo": "d"})

all_reports = {}        # style -> DataFrame of mean ROUGE-L per condition
raw_rouge_scores = {}   # style -> condition -> per-sample ROUGE-L values
test_info = {}          # style -> description of the global test and its p-value

def compute_rouge_l_scores(references, candidates):
    """Return the ROUGE-L F1 score of every (reference, candidate) pair.

    The two sequences are paired positionally with ``zip``; scoring uses a
    ROUGE-L scorer with stemming enabled.
    """
    scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)

    f_measures = []
    for ref, cand in zip(references, candidates):
        pair_result = scorer.score(ref, cand)
        f_measures.append(pair_result['rougeL'].fmeasure)
    return f_measures

# Score every condition of every style against the ground truth
for style, df in dfs.items():
    references = df["gt"].tolist()

    # Per-sample ROUGE-L F1 of each condition, kept for the significance tests
    raw_rouge_scores[style] = {
        col: compute_rouge_l_scores(references, df[col].tolist())
        for col in df.columns
        if col != "gt"
    }

    # Mean score per condition
    report = {
        col: {"rouge_l": np.mean(scores)}
        for col, scores in raw_rouge_scores[style].items()
    }

    all_reports[style] = pd.DataFrame(report).T

# Attach a significance ("SS") column to every report and record the global test used
rouge_results = {}
for style, df in all_reports.items():
    rouge_data = raw_rouge_scores[style]
    keys = list(rouge_data)
    values = [rouge_data[k] for k in keys]

    # Shapiro-Wilk on every condition: ANOVA is chosen only if none rejects normality
    shapiro_p = [result.pvalue for result in map(shapiro, values)]
    all_normal = all(p >= 0.05 for p in shapiro_p)

    # Global test across all conditions
    test_type, global_test = ("ANOVA", f_oneway) if all_normal else ("Kruskal-Wallis", kruskal)
    stat, p_val = global_test(*values)

    test_info[style] = f"{test_type} (p = {p_val:.4f})"

    if p_val < 0.05:
        # Long format for the pairwise Dunn test (Bonferroni-adjusted).
        # NOTE(review): Dunn's test is applied whichever global test was selected.
        df_long = pd.DataFrame({
            "score": [score for k in keys for score in rouge_data[k]],
            "label": [k for k in keys for _ in rouge_data[k]],
        })
        posthoc = sp.posthoc_dunn(df_long, val_col="score", group_col="label", p_adjust="bonferroni")

        n_others = len(df.index) - 1
        SS = []
        for row in df.index:
            # Tags of the conditions this one differs from significantly
            sig_with = sorted(
                abbr[col] for col in df.index
                if col != row and posthoc.loc[row, col] < 0.05
            )
            if len(sig_with) == n_others:
                SS.append("*")                  # differs from every other condition
            elif sig_with:
                SS.append("".join(sig_with))    # differs from the listed conditions
            else:
                SS.append("-")                  # no significant difference
    else:
        SS = ["-"] * len(df)

    df["SS"] = SS
    rouge_results[style] = df.round(3)  # ROUGE-L scores are reported with 3 decimals

# Write one section per style to the report file
with open("texgen_rouge_scores.txt", "w") as f:
    for style, df in rouge_results.items():
        f.write(
            f"=== {style.upper()} ===\n"
            f"Statistical Test Used: {test_info[style]}\n"
            f"{df.to_string()}\n\n"
        )