In [1]:
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix


def analyze_csv_data_per_tool(df_input):
    """
    Parses CSV data from a pandas DataFrame, filters data, determines ground truth
    and predictions for Reentrancy-related findings, handles multiple runs, and
    calculates performance metrics per toolid using sklearn.

    Args:
        df_input (pd.DataFrame): Input DataFrame from CSV.

    Returns:
        dict: A dictionary where keys are toolids and values are dictionaries
              containing calculated metrics (accuracy, precision, recall, f1_score)
              and counts (TP, FP, TN, FN) for that tool.
    """
    df = df_input.copy()  # Work on a copy

    # --- Global Preprocessing on df (before splitting by tool) ---

    # 2. Determine ground truth ('actual_label') and filter ambiguous filenames
    def determine_ground_truth(filename_val):
        if isinstance(filename_val, str):
            filename_lower = filename_val.lower()
            if "ree" in filename_lower:  # Assumes "ree" in filename indicates a vulnerable sample
                return 1
            if "safe" in filename_lower:  # Assumes "safe" in filename indicates a non-vulnerable sample
                return 0
        return pd.NA  # For filenames not matching 'ree' or 'safe', or if not a string

    df['actual_label'] = df['filename'].apply(determine_ground_truth)

    original_row_count_before_gt_filter = len(df)
    df = df.dropna(subset=['actual_label'])  # Remove rows with pd.NA actual_label
    print(
        f"Filtered out {original_row_count_before_gt_filter - len(df)} rows with ambiguous filenames (not containing 'ree' or 'safe').")

    if df.empty:
        print("Warning: DataFrame is empty after filtering for 'ree'/'safe' filenames. No data to process.")
        return {}
    df['actual_label'] = df['actual_label'].astype(int)

    required_columns_for_core_logic = ['filename', 'findings', 'toolid', 'actual_label']
    for col in required_columns_for_core_logic:
        if col not in df.columns:
            print(f"Critical Error: Column '{col}' is missing after initial processing. Cannot proceed.")
            return {}

    results_per_tool = {}
    if 'toolid' not in df.columns:
        print("Critical Error: 'toolid' column is missing. Cannot group results.")
        return {}

    unique_toolids = df['toolid'].unique()

    for tool_id in unique_toolids:
        tool_df_initial = df[df['toolid'] == tool_id].copy()

        if tool_df_initial.empty:
            continue

        tool_df_deduplicated = pd.DataFrame()
        if 'start' in tool_df_initial.columns:
            tool_df_initial['start_numeric'] = pd.to_numeric(tool_df_initial['start'], errors='coerce')
            valid_start_times_df = tool_df_initial.dropna(subset=['start_numeric'])

            if not valid_start_times_df.empty:
                tool_df_sorted = valid_start_times_df.sort_values(
                    by=['filename', 'start_numeric'], ascending=[True, False]
                )
                tool_df_deduplicated = tool_df_sorted.drop_duplicates(subset=['filename'], keep='first')
                print(
                    f"For toolid '{tool_id}', processed {len(tool_df_initial)} rows initially, kept {len(tool_df_deduplicated)} after deduplicating by filename (latest run with valid start time).")
            else:
                print(
                    f"Warning: For toolid '{tool_id}', no valid numeric 'start' times. Using all {len(tool_df_initial)} rows, which might include duplicates.")
                tool_df_deduplicated = tool_df_initial
        else:
            print(
                f"Warning: 'start' column not found for toolid '{tool_id}'. Using all {len(tool_df_initial)} rows, which might include duplicates.")
            tool_df_deduplicated = tool_df_initial

        if tool_df_deduplicated.empty:
            print(f"Warning: No data remaining for toolid '{tool_id}' after deduplication. Skipping.")
            results_per_tool[tool_id] = {
                "tp": 0, "fp": 0, "tn": 0, "fn": 0,
                "accuracy": 0.0, "precision": 0.0, "recall": 0.0, "f1_score": 0.0,
                "error": "No data after deduplication"
            }
            continue

        tool_df = tool_df_deduplicated
        y_true = tool_df['actual_label'].tolist()

        # Define keywords for reentrancy-related findings (case-insensitive).
        # This list is crucial and MUST be updated based on how YOUR tools report reentrancy.
        reentrancy_keywords = [
            "reentrancy",
            # Broadly catches most reentrancy mentions including specific types like _eth, _no_eth, _events, _benign, _unlimited_gas
            "re-entrancy",  # Variation
            "reentrant",  # Adjective form
            "swc_107",  # Standard Reentrancy SWC ID (will catch SWC-107 too due to .lower())
            "swc-107",  # Explicitly include hyphenated version for clarity
            "state_access_after_external_call",
            # Common pattern from Mythril indicating reentrancy (often with SWC_107)
            "delegatecall",
            # Catches findings like "Delegatecall_to_user_supplied_address..." and "controlled_delegatecall"
            "swc_112",  # SWC ID for Delegatecall issues, often a reentrancy vector
            "swc-112",  # Hyphenated version
            "re_entrancy_vulnerability",  # Oyente
            "dao"  # Securify
        ]
        tool_df.loc[:, 'predicted_label'] = tool_df['findings'].apply(
            lambda x: 1 if isinstance(x, str) and ("reentrancy_benign" not in x.lower()) and (
                any(keyword in x.lower() for keyword in reentrancy_keywords)) else 0
        )
        y_pred = tool_df['predicted_label'].tolist()

        if not y_true:
            print(f"Warning: No valid labels collected for toolid '{tool_id}'. Skipping metrics.")
            results_per_tool[tool_id] = {
                "tp": 0, "fp": 0, "tn": 0, "fn": 0,
                "accuracy": 0.0, "precision": 0.0, "recall": 0.0, "f1_score": 0.0,
                "error": "No valid labels after processing"
            }
            continue

        try:
            cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
            tn, fp, fn, tp = cm.ravel()
        except ValueError as e:
            print(
                f"Warning: Could not compute confusion matrix directly for toolid '{tool_id}': {e}. Manually calculating.")
            tp = sum((yt == 1 and yp == 1) for yt, yp in zip(y_true, y_pred))
            fp = sum((yt == 0 and yp == 1) for yt, yp in zip(y_true, y_pred))
            tn = sum((yt == 0 and yp == 0) for yt, yp in zip(y_true, y_pred))
            fn = sum((yt == 1 and yp == 0) for yt, yp in zip(y_true, y_pred))

        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred, zero_division=0, labels=[0, 1], pos_label=1)
        recall = recall_score(y_true, y_pred, zero_division=0, labels=[0, 1], pos_label=1)
        f1 = f1_score(y_true, y_pred, zero_division=0, labels=[0, 1], pos_label=1)

        results_per_tool[tool_id] = {
            "tp": int(tp), "fp": int(fp), "tn": int(tn), "fn": int(fn),
            "accuracy": accuracy, "precision": precision, "recall": recall, "f1_score": f1
        }
    return results_per_tool


