In [None]:
import pandas as pd
import ast
from sklearn.metrics import classification_report

### 2-way (binary) evaluation

In [None]:
# Full label names as they appear in the spreadsheets -> one-letter codes
# used as keys of the scoring table below.
label_map = dict(zip(('High', 'Medium', 'Low', 'None'), 'HMLN'))


def _outcome(tp=0, tn=0, fp=0, fn=0):
    """Build one scoring-table cell; outcomes that are not given default to 0."""
    return {'TP': tp, 'TN': tn, 'FP': fp, 'FN': fn}


# Custom scoring table: (gold code, system code) -> confusion-matrix credit.
# High/Medium act as the positive class and Low/None as the negative class.
# The two Medium/Low confusions spread one unit of credit evenly over all
# four outcomes instead of counting as a full error.
score_mapping = {
    # gold = High
    ('H', 'H'): _outcome(tp=1),
    ('H', 'M'): _outcome(tp=1),
    ('H', 'L'): _outcome(fn=1),
    ('H', 'N'): _outcome(fn=1),
    # gold = Medium
    ('M', 'H'): _outcome(tp=1),
    ('M', 'M'): _outcome(tp=1),
    ('M', 'L'): _outcome(tp=0.25, tn=0.25, fp=0.25, fn=0.25),
    ('M', 'N'): _outcome(fn=1),
    # gold = Low
    ('L', 'H'): _outcome(fp=1),
    ('L', 'M'): _outcome(tp=0.25, tn=0.25, fp=0.25, fn=0.25),
    ('L', 'L'): _outcome(tn=1),
    ('L', 'N'): _outcome(tn=1),
    # gold = None
    ('N', 'H'): _outcome(fp=1),
    ('N', 'M'): _outcome(fp=1),
    ('N', 'L'): _outcome(tn=1),
    ('N', 'N'): _outcome(tn=1),
}

# Disabled alternative scoring tables, kept for reference inside a bare string
# literal (the string is never assigned or evaluated, so it has no effect).
# Each variant differs from the active `score_mapping` only in the
# ('M', 'L') and ('L', 'M') entries:
#   1. ('M', 'L') -> TN 0.5 / FN 0.5;  ('L', 'M') -> TP 0.5 / FP 0.5
#   2. strict binary: ('M', 'L') -> FN 1;  ('L', 'M') -> FP 1
#   3. 0.5 on every outcome (each such record then contributes 2 in total)
'''score_mapping = {
    ('H', 'H'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('H', 'M'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('H', 'L'): {'TP': 0,    'TN': 0,   'FP': 0,  'FN': 1},
    ('H', 'N'): {'TP': 0,    'TN': 0,   'FP': 0,  'FN': 1},
    ('M', 'H'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('M', 'M'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('M', 'L'): {'TP': 0,    'TN': 0.5, 'FP': 0,  'FN': 0.5},
    ('M', 'N'): {'TP': 0,    'TN': 0,   'FP': 0,  'FN': 1},
    ('L', 'H'): {'TP': 0,    'TN': 0,   'FP': 1,  'FN': 0},
    ('L', 'M'): {'TP': 0.5,  'TN': 0,   'FP': 0.5,'FN': 0},
    ('L', 'L'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
    ('L', 'N'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
    ('N', 'H'): {'TP': 0,    'TN': 0,   'FP': 1,  'FN': 0},
    ('N', 'M'): {'TP': 0,    'TN': 0,   'FP': 1,  'FN': 0},
    ('N', 'L'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
    ('N', 'N'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
}

score_mapping = {
    ('H', 'H'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('H', 'M'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('H', 'L'): {'TP': 0,    'TN': 0,   'FP': 0,  'FN': 1},
    ('H', 'N'): {'TP': 0,    'TN': 0,   'FP': 0,  'FN': 1},
    ('M', 'H'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('M', 'M'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('M', 'L'): {'TP': 0,    'TN': 0,   'FP': 0,  'FN': 1},
    ('M', 'N'): {'TP': 0,    'TN': 0,   'FP': 0,  'FN': 1},
    ('L', 'H'): {'TP': 0,    'TN': 0,   'FP': 1,  'FN': 0},
    ('L', 'M'): {'TP': 0,    'TN': 0,   'FP': 1,  'FN': 0},
    ('L', 'L'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
    ('L', 'N'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
    ('N', 'H'): {'TP': 0,    'TN': 0,   'FP': 1,  'FN': 0},
    ('N', 'M'): {'TP': 0,    'TN': 0,   'FP': 1,  'FN': 0},
    ('N', 'L'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
    ('N', 'N'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
}

score_mapping = {
    ('H', 'H'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('H', 'M'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('H', 'L'): {'TP': 0,    'TN': 0,   'FP': 0,  'FN': 1},
    ('H', 'N'): {'TP': 0,    'TN': 0,   'FP': 0,  'FN': 1},
    ('M', 'H'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('M', 'M'): {'TP': 1,    'TN': 0,   'FP': 0,  'FN': 0},
    ('M', 'L'): {'TP': 0.5,  'TN': 0.5, 'FP': 0.5,'FN': 0.5},
    ('M', 'N'): {'TP': 0,    'TN': 0,   'FP': 0,  'FN': 1},
    ('L', 'H'): {'TP': 0,    'TN': 0,   'FP': 1,  'FN': 0},
    ('L', 'M'): {'TP': 0.5,  'TN': 0.5, 'FP': 0.5,'FN': 0.5},
    ('L', 'L'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
    ('L', 'N'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
    ('N', 'H'): {'TP': 0,    'TN': 0,   'FP': 1,  'FN': 0},
    ('N', 'M'): {'TP': 0,    'TN': 0,   'FP': 1,  'FN': 0},
    ('N', 'L'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
    ('N', 'N'): {'TP': 0,    'TN': 1,   'FP': 0,  'FN': 0},
}'''

