In [10]:
import binascii
import cbor2
import base58
import datetime
import json
import requests
from web3 import Web3
from dotenv import load_dotenv
import os
from web3.contract import Contract
from pathlib import Path
import pprint
import ipywidgets as widgets
from IPython.display import display
from IPython import get_ipython
from git import Repo
import tempfile
import shutil
import subprocess
from IPython.display import display, Markdown

# Load variables from a .env file into the process environment;
# ETH_MAINNET_RPC_URL is read from the environment in the configuration cell below.
load_dotenv()

True

In [11]:
# Minimal ABI fragment describing only the parameterless `facetAddresses()` view
# function, which is the single call this notebook makes on the diamond proxy.
diamondLoupeABI = [
    dict(
        constant=True,
        inputs=[],
        name="facetAddresses",
        outputs=[
            dict(
                internalType="address[]",
                name="facetAddresses_",
                type="address[]",
            )
        ],
        payable=False,
        stateMutability="view",
        type="function",
    )
]

In [12]:
class CompareBytecode:
    def __init__(
        self,
        # contracts: dict = None,
        json_with_diamond_address: str,
        key_to_read_diamond_address: str,
        w3: Web3,
        contract_abi: dict,
        dir_path_to_broadcasts: str = "broadcast",
        dir_path_to_artifacts: str = "forge-artifacts",
        additional_contracts: dict = None,
        repos_with_build_option: tuple = None,
        forge_path: str = None,
    ):
        self.json_with_diamond_address = json_with_diamond_address
        self.key_to_read_diamond_address = key_to_read_diamond_address
        self.w3 = w3
        self.contract_abi = contract_abi
        self.dir_path_to_broadcasts = dir_path_to_broadcasts
        self.dir_path_to_artifacts = dir_path_to_artifacts
        self.temp_dirs: dict = (
            {}
        )  # keeps track of temporary directories created for each commit
        self.diamond_address = self.get_address_from_json(
            self.key_to_read_diamond_address, self.json_with_diamond_address
        )
        self.repos_with_build_option = repos_with_build_option
        self.forge_path = (
            forge_path if forge_path is not None else shutil.which("forge")
        )
        if self.forge_path is None:
            raise Exception(
                "Could not find 'forge' executable. Please ensure it is installed and available in your PATH."
            )

        self.built_repos_commits = set()  # keeps track of built repos and commits
        self.current_facet_addresses: list = None  # keeps track of facet addresses currently active on the proxy contract
        self.contracts = (
            self.match_contract_addresses_to_names()
        )  # if there's no name to match, then this currently removes the contract from the list

        # Add any additional self.contracts - override existing ones if needed
        if additional_contracts:
            for contract_name, address in additional_contracts.items():
                if contract_name not in self.contracts:
                    self.contracts[contract_name] = address
                else:
                    print(
                        f"Contract '{contract_name}' already exists in the dictionary."
                    )

        self.metadata: dict = {}

    def get_address_from_json(self, key=None, filename=None):
        """
        Reads a JSON file and returns the value corresponding to a given key

        Args:
            key (str): key to look up in the JSON file. If None, self.key_to_read_diamond_address is used.
            filename (str): name of the JSON file. If None, self.json_with_diamond_address is used.

        Returns:
            str: value corresponding to the given key or None if the key is not found
        """
        key = self.key_to_read_diamond_address if key is None else key
        filename = self.json_with_diamond_address if filename is None else filename

        if not os.path.isfile(filename):
            print(f"File '{filename}' does not exist.")
            return None

        try:
            with open(filename, "r") as f:
                data = json.load(f)
        except json.JSONDecodeError:
            print(f"File '{filename}' is not a valid JSON file.")
            return None

        address = data.get(key)
        if address is None:
            print(f"No address found for key '{key}' in {filename}.")

        return address

    def is_notebook(self):
        """Return True when the active IPython shell class is named `ZMQInteractiveShell`."""
        return get_ipython().__class__.__name__ == "ZMQInteractiveShell"

    def get_input(self, prompt):
        """Get input from user, compatible with both Jupyter notebook and CLI"""
        if self.is_notebook():
            input_widget = widgets.Text(
                value="",
                placeholder="Enter new address",
                description=prompt,
                disabled=False,
                layout=widgets.Layout(
                    width="auto"
                ),  # adjusts the width of the input box
            )
            input_widget.style.description_width = (
                "initial"  # adjusts the width of the description
            )
            display(input_widget)

            # Wait for input in Jupyter
            while not input_widget.value:
                pass
            return input_widget.value
        else:
            # Get input in CLI
            return input(prompt)

    def checkout_and_build(self, repo=None, commit=None):
        """
        Checks out a specific commit in the repository and runs `forge build`.
        If no repo and commit are specified, checks `self.repos_with_build_option`.

        Args:
            repo (str, optional): The repository to use. Defaults to None.
            commit (str, optional): The commit to checkout. Defaults to None.
        """

        # If no repo and commit are specified, use the ones from self.repos_with_build_option
        # if repo is None and commit is None:
        for (repo, commit), options in self.repos_with_build_option.items():
            if options["build"] and commit not in self.built_repos_commits:
                self.temp_dirs[commit] = self.perform_build(repo, commit)
                self.built_repos_commits.add(commit)

    def perform_build(self, repo, commit):
        """
        Clone `repo`, check out `commit`, run `forge build` and keep the artifacts.

        All commands run with `cwd=` pointing at the temporary clone rather than via
        `os.chdir`, so the working directory of the process is never changed and
        cannot be left inside a deleted directory when a command fails.

        Args:
            repo (str): The repository to use.
            commit (str): The commit to checkout.

        Returns:
            str: path `<cwd>/temp_artifacts/<short-hash>_artifacts` holding a copy of
            the build output.

        Raises:
            subprocess.CalledProcessError: if any git or forge command fails.
        """
        # Resolved up front so the persistent output location is relative to the
        # directory the caller started in.
        start_dir = os.getcwd()

        with tempfile.TemporaryDirectory() as tempdir:
            # Clone the repository into the temporary directory
            subprocess.run(["git", "clone", repo, tempdir], check=True)

            # Checkout the specific commit
            subprocess.run(["git", "checkout", commit], cwd=tempdir, check=True)

            # Update and initialize submodules
            subprocess.run(
                ["git", "submodule", "update", "--init", "--recursive"],
                cwd=tempdir,
                check=True,
            )

            # Get short commit hash
            commit_hash = subprocess.run(
                ["git", "rev-parse", "--short", commit],
                cwd=tempdir,
                capture_output=True,
                text=True,
                check=True,
            ).stdout.strip()

            # Create a new directory for these artifacts with the commit hash in the name
            temp_artifacts_dir = os.path.join(tempdir, f"{commit_hash}_artifacts")
            os.makedirs(temp_artifacts_dir, exist_ok=True)

            # Run `forge build` inside the clone
            build_command = [
                self.forge_path,
                "build",
                "--out",
                temp_artifacts_dir,
                "--skip",
                "script",
                "test",
                "D0",
                "DeploymentHelpers",
            ]
            subprocess.run(build_command, cwd=tempdir, check=True)

            # Copy the artifacts to a persistent location before the clone is removed
            new_artifacts_dir = os.path.join(
                start_dir, "temp_artifacts", f"{commit_hash}_artifacts"
            )
            os.makedirs(new_artifacts_dir, exist_ok=True)
            shutil.copytree(temp_artifacts_dir, new_artifacts_dir, dirs_exist_ok=True)

        return new_artifacts_dir

    def get_current_facet_addresses(self, diamond_address=None):
        if diamond_address is None:
            diamond_address = self.diamond_address

        diamond_loupe = self.w3.eth.contract(
            address=diamond_address, abi=self.contract_abi
        )

        # Call the facetAddresses() method
        try:
            self.current_facet_addresses = (
                diamond_loupe.functions.facetAddresses().call()
            )

            return self.current_facet_addresses

        except:
            print("Failed to call facetAddresses() method")

    def create_contract_address_list(self):
        # Get the facet addresses
        facet_addresses = self.get_current_facet_addresses()

        contract_address_list = [self.diamond_address] + facet_addresses

        return contract_address_list

    def match_contract_addresses_to_names(self, contract_addresses: str = None):
        """
        Create a dictionary of contract names and addresses.

        Args:
            contract_addresses (str): The addresses of the contract.

        Returns:
            dict: A dictionary with contract names as keys and contract addresses as values.
        """
        contract_dict = {}
        reverse_dict = {}

        # Get the contract addresses
        if contract_addresses is None:
            contract_addresses = self.create_contract_address_list()

        # Traverse the directory and find all JSON files
        root = self.dir_path_to_broadcasts
        json_files = []
        for dirpath, dirnames, filenames in os.walk(root):
            # check if directory ends with key_to_read_diamond_address
            if dirpath.endswith(self.key_to_read_diamond_address):
                # check if directory is not 'dry-run'
                if "dry-run" not in dirpath:
                    for filename in filenames:
                        if filename.endswith(".json"):
                            json_path = os.path.join(dirpath, filename)
                            json_files.append(json_path)

        # Sort the files by modification time in reverse order
        sorted_files = sorted(json_files, key=os.path.getmtime, reverse=True)
        for json_path in sorted_files:
            with open(json_path, "r") as f:
                data = json.load(f)
                transactions = data.get("transactions", [])
                for transaction in transactions:
                    # Check if the contractAddress matches any of the facet addresses
                    if transaction.get("contractAddress") in contract_addresses:
                        contract_name = transaction.get("contractName")
                        contract_address = transaction.get("contractAddress")
                        # If address not in contract_dict or is same as existing key
                        if (
                            contract_address not in contract_dict.values()
                            or contract_dict.get(contract_name) == contract_address
                        ):
                            contract_dict[contract_name] = contract_address
                            # Update the reverse dictionary
                            if (
                                contract_address in reverse_dict
                                and contract_name != reverse_dict[contract_address]
                            ):
                                reverse_dict[contract_address].add(contract_name)
                            else:
                                reverse_dict[contract_address] = {contract_name}
                        else:
                            duplicate_keys = [
                                k
                                for k, v in contract_dict.items()
                                if v == contract_address
                            ]
                            print(
                                f"Duplicate address {contract_address} found for contracts: {duplicate_keys} and {contract_name} in {filename}"
                            )
                            duplicate_keys.append(contract_name)
                            print(
                                "Which contract would you like to provide a new address for?"
                            )
                            for i, key in enumerate(duplicate_keys):
                                print(f"{i+1}: {key}")
                            selected_index = (
                                int(
                                    input(
                                        "Enter the number corresponding to your selection: "
                                    )
                                )
                                - 1
                            )
                            selected_key = duplicate_keys[selected_index]
                            new_address = input(
                                "Enter new address for the selected contract: "
                            )
                            contract_dict[selected_key] = new_address

        return contract_dict

    def verify_contracts_are_active(self, contracts: dict = None):
        """
        Verifies that the contract is active by calling the `getFacetAddresses()` method.
        """

        # Get the facet addresses
        facet_addresses = self.get_current_facet_addresses()

        if contracts is None:
            contracts = self.contracts

        active_contracts: dict = {}

        # Check if the contract address is in the list of facet addresses
        for contract_name, contract_address in contracts.items():
            if contract_address in facet_addresses:
                active_contracts[contract_name] = True
            else:
                active_contracts[contract_name] = False
                
            # Assume the proxy address in self.contracts is active
            if contract_address == self.diamond_address:
                active_contracts[contract_name] = True

        return active_contracts

    def get_bytecodes(self, contracts):
        """
        Extracts bytecode from given contracts' artifact JSON file

        Args:
            contracts (dict): dictionary of contract names and addresses.

        Returns:
            dict: dictionary of contract names and their bytecodes
        """
        bytecodes = {}

        for contract_name in contracts.keys():
            # Check if the contract should be read from a specific repo and commit
            for (repo, commit_hash), options in self.repos_with_build_option.items():
                if contract_name in options.get("contracts", []):
                    # Update the root path to the temporary directory for this contract
                    if options["build"] == True:
                        json_path = os.path.join(
                            self.temp_dirs[commit_hash],
                            f"{contract_name}.sol",
                            f"{contract_name}.json",
                        )
                    else:
                        json_path = os.path.join(
                            "temp_artifacts",
                            f"{commit_hash}_artifacts",
                            f"{contract_name}.sol",
                            f"{contract_name}.json",
                        )

                else:
                    json_path = os.path.join(
                        self.dir_path_to_artifacts,
                        f"{contract_name}.sol",
                        f"{contract_name}.json",
                    )

                with open(json_path) as f:
                    data = json.load(f)

                bytecode = data["deployedBytecode"]["object"]
                if bytecode == "":
                    print(f"Bytecode for contract '{contract_name}' is empty.")
                bytecodes[contract_name] = bytecode

        return bytecodes

    def get_onchain_bytecodes(self, contracts):
        """
        Retrieves on-chain runtime bytecodes for given contracts

        Args:
            contracts (dict): dictionary of contract names and addresses.

        Returns:
            dict: dictionary of contract names and their on-chain runtime bytecodes
        """
        onchain_bytecodes = {}

        for contract_name, contract_address in contracts.items():
            onchain_bytecode = self.w3.eth.get_code(contract_address).hex()
            onchain_bytecodes[contract_name] = onchain_bytecode

        return onchain_bytecodes

    def get_metadata_hash_bytecode(self, bytecode):
        # todo fix prefix
        prefix = "a264"
        start_index = bytecode.find(prefix)

        while start_index != -1:
            try:
                metadata_hex = bytecode[start_index:]
                metadata_hash_bytecode = binascii.unhexlify(metadata_hex)

                self.metadata[bytecode] = metadata_hash_bytecode

                return metadata_hash_bytecode, start_index
            except (ValueError, EOFError):
                start_index = bytecode.find(prefix, start_index + 1)

        return None, -1

    def get_ipfs_hash(self, metadata_hash_bytecode):
        """
        CBOR-decode the metadata bytes and return the value stored under the
        `"ipfs"` key, base58-encoded as a UTF-8 string.
        """
        decoded_metadata: dict = cbor2.loads(metadata_hash_bytecode)
        raw_ipfs_hash = decoded_metadata["ipfs"]
        return base58.b58encode(raw_ipfs_hash).decode("utf-8")

    def get_metadata_from_ipfs(self, ipfs_hash):
        """
        Fetch the metadata JSON for `ipfs_hash` from the ipfs.io gateway and pretty-print it.

        Args:
            ipfs_hash (str): hash appended to `https://ipfs.io/ipfs/`.

        Raises:
            requests.exceptions.RequestException: on timeout, connection problems or a
                non-success HTTP status.
        """
        # A timeout keeps an unresponsive gateway from blocking the notebook forever.
        response = requests.get(f"https://ipfs.io/ipfs/{ipfs_hash}", timeout=30)
        # Surface gateway errors directly instead of failing while parsing an error page.
        response.raise_for_status()

        # The response body is parsed as JSON
        metadata = response.json()

        pprint.pprint(metadata)

    def remove_metadata_hash_bytecode(self, bytecode):
        md, idx = self.get_metadata_hash_bytecode(bytecode)
        if idx != -1:
            return bytecode[:idx]
        return bytecode

    def are_bytecodes_matching(self, bytecode1, bytecode2):
        return bytecode1 == bytecode2

    def compare_bytecodes(self, bytecodes1, bytecodes2):
        """
        Compares bytecodes for each contract in two given dictionaries

        Args:
            bytecodes1 (dict): dictionary of contract names and their bytecodes
            bytecodes2 (dict): dictionary of contract names and their bytecodes

        Returns:
            dict: dictionary of contract names and boolean values indicating if bytecodes are different
        """
        are_different_with_metadata_bytecode_hash = {}
        are_different_without_metadata_bytecode_hash = {}
        for contract_name in bytecodes1.keys():
            are_different_with_metadata_bytecode_hash[
                contract_name
            ] = self.are_bytecodes_matching(
                bytecodes1[contract_name], bytecodes2.get(contract_name, "")
            )

            are_different_without_metadata_bytecode_hash[
                contract_name
            ] = self.are_bytecodes_matching(
                self.remove_metadata_hash_bytecode(bytecodes1[contract_name]),
                self.remove_metadata_hash_bytecode(bytecodes2.get(contract_name, "")),
            )

        return (
            are_different_with_metadata_bytecode_hash,
            are_different_without_metadata_bytecode_hash,
        )

    def compare_contract_bytecodes(self):
        # checkout and build as needed
        self.checkout_and_build()

        (
            are_different_with_metadata_bytecode_hash,
            are_different_without_metadata_bytecode_hash,
        ) = self.compare_bytecodes(
            self.get_bytecodes(self.contracts),
            self.get_onchain_bytecodes(self.contracts),
        )

        self.print_contracts_info(
            self.merge_dicts(
                self.contracts,
                are_different_with_metadata_bytecode_hash,
                are_different_without_metadata_bytecode_hash,
            )
        )

    def merge_dicts(
        self, contract_dict, comparison_with_metadata, comparison_without_metadata
    ):
        merged_dict = {}
        for key in contract_dict:
            merged_dict[key] = {
                "address": contract_dict[key],
                "comparison_with_metadata": comparison_with_metadata[key],
                "comparison_without_metadata": comparison_without_metadata[key],
            }

        return self.merge_commit_hashes(merged_dict)

    def merge_commit_hashes(self, existing_dict):
        merged_dict = {}
        for (repo, commit_hash), data in self.repos_with_build_option.items():
            for contract in data["contracts"]:
                if contract in existing_dict:
                    existing_dict[contract]["commit_hash"] = commit_hash
                    merged_dict[contract] = existing_dict[contract]
        return merged_dict

    # def print_contracts_info(self, merged_dict):
    #     base_url = "https://etherscan.io/address/"
    #     for contract_name, info in merged_dict.items():
    #         comparison_result = "✅" if info["comparison_result"] else "❌"
    #         clickable_address = base_url + info["address"] + "#code"
    #         display(
    #             Markdown(
    #                 f"{comparison_result} {contract_name}: [{info['address']}]({clickable_address})"
    #             )
    #         )
    def print_contracts_info(self, merged_dict):
        base_url = "https://etherscan.io/address/"
        active_facet_addresses = self.verify_contracts_are_active()

        chain_id = self.w3.eth.chain_id
        block_number = self.w3.eth.block_number
        timestamp = self.w3.eth.get_block(block_number).timestamp
        # Convert timestamp to datetime
        dt = datetime.datetime.utcfromtimestamp(timestamp)

        # Convert datetime to ISO 8601 format
        iso8601_time = dt.isoformat()

        with open("bytecode_verification_report.md", "w") as file:
            file.write("# Bytecode Verification Report\n\n")

            file.write(
                f"## Network {chain_id}, block number {block_number} ({iso8601_time})\n\n"
            )

            file.write(f"Proxy address: `{self.diamond_address}`\n\n")
            
            file.write(f"Number of active facets: `{len(self.current_facet_addresses)}`\n\n")

            file.write(
                "| Contract Name | Address | Active | Commit Hash | Comparison with Metadata | Comparison without Metadata |\n"
            )
            file.write(
                "|---------------|---------|--------|-------------|-------------------------|-----------------------------|\n"
            )

            for contract_name, info in merged_dict.items():
                comparison_with_metadata = (
                    "✅" if info["comparison_with_metadata"] else "❌"
                )
                comparison_without_metadata = (
                    "✅" if info["comparison_without_metadata"] else "❌"
                )
                clickable_address = base_url + info["address"] + "#code"
                active = "✅" if active_facet_addresses[contract_name] else "❌"
                commit_hash = info["commit_hash"]

                file.write(
                    f"| {contract_name} | [{info['address']}]({clickable_address}) | {active} | {commit_hash} | {comparison_with_metadata} | {comparison_without_metadata} |\n"
                )

