In [None]:
import os

# Path to the project directory that holds the Big5 classification results.
# NOTE(review): the variable is called `wsl_path`, but "/content/drive/My Drive/..."
# looks like a Google Colab Drive mount point rather than a WSL path — confirm.
wsl_path = "/content/drive/My Drive/MSC thesis/final_datasets/big5_classification"

# Change the current working directory to that folder so that relative file
# paths are resolved against it.
os.chdir(wsl_path)

# Print the current working directory to verify the change
print("Current working directory:", os.getcwd())


In [None]:
"""
Big5 Personality Analysis Pipeline and Visualization

This script processes and analyzes Big5 personality assessment results and generates
visualizations for different analytical perspectives:
1. Combined results processing and analysis
2. NaN occurrence patterns analysis
3. Statistical analysis with ground truth
4. Visualization of accuracy metrics:
   - Overall accuracy by combined condition
   - Trait-level accuracy by condition
   - Accuracy by prompting technique
   - Accuracy by input type
   - Author-level aggregated analysis

The assessment results come from six conditions (three input types, each run
with a baseline prompt and a chain-of-thought prompt):
- Text-only baseline and chain-of-thought (CoT)
- Programmatic features baseline and CoT
- Semantic features baseline and CoT

main() performs the three data-processing steps:
1. Combines results from different conditions into a single dataset
2. Analyzes NaN occurrence patterns across conditions
3. Performs statistical analysis on the combined dataset
The plots listed above are produced by create_visualizations(), which main()
does not call.
"""

import os
import json
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Input files, keyed by condition name. All paths are relative, so they are
# resolved against the current working directory at the time they are opened.
PATHS = dict(
    # Text-only input
    text_only_baseline="big5_text_only_classification/text_only_baseline_big5_results_gpt-4o_temp_1.0.json",
    text_only_cot="big5_text_only_classification/text_only_cot_big5_results_gpt-4o_temp_1.0.json",
    # Text plus programmatic features
    progfeat_baseline="big5_programmatic_features_classification/progfeat_baseline_big5_openai_gpt-4o_temp_1.0.json",
    progfeat_cot="big5_programmatic_features_classification/progfeat_cot_big5_openai_gpt-4o_temp_1.0.json",
    # Text plus semantic features
    semantic_baseline="big5_semantic_features_classification/semfeat_baseline_big5_openai_gpt-4o_temp_1.0.json",
    semantic_cot="big5_semantic_features_classification/semfeat_cot_big5_openai_gpt-4o_temp_1.0.json",
    # Ground-truth chunks, one directory above the results folder
    ground_truth="../full_chunked_local_minima_pass_2_0.40.json",
)

def load_jsonl_to_records(file_path, prompt_type, input_type):
    """
    Read a JSON Lines file and turn every non-blank line into a flat record.

    Each record carries the fixed labels ``task="big5"`` and ``model="gpt-4o"``,
    the supplied prompt/input labels, and the ``author_id``, ``chunk_number``
    and ``model_output`` fields copied from the parsed line.

    Args:
        file_path (str): Location of the JSONL file to read
        prompt_type (str): Label stored under "prompt" (e.g. 'baseline' or 'cot')
        input_type (str): Label stored under "input"
            (e.g. 'text_only', 'text_programmatic', or 'text_semantic')

    Returns:
        list: One dictionary per non-blank line, in file order

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON
        KeyError: If a parsed line lacks one of the copied fields
    """

    def to_record(entry):
        # Keep only the fields the analysis needs and tag them with the condition.
        return {
            "task": "big5",
            "model": "gpt-4o",
            "prompt": prompt_type,
            "input": input_type,
            "author_id": entry["author_id"],
            "chunk_number": entry["chunk_number"],
            "model_output": entry["model_output"],
        }

    with open(file_path, "r", encoding="utf-8") as handle:
        # Strip before testing so whitespace-only lines are skipped.
        stripped_lines = (raw.strip() for raw in handle)
        return [to_record(json.loads(text)) for text in stripped_lines if text]

