In [None]:
"""
Static Security Benchmark
This script is designed to measure the ability of static analysis tools to identify safe and unsafe code.
"""

import json
import os
import sys
import re
import time
import subprocess
import pandas as pd
from pathlib import Path
from llmScan import *

In [None]:
# Define the benchmark test function with timing

def test_vulnerabilities_benchmark(python_code_path):
    """
    Analyze a Python file for security vulnerabilities using Bandit (plain text output).

    Bandit is run as ``bandit -r <path> -ll -iii`` and its stdout is classified
    by looking for well-known marker strings. The raw stdout of the run is also
    stored in the module-level ``last_bandit_output`` for debugging.

    Args:
        python_code_path (str): Path to the Python file to analyze

    Returns:
        tuple: (success, result, tool_name, scan_time)
            - success (bool): True if analysis successful, False otherwise
            - result (bool | None): True if vulnerable, False if safe,
              None when the analysis did not succeed
            - tool_name (str): Name of the static tool used
            - scan_time (float): Time taken for the scan in seconds
    """
    import re
    global last_bandit_output

    tool_name = "Bandit"
    command = [
        'bandit',
        '-r', python_code_path,
        '-ll', '-iii'
    ]
    start_time = time.perf_counter()
    try:
        # check=False on purpose: Bandit exits with status 1 whenever it reports
        # issues, so a non-zero status is an expected outcome here. With
        # check=True every file flagged as vulnerable was turned into an error.
        proc = subprocess.run(command, check=False, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, text=True)
    except Exception as e:
        # Boundary handler: the benchmark loop must keep going when Bandit
        # cannot be launched (e.g. executable missing), so report and return.
        print(f"Bandit error: {e}")
        return False, None, tool_name, time.perf_counter() - start_time

    scan_time = time.perf_counter() - start_time
    output = proc.stdout

    # Store raw output for debugging
    last_bandit_output = output

    if ">> Issue:" in output:
        return True, True, tool_name, scan_time

    # A skipped file (missing, unparsable, ...) still produces the line
    # "No issues identified.", so skips must be detected before that marker
    # is trusted; otherwise an unscanned file would be counted as safe.
    skipped_match = re.search(r"Files skipped \((\d+)\)", output)
    if skipped_match is not None:
        was_skipped = int(skipped_match.group(1)) > 0
    else:
        # Skip count not present in this output; fall back to the message check.
        was_skipped = "Files skipped" in output and "No such file or directory" in output

    if was_skipped:
        if "No such file or directory" in output:
            print(f"File not found: {python_code_path}")
        else:
            print(f"Bandit skipped file: {python_code_path}")
        return False, None, tool_name, scan_time

    if "No issues identified." in output:
        return True, False, tool_name, scan_time

    # Could not determine, treat as error
    print(f"Bandit output ambiguous for file: {python_code_path}")
    return False, None, tool_name, scan_time

# Module-level holder for the raw stdout of the most recent Bandit run.
# test_vulnerabilities_benchmark() overwrites it through a `global` statement,
# so the text can be inspected from another cell when debugging.
last_bandit_output = ""

# Notebook feedback: confirms the cell ran and the function is defined.
print("test_vulnerabilities_benchmark function defined successfully!")

