# Statistics of FORGE Dataset

In [3]:
import os
import json
from dataclasses import dataclass
from dataclasses import field
from typing import List, Dict, Union, Optional, Literal
from pydantic import BaseModel, RootModel, Field
from collections import namedtuple,Counter

@dataclass
class ProjectInfo:
    """Project metadata attached to one report.

    Every field defaults to the placeholder string ``"n/a"``.
    """

    url: Union[str, int, List, None] = "n/a"
    commit_id: Union[str, int, List, None] = "n/a"
    address: Union[str, int, List, None] = "n/a"
    chain: Union[str, int, List, None] = "n/a"
    compiler_version: Union[str, List, None] = "n/a"
    # Either the "n/a" placeholder or a container of project locations.
    project_path: Union[str, List, Dict, None] = "n/a"

    def is_empty(self):
        """Return True when neither ``url`` nor ``address`` carries a value.

        A field counts as missing when it is falsy (``None``, ``""``, ``[]``,
        ``0``) or equal to the ``"n/a"`` placeholder. Each field is judged on
        its own, so a mixed record such as ``url="n/a"`` with ``address=""``
        is also reported as empty.
        """
        return all(
            not value or value == "n/a" for value in (self.url, self.address)
        )

@dataclass
class Finding:
    """One finding listed in a report; every field has a default."""

    # Presumably the finding's identifier within its report (string or
    # integer) -- confirm against the dataset schema.
    id: Union[str, int] = 0
    # Classification mapping. Elsewhere in this file it is read as
    # digit-string keys whose values are lists, the first element of which
    # may be a "CWE-..." identifier.
    category: Dict = field(default_factory=dict)
    title: str = ""
    description: str = ""
    # Free-text severity label; this file lower-cases and strips it before
    # comparing it with na/info/low/medium/high/critical.
    severity: str = ""
    location: Union[str, int, List] = ""


class Report(BaseModel):
    """Pydantic model of one report JSON file.

    Built from the decoded JSON object via ``Report(**data)``.
    """

    path: str = ""
    # Defaults are declared with pydantic's own ``Field`` (already imported by
    # this file) rather than ``dataclasses.field``: this class is a pydantic
    # model, not a stdlib dataclass.
    project_info: ProjectInfo = Field(default_factory=ProjectInfo)
    findings: List[Finding] = Field(default_factory=list)

    def append_finding(self, finding: Finding):
        """Append ``finding`` to this report's ``findings`` list in place."""
        self.findings.append(finding)


class JSONFileProcessor:
    """Load every ``.json`` report under a directory and aggregate results.

    ``process_files`` turns each file into a ``Report`` and optionally maps an
    analysis function over it; ``operate_add`` and ``operate_reduce`` combine
    the per-report namedtuples into one dictionary keyed by field name.
    """

    def __init__(self, directory: str):
        """Remember the root ``directory`` to scan; nothing is read yet."""
        self.directory = directory
        # Number of ``.json`` files found by the most recent directory scan.
        self.file_count = 0

    def _get_all_json_files(self) -> List[str]:
        """Return the paths of all ``*.json`` files below ``self.directory``.

        ``self.file_count`` is set to the size of this scan, so repeated
        scans report the current count instead of a running total.
        """
        json_files = [
            os.path.join(root, name)
            for root, _, names in os.walk(self.directory)
            for name in names
            if name.endswith(".json")
        ]
        self.file_count = len(json_files)
        return json_files

    def operate_add(self, results: List, result_type):
        """Combine namedtuple ``results`` field by field using ``+=``.

        Args:
            results: namedtuples that all share ``result_type``'s fields.
            result_type: the namedtuple class; only ``_fields`` is read.

        Returns:
            A dict mapping each field name to its combined value. Fields whose
            value in the first result is a number start from ``0`` and are
            summed; all other fields start from ``[]`` and are concatenated.
            With no results there is no value to infer a type from, so every
            field is reported as ``0``.
        """
        if not results:
            return {name: 0 for name in result_type._fields}

        first = results[0]
        totals = {}
        for name in result_type._fields:
            sample = getattr(first, name)
            totals[name] = 0 if isinstance(sample, (int, float)) else []
        for result in results:
            for name in result._fields:
                totals[name] += getattr(result, name)
        return totals

    def operate_reduce(self, results: List, result_type):
        """Concatenate the iterable fields of ``results`` into one list each.

        Returns a dict mapping every field of ``result_type`` to a list that
        holds the elements of that field from all results, in order.
        """
        merged = {name: [] for name in result_type._fields}
        for result in results:
            for name in result._fields:
                merged[name].extend(getattr(result, name))
        return merged

    def process_files(self, analysis_func=None) -> List:
        """Parse every JSON file into a ``Report`` and collect one item each.

        Args:
            analysis_func: optional callable taking a ``Report``. When given,
                its return value is collected for each report.

        Returns:
            A list with one entry per JSON file: the analysis result when
            ``analysis_func`` is given, otherwise the parsed ``Report``
            itself (previously the parsed reports were discarded and an
            empty list came back).
        """
        results = []
        for json_file in self._get_all_json_files():
            with open(json_file, "r", encoding="utf8") as f:
                data = json.load(f)
            # The file is closed before validation and analysis run.
            report = Report(**data)
            if analysis_func is None:
                results.append(report)
            else:
                results.append(analysis_func(report))
        return results