if __name__ == "__main__":
    csv_file_path = "trs.csv"
    df_main = pd.read_csv(csv_file_path)
    print(f"Successfully loaded '{csv_file_path}', shape: {
        
        
        .shape}")

    if not df_main.empty:
        print("\nAnalyzing CSV data for Reentrancy-related findings per tool (using pandas & sklearn):\n")
        all_results = analyze_csv_data_per_tool(df_main)

        if not all_results:
            print("No results were generated. Please check the input data and logs.")
        else:
            for tool_id, metrics in all_results.items():
                print(f"\nResults for toolid: {tool_id}")
                if "error" in metrics:
                    print(f"  Message: {metrics['error']}")
                elif all(k in metrics for k in ["tp", "fp", "tn", "fn"]):  # Check if full metrics dict
                    print(f"  True Positives (TP):  {metrics['tp']}")
                    print(f"  False Positives (FP): {metrics['fp']}")
                    print(f"  True Negatives (TN):  {metrics['tn']}")
                    print(f"  False Negatives (FN): {metrics['fn']}")
                    print("  ------------------------------------")
                    print(f"  Accuracy:             {metrics['accuracy']:.4f}")
                    print(f"  Precision:            {metrics['precision']:.4f}")
                    print(f"  Recall (Sensitivity): {metrics['recall']:.4f}")
                    print(f"  F1-Score:             {metrics['f1_score']:.4f}")
                else:
                    print(f"  Metrics data incomplete: {metrics}")
                print("-" * 40)
    else:
        print(
            "Could not proceed with analysis as DataFrame is empty (either file not found/error or dummy data creation failed).")



SyntaxError: unterminated string literal (detected at line 156) (2119933242.py, line 156)

In [7]:
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix


