# Statistics of FORGE-curated Dataset

In [7]:
import json
import os
from collections import Counter, deque, namedtuple
from dataclasses import dataclass
from dataclasses import field
from typing import Dict, List, Literal, Optional, Union

from pydantic import BaseModel, Field, RootModel

@dataclass
class ProjectInfo:
    """Project metadata attached to a report.

    Every field defaults to the placeholder string ``"n/a"``; the annotations
    additionally admit ``None`` and, depending on the field, ints, lists or
    dicts.
    """

    url: Union[str, int, List, None] = "n/a"
    commit_id: Union[str, int, List, None] = "n/a"
    address: Union[str, int, List, None] = "n/a"
    chain: Union[str, int, List, None] = "n/a"
    compiler_version: Union[str, List, None] = "n/a"
    audit_date: Union[str, int, List, None] = "n/a"
    project_path: Union[str, List, Dict, None] = "n/a"

    @staticmethod
    def _is_missing(value) -> bool:
        """Return True for a falsy value or the ``"n/a"`` placeholder."""
        return not value or value == "n/a"

    def is_empty(self) -> bool:
        """Return True when neither ``url`` nor ``address`` carries a value.

        Each field is judged independently: it counts as missing when it is
        falsy (``None``, ``""``, ``[]``, ``0``) or equal to ``"n/a"``. Mixed
        combinations such as ``url="n/a"`` with ``address=None`` are therefore
        reported as empty as well, not only the "both n/a" and "both falsy"
        cases.
        """
        return self._is_missing(self.url) and self._is_missing(self.address)

@dataclass
class Finding:
    """A single finding entry; ``Report.findings`` holds a list of these."""

    # Every field has a default, so a Finding can be built from partial data.
    id: Union[str, int] = 0
    # default_factory gives each instance its own dict instead of a shared one.
    category: Dict = field(default_factory=dict)
    title: str = ""
    description: str = ""
    # May be None or empty; count_severity lower-cases and strips it before use.
    severity: Optional[str] = ""
    location: Union[str, int, List] = ""


class Report(BaseModel):
    """One report as loaded from a JSON file by ``JSONFileProcessor``.

    Attributes:
        path: Path string stored in the report itself (defaults to ``""``).
        project_info: Project metadata; a fresh ``ProjectInfo`` when omitted.
        findings: Findings of the report; a fresh empty list when omitted.
    """

    path: str = ""
    # pydantic's own Field is the documented way to declare a default factory
    # on a BaseModel (dataclasses.field is meant for dataclasses); this also
    # matches how the CWE model in this file declares its list defaults.
    project_info: ProjectInfo = Field(default_factory=ProjectInfo)
    findings: List[Finding] = Field(default_factory=list)

    def append_finding(self, finding: Finding) -> None:
        """Append ``finding`` to ``self.findings`` in place."""
        self.findings.append(finding)


