In [72]:
import datetime
from collections import defaultdict
from pathlib import Path
from typing import Dict, List

import pandas as pd

# --- Configuration ---
# Specify a list of input CSV files to process.
# main() derives the Solidity version and the data type from each file name by
# splitting on '_' (tokens 2 and 3 form the version, token 4 is the type), so
# names must follow the pattern 'results_handcrafted_<major>_<minor>_<type>...'.
INPUT_FILES: List[Path] = [
    Path('results_handcrafted_0_8_source_llm.csv'),
    Path('results_handcrafted_0_8_source.csv'),
    Path('results_handcrafted_0_5_source.csv'),
    Path('results_handcrafted_0_4_source.csv'),
    Path('results_handcrafted_0_8_bin.csv'),
    Path('results_handcrafted_0_5_bin.csv'),
    Path('results_handcrafted_0_4_bin.csv'),
]
# NOTE(review): OUTPUT_FILE is not referenced by any code visible in this
# notebook section — confirm whether it is still needed.
OUTPUT_FILE = Path('reentrancy_metrics_data_combined.csv')
# Columns requested from every CSV; an optional 'fails' column is added by the
# loader when the file provides one.
COLUMNS_TO_USE = ['filename', 'toolid', 'findings']
# Maximum number of directory levels expanded by the hierarchical report.
HIERARCHICAL_MAX_DEPTH = 30

# A dictionary mapping each tool to its label(s) for a reentrancy finding.
# Several labels for one tool are written as a single comma-separated string
# (see 'slither-0.11.3'); add_predictions() splits on ',' and flags a row as
# soon as any one label occurs as a substring of the row's findings.
REENTRANCY_LABELS: Dict[str, str] = {
    'ccc': 'Reentrancy_Vulnerability',
    'confuzzius': 'Reentrancy',
    'conkas': 'Reentrancy',
    'mythril-0.24.7': 'State_access_after_external_call_SWC_107',
    'oyente+-2acaf2e': 'Re_Entrancy_Vulnerability',
    'securify': 'DAO',
    'sfuzz': 'Reentrancy',
    'slither-0.11.3': 'reentrancy_eth,reentrancy_no_eth',
    'solhint-6.0.0': 'reentrancy',
    'ethor-2023': 'insecure',
    # NOTE(review): this label names a call-stack-depth finding rather than a
    # reentrancy one — confirm that it is the intended label for this tool.
    'oyente+-060ca34': 'Callstack_Depth_Attack_Vulnerability',
    'vandal': 'ReentrantCall',
    'gpt-oss': 'reentrant',
    'gpt-5-mini': 'reentrant',
    'gpt-5': 'reentrant',
    'gpt-5-nano': 'reentrant'
}


# --- Functions ---

def log(message: str, level: str = "INFO"):
    """Write ``message`` to stdout, prefixed with its level and the local time."""
    now = datetime.datetime.now()
    # Second-level resolution is enough for these progress messages.
    stamp = format(now, "%Y-%m-%d %H:%M:%S")
    print(f"[{level}] [{stamp}] {message}")


