# Data Processing of Test Data

Find missing files

In [60]:
# Existing imports
from pathlib import Path
from dotenv import load_dotenv
import pandas as pd
import os
import json
import random
import openai
import anthropic

# Load environment variables
load_dotenv()

# Initialize API keys
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY')

if not OPENAI_API_KEY or not ANTHROPIC_API_KEY:
    raise ValueError(
        "API keys for OpenAI and Anthropic must be set in the .env file.")

# Define paths using Pathlib
path_groundTruth_excel = Path(
    '../test-contracts/dataset/groundTruth/groundTruth.xlsx')
path_hex_folder = Path('../test-contracts/dataset/groundTruth/hex')
path_sol_folder = Path('../test-contracts/dataset/groundTruth/sol')

# Read the Excel file
df_groundTruth = pd.read_excel(path_groundTruth_excel, engine='openpyxl')

list_groundTruth_address = df_groundTruth['address'].tolist()
print('Number of groundTruth addresses: ', len(list_groundTruth_address))

# List filenames without extensions using Pathlib
list_filenames_hex = [f.stem for f in path_hex_folder.iterdir() if f.is_file()]
list_filenames_sol = [f.stem for f in path_sol_folder.iterdir() if f.is_file()]


def get_missing_addresses(
    ground_truth_addresses: list,
    existing_hex_files: list,
    existing_sol_files: list
) -> tuple:
    """
    Identify missing hex and sol files based on ground truth addresses.

    Args:
        ground_truth_addresses (list): List of ground truth contract addresses.
        existing_hex_files (list): List of existing hex file names (without extensions).
        existing_sol_files (list): List of existing sol file names (without extensions).

    Returns:
        tuple: Two lists containing missing hex addresses and missing sol addresses respectively.
    """
    # Convert lists to sets for faster lookup and ensure case-insensitivity
    ground_truth_set = {addr.strip().lower()
                        for addr in ground_truth_addresses}
    existing_hex_set = {fname.strip().lower() for fname in existing_hex_files}
    existing_sol_set = {fname.strip().lower() for fname in existing_sol_files}

    # Find missing hex and sol addresses using set operations
    missing_hex_files = list(ground_truth_set - existing_hex_set)
    missing_sol_files = list(ground_truth_set - existing_sol_set)

    # Log the missing counts
    print(f"Number of missing hex files: {len(missing_hex_files)}")
    print(f"Number of missing sol files: {len(missing_sol_files)}")

    return missing_hex_files, missing_sol_files

# Find missing hex and sol files using the improved function
missing_hex, missing_sol = get_missing_addresses(
    list_groundTruth_address,
    list_filenames_hex,
    list_filenames_sol
)

print(f"Missing Addresses are: \n hex: {missing_hex}, \n sol: {missing_sol}")
print('The number of missing addresses: hex: ',
      len(missing_hex), '; sol: ', len(missing_sol))

Number of groundTruth addresses:  67
Number of missing hex files: 0
Number of missing sol files: 0
Missing Addresses are: 
 hex: [], 
 sol: []
The number of missing addresses: hex:  0 ; sol:  0


# Experiment

## Setup

In [61]:
# Functions for Setup

def load_prompts():
    """Load prompts from the prompts directory."""
    prompts = {}
    prompt_dir = Path('prompts/')
    basic_prompt_path = prompt_dir / 'basic_ai_prompt.txt'
    developer_meta_prompt_path = prompt_dir / 'developer_meta_prompt.txt'

    with open(basic_prompt_path, 'r') as file:
        prompts['basic'] = file.read()

    with open(developer_meta_prompt_path, 'r') as file:
        prompts['developer_meta'] = file.read()

    return prompts

def load_contracts(contract_dir='../test-contracts/dataset/groundTruth/sol'):
    """Load smart contracts from the specified directory."""
    contracts = []
    for file in os.listdir(contract_dir):
        if file.endswith('.sol'):
            with open(os.path.join(contract_dir, file), 'r') as f:
                contracts.append({'filename': file, 'code': f.read()})
    return contracts