def contains_nan(model_output_str):
    """
    Check if a model output string contains any 'NaN' trait results.

    The string is parsed as strict JSON first. Only if that fails is the legacy
    single-quote-to-double-quote substitution tried, so valid JSON that contains
    apostrophes inside text fields (e.g. "the author's tone") is no longer
    corrupted before parsing.

    Args:
        model_output_str: String representation of model output JSON. Any
            non-string value is treated as "no NaN".

    Returns:
        bool: True if at least one entry in ``traits`` has ``result == "NaN"``,
        False otherwise (including unparseable or unexpectedly shaped input)
    """
    if not isinstance(model_output_str, str):
        return False

    try:
        model_output = json.loads(model_output_str)
    except (json.JSONDecodeError, TypeError):
        try:
            # Legacy fallback for Python-repr style payloads that use single quotes.
            model_output = json.loads(model_output_str.replace("'", '"'))
        except (json.JSONDecodeError, TypeError):
            return False

    # Payloads such as "null", "[]" or "3" parse fine but have no traits to inspect.
    if not isinstance(model_output, dict):
        return False

    traits = model_output.get("traits", [])
    if not isinstance(traits, list):
        return False

    return any(
        isinstance(trait, dict) and trait.get("result") == "NaN"
        for trait in traits
    )

def get_trait_nan_status(model_output_str):
    """
    Extract NaN status for each Big5 trait from model output.

    The string is parsed as strict JSON first; the legacy single-quote
    substitution is only a fallback, so apostrophes inside text fields do not
    break parsing. Unparseable or unexpectedly shaped payloads yield the
    all-False default instead of raising.

    Args:
        model_output_str: String representation of model output JSON

    Returns:
        dict: Maps each of the five Big5 trait names to True when that trait's
        ``result`` is "NaN", else False. A NaN reported under a different
        (string) trait name is added under that name; entries without a string
        name are ignored because they cannot be attributed to a trait.
    """
    trait_nan_status = {
        "Openness to Experience": False,
        "Conscientiousness": False,
        "Extroversion": False,
        "Agreeableness": False,
        "Neuroticism": False,
    }
    if not isinstance(model_output_str, str):
        return trait_nan_status

    try:
        model_output = json.loads(model_output_str)
    except (json.JSONDecodeError, TypeError):
        try:
            # Legacy fallback for Python-repr style payloads that use single quotes.
            model_output = json.loads(model_output_str.replace("'", '"'))
        except (json.JSONDecodeError, TypeError):
            return trait_nan_status

    # "null", "[]" and similar parse successfully but carry no trait list.
    if not isinstance(model_output, dict):
        return trait_nan_status

    traits = model_output.get("traits", [])
    if not isinstance(traits, list):
        return trait_nan_status

    for trait in traits:
        if not isinstance(trait, dict) or trait.get("result") != "NaN":
            continue
        trait_name = trait.get("trait")
        if isinstance(trait_name, str):
            trait_nan_status[trait_name] = True

    return trait_nan_status

def get_word_count(model_output_str):
    """
    Calculate approximate word count from model output.

    Words are counted by whitespace splitting over each trait's
    ``reasoning_steps``, ``result_justification`` and
    ``confidence_score_justification``. The string is parsed as strict JSON
    first; the legacy single-quote substitution is only a fallback, so
    apostrophes in the text no longer make the whole payload count as 0.

    Args:
        model_output_str: String representation of model output JSON

    Returns:
        int: Total word count; 0 for non-string, unparseable or unexpectedly
        shaped input. Missing or null fields contribute 0 words.
    """
    if not isinstance(model_output_str, str):
        return 0

    try:
        model_output = json.loads(model_output_str)
    except (json.JSONDecodeError, TypeError):
        try:
            # Legacy fallback for Python-repr style payloads that use single quotes.
            model_output = json.loads(model_output_str.replace("'", '"'))
        except (json.JSONDecodeError, TypeError):
            return 0

    if not isinstance(model_output, dict):
        return 0

    traits = model_output.get("traits", [])
    if not isinstance(traits, list):
        return 0

    def count_words(text):
        # Non-string values (null, numbers, nested objects) hold no countable words.
        return len(text.split()) if isinstance(text, str) else 0

    word_count = 0
    for trait in traits:
        if not isinstance(trait, dict):
            continue

        steps = trait.get("reasoning_steps") or []
        if isinstance(steps, str):
            # A bare string is one step; iterating it would count characters.
            steps = [steps]
        if isinstance(steps, (list, tuple)):
            word_count += sum(count_words(step) for step in steps)

        word_count += count_words(trait.get("result_justification"))
        word_count += count_words(trait.get("confidence_score_justification"))

    return word_count

