In [None]:
#STEP 1: Install Slither (and dependencies)

!pip install solc-select

In [None]:
!pip install slither-analyzer

In [None]:
import re

def extract_solidity_version(file_path):
    """Return the first ``X.Y.Z`` version mentioned in a ``pragma solidity`` statement.

    The file is read as text and searched for ``pragma solidity``; the first
    three-component version number that appears before the terminating ``;``
    is returned (for a range such as ``>=0.4.22 <0.9.0`` that is the lower bound).

    Args:
        file_path: Path of the Solidity source file to inspect.

    Returns:
        The version as a string (it is also printed), or ``None`` when no
        version is found or when reading the file fails. Failures are reported
        with ``print`` instead of being raised.
    """
    pragma_pattern = r'pragma\s+solidity\s+[^;]*?(\d+\.\d+\.\d+)'
    try:
        with open(file_path, 'r') as source:
            source_text = source.read()

        # Look for the first version number in the pragma line
        found = re.search(pragma_pattern, source_text)
        if not found:
            print("No Solidity version found.")
            return None

        version = found.group(1)
        print(f"{version}")
        return version
    except Exception as err:
        # Best effort: report the problem and signal "unknown" to the caller.
        print(f"Error: {err}")
        return None


In [None]:
#STEP 2: Upload Solidity file
# Requires Google Colab (`google.colab`). The mapping returned by `files.upload()`
# is kept in `uploaded`; the next cell takes the source file name from its keys.
from google.colab import files
uploaded = files.upload()

Saving SimpleDAO.sol to SimpleDAO.sol


In [None]:
# Use the first uploaded file as the contract to analyse. If several files were
# uploaded, only the first one is processed.
if not uploaded:
    raise ValueError("No file was uploaded; re-run the upload cell and choose a .sol file.")
file_path = next(iter(uploaded))

# Detect the compiler version from the pragma. The solc-select cells below
# interpolate `ver` into shell commands, so stop here with a clear message
# instead of running them with `None`.
ver = extract_solidity_version(file_path)
if ver is None:
    raise ValueError(f"Could not determine the Solidity version of {file_path!r}.")

0.6.12


In [None]:
# Install the solc release whose version was read from the contract's pragma (`ver`).
!solc-select install {ver}

Installing solc '0.6.12'...
Version '0.6.12' installed.


In [None]:
# Switch solc-select to that release so the following `slither` runs compile with it.
!solc-select use {ver}

Switched global version to 0.6.12


In [None]:
import os
import shutil
import time

# Step 3: Derive the contract name from the uploaded file name:
# drop any directory part first, then the extension (e.g. "SimpleDAO.sol" -> "SimpleDAO").
contract_name = os.path.basename(file_path)
contract_name = os.path.splitext(contract_name)[0]

***CFG construction***

In [None]:
# Step 4: Create output folder for the .dot files
# The folder is named "<contract_name>_cfg"; `exist_ok=True` makes re-running this cell harmless.
# NOTE: `output_dir` is reassigned by the call-graph section further down, so re-run
# this cell before re-running the CFG rename cell out of order.
output_dir = f"{contract_name}_cfg"
os.makedirs(output_dir, exist_ok=True)

In [None]:
# Start timer (wall-clock time of the whole `slither` command below)
start_time_cfg_slither = time.time()

# Step 5: Run Slither to generate CFG in .dot format
# The cfg printer exports one .dot file per function; the rename cell that follows
# looks for them in the current working directory.
!slither {file_path} --print cfg

# End timer
end_time_cfg_slither = time.time()

# Calculate elapsed time (seconds) and report it
elapsed_time_cfg_slither = end_time_cfg_slither - start_time_cfg_slither
print(f"Time taken cfg slither: {elapsed_time_cfg_slither:.6f} seconds")

'solc --version' running
'solc SimpleDAO.sol --combined-json abi,ast,bin,bin-runtime,srcmap,srcmap-runtime,userdoc,devdoc,hashes,compact-format --allow-paths .,/content' running
--> SimpleDAO.sol


