In [4]:
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm

# Route INFO-and-above records through the root logger as "LEVEL: message".
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)
# Logger shared by the functions and cells defined below.
logger = logging.getLogger(__name__)

In [5]:
def compute_accuracy(path_to_log, gt_category):
    """
    Calculates the classification accuracy for a specific subdirectory.

    Every ``*.json`` file directly inside ``path_to_log`` is counted. A file is
    correct when its top-level ``"classification"`` string equals
    ``gt_category`` (case-insensitive). A file that cannot be read, is not
    valid JSON, is not a JSON object, or holds a non-string classification is
    logged and counted as incorrect, so a single bad file never discards the
    result for the whole directory.

    :param path_to_log: Path to the directory containing JSON log files.
    :param gt_category: The ground truth category for comparison.
    :return: Tuple (accuracy, total_files); (0.0, 0) when the directory holds
        no JSON files or cannot be processed at all (e.g. it does not exist).
    """
    try:
        files = [f for f in os.listdir(path_to_log) if f.endswith(".json")]
        total_files = len(files)

        if total_files == 0:
            logger.warning(f"No JSON files found in '{path_to_log}'.")
            return 0.0, 0

        correct_files = 0
        gt_category = gt_category.lower()

        for filename in files:
            file_path = os.path.join(path_to_log, filename)
            try:
                with open(file_path, "r", encoding="utf-8") as file:
                    content = json.load(file)
            except (OSError, ValueError) as e:
                # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
                logger.error(f"Error processing {filename}: {e}")
                continue

            if not isinstance(content, dict):
                logger.error(
                    f"Error processing {filename}: expected a JSON object, got {type(content).__name__}")
                continue

            # A missing key is treated as an empty (hence incorrect) prediction.
            classification = content.get("classification", "")
            if not isinstance(classification, str):
                logger.error(
                    f"Error processing {filename}: 'classification' is not a string ({classification!r})")
                continue

            if classification.lower() == gt_category:
                correct_files += 1

        # Named `accuracy` so the sklearn `accuracy_score` import is not shadowed.
        accuracy = correct_files / total_files
        logger.info(
            f"Accuracy for '{gt_category}' in '{path_to_log}': {accuracy:.2%} ({correct_files}/{total_files})")
        return accuracy, total_files

    except Exception as e:
        # Directory-level failure (e.g. missing directory): report and return the empty result.
        logger.error(f"An error occurred: {e}")
        return 0.0, 0

In [7]:
# Root directory containing the two subdirectories
root_dir = "../explanations/baseline_o3mini"

# Compute accuracy for 'reentrant' and 'safe' subdirectories
path_reentrant = os.path.join(root_dir, "reentrant")
path_safe = os.path.join(root_dir, "safe")

acc_reentrant, total_reentrant = compute_accuracy(path_reentrant, gt_category="reentrant")
acc_safe, total_safe = compute_accuracy(path_safe, gt_category="safe")

# Compute overall accuracy
total_all = total_reentrant + total_safe
# compute_accuracy returns only the ratio, so the number of correct files is
# recovered as ratio * total. That product can land just below the integer in
# floating point (e.g. 1 / 49 * 49), so round() is used: int() would truncate
# and under-report the count by one.
correct_all = round(acc_reentrant * total_reentrant) + round(acc_safe * total_safe)

if total_all > 0:
    overall_accuracy = correct_all / total_all
    logger.info(f"Overall Accuracy: {overall_accuracy:.2%} ({correct_all}/{total_all})")
else:
    overall_accuracy = 0.0
    logger.warning("No JSON files found in either subdirectory.")

# Print results
logger.info(f"Reentrant Accuracy: {acc_reentrant:.2%}")
logger.info(f"Safe Accuracy: {acc_safe:.2%}")
logger.info(f"Overall Accuracy: {overall_accuracy:.2%}")

INFO: Accuracy for 'reentrant' in '../explanations/baseline_o3mini/reentrant': 87.67% (64/73)
INFO: Accuracy for 'safe' in '../explanations/baseline_o3mini/safe': 95.89% (70/73)
INFO: Overall Accuracy: 91.78% (134/146)
INFO: Reentrant Accuracy: 87.67%
INFO: Safe Accuracy: 95.89%
INFO: Overall Accuracy: 91.78%