In [None]:
# Define the benchmark runner function
def run_benchmark(train_dir="train", max_files_per_dir=None, total_max_files=None,
                 include_patched=True, include_vulnerable=True, test_split=None):
    """
    Run Bandit benchmark on Python files in train directories.

    Files under ``<train_dir>/patched`` are labelled safe and files under
    ``<train_dir>/vulnerable`` are labelled vulnerable. Each selected file is
    scanned with ``test_vulnerabilities_benchmark`` and one result row is
    recorded per file.

    Args:
        train_dir (str): Directory containing the "patched" and "vulnerable" subdirectories.
        max_files_per_dir (int | None): Maximum number of files taken from each
            directory. A falsy value (None or 0) means no limit.
        total_max_files (int | None): Maximum number of files scanned overall.
            A falsy value (None or 0) means no limit.
        include_patched (bool): Whether to scan the "patched" directory.
        include_vulnerable (bool): Whether to scan the "vulnerable" directory.
        test_split (float | None): Fraction in [0.0, 1.0] of each directory's
            files to keep, chosen with a fixed-seed shuffle. Values outside
            that range are ignored with a warning.

    Returns:
        pd.DataFrame: Results with columns [file_path, success, tool_name, bandit_result, actual, scan_time_seconds]
        The columns are present even when no file was analyzed.
    """
    import random

    # Fixed schema so downstream code can index the columns of an empty result.
    columns = ["file_path", "success", "tool_name", "bandit_result", "actual", "scan_time_seconds"]

    results = []
    total_files_processed = 0
    files_by_directory = {}

    # Maps directory path -> ground-truth label (True means vulnerable).
    directories = {}
    if include_patched:
        directories[os.path.join(train_dir, "patched")] = False
    if include_vulnerable:
        directories[os.path.join(train_dir, "vulnerable")] = True

    if not directories:
        print("Error: No directories selected.")
        return pd.DataFrame(columns=columns)

    print(f"Selected directories: {list(directories.keys())}")

    for directory, actual_vulnerable in directories.items():
        if not os.path.exists(directory):
            print(f"Warning: Directory {directory} does not exist")
            continue

        if total_max_files and total_files_processed >= total_max_files:
            print(f"Reached total file limit of {total_max_files}. Stopping.")
            break

        print(f"\nProcessing directory: {directory}")
        # Sort because glob order depends on the filesystem; without a stable
        # starting order neither the seeded shuffle nor the limits below would
        # select the same files from run to run.
        python_files = sorted(Path(directory).glob("**/*.py"))
        original_count = len(python_files)
        print(f"Found {original_count} Python files")

        # Test split
        if test_split is not None:
            if 0.0 <= test_split <= 1.0:
                # Private RNG instance: reproducible without reseeding the
                # process-wide random state.
                random.Random(42).shuffle(python_files)
                split_size = int(len(python_files) * test_split)
                python_files = python_files[:split_size]
                print(f"Using test split of {test_split:.2%}: {len(python_files)} files selected from {original_count}")
            else:
                print(f"Warning: test_split={test_split} is outside [0.0, 1.0]; ignoring it")

        # Per-directory limit
        if max_files_per_dir:
            python_files = python_files[:max_files_per_dir]

        # Total limit
        if total_max_files:
            remaining_files = total_max_files - total_files_processed
            python_files = python_files[:remaining_files]

        directory_name = "vulnerable" if actual_vulnerable else "patched"
        files_by_directory[directory_name] = len(python_files)
        print(f"Will analyze {len(python_files)} files from {directory_name} directory")

        for file_path in python_files:
            print(f"Analyzing: {file_path}")
            success, bandit_result, tool_name, scan_time = test_vulnerabilities_benchmark(str(file_path))
            results.append({
                "file_path": str(file_path),
                "success": success,
                "tool_name": tool_name,
                "bandit_result": bandit_result,
                "actual": actual_vulnerable,
                "scan_time_seconds": round(scan_time, 3)
            })
            total_files_processed += 1

            if success:
                status = "VULNERABLE" if bandit_result else "SAFE"
                expected = "VULNERABLE" if actual_vulnerable else "SAFE"
                match = "✓" if bandit_result == actual_vulnerable else "✗"
                print(f"  Result: {status} | Expected: {expected} | {match} | Time: {scan_time:.3f}s")
            else:
                print(f"  Error: Analysis failed | Time: {scan_time:.3f}s")

            if total_max_files and total_files_processed >= total_max_files:
                print(f"Reached total file limit of {total_max_files}. Stopping.")
                break

    print(f"\n{'='*50}\nFILES PROCESSED SUMMARY\n{'='*50}")
    for dir_name, count in files_by_directory.items():
        print(f"{dir_name.capitalize()} files: {count}")
    print(f"Total files processed: {total_files_processed}")

    return pd.DataFrame(results, columns=columns)

# Notebook feedback: confirms the cell ran and run_benchmark is defined.
print("run_benchmark function defined successfully!")

In [None]:
# Run the benchmark
# Exactly one run_benchmark(...) call below should be uncommented; its result
# is bound to `df_results`, which the metrics and export cells read.
print("Starting Bandit Security Benchmark...")

# Example usage with different options:

# Basic usage:
# df_results = run_benchmark()                           # All files from both directories

# Directory selection:
# df_results = run_benchmark(include_patched=True, include_vulnerable=False)   # Only safe files
# df_results = run_benchmark(include_patched=False, include_vulnerable=True)   # Only vulnerable files