# ---------------------------------------------------------------------------
# 2-way (binary) evaluation driver.
#
# Reads the gold and system spreadsheets, scores every record with
# `score_mapping`, aggregates precision / recall / F1 / accuracy (all kept as
# percentages), builds a 4x4 confusion matrix and a per-class classification
# report, and writes everything to a single Excel workbook.
# ---------------------------------------------------------------------------


def _parse_a_prime(raw_value, source, idx):
    """Turn one ``A_prime`` cell into a Python object.

    Args:
        raw_value: Cell content, e.g. ``"A' = ('amazing tour de bier', 'Low')"``.
        source: ``"gold"`` or ``"system"``; only used in error messages.
        idx: Zero-based record number; only used in error messages.

    Returns:
        The object produced by ``ast.literal_eval`` after the optional
        ``A' =`` prefix has been removed.

    Raises:
        ValueError: If the cell is not a string or cannot be parsed.
    """
    if not isinstance(raw_value, str):
        # Empty spreadsheet cells arrive as NaN (a float), which has no .startswith().
        raise ValueError(
            f"Error parsing {source} A_prime at record {idx}: expected a string, got {raw_value!r}"
        )

    text = raw_value.strip()
    prefix = "A' ="
    if text.startswith(prefix):
        # Slice rather than replace("A' = ", ...) so the prefix is removed
        # whether or not a space follows the equals sign.
        text = text[len(prefix):].strip()

    try:
        return ast.literal_eval(text)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
        raise ValueError(f"Error parsing {source} A_prime at record {idx}: {e}") from e


gold_df = pd.read_excel('/Users/innerpiece92/Desktop/NLP_Workspace/AArec/mturk/mturk-marketplace-ready/test/system_results/restaurants/ground_truth_w_split_abstractive_rag.xlsx')
sys_df = pd.read_excel('/Users/innerpiece92/Desktop/NLP_Workspace/AArec/mturk/mturk-marketplace-ready/test/system_results/restaurants/system_user_profiles_restaurants_reviews_match_rag.xlsx')