INFO:Printers:Export SimpleDAO.sol-SimpleDAO-donate(address).dot
Export SimpleDAO.sol-SimpleDAO-withdraw(uint256).dot
Export SimpleDAO.sol-SimpleDAO-queryCredit(address).dot

INFO:Slither:SimpleDAO.sol analyzed (1 contracts)
Time taken cfg slither: 1.309967 seconds


In [None]:
# Step 6: Rename and move each .dot file
# Slither names each CFG file "<source file>-<contract>-<function>(<param types>).dot"
# in the working directory; they are moved into `output_dir` as "<contract>_<function>.dot".
# `\(.*\)` also matches an empty parameter list, so one pattern covers every function.
cfg_name_pattern = re.compile(rf'{re.escape(file_path)}-(.*)-([^\(]+)\(.*\)\.dot')
seen_names = {}  # "<contract>_<function>" -> number of files with that name in this run

# Sorted so that the numbering of overloads (below) is deterministic.
for filename in sorted(os.listdir()):
    if not (filename.endswith('.dot') and filename.startswith(file_path)):
        continue

    match = cfg_name_pattern.match(filename)
    if not match:
        print(f"Skipping unrecognized file: {filename}")
        continue

    contract_part, func_name = match.groups()
    base_name = f"{contract_part}_{func_name}"

    # Overloaded functions share a name and would overwrite each other's file:
    # the first keeps the plain name, later ones get a numeric suffix.
    seen_names[base_name] = seen_names.get(base_name, 0) + 1
    if seen_names[base_name] == 1:
        new_filename = f"{base_name}.dot"
    else:
        new_filename = f"{base_name}_{seen_names[base_name]}.dot"

    shutil.move(filename, os.path.join(output_dir, new_filename))

print(f"All renamed .dot files saved in folder: {output_dir}")

All renamed .dot files saved in folder: SimpleDAO_cfg


***CFG_ENCODER***

In [None]:
import re
from collections import defaultdict
import os



def parse_dot_cfg(dot_file_path):
    """Encode one CFG ``.dot`` file as text: a node listing followed by its paths.

    Nodes are read from ``<id>[label="Node Type: ..."];`` entries and shown as
    ``N<id>: <text>``, where the text is the node's ``EXPRESSION`` section
    (newlines flattened to spaces, IR listing dropped) or, if there is none, the
    node-type text. Edges are read from ``<src>-><dst>;`` entries with an
    optional ``[label="..."]``.

    Paths are enumerated depth-first from ``N0``. A node may appear at most twice
    on a path (one loop unrolling); branches that would exceed this are dropped.
    Labelled edges contribute a ``<source>=<label>`` condition to the path.

    Args:
        dot_file_path: Path of the ``.dot`` file. The part of the file stem after
            the first ``_`` is used as the function name in the ``CFG_<Name>``
            heading (``str.capitalize`` lower-cases everything after the first
            character).

    Returns:
        The encoded CFG as one string, ending with a 40-dash separator line.
    """
    stem = os.path.splitext(os.path.basename(dot_file_path))[0]
    func_name = stem.split('_', 1)[-1]
    cfg_name = f"CFG_{func_name.capitalize()}"

    with open(dot_file_path, 'r') as dot_file:
        dot_text = dot_file.read()

    # Nodes: id -> display text
    nodes = {}
    node_pattern = r'(\d+)\[label="Node Type: ([^\n]+)\s*\d*\n(.*?)"\];'
    for raw_id, node_type, label_body in re.findall(node_pattern, dot_text, re.DOTALL):
        # Everything from the IR listing onwards is not part of the expression.
        without_irs = re.sub(r'\nIRs:.*', '', label_body, flags=re.DOTALL)
        found = re.search(r'EXPRESSION:\n(.*?)(?:\n[A-Z_]+:|\Z)', without_irs, re.DOTALL)
        if found:
            nodes[f"N{raw_id}"] = found.group(1).strip().replace('\n', ' ')
        else:
            nodes[f"N{raw_id}"] = node_type.strip()

    # Edges: source -> [(destination, label or None), ...] in file order
    edges = defaultdict(list)
    for edge in re.finditer(r'(\d+)->(\d+)(?:\[label="([^"]+)"\])?;', dot_text):
        src_id, dst_id, raw_label = edge.groups()
        edge_label = raw_label.strip() if raw_label else None
        edges[f"N{src_id}"].append((f"N{dst_id}", edge_label))

    # Depth-first enumeration; the shared lists are extended on the way down
    # and restored on the way back up.
    all_paths = []
    visits = {}
    trail = ["N0"]
    conditions = []

    def walk(current):
        times_seen = visits.get(current, 0)
        if times_seen >= 2:
            return  # already on this path twice: do not unroll further
        visits[current] = times_seen + 1

        if current in edges:
            for successor, label in edges[current]:
                trail.append(successor)
                if label:
                    conditions.append(f"{current}={label}")
                walk(successor)
                if label:
                    conditions.pop()
                trail.pop()
        else:
            # No outgoing edge: the path ends here, keep a snapshot of it.
            all_paths.append((list(trail), list(conditions)))

        visits[current] -= 1

    walk("N0")

    # Assemble the text: heading, nodes in numeric order, paths, separator.
    node_lines = [
        f"{nid}: {nodes[nid]}"
        for nid in sorted(nodes, key=lambda nid: int(nid[1:]))
    ]
    path_lines = []
    for path_nodes, conds in all_paths:
        suffix = (" [" + ", ".join(conds) + "]") if conds else ""
        path_lines.append("->".join(path_nodes) + suffix)

    pieces = [f"{cfg_name}:\n"]
    pieces.extend(node_lines)
    pieces.append(f"\nPaths in {cfg_name}:\n")
    pieces.extend(path_lines)
    pieces.append("\n" + "-" * 40 + "\n")
    return "\n".join(pieces)