# File limits:
df_results = run_benchmark(max_files_per_dir=50)        # Active configuration: max 50 files per directory
# df_results = run_benchmark(total_max_files=10)         # Max 10 files total

# Test split:
# df_results = run_benchmark(test_split=0.1)             # Use 10% of files for testing
# df_results = run_benchmark(test_split=0.5)             # Use 50% of files for testing

# Combined options:
# df_results = run_benchmark(include_vulnerable=True, include_patched=False,
#                            test_split=0.2, max_files_per_dir=10)

# Alternative: run with default settings (all files from both directories)
# df_results = run_benchmark()

# len(df_results) is the number of result rows, one per analyzed file.
print(f"\nBenchmark completed! Analyzed {len(df_results)} files.")
print("Results stored in 'df_results' DataFrame")

In [None]:
# Calculate and display metrics
#
# Reads `df_results` (one row per scanned file) and leaves the computed
# counts and scores in module-level names that the export cell reuses.
print("="*60)
print("BANDIT BENCHMARK RESULTS")
print("="*60)

total_files = len(df_results)
# Only touch the 'success' column when there are rows: an empty result set
# may not carry any columns at all.
successful_analyses = int(df_results['success'].sum()) if total_files > 0 else 0
failed_analyses = total_files - successful_analyses

print(f"Total files analyzed: {total_files}")
print(f"Successful analyses: {successful_analyses}")
print(f"Failed analyses: {failed_analyses}")

if total_files > 0:
    # Split once and compute the per-class totals up front. Both the metrics
    # branch and the failure-analysis branch need them, and the latter also
    # runs when every scan failed.
    success_mask = df_results['success'] == True
    successful_df = df_results[success_mask]
    failed_df = df_results[~success_mask]
    total_vulnerable = (df_results['actual'] == True).sum()
    total_safe = (df_results['actual'] == False).sum()
    failed_vulnerable = (failed_df['actual'] == True).sum()
    failed_safe = (failed_df['actual'] == False).sum()
else:
    successful_df = df_results
    failed_df = df_results
    total_vulnerable = total_safe = 0
    failed_vulnerable = failed_safe = 0

if not df_results.empty:
    tools_used = df_results['tool_name'].unique()
    print(f"Static Tool(s) used: {', '.join(tools_used)}")

    total_time = df_results['scan_time_seconds'].sum()
    avg_time = df_results['scan_time_seconds'].mean()
    min_time = df_results['scan_time_seconds'].min()
    max_time = df_results['scan_time_seconds'].max()

    print(f"\nTiming Statistics:")
    print(f"Total scan time: {total_time:.3f} seconds")
    print(f"Average scan time: {avg_time:.3f} seconds")
    print(f"Fastest scan: {min_time:.3f} seconds")
    print(f"Slowest scan: {max_time:.3f} seconds")

    if successful_analyses > 0:
        avg_successful_time = successful_df['scan_time_seconds'].mean()
        print(f"Average time for successful scans: {avg_successful_time:.3f} seconds")

    if failed_analyses > 0:
        avg_failed_time = failed_df['scan_time_seconds'].mean()
        print(f"Average time for failed scans: {avg_failed_time:.3f} seconds")