def analyze_csv_data_per_tool(df_input):
    df = df_input.copy()
    

    def determine_ground_truth(filename_val):
        if isinstance(filename_val, str):
            filename_lower = filename_val.lower()
            if "ree" in filename_lower:
                return 1
            if "safe" in filename_lower:
                return 0
        return pd.NA

    df['actual_label'] = df['filename'].apply(determine_ground_truth)

    original_row_count_before_gt_filter = len(df)
    df = df.dropna(subset=['actual_label'])
    print(
        f"Filtered out {original_row_count_before_gt_filter - len(df)} rows with ambiguous filenames (not containing 'ree' or 'safe').")

    if df.empty:
        print("Warning: DataFrame is empty after filtering for 'ree'/'safe' filenames. No data to process.")
        return {}
    df['actual_label'] = df['actual_label'].astype(int)

    required_columns_for_core_logic = ['filename', 'findings', 'toolid', 'actual_label']
    for col in required_columns_for_core_logic:
        if col not in df.columns:
            print(f"Critical Error: Column '{col}' is missing after initial processing. Cannot proceed.")
            return {}

    results_per_tool = {}
    if 'toolid' not in df.columns:
        print("Critical Error: 'toolid' column is missing. Cannot group results.")
        return {}

    unique_toolids = df['toolid'].unique()

    d  = {}

    for tool_id in unique_toolids:
        tool_df_initial = df[df['toolid'] == tool_id].copy()

        if tool_df_initial.empty:
            continue

        tool_df_deduplicated = pd.DataFrame()
        if 'start' in tool_df_initial.columns:
            tool_df_initial['start_numeric'] = pd.to_numeric(tool_df_initial['start'], errors='coerce')
            valid_start_times_df = tool_df_initial.dropna(subset=['start_numeric'])

            if not valid_start_times_df.empty:
                tool_df_sorted = valid_start_times_df.sort_values(
                    by=['filename', 'start_numeric'], ascending=[True, False]
                )
                tool_df_deduplicated = tool_df_sorted.drop_duplicates(subset=['filename'], keep='first')
                print(
                    f"For toolid '{tool_id}', processed {len(tool_df_initial)} rows initially, kept {len(tool_df_deduplicated)} after deduplicating by filename (latest run with valid start time).")
            else:
                print(
                    f"Warning: For toolid '{tool_id}', no valid numeric 'start' times. Using all {len(tool_df_initial)} rows, which might include duplicates.")
                tool_df_deduplicated = tool_df_initial
        else:
            print(
                f"Warning: 'start' column not found for toolid '{tool_id}'. Using all {len(tool_df_initial)} rows, which might include duplicates.")
            tool_df_deduplicated = tool_df_initial

        if tool_df_deduplicated.empty:
            print(f"Warning: No data remaining for toolid '{tool_id}' after deduplication. Skipping.")
            results_per_tool[tool_id] = {
                "tp": 0, "fp": 0, "tn": 0, "fn": 0,
                "accuracy": 0.0, "precision": 0.0, "recall": 0.0, "f1_score": 0.0,
                "error": "No data after deduplication"
            }
            continue

        tool_df = tool_df_deduplicated
        y_true = tool_df['actual_label'].tolist()

        reentrancy_keywords = [
            "reentrancy", "re-entrancy", "reentrant", "swc_107", "swc-107",
            "state_access_after_external_call", "delegatecall", "swc_112",
            "swc-112", "re_entrancy_vulnerability", "dao"
        ]
        tool_df.loc[:, 'predicted_label'] = tool_df['findings'].apply(
            lambda x: 1 if isinstance(x, str) and ("reentrancy_benign" not in x.lower()) and (
                any(keyword in x.lower() for keyword in reentrancy_keywords)) else 0
        )
        y_pred = tool_df['predicted_label'].tolist()

        if not y_true:
            print(f"Warning: No valid labels collected for toolid '{tool_id}'. Skipping metrics.")
            results_per_tool[tool_id] = {
                "tp": 0, "fp": 0, "tn": 0, "fn": 0,
                "accuracy": 0.0, "precision": 0.0, "recall": 0.0, "f1_score": 0.0,
                "error": "No valid labels after processing"
            }
            continue

        try:
            cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
            tn, fp, fn, tp = cm.ravel()
        except ValueError as e:
            print(
                f"Warning: Could not compute confusion matrix directly for toolid '{tool_id}': {e}. Manually calculating.")
            tp = sum((yt == 1 and yp == 1) for yt, yp in zip(y_true, y_pred))
            fp = sum((yt == 0 and yp == 1) for yt, yp in zip(y_true, y_pred))
            tn = sum((yt == 0 and yp == 0) for yt, yp in zip(y_true, y_pred))
            fn = sum((yt == 1 and yp == 0) for yt, yp in zip(y_true, y_pred))

        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred, zero_division=0, labels=[0, 1], pos_label=1)
        recall = recall_score(y_true, y_pred, zero_division=0, labels=[0, 1], pos_label=1)
        f1 = f1_score(y_true, y_pred, zero_division=0, labels=[0, 1], pos_label=1)

        results_per_tool[tool_id] = {
            "tp": int(tp), "fp": int(fp), "tn": int(tn), "fn": int(fn),
            "accuracy": accuracy, "precision": precision, "recall": recall, "f1_score": f1
        }

        # --- NEW: Print misclassified contracts (FP and FN) ---
        misclassified = tool_df[tool_df['actual_label'] != tool_df['predicted_label']]
        if not misclassified.empty:
            print(f"\nMisclassified contracts for toolid '{tool_id}':")
            false_positives = misclassified[misclassified['predicted_label'] == 1]
            false_negatives = misclassified[misclassified['predicted_label'] == 0]

            if not false_positives.empty:
                print("  False Positives (predicted vulnerable, actually safe):")
                for fp_fname in false_positives['filename']:
                    print(f"    - {fp_fname}")
                    if fp_fname in d:
                        d[fp_fname] += 1
                    else:
                        d[fp_fname] = 1

            if not false_negatives.empty:
                print("  False Negatives (predicted safe, actually vulnerable):")
                for fn_fname in false_negatives['filename']:
                    print(f"    - {fn_fname}")
                    if fn_fname in d:
                        d[fn_fname] += 1
                    else:
                        d[fn_fname] = 1



        classified = tool_df[tool_df['actual_label'] == tool_df['predicted_label']]
        if not classified.empty:
            print(f"\nCorrectly classified contracts for toolid '{tool_id}':")
            true_positives = classified[classified['predicted_label'] == 1]
            true_negatives = classified[classified['predicted_label'] == 0]

            if not true_positives.empty:
                print("  True Positives (predicted vulnerable, actually vulnerable):")
                for fp_fname in true_positives['filename']:
                    print(f"    - {fp_fname}")

            if not true_negatives.empty:
                print("  True Negatives (predicted safe, actually safe):")
                for fn_fname in true_negatives['filename']:
                    print(f"    - {fn_fname}")


    print('Dictionary:', d)
    print('\n---------Misclassified by all tools-------\n')
    for k,v in d.items():
        if v >= 1:
            print(k, v)    
    print()

    return results_per_tool