class JSONFileProcessor:
    """Walk a directory tree of ``.json`` report files and aggregate results.

    Attributes:
        directory: Root directory that is searched recursively.
        file_count: Number of ``.json`` files found by the most recent scan.
    """

    def __init__(self, directory: str):
        self.directory = directory
        self.file_count = 0

    def _get_all_json_files(self) -> List[str]:
        """Return the paths of all ``*.json`` files below ``self.directory``.

        ``file_count`` is set to the size of this scan rather than being
        incremented, so scanning the same directory twice does not double it.
        """
        json_files = []
        for root, _, names in os.walk(self.directory):
            json_files.extend(
                os.path.join(root, name) for name in names if name.endswith(".json")
            )
        self.file_count = len(json_files)
        return json_files

    def operate_add(self, results: List, result_type) -> Dict:
        """Sum namedtuple results field by field.

        Args:
            results: Namedtuple instances of ``result_type``.
            result_type: The namedtuple class; its ``_fields`` define the keys.

        Returns:
            A dict mapping each field name to the accumulated value. Numeric
            fields (int or float, judged from the first result) start at 0;
            all other fields start from an empty list and are concatenated.
            With no results the element types cannot be inferred, so every
            field is reported as 0.
        """
        if not results:
            return {name: 0 for name in result_type._fields}

        first = results[0]
        totals = {
            name: 0 if isinstance(getattr(first, name), (int, float)) else []
            for name in result_type._fields
        }
        for result in results:
            for name in result._fields:
                totals[name] += getattr(result, name)
        return totals

    def operate_reduce(self, results: List, result_type) -> Dict:
        """Concatenate the iterable fields of all results into one list per field."""
        merged = {name: [] for name in result_type._fields}
        for result in results:
            for name in result._fields:
                merged[name].extend(getattr(result, name))
        return merged

    def process_files(self, analysis_func=None, modify_func=None) -> List:
        """Load every JSON file as a ``Report`` and apply the given callbacks.

        Args:
            analysis_func: Optional callable taking a ``Report``; its return
                value is collected.
            modify_func: Optional callable taking a ``Report`` and returning
                the report that is written back to the same file.

        Returns:
            The list of ``analysis_func`` return values (empty when no
            ``analysis_func`` is given), one per file.
        """
        results = []
        for json_file in self._get_all_json_files():
            with open(json_file, "r", encoding="utf8") as f:
                data = json.load(f)
            report = Report(**data)
            if analysis_func:
                results.append(analysis_func(report))
            if modify_func:
                modified_report = modify_func(report)
                # Serialize before opening for writing: opening with "w"
                # truncates the file, so a serialization error must not be
                # able to leave an empty file behind.
                # NOTE(review): compiler_version is deliberately excluded from
                # the written JSON, as in the original code - confirm intent.
                payload = modified_report.model_dump_json(
                    indent=4, exclude={'project_info': {'compiler_version'}}
                )
                with open(json_file, "w", encoding="utf8") as f:
                    f.write(payload)
        return results

class CWE(BaseModel):
    """One CWE entry together with its links to related entries.

    ``Child`` holds integer IDs, while ``Parent`` ends up holding ``CWE``
    objects once ``CWEHandler.setup_relationships`` has run.
    """

    ID: int
    Name: str
    Description: str = ""
    Abstraction: Literal["Pillar", "Class", "Base", "Variant", "Compound"]
    Mapping: Literal["Allowed", "Allowed-with-Review", "Discouraged", "Prohibited"]
    Peer: List = Field(default_factory=list)
    Parent: List = Field(default_factory=list)
    Child: List[int] = Field(default_factory=list)

    def __str__(self) -> str:
        """Return the entry as ``"CWE-<ID>: <Name>"``."""
        return f"CWE-{self.ID}: {self.Name}"

    def __hash__(self):
        """Hash on the string form so entries can be stored in sets and dicts."""
        return hash(str(self))

    def add_child(self, child_cwe: "CWE"):
        """Link ``child_cwe`` below this entry, in both directions.

        ``Child`` is declared as ``List[int]`` and every lookup in
        ``CWEHandler`` resolves its items as IDs, so the child's ``ID`` is
        stored (not the object itself); ``Parent`` receives this object.
        Links that already exist are not added a second time.
        """
        if child_cwe.ID not in self.Child:
            self.Child.append(child_cwe.ID)
        if self not in child_cwe.Parent:
            child_cwe.Parent.append(self)


class CWEDatabase(RootModel):
    """Root model wrapping a ``str -> CWE`` mapping."""

    root: Dict[str, CWE]

    def get_by_id(self, id: int | str):
        """Return the entry stored under the key ``"CWE-<id>"``.

        Raises ``KeyError`` when no such key exists.
        """
        return self.root[f"CWE-{id}"]

    def get_by_name(self, name: str):
        """Return the entry stored under ``name``, used verbatim as the key.

        Raises ``KeyError`` when no such key exists.
        """
        entries = self.root
        return entries[name]

# Maps a lower-case severity label to a 3-tuple of numbers.
# NOTE(review): the tuples look like (lower bound, upper bound, representative
# score) on a 0-10 scale; for every label the third value is the midpoint of
# the first two. Confirm the intended meaning with the code that reads this map.
severity_score_map = {
    "informational": (0, 0, 0),
    "low": (0.1, 3.9, 2),
    "medium": (4.0, 6.9, 5.45),
    "high": (7.0, 8.9, 7.95),
    "critical": (9.0, 10.0, 9.5),
}