def load_and_prepare_data(filepath: Path, use_cols: List[str]) -> pd.DataFrame:
    """
    Loads a single results CSV and prepares it for metric computation.

    The columns in ``use_cols`` are read, plus the optional 'fails' column when
    the file has one. The returned frame always contains:

    * 'fails'           - the file's value, or "{}" where missing/absent
                          ("{}" is what the metrics treat as "run did not fail");
    * 'findings'        - as a string, with missing values replaced by '';
    * 'true_reentrancy' - boolean ground truth: True when the file name
                          contains '_ree' (case-insensitive).

    Args:
        filepath: Path of the CSV file to read.
        use_cols: Column names that must be read from the file.

    Returns:
        The prepared DataFrame, or an empty DataFrame when the file is missing
        or cannot be read (a warning is logged instead of raising).
    """
    log("Loading data...")
    try:
        log(f"--> Reading '{filepath}'")
        # Read only the header first so 'fails' is requested only if present;
        # asking pandas for a non-existent column would raise.
        header = pd.read_csv(filepath, nrows=0).columns
        cols_to_read = list(use_cols)
        if 'fails' in header and 'fails' not in cols_to_read:
            cols_to_read.append('fails')

        df = pd.read_csv(filepath, usecols=cols_to_read)
    except FileNotFoundError:
        log(f"File not found at '{filepath}'. Skipping.", level="WARN")
        return pd.DataFrame()
    except Exception as e:
        # Deliberate best-effort: one unreadable file must not abort the run.
        log(f"Could not read file '{filepath}'. Error: {e}. Skipping.", level="WARN")
        return pd.DataFrame()

    if 'fails' in df.columns:
        df['fails'] = df['fails'].fillna("{}")
    else:
        df['fails'] = "{}"

    # Normalise to strings before matching so a missing or non-string file name
    # yields False instead of NaN; a NaN ground truth would be counted neither
    # as positive nor as negative by the metrics.
    filenames = df['filename'].fillna('').astype(str)
    df['true_reentrancy'] = filenames.str.contains(r'_ree', case=False)
    df['findings'] = df['findings'].fillna('').astype(str)
    return df


def add_predictions(df: pd.DataFrame, labels_map: Dict[str, str]) -> pd.DataFrame:
    """
    Adds a boolean 'predicted_reentrancy' column based on tool findings.

    A row is predicted positive when at least one of the labels configured for
    its 'toolid' occurs as a substring of its 'findings' text.

    Args:
        df: Frame with 'toolid' and 'findings' columns. It is modified in place.
        labels_map: Maps a tool id to its label, or to several labels separated
            by commas. Whitespace around labels is ignored.

    Returns:
        The same DataFrame object, with the new column added.
    """
    # Strip once up front and drop empty labels: an empty string is a substring
    # of every findings text, so a stray trailing comma (or an empty entry)
    # would otherwise flag every row of that tool as positive.
    labels_by_tool = {}
    for tool, raw_labels in labels_map.items():
        stripped = (label.strip() for label in raw_labels.split(','))
        labels_by_tool[tool] = [label for label in stripped if label]

    def is_flagged(toolid, findings) -> bool:
        labels = labels_by_tool.get(toolid)
        # Unknown tool, no usable labels, or missing (non-string) findings.
        if not labels or not isinstance(findings, str):
            return False
        return any(label in findings for label in labels)

    flags = [is_flagged(tool, text) for tool, text in zip(df['toolid'], df['findings'])]
    # Assign positionally with an explicit bool dtype so an empty frame also
    # gets a well-typed column.
    df['predicted_reentrancy'] = pd.Series(flags, dtype=bool).to_numpy()
    return df


def calculate_metrics_for_group(group: pd.DataFrame) -> pd.Series:
    """
    Computes confusion-matrix counts and derived scores for one group of runs.

    A run counts as failed when its 'fails' value differs from "{}". Failed
    runs are always scored as wrong: a false negative when the file is truly
    reentrant, a false positive otherwise. Their predictions are ignored.

    Returns a Series with Accuracy, Precision, Recall, F1_Score, TP, FP, FN,
    Fails (number of failed runs) and Count (number of rows in the group).
    Every ratio is 0 when its denominator is 0.
    """
    is_failed = group['fails'] != "{}"
    crashed = group[is_failed]
    completed = group[~is_failed]

    # Explicit comparisons against True/False (rather than `~`) so that a
    # missing value matches neither side.
    truth_pos = completed['true_reentrancy'] == True
    truth_neg = completed['true_reentrancy'] == False
    pred_pos = completed['predicted_reentrancy'] == True
    pred_neg = completed['predicted_reentrancy'] == False

    tp = (truth_pos & pred_pos).sum()
    tn = (truth_neg & pred_neg).sum()
    # Misclassifications among completed runs plus the penalty for failures.
    fp = (truth_neg & pred_pos).sum() + (crashed['true_reentrancy'] == False).sum()
    fn = (truth_pos & pred_neg).sum() + (crashed['true_reentrancy'] == True).sum()

    total = tp + fp + tn + fn
    accuracy = (tp + tn) / total if total > 0 else 0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    pr_sum = precision + recall
    f1_score = 2 * (precision * recall) / pr_sum if pr_sum > 0 else 0

    scores = {
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1_Score': f1_score,
        'TP': tp,
        'FP': fp,
        'FN': fn,
        'Fails': is_failed.sum(),
        'Count': len(group),
    }
    return pd.Series(scores)


