## Dataset Preprocessor

Initial experiment setup

- Initial idea: Classifier to identify vulnerable lines of code. 
- Dataset : smartbugs curated- vulnerable lines of code




In [1]:
# First, get the map of contracts and respective line of code

import os
import sys
import json

dataset= "smartbugs-curated"

path='../dataset/'+dataset
if dataset == 'smartbugs-curated' :
    vulnerability_localization= json.load(open(path+'/vulnerabilities.json'))


In [11]:


## now prepare the dataset, for each contract, get the line of code that is vulnerable
base_path = f'../dataset/{dataset}'
# Function to extract vulnerable lines
def extract_vulnerable_lines(contract_data):
    vulnerable_lines_data = []
    vulnerable_lines = []
    
    for contract in contract_data:
        contract_name = contract['name']
        relative_path = contract['path']
        full_path = os.path.join(base_path, relative_path)
        
        # Check if file exists
        if not os.path.exists(full_path):
            print(f"Warning: File not found - {full_path}")
            continue
        
        # Read the file content
        try:
            with open(full_path, 'r', encoding='utf-8') as file:
                file_lines = file.readlines()
        except Exception as e:
            print(f"Error reading {full_path}: {e}")
            continue
            
        # Process vulnerabilities
        for vulnerability in contract['vulnerabilities']:
            category = vulnerability['category']
            
            for line_number in vulnerability['lines']:
                # Adjust for 0-based indexing
                adjusted_line_number = line_number - 1
                
                # Extract the actual code (if line number is valid)
                code_line = ""
                if 0 <= adjusted_line_number < len(file_lines):
                    code_line = file_lines[adjusted_line_number].strip()
                else:
                    print(f"Warning: Line {line_number} out of range in {full_path}")
                
                vulnerable_lines_data.append({
                    'contract_name': contract_name,
                    'contract_path': relative_path,
                    'full_path': full_path,
                    'pragma_version': contract['pragma'],
                    'source': contract['source'],
                    'line_number': line_number,
                    'vulnerability_category': category,
                    'code': code_line
                })
                vulnerable_lines.append(code_line)
    
    return vulnerable_lines_data, vulnerable_lines
        
    

In [13]:
vulnerable_lines_data,vulnerable_lines = extract_vulnerable_lines(vulnerability_localization)
print(len(vulnerable_lines_data))
print(vulnerable_lines_data[0:3])
print(vulnerable_lines[0:3])


## storage data
with open(f'../dataset/{dataset}_vulnerable_lines.json', 'w') as file:
    json.dump(vulnerable_lines_data, file, indent=4)