class CWEHandler:
    """Relationship queries (children, parents, ancestry) over a CWEDatabase.

    On construction the ``Parent`` lists of all entries are normalized to
    ``CWE`` objects and completed from the ``Child`` ID lists.
    """

    def __init__(self, cwe_database: CWEDatabase):
        self.db = cwe_database
        self.setup_relationships()

    @classmethod
    def load_from_file(cls, file_path: str) -> "CWEHandler":
        """Build a handler from a JSON file holding the database mapping.

        The file is read as UTF-8 explicitly instead of relying on the
        platform's default encoding.
        """
        with open(file_path, "r", encoding="utf8") as f:
            data = json.load(f)
        # CWEDatabase expects a dict whose keys are strings and whose values
        # validate as CWE objects.
        db = CWEDatabase.model_validate(data)
        return cls(db)

    def setup_relationships(self):
        """
        Build the parent-child relationships (DAG) based on the 'Child' field (List[int]).
        Populates the 'Parent' field (List[CWE]) for each CWE.
        """
        # First pass: keep CWE objects, resolve integer IDs to CWE objects and
        # drop everything that cannot be resolved.
        for cwe in self.db.root.values():
            cleaned_parents = []
            for p in cwe.Parent:
                if isinstance(p, CWE):
                    cleaned_parents.append(p)
                elif isinstance(p, int):
                    try:
                        cleaned_parents.append(self.db.get_by_id(p))
                    except KeyError:
                        # Unknown parent ID: intentionally dropped.
                        pass
            cwe.Parent = cleaned_parents

        # Second pass: register every entry as parent of the children it lists.
        for cwe in self.db.root.values():
            for child_id in cwe.Child:
                try:
                    child_cwe = self.db.get_by_id(child_id)
                except KeyError:
                    # Child ID might not exist in the database.
                    continue
                # Membership check keeps the method idempotent when re-run.
                if cwe not in child_cwe.Parent:
                    child_cwe.Parent.append(cwe)

    def get_cwe(self, cwe_id: Union[int, str]) -> Optional[CWE]:
        """Look up an entry by int ID, ``"CWE-<id>"`` key or numeric string.

        Returns ``None`` when the entry does not exist, the string is not
        numeric, or ``cwe_id`` is neither an int nor a str.
        """
        try:
            if isinstance(cwe_id, int):
                return self.db.get_by_id(cwe_id)
            if isinstance(cwe_id, str):
                if cwe_id.startswith("CWE-"):
                    return self.db.root[cwe_id]
                return self.db.get_by_id(int(cwe_id))
        except (KeyError, ValueError):
            return None
        return None

    def get_direct_children(self, cwe_id: Union[int, str]) -> List[CWE]:
        """Return the resolvable entries listed in the ``Child`` field of ``cwe_id``."""
        cwe = self.get_cwe(cwe_id)
        if not cwe:
            return []
        resolved = (self.get_cwe(child_id) for child_id in cwe.Child)
        return [child for child in resolved if child]

    def get_direct_parents(self, cwe_id: Union[int, str]) -> List[CWE]:
        """Return the direct parents of ``cwe_id``.

        A copy is returned so callers cannot alter the stored ``Parent`` list
        by mutating the result.
        """
        cwe = self.get_cwe(cwe_id)
        if not cwe:
            return []
        return list(cwe.Parent)

    def get_all_descendants(self, cwe_id: Union[int, str]) -> List[CWE]:
        """Return every entry reachable through ``Child`` links.

        The result is in breadth-first discovery order; a dict is used as an
        insertion-ordered set so the order is the same on every run.
        """
        cwe = self.get_cwe(cwe_id)
        if not cwe:
            return []
        descendants: Dict[CWE, None] = {}
        queue = deque([cwe])
        while queue:
            current = queue.popleft()
            for child_id in current.Child:
                child = self.get_cwe(child_id)
                if child and child not in descendants:
                    descendants[child] = None
                    queue.append(child)
        return list(descendants)

    def get_root_parents(self, cwe_id: Union[int, str]) -> List[CWE]:
        """
        Find the top-most ancestors: entries whose Abstraction is "Pillar" or
        that have no parents. Returned in breadth-first discovery order.
        """
        cwe = self.get_cwe(cwe_id)
        if not cwe:
            return []

        roots: Dict[CWE, None] = {}
        visited = set()
        queue = deque([cwe])

        while queue:
            current = queue.popleft()
            if current in visited:
                continue
            visited.add(current)

            # A Pillar ends the upward walk, and so does an entry without parents.
            if current.Abstraction == "Pillar" or not current.Parent:
                roots[current] = None
                continue

            queue.extend(current.Parent)

        return list(roots)

    def is_related(self, id1: Union[int, str], id2: Union[int, str]) -> bool:
        """
        Check if there is any inheritance relationship between two CWEs.
        Returns True if both resolve to the same entry, if id1 is an ancestor
        of id2, or if id2 is an ancestor of id1. Unknown IDs yield False.
        """
        cwe1 = self.get_cwe(id1)
        cwe2 = self.get_cwe(id2)

        if not cwe1 or not cwe2:
            return False

        if cwe1 == cwe2:
            return True

        return self._is_ancestor(cwe1, cwe2) or self._is_ancestor(cwe2, cwe1)

    def _is_ancestor(self, ancestor: CWE, descendant: CWE) -> bool:
        """Breadth-first walk up the ``Parent`` links from ``descendant`` looking for ``ancestor``."""
        queue = deque([descendant])
        visited = set()
        while queue:
            current = queue.popleft()
            if current in visited:
                continue
            visited.add(current)

            if current == ancestor:
                return True

            queue.extend(current.Parent)
        return False