In [None]:
def load_json(file_path):
    """Load a JSON file and return its contents.

    The file is always decoded as UTF-8, matching how ``compute_accuracy``
    reads its files, instead of relying on the platform's locale encoding.

    :param file_path: Path of the JSON file to read.
    :return: The parsed JSON value, or None when the file cannot be opened,
        decoded or parsed (the failure is logged at ERROR level).
    """
    logger.debug(f"Loading JSON file: {file_path}")
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        # Best-effort loader: callers must treat None as "no data".
        logger.error(f"Error loading JSON file {file_path}: {e}")
        return None


def get_ground_truth_from_path(contract_path):
    """Determine the ground truth label based on the parent folder name.

    The path is split into components and the deepest component that is
    exactly ``"reentrant"`` or ``"safe"`` wins, so a contract folder such as
    ``safe/reentrant_wallet`` or a base directory such as ``reentrant_runs/``
    no longer overrides the real label folder.

    If no component matches exactly, the function falls back to a substring
    search over the whole path (``"reentrant"`` checked before ``"safe"``) so
    that layouts with decorated folder names keep resolving as before.

    :param contract_path: Path of a contract directory (str or path-like).
    :return: ``"reentrant"``, ``"safe"``, or None when the path carries no label.
    """
    known_labels = ("reentrant", "safe")
    path_text = os.fspath(contract_path)

    # Walk from the deepest component upwards: the label folder closest to the
    # contract is the one that describes it.
    for component in reversed(os.path.normpath(path_text).split(os.sep)):
        if component in known_labels:
            return component

    # Backward-compatible fallback: substring match on the full path.
    for label in known_labels:
        if label in path_text:
            return label
    return None


def collect_predictions(contract_dir):
    """Collect labels from JSON files (excluding classification.json).

    Every ``*.json`` file directly inside ``contract_dir`` except
    ``classification.json`` is loaded with ``load_json`` on a thread pool.
    Only documents that are JSON objects containing a ``"label"`` key
    contribute a prediction; anything else (failed loads, lists, strings,
    objects without the key) is skipped. String labels are lower-cased so they
    compare consistently with the lower-cased classification values.

    :param contract_dir: Directory holding the per-analysis JSON files.
    :return: List of labels, ordered by file name.
    """
    file_paths = [
        os.path.join(contract_dir, name)
        for name in sorted(os.listdir(contract_dir))
        if name.endswith(".json") and name != "classification.json"
    ]

    predictions = []
    with ThreadPoolExecutor() as executor:
        # Executor.map yields results in input order, which keeps the output
        # deterministic regardless of which load finishes first.
        for data in executor.map(load_json, file_paths):
            # `"label" in data` is also true for lists/strings containing
            # "label", where data["label"] would raise; require a dict.
            if not isinstance(data, dict) or "label" not in data:
                continue
            label = data["label"]
            predictions.append(label.lower() if isinstance(label, str) else label)

    return predictions


def process_contract(root):
    """Process a single contract directory.

    A directory is only processed when it contains ``classification.json``.
    The contract-level prediction is that file's lower-cased
    ``"classification"`` string; the analysis-level predictions are whatever
    ``collect_predictions`` returns for the directory, each paired with the
    ground truth derived from the path.

    An unreadable or malformed ``classification.json`` (``load_json`` returns
    None, the document is not an object, or the value is not a string) only
    drops the contract-level pair; the analysis predictions are still collected.

    :param root: Path of the contract directory.
    :return: Tuple ``(y_true_class, y_pred_class, y_true_anal, y_pred_anal)``.
        The first two are None when no contract-level label is available; the
        last two are equally long lists (empty when nothing was processed).
    """
    ground_truth = get_ground_truth_from_path(root)
    classification_file = os.path.join(root, "classification.json")

    y_true_class, y_pred_class = None, None
    y_true_anal, y_pred_anal = [], []

    if not os.path.exists(classification_file):
        return y_true_class, y_pred_class, y_true_anal, y_pred_anal

    if ground_truth is None:
        # Without a ground truth there is nothing to score against, and None
        # entries in y_true would poison the metric computation.
        logger.warning(f"No ground truth label found in path '{root}'; skipping.")
        return y_true_class, y_pred_class, y_true_anal, y_pred_anal

    classification_data = load_json(classification_file)
    contract_label = ""
    if isinstance(classification_data, dict):
        raw_label = classification_data.get("classification", "")
        if isinstance(raw_label, str):
            contract_label = raw_label.lower()
        else:
            logger.warning(f"Non-string classification in '{classification_file}': {raw_label!r}")
    elif classification_data is not None:
        # load_json already logs read/parse failures (None); this covers valid
        # JSON that is not an object.
        logger.warning(f"Unexpected JSON structure in '{classification_file}'.")

    if contract_label:
        y_true_class, y_pred_class = ground_truth, contract_label

    predictions = collect_predictions(root)
    y_true_anal = [ground_truth] * len(predictions)
    y_pred_anal = predictions

    return y_true_class, y_pred_class, y_true_anal, y_pred_anal