def calculate_and_display_metrics(df: pd.DataFrame):
    """
    Computes metrics per tool ('toolid') and prints them in tool-id order.

    Tools whose F1 score is 0 are left out of the listing. The number of
    failed runs is printed only when it is greater than zero.
    """
    banner = "=" * 50
    log(banner)
    log("Reentrancy Metrics:")
    log(banner)

    per_tool = df.groupby('toolid').apply(calculate_metrics_for_group, include_groups=False)
    has_score = per_tool['F1_Score'] > 0
    shown = per_tool[has_score].sort_index()

    for tool, data in shown.iterrows():
        report_lines = [
            f"  Tool: {tool}",
            f"    Accuracy: {data['Accuracy']:.2f}",
            f"    Precision: {data['Precision']:.2f}",
            f"    Recall:    {data['Recall']:.2f}",
            f"    F1 Score:  {data['F1_Score']:.2f}",
        ]
        if 'Fails' in data and data['Fails'] > 0:
            report_lines.append(f"    Fails:     {int(data['Fails'])}")
        for line in report_lines:
            print(line)


def filename2dirname(filename: str) -> Path:
    """
    Derives the logical directory of a benchmark file from its name.

    The last path component is cut at the first '_sol_' and split on '_'.
    All but the last two tokens become additional directory levels below the
    component's own parent directories. When the path ends in a '.hex' file
    that sits inside a directory, that containing directory's name is used in
    place of the file name.

    Examples:
        'a/b/x_y_z_ree1.sol'    -> Path('a/b/x/y')
        'd/x_y_ree_sol_C/f.hex' -> Path('d/x')
        'plain.sol'             -> Path('.')

    An empty ``filename`` yields Path('.').
    """
    path = Path(filename)
    if not path.parts:
        # Path('') has no components; there is nothing to derive.
        return Path('.')

    if path.parts[-1].endswith(".hex") and len(path.parts) > 1:
        path = path.parent

    *parent_parts, leaf = path.parts
    tokens = leaf.split("_sol_")[0].split('_')
    # Build from components rather than joining with "/": joining an absolute
    # path's parts would produce a leading '//', which pathlib treats as a
    # different root from '/'.
    expanded = Path(*parent_parts, *tokens[:-2], '_'.join(tokens[-2:]))
    return expanded.parent