## Count findings & projects

In [8]:
# Dataset locations, given relative to the current working directory.
FINDING_PATH = "../dataset-curated/findings"
VERIFY_PATH = "../dataset-curated/manual_verification"
CONTRACTS_PATH = "../dataset-curated/contracts"
CONTRACTS_RAW_PATH = "../dataset-curated/contracts-raw"
VFP_PATH ="../flatten/vfp"
VFP_VULN_PATH ="../flatten/vfp-vuln"


# Named FindingCount rather than Finding so that the module-level name
# ``Finding`` keeps referring to the dataclass used by Report.findings.
FindingCount = namedtuple("FindingCount", ["total_files", "total_projects", "total_findings"])


def count_finding(report: Report):
    """Return per-report counters: one file, its project count and finding count.

    A report whose ``project_info.url`` is a list counts one project per list
    entry; any other ``url`` value counts as a single project.
    """
    urls = report.project_info.url
    project_count = len(urls) if isinstance(urls, list) else 1
    return FindingCount(
        total_files=1,
        total_projects=project_count,
        total_findings=len(report.findings),
    )


processor = JSONFileProcessor(FINDING_PATH)
results = processor.process_files(analysis_func=count_finding)
# Sum the per-report counters field by field.
res = processor.operate_add(results, FindingCount)
res

{'total_files': 209, 'total_projects': 254, 'total_findings': 2556}

## Check & count contracts

In [9]:
import re
from pathlib import Path

Result = namedtuple(
    "Result",
    [
        "total_files",
        "total_projects",
        "valid_projects",
        "solidity_files",
        "lines_of_code",
    ],
)


# Keys of project directories that were already measured, shared across calls
# so a project referenced by several reports is only measured once.
_processed_projects = set()


def _count_lines(filepath: Path) -> int:
    """Return the number of lines in ``filepath``; print a message and return 0 if it cannot be read."""
    try:
        # Undecodable bytes are ignored; iterating the handle counts lines
        # without loading the whole file into memory.
        with open(filepath, "r", encoding="utf8", errors="ignore") as source_file:
            return sum(1 for _ in source_file)
    except OSError as e:
        print(f"Error opening file {filepath}: {e}")
        return 0


def _project_paths(raw) -> list:
    """Normalize a ``project_path`` value (str, list, dict or None) to its usable entries.

    Dict values and list items are taken as individual paths; falsy entries
    and the ``"n/a"`` placeholder are dropped.
    """
    if isinstance(raw, dict):
        candidates = list(raw.values())
    elif isinstance(raw, list):
        # ProjectInfo.project_path may be a list; each item is one path.
        candidates = raw
    else:
        candidates = [raw]
    return [entry for entry in candidates if entry and entry != "n/a"]