In [None]:


# Start timer for the CFG encoding step (started before the helper below is defined,
# so the measured time also includes executing its `def`)
start_time_cfg_encoder = time.time()

# ---------- Run on all .dot files in a folder ----------

def process_all_dot_files(dot_folder_path):
    """Encode every ``.dot`` file in a folder and write the results to one text file.

    Files are processed in sorted name order with ``parse_dot_cfg``; the encoded
    blocks are joined with a newline and written to ``<contract_name>_CFGs.txt``
    in the current directory. ``contract_name`` is read from the notebook's
    global namespace, not passed in.

    Args:
        dot_folder_path: Folder to scan (not recursive) for ``*.dot`` files.

    Returns:
        None. Progress and the output location are reported with ``print``;
        when the folder holds no ``.dot`` file a notice is printed and nothing
        is written.
    """
    dot_files = [entry for entry in os.listdir(dot_folder_path) if entry.endswith(".dot")]
    dot_files.sort()
    if len(dot_files) == 0:
        print(" No .dot files found in the folder.")
        return

    output_txt_path = f"{contract_name}_CFGs.txt"

    combined_output = []
    for filename in dot_files:
        print(f"Processing: {filename}")
        combined_output.append(parse_dot_cfg(os.path.join(dot_folder_path, filename)))

    with open(output_txt_path, "w") as out_file:
        out_file.write("\n".join(combined_output))

    print(f"\n All CFGs saved to: {output_txt_path}")


# Folder holding the renamed CFG .dot files; this is the same "<contract_name>_cfg"
# name the CFG construction step used for its output folder.
dot_folder_path = f"{contract_name}_cfg"  # folder containing your .dot files
process_all_dot_files(dot_folder_path)

# End timer
end_time_cfg_encoder = time.time()

# Calculate elapsed time (seconds) and report it
elapsed_time_cfg_encoder = end_time_cfg_encoder - start_time_cfg_encoder
print(f"Time taken cfg Encoder: {elapsed_time_cfg_encoder:.6f} seconds")

Processing: SimpleDAO_donate.dot
Processing: SimpleDAO_queryCredit.dot
Processing: SimpleDAO_withdraw.dot

 All CFGs saved to: SimpleDAO_CFGs.txt
Time taken cfg Encoder: 0.001885 seconds