In [13]:
# Machine-specific locations. Each can be overridden through an environment
# variable of the same name (for example in the .env file); the previous
# hard-coded values remain the defaults.
FORGE_PATH = os.getenv("FORGE_PATH", "/Users/kevinpark/.foundry/bin/forge")
PROJECT_ROOT = os.getenv(
    "PROJECT_ROOT",
    os.path.join("/", "Users", "kevinpark", "dev", "nayms", "v3-extoken"),
)

# Maps (repository, commit) to the contracts whose artifacts belong to that commit.
# "build": True clones and builds that commit first; False reads artifacts from
# temp_artifacts/<commit>_artifacts.
repos_with_build_option = {
    (PROJECT_ROOT, "69ed11f7"): {
        "build": False,
        "contracts": [
            "ACLFacet",
            "AdminFacet",
            "EntityFacet",
            "GovernanceFacet",
            "MarketFacet",
            "NaymsTokenFacet",
            "PhasedDiamondCutFacet",
            "SimplePolicyFacet",
            "SystemFacet",
            "TokenizedVaultFacet",
            "TokenizedVaultIOFacet",
            "UserFacet",
        ],
    },
    (PROJECT_ROOT, "9de0e394"): {
        "build": False,
        "contracts": ["Nayms", "NaymsOwnershipFacet", "DiamondLoupeFacet"],
    },
}