if total_files > 0:
    success_rate = successful_analyses / total_files
    print(f"\nOverall Success Rate: {success_rate:.2%} ({successful_analyses}/{total_files})")

    if successful_analyses > 0:
        correct_predictions = (successful_df['bandit_result'] == successful_df['actual']).sum()
        accuracy_on_successful = correct_predictions / len(successful_df)
        # Failed scans stay in the denominator, i.e. they count as incorrect.
        overall_accuracy = correct_predictions / total_files

        print(f"Accuracy on successful scans: {accuracy_on_successful:.2%} ({correct_predictions}/{len(successful_df)})")
        print(f"Overall accuracy (failed scans counted as incorrect): {overall_accuracy:.2%} ({correct_predictions}/{total_files})")

        # Confusion matrix ("positive" means flagged as vulnerable)
        true_positives = ((successful_df['bandit_result'] == True) & (successful_df['actual'] == True)).sum()
        true_negatives = ((successful_df['bandit_result'] == False) & (successful_df['actual'] == False)).sum()
        false_positives = ((successful_df['bandit_result'] == True) & (successful_df['actual'] == False)).sum()
        false_negatives = ((successful_df['bandit_result'] == False) & (successful_df['actual'] == True)).sum()

        print(f"\nConfusion Matrix (Successful Scans Only):")
        print(f"True Positives: {true_positives}")
        print(f"True Negatives: {true_negatives}")
        print(f"False Positives: {false_positives}")
        print(f"False Negatives: {false_negatives}")

        # Extended confusion matrix
        print(f"\nExtended Confusion Matrix (Including Failed Scans):")
        print(f"True Positives: {true_positives}")
        print(f"True Negatives: {true_negatives}")
        print(f"False Positives: {false_positives}")
        print(f"False Negatives: {false_negatives}")
        print(f"Failed on Vulnerable Files: {failed_vulnerable}")
        print(f"Failed on Safe Files: {failed_safe}")
        print(f"Total Failed: {failed_analyses}")

        # Precision, Recall, F1 (on successful scans only); 0 when undefined
        precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
        recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
        f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

        print(f"\nMetrics (Successful Scans Only):")
        print(f"Precision: {precision:.2%}")
        print(f"Recall: {recall:.2%}")
        print(f"F1 Score: {f1_score:.2%}")

        # Adjusted metrics including failed scans: recall is taken over all
        # vulnerable files, so a failed scan of a vulnerable file is a miss.
        adjusted_recall = true_positives / total_vulnerable if total_vulnerable > 0 else 0
        adjusted_precision = precision
        adjusted_f1 = 2 * (adjusted_precision * adjusted_recall) / (adjusted_precision + adjusted_recall) if (adjusted_precision + adjusted_recall) > 0 else 0

        print(f"\nAdjusted Metrics (Failed Scans Impact Recall):")
        print(f"Adjusted Precision: {adjusted_precision:.2%} (same as above)")
        print(f"Adjusted Recall: {adjusted_recall:.2%} (failed vulnerable files counted as missed)")
        print(f"Adjusted F1 Score: {adjusted_f1:.2%}")

    else:
        print("\nNo successful analyses to calculate detailed metrics.")

    # Failure analysis by file type
    if failed_analyses > 0:
        print(f"\nFailure Analysis:")
        print(f"Failed vulnerable file scans: {failed_vulnerable}")
        print(f"Failed safe file scans: {failed_safe}")
        print(f"Failure rate on vulnerable files: {failed_vulnerable/total_vulnerable:.2%}" if total_vulnerable > 0 else "No vulnerable files")
        print(f"Failure rate on safe files: {failed_safe/total_safe:.2%}" if total_safe > 0 else "No safe files")

else:
    print("\nNo files analyzed.")

In [None]:
# Persist the per-file results table as CSV (no index column).
output_file = "bandit_benchmark_results.csv"
df_results.to_csv(output_file, index=False)
print(f"Results saved to: {output_file}")

# The aggregate metrics exist only when at least one scan succeeded, so the
# single-row summary CSV is written in that case only.
if not (successful_analyses > 0):
    print("No successful analyses to save summary metrics.")
else:
    summary = dict(
        total_files=total_files,
        successful_analyses=successful_analyses,
        failed_analyses=failed_analyses,
        tool_name=', '.join(tools_used),
        accuracy_on_successful=accuracy_on_successful,
        overall_accuracy=overall_accuracy,
        precision=precision,
        recall=recall,
        f1_score=f1_score,
        adjusted_precision=adjusted_precision,
        adjusted_recall=adjusted_recall,
        adjusted_f1=adjusted_f1,
        true_positives=true_positives,
        true_negatives=true_negatives,
        false_positives=false_positives,
        false_negatives=false_negatives,
        failed_vulnerable=failed_vulnerable,
        failed_safe=failed_safe,
        total_scan_time_seconds=total_time,
        average_scan_time_seconds=avg_time,
        min_scan_time_seconds=min_time,
        max_scan_time_seconds=max_time,
    )
    summary_df = pd.DataFrame([summary])
    summary_df.to_csv("bandit_benchmark_summary.csv", index=False)
    print("Summary metrics saved to: bandit_benchmark_summary.csv")