def initialize_openai_client():
    """Initialize the OpenAI client."""
    return openai.OpenAI(api_key=OPENAI_API_KEY)

def initialize_anthropic_client():
    """Initialize the Anthropic client."""
    return anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)

def audit_contract_openai(model, prompt, contract_code):
    """Audit a single contract using OpenAI's GPT-4-O model."""
    client = initialize_openai_client()
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "You are a smart contract auditor."},
                {"role": "user", "content": prompt.replace(
                    "[Insert smart contract code here]", contract_code)}
            ],
            temperature=0.7,
            max_tokens=2000
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Open AI Error: {str(e)}"


def audit_contract_anthropic(model, prompt, contract_code):
    """Audit a single contract using Anthropic's Claude 3.5 Sonnet model."""
    client = initialize_anthropic_client()
    try:
        response = client.messages.create(
            model=model,
            messages=[
                {"role": "user", "content": prompt.replace(
                    "[Insert smart contract code here]", contract_code)}
            ],
            max_tokens=3000,  # Updated argument
            temperature=0.7
        )
        return response.content[0].text
    except Exception as e:
        return f"Anthropic Error: {str(e)}"
    

def evaluate_audit(audit_result, known_errors):
    """
    Evaluate whether the audit_result contains the known_errors.
    
    Parameters:
    - audit_result (str): The text output from the audit model.
    - known_errors (list): A list of known error descriptions.
    
    Returns:
    - bool: True if all known errors are found, False otherwise.
    """
    audit_result_lower = audit_result.lower()
    print(audit_result_lower)
    # append the audit_result to a json file
    with open('audit_reports/audit_result.json', 'a') as f:
        json.dump(audit_result_lower, f)
        f.write('\n')
    return all(error.lower() in audit_result_lower for error in known_errors)


def analyze_results(results, total_contracts=20):
    """Analyze the audit results."""
    analysis = {}
    for model_name in results:
        analysis[model_name] = {}
        for prompt_type in results[model_name]:
            successes = sum(
                1 for result in results[model_name][prompt_type] if result['found_errors'])
            failure = total_contracts - successes
            analysis[model_name][prompt_type] = {
                'Success': successes,
                'Failure': failure,
                'Success Rate (%)': round((successes / total_contracts) * 100, 2)
            }
    return analysis

## Performing the audit

In [62]:
print('Starting the audit...')
# Load prompts and contracts
prompts = load_prompts()
contracts = load_contracts()

# Define models
models = {
    'GPT4O': 'gpt-4o-mini',
    'Claude3.5_Sonnet': 'claude-3-5-sonnet-20240620'
}

# Define prompt types
prompt_types = ['basic', 'developer_meta']

# Initialize results storage
results = {
    'GPT4O': {'basic': [], 'developer_meta': []},
    'Claude3.5_Sonnet': {'basic': [], 'developer_meta': []}
}

# Iterate over each contract
for contract in contracts[:20]:  # Ensure only 20 contracts are processed
    filename = contract['filename']
    code = contract['code']
    print(filename)

    # Retrieve known errors for this contract
    known_errors = df_groundTruth[df_groundTruth['address'].str.lower() == filename.lower()][['Mint', 'Leak', 'Limit']].values.tolist()

    for model_name, model_identifier in models.items():
        for prompt_type in prompt_types:
            if model_name == 'GPT4O':
                audit_result = audit_contract_openai(
                    model_identifier, prompts[prompt_type], code)
            elif model_name == 'Claude3.5_Sonnet':
                audit_result = audit_contract_anthropic(
                    model_identifier, prompts[prompt_type], code)
            else:
                audit_result = "Unsupported model."

            # Evaluate the audit result
            found_errors = evaluate_audit(audit_result, known_errors)

            # Store the result
            results[model_name][prompt_type].append({
                'contract': filename,
                'found_errors': found_errors
            })

            print(
                f"Completed {model_name} with {prompt_type} on {filename}: {'Success' if found_errors else 'Failure'}")
            
            # Save into a md file
            with open(f'audit_reports/{model_name}_{prompt_type}_results.json', 'w') as f:
                json.dump(results, f)