def check_projects(report: "Report"):
    """Count the projects of one report and measure their Solidity sources.

    Each project path is resolved relative to ``../``. Every usable path adds
    to ``total_projects``; a path is only measured (``valid_projects``,
    ``solidity_files``, ``lines_of_code``) the first time it is seen across
    all calls and only if it exists on disk. Reports whose project info is
    empty are announced on stdout and yield zero for all project counters.

    Side effect: newly seen project keys are added to ``_processed_projects``.
    """
    total_projects = valid_projects = solidity_files = lines_of_code = 0

    if report.project_info.is_empty():
        print(f"Empty project info in {report.path}")
    else:
        for entry in _project_paths(report.project_info.project_path):
            project_path = Path("../") / Path(entry)
            exists = project_path.exists()
            # Existing directories are keyed by their resolved path so that
            # different spellings of the same location are recognized.
            project_key = str(project_path.resolve()) if exists else str(project_path)

            total_projects += 1
            if project_key in _processed_projects:
                continue
            _processed_projects.add(project_key)

            if exists:
                valid_projects += 1
                for candidate in project_path.glob("**/*.sol"):
                    if candidate.is_file():
                        solidity_files += 1
                        lines_of_code += _count_lines(candidate)

    return Result(
        total_files=1,
        total_projects=total_projects,
        valid_projects=valid_projects,
        solidity_files=solidity_files,
        lines_of_code=lines_of_code,
    )


# Empty the cross-report de-duplication set so re-running this cell starts
# from a clean state.
_processed_projects.clear()

processor = JSONFileProcessor(FINDING_PATH)
results = processor.process_files(analysis_func=check_projects)
# Sum the per-report Result tuples field by field into one dict.
res = processor.operate_add(results, Result)

res

{'total_files': 209,
 'total_projects': 210,
 'valid_projects': 210,
 'solidity_files': 28925,
 'lines_of_code': 4724389}

In [10]:
# With no valid project the divisions would raise ZeroDivisionError, so the
# averages fall back to 0.0 in that case.
valid_project_count = res["valid_projects"]
average_lines_of_code = (
    res["lines_of_code"] / valid_project_count if valid_project_count else 0.0
)
average_files_per_project = (
    res["solidity_files"] / valid_project_count if valid_project_count else 0.0
)
print(f"Total files: {res['total_files']}")
print(f"Total projects: {res['total_projects']}")
print(f"Valid projects: {res['valid_projects']}")
print(f"Total solidity files: {res['solidity_files']}")
print(f"Total lines of code: {res['lines_of_code']}")
print(f"Average lines of code: {average_lines_of_code}")
print(f"Average files per project: {average_files_per_project}")

Total files: 209
Total projects: 210
Valid projects: 210
Total solidity files: 28925
Total lines of code: 4724389
Average lines of code: 22497.090476190475
Average files per project: 137.73809523809524


## Severity statistics

In [11]:
severity_category = ["na", "informational", "low", "medium", "high", "critical"]

Severity = namedtuple("Category", severity_category)


def count_severity(report: Report):
    """Tally the findings of one report per normalized severity label.

    Labels are lower-cased and stripped. Several aliases are folded into
    "informational"; missing, empty and "n/a" labels are folded into "na".
    Labels outside ``severity_category`` are printed and left uncounted.
    """
    tally = dict.fromkeys(severity_category, 0)
    for item in report.findings:
        label = item.severity
        if label:
            label = item.severity.lower().strip()

        if label in ("info", "warning", "note/information", "gas", "note"):
            label = "informational"
        elif label in (None, "", "n/a"):
            label = "na"

        if label not in severity_category:
            print(f"Unknown severity: {label}")
            continue
        tally[label] += 1

    # Keys match the namedtuple fields, so the dict unpacks directly.
    return Severity(**tally)

# Run the severity tally over every findings file and sum the per-report
# tuples field by field.
processor = JSONFileProcessor(FINDING_PATH)
results = processor.process_files(analysis_func=count_severity)
res = processor.operate_add(results, Severity)
res

{'na': 93,
 'informational': 908,
 'low': 794,
 'medium': 439,
 'high': 254,
 'critical': 68}

## Statistics of Vulnerability-File Pairs

In [None]:
# Path.glob yields matches lazily; only the top level of each directory is
# matched by "*.json" (no recursion).
vp_files = Path(VFP_PATH).glob("*.json")
vp_vuln_files = Path(VFP_VULN_PATH).glob("*.json")

# Materialize each generator once to count its entries; the generators are
# exhausted afterwards.
vp_count = len(list(vp_files))
vp_vuln_count = len(list(vp_vuln_files))
print(f"Total Vulnerability-File Pairs: {vp_count}")
print(f"Total Vulnerability-File Pairs with Medium, High and Critical Severity: {vp_vuln_count}")

Total Vulnerability-File Pairs: 654
Total Vulnerability-File Pairs with Medium, High and Critical Severity: 319
