In [1]:
from utxo_utils.crypto.signature import verifySignature
import glob
import os
import pandas as pd
import ecdsa

In [2]:
#################
# DATA HANDLING #
#################
def is_row_valid(row):
    try:
        verifySignature(row["pubkey"], f"{int(row["r"], 16):064x}" + f"{int(row["s"], 16):064x}", row["message digest"])
        return True
    except Exception:
        return False


def collect_file_paths(folders: list[str]):
    signatures_files = []
    for folder in folders:
        signatures_files += glob.glob(os.path.join(folder, "*.parquet"))
    return signatures_files


def read_raw_data(signatures_folders: list[str], check_signatures: bool = False):
    signatures_files = collect_file_paths(signatures_folders)
    df = pd.concat(
        pd.read_parquet(parquet_file)
        for parquet_file in signatures_files
    )

    # Last filtering step: remove possible unvalid signatures
    filtered_df = df if not check_signatures else df[df.apply(is_row_valid, axis=1)]

    return filtered_df


def prepare_data(df):
    """ Read the signature files and group the rows by pubkey and r such that it contains two message digests and two s."""
    # Group by pubkey and r. Keep two records for every row.
    grouped_df = (
        df[["pubkey", "r", "s", "message digest"]]
        .groupby(by=["pubkey", "r"])
        .aggregate(
            s_cnt=("s", "nunique"),
            digest_cnt=("message digest", "nunique"),
            s=("s", lambda s: s.drop_duplicates().head(2)),
            digests=("message digest", lambda s: s.drop_duplicates().head(2)),
        )
    )
    # Keep the records in presence of a repeated nonce
    grouped_df = grouped_df[(grouped_df["s_cnt"] > 1) & (grouped_df["digest_cnt"] > 1)]

    return grouped_df

In [10]:
##################
# SIG MANAGEMENT #
##################
def derive_private_key(
    r: int, s1: int, s2: int, h1_str: str, h2_str: str, curve=ecdsa.SECP256k1
):
    # Typecasting & constants
    order = curve.order
    generator = curve.generator
    h1, h2 = (
        int(h1_str, base=16),
        int(h2_str, base=16),
    )

    for s1, s2 in [
        [s1, s2],
        [order - s1, s2],
        [s1, order - s2],
        [order - s1, order - s2],
    ]:
        # Nonce derivation
        s_diff_inv = pow((s1 - s2), -1, order)
        nonce = ((h1 - h2) * s_diff_inv) % order
        if r == (nonce * generator).x():
            priv_key = pow(r, -1, order) * (s2 * nonce - h2) % order
            sk = ecdsa.SigningKey.from_secret_exponent(
                secexp=priv_key, curve=curve, hashfunc=None
            )
            vk = sk.get_verifying_key()
            try:
                # Check the validity of the private key, as we might have recovered -k if the user has published -s1 and -s2 (which are still valid) over the network.
                vk.verify_digest(
                    bytes.fromhex(f"{r:064x}{s1:064x}"),
                    bytes.fromhex(h1_str),
                )
                vk.verify_digest(
                    bytes.fromhex(f"{r:064x}{s2:064x}"),
                    bytes.fromhex(h2_str),
                )
                return nonce, priv_key
            except Exception as e:
                raise e

    raise ValueError(
        "The nonce and the private key could not be recovered: the input signatures are probably invalid."
    )


def retrieve_private_keys_from_repeated_nonces(grouped_df, curve=ecdsa.SECP256k1):
    private_keys = {}
    nonces = {}
    for row in grouped_df.itertuples():
        # Data
        pubkey = row.Index[0]
        r = int(row.Index[1], base=16)
        s1, s2 = map(lambda s: int(s, base=16), row.s)
        h1, h2 = row.digests

        try:
            nonce, private_key = derive_private_key(r, s1, s2, h1, h2)
            private_keys[pubkey] = private_key
            if r not in nonces:
                nonces[r] = {nonce}
            else:
                nonces[r].add(nonce)
        except Exception as e:
            print(pubkey, e)

    return private_keys, nonces