In [None]:
# Debug Bandit Input/Output
def debug_bandit_example(file_path=None):
    """
    Show exactly what Bandit receives and outputs for debugging purposes.

    Prints the file's source, the Bandit command line, Bandit's raw
    stdout/stderr, the parsed result from ``test_vulnerabilities_benchmark``
    and whether that result matches the label implied by the path (a path
    containing "vulnerable", case-insensitively, is expected to be vulnerable).

    Args:
        file_path (str, optional): Path to a specific file to analyze. If None, uses first available file.

    Returns:
        None. All information is printed; errors are reported as an
        "ERROR: ..." line instead of being raised.
    """
    print("="*60)
    print("BANDIT INPUT/OUTPUT DEBUG EXAMPLE")
    print("="*60)

    # Find a file to analyze if none provided
    if file_path is None:
        for directory in ["train/patched", "train/vulnerable"]:
            if os.path.exists(directory):
                python_files = list(Path(directory).glob("**/*.py"))
                if python_files:
                    file_path = str(python_files[0])
                    break
        if file_path is None:
            print("No Python files found in train/patched or train/vulnerable directories")
            return

    # Accept path-like objects as well as plain strings.
    file_path = str(file_path)
    # Single source of truth for the expected label, so the "File type" line
    # and the final comparison cannot disagree.
    expected_result = "vulnerable" in file_path.lower()

    print(f"Analyzing file: {file_path}")
    print(f"File type: {'VULNERABLE' if expected_result else 'SAFE'}")
    print("-" * 60)

    try:
        # errors="replace": an undecodable byte should not stop a debug dump.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            python_code = f.read()
        print("PYTHON CODE CONTENT:")
        print("-" * 30)
        print(python_code)
        print("-" * 30)

        print("Running Bandit...")
        # Argument list with no shell: paths containing spaces or shell
        # metacharacters are passed to Bandit verbatim, matching how
        # test_vulnerabilities_benchmark invokes the tool.
        bandit_cmd = ["bandit", "-r", file_path, "-ll", "-iii"]
        print(f"Command: {' '.join(bandit_cmd)}")

        # First get the raw output, timing this run only
        raw_start = time.time()
        raw_output = subprocess.run(
            bandit_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        raw_elapsed = time.time() - raw_start

        # Then run the benchmark function
        success, bandit_result, tool_name, scan_time = test_vulnerabilities_benchmark(file_path)

        print(f"BANDIT RAW OUTPUT (took {raw_elapsed:.3f}s):")
        print("-" * 30)
        print(raw_output.stdout)
        if raw_output.stderr:
            print("STDERR:")
            print(raw_output.stderr)
        print("-" * 30)

        print(f"BANDIT PARSED RESULT:")
        print("-" * 30)
        print(f"Success: {success}")
        print(f"Bandit result (True=vulnerable, False=safe): {bandit_result}")
        print(f"Tool name: {tool_name}")
        print(f"Scan time: {scan_time:.3f}s")
        print("-" * 30)

        print(f"EXPECTED RESULT: {expected_result}")

        if success:
            match = "✓ CORRECT" if bandit_result == expected_result else "✗ INCORRECT"
            print(f"MATCH: {match}")
        else:
            print("MATCH: ✗ FAILED TO ANALYZE")

    except Exception as e:
        # Debug helper: report any problem (unreadable file, Bandit missing, ...)
        # rather than aborting the notebook cell.
        print(f"ERROR: {str(e)}")

# Exercise the debug helper on one sample of each class, when one exists.
print("Running Bandit debug examples...")

# First vulnerable sample: requires the directory and at least one .py file
vulnerable_file = None
if os.path.exists("train/vulnerable"):
    vulnerable_files = [*Path("train/vulnerable").glob("**/*.py")]
    if len(vulnerable_files) > 0:
        vulnerable_file = str(vulnerable_files[0])
        print(f"\nTesting with vulnerable file: {vulnerable_file}")
        debug_bandit_example(vulnerable_file)

# First safe (patched) sample, found the same way
safe_file = None
if os.path.exists("train/patched"):
    safe_files = [*Path("train/patched").glob("**/*.py")]
    if len(safe_files) > 0:
        safe_file = str(safe_files[0])
        print(f"\nTesting with safe file: {safe_file}")
        debug_bandit_example(safe_file)

# Neither directory yielded a file: let the helper do its own lookup
if not (vulnerable_file or safe_file):
    print("\nFalling back to a simple test...")
    debug_bandit_example()