def create_visualizations(merged_df):
    """
    Create and display visualizations for accuracy analysis.

    Sets the plotting style, defines the shared colour/label/order settings for
    the six "<input>-<prompt>" conditions, and then calls each plotting routine
    in turn: overall accuracy, trait-level accuracy, prompting technique, input
    type, and the author-level aggregated analysis.

    Args:
        merged_df: DataFrame containing the merged results. The plotting
            routines group by its "input" and "prompt" columns and average the
            five "*_accuracy" columns listed in ``trait_cols`` below.
    """
    # Style configurations.
    # Matplotlib 3.6 renamed its bundled seaborn styles to "seaborn-v0_8*" and
    # later releases dropped the old names, so use whichever name this
    # installation provides instead of hard-coding "seaborn".
    for style_name in ("seaborn-v0_8", "seaborn"):
        if style_name in plt.style.available:
            plt.style.use(style_name)
            break

    # Color schemes
    cond_colors = {
        "text_only-baseline": "#5DA786",
        "text_only-cot": "#B5E3D8",
        "text_programmatic-baseline": "#F3995F",
        "text_programmatic-cot": "#FBD4B2",
        "text_semantic-baseline": "#7B3294",
        "text_semantic-cot": "#C2A5CF",
    }

    # Human-readable axis/legend labels for each condition key
    condition_label_map = {
        "text_only-baseline": "Text Only : Baseline",
        "text_only-cot": "Text Only : CoT",
        "text_programmatic-baseline": "Text + Prog : Baseline",
        "text_programmatic-cot": "Text + Prog : CoT",
        "text_semantic-baseline": "Text + Sem : Baseline",
        "text_semantic-cot": "Text + Sem : CoT",
    }

    # Fixed display order of the conditions
    condition_order = [
        "text_only-baseline",
        "text_only-cot",
        "text_programmatic-baseline",
        "text_programmatic-cot",
        "text_semantic-baseline",
        "text_semantic-cot",
    ]

    # Trait columns
    trait_cols = ["cOPN_accuracy", "cCON_accuracy", "cEXT_accuracy", "cAGR_accuracy", "cNEU_accuracy"]

    # Overall accuracy visualization
    create_overall_accuracy_plot(merged_df, trait_cols, cond_colors, condition_label_map, condition_order)

    # Trait-level accuracy visualization
    create_trait_level_plot(merged_df, trait_cols, cond_colors, condition_label_map, condition_order)

    # Prompting technique visualization
    create_prompt_technique_plot(merged_df, trait_cols)

    # Input type visualization
    create_input_type_plot(merged_df, trait_cols)

    # Author-level aggregated analysis
    create_author_level_analysis(merged_df)