if __name__ == "__main__":
    csv_file_path = "trs.csv"
    df_main = pd.read_csv(csv_file_path)
    print(f"Successfully loaded '{csv_file_path}', shape: {df_main.shape}")
    df_main = df_main[df_main['toolid'].isin([
        #'confuzzius'#,
        'mythril-0.24.7'
        #'slither-0.10.4'
        ])]

    

    if not df_main.empty:
        print("\nAnalyzing CSV data for Reentrancy-related findings per tool (using pandas & sklearn):\n")
        all_results = analyze_csv_data_per_tool(df_main)

        if not all_results:
            print("No results were generated. Please check the input data and logs.")
        else:
            for tool_id, metrics in all_results.items():
                print(f"\nResults for toolid: {tool_id}")
                if "error" in metrics:
                    print(f"  Message: {metrics['error']}")
                elif all(k in metrics for k in ["tp", "fp", "tn", "fn"]):
                    print(f"  True Positives (TP):  {metrics['tp']}")
                    print(f"  False Positives (FP): {metrics['fp']}")
                    print(f"  True Negatives (TN):  {metrics['tn']}")
                    print(f"  False Negatives (FN): {metrics['fn']}")
                    print("  ------------------------------------")
                    print(f"  Accuracy:             {metrics['accuracy']:.4f}")
                    print(f"  Precision:            {metrics['precision']:.4f}")
                    print(f"  Recall (Sensitivity): {metrics['recall']:.4f}")
                    print(f"  F1-Score:             {metrics['f1_score']:.4f}")
                else:
                    print(f"  Metrics data incomplete: {metrics}")
                print("-" * 40)
    else:
        print("Could not proceed with analysis as DataFrame is empty.")


Successfully loaded 'trs.csv', shape: (2100, 13)

Analyzing CSV data for Reentrancy-related findings per tool (using pandas & sklearn):

Filtered out 0 rows with ambiguous filenames (not containing 'ree' or 'safe').
For toolid 'mythril-0.24.7', processed 150 rows initially, kept 150 after deduplicating by filename (latest run with valid start time).

Misclassified contracts for toolid 'mythril-0.24.7':
  False Positives (predicted vulnerable, actually safe):
    - tests/handcrafted-raw/00_BasicCall_safe1.sol
    - tests/handcrafted-raw/00_BasicEmit_safe1.sol
    - tests/handcrafted-raw/00_BasicFold_safe1.sol
    - tests/handcrafted-raw/00_BasicFold_safe2.sol
    - tests/handcrafted-raw/00_BasicNoChecks_safe1.sol
    - tests/handcrafted-raw/00_BasicUnchecked_safe1.sol
    - tests/handcrafted-raw/01_SingleMutexFoldSem_safe1.sol
    - tests/handcrafted-raw/01_SingleMutex_safe1.sol
    - tests/handcrafted-raw/01_SingleMutex_safe2.sol
    - tests/handcrafted-raw/02_CrossMutexSem_safe1.sol
 