<a href="https://colab.research.google.com/gist/karinstahel/b3624de8953a0e3773b53c8ef9ddc638/digi405-lab-4-3-facilitator-k-alpha.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DIGI405 Lab 4.3: Facilitator Notebook

This notebook can be used to collate the class annotations, measure agreement with Krippendorff's Alpha, and find the texts with the lowest and highest average confidence, and the greatest confidence range.

You don't need to change any code apart from adding the zip file path.

1. Upload a zip file of the labelled CSV files from the class
2. Enter the path to the zip as the `zip_file_path` variable
3. Run the cells - do a visual check of output to make sure it is as expected
4. Share the `=== CONFIDENCE ANALYSIS PER TEXT ===` results with the class
5. Share the `Krippendorff's alpha:` score with the class, along with `encoded_values.csv`, which is in a format that students can upload directly to the online K-Alpha calculator (test this first - the result using the 'Nominal' data type option should be the same as calculated here).



In [None]:
import pandas as pd
import numpy as np
import zipfile
import re
import io
from typing import Union

In [None]:
# Path to the uploaded zip archive that holds the class's df_labelled_*.csv files
zip_file_path = "df_labelled_test.zip"

In [None]:
def process_csv_zip(zip_file_path):
    """
    Given multiple CSV files in a zip archive, concatenate into a single dataframe.

    Every archive member whose base name matches "df_labelled_<suffix>.csv" is
    read (members inside a zipped folder are found as well). For each file the
    "label" and "confidence" columns are renamed to "label_<suffix>" and
    "confidence_<suffix>", an "encoded_<suffix>" column holding the integer
    code of the label is added, and the files are merged horizontally with an
    outer join on the "text" column.

    Args:
        zip_file_path: Path (or binary file-like object) of the zip archive.

    Returns:
        The merged dataframe, or None when the archive contains no matching
        CSV file.

    Raises:
        KeyError: If a CSV lacks one of the "text", "label" or "confidence"
            columns.
    """
    # Mapping used to turn the text labels into integer codes
    label_mapping = {"NEGATIVE": 0, "NEUTRAL": 1, "POSITIVE": 2}
    name_pattern = re.compile(r"df_labelled_(.+)\.csv")

    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        # Match on the base name only, so that archives created by zipping a
        # folder ("folder/df_labelled_x.csv") are handled too. Zip member
        # names always use "/" as the separator.
        csv_files = []
        for member in zip_ref.namelist():
            match = name_pattern.fullmatch(member.rsplit("/", 1)[-1])
            if match:
                csv_files.append((member, match.group(1)))

        if not csv_files:
            print("No matching CSV files found in the zip archive.")
            return None

        result_df = None
        for file, suffix in csv_files:
            with zip_ref.open(file) as f:
                df = pd.read_csv(io.TextIOWrapper(f, encoding="utf-8"))

            # The first file keeps all of its columns; later files contribute
            # only the columns needed for the merge.
            if result_df is not None:
                df = df[["text", "label", "confidence"]]

            label_col = f"label_{suffix}"
            encoded_col = f"encoded_{suffix}"
            df = df.rename(columns={
                "label": label_col,
                "confidence": f"confidence_{suffix}",
            })

            # Encode labels, tolerating differences in case and surrounding
            # whitespace ("positive ", "Neutral", ...)
            normalised = df[label_col].astype(str).str.strip().str.upper()
            df[encoded_col] = normalised.map(label_mapping)

            # Anything that still failed to map is treated as missing data
            # downstream, so make that visible instead of silent.
            unmapped = df.loc[df[encoded_col].isna() & df[label_col].notna(), label_col]
            if not unmapped.empty:
                unknown_labels = sorted(set(unmapped.astype(str)))
                print(f"Warning: unrecognised labels in {file} treated as missing: {unknown_labels}")

            if result_df is None:
                result_df = df
            else:
                # NOTE: a text that occurs more than once within a file is
                # joined many-to-many here and multiplies the matching rows.
                result_df = pd.merge(result_df, df, on="text", how="outer")

    return result_df.reset_index(drop=True)

In [None]:
# Combine every annotator CSV in the zip into one dataframe, joined on the "text" column
merged_df = process_csv_zip(zip_file_path)

In [None]:
# Check the resulting df: expect a "text" column plus label_*, confidence_*
# and encoded_* columns for each annotator file
merged_df