print('Audit completed.')

Starting the audit...
0xA0ffC741F109159ee203424A299E6d2731dcFC76.sol
### comprehensive security analysis of the provided solidity smart contract

#### overview
the contract `missionmasterchef` is an implementation of a yield farming strategy that interacts with a masterchef contract, which likely manages liquidity pools and rewards in a decentralized finance (defi) context. given the code, we will analyze it for potential vulnerabilities, particularly those that may facilitate a rug pull.

#### key components analyzed
1. **ownership and governance**
2. **token management (minting, burning, and allowances)**
3. **external contract dependencies**
4. **emergency withdrawals and pausing mechanisms**
5. **functionality and modifiers**

### analysis

#### 1. ownership and governance
```solidity
address public govaddress;
```
- **vulnerability**: the contract uses a single governance address (`govaddress`) to control critical functions. if this address is compromised, the attacker could drain

### Saving the results to CSV

In [63]:
# Create audit_reports directory if it doesn't exist
os.makedirs('audit_reports', exist_ok=True)

# Save the results to CSV files
for model_name in results:
    for prompt_type in results[model_name]:
        df_results = pd.DataFrame(results[model_name][prompt_type])
        csv_path = f'audit_reports/{model_name.lower()}_{prompt_type}_results.csv'
        df_results.to_csv(csv_path, index=False)
        print(f"Results saved to {csv_path}")

Results saved to audit_reports/gpt4o_basic_results.csv
Results saved to audit_reports/gpt4o_developer_meta_results.csv
Results saved to audit_reports/claude3.5_sonnet_basic_results.csv
Results saved to audit_reports/claude3.5_sonnet_developer_meta_results.csv


In [64]:
# Save all the results into a single csv file
df_all_results = pd.concat([pd.read_csv(f'audit_reports/{model_name.lower()}_{prompt_type}_results.csv') for model_name in results for prompt_type in results[model_name]], ignore_index=True)
df_all_results.to_csv('audit_reports/all_results.csv', index=False)

print('All results saved to audit_reports/all_results.csv')

All results saved to audit_reports/all_results.csv


### Analyzing the results

In [65]:
def analyze_results(results, total_contracts=20):
    """Analyze the audit results."""
    analysis = {}
    for model_name in results:
        analysis[model_name] = {}
        for prompt_type in results[model_name]:
            successes = sum(
                1 for result in results[model_name][prompt_type] if result['found_errors'])
            failure = total_contracts - successes
            analysis[model_name][prompt_type] = {
                'Success': successes,
                'Failure': failure,
                'Success Rate (%)': round((successes / total_contracts) * 100, 2)
            }
    return analysis


# Perform analysis
analysis = analyze_results(results)

# Display the analysis
for model, prompt_data in analysis.items():
    print(f"\nModel: {model}")
    for prompt, data in prompt_data.items():
        print(f"  Prompt: {prompt}")
        for key, value in data.items():
            print(f"    {key}: {value}")

# Add the analysis to the csv file
analysis_df = pd.DataFrame([(model, prompt, data['Success'], data['Failure'], data['Success Rate (%)']) 
                            for model, prompt_data in analysis.items() 
                            for prompt, data in prompt_data.items()],
                           columns=['Model', 'Prompt', 'Success', 'Failure', 'Success Rate (%)'])

analysis_csv_path = 'audit_reports/analysis_results.csv'
analysis_df.to_csv(analysis_csv_path, index=False)
print(f"Analysis results saved to {analysis_csv_path}")





Model: GPT4O
  Prompt: basic
    Success: 20
    Failure: 0
    Success Rate (%): 100.0
  Prompt: developer_meta
    Success: 20
    Failure: 0
    Success Rate (%): 100.0

Model: Claude3.5_Sonnet
  Prompt: basic
    Success: 20
    Failure: 0
    Success Rate (%): 100.0
  Prompt: developer_meta
    Success: 20
    Failure: 0
    Success Rate (%): 100.0
Analysis results saved to audit_reports/analysis_results.csv