222
[{'contract_name': 'FibonacciBalance.sol', 'contract_path': 'dataset/access_control/FibonacciBalance.sol', 'full_path': '../dataset/smartbugs-curated/dataset/access_control/FibonacciBalance.sol', 'pragma_version': '0.4.22', 'source': 'https://github.com/sigp/solidity-security-blog', 'line_number': 31, 'vulnerability_category': 'access_control', 'code': 'require(fibonacciLibrary.delegatecall(fibSig, withdrawalCounter));'}, {'contract_name': 'FibonacciBalance.sol', 'contract_path': 'dataset/access_control/FibonacciBalance.sol', 'full_path': '../dataset/smartbugs-curated/dataset/access_control/FibonacciBalance.sol', 'pragma_version': '0.4.22', 'source': 'https://github.com/sigp/solidity-security-blog', 'line_number': 38, 'vulnerability_category': 'access_control', 'code': 'require(fibonacciLibrary.delegatecall(msg.data));'}, {'contract_name': 'arbitrary_location_write_simple.sol', 'contract_path': 'dataset/access_control/arbitrary_location_write_simple.sol', 'full_path': '../dataset/s

In [53]:
# now get an equal amount of non-vulnerable lines
# current vulnerable lines 222

import random
import re

def get_pragma_version(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        file_lines = file.readlines()
    #regex to get the pragma version
    while file_lines[0].strip().startswith("pragma") == False:
        file_lines.pop(0)
    
    pragma_line= file_lines[0].strip()

    # now only get the pragma version
    match = re.search(r'pragma solidity \^?([\d\.]+);', pragma_line)

    if match:
        version = match.group(1)
        return version
    return None

def extract_non_vulnerable_lines(vulnerable_lines, num_samples):
    non_vulnerable_lines = []
    
    # Load all contract files
    contract_files = []
    for root, dirs, files in os.walk(base_path):
        for file in files:
            if file.endswith('.sol'):
                contract_files.append(os.path.join(root, file))
    
    # Extract non-vulnerable lines
    while len(non_vulnerable_lines) < num_samples:
        # Randomly select a contract file
        contract_file = random.choice(contract_files)
        
        # Read the file content
        try:
            with open(contract_file, 'r', encoding='utf-8') as file:
                file_lines = file.readlines()
        except Exception as e:
            print(f"Error reading {contract_file}: {e}")
            continue
        
        # Randomly select a line
        line_number = random.randint(0, len(file_lines) - 1)
        code_line = file_lines[line_number].strip()
        pragma= get_pragma_version(contract_file)
        
        # Check if line is a comment or blank
        if re.match(r'^\s*(//|$)', code_line):
            continue
        
        # Check if the exact line from the contract is already in vulnerable lines
        
        if any((line['code'] == code_line and line["full_path"]==contract_file and line["line_number"]==line_number) for line in vulnerable_lines_data):
            print("duplicate")
            continue
        
        # Add to non-vulnerable lines
        non_vulnerable_lines.append({
            'contract_path': os.path.relpath(contract_file, base_path),
            'full_path': contract_file,
            'line_number': line_number + 1,
            'code': code_line,
            'pragma_version':pragma
        })
    
    return non_vulnerable_lines


In [54]:
random.seed(42)
non_vulnerable_lines= extract_non_vulnerable_lines(vulnerable_lines, len(vulnerable_lines_data))
print(len(non_vulnerable_lines))
print(non_vulnerable_lines[0:3])

222
[{'contract_path': 'dataset/arithmetic/overflow_single_tx.sol', 'full_path': '../dataset/smartbugs-curated/dataset/arithmetic/overflow_single_tx.sol', 'line_number': 2, 'code': '* @source: https://github.com/ConsenSys/evm-analyzer-benchmark-suite', 'pragma_version': '0.4.23'}, {'contract_path': 'dataset/reentrancy/0x4320e6f8c05b27ab4707cd1f6d5ce6f3e4b3a5a1.sol', 'full_path': '../dataset/smartbugs-curated/dataset/reentrancy/0x4320e6f8c05b27ab4707cd1f6d5ce6f3e4b3a5a1.sol', 'line_number': 32, 'code': '}', 'pragma_version': '0.4.19'}, {'contract_path': 'dataset/reentrancy/etherstore.sol', 'full_path': '../dataset/smartbugs-curated/dataset/reentrancy/etherstore.sol', 'line_number': 5, 'code': '*/', 'pragma_version': '0.4.10'}]


In [55]:
## storage data
final_data= vulnerable_lines_data + non_vulnerable_lines
with open(f'../dataset/{dataset}_final_data.json', 'w') as file:
    json.dump(final_data, file, indent=4)


### Now Find a way to represent the data
Feature Engineering Pipeline
#### 1. Tokenization (code)

    Tokenize Solidity code by splitting on non-alphanumeric characters.
    Remove comments and unnecessary whitespace.
    Convert tokens into numerical representations (e.g., TF-IDF, one-hot encoding, or embeddings).

#### 2. Presence of External Calls

    Check if the line contains low-level function calls like:
        call, delegatecall, staticcall, send, transfer
    Store this as a binary feature (1 if present, 0 otherwise).

#### 3. Use of require or assert

    Check if require(...) or assert(...) appears in the line.
    Store as a binary feature (1 if present, 0 otherwise).

#### 4. Encoding Categorical Features

    pragma_version: Convert Solidity versions into numerical features (e.g., split into major, minor, and patch).
    vulnerability_category: Use label encoding or one-hot encoding.
    contract_name: Encode as a categorical variable (or use hashing to avoid high-dimensionality).

#### 5. Normalization of line_number

    Scale it to a [0,1] range using Min-Max Scaling.


In [85]:
import re
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, MinMaxScaler

# Helper function to tokenize Solidity code
def tokenize_code(code):
    # Remove comments
    code = re.sub(r"//.*|/\*[\s\S]*?\*/", "", code)
    # Tokenize by splitting on non-alphanumeric characters
    tokens = re.findall(r"\w+", code)
    return tokens

# Feature extraction function
def extract_features(data):
    df = pd.DataFrame(data)

    #Code Encoding
    code_enc = LabelEncoder()
    df["code_enc"] = code_enc.fit_transform(df["code"])
    # Tokenization
    df["tokens"] = df["code"].apply(tokenize_code)
    df['tokens']= df['tokens'].apply(len)
    
    # Presence of external calls
    external_calls = ["call", "delegatecall", "staticcall", "send", "transfer"]
    df["has_external_call"] = df["code"].apply(lambda x: any(call in x for call in external_calls)).astype(int)
    
    # Presence of `require` or `assert`
    df["has_require_assert"] = df["code"].apply(lambda x: "require" in x or "assert" in x).astype(int)

    # Encoding categorical variables
    df["pragma_version"] = df["pragma_version"].apply(lambda x: tuple(map(int, x.lstrip("^").split("."))) if x else (0, 0, 0))
    df[["pragma_major", "pragma_minor", "pragma_patch"]] = pd.DataFrame(df["pragma_version"].tolist(), index=df.index)

    label_enc = LabelEncoder()
    #df["vulnerability_category"] = label_enc.fit_transform(df["vulnerability_category"])

    #df["contract_name"] = label_enc.fit_transform(df["contract_name"])

    # Normalize `line_number`
    scaler = MinMaxScaler()
    df["line_number"] = scaler.fit_transform(df[["line_number"]])

    # vulnerable if vulnerability category is not empty
    df["label"]= df["vulnerability_category"].apply(lambda x: False if pd.isna(x) else True)
    
    # Drop unnecessary columns

    return df.drop(columns=["pragma_version", "code", "contract_path", "full_path", "source", "vulnerability_category","contract_name"])



In [88]:
encoded_data = extract_features(final_data)
encoded_data[encoded_data['label']==True]

Unnamed: 0,line_number,code_enc,tokens,has_external_call,has_require_assert,pragma_major,pragma_minor,pragma_patch,label
0,0.012165,218,5,1,1,0,4,22,True
1,0.015004,219,5,1,1,0,4,22,True
2,0.010543,209,4,0,1,0,4,25,True
3,0.007705,112,2,0,0,0,4,24,True
4,0.006894,132,2,0,0,0,4,24,True
...,...,...,...,...,...,...,...,...,...
217,0.070154,280,3,1,0,0,4,0,True
218,0.007705,278,3,1,0,0,4,18,True
219,0.010543,181,5,1,0,0,4,18,True
220,0.005272,177,4,1,0,0,4,0,True


In [89]:
encoded_data.to_csv(f'../dataset/{dataset}_encoded_data.csv', index=False)

In [90]:
# Now random split for training and testing
from sklearn.model_selection import train_test_split

train_data, test_data = train_test_split(encoded_data, test_size=0.2, random_state=42, stratify=encoded_data["label"])
train_data.to_csv(f'../dataset/{dataset}_train_data.csv', index=False)
test_data.to_csv(f'../dataset/{dataset}_test_data.csv', index=False)

print(f"Train data: {len(train_data)} samples")
print(f"Test data: {len(test_data)} samples")
print(f"Train data: {train_data['label'].value_counts()}")
print(f"Test data: {test_data['label'].value_counts()}")

Train data: 355 samples
Test data: 89 samples
Train data: label
True     178
False    177
Name: count, dtype: int64
Test data: label
False    45
True     44
Name: count, dtype: int64