# Records are paired by position, so both files must have the same length.
if len(gold_df) != len(sys_df):
    raise ValueError("The number of records in the gold file does not match the system file!")

total_TP = 0.0
total_FP = 0.0
total_FN = 0.0
total_TN = 0.0

record_logs = []
gold_labels_list = []
sys_labels_list = []

print("Processing records and computing scores...\n")

for idx, (gold_val, sys_val) in enumerate(zip(gold_df['A_prime'], sys_df['A_prime'])):
    # gold cell: "A' = ('amazing tour de bier', 'Low')"
    # system cell: "A' = [('amazing tour de bier', 'None')]"
    gold_tuple = _parse_a_prime(gold_val, "gold", idx)
    sys_list = _parse_a_prime(sys_val, "system", idx)

    # Gold data holds exactly one (aspect phrase, label) pair.
    try:
        gold_aspect, gold_label_text = gold_tuple
    except (TypeError, ValueError) as e:
        raise ValueError(f"Error unpacking gold A_prime at record {idx}: {e}") from e

    # For system data, ensure that there is at least one prediction.
    if not sys_list:
        raise ValueError(f"No system prediction found at record {idx}")

    # The system value is either a bare pair or a list whose first element is the pair.
    if isinstance(sys_list, tuple) and len(sys_list) == 2:
        sys_aspect, sys_label_text = sys_list
    elif isinstance(sys_list, list) and isinstance(sys_list[0], tuple) and len(sys_list[0]) == 2:
        sys_aspect, sys_label_text = sys_list[0]
    else:
        raise ValueError(f"Unexpected format in system A_prime at record {idx}: {sys_list}")

    # Guard against the two files being ordered differently.
    if gold_aspect != sys_aspect:
        raise ValueError(f"Aspect phrase mismatch at record {idx}: Gold aspect '{gold_aspect}' vs System aspect '{sys_aspect}'")

    gold_labels_list.append(gold_label_text)
    sys_labels_list.append(sys_label_text)

    # Labels missing from label_map are passed through unchanged and are
    # rejected by the score_mapping lookup below if they are not valid codes.
    gold_code = label_map.get(gold_label_text, gold_label_text)
    sys_code = label_map.get(sys_label_text, sys_label_text)

    try:
        scores = score_mapping[(gold_code, sys_code)]
    except KeyError:
        raise ValueError(f"Invalid label pair at record {idx}: Gold '{gold_code}', System '{sys_code}'") from None

    record_logs.append({
        "Record": idx,
        "Atypical Aspect": gold_aspect,
        "Gold Label": gold_label_text,
        "Gold Code": gold_code,
        "System Label": sys_label_text,
        "Sys Code": sys_code,
        "TP": scores['TP'],
        "TN": scores['TN'],
        "FP": scores['FP'],
        "FN": scores['FN'],
    })

    print(f"Record {idx}:")
    print(f"  Aspect Phrase: '{gold_aspect}'")
    print(f"  Gold Label: '{gold_label_text}' (converted to '{gold_code}')")
    print(f"  System Label: '{sys_label_text}' (converted to '{sys_code}')")
    print(f"  Assigned Scores: {scores}\n")

    total_TP += scores['TP']
    total_FP += scores['FP']
    total_FN += scores['FN']
    total_TN += scores['TN']