def retrieve_private_keys_from_known_nonces(
    sigs_df, nonces_by_r: dict, private_keys: dict, curve=ecdsa.SECP256k1
):
    # Fetch the rows for which we know the nonce to generate the 'r' value and for those the private key has not been recovered
    df = sigs_df[
        sigs_df.apply(
            lambda row: int(row["r"], 16) in nonces_by_r
            and row["pubkey"] not in private_keys,
            axis=1,
        )
    ]

    # Group those records by public key and 'r'
    df = df.groupby(by=["pubkey", "r"]).aggregate(
        s=("s", lambda s: s.drop_duplicates().head(1)),
        digest=("message digest", lambda s: s.drop_duplicates().head(1)),
    )

    private_keys = {}
    order = curve.order

    for row in df.itertuples():
        # Data
        pubkey = row.Index[0]
        r = int(row.Index[1], base=16)
        s = int(row.s, 16)
        h_str = row.digest
        h = int(h_str, 16)

        nonces = list(nonces_by_r[r])
        expected_vk = ecdsa.VerifyingKey.from_string(bytes.fromhex(pubkey), curve=curve)
        # We have to be careful that for a given nonce and given signature, 2 private keys are valid, see ecdsa_tutorial (mirror_key)
        for nonce in nonces:
            priv_key = pow(r, -1, order) * (s * nonce - h) % order
            sk = ecdsa.SigningKey.from_secret_exponent(
                secexp=priv_key, curve=curve, hashfunc=None
            )
            vk = sk.get_verifying_key()

            if vk != expected_vk:
                # Let's fetch the alternative private key, that could have signed that message
                priv_key = (pow(r, -1, order) * s * (-2 * nonce) + priv_key) % order
                sk = ecdsa.SigningKey.from_secret_exponent(
                    secexp=priv_key, curve=curve, hashfunc=None
                )
                vk = sk.get_verifying_key()

            assert vk == expected_vk
            private_keys[pubkey] = priv_key

    return private_keys

In [4]:
#################
# POST CHECKING #
#################
def verify_private_keys(privkeys_by_pubkey: dict, curve=ecdsa.SECP256k1):
    # Verify that every recovered private key is valid
    for pubkey, privkey in privkeys_by_pubkey.items():

        vk_expected = ecdsa.VerifyingKey.from_string(bytes.fromhex(pubkey), curve=curve)
        sk = ecdsa.SigningKey.from_secret_exponent(secexp=privkey, curve=curve)
        vk = sk.get_verifying_key()

        assert vk == vk_expected


def verify_nonces(nonces_by_r: dict, curve=ecdsa.SECP256k1):
    for r, nonces in nonces_by_r.items():
        # As every x coor. is shared by two points, two nonces can generate two points with the same x coor., i.e. the same 'r'.
        assert len(nonces) > 0 and len(nonces) <= 2
        for nonce in nonces:
            assert (curve.generator * nonce).x() == r

In [12]:
bch_signatures_folders = [
    "/Users/vincent/Documents/PhD/Blockchains/UTXO/ecdsa-signatures/local/signatures/bch",
]

signatures_folders = [
    "/Users/vincent/Documents/PhD/Blockchains/UTXO/ecdsa-signatures/local/signatures/btc",
    "/Users/vincent/Documents/PhD/Blockchains/UTXO/ecdsa-signatures/local/signatures/dash",
    "/Users/vincent/Documents/PhD/Blockchains/UTXO/ecdsa-signatures/local/signatures/bch",
    "/Users/vincent/Documents/PhD/Blockchains/UTXO/ecdsa-signatures/local/signatures/ltc",
    "/Users/vincent/Documents/PhD/Blockchains/UTXO/ecdsa-signatures/local/signatures/doge",
]

# Retrieve the private keys

bch_sig_df = read_raw_data(bch_signatures_folders, check_signatures=True)
other_sig_df = read_raw_data(signatures_folders, check_signatures=False)
sig_df = pd.concat([bch_sig_df, other_sig_df])
df = prepare_data(sig_df)
private_keys, nonces_by_r = retrieve_private_keys_from_repeated_nonces(df)

# Check the validity of the results
verify_private_keys(private_keys)
verify_nonces(nonces_by_r)

# Now, from the set of known (r, nonce) pairs, derive private keys from signatures that used those nonce even once
extra_private_keys = retrieve_private_keys_from_known_nonces(sig_df, nonces_by_r, private_keys)

# Check those keys as well
verify_private_keys(extra_private_keys)

# Print some stats
print(f"{len(private_keys)} private keys found from a set of {df.reset_index()["pubkey"].nunique()} public keys associated with repeated nonces.")
print(f"{len(extra_private_keys)} private keys found from a set of {sig_df.reset_index()["pubkey"].nunique()} public keys associated with nonces being used multiple times by other users.")
print(f"{len(private_keys | extra_private_keys)} private keys found in total.")

1385 private keys found from a set of 1385 public keys associated with repeated nonces.
1260 private keys found from a set of 21760 public keys associated with nonces being used multiple times by other users.
2645 private keys found in total.