## Count findings & projects

In [4]:
# Per-report counters. The tuple is named ``FindingCount`` so that it does not
# rebind -- and thereby hide -- the ``Finding`` dataclass defined above.
FindingCount = namedtuple(
    "FindingCount", ["total_files", "total_projects", "total_findings"]
)


def count_finding(report: Report):
    """Return the file, project and finding counts contributed by ``report``.

    Each report counts as one file. Projects are the entries of
    ``project_info.project_path`` when that is a mapping; any other value
    (such as the ``"n/a"`` default) contributes zero projects instead of
    raising ``AttributeError``.
    """
    project_path = report.project_info.project_path
    project_count = len(project_path) if isinstance(project_path, dict) else 0
    return FindingCount(
        total_files=1,
        total_projects=project_count,
        total_findings=len(report.findings),
    )


processor = JSONFileProcessor("../../dataset/results")
results = processor.process_files(analysis_func=count_finding)
res = processor.operate_add(results, FindingCount)
res

{'total_files': 6454, 'total_projects': 6579, 'total_findings': 27497}

## Check & count contracts

In [3]:
import re
from pathlib import Path

Result = namedtuple(
    "Result",
    [
        "total_files",
        "total_projects",
        "valid_projects",
        "solidity_files",
        "lines_of_code",
        "compiler_version",
    ],
)

# A Solidity version pragma such as ``pragma solidity >=0.7.0 <0.8.0;``.
# Only ``pragma solidity`` is accepted so that other pragmas (for example
# ``pragma experimental ...``) are never mistaken for the compiler version.
# The pattern has no nested quantifiers, so matching is linear per line.
_SOLIDITY_PRAGMA = re.compile(r"\s*(pragma\s+solidity\b[^;]*;)", re.IGNORECASE)


def _count_lines_and_get_compiler_version(filepath):
    """Return ``(line_count, pragma)`` for one Solidity source file.

    ``pragma`` is the first ``pragma solidity ...;`` statement that starts a
    line, or ``""`` when the file has none. The file is read in one pass.
    A file that cannot be opened or decoded is reported on stdout and
    counted as ``(0, "")``.
    """
    line_count = 0
    pragma = ""
    try:
        with open(filepath, "r", encoding="utf8") as sol_file:
            for line in sol_file:
                line_count += 1
                if not pragma:
                    match = _SOLIDITY_PRAGMA.match(line)
                    if match:
                        pragma = match.group(1)
    except (OSError, UnicodeError) as e:
        print(f"Error opening file {filepath}: {e}")
        return (0, "")
    return (line_count, pragma)


def _most_common_version(version_list):
    """Return the most frequent non-empty entry of ``version_list`` or ``""``."""
    counts = Counter(version for version in version_list if version)
    if not counts:
        return ""
    return counts.most_common(1)[0][0]


def check_projects(report: "Report", dataset_root: str = "../../dataset/"):
    """Count the Solidity files, lines and dominant pragma of one report.

    Args:
        report: the parsed report; ``project_info.project_path`` is expected
            to map project names to directories relative to ``dataset_root``.
        dataset_root: directory the project paths are resolved against.

    Returns:
        A ``Result`` with ``total_files=1``. ``compiler_version`` is a
        one-element list holding the most frequent ``pragma solidity``
        statement over all files (``""`` when none was found), so results can
        be concatenated across reports. A report whose project info is empty
        yields zero valid projects, files and lines instead of failing on
        unassigned counters.
    """
    project_path = report.project_info.project_path
    # Anything that is not a mapping (e.g. the "n/a" default) has no projects.
    projects = project_path if isinstance(project_path, dict) else {}
    total_projects = len(projects)

    solidity_files = 0
    lines_of_code = 0
    compiler_versions = []

    if report.project_info.is_empty():
        valid_projects = 0
        print(f"Empty project info in {report.path}")
    else:
        valid_projects = total_projects
        for relative_dir in projects.values():
            project_dir = Path(dataset_root) / relative_dir.lstrip("/")
            # Each tree is walked once; every match counts as a file even
            # when it cannot be read.
            for sol_path in project_dir.glob("**/*.sol"):
                line_count, version = _count_lines_and_get_compiler_version(sol_path)
                solidity_files += 1
                lines_of_code += line_count
                compiler_versions.append(version)

    return Result(
        total_files=1,
        total_projects=total_projects,
        valid_projects=valid_projects,
        solidity_files=solidity_files,
        lines_of_code=lines_of_code,
        compiler_version=[_most_common_version(compiler_versions)],
    )


# Run the contract check over every report, then total the per-report tuples.
processor = JSONFileProcessor(directory="../../dataset/results")
results = processor.process_files(check_projects)
res = processor.operate_add(results=results, result_type=Result)


