# LLM Requirements Traceability Analysis
**Analysis and evaluation of LLM-based requirements traceability results with performance metrics, ROC curves, confusion matrices, and comparative evaluation across multiple models.**

In [None]:
# Cell [0] - Imports and Setup
# Purpose: Import all required libraries and configure environment settings for LLM traceability analysis
# Dependencies: pandas, numpy, matplotlib, seaborn, sklearn, neo4j, dotenv, re, collections, datetime, typing
# Breadcrumbs: Setup -> Imports and Configuration

import os
import sys
import logging
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import re
import datetime
from collections import Counter
from typing import Dict, List, Any
from dotenv import load_dotenv
from neo4j import GraphDatabase
from sklearn.metrics import (
    confusion_matrix, precision_recall_fscore_support, accuracy_score,
    roc_curve, roc_auc_score, precision_recall_curve, auc
)

def setup_analysis_environment():
    """
    Set up logging, load the .env file and gather the analysis settings

    Returns:
        tuple: (config, logger) where config is a dict of the settings read
        from environment variables and logger is the logger for this module
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    log = logging.getLogger(__name__)

    # Pull variables from a .env file (if any) into the process environment
    load_dotenv()

    # CURRENT_MODEL holds the *name* of another environment variable; that
    # second variable holds the actual model identifier.
    model_env_var = os.getenv('CURRENT_MODEL')
    current_model = None
    if model_env_var:
        current_model = os.getenv(model_env_var)

    # Connection settings are copied verbatim from the environment
    config = {
        name: os.getenv(name)
        for name in ('NEO4J_URI', 'NEO4J_USER', 'NEO4J_PASSWORD', 'NEO4J_PROJECT_NAME')
    }
    config['MODEL_ENV_VAR'] = model_env_var
    config['CURRENT_MODEL'] = current_model

    # Only the (case-insensitive) text "true" enables visualizations
    visualization_flag = os.getenv('SHOW_VISUALIZATION', 'True')
    config['SHOW_VISUALIZATION'] = visualization_flag.lower() == 'true'

    threshold_text = os.getenv('MIN_TRACEABILITY_THRESHOLD', '1')
    config['MIN_TRACEABILITY_THRESHOLD'] = int(threshold_text)

    # Report the effective configuration
    log.info(f"Analyzing project: {config['NEO4J_PROJECT_NAME']}")
    log.info(f"Using model: {config['CURRENT_MODEL']}")
    log.info(f"Show visualization: {config['SHOW_VISUALIZATION']}")
    log.info(f"Minimum traceability threshold: {config['MIN_TRACEABILITY_THRESHOLD']}")

    return config, log

# Run the setup once and publish every setting as a module-level constant
CONFIG, logger = setup_analysis_environment()
(
    NEO4J_URI,
    NEO4J_USER,
    NEO4J_PASSWORD,
    NEO4J_PROJECT_NAME,
    MODEL_ENV_VAR,
    CURRENT_MODEL,
    SHOW_VISUALIZATION,
    MIN_TRACEABILITY_THRESHOLD,
) = (
    CONFIG[setting]
    for setting in (
        'NEO4J_URI',
        'NEO4J_USER',
        'NEO4J_PASSWORD',
        'NEO4J_PROJECT_NAME',
        'MODEL_ENV_VAR',
        'CURRENT_MODEL',
        'SHOW_VISUALIZATION',
        'MIN_TRACEABILITY_THRESHOLD',
    )
)

In [None]:
# Cell [1] - Neo4j Connection Setup
# Purpose: Create connection to Neo4j database containing LLM traceability analysis results
# Dependencies: neo4j, logging
# Breadcrumbs: Setup -> Database Connection

def create_neo4j_driver():
    """
    Create and return a Neo4j driver instance for accessing traceability data

    The driver is built from the module-level NEO4J_URI, NEO4J_USER and
    NEO4J_PASSWORD settings. Constructing a driver does not by itself contact
    the server, so connectivity is verified explicitly before success is
    logged; otherwise a wrong URI or password would only surface on the
    first query.

    Returns:
    --------
    neo4j.Driver
        Connected Neo4j driver instance

    Raises:
    -------
    Exception
        Whatever the Neo4j driver raises when the driver cannot be created or
        the server cannot be reached/authenticated; the error is logged and
        re-raised unchanged.
    """
    driver = None
    try:
        driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
        # Force a round trip so that "connected" in the log is actually true
        driver.verify_connectivity()
        logger.info("Successfully connected to Neo4j database")
        return driver
    except Exception as e:
        # Do not leak a half-initialised driver when verification failed
        if driver is not None:
            driver.close()
        logger.error(f"Failed to connect to Neo4j: {str(e)}")
        raise

# Create the module-level Neo4j driver that the query cells pass to their helpers
driver = create_neo4j_driver()

In [None]:
# Cell [2] - Query Ground Truth Data
# Purpose: Retrieve ground truth traceability links between requirements from Neo4j database
# Dependencies: pandas, neo4j, logging, typing
# Breadcrumbs: Data Acquisition -> Ground Truth Links

# Rebind the module logger; logging.getLogger returns the same logger object
# for the same name, so this matches the logger created in Cell [0]
# (presumably repeated so this cell can be run on its own - confirm).
logger = logging.getLogger(__name__)

def query_ground_truth(driver, project_name):
    """
    Query ground truth trace links from Neo4j

    Parameters:
    -----------
    driver : neo4j.Driver
        Neo4j database driver connection
    project_name : str
        Name of the project to query

    Returns:
    --------
    pd.DataFrame
        DataFrame containing ground truth links, one row per
        (source, target) pair. The frame always has the same columns
        (those of the query's RETURN clause), also when no links were found
        or when the query failed; in both of those cases it is empty.
    """
    # Mirrors the RETURN clause below so the empty fallback frame has exactly
    # the same schema as a populated one.
    result_columns = [
        'project_name', 'project_description', 'document_id',
        'source_id', 'source_type', 'target_id', 'target_type',
        'is_ground_truth',
    ]

    try:
        ground_truth_query = """
        MATCH (p:Project {name: $project_name})-[:CONTAINS]->(d:Document)-[:CONTAINS]->(source:Requirement)-[r:GROUND_TRUTH]->(target:Requirement)
        RETURN 
            p.name as project_name,
            p.description as project_description,
            d.id as document_id,
            source.id as source_id, 
            source.type as source_type,
            target.id as target_id,
            target.type as target_type,
            true as is_ground_truth
        ORDER BY source.id, target.id
        """

        with driver.session() as session:
            # .data() materialises all records, so the session can be
            # released before any post-processing happens
            ground_truth_results = session.run(
                ground_truth_query,
                project_name=project_name
            ).data()

        if not ground_truth_results:
            logger.warning(f"No ground truth data found for project {project_name}")
            return pd.DataFrame(columns=result_columns)

        ground_truth_df = pd.DataFrame(ground_truth_results, columns=result_columns)
        logger.info(f"Retrieved {len(ground_truth_df)} ground truth links for project {project_name}")

        # Log some statistics
        unique_sources = ground_truth_df['source_id'].nunique()
        logger.info(f"Ground truth contains {unique_sources} unique source requirements")

        unique_targets = ground_truth_df['target_id'].nunique()
        logger.info(f"Ground truth contains {unique_targets} unique target requirements")

        return ground_truth_df

    except Exception as e:
        # Best effort by design: the notebook continues with an empty frame
        # and the failure is visible in the log.
        logger.error(f"Error querying ground truth data: {str(e)}")
        logger.error("Exception details:", exc_info=True)
        return pd.DataFrame(columns=result_columns)

# Fetch the ground truth links for the configured project
ground_truth_df = query_ground_truth(driver, NEO4J_PROJECT_NAME)

# Preview the first links and the overall count
print(f"\nGround Truth Data for {NEO4J_PROJECT_NAME}:")
print("=" * 80)
preview_columns = ['project_name', 'source_id', 'target_id', 'is_ground_truth']
display(ground_truth_df[preview_columns].head(10))
print(f"Total ground truth links: {len(ground_truth_df)}")

if ground_truth_df.empty:
    # Nothing came back: list the likely reasons
    print("\nNo ground truth links found. Possible causes:")
    for cause in (
        "1. The project doesn't have any ground truth links defined",
        "2. The GROUND_TRUTH relationship type doesn't exist",
        "3. There was an error in the query (check logs)",
    ):
        print(cause)
else:
    # Per-document link counts, in order of first appearance
    if 'document_id' in ground_truth_df.columns:
        print("\nUnique documents with ground truth links:")
        for doc_id in ground_truth_df['document_id'].unique():
            doc_links = ground_truth_df[ground_truth_df['document_id'] == doc_id]
            print(f"- Document {doc_id}: {len(doc_links)} links")

    # Source/target cardinalities and the mean fan-out per source
    unique_sources = ground_truth_df['source_id'].nunique()
    unique_targets = ground_truth_df['target_id'].nunique()
    print(f"\nUnique source requirements: {unique_sources}")
    print(f"Unique target requirements: {unique_targets}")
    print(f"Average links per source: {len(ground_truth_df) / unique_sources:.2f}")

In [None]:
# Cell [3] - Query LLM Results Data
# Purpose: Retrieve LLM prediction results from Neo4j for different evaluation methods
# Dependencies: pandas, neo4j, logging
# Breadcrumbs: Data Acquisition -> LLM Results

def query_llm_results(driver, project_name, model):
    """
    Fetch the three kinds of LLM prediction links stored in Neo4j

    The meta-judge, transformer-assisted and basic LLM relationships are
    queried separately for one project and one model.

    Parameters:
    -----------
    driver : neo4j.Driver
        Neo4j database driver connection
    project_name : str
        Name of the project to query
    model : str
        Model name to filter results

    Returns:
    --------
    tuple of pd.DataFrame
        (meta_judge_df, transformers_df, llm_result_df); a frame is a plain
        empty DataFrame (no columns) when its query returned no records

    Raises:
    -------
    Exception
        Any error raised while querying is logged and re-raised.
    """
    try:
        # LLM_RESULT_META_JUDGE relationships and their score properties
        meta_judge_query = """
        MATCH (p:Project)-[:CONTAINS]->(d:Document)-[:CONTAINS]->(source:Requirement)-[r:LLM_RESULT_META_JUDGE]->(target:Requirement)
        WHERE p.name = $project_name AND source.type = 'SOURCE' AND r.model = $model
        RETURN 
            p.name as project_name,
            source.id as source_id,
            target.id as target_id,
            r.is_traceable as is_traceable_meta,
            r.judge_score as judge_score_meta,
            r.semantic_alignment as semantic_alignment_meta,
            r.non_functional_coverage as non_functional_coverage_meta,
            r.final_score as final_score_meta,
            r.actor_score as actor_score_meta,
            r.functional_completeness as functional_completeness_meta,
            r.model as model,
            'meta_judge' as result_type
        """

        # LLM_RESULT_WITH_TRANSFORMERS relationships
        transformers_query = """
        MATCH (p:Project)-[:CONTAINS]->(d:Document)-[:CONTAINS]->(source:Requirement)-[r:LLM_RESULT_WITH_TRANSFORMERS]->(target:Requirement)
        WHERE p.name = $project_name AND source.type = 'SOURCE' AND r.model = $model
        RETURN 
            p.name as project_name,
            source.id as source_id,
            target.id as target_id,
            r.is_associated as is_associated,
            r.association_probability as association_probability,
            r.explanation as explanation,
            r.transformers_utilized as transformers_utilized,
            r.model as model,
            'transformer' as result_type
        """

        # Plain LLM_RESULT relationships
        llm_query = """
        MATCH (p:Project)-[:CONTAINS]->(d:Document)-[:CONTAINS]->(source:Requirement)-[r:LLM_RESULT]->(target:Requirement)
        WHERE p.name = $project_name AND source.type = 'SOURCE' AND r.model = $model
        RETURN 
            p.name as project_name,
            source.id as source_id,
            target.id as target_id,
            r.is_associated as is_associated,
            r.association_probability as association_probability,
            r.explanation as explanation,
            r.model as model,
            'llm' as result_type
        """

        with driver.session() as session:
            # Run the three queries back to back in one session, same parameters
            record_sets = [
                session.run(cypher, project_name=project_name, model=model).data()
                for cypher in (meta_judge_query, transformers_query, llm_query)
            ]

            # An empty record list becomes a column-less empty frame
            frames = [
                pd.DataFrame(records) if records else pd.DataFrame()
                for records in record_sets
            ]
            meta_judge_df, transformers_df, llm_result_df = frames

            logger.info(f"Retrieved {len(meta_judge_df)} meta-judge links for {project_name} with model {model}")
            logger.info(f"Retrieved {len(transformers_df)} transformer results for {project_name} with model {model}")
            logger.info(f"Retrieved {len(llm_result_df)} basic LLM results for {project_name} with model {model}")

            return meta_judge_df, transformers_df, llm_result_df

    except Exception as e:
        logger.error(f"Error querying LLM results: {str(e)}")
        raise

# Fetch all three result sets for the configured project and model
meta_judge_df, transformers_df, llm_result_df = query_llm_results(driver, NEO4J_PROJECT_NAME, CURRENT_MODEL)

# Show a short preview of each result set, or say that it is empty
for section_title, section_df, total_label, empty_message in (
    ("Meta Judge Results", meta_judge_df, "Total meta judge links", "No meta judge results found"),
    ("Transformer Results", transformers_df, "Total transformer results", "No transformer results found"),
    ("Basic LLM Results", llm_result_df, "Total basic LLM results", "No basic LLM results found"),
):
    print(f"\n{section_title} for {NEO4J_PROJECT_NAME} using {CURRENT_MODEL}:")
    print("=" * 80)
    if section_df.empty:
        print(empty_message)
        continue
    display(section_df.head())
    print(f"{total_label}: {len(section_df)}")

In [None]:
# Cell [4] - Prepare Data for Evaluation
# Purpose: Prepare and align LLM results with ground truth data for performance evaluation
# Dependencies: pandas, logging
# Breadcrumbs: Data Processing -> Evaluation Preparation

def _is_missing_value(value):
    """Return True when *value* is None, pd.NA or a float NaN."""
    if value is None or value is pd.NA:
        return True
    # NaN is the only float that is not equal to itself
    return isinstance(value, float) and value != value


def prepare_evaluation_data(ground_truth_df, result_df, threshold=0.5, mode='is_traceable'):
    """
    Prepare data for evaluation by comparing results with ground truth

    Every row of ``result_df`` is labelled with whether its
    (source_id, target_id) pair is a ground truth link and whether the
    result predicts a link.

    Parameters:
    -----------
    ground_truth_df : pd.DataFrame
        DataFrame containing ground truth links (``source_id``, ``target_id``)
    result_df : pd.DataFrame
        DataFrame containing LLM result links
    threshold : float or int, optional
        Kept for interface compatibility; it is currently not used. The score
        modes compare against the module-level MIN_TRACEABILITY_THRESHOLD.
    mode : str
        Which field to use for evaluation:

        * 'is_traceable' - the boolean verdict: column ``is_traceable_meta``
          when the frame has it (meta-judge results), otherwise
          ``is_associated`` (transformer / basic LLM results)
        * 'actor_score'  - ``actor_score_meta`` >= MIN_TRACEABILITY_THRESHOLD
        * 'final_score'  - ``final_score_meta`` >= MIN_TRACEABILITY_THRESHOLD

    Returns:
    --------
    pd.DataFrame
        One row per usable result row with ``source_id``, ``target_id``,
        ``model``, ``is_ground_truth``, ``is_predicted``, ``result_type``,
        ``evaluation_mode`` and, in the score modes, ``score`` when the row
        has one. Empty when either input frame is empty.

    Raises:
    -------
    ValueError
        If ``mode`` is not one of the supported modes.
    """
    score_columns = {
        'actor_score': 'actor_score_meta',
        'final_score': 'final_score_meta',
    }
    if mode != 'is_traceable' and mode not in score_columns:
        raise ValueError(
            f"Unknown evaluation mode {mode!r}; expected 'is_traceable', "
            f"'actor_score' or 'final_score'"
        )

    try:
        if ground_truth_df.empty or result_df.empty:
            logger.warning("Ground truth or result DataFrame is empty")
            return pd.DataFrame()

        # All ground truth links as (source_id, target_id) pairs for O(1) lookup
        ground_truth_pairs = set(
            zip(ground_truth_df['source_id'], ground_truth_df['target_id'])
        )

        # Pick the column that carries the prediction for this mode.
        # Only meta-judge frames have 'is_traceable_meta'; the transformer and
        # basic LLM frames store their boolean verdict in 'is_associated'.
        if mode == 'is_traceable':
            if 'is_traceable_meta' in result_df.columns:
                value_col = 'is_traceable_meta'
            else:
                value_col = 'is_associated'
        else:
            value_col = score_columns[mode]
        has_value_col = value_col in result_df.columns

        eval_results = []

        for _, row in result_df.iterrows():
            source_id = row['source_id']
            target_id = row['target_id']

            raw_value = row[value_col] if has_value_col else None

            # A missing value (None / NaN) counts as "no link predicted"
            is_predicted = False
            score_value = None
            if not _is_missing_value(raw_value):
                if mode == 'is_traceable':
                    if isinstance(raw_value, str):
                        is_predicted = raw_value.strip().lower() == 'true'
                    else:
                        try:
                            is_predicted = bool(raw_value)
                        except (TypeError, ValueError):
                            logger.warning(f"Couldn't convert {value_col} value to boolean")
                            continue
                else:
                    try:
                        score_value = int(raw_value)
                    except (TypeError, ValueError, OverflowError):
                        logger.warning(f"Couldn't convert {value_col} value to int")
                        continue
                    is_predicted = score_value >= MIN_TRACEABILITY_THRESHOLD

            eval_entry = {
                'source_id': source_id,
                'target_id': target_id,
                'model': row.get('model', CURRENT_MODEL),
                'is_ground_truth': (source_id, target_id) in ground_truth_pairs,
                'is_predicted': is_predicted,
                'result_type': row.get('result_type', 'unknown'),
                'evaluation_mode': mode
            }

            # Keep the raw score next to the thresholded prediction
            if score_value is not None:
                eval_entry['score'] = score_value

            eval_results.append(eval_entry)

        return pd.DataFrame(eval_results)

    except Exception as e:
        logger.error(f"Error preparing evaluation data: {str(e)}")
        raise

# Evaluation frames keyed by result type (and, for meta-judge, by mode)
eval_data = {}

if not meta_judge_df.empty:
    # Meta-judge results are evaluated once per evaluation mode
    eval_modes = ['is_traceable', 'actor_score', 'final_score']

    for mode in eval_modes:
        mode_key = f"meta_judge_{mode}"
        mode_frame = prepare_evaluation_data(ground_truth_df, meta_judge_df, mode=mode)
        eval_data[mode_key] = mode_frame
        print(f"\nMeta Judge Evaluation Data ({mode}):")
        print("=" * 80)
        display(mode_frame.head())
        print(f"Total evaluation entries: {len(mode_frame)}")

# Transformer and basic LLM results are evaluated with the default mode
for eval_key, source_frame, eval_label in (
    ('transformer', transformers_df, "Transformer"),
    ('llm', llm_result_df, "Basic LLM"),
):
    if source_frame.empty:
        continue
    eval_data[eval_key] = prepare_evaluation_data(ground_truth_df, source_frame)
    print(f"\n{eval_label} Evaluation Data:")
    print("=" * 80)
    display(eval_data[eval_key].head())
    print(f"Total evaluation entries: {len(eval_data[eval_key])}")

In [None]:
# Cell [5] - Calculate Performance Metrics
# Purpose: Calculate comprehensive performance metrics (precision, recall, F1, accuracy, MCC)
# Dependencies: sklearn, numpy, pandas, logging
# Breadcrumbs: Analysis -> Performance Metrics

def calculate_metrics(eval_df):
    """
    Calculate performance metrics for the evaluation data

    Parameters:
    -----------
    eval_df : pd.DataFrame
        DataFrame with evaluation results; must provide the boolean-like
        columns ``is_ground_truth`` and ``is_predicted``

    Returns:
    --------
    dict
        Dictionary with performance metrics: the four confusion-matrix counts
        (Python ints), precision, recall, F1, F2, accuracy, false
        positive/negative rate and the Matthews correlation coefficient.
        Empty dict when ``eval_df`` is empty.

    Raises:
    -------
    Exception
        Any error during the calculation is logged and re-raised.
    """
    try:
        if eval_df.empty:
            logger.warning("Evaluation DataFrame is empty - cannot calculate metrics")
            return {}

        # Extract true and predicted labels
        y_true = eval_df['is_ground_truth'].astype(bool).values
        y_pred = eval_df['is_predicted'].astype(bool).values

        # Pin the label order so the matrix is always 2x2. Without `labels`,
        # data containing a single class (e.g. nothing predicted and no ground
        # truth hit) yields a 1x1 matrix and the 4-way unpacking fails.
        # Counts are converted to Python ints so later arithmetic cannot
        # overflow a fixed-width integer.
        tn, fp, fn, tp = (
            int(count)
            for count in confusion_matrix(y_true, y_pred, labels=[False, True]).ravel()
        )

        # Calculate precision, recall, f1-score
        precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='binary')

        # Calculate accuracy
        accuracy = accuracy_score(y_true, y_pred)

        # False positive rate
        fpr = fp / (fp + tn) if (fp + tn) > 0 else 0

        # False negative rate
        fnr = fn / (fn + tp) if (fn + tp) > 0 else 0

        # Calculate F2 score (weighs recall higher than precision)
        beta = 2
        f2 = (1 + beta**2) * (precision * recall) / ((beta**2 * precision) + recall) if (precision + recall) > 0 else 0

        # Calculate Matthews correlation coefficient (MCC); the denominator
        # is multiplied in floating point because the product of four counts
        # grows quickly
        mcc_numerator = (tp * tn) - (fp * fn)
        mcc_denominator = np.sqrt(
            float(tp + fp) * float(tp + fn) * float(tn + fp) * float(tn + fn)
        )
        mcc = mcc_numerator / mcc_denominator if mcc_denominator != 0 else 0

        metrics = {
            'true_positives': tp,
            'false_positives': fp,
            'true_negatives': tn,
            'false_negatives': fn,
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'f2_score': f2,
            'accuracy': accuracy,
            'false_positive_rate': fpr,
            'false_negative_rate': fnr,
            'mcc': mcc
        }

        return metrics

    except Exception as e:
        logger.error(f"Error calculating metrics: {str(e)}")
        raise

# Metrics per evaluation frame, keyed like eval_data
metrics_results = {}

for result_type, df in eval_data.items():
    if df.empty:
        continue

    metrics = calculate_metrics(df)
    metrics_results[result_type] = metrics

    # Meta-judge keys look like "meta_judge_<mode>"; show the mode separately
    if result_type.startswith('meta_judge_'):
        mode = result_type.split('_', 2)[2]
        display_type = f"META_JUDGE ({mode.upper()})"
    else:
        display_type = result_type.upper()

    print(f"\nPerformance Metrics for {display_type}:")
    print("=" * 80)
    for metric, value in metrics.items():
        metric_label = metric.replace('_', ' ').title()
        # Floats get four decimals, counts are printed as they are
        if isinstance(value, float):
            print(f"{metric_label}: {value:.4f}")
        else:
            print(f"{metric_label}: {value}")

In [None]:
# Cell [6] - Compare Metrics Across Result Types
# Purpose: Compare performance metrics across different LLM evaluation methods and visualize results
# Dependencies: pandas, matplotlib, seaborn
# Breadcrumbs: Analysis -> Performance Comparison

if len(metrics_results) > 0:
    # One row per result type, one column per metric
    metrics_comparison = pd.DataFrame(metrics_results).T

    print("\nMetrics Comparison Across Result Types:")
    print("=" * 80)
    display(metrics_comparison)

    # Plot comparison bar chart for key metrics only if visualization is enabled
    if SHOW_VISUALIZATION:
        key_metrics = ['precision', 'recall', 'f1_score', 'accuracy']

        # Meta-judge comparison plot. Dedicated names are used here on
        # purpose: reusing `meta_judge_df` / `meta_judge_results` would
        # overwrite the query results from Cell [3] and break re-running the
        # evaluation cells.
        meta_judge_metrics = {k: v for k, v in metrics_results.items() if k.startswith('meta_judge_')}
        if len(meta_judge_metrics) > 0:
            meta_judge_metrics_df = pd.DataFrame(meta_judge_metrics).T

            # Rename the index for better display
            meta_judge_metrics_df.index = (
                meta_judge_metrics_df.index.str.replace('meta_judge_', '', regex=False).str.upper()
            )

            # DataFrame.plot creates its own figure, so the size is passed to
            # it directly; a preceding plt.figure() would only leave an empty
            # extra figure behind.
            meta_judge_metrics_df[key_metrics].plot(kind='bar', figsize=(12, 8))
            plt.title(f'Meta-Judge Evaluation Method Comparison - {NEO4J_PROJECT_NAME} ({CURRENT_MODEL})')
            plt.xlabel('Evaluation Method')
            plt.ylabel('Score')
            plt.ylim(0, 1)
            plt.xticks(rotation=0)
            plt.legend(title='Metric')
            plt.grid(axis='y', linestyle='--', alpha=0.7)
            plt.tight_layout()
            plt.show()

            # Create a heatmap for the confusion matrix data
            confusion_metrics = ['true_positives', 'false_positives', 'true_negatives', 'false_negatives']
            if all(metric in meta_judge_metrics_df.columns for metric in confusion_metrics):
                # seaborn draws on the current axes, so the figure is created first
                plt.figure(figsize=(10, 6))
                sns.heatmap(meta_judge_metrics_df[confusion_metrics], annot=True, fmt="g", cmap="YlGnBu")
                plt.title(f'Confusion Matrix Metrics Comparison - {NEO4J_PROJECT_NAME}')
                plt.tight_layout()
                plt.show()

        # Plot all result types for comparison
        metrics_comparison[key_metrics].plot(kind='bar', figsize=(12, 8))
        plt.title(f'Performance Metrics Comparison - {NEO4J_PROJECT_NAME} ({CURRENT_MODEL})')
        plt.xlabel('Result Type')
        plt.ylabel('Score')
        plt.ylim(0, 1)
        plt.xticks(rotation=45)
        plt.legend(title='Metric')
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.show()
    else:
        logger.info("Visualization disabled - skipping performance metrics bar chart")
else:
    logger.warning("No metrics results available for comparison")

In [None]:
# Cell [7] - Compare Predictions Across Evaluation Methods
# Purpose: Analyze and compare predictions between different evaluation methods to identify disagreements
# Dependencies: pandas, numpy
# Breadcrumbs: Analysis -> Method Comparison

if len(eval_data) > 0:
    # Check if we have meta-judge data with different evaluation modes
    meta_judge_keys = [k for k in eval_data.keys() if k.startswith('meta_judge_')]

    if len(meta_judge_keys) > 0:
        # Index every evaluation frame once by (source_id, target_id), keeping
        # the first row per pair. Looking pairs up in these dicts replaces a
        # full DataFrame scan per pair and per method.
        rows_by_method = {}
        for key in meta_judge_keys:
            first_rows = {}
            for _, row in eval_data[key].iterrows():
                first_rows.setdefault((row['source_id'], row['target_id']), row)
            rows_by_method[key] = first_rows

        # All unique source-target pairs, in order of first appearance
        ordered_pairs = {}
        for first_rows in rows_by_method.values():
            ordered_pairs.update(dict.fromkeys(first_rows))
        all_pairs = list(ordered_pairs)

        # Create a comparison row for each source-target pair
        comparison_data = []
        for source_id, target_id in all_pairs:
            pair = (source_id, target_id)
            comparison_row = {
                'source_id': source_id,
                'target_id': target_id
            }

            # The ground truth flag is taken from the first method that
            # evaluated this pair
            is_ground_truth = False
            for key in meta_judge_keys:
                matching_row = rows_by_method[key].get(pair)
                if matching_row is not None:
                    is_ground_truth = matching_row['is_ground_truth']
                    break

            comparison_row['is_ground_truth'] = is_ground_truth

            # Add prediction from each evaluation method
            for key in meta_judge_keys:
                method_name = key.split('_', 2)[2]  # Extract method name
                matching_row = rows_by_method[key].get(pair)
                if matching_row is not None:
                    comparison_row[f'predicted_by_{method_name}'] = matching_row['is_predicted']

                    # Add score if available
                    if 'score' in matching_row:
                        comparison_row[f'{method_name}_score'] = matching_row['score']
                else:
                    # A method that did not evaluate the pair counts as "no link"
                    comparison_row[f'predicted_by_{method_name}'] = False

            comparison_data.append(comparison_row)

        # Create DataFrame from comparison data
        comparison_df = pd.DataFrame(comparison_data)

        # Find disagreements between methods
        prediction_cols = [col for col in comparison_df.columns if col.startswith('predicted_by_')]

        if len(prediction_cols) > 1:
            method_names = [col.replace('predicted_by_', '') for col in prediction_cols]

            # 'predicted_by_actor_score' and 'predicted_by_final_score' also
            # end in '_score'; exclude them so no column is listed twice
            score_cols = [
                col for col in comparison_df.columns
                if col.endswith('_score') and col not in prediction_cols
            ]

            # Check for any disagreement
            comparison_df['has_disagreement'] = comparison_df.apply(
                lambda row: len(set(row[prediction_cols])) > 1,
                axis=1
            )

            # Count cases where different methods disagree
            disagreements = comparison_df[comparison_df['has_disagreement']]

            # Display summary
            print("\nEvaluation Method Comparison:")
            print("=" * 80)
            print(f"Total source-target pairs: {len(comparison_df)}")
            print(f"Pairs with disagreement between methods: {len(disagreements)} ({len(disagreements)/len(comparison_df)*100:.2f}%)")

            # Calculate agreement with ground truth for each method
            for method in method_names:
                correct_predictions = comparison_df[
                    comparison_df[f'predicted_by_{method}'] == comparison_df['is_ground_truth']
                ]
                agreement_rate = len(correct_predictions) / len(comparison_df) * 100
                print(f"Agreement with ground truth ({method}): {agreement_rate:.2f}%")

            # Display disagreement examples
            if len(disagreements) > 0:
                print("\nExamples of Disagreements:")
                print("-" * 60)
                sample_size = min(10, len(disagreements))
                display(disagreements.sample(sample_size)[
                    ['source_id', 'target_id', 'is_ground_truth'] +
                    prediction_cols +
                    score_cols
                ])

                # Analyze disagreements with ground truth
                print("\nDisagreement Analysis with Ground Truth:")
                print("-" * 60)

                for method in method_names:
                    # False positives (predicted as link but not in ground truth)
                    false_positives = comparison_df[
                        (comparison_df[f'predicted_by_{method}'] == True) &
                        (comparison_df['is_ground_truth'] == False)
                    ]

                    # False negatives (not predicted as link but in ground truth)
                    false_negatives = comparison_df[
                        (comparison_df[f'predicted_by_{method}'] == False) &
                        (comparison_df['is_ground_truth'] == True)
                    ]

                    print(f"Method: {method}")
                    print(f"  False positives: {len(false_positives)} ({len(false_positives)/len(comparison_df)*100:.2f}%)")
                    print(f"  False negatives: {len(false_negatives)} ({len(false_negatives)/len(comparison_df)*100:.2f}%)")
        else:
            print("Only one evaluation method available - no comparison possible")
    else:
        print("No meta-judge data with multiple evaluation methods found")
else:
    logger.warning("No evaluation data available for comparison")

In [None]:
# Cell [8] - Error Analysis
# Purpose: Analyze false positives and false negatives to understand model prediction errors
# Dependencies: pandas, logging
# Breadcrumbs: Analysis -> Error Analysis

def perform_error_analysis(eval_df, result_type):
    """
    Summarise the misclassified links of one result type.

    Prints how many evaluated links are false positives and false negatives
    (absolute count and share of all evaluated links) and displays up to five
    sample rows of each kind.

    Parameters:
    -----------
    eval_df : pd.DataFrame
        Evaluation results; the 'is_predicted', 'is_ground_truth',
        'source_id' and 'target_id' columns are read. A 'probability'
        column is included in the samples when present.
    result_type : str
        Label of the result set, used in headings and log messages

    Raises:
    -------
    Exception
        Any failure is logged and then re-raised unchanged.
    """
    try:
        if eval_df.empty:
            logger.warning(f"Evaluation DataFrame for {result_type} is empty")
            return

        predicted = eval_df['is_predicted']
        actual = eval_df['is_ground_truth']
        total_links = len(eval_df)

        # Predicted as a link although the ground truth says otherwise
        fp_rows = eval_df[(predicted == True) & (actual == False)]
        # Present in the ground truth but not predicted
        fn_rows = eval_df[(predicted == False) & (actual == True)]

        fp_count = len(fp_rows)
        fn_count = len(fn_rows)

        print(f"\nError Analysis for {result_type.upper()}:")
        print("=" * 80)
        print(f"Total links evaluated: {total_links}")
        print(f"False positives: {fp_count} ({fp_count/total_links*100:.2f}%)")
        print(f"False negatives: {fn_count} ({fn_count/total_links*100:.2f}%)")

        # Both subsets share the parent's columns, so the choice is made once
        if 'probability' in eval_df.columns:
            sample_cols = ['source_id', 'target_id', 'probability']
        else:
            sample_cols = ['source_id', 'target_id']

        samples = (
            ("\nSample False Positives (incorrectly predicted as links):", fp_rows),
            ("\nSample False Negatives (missed ground truth links):", fn_rows),
        )
        for heading, rows in samples:
            if rows.empty:
                continue
            print(heading)
            display(rows.head(5)[sample_cols])

    except Exception as e:
        logger.error(f"Error performing error analysis: {str(e)}")
        raise

# Run the error analysis for every result type that actually has rows
for result_type, df in eval_data.items():
    if df.empty:
        continue
    perform_error_analysis(df, result_type)

In [None]:
# Cell [9] - Threshold Analysis
# Purpose: Analyze how different probability thresholds affect performance for probability-based results
# Dependencies: sklearn, numpy, matplotlib, pandas
# Breadcrumbs: Analysis -> Threshold Sensitivity

def analyze_threshold_sensitivity(eval_df, result_type):
    """
    Analyze how different probability thresholds affect performance

    For every threshold on a 0.10 .. 0.95 grid (step 0.05) the 'probability'
    column is binarised with ``>=`` and precision, recall, F1 and accuracy
    are computed against 'is_ground_truth'. The table is displayed,
    optionally plotted, and the threshold with the highest F1 is printed.

    Parameters:
    -----------
    eval_df : pd.DataFrame
        DataFrame with evaluation results; needs 'is_ground_truth' and
        'probability' columns
    result_type : str
        Type of result being analyzed

    Raises:
    -------
    Exception
        Any failure is logged and then re-raised.
    """
    try:
        if eval_df.empty or 'probability' not in eval_df.columns:
            logger.warning(f"Cannot perform threshold analysis for {result_type} - missing probability column")
            return

        # Extract ground truth labels
        y_true = eval_df['is_ground_truth'].astype(bool).values

        # np.arange with a fractional step accumulates rounding error
        # (0.1 + 0.05 -> 0.15000000000000002), which would make a probability
        # of exactly 0.15 fail the ">=" test. Rounding snaps the grid back to
        # the nearest representable two-decimal values.
        thresholds = np.round(np.arange(0.1, 1.0, 0.05), 2)

        # Store results
        results = []

        for threshold in thresholds:
            # Apply threshold to get predictions
            y_pred = (eval_df['probability'] >= threshold).values

            # zero_division=0 reports 0 instead of warning when nothing is predicted
            precision, recall, f1, _ = precision_recall_fscore_support(
                y_true, y_pred, average='binary', zero_division=0
            )
            accuracy = accuracy_score(y_true, y_pred)

            results.append({
                'threshold': threshold,
                'precision': precision,
                'recall': recall,
                'f1_score': f1,
                'accuracy': accuracy
            })

        # Convert to DataFrame
        threshold_df = pd.DataFrame(results)

        print(f"\nThreshold Sensitivity Analysis for {result_type.upper()}:")
        print("=" * 80)
        display(threshold_df)

        # Plot threshold sensitivity only if visualization is enabled
        if SHOW_VISUALIZATION:
            plt.figure(figsize=(12, 8))
            plt.plot(threshold_df['threshold'], threshold_df['precision'], 'b-', label='Precision')
            plt.plot(threshold_df['threshold'], threshold_df['recall'], 'r-', label='Recall')
            plt.plot(threshold_df['threshold'], threshold_df['f1_score'], 'g-', label='F1 Score')
            plt.plot(threshold_df['threshold'], threshold_df['accuracy'], 'k-', label='Accuracy')

            plt.title(f'Performance vs Threshold - {result_type.upper()} ({NEO4J_PROJECT_NAME})')
            plt.xlabel('Probability Threshold')
            plt.ylabel('Score')
            plt.grid(True, linestyle='--', alpha=0.7)
            plt.legend()
            plt.tight_layout()
            plt.show()
        else:
            logger.info("Visualization disabled - skipping threshold sensitivity plot")

        # idxmax returns the first row with the highest F1, i.e. the lowest
        # threshold among ties
        optimal_idx = threshold_df['f1_score'].idxmax()
        optimal_threshold = threshold_df.loc[optimal_idx, 'threshold']
        optimal_f1 = threshold_df.loc[optimal_idx, 'f1_score']

        print(f"Optimal threshold for F1 score: {optimal_threshold:.2f} (F1 = {optimal_f1:.4f})")

    except Exception as e:
        logger.error(f"Error analyzing threshold sensitivity: {str(e)}")
        raise

# Threshold analysis only applies to result types that carry probabilities
for result_type, df in eval_data.items():
    if df.empty or 'probability' not in df.columns:
        continue
    analyze_threshold_sensitivity(df, result_type)

In [None]:
# Cell [10] - Save Results
# Purpose: Compute per-requirement metrics and save analysis results to CSV files with timestamp for future reference
# Dependencies: os, datetime, pandas, logging, typing
# Breadcrumbs: Output -> Results Export

# Bind the module logger for this cell; logging.getLogger returns the same
# logger object for a given name, so repeating this in several cells is harmless.
logger = logging.getLogger(__name__)

def analyze_per_requirement(eval_df, result_type):
    """
    Analyze performance metrics per requirement

    Rows are grouped by 'source_id'; for each group the confusion counts and
    the derived precision, recall, F1 and accuracy are computed from the
    'is_ground_truth' and 'is_predicted' columns (both cast to bool).

    Parameters:
    -----------
    eval_df : pd.DataFrame
        DataFrame with evaluation results
    result_type : str
        Type of result being analyzed

    Returns:
    --------
    pd.DataFrame
        One row per source_id, in order of first appearance, with the
        confusion counts, the metrics and the link totals. An empty
        DataFrame is returned for empty input or when an error occurs
        (the error is logged, not raised).
    """
    try:
        if eval_df.empty:
            logger.warning(f"Evaluation DataFrame for {result_type} is empty")
            return pd.DataFrame()

        per_req_results = []

        # A single groupby pass replaces one full-frame scan per requirement.
        # sort=False keeps first-appearance order; dropna=False keeps rows whose
        # source_id is missing together with their real counts.
        for source_id, req_df in eval_df.groupby('source_id', sort=False, dropna=False):
            y_true = req_df['is_ground_truth'].astype(bool).to_numpy()
            y_pred = req_df['is_predicted'].astype(bool).to_numpy()

            # Vectorised confusion counts, converted to plain Python ints
            tp = int((y_true & y_pred).sum())
            fp = int((~y_true & y_pred).sum())
            tn = int((~y_true & ~y_pred).sum())
            fn = int((y_true & ~y_pred).sum())

            # Metrics fall back to 0.0 when their denominator is zero
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
            f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
            # groupby never yields an empty group, so the length is at least 1
            accuracy = (tp + tn) / len(req_df)

            per_req_results.append({
                'source_id': source_id,
                'result_type': result_type,
                'true_positives': tp,
                'false_positives': fp,
                'true_negatives': tn,
                'false_negatives': fn,
                'precision': precision,
                'recall': recall,
                'f1_score': f1,
                'accuracy': accuracy,
                'total_links': len(req_df),
                'ground_truth_links': int(y_true.sum()),
                'predicted_links': int(y_pred.sum())
            })

        return pd.DataFrame(per_req_results)

    except Exception as e:
        # Best effort: callers treat an empty frame as "nothing to save"
        logger.error(f"Error in analyze_per_requirement: {str(e)}")
        logger.debug("Exception details:", exc_info=True)
        return pd.DataFrame()

def save_results_to_csv(metrics_comparison, project_name, model):
    """
    Save analysis results to CSV files with timestamp

    Writes the metrics comparison (when non-empty) and one per-requirement
    CSV for every non-empty entry of the module-level ``eval_data`` dict into
    ``output/analysis``. Note that ``eval_data`` is read from the enclosing
    namespace, not passed in.

    Parameters:
    -----------
    metrics_comparison : pd.DataFrame
        DataFrame with metrics comparison
    project_name : str
        Name of the project analyzed
    model : str
        Model used for analysis

    Raises:
    -------
    Exception
        Any failure is logged and then re-raised.
    """
    def _safe(part):
        # Model identifiers such as "org/model" or "name:tag" contain characters
        # that are path separators or illegal in file names on some platforms.
        return re.sub(r'[\\/:*?"<>|]', '_', str(part))

    try:
        # Create output directory if it doesn't exist
        output_dir = os.path.join('output', 'analysis')
        os.makedirs(output_dir, exist_ok=True)

        # One timestamp (to the minute) shared by every file of this run
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
        prefix = f"{_safe(project_name)}_{_safe(model)}"

        # Save metrics comparison
        if not metrics_comparison.empty:
            filename = os.path.join(output_dir, f"{prefix}_metrics_comparison_{timestamp}.csv")
            metrics_comparison.to_csv(filename)
            logger.info(f"Saved metrics comparison to {filename}")

        # Save per-requirement analysis for each result type
        for result_type, df in eval_data.items():
            if df.empty:
                continue

            per_req_metrics = analyze_per_requirement(df, result_type)
            if per_req_metrics.empty:
                continue

            filename = os.path.join(
                output_dir,
                f"{prefix}_{_safe(result_type)}_per_requirement_{timestamp}.csv"
            )
            per_req_metrics.to_csv(filename)
            logger.info(f"Saved {result_type} per-requirement analysis to {filename}")

    except Exception as e:
        logger.error(f"Error saving results: {str(e)}")
        logger.debug("Exception details:", exc_info=True)
        raise

# Export only when a metrics_comparison table is defined and has rows
if 'metrics_comparison' in locals():
    if not metrics_comparison.empty:
        save_results_to_csv(metrics_comparison, NEO4J_PROJECT_NAME, CURRENT_MODEL)
        print(f"\nResults saved to output/analysis directory for {NEO4J_PROJECT_NAME} using {CURRENT_MODEL}")
        print(f"Files include timestamp: {datetime.datetime.now().strftime('%Y-%m-%d_%H-%M')}")

In [None]:
# Cell [11] - Confusion Matrix Visualization
# Purpose: Visualize confusion matrices for evaluation results across different methods
# Dependencies: matplotlib, seaborn, sklearn, numpy, pandas, logging
# Breadcrumbs: Visualization -> Confusion Matrix

# Bind the module logger for this cell; logging.getLogger returns the same
# logger object for a given name, so repeating this in several cells is harmless.
logger = logging.getLogger(__name__)

def _binary_confusion_matrix(df):
    """Return the 2x2 confusion matrix (rows: actual, columns: predicted) of one evaluation frame."""
    y_true = df['is_ground_truth'].astype(bool).values
    y_pred = df['is_predicted'].astype(bool).values
    # Explicit labels keep the matrix 2x2 even when only one class occurs
    return confusion_matrix(y_true, y_pred, labels=[False, True])


def plot_confusion_matrices(eval_data):
    """
    Plot confusion matrices for each result type with threshold information

    Empty DataFrames are ignored. With visualization disabled the four cells
    of every matrix are printed as text instead of being plotted. Errors are
    logged and swallowed so that a plotting failure does not stop the run.

    Parameters:
    -----------
    eval_data : dict
        Dictionary containing evaluation DataFrames for each result type
    """
    try:
        if not eval_data:
            logger.warning("No evaluation data available for plotting confusion matrices")
            return

        # Only result types with rows get a matrix (and a subplot panel)
        non_empty = {name: df for name, df in eval_data.items() if not df.empty}

        # Skip visualization if disabled
        if not SHOW_VISUALIZATION:
            logger.info("Visualization disabled - skipping confusion matrices")

            # Still print confusion matrix data as text
            for result_type, df in non_empty.items():
                tn, fp, fn, tp = _binary_confusion_matrix(df).ravel()

                print(f"\nConfusion Matrix for {result_type.upper()}:")
                print("=" * 80)
                print(f"Threshold: MIN_TRACEABILITY_THRESHOLD = {MIN_TRACEABILITY_THRESHOLD}")
                print(f"True Negatives: {tn}")
                print(f"False Positives: {fp}")
                print(f"False Negatives: {fn}")
                print(f"True Positives: {tp}")

            return

        num_types = len(non_empty)
        if num_types == 0:
            logger.warning("No evaluation data available for plotting confusion matrices")
            return

        # squeeze=False always returns a 2-D array, so a single panel needs no special case
        fig, axes_grid = plt.subplots(1, num_types, figsize=(5*num_types, 5), squeeze=False)
        axes = axes_grid[0]

        # Add a figure title with threshold information
        fig.suptitle(f'Confusion Matrices (Threshold: MIN_TRACEABILITY_THRESHOLD = {MIN_TRACEABILITY_THRESHOLD})',
                    fontsize=14, y=1.05)

        # Plot each confusion matrix
        for ax, (result_type, df) in zip(axes, non_empty.items()):
            cm = _binary_confusion_matrix(df)

            # Annotate every cell with its count and its share of all links
            total = np.sum(cm)
            annotations = np.empty_like(cm, dtype=object)
            for i in range(2):
                for j in range(2):
                    annotations[i, j] = f"{cm[i, j]}\n({cm[i, j]/total:.1%})"

            sns.heatmap(cm, annot=annotations, fmt='', cmap='Blues', ax=ax,
                       xticklabels=['Not Link', 'Link'],
                       yticklabels=['Not Link', 'Link'])

            # Title uses everything after the second underscore
            # (meta_judge_final_score -> "FINAL SCORE"); shorter names are shown whole
            parts = result_type.split('_')
            if len(parts) >= 3:
                mode_description = ' '.join(parts[2:]).upper()
            else:
                mode_description = result_type.upper()

            lowered = result_type.lower()
            description = ""
            if "is_traceable" in lowered:
                description = "Binary Classification"
            elif "actor_score" in lowered or "final_score" in lowered:
                description = f"Score ≥ {MIN_TRACEABILITY_THRESHOLD}"

            ax.set_title(f'{mode_description}\n{description}')
            ax.set_xlabel('Predicted')
            ax.set_ylabel('Actual')

        # The threshold note applies when any plotted result type is score based.
        # Keys carry a prefix (e.g. meta_judge_actor_score), so match by substring.
        score_based = any(
            "actor_score" in name.lower() or "final_score" in name.lower()
            for name in non_empty
        )

        threshold_description = None
        if score_based:
            if MIN_TRACEABILITY_THRESHOLD == 1:
                threshold_description = "MIN_TRACEABILITY_THRESHOLD = 1 (Any score above zero considered a link)"
            elif MIN_TRACEABILITY_THRESHOLD == 2:
                threshold_description = "MIN_TRACEABILITY_THRESHOLD = 2 (Moderate confidence required)"
            elif MIN_TRACEABILITY_THRESHOLD == 3:
                threshold_description = "MIN_TRACEABILITY_THRESHOLD = 3 (High confidence required)"
            else:
                threshold_description = f"MIN_TRACEABILITY_THRESHOLD = {MIN_TRACEABILITY_THRESHOLD}"

        if threshold_description:
            plt.figtext(0.5, -0.05, threshold_description, ha='center', fontsize=10,
                      bbox=dict(boxstyle="round,pad=0.5", facecolor="lightyellow", alpha=0.5))

        plt.tight_layout()
        plt.show()

    except Exception as e:
        # Deliberately not re-raised: the plots are optional output
        logger.error(f"Error plotting confusion matrices: {str(e)}")
        logger.debug("Exception details:", exc_info=True)

# Plot confusion matrices for every result type in eval_data
# (the function prints them as text instead when SHOW_VISUALIZATION is false)
plot_confusion_matrices(eval_data)

In [None]:
# Cell [12] - Link Prediction Distribution Analysis
# Purpose: Analyze the distribution of link predictions and ground truth patterns
# Dependencies: matplotlib, pandas, logging
# Breadcrumbs: Analysis -> Distribution Analysis

def analyze_prediction_distribution(eval_data):
    """
    Analyze the distribution of link predictions and ground truth

    For every non-empty result type this prints the number of evaluated,
    ground-truth and predicted links, the true-positive overlap and the
    number of distinct source/target ids. When a 'probability' column exists
    and visualization is enabled, a histogram of probabilities split by
    ground-truth membership is shown.

    Parameters:
    -----------
    eval_data : dict
        Dictionary containing evaluation DataFrames for each result type

    Raises:
    -------
    Exception
        Any failure is logged and then re-raised.
    """
    try:
        if not eval_data:
            logger.warning("No evaluation data available for prediction distribution analysis")
            return

        for result_type, df in eval_data.items():
            if df.empty:
                continue

            print(f"\nPrediction Distribution Analysis for {result_type.upper()}:")
            print("=" * 80)

            # Cast once, as the other analysis cells do, so that 0/1-coded or
            # object-typed flag columns work with &, ~ and boolean indexing
            is_ground_truth = df['is_ground_truth'].astype(bool)
            is_predicted = df['is_predicted'].astype(bool)

            # Count total predictions and ground truth
            total_evaluated = len(df)
            total_ground_truth = is_ground_truth.sum()
            total_predicted = is_predicted.sum()

            # Calculate percentages (df is non-empty, so the divisor is > 0)
            ground_truth_percentage = (total_ground_truth / total_evaluated) * 100
            predicted_percentage = (total_predicted / total_evaluated) * 100

            print(f"Total links evaluated: {total_evaluated}")
            print(f"Ground truth links: {total_ground_truth} ({ground_truth_percentage:.2f}%)")
            print(f"Predicted links: {total_predicted} ({predicted_percentage:.2f}%)")

            # Overlap between ground truth and predictions
            true_positives = (is_ground_truth & is_predicted).sum()
            true_positive_percentage = (true_positives / total_ground_truth) * 100 if total_ground_truth > 0 else 0

            print(f"Correctly predicted links (true positives): {true_positives} ({true_positive_percentage:.2f}% of ground truth)")

            # Number of distinct ids on each side of the links
            unique_sources = df['source_id'].nunique()
            unique_targets = df['target_id'].nunique()

            print(f"Unique source requirements: {unique_sources}")
            print(f"Unique target requirements: {unique_targets}")

            # For probability-based results, plot histogram if visualization is enabled
            if 'probability' in df.columns and SHOW_VISUALIZATION:
                plt.figure(figsize=(10, 6))

                # Separate probabilities for ground truth and non-ground truth links
                gt_probs = df.loc[is_ground_truth, 'probability']
                non_gt_probs = df.loc[~is_ground_truth, 'probability']

                plt.hist(gt_probs, bins=20, alpha=0.5, label='Ground Truth Links', color='green')
                plt.hist(non_gt_probs, bins=20, alpha=0.5, label='Non-Ground Truth Links', color='red')

                plt.title(f'Probability Distribution - {result_type.upper()} ({NEO4J_PROJECT_NAME})')
                plt.xlabel('Probability')
                plt.ylabel('Frequency')
                plt.legend()
                plt.grid(True, linestyle='--', alpha=0.7)
                plt.show()
            elif 'probability' in df.columns:
                logger.info("Visualization disabled - skipping probability distribution histogram")

    except Exception as e:
        logger.error(f"Error analyzing prediction distribution: {str(e)}")
        raise

# Analyze prediction distributions for every result type in eval_data
# (the function re-raises after logging, so a failure stops this cell)
analyze_prediction_distribution(eval_data)

In [None]:
# Cell [13] - ROC Curve Analysis
# Purpose: Generate ROC and precision-recall curves for score-based evaluation methods
# Dependencies: sklearn, pandas, matplotlib, numpy, logging
# Breadcrumbs: Visualization -> ROC Analysis

# Bind the module logger for this cell; logging.getLogger returns the same
# logger object for a given name, so repeating this in several cells is harmless.
logger = logging.getLogger(__name__)

def normalize_scores(scores):
    """
    Normalize scores to 0-1 range to use as probability values

    Min-max scaling: the smallest score maps to 0.0 and the largest to 1.0.

    Parameters:
    -----------
    scores : numpy.ndarray
        Array of scores to normalize (any array-like of numbers is accepted)

    Returns:
    --------
    numpy.ndarray
        Float array of the same shape with values in range 0-1. An empty
        input yields an empty array; an input whose values are all equal
        yields all zeros, since there is no spread to scale.
    """
    values = np.asarray(scores, dtype=float)

    # Nothing to scale; min()/max() would raise on an empty array
    if values.size == 0:
        return values

    min_score = values.min()
    score_range = values.max() - min_score

    # Constant input: return zeros rather than dividing by zero
    if score_range == 0:
        return np.zeros_like(values)

    return (values - min_score) / score_range

def _roc_scores_for(df):
    """Return (y_score, score_source) for a frame, preferring 'probability' over a normalized 'score'."""
    if 'probability' in df.columns:
        return df['probability'].values, 'probability'
    return normalize_scores(df['score'].values), 'normalized score'


def _roc_display_name(result_type):
    """Build the legend label: first part plus everything after the second underscore, upper-cased."""
    parts = result_type.split('_')
    if len(parts) >= 3:
        return f"{parts[0].upper()} {' '.join(parts[2:]).upper()}"
    return result_type.upper()


def _roc_threshold_note():
    """Return the footer text describing the current MIN_TRACEABILITY_THRESHOLD."""
    if MIN_TRACEABILITY_THRESHOLD == 1:
        return "Current Threshold: MIN_TRACEABILITY_THRESHOLD = 1 (Low confidence required)"
    if MIN_TRACEABILITY_THRESHOLD == 2:
        return "Current Threshold: MIN_TRACEABILITY_THRESHOLD = 2 (Moderate confidence required)"
    if MIN_TRACEABILITY_THRESHOLD == 3:
        return "Current Threshold: MIN_TRACEABILITY_THRESHOLD = 3 (High confidence required)"
    return f"Current Threshold: MIN_TRACEABILITY_THRESHOLD = {MIN_TRACEABILITY_THRESHOLD}"


def perform_roc_analysis(eval_data):
    """
    Perform ROC curve analysis for score-based results

    Result types qualify when their DataFrame is non-empty and has a
    'probability' or 'score' column ('probability' wins when both exist;
    'score' is min-max normalized first). With visualization disabled only
    the AUC values are printed; otherwise a ROC figure and a
    precision-recall figure are shown. Errors are logged and printed, not
    raised.

    Parameters:
    -----------
    eval_data : dict
        Dictionary containing evaluation DataFrames for each result type
    """
    try:
        # Result types that carry something usable as a ranking score
        score_data_types = [
            result_type for result_type, df in eval_data.items()
            if not df.empty and ('score' in df.columns or 'probability' in df.columns)
        ]

        if not score_data_types:
            logger.warning("No score or probability data available for ROC analysis")
            print("\nNo score or probability data available for ROC analysis.")
            print("ROC curves require continuous probability scores.")
            print(f"MIN_TRACEABILITY_THRESHOLD is currently set to: {MIN_TRACEABILITY_THRESHOLD}")
            print("To see ROC analysis, use actor_score or final_score evaluation modes which provide score data.")
            return

        # Log which result types have score data
        logger.info(f"Found {len(score_data_types)} result types with score data: {', '.join(score_data_types)}")

        # Skip visualization if disabled but still calculate AUC scores
        if not SHOW_VISUALIZATION:
            logger.info("Visualization disabled - skipping ROC curve plot")
            print("\nROC Analysis (AUC Scores):")
            print("=" * 80)

            for result_type in score_data_types:
                df = eval_data[result_type]
                y_true = df['is_ground_truth'].astype(bool).values
                y_score, score_source = _roc_scores_for(df)

                try:
                    auc_score = roc_auc_score(y_true, y_score)
                    print(f"{result_type.upper()} AUC Score: {auc_score:.4f} (using {score_source})")
                except ValueError as ve:
                    logger.warning(f"Could not calculate AUC for {result_type}: {str(ve)}")
                    print(f"{result_type.upper()}: Unable to calculate AUC (possibly only one class present)")

            return

        # Prepare labels and scores once; both figures draw the same series.
        # Curves need both classes, so single-class result types are dropped here.
        plottable = []
        for result_type in score_data_types:
            df = eval_data[result_type]
            y_true = df['is_ground_truth'].astype(bool).values

            if len(np.unique(y_true)) < 2:
                logger.warning(f"Not enough label variety for {result_type} to calculate ROC curve")
                continue

            y_score, score_source = _roc_scores_for(df)
            plottable.append((result_type, y_true, y_score, score_source))

        threshold_description = _roc_threshold_note()

        # ---- ROC curves ----
        plt.figure(figsize=(12, 8))

        for result_type, y_true, y_score, score_source in plottable:
            try:
                fpr, tpr, thresholds = roc_curve(y_true, y_score)
                roc_auc = auc(fpr, tpr)

                plt.plot(fpr, tpr, lw=2,
                        label=f'{_roc_display_name(result_type)} (AUC = {roc_auc:.3f}, {score_source})')

                # Log the first operating point at or beyond 5% and 10% FPR
                for fpr_target, fpr_label in ((0.05, "5%"), (0.10, "10%")):
                    idx = next((i for i, x in enumerate(fpr) if x >= fpr_target), None)
                    if idx is not None:
                        logger.info(f"{result_type} - At {fpr_label} FPR: TPR={tpr[idx]:.3f}, threshold={thresholds[idx]:.3f}")

            except ValueError as ve:
                logger.warning(f"Error calculating ROC curve for {result_type}: {str(ve)}")
                continue

        # Add diagonal reference line (chance level)
        plt.plot([0, 1], [0, 1], 'k--', lw=2)

        # Annotate the plot with the configured threshold for meta-judge score modes
        if 'meta_judge_actor_score' in eval_data or 'meta_judge_final_score' in eval_data:
            plt.text(0.5, 0.1, f"MIN_TRACEABILITY_THRESHOLD = {MIN_TRACEABILITY_THRESHOLD}",
                    horizontalalignment='center',
                    bbox=dict(facecolor='yellow', alpha=0.3),
                    fontsize=10)

        # Add labels and legend
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title(f'ROC Curves - {NEO4J_PROJECT_NAME} ({CURRENT_MODEL})')
        plt.legend(loc='lower right')
        plt.grid(True, linestyle='--', alpha=0.7)

        plt.figtext(0.5, 0.01, threshold_description, ha='center', fontsize=10,
                   bbox=dict(boxstyle="round,pad=0.5", facecolor="lightyellow", alpha=0.5))

        plt.tight_layout(rect=[0, 0.05, 1, 1])  # Leave space at the bottom for the text
        plt.show()

        # ---- Precision-recall curves (often more informative for imbalanced data) ----
        plt.figure(figsize=(12, 8))

        for result_type, y_true, y_score, score_source in plottable:
            try:
                precision, recall, _ = precision_recall_curve(y_true, y_score)
                pr_auc = auc(recall, precision)

                plt.plot(recall, precision, lw=2,
                        label=f'{_roc_display_name(result_type)} (AUC = {pr_auc:.3f}, {score_source})')

            except ValueError as ve:
                logger.warning(f"Error calculating PR curve for {result_type}: {str(ve)}")
                continue

        # Baseline: a random classifier's precision equals the positive rate;
        # averaged over every result type that has score data
        positive_rates = [eval_data[result_type]['is_ground_truth'].mean()
                          for result_type in score_data_types]

        if positive_rates:
            avg_positive_rate = np.mean(positive_rates)
            plt.axhline(y=avg_positive_rate, color='r', linestyle='--',
                      label=f'Random classifier (positive rate: {avg_positive_rate:.3f})')

        # Add labels and legend
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.title(f'Precision-Recall Curves - {NEO4J_PROJECT_NAME} ({CURRENT_MODEL})')
        plt.legend(loc='best')
        plt.grid(True, linestyle='--', alpha=0.7)

        plt.figtext(0.5, 0.01, threshold_description, ha='center', fontsize=10,
                   bbox=dict(boxstyle="round,pad=0.5", facecolor="lightyellow", alpha=0.5))

        plt.tight_layout(rect=[0, 0.05, 1, 1])  # Leave space at the bottom for the text
        plt.show()

    except Exception as e:
        # Deliberately not re-raised: the curves are optional output
        logger.error(f"Error in ROC analysis: {str(e)}")
        logger.debug("Exception details:", exc_info=True)
        print(f"Error during ROC analysis: {str(e)}")

# Perform ROC curve analysis for every result type in eval_data
# (errors are logged and printed inside the function, not raised)
perform_roc_analysis(eval_data)

In [None]:
# Cell [14] - Explanation Analysis
# Purpose: Analyze explanation text from LLM results to extract insights and common patterns
# Dependencies: re, collections, matplotlib, wordcloud (optional)
# Breadcrumbs: Analysis -> Explanation Mining

def _show_explanation_wordcloud(explanation_text):
    """
    Render a word cloud of the combined explanation text (best effort)

    The word cloud is optional output: if the wordcloud package is missing
    or no cloud can be built from the text, a warning is logged and the
    function returns without plotting.

    Parameters:
    -----------
    explanation_text : str
        All explanations joined into a single string
    """
    try:
        from wordcloud import WordCloud
    except ImportError:
        logger.warning("wordcloud package not found - skipping word cloud visualization")
        return

    try:
        wordcloud = WordCloud(
            width=800,
            height=400,
            background_color='white',
            max_words=100
        ).generate(explanation_text)
    except ValueError as ve:
        # NOTE(review): wordcloud is expected to raise ValueError when the text
        # contains no usable words - confirm against the installed version
        logger.warning(f"Could not build word cloud - skipping: {str(ve)}")
        return

    plt.figure(figsize=(12, 6))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    plt.title(f'Word Cloud of Explanations - {NEO4J_PROJECT_NAME} ({CURRENT_MODEL})')
    plt.show()


def analyze_explanations(llm_dfs):
    """
    Analyze explanation text to extract insights

    Prints the number of explanations found and the 20 most common bigrams
    and trigrams. When SHOW_VISUALIZATION is enabled, a word cloud of the
    combined text is also shown (requires the optional wordcloud package).

    Parameters:
    -----------
    llm_dfs : list
        List of DataFrames containing LLM results with explanations.
        Entries that are None, empty, or have no 'explanation' column
        are skipped.

    Raises:
    -------
    Exception
        Any error raised during the analysis is logged and re-raised.
    """
    try:
        # Collect every non-null explanation from the frames that were passed
        # in (not from notebook globals), coerced to str so that joining and
        # tokenizing cannot fail on non-text values
        all_explanations = []
        for df in llm_dfs:
            if df is None or df.empty or 'explanation' not in df.columns:
                continue
            all_explanations.extend(str(exp) for exp in df['explanation'].dropna())

        # An 'explanation' column holding only nulls counts as "nothing found"
        if not all_explanations:
            logger.warning("No explanations found for analysis")
            return

        # Process explanations
        print("\nExplanation Analysis:")
        print("=" * 80)
        print(f"Total explanations found: {len(all_explanations)}")

        explanation_text = ' '.join(all_explanations)
        has_text = bool(explanation_text.strip())

        # Generate word cloud only if visualization is enabled
        if SHOW_VISUALIZATION and has_text:
            _show_explanation_wordcloud(explanation_text)
        elif has_text:
            logger.info("Visualization disabled - skipping word cloud")

        def ngrams(words, n):
            """Yield every run of n consecutive words as a single phrase."""
            return (' '.join(words[i:i + n]) for i in range(len(words) - n + 1))

        # Count phrases per explanation so that no phrase spans two
        # explanations; each explanation is tokenized only once
        bigram_counts = Counter()
        trigram_counts = Counter()
        for explanation in all_explanations:
            words = re.findall(r'\b\w+\b', explanation.lower())
            bigram_counts.update(ngrams(words, 2))
            trigram_counts.update(ngrams(words, 3))

        # Count and show most common phrases
        print("\nMost Common Bigrams in Explanations:")
        for phrase, count in bigram_counts.most_common(20):
            print(f"'{phrase}': {count}")

        print("\nMost Common Trigrams in Explanations:")
        for phrase, count in trigram_counts.most_common(20):
            print(f"'{phrase}': {count}")

    except Exception as e:
        logger.error(f"Error analyzing explanations: {str(e)}")
        raise

# Analyze explanation text across the meta-judge, transformer, and basic LLM result frames
analyze_explanations([meta_judge_df, transformers_df, llm_result_df])

In [None]:
# Cell [15] - Source-Target Link Analysis
# Purpose: Analyze connection patterns between source and target requirements
# Dependencies: pandas, matplotlib, logging
# Breadcrumbs: Analysis -> Link Pattern Analysis

def analyze_source_target_links(eval_data, ground_truth_df):
    """
    Compare predicted link patterns against the ground truth link patterns

    Prints per-source / per-target link statistics for the ground truth,
    then, for every non-empty result type, the same statistics for its
    predicted links plus the share of ground truth sources and targets
    that received at least one prediction. When SHOW_VISUALIZATION is set,
    a bar chart of the top 20 sources (by ground truth count) is drawn for
    each result type.

    Parameters:
    -----------
    eval_data : dict
        Dictionary containing evaluation DataFrames for each result type
    ground_truth_df : pd.DataFrame
        DataFrame containing ground truth links

    Raises:
    -------
    Exception
        Any error raised during the analysis is logged and re-raised.
    """
    try:
        # Nothing to compare against without ground truth
        if ground_truth_df.empty:
            logger.warning("Ground truth DataFrame is empty - cannot analyze source-target patterns")
            return

        print("\nSource-Target Link Pattern Analysis:")
        print("=" * 80)

        # Links per source / per target in the ground truth
        gt_by_source = ground_truth_df['source_id'].value_counts()
        gt_by_target = ground_truth_df['target_id'].value_counts()

        print("Ground Truth Link Statistics:")
        print(f"Total unique source requirements: {len(gt_by_source)}")
        print(f"Total unique target requirements: {len(gt_by_target)}")
        print(f"Average links per source requirement: {gt_by_source.mean():.2f}")
        print(f"Max links from a single source: {gt_by_source.max()} (ID: {gt_by_source.idxmax()})")
        print(f"Min links from a source: {gt_by_source.min()}")
        print(f"Average links per target requirement: {gt_by_target.mean():.2f}")
        print(f"Max links to a single target: {gt_by_target.max()} (ID: {gt_by_target.idxmax()})")

        # Repeat the statistics for the predictions of each result type
        for result_type in eval_data:
            frame = eval_data[result_type]
            if frame.empty:
                continue

            # Keep only the rows flagged as predicted links
            predicted = frame[frame['is_predicted']]
            if predicted.empty:
                print(f"\nNo links predicted by {result_type.upper()}")
                continue

            pred_by_source = predicted['source_id'].value_counts()
            pred_by_target = predicted['target_id'].value_counts()

            print(f"\n{result_type.upper()} Predicted Link Statistics:")
            print(f"Total unique source requirements with predictions: {len(pred_by_source)}")
            print(f"Total unique target requirements with predictions: {len(pred_by_target)}")
            print(f"Average predicted links per source: {pred_by_source.mean():.2f}")
            print(f"Max predicted links from a single source: {pred_by_source.max()} (ID: {pred_by_source.idxmax()})")

            # Share of ground truth IDs that appear in at least one prediction
            shared_sources = set(pred_by_source.index).intersection(gt_by_source.index)
            source_coverage = len(shared_sources) / len(gt_by_source)
            shared_targets = set(pred_by_target.index).intersection(gt_by_target.index)
            target_coverage = len(shared_targets) / len(gt_by_target)

            print(f"Source requirement coverage: {source_coverage:.2%}")
            print(f"Target requirement coverage: {target_coverage:.2%}")

            if not SHOW_VISUALIZATION:
                logger.info(f"Visualization disabled - skipping source-target bar chart for {result_type}")
                continue

            plt.figure(figsize=(12, 6))

            # Turn both count series into two-column frames keyed by source_id
            gt_frame = gt_by_source.reset_index()
            gt_frame.columns = ['source_id', 'gt_count']

            pred_frame = pred_by_source.reset_index()
            pred_frame.columns = ['source_id', 'pred_count']

            # Outer merge keeps sources that appear on only one side (count 0 on the other)
            merged = gt_frame.merge(pred_frame, on='source_id', how='outer').fillna(0)

            # Show only the 20 sources with the most ground truth links
            top = merged.sort_values('gt_count', ascending=False).head(20)
            positions = range(len(top))

            plt.bar(positions, top['gt_count'].values,
                    alpha=0.7, label='Ground Truth Links')
            plt.bar(positions, top['pred_count'].values,
                    alpha=0.5, label='Predicted Links')

            plt.xlabel('Source Requirements (sorted by ground truth count)')
            plt.ylabel('Number of Links')
            plt.title(f'Ground Truth vs {result_type.upper()} Predictions - Top Sources')
            plt.legend()
            plt.tight_layout()
            plt.show()

    except Exception as e:
        logger.error(f"Error analyzing source-target link patterns: {str(e)}")
        raise

# Compare each result type's predicted link patterns with the ground truth links
analyze_source_target_links(eval_data, ground_truth_df)

In [None]:
# Cell [16] - Project-Model Summary
# Purpose: Generate a comprehensive summary report for the analyzed project and model
# Dependencies: pandas, logging
# Breadcrumbs: Output -> Summary Report

def generate_summary_report():
    """
    Generate a comprehensive summary report for the analyzed project and model

    Reads the notebook-level variables ground_truth_df, meta_judge_df,
    transformers_df, llm_result_df, metrics_results, NEO4J_PROJECT_NAME and
    CURRENT_MODEL, and prints project statistics, result availability,
    per-approach performance metrics, the best approach by F1 score (when
    more than one approach has metrics) and a numbered list of
    recommendations.

    Missing or non-numeric metric values are reported as 'N/A' (or their
    string form) instead of aborting the report.

    Raises:
    -------
    Exception
        Any error raised while building the report is logged and re-raised.
    """
    def format_metric(metrics, key, spec='.4f'):
        """Format metrics[key] with spec; 'N/A' when the metric is absent."""
        value = metrics.get(key)
        if value is None:
            return 'N/A'
        try:
            return format(value, spec)
        except (TypeError, ValueError):
            # Value is present but not numeric - show it as-is
            return str(value)

    try:
        print("\n" + "=" * 80)
        print(f"SUMMARY REPORT FOR {NEO4J_PROJECT_NAME} WITH MODEL {CURRENT_MODEL}")
        print("=" * 80)

        # Project statistics
        if not ground_truth_df.empty:
            total_source_reqs = ground_truth_df['source_id'].nunique()
            total_target_reqs = ground_truth_df['target_id'].nunique()
            total_gt_links = len(ground_truth_df)
            possible_links = total_source_reqs * total_target_reqs

            print("\nProject Statistics:")
            print(f"Total source requirements: {total_source_reqs}")
            print(f"Total target requirements: {total_target_reqs}")
            print(f"Total ground truth links: {total_gt_links}")
            # nunique() ignores nulls, so the product can be 0 even for a
            # non-empty frame; avoid dividing by zero in that case
            if possible_links:
                print(f"Link density: {total_gt_links / possible_links:.4f}")
            else:
                print("Link density: N/A")
        else:
            print("\nNo ground truth data available for project statistics")

        # Results availability
        print("\nResults Availability:")
        print(f"Meta Judge results: {'Available' if not meta_judge_df.empty else 'Not available'}")
        print(f"Transformer results: {'Available' if not transformers_df.empty else 'Not available'}")
        print(f"Basic LLM results: {'Available' if not llm_result_df.empty else 'Not available'}")

        # Performance metrics summary
        if metrics_results:
            print("\nPerformance Metrics Summary:")
            for result_type, metrics in metrics_results.items():
                print(f"\n{result_type.upper()}:")
                print(f"  Precision: {format_metric(metrics, 'precision')}")
                print(f"  Recall: {format_metric(metrics, 'recall')}")
                print(f"  F1 Score: {format_metric(metrics, 'f1_score')}")
                print(f"  Accuracy: {format_metric(metrics, 'accuracy')}")
                print(f"  MCC: {format_metric(metrics, 'mcc')}")
                print(f"  True Positives: {metrics.get('true_positives', 'N/A')}")
                print(f"  False Positives: {metrics.get('false_positives', 'N/A')}")
                print(f"  False Negatives: {metrics.get('false_negatives', 'N/A')}")
        else:
            print("\nNo performance metrics available")

        # Best performing approach (only meaningful with at least two approaches)
        if len(metrics_results) > 1:
            # A missing or None F1 score counts as 0 so the comparison cannot fail
            f1_scores = {rt: (m.get('f1_score') or 0) for rt, m in metrics_results.items()}
            best_approach = max(f1_scores, key=f1_scores.get)
            print(f"\nBest performing approach: {best_approach.upper()} (F1 Score: {f1_scores[best_approach]:.4f})")

        # Recommendations
        print("\nRecommendations:")

        # Generate automatic recommendations based on results
        recommendations = []

        for result_type, metrics in metrics_results.items():
            precision = metrics.get('precision')
            recall = metrics.get('recall')
            false_negatives = metrics.get('false_negatives') or 0

            # Low precision suggestions (skipped when the metric is missing)
            if precision is not None and precision < 0.7:
                recommendations.append(f"Consider tuning {result_type} for higher precision - currently at {precision:.2f}")

            # Low recall suggestions (skipped when the metric is missing)
            if recall is not None and recall < 0.7:
                recommendations.append(f"Consider tuning {result_type} for higher recall - currently at {recall:.2f}")

            # High false negatives
            if false_negatives > 10:
                recommendations.append(f"Address high false negative rate in {result_type} approach")

        # Add general recommendations if none specific
        if not recommendations:
            recommendations.append("Review explanations for insights on model reasoning")
            recommendations.append("Consider evaluating with different threshold values")

        for idx, rec in enumerate(recommendations, 1):
            print(f"{idx}. {rec}")

        print("=" * 80)

    except Exception as e:
        logger.error(f"Error generating summary report: {str(e)}")
        raise

# Print the summary report for the current project and model
generate_summary_report()

In [None]:
# Cell [17] - Close Resources
# Purpose: Clean up database connections and finalize analysis
# Dependencies: neo4j, logging
# Breadcrumbs: Cleanup -> Resource Management

# Close the Neo4j driver now that all analysis cells have run
# NOTE(review): `driver` is presumably created in an earlier cell - after this
# point that cell must be re-run before issuing further queries
driver.close()
logger.info("Neo4j driver closed")
print("\nAnalysis complete.")