def create_overall_accuracy_plot(df, trait_cols, cond_colors, condition_label_map, condition_order):
    """Draw one bar per input/prompt condition showing its mean accuracy over all traits."""
    # Average every trait accuracy column within each (input, prompt) pair.
    per_condition = df.groupby(["input", "prompt"], as_index=False)[trait_cols].mean()

    # Build the "<input>-<prompt>" key that the palette and ordering are keyed by,
    # then collapse the per-trait means into a single score per condition.
    per_condition = per_condition.assign(
        Condition=per_condition["input"] + "-" + per_condition["prompt"]
    )
    per_condition = per_condition.assign(
        MeanOverallAccuracy=per_condition[trait_cols].mean(axis=1)
    )

    bar_options = {
        "x": "Condition",
        "y": "MeanOverallAccuracy",
        "palette": cond_colors,
        "order": condition_order,
        "alpha": 0.9,
        "dodge": False,
        "width": 0.4,
    }

    plt.figure(figsize=(7, 5))
    ax = sns.barplot(data=per_condition, **bar_options)

    format_accuracy_plot(
        ax,
        "Overall Accuracy by Combined Condition",
        condition_label_map,
        condition_order,
    )
    plt.show()

def create_trait_level_plot(df, trait_cols, cond_colors, condition_label_map, condition_order):
    """Draw the per-trait accuracy of every input/prompt condition."""
    # Display names for the accuracy columns.
    trait_name_map = dict(
        cOPN_accuracy="Openness to Experience",
        cCON_accuracy="Conscientiousness",
        cEXT_accuracy="Extroversion",
        cAGR_accuracy="Agreeableness",
        cNEU_accuracy="Neuroticism",
    )

    # Mean accuracy per trait within each (input, prompt) pair, keyed "<input>-<prompt>".
    per_condition = df.groupby(["input", "prompt"], as_index=False)[trait_cols].mean()
    per_condition = per_condition.assign(
        Condition=per_condition["input"] + "-" + per_condition["prompt"]
    )

    long_form = format_trait_data(per_condition, trait_cols, trait_name_map)

    plt.figure(figsize=(12, 6))
    axis = create_trait_plot(long_form, cond_colors, condition_order)
    format_trait_plot(axis, condition_label_map)
    plt.show()

def create_prompt_technique_plot(df, trait_cols):
    """Draw the mean accuracy of each prompting technique, pooled over input types."""
    # One colour per prompt label.
    prompt_colors = dict(baseline="#5DA786", cot="#B5E3D8")

    # Per-prompt mean of each trait column, then the mean across traits.
    per_prompt = df.groupby("prompt", as_index=False)[trait_cols].mean()
    per_prompt = per_prompt.assign(OverallAccuracy=per_prompt[trait_cols].mean(axis=1))

    plt.figure(figsize=(5, 4))
    axis = create_prompt_plot(per_prompt, prompt_colors)
    format_prompt_plot(axis)
    plt.show()

def create_input_type_plot(df, trait_cols):
    """Draw the mean accuracy of each input type, pooled over prompting techniques."""
    # One colour per input label.
    input_colors = dict(
        text_only="#5DA786",
        text_programmatic="#F3995F",
        text_semantic="#7B3294",
    )

    # Per-input mean of each trait column, then the mean across traits.
    per_input = df.groupby("input", as_index=False)[trait_cols].mean()
    per_input = per_input.assign(OverallAccuracy=per_input[trait_cols].mean(axis=1))

    plt.figure(figsize=(6, 4))
    axis = create_input_plot(per_input, input_colors)
    format_input_plot(axis)
    plt.show()