# All four metrics are expressed in percent (0-100); a zero denominator yields 0.
total_scored = total_TP + total_TN + total_FP + total_FN
precision = (total_TP / (total_TP + total_FP) * 100) if (total_TP + total_FP) > 0 else 0
recall = (total_TP / (total_TP + total_FN) * 100) if (total_TP + total_FN) > 0 else 0
f1_score = (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0
accuracy = ((total_TP + total_TN) / total_scored * 100) if total_scored > 0 else 0

evaluation_metrics = {
    "Total TP": total_TP,
    "Total FP": total_FP,
    "Total FN": total_FN,
    "Total TN": total_TN,
    "Precision": round(precision, 2),
    "Recall": round(recall, 2),
    "F1 Score": round(f1_score, 2),
    "Accuracy": round(accuracy, 2),
}

print("Aggregated Confusion Matrix Totals (Custom Scoring):")
print(f"  Total TP: {total_TP}")
print(f"  Total FP: {total_FP}")
print(f"  Total FN: {total_FN}")
print(f"  Total TN: {total_TN}\n")

# The values are already percentages, so they are printed as-is; multiplying
# by 100 here would report e.g. 8532.0 instead of 85.3.
print("Evaluation Metrics (Custom Scoring):")
print(f"  Precision: {precision:.1f}")
print(f"  Recall:    {recall:.1f}")
print(f"  F1 Score:  {f1_score:.1f}")
print(f"  Accuracy:  {accuracy:.1f}")

labels_order = ["None", "Low", "Medium", "High"]

cm = pd.crosstab(
    pd.Series(gold_labels_list, name="Gold"),
    pd.Series(sys_labels_list, name="System"),
)

# Force a fixed 4x4 layout even when some label never occurs.
cm = cm.reindex(index=labels_order, columns=labels_order, fill_value=0)

print("\nStandard Confusion Matrix:")
print(cm)

# -------------------- Classification Report --------------------
# `labels` must be passed together with `target_names`: without it
# scikit-learn orders the classes alphabetically (High, Low, Medium, None),
# so the names in labels_order would be attached to the wrong rows, and the
# call fails outright when a class is absent from the data.
classification_rep = classification_report(
    gold_labels_list,
    sys_labels_list,
    labels=labels_order,
    target_names=labels_order,
    digits=2,
    output_dict=True,
    zero_division=0,
)
df_classification_report = pd.DataFrame(classification_rep).transpose()
# ---------------------------------------------------------------

df_records = pd.DataFrame(record_logs)
df_evaluation = pd.DataFrame(list(evaluation_metrics.items()), columns=["Metric", "Value"])
df_confusion = cm.reset_index()  # turn the gold labels into a regular column for Excel output

output_filename = "/Users/innerpiece92/Desktop/NLP_Workspace/AArec/mturk/mturk-marketplace-ready/test/system_results/restaurants/statistics_rag.xlsx"
with pd.ExcelWriter(output_filename) as writer:
    df_records.to_excel(writer, sheet_name="Record Details", index=False)
    df_evaluation.to_excel(writer, sheet_name="Evaluation Metrics", index=False)
    df_confusion.to_excel(writer, sheet_name="Confusion Matrix", index=False)
    # The index carries the class / average names, so it has to be written.
    df_classification_report.to_excel(
        writer, sheet_name="Classification Report", index=True, index_label="Label"
    )

print(f"\nOutput written to {output_filename}")

### 4-way evaluation

In [None]:
# Ordinal encoding of the four labels: 'None' is 0 and 'High' is 3, so the
# difference between two codes counts the steps between the labels.
label_mapping = {label: rank for rank, label in enumerate(('None', 'Low', 'Medium', 'High'))}

def calculate_sample_standard_deviation(system_label, manual_label):
    """Return the standard deviation of the two numeric labels.

    NOTE(review): despite the function name, the squared deviations are
    divided by the number of values (2), not by ``n - 1``; this is the
    population form, which for two values equals half their absolute
    difference.
    """
    pair = (system_label, manual_label)
    count = len(pair)
    centre = sum(pair) / count
    variance = sum((value - centre) ** 2 for value in pair) / count
    return variance ** 0.5

def calculate_pairwise_absolute_deviation(system_label, manual_label):
    """Return how far apart the two numeric labels are, ignoring direction."""
    difference = system_label - manual_label
    return abs(difference)

def calculate_accuracy(system_label, manual_label):
    """
    Score one prediction by its distance from the manual annotation.

    Returns 1 for an exact match, 0.5 when the two numeric labels differ by
    exactly one, and 0 otherwise.
    """
    if system_label == manual_label:
        return 1
    gap = abs(system_label - manual_label)
    return 0.5 if gap == 1 else 0

def evaluate_performance(data, label_mapping):
    """
    Add per-row standard deviation, absolute deviation and accuracy columns
    comparing system labels with manual labels.

    Args:
        data: DataFrame with 'manual_label' and 'system_label' columns. It is
            modified in place (six columns are added) and also returned.
        label_mapping: Mapping from label text to its numeric value.

    Returns:
        The same DataFrame with the columns 'system_label_numeric',
        'manual_label_numeric', 'Standard_Deviation', 'Absolute_Deviation',
        'Accuracy' and 'Deviation_Calculation_Details'. Rows in which either
        label is missing from ``label_mapping`` get NaN for the three metrics
        and the text "Labels not parsed correctly" as details.
    """
    metric_columns = ['Standard_Deviation', 'Absolute_Deviation', 'Accuracy', 'Deviation_Calculation_Details']
    # A real NaN (not the string "NaN") keeps the metric columns numeric, so
    # that .mean() on them skips unparsed rows instead of raising.
    missing = float('nan')

    # Map string labels to numeric values; unknown labels become NaN.
    data['system_label_numeric'] = data['system_label'].map(label_mapping)
    data['manual_label_numeric'] = data['manual_label'].map(label_mapping)

    rows = []
    for sys_val, man_val in zip(data['system_label_numeric'], data['manual_label_numeric']):
        if pd.notnull(sys_val) and pd.notnull(man_val):
            std_dev = calculate_sample_standard_deviation(sys_val, man_val)
            abs_dev = calculate_pairwise_absolute_deviation(sys_val, man_val)
            acc = calculate_accuracy(sys_val, man_val)
            details = f"Labels: [{sys_val}, {man_val}], StdDev: {std_dev:.3f} "
        else:
            std_dev = abs_dev = acc = missing
            details = "Labels not parsed correctly"
        rows.append((std_dev, abs_dev, acc, details))

    # Naming the columns explicitly also makes this work for an empty frame,
    # where an unnamed DataFrame built from [] would have no columns to assign.
    data[metric_columns] = pd.DataFrame(rows, index=data.index, columns=metric_columns)
    return data

def calculate_summary(data):
    """Return a two-column ('Metric', 'Value') DataFrame holding the column
    means of the three per-row metrics, each rounded to three decimals."""
    metric_sources = (
        ('Mean Standard Deviation', 'Standard_Deviation'),
        ('Mean Absolute Deviation', 'Absolute_Deviation'),
        ('Mean Accuracy', 'Accuracy'),
    )
    metric_names = [title for title, _ in metric_sources]
    metric_values = [round(data[column].mean(), 3) for _, column in metric_sources]
    return pd.DataFrame({'Metric': metric_names, 'Value': metric_values})

def _parse_label_cell(raw_value, source, idx):
    """Parse one A_prime cell (optionally prefixed with "A' =") into a Python object.

    ``source`` ("gold" or "system") and ``idx`` are only used in error
    messages. Raises ValueError if the cell is not a string or cannot be parsed.
    """
    if not isinstance(raw_value, str):
        # Empty spreadsheet cells arrive as NaN (a float), which has no .startswith().
        raise ValueError(
            f"Error parsing {source} A_prime at record {idx}: expected a string, got {raw_value!r}"
        )

    text = raw_value.strip()
    prefix = "A' ="
    if text.startswith(prefix):
        text = text[len(prefix):].strip()

    try:
        return ast.literal_eval(text)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
        raise ValueError(f"Error parsing {source} A_prime at record {idx}: {e}") from e


def extract_labels(gold_df, sys_df):
    """
    Extract label values from the A_prime column in the gold and system DataFrames.

    Rows are paired by position. A gold cell holds one (aspect, label) pair;
    a system cell holds either such a pair or a list whose first element is
    the pair.

    Args:
        gold_df: DataFrame with an 'A_prime' column of gold annotations.
        sys_df: DataFrame with an 'A_prime' column of system predictions.

    Returns:
        A DataFrame with columns 'manual_label' and 'system_label', one row
        per input record.

    Raises:
        ValueError: If the two frames differ in length, a cell cannot be
            parsed or has an unexpected shape, or the aspect phrases of a
            paired row differ.
    """
    # zip() would silently drop the surplus rows of the longer frame.
    if len(gold_df) != len(sys_df):
        raise ValueError("The number of records in the gold file does not match the system file!")

    manual_labels = []
    system_labels = []

    for idx, (gold_raw, sys_raw) in enumerate(zip(gold_df['A_prime'], sys_df['A_prime'])):
        gold_obj = _parse_label_cell(gold_raw, "gold", idx)
        sys_obj = _parse_label_cell(sys_raw, "system", idx)

        try:
            gold_aspect, gold_label_text = gold_obj
        except (TypeError, ValueError) as e:
            raise ValueError(f"Error unpacking gold A_prime at record {idx}: {e}") from e

        if isinstance(sys_obj, tuple) and len(sys_obj) == 2:
            sys_aspect, sys_label_text = sys_obj
        elif isinstance(sys_obj, list) and sys_obj and isinstance(sys_obj[0], tuple) and len(sys_obj[0]) == 2:
            # Only the first prediction of a list is used.
            sys_aspect, sys_label_text = sys_obj[0]
        else:
            raise ValueError(f"Unexpected format in system A_prime at record {idx}: {sys_obj}")

        # Guard against the two files being ordered differently.
        if gold_aspect != sys_aspect:
            raise ValueError(f"Aspect phrase mismatch at record {idx}: Gold aspect '{gold_aspect}' vs System aspect '{sys_aspect}'")

        manual_labels.append(gold_label_text)
        system_labels.append(sys_label_text)

    return pd.DataFrame({'manual_label': manual_labels, 'system_label': system_labels})

# ---------------------------------------------------------------------------
# 4-way evaluation driver: load both spreadsheets, pair up the labels, score
# each pair, print a short report and save the results to an Excel workbook.
# ---------------------------------------------------------------------------
ground_truth_file = '/Users/innerpiece92/Desktop/NLP_Workspace/AArec/mturk/mturk-marketplace-ready/test/system_results/restaurants/ground_truth_w_split_abstractive_rag.xlsx'
system_file = '/Users/innerpiece92/Desktop/NLP_Workspace/AArec/mturk/mturk-marketplace-ready/test/system_results/restaurants/system_user_profiles_restaurants_reviews_match_rag.xlsx'
evaluation_output_file = '/Users/innerpiece92/Desktop/NLP_Workspace/AArec/mturk/mturk-marketplace-ready/test/system_results/restaurants/evaluation_results_rag3.xlsx'

df_gold = pd.read_excel(ground_truth_file)
df_sys = pd.read_excel(system_file)

df_labels = extract_labels(df_gold, df_sys)
# evaluate_performance adds columns to the frame it receives, so it is given
# a copy and df_labels keeps only the two raw label columns.
labels_for_scoring = df_labels.copy()
df_eval = evaluate_performance(labels_for_scoring, label_mapping)
summary = calculate_summary(df_eval)

print("Summary Metrics:", summary, sep="\n")
print("\nDetailed Evaluation (first 5 rows):", df_eval.head(), sep="\n")

with pd.ExcelWriter(evaluation_output_file) as writer:
    for sheet_title, frame in (('Evaluation', df_eval), ('Summary', summary)):
        frame.to_excel(writer, sheet_name=sheet_title, index=False)