***CG Construction***

In [None]:
# Create output folder for the call-graph .dot files ("<contract_name>_cg").
# NOTE: this reassigns `output_dir`, which until now pointed at the CFG folder.
output_dir = f"{contract_name}_cg"
os.makedirs(output_dir, exist_ok=True)

In [None]:
# Start timer (wall-clock time of the whole `slither` command below)
start_time_cg_slither = time.time()


# Run Slither to generate CG in .dot format
# The rename cell that follows looks for the resulting "*call-graph.dot" files
# in the current working directory.
!slither {file_path} --print call-graph

# End timer
end_time_cg_slither = time.time()
# Calculate elapsed time (seconds) and report it
elapsed_time_cg_slither = end_time_cg_slither - start_time_cg_slither
print(f"Time taken cg slither: {elapsed_time_cg_slither:.6f} seconds")

'solc --version' running
'solc SimpleDAO.sol --combined-json abi,ast,bin,bin-runtime,srcmap,srcmap-runtime,userdoc,devdoc,hashes,compact-format --allow-paths .,/content' running
--> SimpleDAO.sol


INFO:Printers:Call Graph: SimpleDAO.sol.all_contracts.call-graph.dot
Call Graph: SimpleDAO.sol.SimpleDAO.call-graph.dot

INFO:Slither:SimpleDAO.sol analyzed (1 contracts)
Time taken cg slither: 0.905950 seconds


In [None]:
# Step: Rename and move each call-graph .dot file into `output_dir`
# Expected names: "<source>.sol.<contract or all_contracts>.call-graph.dot",
# e.g. SimpleDAO.sol.SimpleDAO.call-graph.dot
for filename in os.listdir():
    # Only handle graphs generated for the uploaded file (same filter as the CFG
    # step), so leftovers from other contracts are not moved into this folder.
    if not (filename.startswith(file_path) and filename.endswith("call-graph.dot")):
        continue

    match = re.match(r"(.+)\.sol\.(.+)\.call-graph\.dot", filename)
    if not match:
        print(f" Skipping unrecognized file: {filename}")
        continue

    # Keep these loop-local: assigning to `contract_name` here would overwrite the
    # notebook-wide variable that later cells use to build their file paths.
    source_stem, part = match.groups()

    if part == "all_contracts":
        # Combined graph of every contract in the source file.
        new_filename = f"{source_stem}_All_cg.dot"
    else:
        new_filename = f"{part}_cg.dot"

    shutil.move(filename, os.path.join(output_dir, new_filename))
    print(f" Moved: {filename} ➝ {new_filename}")

print(f"\n All renamed .dot files saved in folder: {output_dir}")

 Moved: SimpleDAO.sol.SimpleDAO.call-graph.dot ➝ SimpleDAO_cg.dot
 Moved: SimpleDAO.sol.all_contracts.call-graph.dot ➝ SimpleDAO_All_cg.dot

 All renamed .dot files saved in folder: SimpleDAO_cg


***CG_Encoder***

In [None]:
import re
from collections import defaultdict