In [None]:
def analyse_confidence_per_text(merged_df):
    """
    Get the average, min, max, and range of
    confidence per text.

    Print results and return as a dict.

    Args:
        merged_df: Dataframe with a "text" column and one or more columns whose
            name contains "confidence_". Values may be numeric or strings such
            as "80%"; missing or unparseable values are ignored in the
            statistics.

    Returns:
        A dict with the keys "highest_avg_confidence", "lowest_avg_confidence"
        and "highest_range", or None when there are no confidence columns or
        no numeric confidence values at all.
    """
    df = merged_df.copy()
    confidence_cols = [col for col in df.columns if "confidence_" in col]

    if not confidence_cols:
        print("No confidence columns found in the dataframe")
        return None

    for col in confidence_cols:
        if df[col].dtype.kind not in "iuf":  # if not integer, unsigned int, or float
            # Strip a trailing percent sign and coerce to numbers. Texts that an
            # annotator skipped are NaN after the outer merge; they must stay
            # NaN rather than crash an integer cast.
            cleaned = df[col].astype(str).str.strip().str.rstrip("%")
            df[col] = pd.to_numeric(cleaned, errors="coerce")

    # Calculate statistics per text (NaN values are skipped by pandas)
    df["avg_confidence"] = df[confidence_cols].mean(axis=1)
    df["min_confidence"] = df[confidence_cols].min(axis=1)
    df["max_confidence"] = df[confidence_cols].max(axis=1)
    df["range_confidence"] = df["max_confidence"] - df["min_confidence"]

    if df["avg_confidence"].isna().all():
        print("No numeric confidence values found in the dataframe")
        return None

    highest_avg_idx = df["avg_confidence"].idxmax()
    lowest_avg_idx = df["avg_confidence"].idxmin()
    highest_range_idx = df["range_confidence"].idxmax()

    # Create result dictionary
    results = {
        "highest_avg_confidence": {
            "text": df.loc[highest_avg_idx, "text"],
            "average": df.loc[highest_avg_idx, "avg_confidence"],
        },
        "lowest_avg_confidence": {
            "text": df.loc[lowest_avg_idx, "text"],
            "average": df.loc[lowest_avg_idx, "avg_confidence"],
        },
        "highest_range": {
            "text": df.loc[highest_range_idx, "text"],
            "range": df.loc[highest_range_idx, "range_confidence"],
            "min": df.loc[highest_range_idx, "min_confidence"],
            "max": df.loc[highest_range_idx, "max_confidence"],
        },
    }

    # Print results in a readable format. Sub-dicts are bound to local names
    # so the f-strings need no nested double quotes (a syntax error before
    # Python 3.12).
    highest = results["highest_avg_confidence"]
    lowest = results["lowest_avg_confidence"]
    widest = results["highest_range"]

    print("\n=== CONFIDENCE ANALYSIS PER TEXT ===\n")

    print("TEXT WITH HIGHEST AVERAGE CONFIDENCE:")
    print(f"Text: '{highest['text']}'")
    print(f"Average confidence: {highest['average']:.2f}")
    print()

    print("TEXT WITH LOWEST AVERAGE CONFIDENCE:")
    print(f"Text: '{lowest['text']}'")
    print(f"Average confidence: {lowest['average']:.2f}")
    print()

    print("TEXT WITH BIGGEST CONFIDENCE RANGE:")
    print(f"Text: '{widest['text']}'")
    print(f"Range: {widest['range']} (from {widest['min']} to {widest['max']})")

    return results

In [None]:
# Print the per-text confidence summary and keep the returned dict for later use
results = analyse_confidence_per_text(merged_df)

In [None]:
# Calculate Krippendorff's alpha
# Developed with help from Claude 3.7 Sonnet
# Using method 'C. Nominal data, any number of observers, missing data' from:
# Krippendorff, K. (2011, January 25). Computing Krippendorff’s Alpha-Reliability. https://repository.upenn.edu/handle/20.500.14332/2089