def calculate_and_display_hierarchical_metrics(df: pd.DataFrame, max_depth: int):
    """
    Prints metrics per directory level and tool, up to ``max_depth`` levels.

    Every row is replicated once for each ancestor level of its logical
    directory (as given by filename2dirname), so a directory's metrics cover
    all files below it, recursively. Rows whose directory has no components
    are reported under the pseudo-directory '<root>'.

    For each level a header line shows the support (number of distinct file
    names under it) and the counts summed over all tools, followed by one line
    per tool. Tools with an F1 score of 0 are not listed. Levels are printed
    ordered by depth, then by name.

    Args:
        df: Frame with 'filename', 'toolid', 'fails', 'true_reentrancy' and
            'predicted_reentrancy' columns.
        max_depth: Maximum number of directory levels to expand; values <= 0
            produce no levels.
    """
    log("=" * 50)
    log(f"Hierarchical Reentrancy Metrics (by dir, up to depth {max_depth}):")
    log("=" * 50)

    depth_limit = max(max_depth, 0)
    expanded_rows = []
    dir_support = defaultdict(set)  # maps directory -> set of files under it

    for _, row in df.iterrows():
        file_id = row['filename']
        dir_parts = filename2dirname(file_id).parts[:depth_limit]

        # Use '/'-separated level names on every platform: the depth of a
        # level is derived below by counting '/' characters.
        levels = [Path(*dir_parts[:end]).as_posix() for end in range(1, len(dir_parts) + 1)]
        if not levels and max_depth > 0:
            levels = ['<root>']

        row_values = row.to_dict()
        for level in levels:
            dir_support[level].add(file_id)  # track unique files under this directory
            expanded_rows.append({**row_values, 'directory_level': level})

    if not expanded_rows:
        log("No directory structure found to analyze.", level="WARN")
        return

    hierarchical_df = pd.DataFrame(expanded_rows)
    metrics = (
        hierarchical_df.groupby(['directory_level', 'toolid'])
        .apply(calculate_metrics_for_group, include_groups=False)
        .reset_index()
    )

    sorted_levels = sorted(metrics['directory_level'].unique(), key=lambda x: (x.count('/'), x))

    for level in sorted_levels:
        level_metrics = metrics[metrics['directory_level'] == level]
        indentation = ".." * level.count('/')

        total_tp = level_metrics['TP'].sum()
        total_fp = level_metrics['FP'].sum()
        total_fn = level_metrics['FN'].sum()
        total_count = level_metrics['Count'].sum()
        total_fails = level_metrics['Fails'].sum()
        total_support = len(dir_support[level])  # unique files under this dir

        print(
            f"\n{indentation}📁 {level} "
            f"(Support: {total_support}, Total Findings: {int(total_count)}, "
            f"TP:{int(total_tp)}, FP:{int(total_fp)}, FN:{int(total_fn)}, Fails:{int(total_fails)})"
        )

        for _, tool_data in level_metrics.iterrows():
            if not tool_data['F1_Score'] > 0:
                continue
            fails_str = f", Fails: {int(tool_data['Fails'])}" if tool_data['Fails'] > 0 else ""
            print(
                f"{indentation}  - Tool: {tool_data['toolid']:<18} | "
                f"A: {tool_data['Accuracy']:.2f}, "
                f"F1: {tool_data['F1_Score']:.2f}, "
                f"P: {tool_data['Precision']:.2f}, "
                f"R: {tool_data['Recall']:.2f} "
                f"({int(tool_data['Count'])} files{fails_str})"
            )

In [73]:
def calculate_metrics_by_path_and_version(df: pd.DataFrame):
    """
    Computes metrics for every (type, directory, solidity version, tool) group.

    Note that a 'directory' column (the string form of filename2dirname applied
    to 'filename') is written into ``df`` itself.

    Returns:
        A ``(source_df, bin_df)`` tuple holding the groups whose 'type' is
        "source" and "bin" respectively, each without the 'type' column.
    """
    # Helper column used as a grouping key.
    df['directory'] = df['filename'].map(lambda name: str(filename2dirname(name)))

    group_keys = ['type', 'directory', 'solidity_version', 'toolid']
    per_group = df.groupby(group_keys).apply(calculate_metrics_for_group, include_groups=False)
    grouped = per_group.reset_index()

    def rows_of_type(kind: str) -> pd.DataFrame:
        matches = grouped['type'] == kind
        return grouped[matches].drop(columns=["type"])

    source_df = rows_of_type("source")
    bin_df = rows_of_type("bin")
    return source_df, bin_df