def evaluate(base_dir):
    """Compute accuracy, precision, recall, and F1-score for contract classification using parallel processing.

    Walks ``base_dir``, hands every directory whose path contains
    ``"reentrant"`` or ``"safe"`` to ``process_contract`` on a thread pool, and
    aggregates two label sets: one from the per-contract ``classification.json``
    files and one from the individual analysis files. Metrics for both sets are
    written to the log with ``"reentrant"`` as the positive class.

    A contract whose processing raises is logged and excluded rather than
    aborting the whole run; the number of excluded contracts is reported.

    :param base_dir: Root directory of the explanation tree to evaluate.
    :return: None. Results are only logged.
    """
    logger.info(f"Starting evaluation on base directory: {base_dir}")

    contract_paths = [root for root, _, _ in os.walk(base_dir)
                      if any(sub in root for sub in ("reentrant", "safe"))]

    y_true_analysis, y_pred_analysis = [], []
    y_true_classification, y_pred_classification = [], []
    failed_contracts = 0

    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(process_contract, root): root for root in contract_paths}
        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing contracts"):
            try:
                # result() re-raises whatever process_contract raised in the worker.
                y_true_class, y_pred_class, y_true_anal, y_pred_anal = future.result()
            except Exception as e:
                failed_contracts += 1
                logger.error(f"Error processing contract '{futures[future]}': {e}")
                continue
            if y_true_class is not None:
                y_true_classification.append(y_true_class)
                y_pred_classification.append(y_pred_class)
            y_true_analysis.extend(y_true_anal)
            y_pred_analysis.extend(y_pred_anal)

    if failed_contracts:
        logger.warning(
            f"{failed_contracts} of {len(contract_paths)} contract directories failed and are excluded from the metrics.")

    # Compute metrics
    def compute_metrics(y_true, y_pred, metric_name):
        """Return (accuracy, precision, recall, f1), or all zeros when there is no data."""
        if not y_true:
            logger.warning(f"No data available for {metric_name} computation.")
            return 0, 0, 0, 0
        # NOTE(review): average='binary' presumably requires that only two
        # distinct labels occur; confirm predictions are limited to
        # "reentrant"/"safe", otherwise scikit-learn is expected to raise.
        return (
            accuracy_score(y_true, y_pred),
            precision_score(y_true, y_pred, pos_label="reentrant", average='binary'),
            recall_score(y_true, y_pred, pos_label="reentrant", average='binary'),
            f1_score(y_true, y_pred, pos_label="reentrant", average='binary')
        )

    analysis_metrics = compute_metrics(y_true_analysis, y_pred_analysis, "analysis")
    classification_metrics = compute_metrics(y_true_classification, y_pred_classification, "classification")

    metric_names = ("Accuracy", "Precision", "Recall", "F1 Score")
    reports = (
        ("Analysis Files Metrics:", analysis_metrics),
        ("\nClassification File Metrics:", classification_metrics),
    )
    for heading, metrics in reports:
        logger.info(heading)
        for metric_label, value in zip(metric_names, metrics):
            logger.info(f"{metric_label}: {value:.4f}")

In [None]:
# Evaluate the explanations stored under the `ast_cfg` directory.
base_directory = "../explanations/ast_cfg"
evaluate(base_dir=base_directory)

In [None]:
# Evaluate the explanations stored under the `ast` directory.
base_directory = "../explanations/ast"
evaluate(base_dir=base_directory)

In [None]:
# Evaluate the explanations stored under the `cfg` directory.
base_directory = "../explanations/cfg"
evaluate(base_dir=base_directory)