json_with_diamond_address = os.path.join(PROJECT_ROOT, "deployedAddresses.json")
key_to_read_diamond_address = "1"  # update with your key to read proxy address

# Fail early when the RPC endpoint is not configured, rather than constructing a
# provider from None and talking to whatever endpoint that falls back to.
eth_mainnet_rpc_url = os.getenv("ETH_MAINNET_RPC_URL")
if not eth_mainnet_rpc_url:
    raise RuntimeError(
        "ETH_MAINNET_RPC_URL is not set. Define it in the environment or in the .env file."
    )
web3_instance = Web3(
    Web3.HTTPProvider(eth_mainnet_rpc_url)
)  # update with your provider details
contract_abi = diamondLoupeABI
dir_path_to_broadcasts = os.path.join(
    PROJECT_ROOT, "broadcast"
)  # update with your broadcasts directory path
dir_path_to_artifacts = os.path.join(
    PROJECT_ROOT, "forge-artifacts"
)  # update with your artifacts directory path
additional_contracts = {
    "NaymsOwnershipFacet": "0x073C1a072845D1d87f42309af9911bd3c07fC599",
    "DiamondLoupeFacet": "0x0318ff107aFA55E3dc658cEA06748d0c35fbEC73",
}
compare_bytecode = CompareBytecode(
    json_with_diamond_address=json_with_diamond_address,
    key_to_read_diamond_address=key_to_read_diamond_address,
    dir_path_to_broadcasts=dir_path_to_broadcasts,
    dir_path_to_artifacts=dir_path_to_artifacts,
    w3=web3_instance,
    contract_abi=contract_abi,
    additional_contracts=additional_contracts,
    repos_with_build_option=repos_with_build_option,
    forge_path=FORGE_PATH,
)

compare_bytecode.compare_contract_bytecodes()