def display_path_version_tables(source_df: pd.DataFrame, bin_df: pd.DataFrame):
    """
    Prints the per-directory, per-Solidity-version metrics table for source runs.

    Directories and versions are listed in sorted order, with one line per
    tool. ``bin_df`` is accepted but its table is currently not printed.
    """

    def format_tool_line(row: pd.Series) -> str:
        # TP/FP/FN are available in the row but deliberately left out here.
        return (
            f"    - Tool: {row['toolid']:<15} | "
            f"A: {row['Accuracy']:.2f}, "
            f"F1: {row['F1_Score']:.2f}, "
            f"P: {row['Precision']:.2f}, "
            f"R: {row['Recall']:.2f}, "
            f"Fails: {int(row['Fails'])}, N={int(row['Count'])}"
        )

    def display_table(df: pd.DataFrame, title: str):
        rule = "=" * 70
        log(rule)
        log(f"{title}")
        log(rule)
        for directory in sorted(df['directory'].unique()):
            print(f"\n📂 Path: {directory}")
            in_directory = df[df['directory'] == directory]
            for version in sorted(in_directory['solidity_version'].unique()):
                print(f"  🔹 Solidity {version}")
                in_version = in_directory[in_directory['solidity_version'] == version]
                for _, row in in_version.iterrows():
                    print(format_tool_line(row))

    display_table(source_df, "FINAL TABLE — SOURCE")
    # The binary table is intentionally disabled:
    # display_table(bin_df, "FINAL TABLE — BINARY")


def main():
    """
    Loads every file in INPUT_FILES, combines them and prints the final tables.

    The Solidity version and the data type of each file are taken from its
    file name, which must look like
    'results_handcrafted_<major>_<minor>_<type>[...].csv'; e.g.
    'results_handcrafted_0_8_source_llm.csv' gives version '0.8' and type
    'source'. Files that are missing, unreadable, empty or not named this way
    are skipped.
    """
    all_data = []
    for input_file in INPUT_FILES:
        # Parse the bare file name, not the whole path: an underscore in a
        # parent directory would otherwise shift the token positions.
        name_parts = Path(input_file).name.split("_")
        if len(name_parts) < 5:
            log(f"Cannot derive version and type from file name '{input_file}'. Skipping.",
                level="WARN")
            continue
        version = f"{name_parts[2]}.{name_parts[3]}"
        data_type = name_parts[4].split(".")[0]  # drops '.csv' when the type is the last token

        print("\n\n")
        log("=" * 50)
        log(f"Processing input file: {input_file}")
        log("=" * 50)
        df = load_and_prepare_data(input_file, COLUMNS_TO_USE)
        if df.empty:
            continue
        df = add_predictions(df, REENTRANCY_LABELS)
        df["solidity_version"] = version
        df["type"] = data_type
        all_data.append(df)

    if not all_data:
        log("No data loaded. Exiting.", level="ERROR")
        return

    combined_df = pd.concat(all_data, ignore_index=True)

    # Existing reports
    # calculate_and_display_metrics(combined_df)
    # calculate_and_display_hierarchical_metrics(combined_df, HIERARCHICAL_MAX_DEPTH)

    # NEW: Path + Solidity version tables (source/bin separated)
    source_df, bin_df = calculate_metrics_by_path_and_version(combined_df)
    display_path_version_tables(source_df, bin_df)


In [74]:
# Run the pipeline; the log lines that follow are this call's captured output.
main()




[INFO] [2025-09-05 12:13:01] Processing input file: results_handcrafted_0_8_source_llm.csv
[INFO] [2025-09-05 12:13:01] Loading data...
[INFO] [2025-09-05 12:13:01] --> Reading 'results_handcrafted_0_8_source_llm.csv'



[INFO] [2025-09-05 12:13:01] Processing input file: results_handcrafted_0_8_source.csv
[INFO] [2025-09-05 12:13:01] Loading data...
[INFO] [2025-09-05 12:13:01] --> Reading 'results_handcrafted_0_8_source.csv'



[INFO] [2025-09-05 12:13:01] Processing input file: results_handcrafted_0_5_source.csv
[INFO] [2025-09-05 12:13:01] Loading data...
[INFO] [2025-09-05 12:13:01] --> Reading 'results_handcrafted_0_5_source.csv'



[INFO] [2025-09-05 12:13:01] Processing input file: results_handcrafted_0_4_source.csv
[INFO] [2025-09-05 12:13:01] Loading data...
[INFO] [2025-09-05 12:13:01] --> Reading 'results_handcrafted_0_4_source.csv'



[INFO] [2025-09-05 12:13:01] Processing input file: results_handcrafted_0_8_bin.csv
[INFO] [2025-09-05 12:13:01] Loading data...
[INFO] [