def parse_call_graph_dot(dot_file_path):
    """Encode a call-graph ``.dot`` file as ``CG(caller, callee)`` lines per contract.

    Contract clusters (``subgraph cluster_<id>_<Contract> { ... }``) supply the
    functions, taken from ``"<node id>" [label="<function>"]`` entries. The
    ``[Solidity]`` cluster supplies built-in functions, which have no label and
    are shown by their node id. Every ``"a" -> "b"`` entry in the file is a call.

    Output format, one section per cluster in file order::

        CG_<Contract>:

        CG(<caller>, <callee>)             one line per call
        CG(<caller>, <callee>) [solidity]  callee is a Solidity built-in
        CG(<function>, )                   function makes no call

    Args:
        dot_file_path: Path of the call-graph ``.dot`` file.

    Returns:
        The encoded call graph as a string without leading/trailing whitespace
        (empty when the file contains no recognised cluster).
    """
    with open(dot_file_path, 'r') as f:
        content = f.read()

    # Split the file into contract clusters and call edges.
    contract_sections = re.findall(r'subgraph cluster_\d+_([^\s{]+)\s*{(.*?)}', content, re.DOTALL)
    edges = re.findall(r'"([^"]+)"\s*->\s*"([^"]+)"', content)

    # The built-in cluster is written with the brace BEFORE the label:
    #     subgraph cluster_solidity {
    #     label = "[Solidity]"
    # The previous pattern expected the label first and therefore never matched,
    # so built-ins were never tagged. The old layout is still accepted as well.
    solidity_blocks = re.findall(
        r'subgraph\s+cluster_[^\s{]*\s*{\s*label\s*=\s*"\[Solidity\]"([^}]*)}', content)
    solidity_blocks += re.findall(
        r'subgraph cluster_[^}]*label = "\[Solidity\]"\s*{([^}]*)}', content)

    contract_map = defaultdict(list)        # contract -> list of node_ids
    node_label_map = dict()                 # node_id -> function label
    func_to_contract = dict()               # node_id -> contract name

    # Parse each contract subgraph
    for contract_name, body in contract_sections:
        for node_id, label in re.findall(r'"([^"]+)"\s*\[label="([^"]+)"\]', body):
            contract_map[contract_name].append(node_id)
            node_label_map[node_id] = label
            func_to_contract[node_id] = contract_name

    # Handle Solidity nodes (they have no labels)
    for block in solidity_blocks:
        for line in block.strip().splitlines():
            line = line.strip()
            if '->' in line:
                # A call edge listed inside the cluster, not a node declaration;
                # its caller belongs to a contract and must not be re-labelled.
                continue
            match = re.match(r'"([^"]+)"', line)
            if not match:
                continue
            node_id = match.group(1)
            if func_to_contract.get(node_id) == '[Solidity]':
                continue  # already registered
            contract_map['[Solidity]'].append(node_id)
            node_label_map[node_id] = node_id
            func_to_contract[node_id] = '[Solidity]'

    # Build call edges
    calls_map = defaultdict(list)
    for caller, callee in edges:
        calls_map[caller].append(callee)

    # Final output grouped by contracts
    out_lines = []
    for contract, node_ids in contract_map.items():
        out_lines.append(f"\nCG_{contract}:\n")
        for node in node_ids:
            caller_label = node_label_map.get(node, node)
            callees = calls_map.get(node, [])
            if not callees:
                out_lines.append(f"CG({caller_label}, )")
                continue
            for callee in callees:
                callee_label = node_label_map.get(callee, callee)
                is_builtin = func_to_contract.get(callee, "") == "[Solidity]"
                tag = " [solidity]" if is_builtin else ""
                out_lines.append(f"CG({caller_label}, {callee_label}){tag}")

    return "\n".join(out_lines).strip()


In [None]:
# Start timer
start_time_cg_encoder = time.time()

# Encode the combined "<contract_name>_All_cg.dot" graph from the "<contract_name>_cg"
# folder and save the text next to the notebook as "<contract_name>_CG.txt".
dot_path = f"{contract_name}_cg/{contract_name}_All_cg.dot"  # your actual .dot path
result = parse_call_graph_dot(dot_path)
output_path = f"{contract_name}_CG.txt"
with open(output_path, "w") as f:
    f.write(result)

# Show the encoded call graph in the cell output as well
print(result)
print()
print(f"Call graph output saved to: {output_path}")

# END timer (the measured time includes the file write and the prints above)
end_time_cg_encoder = time.time()

# Calculate elapsed time (seconds) and report it
elapsed_time_cg_encoder = end_time_cg_encoder - start_time_cg_encoder
print(f"Time taken cg Encoder: {elapsed_time_cg_encoder:.6f} seconds")

CG_SimpleDAO:

CG(queryCredit, )
CG(donate, )
CG(withdraw, require(bool,string))

Call graph output saved to: SimpleDAO_CG.txt
Time taken cg Encoder: 0.001288 seconds


# ***Data Dependency Construction***