def calculate_krippendorff_alpha(data: Union[pd.DataFrame, np.ndarray], verbose=True) -> float:
    """
    Calculate Krippendorff's alpha for nominal data.

    Args:
        data: DataFrame or 2-D array where rows are units and columns are
            observers. Values must be numeric category codes; missing ratings
            are given as NaN or the string 'NA'.
        verbose: Whether to print detailed information about the calculation

    Returns:
        Krippendorff's alpha coefficient, or NaN when it is undefined because
        fewer than two distinct categories occur among the pairable values
        (this includes the case where no unit has two or more ratings).
    """
    # Convert to numpy array with NaN for missing values
    if isinstance(data, pd.DataFrame):
        data = data.values
    # Object dtype guarantees an element-wise comparison with the 'NA' marker,
    # also when the input array is purely numeric.
    data = np.asarray(data, dtype=object)
    data = np.where(data == 'NA', np.nan, data).astype(float)

    # Coincidence matrix o_ck and marginal totals n_c, both as dictionaries
    coincidences = {}
    value_counts = {}
    total_values = 0
    total_pairs = 0  # Track the actual number of ordered pairs

    # For each unit (row)
    for unit in data:
        # Get valid values in this unit
        valid_values = unit[~np.isnan(unit)]
        mu = len(valid_values)

        if mu <= 1:
            continue  # Skip units with 0 or 1 valid value

        total_values += mu
        total_pairs += mu * (mu - 1)

        # Tally each category once instead of enumerating every ordered pair
        categories, counts = np.unique(valid_values, return_counts=True)
        tallies = list(zip(categories.tolist(), counts.tolist()))

        for val1, n1 in tallies:
            # Each valid value contributes exactly 1 to its category total
            value_counts[val1] = value_counts.get(val1, 0) + n1

            for val2, n2 in tallies:
                # Number of ordered pairs (val1, val2) formed by different
                # observers within this unit
                ordered_pairs = n1 * (n1 - 1) if val1 == val2 else n1 * n2
                if ordered_pairs:
                    pair = (val1, val2)
                    # Each pair carries the weight 1/(mu-1)
                    coincidences[pair] = coincidences.get(pair, 0) + ordered_pairs / (mu - 1)

    # Every pairable value adds exactly 1 to the coincidence total (n)
    total_coincidences = total_values

    if len(value_counts) < 2:
        # Expected disagreement would be zero, so alpha = 1 - Do/De is 0/0
        if verbose:
            print(f"Total pairable values: {total_values}")
            print(f"Value counts (n.c): {value_counts}")
            print("Krippendorff's alpha is undefined: fewer than two distinct categories among pairable values")
        return float("nan")

    # Calculate observed disagreement (Do): off-diagonal coincidences
    Do = sum(count for (val1, val2), count in coincidences.items() if val1 != val2)

    # Calculate expected disagreement (De): sum over c != k of n_c * n_k / (n - 1).
    # Do and De both count each unordered pair twice, which cancels in Do / De.
    De = 0
    for val1, count1 in value_counts.items():
        for val2, count2 in value_counts.items():
            if val1 != val2:
                De += (count1 * count2) / (total_coincidences - 1)

    # Calculate alpha
    alpha = 1 - (Do / De)

    if verbose:
        print(f"Total pairable values: {total_values}")
        print(f"Total pairs compared: {total_pairs}")
        print(f"Total coincidences (n): {total_coincidences}")
        print(f"Value counts (n.c): {value_counts}")
        print(f"Observed disagreement (Do): {Do}")
        print(f"Expected disagreement (De): {De}")
        print(f"Krippendorff's alpha: {alpha:.3f}")

    return alpha

In [None]:
# Keep only the integer-encoded label columns from the merged dataframe and
# mark every missing annotation with the "NA" placeholder
encoded_cols = [name for name in merged_df.columns if "encoded_" in name]
encoded_df = merged_df[encoded_cols].fillna("NA")

print(f"Calculating Krippendorff's alpha for {len(encoded_cols)} encoded columns")
print(f"Number of texts analysed: {len(encoded_df)}")

# Agreement across all annotators, with the calculation details printed
alpha = calculate_krippendorff_alpha(encoded_df, verbose=True)

In [None]:
# encoded_df

In [None]:
# Write the encoded labels (with "NA" for missing values) as a bare grid:
# no header row and no index column
encoded_df.to_csv("encoded_values.csv", header=False, index=False)
print("Saved encoded values to encoded_values.csv (no headers, no index)\nDownload this file and share with the class for testing with online K-Alpha calculator\nPlease test first!")