def main():
    """Main execution function.

    Runs three steps and prints the results to stdout:

    1. Loads the six condition files listed in ``PATHS``, combines them into
       one DataFrame and saves it as ``big5_combined_results.csv``.
    2. Reports how often model outputs contain "NaN" results: overall, per
       prompt, per input type and per trait.
    3. Inner-joins the combined data with the ground-truth file on
       (author_id, chunk_number) and prints dataset statistics.

    The merged frame holds one row per chunk *per condition*, so the
    chunk-level statistics (unique chunks, chunks per author, word counts) are
    computed on rows de-duplicated by (author_id, chunk_number); otherwise each
    chunk would be counted once for every condition it appears in.
    """

    # 1. Load and combine all conditions
    conditions = [
        (PATHS["text_only_baseline"], "baseline", "text_only"),
        (PATHS["text_only_cot"], "cot", "text_only"),
        (PATHS["progfeat_baseline"], "baseline", "text_programmatic"),
        (PATHS["progfeat_cot"], "cot", "text_programmatic"),
        (PATHS["semantic_baseline"], "baseline", "text_semantic"),
        (PATHS["semantic_cot"], "cot", "text_semantic"),
    ]

    all_records = []
    for file_path, prompt_type, input_type in conditions:
        all_records.extend(load_jsonl_to_records(file_path, prompt_type, input_type))

    # Create combined DataFrame
    big_df = pd.DataFrame(
        all_records,
        columns=["task", "model", "prompt", "input", "author_id", "chunk_number", "model_output"]
    )

    # Save combined results
    out_csv = "big5_combined_results.csv"
    big_df.to_csv(out_csv, index=False)
    print(f"Saved combined DataFrame to {out_csv} with {len(big_df)} rows.")

    if big_df.empty:
        # Every rate below divides by a row count; with no rows there is nothing to report.
        print("No records were loaded; skipping NaN and ground-truth analysis.")
        return

    # 2. Analyze NaN patterns
    big_df["has_nan"] = big_df["model_output"].apply(contains_nan)

    # Overall NaN statistics
    total_rows = len(big_df)
    total_nans = big_df["has_nan"].sum()
    print("\nNaN Statistics:")
    print(f"Total rows: {total_rows}")
    print(f"Total rows with at least one NaN: {total_nans}")
    print(f"Overall NaN rate: {total_nans / total_rows * 100:.2f}%")

    # NaN counts by condition (same report for the prompt and the input column)
    breakdowns = (
        ("prompt", "\nNaN counts by prompt:"),
        ("input", "\nNaN counts by input type:"),
    )
    for column, heading in breakdowns:
        print(heading)
        for value in big_df[column].unique():
            subset = big_df[big_df[column] == value]
            num_nans = subset["has_nan"].sum()
            print(f"  {value}: {num_nans} (out of {len(subset)}) - {num_nans / len(subset) * 100:.2f}%")

    # Trait-specific NaN analysis: one boolean column per trait name
    trait_nan_status_df = big_df["model_output"].apply(get_trait_nan_status).apply(pd.Series)
    big_df = pd.concat([big_df, trait_nan_status_df], axis=1)

    print("\nNaN counts by trait:")
    for trait in trait_nan_status_df.columns:
        num_nans = big_df[trait].sum()
        print(f"  {trait}: {num_nans} (out of {len(big_df)}) - {num_nans / len(big_df) * 100:.2f}%")

    # 3. Statistical analysis with ground truth
    # Load ground truth data (JSON Lines: one object per non-blank line)
    ground_rows = []
    with open(PATHS["ground_truth"], "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                ground_rows.append(json.loads(line))

    gt_df = pd.DataFrame(ground_rows)
    gt_df = gt_df.rename(columns={"#AUTHID": "author_id", "Chunk Number": "chunk_number"})

    # Merge with ground truth
    merged_df = pd.merge(big_df, gt_df, on=["author_id", "chunk_number"], how="inner")
    merged_df["word_count_actual"] = merged_df["TEXT"].str.split().str.len()

    # Calculate statistics on distinct chunks, not on per-condition rows
    unique_chunks = merged_df.drop_duplicates(subset=["author_id", "chunk_number"])
    num_authors = unique_chunks["author_id"].nunique()
    num_chunks = len(unique_chunks)
    chunks_per_author = unique_chunks.groupby("author_id")["chunk_number"].count()

    # Print statistical results
    print("\nDataset Statistics:")
    print(f"Number of unique authors: {num_authors}")
    print(f"Number of unique chunks: {num_chunks}")
    print("\nChunks per author statistics:")
    print(chunks_per_author.describe())
    print("\nActual word count per chunk statistics:")
    print(unique_chunks["word_count_actual"].describe())
    # The condition distributions are deliberately per row (chunk x condition).
    print("\nDistribution of prompt types:")
    print(merged_df["prompt"].value_counts())
    print("\nDistribution of input types:")
    print(merged_df["input"].value_counts())

def create_author_level_analysis(df):
    """Create and display author-level aggregated analysis.

    Tags each row with an "<input> : <prompt>" condition label, aggregates the
    chunk-level predictions to one prediction per author and condition by
    majority vote, computes per-condition accuracy and hands the results to
    the author-level plotting routine.

    Args:
        df: DataFrame with at least "input" and "prompt" columns plus whatever
            columns ``aggregate_to_author_level`` reads. It is not modified;
            the condition label is added to a copy.
    """
    def majority_vote(series):
        """Return the most frequent value, or the string "NaN" if there is none."""
        # mode() ignores missing values, so an all-missing group yields an empty result.
        modes = series.mode()
        if modes.empty:
            return "NaN"
        # Ties resolve to the first mode, which pandas returns in sorted order.
        return modes.iloc[0]

    # Add combined condition column on a copy so the caller's frame is left untouched
    df = df.assign(Condition=df["input"] + " : " + df["prompt"])

    # Aggregate to author level
    author_level = aggregate_to_author_level(df, majority_vote)

    # Calculate and visualize accuracy scores
    condition_accuracy_df = calculate_condition_accuracy(author_level)

    # Create visualizations
    create_author_level_plots(condition_accuracy_df, author_level)

def aggregate_to_author_level(df, majority_vote):
    """Collapse chunk-level predictions into one row per author and condition.

    Every "<trait>_pred" column is reduced with ``majority_vote`` within each
    (author_id, Condition) group; the author's trait labels, taken from the
    first row per author, are then joined on.
    """
    trait_names = ["cOPN", "cCON", "cEXT", "cAGR", "cNEU"]

    # Same reducer for every prediction column.
    vote_spec = {}
    for name in trait_names:
        vote_spec[f"{name}_pred"] = majority_vote

    predictions = df.groupby(["author_id", "Condition"]).agg(vote_spec)
    predictions = predictions.reset_index()

    # One set of ground-truth labels per author (indexed by author_id).
    truth_by_author = df.groupby("author_id").first()[trait_names]

    return predictions.merge(truth_by_author, on="author_id", how="inner")

def calculate_condition_accuracy(author_level):
    """Compute trait accuracy scores separately for every condition.

    The rows of each distinct "Condition" value are passed to
    ``calculate_trait_accuracy``; the resulting mapping of condition to scores
    is returned after being passed through ``format_condition_accuracy``.
    """
    condition_labels = author_level["Condition"]
    scores_by_condition = {
        label: calculate_trait_accuracy(author_level[condition_labels == label])
        for label in condition_labels.unique()
    }
    return format_condition_accuracy(scores_by_condition)

def create_author_level_plots(condition_accuracy_df, author_level):
    """Render the overall and per-trait author-level accuracy charts.

    Builds the colour and display-label lookups for the six
    "<input> : <prompt>" condition keys and passes them, together with the
    data, to the two author-level plotting routines.
    """
    # The three lists are aligned by position: key, bar colour, display label.
    condition_keys = [
        "text_only : baseline",
        "text_only : cot",
        "text_programmatic : baseline",
        "text_programmatic : cot",
        "text_semantic : baseline",
        "text_semantic : cot",
    ]
    bar_colors = ["#5DA786", "#B5E3D8", "#F3995F", "#FBD4B2", "#7B3294", "#C2A5CF"]
    display_labels = [
        "Text Only : Baseline",
        "Text Only : CoT",
        "Text + Prog : Baseline",
        "Text + Prog : CoT",
        "Text + Sem : Baseline",
        "Text + Sem : CoT",
    ]

    cond_colors = dict(zip(condition_keys, bar_colors))
    condition_label_map = dict(zip(condition_keys, display_labels))

    shared_args = (condition_accuracy_df, cond_colors, condition_label_map, author_level)
    create_author_overall_plot(*shared_args)
    create_author_trait_plot(*shared_args)

# Run the pipeline only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()