In [None]:
# Name of the text file that will hold the raw data-dependency output
# ("<contract_name>_dd.txt"); the file itself is written by the shell redirect
# in the next cell and read back by the encoder cell.
output_file = f"{contract_name}_dd.txt"


In [None]:

# Start timer
start_time_dd_slither = time.time()

# Run Slither and capture stdout + stderr
!slither {file_path} --print data-dependency > {output_file} 2>&1

# End timer
end_time_dd_slither = time.time()
# Calculate elapsed time
elapsed_time_dd_slither = end_time_dd_slither - start_time_dd_slither
print(f"Time taken dd slither: {elapsed_time_cg_slither:.6f} seconds")


Time taken dd slither: 0.905950 seconds


***Data Dependency Encoder***

In [None]:
import ast
import re
from collections import defaultdict

# Start timer
start_time_dd_encoder = time.time()


# Step 2: Read the captured output
with open(output_file, "r") as f:
    output = f.read()

# Step 3: Parse and clean
# The captured text holds one table per scope, introduced by a "Contract <name>" or
# "Function <name>(...)" line, with rows of the form "| <variable> | [<dependencies>] |".
# Scopes are keyed by the bare name, so overloaded functions share one entry.
dd_data = defaultdict(dict)   # scope name -> {variable: [dependency, ...]}
current_scope = ""

lines = output.strip().splitlines()

for line in lines:
    line = line.strip()

    # Detect contract header
    contract_match = re.match(r'^Contract\s+(\w+)', line)
    if contract_match:
        current_scope = contract_match.group(1)
        continue

    # Detect function header
    func_match = re.match(r'^Function\s+(\w+)\(.*?\)', line)
    if func_match:
        current_scope = func_match.group(1)
        continue

    # Skip blank lines and table borders (only '+', '-' and '|' characters)
    if set(line) <= set('+-|'):
        continue

    # Parse variable-dependency rows; any other line (log output) is ignored
    if '|' not in line:
        continue
    parts = [p.strip() for p in line.strip('|').split('|')]
    if len(parts) != 2:
        continue
    var, deps = parts

    # Skip the column-header row. Comparing whole cells (instead of searching the
    # line for the words) keeps rows for variables such as "stateVariable".
    if var == 'Variable' and deps == 'Dependencies':
        continue

    # The dependency cell is a Python list literal. `literal_eval` parses it
    # without executing arbitrary code from the captured tool output.
    try:
        parsed = ast.literal_eval(deps) if deps and deps != '[]' else []
    except (ValueError, SyntaxError):
        parsed = []
    if isinstance(parsed, (list, tuple, set)):
        dep_list = [str(dep) for dep in parsed]
    else:
        dep_list = []
    dd_data[current_scope][var] = dep_list

# Step 4: Prepare the output string (also echoed to the cell output)
output_str = ""
for scope, scope_vars in dd_data.items():
    print(f"DD_{scope}:")
    output_str += f"DD_{scope}:\n"
    for var, deps in scope_vars.items():
        # '-' marks a variable without dependencies
        dep_str = ', '.join(deps) if deps else '-'
        output_str += f"  {var}: {dep_str}\n"
        print(f"  {var}: {dep_str}")
    print()
    output_str += "\n"

# Step 5: Save to file
filename = f"{contract_name}_DD_Encoded.txt"
with open(filename, "w") as f:
    f.write(output_str)

print(f"DD output saved to: {filename}")


# End timer
end_time_dd_encoder = time.time()

# Calculate elapsed time
elapsed_time_dd_encoder = end_time_dd_encoder - start_time_dd_encoder
print(f"Time taken dd Encoder: {elapsed_time_dd_encoder:.6f} seconds")


DD_SimpleDAO:
  credit: amount, credit, msg.value

DD_donate:
  to: -
  SimpleDAO.credit: credit, msg.value

DD_withdraw:
  amount: -
  sent: TUPLE_0, amount, msg.sender
  SimpleDAO.credit: amount, credit

DD_queryCredit:
  to: -
  : -
  SimpleDAO.credit: credit

DD output saved to: SimpleDAO_DD_Encoded.txt
Time taken dd Encoder: 0.002903 seconds