In [4]:
# Averages are per valid project; with no valid project they are reported as
# 0.0 instead of raising ZeroDivisionError.
valid_project_count = res["valid_projects"]
average_lines_of_code = (
    res["lines_of_code"] / valid_project_count if valid_project_count else 0.0
)
average_files_per_project = (
    res["solidity_files"] / valid_project_count if valid_project_count else 0.0
)
print(f"Total files: {res['total_files']}")
print(f"Total projects: {res['total_projects']}")
print(f"Valid projects: {res['valid_projects']}")
print(f"Total solidity files: {res['solidity_files']}")
print(f"Total lines of code: {res['lines_of_code']}")
print(f"Average lines of code: {average_lines_of_code}")
print(f"Average files per project: {average_files_per_project}")
# some might be pragma solidity >=0.7.0 <0.8.0;
from collections import Counter
compiler_versions = res["compiler_version"]
compiler_versions2 = []
# The first ``major.minor[.patch]`` number in a pragma decides its bucket.
regex = re.compile(r"(\d+)\.(\d+)(?:\.\d+)?")
for version in compiler_versions:
    match = regex.search(version)
    if not match:
        continue
    # Compare numerically: as strings "0.8" sorts before "0.8.0" and "0.10"
    # before "0.5.0", which would put those versions in the wrong bucket.
    major, minor = int(match.group(1)), int(match.group(2))
    if (major, minor) >= (0, 9):
        # Versions from 0.9 upwards have no bucket and are left out.
        continue
    # Everything older than 0.5 is grouped under "^0.4".
    compiler_versions2.append(f"^0.{max(minor, 4)}")

compiler_version = Counter(compiler_versions2)
compiler_version


Total files: 6454
Total projects: 6579
Valid projects: 6579
Total solidity files: 81390
Total lines of code: 16941428
Average lines of code: 2575.076455388357
Average files per project: 12.371181030551755


Counter({'^0.8': 3788, '^0.6': 1518, '^0.4': 275, '^0.5': 480, '^0.7': 362})

## Severity statistics

In [5]:
severity_category = ["na", "info", "low", "medium", "high", "critical"]

# The typename matches the name the class is bound to, so the repr of a
# result reads ``Severity(...)``.
Severity = namedtuple("Severity", severity_category)


def count_severity(report: "Report"):
    """Count the findings of ``report`` per severity label.

    Each finding's ``severity`` is lower-cased and stripped, and ``"n/a"`` is
    folded into ``"na"``. Labels outside ``severity_category`` are ignored,
    and a missing (``None``) severity is treated like an empty one.

    Returns:
        A ``Severity`` namedtuple with one integer count per label.
    """
    # Seed the counters from ``severity_category`` so that the labels are
    # spelled out in exactly one place.
    counts = dict.fromkeys(severity_category, 0)
    for finding in report.findings:
        label = (finding.severity or "").lower().strip()
        if label == "n/a":
            label = "na"
        if label in counts:
            counts[label] += 1
    return Severity(**counts)

# Tally severities for every report and add the per-report counts together.
processor = JSONFileProcessor(directory="../../dataset/results")
results = processor.process_files(count_severity)
res = processor.operate_add(results=results, result_type=Severity)
res

{'na': 2406,
 'info': 3456,
 'low': 15170,
 'medium': 3367,
 'high': 2094,
 'critical': 1004}

## CWE-Id statistics

In [9]:
CWE = namedtuple("CWE", ["cwe_ids"])


def extract_cwe_ids(report: "Report"):
    """Collect one CWE identifier per finding of ``report``.

    For each finding, the entry of ``category`` with the numerically largest
    digit-string key is used. Its value is expected to be a list whose first
    element is the identifier, which is kept when it is a string starting
    with ``"CWE-"``. Findings without a category, without numeric keys, or
    with an empty or non-list value are skipped rather than raising.

    Returns:
        A ``CWE`` namedtuple whose ``cwe_ids`` lists the identifiers in
        finding order (duplicates preserved).
    """
    cwe_ids = []
    for finding in report.findings:
        category = finding.category
        if not category or not isinstance(category, dict):
            continue
        numeric_keys = [
            key for key in category if isinstance(key, str) and key.isdecimal()
        ]
        if not numeric_keys:
            continue
        # Look the value up under the original key, so that a key such as
        # "01" is not lost by converting it to an int and back.
        values = category[max(numeric_keys, key=int)]
        if not isinstance(values, (list, tuple)) or not values:
            continue
        cwe_value = values[0]
        if isinstance(cwe_value, str) and cwe_value.startswith("CWE-"):
            cwe_ids.append(cwe_value)
    return CWE(cwe_ids=cwe_ids)

# Gather the CWE ids of every report into one flat list.
processor = JSONFileProcessor(directory="../../dataset/results")
results = processor.process_files(extract_cwe_ids)
cwe_result = processor.operate_reduce(results=results, result_type=CWE)

# Frequency of each CWE id, and the set of distinct ids
all_cwe_ids = cwe_result["cwe_ids"]
cwe_counter = Counter(all_cwe_ids)
unique_cwe_ids = set(all_cwe_ids)

unique_total = len(unique_cwe_ids)
print(f"Total unique CWE IDs: {unique_total}")

Total unique CWE IDs: 296
