From 2d05e6a15384269767a076e94d9395dd00d29226 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Thu, 29 Jan 2026 16:56:03 +0100 Subject: [PATCH 1/6] enr: more complete implementation + fixes + tests --- src/lean_spec/__main__.py | 10 + src/lean_spec/subspecs/networking/enr/enr.py | 211 +++++- src/lean_spec/types/__init__.py | 9 +- src/lean_spec/types/rlp.py | 10 +- .../subspecs/networking/enr/test_enr.py | 687 +++++++++++++++++- .../lean_spec/subspecs/networking/test_enr.py | 181 +---- tests/lean_spec/test_cli.py | 38 +- tests/lean_spec/types/test_rlp.py | 206 +++--- 8 files changed, 1024 insertions(+), 328 deletions(-) diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index 28390133..cca92c40 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -84,6 +84,16 @@ def resolve_bootnode(bootnode: str) -> str: enr = ENR.from_string(bootnode) + # Verify structural validity (correct scheme, public key present). + if not enr.is_valid(): + raise ValueError(f"ENR structurally invalid: {enr}") + + # Cryptographically verify signature to ensure authenticity. + # + # This prevents attackers from forging ENRs to redirect connections. + if not enr.verify_signature(): + raise ValueError(f"ENR signature verification failed: {enr}") + # ENR.multiaddr() returns None when the record lacks IP or TCP port. # # This happens with discovery-only ENRs that only contain UDP info. diff --git a/src/lean_spec/subspecs/networking/enr/enr.py b/src/lean_spec/subspecs/networking/enr/enr.py index 8d1b07a8..eac385da 100644 --- a/src/lean_spec/subspecs/networking/enr/enr.py +++ b/src/lean_spec/subspecs/networking/enr/enr.py @@ -57,11 +57,18 @@ from typing_extensions import Self from lean_spec.subspecs.networking.types import Multiaddr, NodeId, SeqNumber -from lean_spec.types import Bytes33, Bytes64, RLPDecodingError, StrictBaseModel, Uint64 -from lean_spec.types.rlp import decode_list as rlp_decode_list +from lean_spec.types import ( + Bytes32, + Bytes33, + Bytes64, + StrictBaseModel, + Uint64, + rlp, +) +from lean_spec.types.byte_arrays import Bytes4 from . import keys -from .eth2 import AttestationSubnets, Eth2Data +from .eth2 import AttestationSubnets, Eth2Data, SyncCommitteeSubnets from .keys import EnrKey ENR_PREFIX = "enr:" @@ -69,24 +76,7 @@ class ENR(StrictBaseModel): - r""" - Ethereum Node Record (EIP-778). - - Example from EIP-778 (IPv4 127.0.0.1, UDP 30303):: - - enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04j... - - Which decodes to RLP:: - - [ - 7098ad865b00a582..., # signature (64 bytes) - 01, # seq = 1 - "id", "v4", - "ip", 7f000001, # 127.0.0.1 - "secp256k1", 03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138, - "udp", 765f, # 30303 - ] - """ + """Ethereum Node Record (EIP-778).""" MAX_SIZE: ClassVar[int] = 300 """Maximum RLP-encoded size in bytes (EIP-778).""" @@ -152,6 +142,18 @@ def udp_port(self) -> int | None: port = self.get(keys.UDP) return int.from_bytes(port, "big") if port else None + @property + def tcp6_port(self) -> int | None: + """IPv6-specific TCP port. Falls back to tcp_port if not set.""" + port = self.get(keys.TCP6) + return int.from_bytes(port, "big") if port else None + + @property + def udp6_port(self) -> int | None: + """IPv6-specific UDP port. Falls back to udp_port if not set.""" + port = self.get(keys.UDP6) + return int.from_bytes(port, "big") if port else None + def multiaddr(self) -> Multiaddr | None: """Construct multiaddress from endpoint info.""" if self.ip4 and self.tcp_port: @@ -160,18 +162,11 @@ def multiaddr(self) -> Multiaddr | None: return f"/ip6/{self.ip6}/tcp/{self.tcp_port}" return None - # ========================================================================= - # Ethereum Consensus Extensions - # ========================================================================= - @property def eth2_data(self) -> Eth2Data | None: """Parse eth2 key: fork_digest(4) + next_fork_version(4) + next_fork_epoch(8).""" eth2_bytes = self.get(keys.ETH2) if eth2_bytes and len(eth2_bytes) >= 16: - from lean_spec.types import Uint64 - from lean_spec.types.byte_arrays import Bytes4 - return Eth2Data( fork_digest=Bytes4(eth2_bytes[0:4]), next_fork_version=Bytes4(eth2_bytes[4:8]), @@ -185,9 +180,13 @@ def attestation_subnets(self) -> AttestationSubnets | None: attnets = self.get(keys.ATTNETS) return AttestationSubnets.decode_bytes(attnets) if attnets and len(attnets) == 8 else None - # ========================================================================= - # Validation - # ========================================================================= + @property + def sync_committee_subnets(self) -> SyncCommitteeSubnets | None: + """Parse syncnets key (SSZ Bitvector[4]).""" + syncnets = self.get(keys.SYNCNETS) + if syncnets and len(syncnets) == 1: + return SyncCommitteeSubnets.decode_bytes(syncnets) + return None def is_valid(self) -> bool: """ @@ -207,9 +206,128 @@ def is_compatible_with(self, other: "ENR") -> bool: return False return self_eth2.fork_digest == other_eth2.fork_digest - # ========================================================================= - # Display - # ========================================================================= + def _build_content_items(self) -> list[bytes]: + """ + Build the list of content items for RLP encoding. + + Returns [seq, k1, v1, k2, v2, ...] with keys sorted lexicographically. + """ + sorted_keys = sorted(self.pairs.keys()) + + # Sequence number: minimal big-endian, empty bytes for zero. + seq_bytes = self.seq.to_bytes(8, "big").lstrip(b"\x00") or b"" + items: list[bytes] = [seq_bytes] + + for key in sorted_keys: + items.append(key.encode("utf-8")) + items.append(self.pairs[key]) + + return items + + def _content_rlp(self) -> bytes: + """ + Get RLP-encoded content for signing (excludes signature). + + Returns the RLP encoding of [seq, k1, v1, k2, v2, ...]. + """ + return rlp.encode_rlp(self._build_content_items()) + + def verify_signature(self) -> bool: + """ + Cryptographically verify the ENR signature. + + Per EIP-778 "v4" identity scheme: + + 1. Compute keccak256 hash of content RLP (seq + sorted key/value pairs) + 2. Verify the 64-byte secp256k1 signature against the public key + + Returns True if signature is valid, False otherwise. + """ + from Crypto.Hash import keccak + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.asymmetric.utils import ( + Prehashed, + encode_dss_signature, + ) + + if self.public_key is None: + return False + + try: + # Hash the content (excludes signature). + content = self._content_rlp() + k = keccak.new(digest_bits=256) + k.update(content) + digest = k.digest() + + # Load the compressed public key. + public_key = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256K1(), bytes(self.public_key) + ) + + # Convert r||s (64 bytes) to DER-encoded signature. + r = int.from_bytes(self.signature[:32], "big") + s = int.from_bytes(self.signature[32:], "big") + der_signature = encode_dss_signature(r, s) + + # Verify signature against pre-hashed digest. + # SHA256 is used as the algorithm marker since it has the same 32-byte digest size. + public_key.verify(der_signature, digest, ec.ECDSA(Prehashed(hashes.SHA256()))) + return True + except Exception: + return False + + def compute_node_id(self) -> NodeId | None: + """ + Compute the node ID from the public key. + + Per EIP-778 "v4" identity scheme: keccak256(uncompressed_pubkey). + The hash is computed over the 64-byte x||y coordinates. + """ + from Crypto.Hash import keccak + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ec + + if self.public_key is None: + return None + + try: + # Uncompress public key to 65 bytes (0x04 || x || y). + public_key = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256K1(), self.public_key + ) + uncompressed = public_key.public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.UncompressedPoint, + ) + + # Hash the 64-byte x||y (excluding 0x04 prefix). + k = keccak.new(digest_bits=256) + k.update(uncompressed[1:]) + return Bytes32(k.digest()) + except Exception: + return None + + def to_rlp(self) -> bytes: + """ + Serialize to RLP bytes. + + Format: [signature, seq, k1, v1, k2, v2, ...] + Keys are sorted lexicographically per EIP-778. + """ + items = [bytes(self.signature)] + self._build_content_items() + return rlp.encode_rlp(items) + + def to_string(self) -> str: + """ + Serialize to text representation. + + Format: "enr:" + base64url(RLP) without padding. + """ + rlp_bytes = self.to_rlp() + b64_content = base64.urlsafe_b64encode(rlp_bytes).decode("utf-8").rstrip("=") + return ENR_PREFIX + b64_content def __str__(self) -> str: """Human-readable summary.""" @@ -260,10 +378,14 @@ def from_string(cls, enr_text: str) -> Self: # RLP decode: [signature, seq, k1, v1, k2, v2, ...] try: - items = rlp_decode_list(rlp_data) - except RLPDecodingError as e: + items = rlp.decode_rlp_list(rlp_data) + except rlp.RLPDecodingError as e: raise ValueError(f"Invalid RLP encoding: {e}") from e + # EIP-778 requires ENRs to be at most 300 bytes. + if len(rlp_data) > cls.MAX_SIZE: + raise ValueError(f"ENR exceeds max size: {len(rlp_data)} > {cls.MAX_SIZE}") + if len(items) < 2: raise ValueError("ENR must have at least signature and seq") @@ -281,14 +403,29 @@ def from_string(cls, enr_text: str) -> Self: # Parse key/value pairs. # # Keys are strings, values are arbitrary bytes. + # EIP-778 requires keys to be lexicographically sorted. pairs: dict[str, bytes] = {} + prev_key: str | None = None for i in range(2, len(items), 2): key = items[i].decode("utf-8") + if prev_key is not None and key <= prev_key: + raise ValueError( + f"ENR keys must be lexicographically sorted per EIP-778: " + f"'{key}' follows '{prev_key}'" + ) value = items[i + 1] pairs[key] = value + prev_key = key - return cls( + enr = cls( signature=signature, seq=Uint64(seq), pairs=pairs, ) + + # Compute and store node_id for routing/identification. + node_id = enr.compute_node_id() + if node_id is not None: + return enr.model_copy(update={"node_id": node_id}) + + return enr diff --git a/src/lean_spec/types/__init__.py b/src/lean_spec/types/__init__.py index 2a5ecf1d..cc872071 100644 --- a/src/lean_spec/types/__init__.py +++ b/src/lean_spec/types/__init__.py @@ -13,9 +13,7 @@ SSZTypeError, SSZValueError, ) -from .rlp import RLPDecodingError, RLPItem -from .rlp import decode as rlp_decode -from .rlp import encode as rlp_encode +from .rlp import RLPDecodingError, RLPItem, decode_rlp, decode_rlp_list, encode_rlp from .ssz_base import SSZType from .uint import Uint64 @@ -40,8 +38,9 @@ "Boolean", "Container", # RLP encoding/decoding - "rlp_encode", - "rlp_decode", + "encode_rlp", + "decode_rlp", + "decode_rlp_list", "RLPItem", "RLPDecodingError", # Exceptions diff --git a/src/lean_spec/types/rlp.py b/src/lean_spec/types/rlp.py index b278cd15..dc557d54 100644 --- a/src/lean_spec/types/rlp.py +++ b/src/lean_spec/types/rlp.py @@ -72,7 +72,7 @@ """Base for long list prefix. Final prefix = 0xf7 + length_of_length.""" -def encode(item: RLPItem) -> bytes: +def encode_rlp(item: RLPItem) -> bytes: """ Encode an item using RLP. @@ -124,7 +124,7 @@ def _encode_list(items: list[RLPItem]) -> bytes: Long lists (>55 bytes payload) use prefix 0xf7 + length-of-length, then length. """ # Recursively encode all items. - payload = b"".join(encode(item) for item in items) + payload = b"".join(encode_rlp(item) for item in items) length = len(payload) # Short list: 0-55 bytes payload. @@ -153,7 +153,7 @@ class RLPDecodingError(Exception): """Error during RLP decoding.""" -def decode(data: bytes) -> RLPItem: +def decode_rlp(data: bytes) -> RLPItem: """ Decode RLP-encoded bytes. @@ -177,7 +177,7 @@ def decode(data: bytes) -> RLPItem: return item -def decode_list(data: bytes) -> list[bytes]: +def decode_rlp_list(data: bytes) -> list[bytes]: """ Decode RLP data as a flat list of byte items. @@ -193,7 +193,7 @@ def decode_list(data: bytes) -> list[bytes]: Raises: RLPDecodingError: If data is not a list or contains nested lists. """ - item = decode(data) + item = decode_rlp(data) if not isinstance(item, list): raise RLPDecodingError("Expected RLP list") diff --git a/tests/lean_spec/subspecs/networking/enr/test_enr.py b/tests/lean_spec/subspecs/networking/enr/test_enr.py index 51d6d9cd..7b0bbd82 100644 --- a/tests/lean_spec/subspecs/networking/enr/test_enr.py +++ b/tests/lean_spec/subspecs/networking/enr/test_enr.py @@ -29,7 +29,7 @@ # 03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138 OFFICIAL_ENR_STRING = ( "enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjz" - "CBOOnrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQ" + "CBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQ" "PKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8" ) @@ -43,7 +43,7 @@ ) OFFICIAL_SIGNATURE = bytes.fromhex( "7098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b" - "76f2635f4e234738f308138e9eb9137e3e3df5266e3a1f11df72ecf1145ccb9c" + "76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c" ) @@ -203,10 +203,10 @@ def test_minimum_fields_required(self) -> None: # Create RLP for just signature (missing seq) import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp # RLP list with only signature - rlp_data = encode([b"\x00" * 64]) + rlp_data = encode_rlp([b"\x00" * 64]) b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") with pytest.raises(ValueError, match=r"at least signature and seq"): @@ -216,10 +216,10 @@ def test_odd_number_of_kv_pairs_rejected(self) -> None: """ENR key/value pairs must be even count.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp # [signature, seq, key1] - odd number after signature/seq - rlp_data = encode([b"\x00" * 64, b"\x01", b"id"]) + rlp_data = encode_rlp([b"\x00" * 64, b"\x01", b"id"]) b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") with pytest.raises(ValueError, match=r"key/value pairs must be even"): @@ -249,10 +249,10 @@ def test_valid_minimal_enr(self) -> None: """Minimal valid ENR with only required fields parses.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp # [signature(64), seq(1), "id", "v4", "secp256k1", pubkey(33)] - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, # signature b"\x01", # seq = 1 @@ -686,9 +686,9 @@ def test_enr_with_only_required_fields(self) -> None: """ENR with minimum required fields is valid.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, # signature b"\x01", # seq @@ -710,10 +710,10 @@ def test_enr_with_ipv6_only(self) -> None: """ENR with IPv6 but no IPv4 parses correctly.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp ipv6_bytes = bytes.fromhex("20010db8000000000000000000000001") # 2001:db8::1 - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, b"\x01", @@ -742,9 +742,9 @@ def test_enr_with_both_tcp_and_udp(self) -> None: """ENR with both TCP and UDP ports parses correctly.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, b"\x01", @@ -771,9 +771,9 @@ def test_sequence_number_zero(self) -> None: """ENR with sequence number 0 is valid.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, b"", # Empty bytes = 0 @@ -792,10 +792,10 @@ def test_large_sequence_number(self) -> None: """ENR with large sequence number parses correctly.""" import base64 - from lean_spec.types.rlp import encode + from lean_spec.types.rlp import encode_rlp large_seq = (2**32).to_bytes(5, "big") - rlp_data = encode( + rlp_data = encode_rlp( [ b"\x00" * 64, large_seq, @@ -825,3 +825,654 @@ def test_scheme_constant(self) -> None: def test_prefix_constant(self) -> None: """ENR_PREFIX is 'enr:' for text encoding.""" assert ENR_PREFIX == "enr:" + + +class TestEth2DataProperty: + """Tests for eth2_data property parsing.""" + + def test_eth2_data_parses_from_enr(self) -> None: + """eth2_data property parses 16-byte eth2 key.""" + from lean_spec.types.byte_arrays import Bytes4 + + # 4 bytes fork_digest + 4 bytes next_fork_version + 8 bytes next_fork_epoch + eth2_bytes = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x01" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes}, + ) + + eth2 = enr.eth2_data + assert eth2 is not None + assert eth2.fork_digest == Bytes4(b"\x12\x34\x56\x78") + assert eth2.next_fork_version == Bytes4(b"\x02\x00\x00\x00") + # Epoch is little-endian + assert eth2.next_fork_epoch == Uint64(1 << 56) + + def test_eth2_data_returns_none_when_missing(self) -> None: + """eth2_data returns None when eth2 key is absent.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.eth2_data is None + + def test_eth2_data_returns_none_for_short_data(self) -> None: + """eth2_data returns None when eth2 key is too short.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ETH2: b"\x12\x34\x56\x78"}, # Only 4 bytes + ) + assert enr.eth2_data is None + + +class TestAttestationSubnetsProperty: + """Tests for attestation_subnets property parsing.""" + + def test_attestation_subnets_parses_from_enr(self) -> None: + """attestation_subnets property parses 8-byte attnets key.""" + # All bits set (64 bits = 8 bytes of 0xFF) + attnets_bytes = b"\xff" * 8 + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ATTNETS: attnets_bytes}, + ) + + attnets = enr.attestation_subnets + assert attnets is not None + assert attnets.subscription_count() == 64 + + def test_attestation_subnets_returns_none_when_missing(self) -> None: + """attestation_subnets returns None when attnets key is absent.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.attestation_subnets is None + + def test_attestation_subnets_returns_none_for_wrong_length(self) -> None: + """attestation_subnets returns None when attnets key is not 8 bytes.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ATTNETS: b"\xff\xff\xff\xff"}, # Only 4 bytes + ) + assert enr.attestation_subnets is None + + +class TestSyncCommitteeSubnetsProperty: + """Tests for sync_committee_subnets property parsing.""" + + def test_sync_committee_subnets_parses_from_enr(self) -> None: + """sync_committee_subnets property parses 1-byte syncnets key.""" + # All 4 bits set (lower nibble of 0x0F) + syncnets_bytes = b"\x0f" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SYNCNETS: syncnets_bytes}, + ) + + syncnets = enr.sync_committee_subnets + assert syncnets is not None + for i in range(4): + assert syncnets.is_subscribed(i) + + def test_sync_committee_subnets_returns_none_when_missing(self) -> None: + """sync_committee_subnets returns None when syncnets key is absent.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.sync_committee_subnets is None + + def test_sync_committee_subnets_returns_none_for_wrong_length(self) -> None: + """sync_committee_subnets returns None when syncnets key is not 1 byte.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SYNCNETS: b"\x0f\x00"}, # 2 bytes + ) + assert enr.sync_committee_subnets is None + + +class TestForkCompatibility: + """Tests for is_compatible_with() method.""" + + def test_compatible_with_same_fork_digest(self) -> None: + """ENRs with same fork digest are compatible.""" + eth2_bytes = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8 + + enr1 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes}, + ) + enr2 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(2), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes}, + ) + + assert enr1.is_compatible_with(enr2) + + def test_incompatible_with_different_fork_digest(self) -> None: + """ENRs with different fork digests are incompatible.""" + eth2_bytes1 = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8 + eth2_bytes2 = b"\xab\xcd\xef\x01" + b"\x02\x00\x00\x00" + b"\x00" * 8 + + enr1 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes1}, + ) + enr2 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(2), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes2}, + ) + + assert not enr1.is_compatible_with(enr2) + + def test_incompatible_when_self_missing_eth2(self) -> None: + """ENR is incompatible when self lacks eth2 key.""" + eth2_bytes = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8 + + enr1 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, # No eth2 + ) + enr2 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(2), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes}, + ) + + assert not enr1.is_compatible_with(enr2) + + def test_incompatible_when_other_missing_eth2(self) -> None: + """ENR is incompatible when other lacks eth2 key.""" + eth2_bytes = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8 + + enr1 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.ETH2: eth2_bytes}, + ) + enr2 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(2), + pairs={keys.ID: b"v4"}, # No eth2 + ) + + assert not enr1.is_compatible_with(enr2) + + def test_incompatible_when_both_missing_eth2(self) -> None: + """ENRs are incompatible when both lack eth2 key.""" + enr1 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + enr2 = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(2), + pairs={keys.ID: b"v4"}, + ) + + assert not enr1.is_compatible_with(enr2) + + +class TestMaxSizeEnforcement: + """Tests for MAX_SIZE (300 bytes) enforcement.""" + + def test_enr_exactly_300_bytes_succeeds(self) -> None: + """ENR with exactly 300 bytes RLP parses successfully.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + # Build an ENR that is exactly 300 bytes + # Start with minimal structure and add padding in a value + signature = b"\x00" * 64 + seq = b"\x01" + # Calculate how much padding we need in value + # RLP overhead: ~4 bytes header + items + # We need to carefully construct this + + # Start with basic structure and measure + basic = encode_rlp([signature, seq, b"id", b"v4", b"secp256k1", b"\x02" + b"\x00" * 32]) + padding_needed = 300 - len(basic) + + # Add padding via a custom key with enough value bytes + # The key "z" + value needs to fit in remaining space + # Account for RLP overhead (key length byte + value length bytes) + if padding_needed > 3: + value_len = padding_needed - 3 # Approximate, may need adjustment + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + # Adjust if needed + while len(padded) < 300: + value_len += 1 + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + while len(padded) > 300: + value_len -= 1 + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + + assert len(padded) == 300 + b64 = base64.urlsafe_b64encode(padded).decode().rstrip("=") + enr = ENR.from_string(f"enr:{b64}") + assert enr is not None + + def test_enr_301_bytes_rejected(self) -> None: + """ENR with 301 bytes RLP is rejected.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + # Build an ENR that is exactly 301 bytes + signature = b"\x00" * 64 + seq = b"\x01" + + basic = encode_rlp([signature, seq, b"id", b"v4", b"secp256k1", b"\x02" + b"\x00" * 32]) + padding_needed = 301 - len(basic) + + if padding_needed > 3: + value_len = padding_needed - 3 + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + while len(padded) < 301: + value_len += 1 + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + while len(padded) > 301: + value_len -= 1 + padded = encode_rlp( + [ + signature, + seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"zz", + b"\x00" * value_len, + ] + ) + + assert len(padded) == 301 + b64 = base64.urlsafe_b64encode(padded).decode().rstrip("=") + + with pytest.raises(ValueError, match="exceeds max size"): + ENR.from_string(f"enr:{b64}") + + +class TestKeyOrderingEnforcement: + """Tests for lexicographic key ordering enforcement.""" + + def test_sorted_keys_accepted(self) -> None: + """ENR with lexicographically sorted keys parses successfully.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + # Keys in sorted order: id, ip, secp256k1 + rlp = encode_rlp( + [ + b"\x00" * 64, # signature + b"\x01", # seq + b"id", + b"v4", + b"ip", + b"\x7f\x00\x00\x01", + b"secp256k1", + b"\x02" + b"\x00" * 32, + ] + ) + b64 = base64.urlsafe_b64encode(rlp).decode().rstrip("=") + enr = ENR.from_string(f"enr:{b64}") + assert enr is not None + + def test_unsorted_keys_rejected(self) -> None: + """ENR with unsorted keys is rejected.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + # Keys out of order: secp256k1 before id + rlp = encode_rlp( + [ + b"\x00" * 64, # signature + b"\x01", # seq + b"secp256k1", # Should be after "id" + b"\x02" + b"\x00" * 32, + b"id", + b"v4", + ] + ) + b64 = base64.urlsafe_b64encode(rlp).decode().rstrip("=") + + with pytest.raises(ValueError, match="lexicographically sorted"): + ENR.from_string(f"enr:{b64}") + + def test_duplicate_keys_rejected(self) -> None: + """ENR with duplicate keys is rejected.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + # Duplicate "id" key + rlp = encode_rlp( + [ + b"\x00" * 64, # signature + b"\x01", # seq + b"id", + b"v4", + b"id", # Duplicate! + b"v5", + ] + ) + b64 = base64.urlsafe_b64encode(rlp).decode().rstrip("=") + + with pytest.raises(ValueError, match="lexicographically sorted"): + ENR.from_string(f"enr:{b64}") + + +class TestRoundTripSerialization: + """Tests for ENR round-trip serialization.""" + + def test_roundtrip_official_enr(self) -> None: + """Official ENR round-trips through parse and serialize.""" + enr1 = ENR.from_string(OFFICIAL_ENR_STRING) + serialized = enr1.to_string() + enr2 = ENR.from_string(serialized) + + assert enr1.seq == enr2.seq + assert enr1.signature == enr2.signature + assert enr1.pairs == enr2.pairs + + def test_roundtrip_preserves_all_fields(self) -> None: + """Round-trip preserves all ENR fields.""" + import base64 + + from lean_spec.types.rlp import encode_rlp + + rlp = encode_rlp( + [ + b"\xab" * 64, # signature + b"\x42", # seq = 66 + b"eth2", + b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8, + b"id", + b"v4", + b"ip", + b"\xc0\xa8\x01\x01", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"tcp", + (9000).to_bytes(2, "big"), + ] + ) + b64 = base64.urlsafe_b64encode(rlp).decode().rstrip("=") + + enr1 = ENR.from_string(f"enr:{b64}") + enr2 = ENR.from_string(enr1.to_string()) + + assert enr1.seq == enr2.seq == Uint64(0x42) + assert enr1.ip4 == enr2.ip4 == "192.168.1.1" + assert enr1.tcp_port == enr2.tcp_port == 9000 + assert enr1.identity_scheme == enr2.identity_scheme == "v4" + + def test_to_string_produces_valid_enr_format(self) -> None: + """to_string() produces valid 'enr:' prefixed string.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SECP256K1: b"\x02" + b"\x00" * 32}, + ) + result = enr.to_string() + + assert result.startswith("enr:") + # Should not have padding + assert "=" not in result + + +class TestSignatureVerification: + """Tests for verify_signature() method.""" + + def test_official_enr_signature_verifies(self) -> None: + """Official EIP-778 test vector signature verifies correctly.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.verify_signature() + + def test_self_signed_enr_verifies(self) -> None: + """ENR signed with cryptography library verifies correctly.""" + from Crypto.Hash import keccak + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.asymmetric.utils import ( + Prehashed, + decode_dss_signature, + ) + + from lean_spec.types.rlp import encode_rlp + + # Generate a test keypair using cryptography library. + private_key = ec.generate_private_key(ec.SECP256K1()) + public_key = private_key.public_key() + compressed_pubkey = public_key.public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.CompressedPoint, + ) + + # Create content (keys must be sorted). + content_items: list[bytes] = [ + b"\x01", + b"id", + b"v4", + b"secp256k1", + compressed_pubkey, + ] + content_rlp = encode_rlp(content_items) + + # Hash content. + k = keccak.new(digest_bits=256) + k.update(content_rlp) + digest = k.digest() + + # Sign with ECDSA using Prehashed mode. + signature_der = private_key.sign(digest, ec.ECDSA(Prehashed(hashes.SHA256()))) + + # Convert DER signature to r || s format. + r, s = decode_dss_signature(signature_der) + sig_64 = r.to_bytes(32, "big") + s.to_bytes(32, "big") + + # Create ENR. + enr = ENR( + signature=Bytes64(sig_64), + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SECP256K1: compressed_pubkey}, + ) + + assert enr.verify_signature() + + def test_tampered_signature_fails_verification(self) -> None: + """ENR with tampered signature fails verification.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + + # Tamper with signature + tampered_sig = bytes([enr.signature[0] ^ 0xFF]) + bytes(enr.signature[1:]) + tampered_enr = ENR( + signature=Bytes64(tampered_sig), + seq=enr.seq, + pairs=enr.pairs, + ) + + assert not tampered_enr.verify_signature() + + def test_tampered_content_fails_verification(self) -> None: + """ENR with tampered content fails verification.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + + # Create ENR with different sequence number (content mismatch) + tampered_enr = ENR( + signature=enr.signature, + seq=Uint64(int(enr.seq) + 1), # Different sequence + pairs=enr.pairs, + ) + + assert not tampered_enr.verify_signature() + + def test_missing_public_key_fails_verification(self) -> None: + """ENR without public key fails verification.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, # No secp256k1 key + ) + + assert not enr.verify_signature() + + +class TestNodeIdComputation: + """Tests for compute_node_id() method.""" + + def test_official_enr_node_id(self) -> None: + """compute_node_id() returns correct node ID for official ENR.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + node_id = enr.compute_node_id() + + assert node_id is not None + assert node_id.hex() == OFFICIAL_NODE_ID + + def test_node_id_none_without_public_key(self) -> None: + """compute_node_id() returns None when public key is missing.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + + assert enr.compute_node_id() is None + + +class TestIPv6Ports: + """Tests for tcp6_port and udp6_port properties.""" + + def test_tcp6_port_extracts_correctly(self) -> None: + """tcp6_port extracts IPv6-specific TCP port.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.TCP6: (9001).to_bytes(2, "big"), + }, + ) + assert enr.tcp6_port == 9001 + + def test_tcp6_port_returns_none_when_missing(self) -> None: + """tcp6_port returns None when tcp6 key is absent.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.tcp6_port is None + + def test_udp6_port_extracts_correctly(self) -> None: + """udp6_port extracts IPv6-specific UDP port.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.UDP6: (30304).to_bytes(2, "big"), + }, + ) + assert enr.udp6_port == 30304 + + def test_udp6_port_returns_none_when_missing(self) -> None: + """udp6_port returns None when udp6 key is absent.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.udp6_port is None + + def test_ipv6_ports_independent_of_ipv4(self) -> None: + """IPv6 ports are independent from IPv4 ports.""" + enr = ENR( + signature=Bytes64(b"\x00" * 64), + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.TCP: (9000).to_bytes(2, "big"), + keys.TCP6: (9001).to_bytes(2, "big"), + keys.UDP: (30303).to_bytes(2, "big"), + keys.UDP6: (30304).to_bytes(2, "big"), + }, + ) + assert enr.tcp_port == 9000 + assert enr.tcp6_port == 9001 + assert enr.udp_port == 30303 + assert enr.udp6_port == 30304 diff --git a/tests/lean_spec/subspecs/networking/test_enr.py b/tests/lean_spec/subspecs/networking/test_enr.py index f2a7fff3..ddeeb2da 100644 --- a/tests/lean_spec/subspecs/networking/test_enr.py +++ b/tests/lean_spec/subspecs/networking/test_enr.py @@ -3,9 +3,9 @@ import pytest from pydantic import ValidationError -from lean_spec.subspecs.networking.enr import ENR, Eth2Data, keys -from lean_spec.subspecs.networking.enr.eth2 import AttestationSubnets -from lean_spec.types import Bytes64, Uint64 +from lean_spec.subspecs.networking.enr import Eth2Data, keys +from lean_spec.subspecs.networking.enr.eth2 import AttestationSubnets, SyncCommitteeSubnets +from lean_spec.types import Uint64 from lean_spec.types.byte_arrays import Bytes4 @@ -116,150 +116,37 @@ def test_invalid_subnet_id_in_is_subscribed(self) -> None: subnets.is_subscribed(-1) -class TestENR: - """Tests for ENR structure.""" +class TestSyncCommitteeSubnets: + """Tests for SyncCommitteeSubnets bitvector.""" - def test_create_minimal_enr(self) -> None: - """ENR can be created with minimal valid data.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, # Compressed pubkey - }, - ) - assert enr.seq == Uint64(1) - assert enr.identity_scheme == "v4" - - def test_enr_ip4_property(self) -> None: - """ip4 property formats IPv4 address.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "ip": b"\xc0\xa8\x01\x01", # 192.168.1.1 - }, - ) - assert enr.ip4 == "192.168.1.1" - - def test_enr_tcp_port_property(self) -> None: - """tcp_port property extracts port number.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "tcp": (9000).to_bytes(2, "big"), - }, - ) - assert enr.tcp_port == 9000 - - def test_enr_multiaddr_construction(self) -> None: - """multiaddr() constructs valid multiaddress.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "ip": b"\xc0\xa8\x01\x01", - "tcp": (9000).to_bytes(2, "big"), - }, - ) - assert enr.multiaddr() == "/ip4/192.168.1.1/tcp/9000" - - def test_enr_has_key(self) -> None: - """has() correctly checks key presence.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - }, - ) - assert enr.has(keys.ID) - assert enr.has(keys.SECP256K1) - assert not enr.has(keys.IP) - assert not enr.has(keys.ETH2) - - def test_enr_get_key(self) -> None: - """get() retrieves values by key.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - }, - ) - assert enr.get(keys.ID) == b"v4" - assert enr.get(keys.IP) is None - - def test_enr_is_valid_basic(self) -> None: - """is_valid() checks basic structure.""" - valid_enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - }, - ) - assert valid_enr.is_valid() - - # Missing public key - invalid_enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - }, - ) - assert not invalid_enr.is_valid() - - def test_enr_compatibility(self) -> None: - """is_compatible_with() checks fork digest match.""" - eth2_bytes = b"\x12\x34\x56\x78" + b"\x02\x00\x00\x00" + b"\x00" * 8 - - enr1 = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(1), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "eth2": eth2_bytes, - }, - ) + def test_none_creates_empty_subscriptions(self) -> None: + """none() creates empty subscriptions.""" + subnets = SyncCommitteeSubnets.none() + for i in range(4): + assert not subnets.is_subscribed(i) - enr2 = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(2), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "eth2": eth2_bytes, - }, - ) + def test_all_creates_full_subscriptions(self) -> None: + """all() creates full subscriptions.""" + subnets = SyncCommitteeSubnets.all() + for i in range(4): + assert subnets.is_subscribed(i) - assert enr1.is_compatible_with(enr2) - - def test_enr_string_representation(self) -> None: - """ENR has readable string representation.""" - enr = ENR( - signature=Bytes64(b"\x00" * 64), - seq=Uint64(42), - pairs={ - "id": b"v4", - "secp256k1": b"\x02" + b"\x00" * 32, - "ip": b"\xc0\xa8\x01\x01", - "tcp": (9000).to_bytes(2, "big"), - }, - ) - s = str(enr) - assert "seq=42" in s - assert "192.168.1.1" in s - assert "tcp=9000" in s + def test_is_subscribed_with_valid_ids(self) -> None: + """is_subscribed() works for valid subnet IDs 0-3.""" + subnets = SyncCommitteeSubnets.all() + assert subnets.is_subscribed(0) + assert subnets.is_subscribed(1) + assert subnets.is_subscribed(2) + assert subnets.is_subscribed(3) + + def test_is_subscribed_raises_for_invalid_high_id(self) -> None: + """is_subscribed() raises for subnet ID >= 4.""" + subnets = SyncCommitteeSubnets.none() + with pytest.raises(ValueError, match="must be 0-3"): + subnets.is_subscribed(4) + + def test_is_subscribed_raises_for_negative_id(self) -> None: + """is_subscribed() raises for negative subnet ID.""" + subnets = SyncCommitteeSubnets.none() + with pytest.raises(ValueError, match="must be 0-3"): + subnets.is_subscribed(-1) diff --git a/tests/lean_spec/test_cli.py b/tests/lean_spec/test_cli.py index daf366df..6d945953 100644 --- a/tests/lean_spec/test_cli.py +++ b/tests/lean_spec/test_cli.py @@ -8,7 +8,7 @@ import asyncio import base64 -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, Mock, patch import pytest @@ -18,7 +18,7 @@ from lean_spec.subspecs.containers.slot import Slot from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.types import Bytes32, Uint64 -from lean_spec.types.rlp import encode as rlp_encode +from lean_spec.types.rlp import encode_rlp from tests.lean_spec.helpers import make_genesis_state @@ -26,7 +26,7 @@ # This ENR has: ip=192.168.1.1, tcp=9000 def _make_enr_with_tcp(ip_bytes: bytes, tcp_port: int) -> str: """Create a minimal ENR string with IPv4 and TCP port.""" - rlp_data = rlp_encode( + rlp_data = encode_rlp( [ b"\x00" * 64, # signature b"\x01", # seq = 1 @@ -46,7 +46,7 @@ def _make_enr_with_tcp(ip_bytes: bytes, tcp_port: int) -> str: def _make_enr_with_ipv6_tcp(ip6_bytes: bytes, tcp_port: int) -> str: """Create a minimal ENR string with IPv6 and TCP port.""" - rlp_data = rlp_encode( + rlp_data = encode_rlp( [ b"\x00" * 64, # signature b"\x01", # seq = 1 @@ -66,7 +66,7 @@ def _make_enr_with_ipv6_tcp(ip6_bytes: bytes, tcp_port: int) -> str: def _make_enr_without_tcp(ip_bytes: bytes) -> str: """Create an ENR string with IPv4 but no TCP port (UDP only).""" - rlp_data = rlp_encode( + rlp_data = encode_rlp( [ b"\x00" * 64, # signature b"\x01", # seq = 1 @@ -137,7 +137,12 @@ def test_whitespace_prefix_not_detected(self) -> None: class TestResolveBootnode: - """Tests for resolve_bootnode() resolution function.""" + """Tests for resolve_bootnode() resolution function. + + Note: Tests use fake ENRs with zero signatures. + We mock verify_signature to return True since these tests + focus on resolution logic, not signature verification. + """ def test_resolve_multiaddr_unchanged(self) -> None: """Multiaddr strings are returned unchanged.""" @@ -150,19 +155,22 @@ def test_resolve_arbitrary_multiaddr_unchanged(self) -> None: arbitrary = "/some/arbitrary/path" assert resolve_bootnode(arbitrary) == arbitrary - def test_resolve_valid_enr_with_tcp(self) -> None: + @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) + def test_resolve_valid_enr_with_tcp(self, mock_verify: AsyncMock) -> None: """ENR with IPv4+TCP extracts multiaddr correctly.""" result = resolve_bootnode(ENR_WITH_TCP) assert result == "/ip4/192.168.1.1/tcp/9000" - def test_resolve_enr_ipv6(self) -> None: + @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) + def test_resolve_enr_ipv6(self, mock_verify: AsyncMock) -> None: """ENR with IPv6+TCP extracts multiaddr correctly.""" result = resolve_bootnode(ENR_WITH_IPV6_TCP) # IPv6 loopback ::1 formatted as full hex assert "/ip6/" in result assert "/tcp/9000" in result - def test_resolve_enr_without_tcp_raises(self) -> None: + @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) + def test_resolve_enr_without_tcp_raises(self, mock_verify: AsyncMock) -> None: """ENR without TCP port raises ValueError.""" with pytest.raises(ValueError, match=r"no TCP connection info"): resolve_bootnode(ENR_WITHOUT_TCP) @@ -182,7 +190,8 @@ def test_resolve_enr_prefix_only_raises(self) -> None: with pytest.raises(ValueError): resolve_bootnode("enr:") - def test_resolve_enr_with_different_ports(self) -> None: + @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) + def test_resolve_enr_with_different_ports(self, mock_verify: AsyncMock) -> None: """ENR resolution handles various port numbers.""" # Port 30303 enr_30303 = _make_enr_with_tcp(b"\x7f\x00\x00\x01", 30303) @@ -199,7 +208,8 @@ def test_resolve_enr_with_different_ports(self) -> None: result = resolve_bootnode(enr_max) assert result == "/ip4/127.0.0.1/tcp/65535" - def test_resolve_enr_with_different_ips(self) -> None: + @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) + def test_resolve_enr_with_different_ips(self, mock_verify: Mock) -> None: """ENR resolution handles various IPv4 addresses.""" test_cases = [ (b"\x00\x00\x00\x00", "0.0.0.0"), @@ -215,7 +225,8 @@ def test_resolve_enr_with_different_ips(self) -> None: class TestMixedBootnodes: """Integration tests for mixed bootnode types.""" - def test_mixed_bootnodes_list(self) -> None: + @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) + def test_mixed_bootnodes_list(self, mock_verify: Mock) -> None: """Process a list containing both ENR and multiaddr.""" bootnodes = [ MULTIADDR_IPV4, @@ -229,7 +240,8 @@ def test_mixed_bootnodes_list(self) -> None: assert resolved[1] == "/ip4/192.168.1.1/tcp/9000" assert resolved[2] == "/ip4/10.0.0.1/tcp/8000" - def test_filter_invalid_enrs(self) -> None: + @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) + def test_filter_invalid_enrs(self, mock_verify: Mock) -> None: """Demonstrate filtering out invalid ENRs from a bootnode list.""" bootnodes = [ MULTIADDR_IPV4, diff --git a/tests/lean_spec/types/test_rlp.py b/tests/lean_spec/types/test_rlp.py index 8a8d8f24..0d835811 100644 --- a/tests/lean_spec/types/test_rlp.py +++ b/tests/lean_spec/types/test_rlp.py @@ -14,9 +14,9 @@ SINGLE_BYTE_MAX, RLPDecodingError, RLPItem, - decode, - decode_list, - encode, + decode_rlp, + decode_rlp_list, + encode_rlp, ) # Derived constants for test assertions. @@ -30,7 +30,7 @@ class TestEncodeEmptyString: def test_encode_empty_string(self) -> None: """Empty string encodes to 0x80.""" - result = encode(b"") + result = encode_rlp(b"") assert result == bytes.fromhex("80") @@ -39,24 +39,24 @@ class TestEncodeSingleByte: def test_encode_byte_0x00(self) -> None: """Byte 0x00 encodes as itself.""" - result = encode(b"\x00") + result = encode_rlp(b"\x00") assert result == bytes.fromhex("00") def test_encode_byte_0x01(self) -> None: """Byte 0x01 encodes as itself.""" - result = encode(b"\x01") + result = encode_rlp(b"\x01") assert result == bytes.fromhex("01") def test_encode_byte_0x7f(self) -> None: """Maximum single-byte value (0x7f) encodes as itself.""" - result = encode(b"\x7f") + result = encode_rlp(b"\x7f") assert result == bytes.fromhex("7f") @pytest.mark.parametrize("byte_val", range(0x00, SINGLE_BYTE_MAX + 1)) def test_encode_all_single_byte_values(self, byte_val: int) -> None: """All single-byte values 0x00-0x7f encode as themselves.""" data = bytes([byte_val]) - result = encode(data) + result = encode_rlp(data) assert result == data @@ -65,14 +65,14 @@ class TestEncodeShortString: def test_encode_short_string_dog(self) -> None: """'dog' encodes with prefix 0x83 (0x80 + 3) followed by ASCII bytes.""" - result = encode(b"dog") + result = encode_rlp(b"dog") assert result == bytes.fromhex("83646f67") def test_encode_short_string_55_bytes(self) -> None: """55-byte string uses short string encoding (max for this category).""" data = b"Lorem ipsum dolor sit amet, consectetur adipisicing eli" assert len(data) == SHORT_STRING_MAX_LEN - result = encode(data) + result = encode_rlp(data) expected = bytes.fromhex( "b74c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c69" @@ -81,7 +81,7 @@ def test_encode_short_string_55_bytes(self) -> None: def test_encode_single_byte_above_0x7f(self) -> None: """Single byte 0x80 uses short string encoding, not single-byte encoding.""" - result = encode(b"\x80") + result = encode_rlp(b"\x80") assert result == bytes([SHORT_STRING_PREFIX + 1, 0x80]) @pytest.mark.parametrize("length", [1, 10, 20, 30, 40, 50, SHORT_STRING_MAX_LEN]) @@ -89,7 +89,7 @@ def test_encode_short_string_various_lengths(self, length: int) -> None: """Short strings of various lengths are prefixed with 0x80 + length.""" # Use bytes above 0x7f to ensure short string encoding is used data = bytes([0x80 + (i % 0x7F) for i in range(length)]) - result = encode(data) + result = encode_rlp(data) assert result[0] == SHORT_STRING_PREFIX + length assert result[1:] == data @@ -101,7 +101,7 @@ def test_encode_long_string_56_bytes(self) -> None: """56-byte string uses long string encoding.""" data = b"Lorem ipsum dolor sit amet, consectetur adipisicing elit" assert len(data) == SHORT_STRING_MAX_LEN + 1 - result = encode(data) + result = encode_rlp(data) expected = bytes.fromhex( "b8384c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c6974" @@ -113,7 +113,7 @@ def test_encode_long_string_1024_bytes(self) -> None: # Use simple repeated bytes to avoid codespell false positives. data = b"x" * 1024 assert len(data) == 1024 - result = encode(data) + result = encode_rlp(data) # Prefix 0xb9 = 0xb7 + 2 (2 bytes for length) # Length 0x0400 = 1024 in big-endian assert result[0] == LONG_STRING_PREFIX + 1 # 0xb9 @@ -123,7 +123,7 @@ def test_encode_long_string_1024_bytes(self) -> None: def test_encode_long_string_boundary(self) -> None: """String at exact boundary (56 bytes) uses long encoding.""" data = b"a" * (SHORT_STRING_MAX_LEN + 1) - result = encode(data) + result = encode_rlp(data) # Prefix 0xb8 = 0xb7 + 1 (1 byte for length) assert result[0] == LONG_STRING_PREFIX assert result[1] == len(data) @@ -135,7 +135,7 @@ class TestEncodeEmptyList: def test_encode_empty_list(self) -> None: """Empty list encodes to 0xc0.""" - result = encode([]) + result = encode_rlp([]) assert result == bytes.fromhex("c0") @@ -144,14 +144,14 @@ class TestEncodeShortList: def test_encode_string_list(self) -> None: """List of strings ['dog', 'god', 'cat'] encodes correctly.""" - result = encode([b"dog", b"god", b"cat"]) + result = encode_rlp([b"dog", b"god", b"cat"]) assert result == bytes.fromhex("cc83646f6783676f6483636174") def test_encode_multilist(self) -> None: """Mixed list ['zw', [4], 1] encodes correctly.""" # 4 encodes as 0x04 (single byte) # 1 encodes as 0x01 (single byte) - result = encode([b"zw", [b"\x04"], b"\x01"]) + result = encode_rlp([b"zw", [b"\x04"], b"\x01"]) assert result == bytes.fromhex("c6827a77c10401") def test_encode_short_list_max_payload(self) -> None: @@ -159,7 +159,7 @@ def test_encode_short_list_max_payload(self) -> None: # Create a list that has exactly 55 bytes of payload # Each "a" encodes as 0x61 (single byte), so 55 "a"s = 55 bytes payload items: list[RLPItem] = [b"a" for _ in range(SHORT_LIST_MAX_LEN)] - result = encode(items) + result = encode_rlp(items) assert result[0] == SHORT_LIST_PREFIX + SHORT_LIST_MAX_LEN # 0xf7 @@ -169,7 +169,7 @@ class TestEncodeLongList: def test_encode_long_list_four_nested(self) -> None: """Long list with 4 nested lists encodes correctly.""" inner = [b"asdf", b"qwer", b"zxcv"] - result = encode([inner, inner, inner, inner]) + result = encode_rlp([inner, inner, inner, inner]) expected = bytes.fromhex( "f840cf84617364668471776572847a786376cf84617364668471776572847a786376" "cf84617364668471776572847a786376cf84617364668471776572847a786376" @@ -179,7 +179,7 @@ def test_encode_long_list_four_nested(self) -> None: def test_encode_long_list_32_nested(self) -> None: """Long list with 32 nested lists uses 2-byte length prefix.""" inner = [b"asdf", b"qwer", b"zxcv"] - result = encode([inner] * 32) + result = encode_rlp([inner] * 32) expected_start = bytes.fromhex("f90200") # 0xf9 = 0xf7 + 2, length = 0x0200 = 512 assert result[:3] == expected_start @@ -198,7 +198,7 @@ def test_encode_short_list_11_elements(self) -> None: b"asdf", b"qwer", ] - result = encode(items) + result = encode_rlp(items) expected = bytes.fromhex( "f784617364668471776572847a78637684617364668471776572847a78637684617364" "668471776572847a78637684617364668471776572" @@ -211,12 +211,12 @@ class TestEncodeNestedLists: def test_encode_lists_of_lists(self) -> None: """Nested empty lists [[[], []], []] encode correctly.""" - result = encode([[[], []], []]) + result = encode_rlp([[[], []], []]) assert result == bytes.fromhex("c4c2c0c0c0") def test_encode_lists_of_lists_complex(self) -> None: """Complex nested structure [[], [[]], [[], [[]]]] encodes correctly.""" - result = encode([[], [[]], [[], [[]]]]) + result = encode_rlp([[], [[]], [[], [[]]]]) assert result == bytes.fromhex("c7c0c1c0c3c0c1c0") @@ -226,32 +226,32 @@ class TestEncodeIntegers: def test_encode_zero(self) -> None: """Integer 0 encodes as empty string (0x80).""" # In RLP, 0 is represented as empty byte string - result = encode(b"") + result = encode_rlp(b"") assert result == bytes.fromhex("80") def test_encode_small_integers(self) -> None: """Small integers 1-127 encode as single bytes.""" - assert encode(b"\x01") == bytes.fromhex("01") - assert encode(b"\x10") == bytes.fromhex("10") # 16 - assert encode(b"\x4f") == bytes.fromhex("4f") # 79 - assert encode(b"\x7f") == bytes.fromhex("7f") # 127 + assert encode_rlp(b"\x01") == bytes.fromhex("01") + assert encode_rlp(b"\x10") == bytes.fromhex("10") # 16 + assert encode_rlp(b"\x4f") == bytes.fromhex("4f") # 79 + assert encode_rlp(b"\x7f") == bytes.fromhex("7f") # 127 def test_encode_medium_integers(self) -> None: """Integers >= 128 encode as short strings.""" # 128 = 0x80 (1 byte, but > 0x7f so needs prefix) - assert encode(b"\x80") == bytes.fromhex("8180") + assert encode_rlp(b"\x80") == bytes.fromhex("8180") # 1000 = 0x03e8 (2 bytes) - assert encode((1000).to_bytes(2, "big")) == bytes.fromhex("8203e8") + assert encode_rlp((1000).to_bytes(2, "big")) == bytes.fromhex("8203e8") # 100000 = 0x0186a0 (3 bytes) - assert encode((100000).to_bytes(3, "big")) == bytes.fromhex("830186a0") + assert encode_rlp((100000).to_bytes(3, "big")) == bytes.fromhex("830186a0") def test_encode_big_integer_2_pow_256(self) -> None: """2^256 encodes as 33-byte string.""" big_int = 2**256 big_bytes = big_int.to_bytes(33, "big") - result = encode(big_bytes) + result = encode_rlp(big_bytes) expected = bytes.fromhex( "a1010000000000000000000000000000000000000000000000000000000000000000" ) @@ -264,22 +264,22 @@ class TestEncodeTypeErrors: def test_encode_invalid_type_int(self) -> None: """Encoding an integer directly raises TypeError.""" with pytest.raises(TypeError, match=r"Cannot RLP encode type: int"): - encode(42) # type: ignore[arg-type] + encode_rlp(42) # type: ignore[arg-type] def test_encode_invalid_type_str(self) -> None: """Encoding a string directly raises TypeError.""" with pytest.raises(TypeError, match=r"Cannot RLP encode type: str"): - encode("hello") # type: ignore[arg-type] + encode_rlp("hello") # type: ignore[arg-type] def test_encode_invalid_type_none(self) -> None: """Encoding None raises TypeError.""" with pytest.raises(TypeError, match=r"Cannot RLP encode type: NoneType"): - encode(None) # type: ignore[arg-type] + encode_rlp(None) # type: ignore[arg-type] def test_encode_invalid_nested_type(self) -> None: """Encoding a list with invalid nested type raises TypeError.""" with pytest.raises(TypeError, match=r"Cannot RLP encode type: int"): - encode([b"valid", 123]) # type: ignore[list-item] + encode_rlp([b"valid", 123]) # type: ignore[list-item] class TestDecodeEmptyString: @@ -287,7 +287,7 @@ class TestDecodeEmptyString: def test_decode_empty_string(self) -> None: """0x80 decodes to empty string.""" - result = decode(bytes.fromhex("80")) + result = decode_rlp(bytes.fromhex("80")) assert result == b"" @@ -296,24 +296,24 @@ class TestDecodeSingleByte: def test_decode_byte_0x00(self) -> None: """0x00 decodes to single byte 0x00.""" - result = decode(bytes.fromhex("00")) + result = decode_rlp(bytes.fromhex("00")) assert result == b"\x00" def test_decode_byte_0x01(self) -> None: """0x01 decodes to single byte 0x01.""" - result = decode(bytes.fromhex("01")) + result = decode_rlp(bytes.fromhex("01")) assert result == b"\x01" def test_decode_byte_0x7f(self) -> None: """0x7f decodes to single byte 0x7f.""" - result = decode(bytes.fromhex("7f")) + result = decode_rlp(bytes.fromhex("7f")) assert result == b"\x7f" @pytest.mark.parametrize("byte_val", range(0x00, SINGLE_BYTE_MAX + 1)) def test_decode_all_single_byte_values(self, byte_val: int) -> None: """All single-byte values 0x00-0x7f decode correctly.""" data = bytes([byte_val]) - result = decode(data) + result = decode_rlp(data) assert result == data @@ -322,7 +322,7 @@ class TestDecodeShortString: def test_decode_short_string_dog(self) -> None: """0x83646f67 decodes to 'dog'.""" - result = decode(bytes.fromhex("83646f67")) + result = decode_rlp(bytes.fromhex("83646f67")) assert result == b"dog" def test_decode_short_string_55_bytes(self) -> None: @@ -331,7 +331,7 @@ def test_decode_short_string_55_bytes(self) -> None: "b74c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c69" ) - result = decode(encoded) + result = decode_rlp(encoded) assert result == b"Lorem ipsum dolor sit amet, consectetur adipisicing eli" @@ -344,15 +344,15 @@ def test_decode_long_string_56_bytes(self) -> None: "b8384c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c6974" ) - result = decode(encoded) + result = decode_rlp(encoded) assert result == b"Lorem ipsum dolor sit amet, consectetur adipisicing elit" def test_decode_long_string_1024_bytes(self) -> None: """1024-byte string with 2-byte length prefix decodes correctly.""" # Use simple repeated bytes to avoid codespell false positives. expected_data = b"y" * 1024 - encoded = encode(expected_data) - result = decode(encoded) + encoded = encode_rlp(expected_data) + result = decode_rlp(encoded) assert result == expected_data @@ -361,7 +361,7 @@ class TestDecodeEmptyList: def test_decode_empty_list(self) -> None: """0xc0 decodes to empty list.""" - result = decode(bytes.fromhex("c0")) + result = decode_rlp(bytes.fromhex("c0")) assert result == [] @@ -370,12 +370,12 @@ class TestDecodeShortList: def test_decode_string_list(self) -> None: """Encoded string list decodes correctly.""" - result = decode(bytes.fromhex("cc83646f6783676f6483636174")) + result = decode_rlp(bytes.fromhex("cc83646f6783676f6483636174")) assert result == [b"dog", b"god", b"cat"] def test_decode_multilist(self) -> None: """Mixed list decodes correctly.""" - result = decode(bytes.fromhex("c6827a77c10401")) + result = decode_rlp(bytes.fromhex("c6827a77c10401")) assert result == [b"zw", [b"\x04"], b"\x01"] @@ -388,7 +388,7 @@ def test_decode_long_list_four_nested(self) -> None: "f840cf84617364668471776572847a786376cf84617364668471776572847a786376" "cf84617364668471776572847a786376cf84617364668471776572847a786376" ) - result = decode(encoded) + result = decode_rlp(encoded) inner = [b"asdf", b"qwer", b"zxcv"] assert result == [inner, inner, inner, inner] @@ -398,12 +398,12 @@ class TestDecodeNestedLists: def test_decode_lists_of_lists(self) -> None: """Nested empty lists decode correctly.""" - result = decode(bytes.fromhex("c4c2c0c0c0")) + result = decode_rlp(bytes.fromhex("c4c2c0c0c0")) assert result == [[[], []], []] def test_decode_lists_of_lists_complex(self) -> None: """Complex nested structure decodes correctly.""" - result = decode(bytes.fromhex("c7c0c1c0c3c0c1c0")) + result = decode_rlp(bytes.fromhex("c7c0c1c0c3c0c1c0")) assert result == [[], [[]], [[], [[]]]] @@ -413,43 +413,43 @@ class TestDecodeErrors: def test_decode_empty_data(self) -> None: """Decoding empty data raises RLPDecodingError.""" with pytest.raises(RLPDecodingError, match=r"Empty RLP data"): - decode(b"") + decode_rlp(b"") def test_decode_trailing_data(self) -> None: """Extra bytes after valid RLP raise RLPDecodingError.""" # Valid empty string (0x80) followed by extra byte with pytest.raises(RLPDecodingError, match=r"Trailing data"): - decode(bytes.fromhex("8000")) + decode_rlp(bytes.fromhex("8000")) def test_decode_short_string_truncated(self) -> None: """Truncated short string raises RLPDecodingError.""" # 0x83 indicates 3-byte string, but only 2 bytes provided with pytest.raises(RLPDecodingError, match=r"Data too short"): - decode(bytes.fromhex("836465")) # "de" instead of "dog" + decode_rlp(bytes.fromhex("836465")) # "de" instead of "dog" def test_decode_long_string_truncated_length(self) -> None: """Truncated length field in long string raises RLPDecodingError.""" # 0xb9 indicates 2-byte length, but only 1 byte provided with pytest.raises(RLPDecodingError, match=r"Data too short"): - decode(bytes.fromhex("b904")) + decode_rlp(bytes.fromhex("b904")) def test_decode_long_string_truncated_payload(self) -> None: """Truncated payload in long string raises RLPDecodingError.""" # 0xb838 indicates 56 bytes, but insufficient data provided with pytest.raises(RLPDecodingError, match=r"Data too short"): - decode(bytes.fromhex("b8380000")) # Only 2 bytes of payload + decode_rlp(bytes.fromhex("b8380000")) # Only 2 bytes of payload def test_decode_short_list_truncated(self) -> None: """Truncated short list raises RLPDecodingError.""" # 0xc3 indicates 3-byte payload, but only 2 bytes provided with pytest.raises(RLPDecodingError, match=r"Data too short"): - decode(bytes.fromhex("c38080")) + decode_rlp(bytes.fromhex("c38080")) def test_decode_long_list_truncated_length(self) -> None: """Truncated length field in long list raises RLPDecodingError.""" # 0xf9 indicates 2-byte length, but only 1 byte provided with pytest.raises(RLPDecodingError, match=r"Data too short"): - decode(bytes.fromhex("f904")) + decode_rlp(bytes.fromhex("f904")) def test_decode_non_canonical_long_string_for_short(self) -> None: """Using long string encoding for short string is non-canonical.""" @@ -457,13 +457,13 @@ def test_decode_non_canonical_long_string_for_short(self) -> None: # but 0x38 <= 55, so this should be encoded as short string with pytest.raises(RLPDecodingError, match=r"Non-canonical.*long string"): # 0xb8 followed by length 0x37 (55) - should have used short encoding - decode(bytes.fromhex("b837") + b"a" * 55) + decode_rlp(bytes.fromhex("b837") + b"a" * 55) def test_decode_non_canonical_long_list_for_short(self) -> None: """Using long list encoding for short list is non-canonical.""" # 0xf8 followed by length 0x37 (55) - should have used short encoding with pytest.raises(RLPDecodingError, match=r"Non-canonical.*long list"): - decode(bytes.fromhex("f837") + bytes.fromhex("80") * 55) + decode_rlp(bytes.fromhex("f837") + bytes.fromhex("80") * 55) class TestDecodeListFunction: @@ -471,18 +471,18 @@ class TestDecodeListFunction: def test_decode_list_success(self) -> None: """decode_list returns list of bytes for flat list.""" - result = decode_list(bytes.fromhex("cc83646f6783676f6483636174")) + result = decode_rlp_list(bytes.fromhex("cc83646f6783676f6483636174")) assert result == [b"dog", b"god", b"cat"] def test_decode_list_not_a_list(self) -> None: """decode_list raises error when data is not a list.""" with pytest.raises(RLPDecodingError, match=r"Expected RLP list"): - decode_list(bytes.fromhex("83646f67")) # Encodes "dog", not a list + decode_rlp_list(bytes.fromhex("83646f67")) # Encodes "dog", not a list def test_decode_list_nested_list_rejected(self) -> None: """decode_list raises error when list contains nested lists.""" with pytest.raises(RLPDecodingError, match=r"Element .* is not bytes"): - decode_list(bytes.fromhex("c4c2c0c0c0")) # [[[], []], []] + decode_rlp_list(bytes.fromhex("c4c2c0c0c0")) # [[[], []], []] class TestEncodeDecodeRoundtrip: @@ -509,8 +509,8 @@ class TestEncodeDecodeRoundtrip: ) def test_roundtrip(self, item: RLPItem) -> None: """Encoding then decoding returns the original item.""" - encoded = encode(item) - decoded = decode(encoded) + encoded = encode_rlp(item) + decoded = decode_rlp(encoded) assert decoded == item def test_roundtrip_large_nested_structure(self) -> None: @@ -521,8 +521,8 @@ def test_roundtrip_large_nested_structure(self) -> None: [inner, inner], [[inner], [inner, inner]], ] - encoded = encode(structure) - decoded = decode(encoded) + encoded = encode_rlp(structure) + decoded = decode_rlp(encoded) assert decoded == structure @@ -531,28 +531,28 @@ class TestOfficialEthereumVectors: def test_emptystring(self) -> None: """Official test vector: emptystring.""" - assert encode(b"") == bytes.fromhex("80") - assert decode(bytes.fromhex("80")) == b"" + assert encode_rlp(b"") == bytes.fromhex("80") + assert decode_rlp(bytes.fromhex("80")) == b"" def test_bytestring00(self) -> None: """Official test vector: bytestring00.""" - assert encode(b"\x00") == bytes.fromhex("00") - assert decode(bytes.fromhex("00")) == b"\x00" + assert encode_rlp(b"\x00") == bytes.fromhex("00") + assert decode_rlp(bytes.fromhex("00")) == b"\x00" def test_bytestring01(self) -> None: """Official test vector: bytestring01.""" - assert encode(b"\x01") == bytes.fromhex("01") - assert decode(bytes.fromhex("01")) == b"\x01" + assert encode_rlp(b"\x01") == bytes.fromhex("01") + assert decode_rlp(bytes.fromhex("01")) == b"\x01" def test_bytestring7f(self) -> None: """Official test vector: bytestring7F.""" - assert encode(b"\x7f") == bytes.fromhex("7f") - assert decode(bytes.fromhex("7f")) == b"\x7f" + assert encode_rlp(b"\x7f") == bytes.fromhex("7f") + assert decode_rlp(bytes.fromhex("7f")) == b"\x7f" def test_shortstring(self) -> None: """Official test vector: shortstring.""" - assert encode(b"dog") == bytes.fromhex("83646f67") - assert decode(bytes.fromhex("83646f67")) == b"dog" + assert encode_rlp(b"dog") == bytes.fromhex("83646f67") + assert decode_rlp(bytes.fromhex("83646f67")) == b"dog" def test_shortstring2(self) -> None: """Official test vector: shortstring2 (55 bytes - max short string).""" @@ -561,8 +561,8 @@ def test_shortstring2(self) -> None: "b74c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c69" ) - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_longstring(self) -> None: """Official test vector: longstring (56 bytes - min long string).""" @@ -571,42 +571,42 @@ def test_longstring(self) -> None: "b8384c6f72656d20697073756d20646f6c6f722073697420616d65742c20" "636f6e7365637465747572206164697069736963696e6720656c6974" ) - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_emptylist(self) -> None: """Official test vector: emptylist.""" - assert encode([]) == bytes.fromhex("c0") - assert decode(bytes.fromhex("c0")) == [] + assert encode_rlp([]) == bytes.fromhex("c0") + assert decode_rlp(bytes.fromhex("c0")) == [] def test_stringlist(self) -> None: """Official test vector: stringlist.""" data: RLPItem = [b"dog", b"god", b"cat"] expected = bytes.fromhex("cc83646f6783676f6483636174") - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_multilist(self) -> None: """Official test vector: multilist.""" # "zw" = 0x7a77, [4] = 0x04, 1 = 0x01 data: RLPItem = [b"zw", [b"\x04"], b"\x01"] expected = bytes.fromhex("c6827a77c10401") - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_listsoflists(self) -> None: """Official test vector: listsoflists.""" data: RLPItem = [[[], []], []] expected = bytes.fromhex("c4c2c0c0c0") - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_listsoflists2(self) -> None: """Official test vector: listsoflists2.""" data: RLPItem = [[], [[]], [[], [[]]]] expected = bytes.fromhex("c7c0c1c0c3c0c1c0") - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_dicttest1(self) -> None: """Official test vector: dictTest1 (list of key-value pairs).""" @@ -620,8 +620,8 @@ def test_dicttest1(self) -> None: "ecca846b6579318476616c31ca846b6579328476616c32" "ca846b6579338476616c33ca846b6579348476616c34" ) - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data def test_longlist1(self) -> None: """Official test vector: longList1.""" @@ -631,8 +631,8 @@ def test_longlist1(self) -> None: "f840cf84617364668471776572847a786376cf84617364668471776572847a786376" "cf84617364668471776572847a786376cf84617364668471776572847a786376" ) - assert encode(data) == expected - assert decode(expected) == data + assert encode_rlp(data) == expected + assert decode_rlp(expected) == data class TestBoundaryConditions: @@ -641,32 +641,32 @@ class TestBoundaryConditions: def test_single_byte_max_boundary(self) -> None: """Verify SINGLE_BYTE_MAX boundary (0x7f vs 0x80).""" # 0x7f = single byte encoding - assert encode(bytes([SINGLE_BYTE_MAX])) == bytes([SINGLE_BYTE_MAX]) + assert encode_rlp(bytes([SINGLE_BYTE_MAX])) == bytes([SINGLE_BYTE_MAX]) # 0x80 = short string encoding - assert encode(bytes([SINGLE_BYTE_MAX + 1])) == bytes([0x81, 0x80]) + assert encode_rlp(bytes([SINGLE_BYTE_MAX + 1])) == bytes([0x81, 0x80]) def test_short_string_max_boundary(self) -> None: """Verify SHORT_STRING_MAX_LEN boundary (55 vs 56 bytes).""" # 55 bytes = short string encoding (prefix 0xb7) data_55 = b"a" * SHORT_STRING_MAX_LEN - encoded_55 = encode(data_55) + encoded_55 = encode_rlp(data_55) assert encoded_55[0] == SHORT_STRING_PREFIX + SHORT_STRING_MAX_LEN # 0xb7 # 56 bytes = long string encoding (prefix 0xb8) data_56 = b"a" * (SHORT_STRING_MAX_LEN + 1) - encoded_56 = encode(data_56) + encoded_56 = encode_rlp(data_56) assert encoded_56[0] == LONG_STRING_PREFIX # 0xb8 def test_short_list_max_boundary(self) -> None: """Verify SHORT_LIST_MAX_LEN boundary (55 vs 56 bytes payload).""" # 55 bytes payload = short list encoding (prefix 0xf7) items_55: list[RLPItem] = [b"a" for _ in range(SHORT_LIST_MAX_LEN)] - encoded_55 = encode(items_55) + encoded_55 = encode_rlp(items_55) assert encoded_55[0] == SHORT_LIST_PREFIX + SHORT_LIST_MAX_LEN # 0xf7 # 56 bytes payload = long list encoding (prefix 0xf8) items_56: list[RLPItem] = [b"a" for _ in range(SHORT_LIST_MAX_LEN + 1)] - encoded_56 = encode(items_56) + encoded_56 = encode_rlp(items_56) assert encoded_56[0] == LONG_LIST_PREFIX # 0xf8 def test_prefix_boundaries(self) -> None: From 3fd3cc8643be747207847ee907764b527003e774 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Thu, 29 Jan 2026 17:07:18 +0100 Subject: [PATCH 2/6] more tests --- .../{test_enr.py => enr/test_eth2.py} | 26 ++---------------- .../subspecs/networking/enr/test_keys.py | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+), 24 deletions(-) rename tests/lean_spec/subspecs/networking/{test_enr.py => enr/test_eth2.py} (85%) create mode 100644 tests/lean_spec/subspecs/networking/enr/test_keys.py diff --git a/tests/lean_spec/subspecs/networking/test_enr.py b/tests/lean_spec/subspecs/networking/enr/test_eth2.py similarity index 85% rename from tests/lean_spec/subspecs/networking/test_enr.py rename to tests/lean_spec/subspecs/networking/enr/test_eth2.py index ddeeb2da..8f200fc1 100644 --- a/tests/lean_spec/subspecs/networking/test_enr.py +++ b/tests/lean_spec/subspecs/networking/enr/test_eth2.py @@ -1,36 +1,14 @@ -"""Tests for Ethereum Node Record (ENR) specification.""" +"""Tests for Ethereum 2.0 ENR types (Eth2Data, AttestationSubnets, SyncCommitteeSubnets).""" import pytest from pydantic import ValidationError -from lean_spec.subspecs.networking.enr import Eth2Data, keys +from lean_spec.subspecs.networking.enr import Eth2Data from lean_spec.subspecs.networking.enr.eth2 import AttestationSubnets, SyncCommitteeSubnets from lean_spec.types import Uint64 from lean_spec.types.byte_arrays import Bytes4 -class TestEnrKeys: - """Tests for ENR key constants.""" - - def test_identity_keys(self) -> None: - """Identity keys have correct values.""" - assert keys.ID == "id" - assert keys.SECP256K1 == "secp256k1" - - def test_network_keys(self) -> None: - """Network keys have correct values.""" - assert keys.IP == "ip" - assert keys.IP6 == "ip6" - assert keys.TCP == "tcp" - assert keys.UDP == "udp" - - def test_ethereum_keys(self) -> None: - """Ethereum-specific keys have correct values.""" - assert keys.ETH2 == "eth2" - assert keys.ATTNETS == "attnets" - assert keys.SYNCNETS == "syncnets" - - class TestEth2Data: """Tests for Eth2Data structure.""" diff --git a/tests/lean_spec/subspecs/networking/enr/test_keys.py b/tests/lean_spec/subspecs/networking/enr/test_keys.py new file mode 100644 index 00000000..f46722bd --- /dev/null +++ b/tests/lean_spec/subspecs/networking/enr/test_keys.py @@ -0,0 +1,27 @@ +"""Tests for ENR key constants.""" + +from lean_spec.subspecs.networking.enr import keys + + +class TestEnrKeys: + """Tests for ENR key constants.""" + + def test_identity_keys(self) -> None: + """Identity keys have correct values.""" + assert keys.ID == "id" + assert keys.SECP256K1 == "secp256k1" + + def test_network_keys(self) -> None: + """Network keys have correct values.""" + assert keys.IP == "ip" + assert keys.IP6 == "ip6" + assert keys.TCP == "tcp" + assert keys.UDP == "udp" + assert keys.TCP6 == "tcp6" + assert keys.UDP6 == "udp6" + + def test_ethereum_keys(self) -> None: + """Ethereum-specific keys have correct values.""" + assert keys.ETH2 == "eth2" + assert keys.ATTNETS == "attnets" + assert keys.SYNCNETS == "syncnets" From 31b58a3df1ede63875828f826d13f83cf5faa323 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Thu, 29 Jan 2026 17:11:09 +0100 Subject: [PATCH 3/6] no patch in tests --- tests/lean_spec/test_cli.py | 141 ++++++++++++++++++++---------------- 1 file changed, 79 insertions(+), 62 deletions(-) diff --git a/tests/lean_spec/test_cli.py b/tests/lean_spec/test_cli.py index 6d945953..462d08fc 100644 --- a/tests/lean_spec/test_cli.py +++ b/tests/lean_spec/test_cli.py @@ -8,9 +8,13 @@ import asyncio import base64 -from unittest.mock import AsyncMock, Mock, patch +from unittest.mock import AsyncMock, patch import pytest +from Crypto.Hash import keccak +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric.utils import Prehashed, decode_dss_signature from lean_spec.__main__ import create_anchor_block, is_enr_string, resolve_bootnode from lean_spec.subspecs.containers import Block, BlockBody @@ -21,65 +25,85 @@ from lean_spec.types.rlp import encode_rlp from tests.lean_spec.helpers import make_genesis_state +# Generate a test keypair once for all ENR tests. +_TEST_PRIVATE_KEY = ec.generate_private_key(ec.SECP256K1()) +_TEST_PUBLIC_KEY = _TEST_PRIVATE_KEY.public_key() +_TEST_COMPRESSED_PUBKEY = _TEST_PUBLIC_KEY.public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.CompressedPoint, +) + + +def _sign_enr_content(content_items: list[bytes]) -> bytes: + """Sign ENR content and return 64-byte r||s signature.""" + content_rlp = encode_rlp(content_items) + + k = keccak.new(digest_bits=256) + k.update(content_rlp) + digest = k.digest() + + signature_der = _TEST_PRIVATE_KEY.sign(digest, ec.ECDSA(Prehashed(hashes.SHA256()))) + r, s = decode_dss_signature(signature_der) + return r.to_bytes(32, "big") + s.to_bytes(32, "big") + -# Valid ENR with IPv4 and TCP port (derived from EIP-778 test vector structure) -# This ENR has: ip=192.168.1.1, tcp=9000 def _make_enr_with_tcp(ip_bytes: bytes, tcp_port: int) -> str: - """Create a minimal ENR string with IPv4 and TCP port.""" - rlp_data = encode_rlp( - [ - b"\x00" * 64, # signature - b"\x01", # seq = 1 - b"id", - b"v4", - b"ip", - ip_bytes, - b"secp256k1", - b"\x02" + b"\x00" * 32, # compressed pubkey - b"tcp", - tcp_port.to_bytes(2, "big"), - ] - ) + """Create a properly signed ENR string with IPv4 and TCP port.""" + # Content items (keys must be sorted). + content_items: list[bytes] = [ + b"\x01", # seq = 1 + b"id", + b"v4", + b"ip", + ip_bytes, + b"secp256k1", + _TEST_COMPRESSED_PUBKEY, + b"tcp", + tcp_port.to_bytes(2, "big"), + ] + signature = _sign_enr_content(content_items) + + rlp_data = encode_rlp([signature] + content_items) b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") return f"enr:{b64_content}" def _make_enr_with_ipv6_tcp(ip6_bytes: bytes, tcp_port: int) -> str: - """Create a minimal ENR string with IPv6 and TCP port.""" - rlp_data = encode_rlp( - [ - b"\x00" * 64, # signature - b"\x01", # seq = 1 - b"id", - b"v4", - b"ip6", - ip6_bytes, - b"secp256k1", - b"\x02" + b"\x00" * 32, # compressed pubkey - b"tcp", - tcp_port.to_bytes(2, "big"), - ] - ) + """Create a properly signed ENR string with IPv6 and TCP port.""" + content_items: list[bytes] = [ + b"\x01", # seq = 1 + b"id", + b"v4", + b"ip6", + ip6_bytes, + b"secp256k1", + _TEST_COMPRESSED_PUBKEY, + b"tcp", + tcp_port.to_bytes(2, "big"), + ] + signature = _sign_enr_content(content_items) + + rlp_data = encode_rlp([signature] + content_items) b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") return f"enr:{b64_content}" def _make_enr_without_tcp(ip_bytes: bytes) -> str: - """Create an ENR string with IPv4 but no TCP port (UDP only).""" - rlp_data = encode_rlp( - [ - b"\x00" * 64, # signature - b"\x01", # seq = 1 - b"id", - b"v4", - b"ip", - ip_bytes, - b"secp256k1", - b"\x02" + b"\x00" * 32, # compressed pubkey - b"udp", - (30303).to_bytes(2, "big"), # UDP only, no TCP - ] - ) + """Create a properly signed ENR string with IPv4 but no TCP port (UDP only).""" + content_items: list[bytes] = [ + b"\x01", # seq = 1 + b"id", + b"v4", + b"ip", + ip_bytes, + b"secp256k1", + _TEST_COMPRESSED_PUBKEY, + b"udp", + (30303).to_bytes(2, "big"), # UDP only, no TCP + ] + signature = _sign_enr_content(content_items) + + rlp_data = encode_rlp([signature] + content_items) b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") return f"enr:{b64_content}" @@ -155,22 +179,19 @@ def test_resolve_arbitrary_multiaddr_unchanged(self) -> None: arbitrary = "/some/arbitrary/path" assert resolve_bootnode(arbitrary) == arbitrary - @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) - def test_resolve_valid_enr_with_tcp(self, mock_verify: AsyncMock) -> None: + def test_resolve_valid_enr_with_tcp(self) -> None: """ENR with IPv4+TCP extracts multiaddr correctly.""" result = resolve_bootnode(ENR_WITH_TCP) assert result == "/ip4/192.168.1.1/tcp/9000" - @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) - def test_resolve_enr_ipv6(self, mock_verify: AsyncMock) -> None: + def test_resolve_enr_ipv6(self) -> None: """ENR with IPv6+TCP extracts multiaddr correctly.""" result = resolve_bootnode(ENR_WITH_IPV6_TCP) # IPv6 loopback ::1 formatted as full hex assert "/ip6/" in result assert "/tcp/9000" in result - @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) - def test_resolve_enr_without_tcp_raises(self, mock_verify: AsyncMock) -> None: + def test_resolve_enr_without_tcp_raises(self) -> None: """ENR without TCP port raises ValueError.""" with pytest.raises(ValueError, match=r"no TCP connection info"): resolve_bootnode(ENR_WITHOUT_TCP) @@ -190,8 +211,7 @@ def test_resolve_enr_prefix_only_raises(self) -> None: with pytest.raises(ValueError): resolve_bootnode("enr:") - @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) - def test_resolve_enr_with_different_ports(self, mock_verify: AsyncMock) -> None: + def test_resolve_enr_with_different_ports(self) -> None: """ENR resolution handles various port numbers.""" # Port 30303 enr_30303 = _make_enr_with_tcp(b"\x7f\x00\x00\x01", 30303) @@ -208,8 +228,7 @@ def test_resolve_enr_with_different_ports(self, mock_verify: AsyncMock) -> None: result = resolve_bootnode(enr_max) assert result == "/ip4/127.0.0.1/tcp/65535" - @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) - def test_resolve_enr_with_different_ips(self, mock_verify: Mock) -> None: + def test_resolve_enr_with_different_ips(self) -> None: """ENR resolution handles various IPv4 addresses.""" test_cases = [ (b"\x00\x00\x00\x00", "0.0.0.0"), @@ -225,8 +244,7 @@ def test_resolve_enr_with_different_ips(self, mock_verify: Mock) -> None: class TestMixedBootnodes: """Integration tests for mixed bootnode types.""" - @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) - def test_mixed_bootnodes_list(self, mock_verify: Mock) -> None: + def test_mixed_bootnodes_list(self) -> None: """Process a list containing both ENR and multiaddr.""" bootnodes = [ MULTIADDR_IPV4, @@ -240,8 +258,7 @@ def test_mixed_bootnodes_list(self, mock_verify: Mock) -> None: assert resolved[1] == "/ip4/192.168.1.1/tcp/9000" assert resolved[2] == "/ip4/10.0.0.1/tcp/8000" - @patch("lean_spec.subspecs.networking.enr.enr.ENR.verify_signature", return_value=True) - def test_filter_invalid_enrs(self, mock_verify: Mock) -> None: + def test_filter_invalid_enrs(self) -> None: """Demonstrate filtering out invalid ENRs from a bootnode list.""" bootnodes = [ MULTIADDR_IPV4, From bb3e4a6fb5f352b1ec493715fa3d42a3391b95a4 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Thu, 29 Jan 2026 17:59:24 +0100 Subject: [PATCH 4/6] fix comment --- tests/lean_spec/test_cli.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/lean_spec/test_cli.py b/tests/lean_spec/test_cli.py index 462d08fc..0e47fddf 100644 --- a/tests/lean_spec/test_cli.py +++ b/tests/lean_spec/test_cli.py @@ -161,12 +161,7 @@ def test_whitespace_prefix_not_detected(self) -> None: class TestResolveBootnode: - """Tests for resolve_bootnode() resolution function. - - Note: Tests use fake ENRs with zero signatures. - We mock verify_signature to return True since these tests - focus on resolution logic, not signature verification. - """ + """Tests for resolve_bootnode() resolution function.""" def test_resolve_multiaddr_unchanged(self) -> None: """Multiaddr strings are returned unchanged.""" From c9a61d8856b28629a91b06e5813c567e1e74204e Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Thu, 29 Jan 2026 18:09:49 +0100 Subject: [PATCH 5/6] enr: some improvements for eth2 and tests --- src/lean_spec/subspecs/networking/enr/eth2.py | 48 +++++--- src/lean_spec/subspecs/networking/types.py | 3 + .../subspecs/networking/enr/test_eth2.py | 116 ++++++++++++++++-- 3 files changed, 144 insertions(+), 23 deletions(-) diff --git a/src/lean_spec/subspecs/networking/enr/eth2.py b/src/lean_spec/subspecs/networking/enr/eth2.py index 231e344c..7dc5437f 100644 --- a/src/lean_spec/subspecs/networking/enr/eth2.py +++ b/src/lean_spec/subspecs/networking/enr/eth2.py @@ -26,7 +26,7 @@ from typing import ClassVar -from lean_spec.subspecs.networking.types import ForkDigest +from lean_spec.subspecs.networking.types import ForkDigest, Version from lean_spec.types import StrictBaseModel, Uint64 from lean_spec.types.bitfields import BaseBitvector from lean_spec.types.boolean import Boolean @@ -45,18 +45,18 @@ class Eth2Data(StrictBaseModel): fork_digest: ForkDigest """Current active fork identifier (4 bytes).""" - next_fork_version: ForkDigest - """Fork version of next scheduled fork. Equals current if none scheduled.""" + next_fork_version: Version + """Fork version of next scheduled fork. Equals current version if none scheduled.""" next_fork_epoch: Uint64 """Epoch when next fork activates. FAR_FUTURE_EPOCH if none scheduled.""" @classmethod - def no_scheduled_fork(cls, current_digest: ForkDigest) -> "Eth2Data": - """Create Eth2Data with no scheduled fork.""" + def no_scheduled_fork(cls, current_digest: ForkDigest, current_version: Version) -> "Eth2Data": + """Create Eth2Data indicating no scheduled fork.""" return cls( fork_digest=current_digest, - next_fork_version=current_digest, + next_fork_version=current_version, next_fork_epoch=FAR_FUTURE_EPOCH, ) @@ -74,32 +74,32 @@ class AttestationSubnets(BaseBitvector): @classmethod def none(cls) -> "AttestationSubnets": """No subscriptions.""" - return cls(data=[Boolean(False)] * 64) + return cls(data=[Boolean(False)] * cls.LENGTH) @classmethod def all(cls) -> "AttestationSubnets": """Subscribe to all 64 subnets.""" - return cls(data=[Boolean(True)] * 64) + return cls(data=[Boolean(True)] * cls.LENGTH) @classmethod def from_subnet_ids(cls, subnet_ids: list[int]) -> "AttestationSubnets": """Subscribe to specific subnets.""" - bits = [Boolean(False)] * 64 + bits = [Boolean(False)] * cls.LENGTH for sid in subnet_ids: - if not 0 <= sid < 64: + if not 0 <= sid < cls.LENGTH: raise ValueError(f"Subnet ID must be 0-63, got {sid}") bits[sid] = Boolean(True) return cls(data=bits) def is_subscribed(self, subnet_id: int) -> bool: """Check if subscribed to a subnet.""" - if not 0 <= subnet_id < 64: + if not 0 <= subnet_id < self.LENGTH: raise ValueError(f"Subnet ID must be 0-63, got {subnet_id}") return bool(self.data[subnet_id]) def subscribed_subnets(self) -> list[int]: """List of subscribed subnet IDs.""" - return [i for i in range(64) if self.data[i]] + return [i for i in range(self.LENGTH) if self.data[i]] def subscription_count(self) -> int: """Number of subscribed subnets.""" @@ -119,15 +119,33 @@ class SyncCommitteeSubnets(BaseBitvector): @classmethod def none(cls) -> "SyncCommitteeSubnets": """No subscriptions.""" - return cls(data=[Boolean(False)] * 4) + return cls(data=[Boolean(False)] * cls.LENGTH) @classmethod def all(cls) -> "SyncCommitteeSubnets": """Subscribe to all 4 subnets.""" - return cls(data=[Boolean(True)] * 4) + return cls(data=[Boolean(True)] * cls.LENGTH) + + @classmethod + def from_subnet_ids(cls, subnet_ids: list[int]) -> "SyncCommitteeSubnets": + """Subscribe to specific sync subnets.""" + bits = [Boolean(False)] * cls.LENGTH + for sid in subnet_ids: + if not 0 <= sid < cls.LENGTH: + raise ValueError(f"Sync subnet ID must be 0-3, got {sid}") + bits[sid] = Boolean(True) + return cls(data=bits) def is_subscribed(self, subnet_id: int) -> bool: """Check if subscribed to a sync subnet.""" - if not 0 <= subnet_id < 4: + if not 0 <= subnet_id < self.LENGTH: raise ValueError(f"Sync subnet ID must be 0-3, got {subnet_id}") return bool(self.data[subnet_id]) + + def subscribed_subnets(self) -> list[int]: + """List of subscribed sync subnet IDs.""" + return [i for i in range(self.LENGTH) if self.data[i]] + + def subscription_count(self) -> int: + """Number of subscribed sync subnets.""" + return sum(1 for b in self.data if b) diff --git a/src/lean_spec/subspecs/networking/types.py b/src/lean_spec/subspecs/networking/types.py index 98c59334..373919aa 100644 --- a/src/lean_spec/subspecs/networking/types.py +++ b/src/lean_spec/subspecs/networking/types.py @@ -20,6 +20,9 @@ ForkDigest = Bytes4 """4-byte fork identifier ensuring network isolation between forks.""" +Version = Bytes4 +"""4-byte fork version number (e.g., 0x01000000 for Phase0).""" + SeqNumber = Uint64 """Sequence number used in ENR records, metadata, and ping messages.""" diff --git a/tests/lean_spec/subspecs/networking/enr/test_eth2.py b/tests/lean_spec/subspecs/networking/enr/test_eth2.py index 8f200fc1..e710f9cd 100644 --- a/tests/lean_spec/subspecs/networking/enr/test_eth2.py +++ b/tests/lean_spec/subspecs/networking/enr/test_eth2.py @@ -4,7 +4,11 @@ from pydantic import ValidationError from lean_spec.subspecs.networking.enr import Eth2Data -from lean_spec.subspecs.networking.enr.eth2 import AttestationSubnets, SyncCommitteeSubnets +from lean_spec.subspecs.networking.enr.eth2 import ( + FAR_FUTURE_EPOCH, + AttestationSubnets, + SyncCommitteeSubnets, +) from lean_spec.types import Uint64 from lean_spec.types.byte_arrays import Bytes4 @@ -25,11 +29,12 @@ def test_create_eth2_data(self) -> None: def test_no_scheduled_fork_factory(self) -> None: """no_scheduled_fork factory creates correct data.""" digest = Bytes4(b"\xab\xcd\xef\x01") - data = Eth2Data.no_scheduled_fork(digest) + version = Bytes4(b"\x01\x00\x00\x00") + data = Eth2Data.no_scheduled_fork(digest, version) assert data.fork_digest == digest - assert data.next_fork_version == digest - assert data.next_fork_epoch == Uint64(2**64 - 1) + assert data.next_fork_version == version + assert data.next_fork_epoch == FAR_FUTURE_EPOCH def test_eth2_data_immutable(self) -> None: """Eth2Data is immutable (frozen).""" @@ -41,6 +46,10 @@ def test_eth2_data_immutable(self) -> None: with pytest.raises(ValidationError): data.fork_digest = Bytes4(b"\x00\x00\x00\x00") + def test_far_future_epoch_value(self) -> None: + """FAR_FUTURE_EPOCH is max uint64.""" + assert FAR_FUTURE_EPOCH == Uint64(2**64 - 1) + class TestAttestationSubnets: """Tests for AttestationSubnets bitvector.""" @@ -77,22 +86,55 @@ def test_subscribed_subnets_list(self) -> None: def test_invalid_subnet_id_in_from_subnet_ids(self) -> None: """from_subnet_ids() raises for invalid subnet IDs.""" - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="must be 0-63"): AttestationSubnets.from_subnet_ids([64]) - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="must be 0-63"): AttestationSubnets.from_subnet_ids([-1]) def test_invalid_subnet_id_in_is_subscribed(self) -> None: """is_subscribed() raises for invalid subnet IDs.""" subnets = AttestationSubnets.none() - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="must be 0-63"): subnets.is_subscribed(64) - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="must be 0-63"): subnets.is_subscribed(-1) + def test_from_subnet_ids_empty_list(self) -> None: + """from_subnet_ids with empty list creates no subscriptions.""" + subnets = AttestationSubnets.from_subnet_ids([]) + assert subnets.subscription_count() == 0 + assert subnets.subscribed_subnets() == [] + + def test_from_subnet_ids_with_duplicates(self) -> None: + """from_subnet_ids handles duplicates correctly.""" + subnets = AttestationSubnets.from_subnet_ids([5, 5, 5, 10]) + assert subnets.subscription_count() == 2 + assert subnets.subscribed_subnets() == [5, 10] + + def test_encode_bytes_empty(self) -> None: + """Empty subscriptions serialize to 8 zero bytes.""" + subnets = AttestationSubnets.none() + assert subnets.encode_bytes() == b"\x00" * 8 + + def test_encode_bytes_all(self) -> None: + """All subscriptions serialize to 8 0xff bytes.""" + subnets = AttestationSubnets.all() + assert subnets.encode_bytes() == b"\xff" * 8 + + def test_decode_bytes_roundtrip(self) -> None: + """Encode then decode produces equivalent result.""" + original = AttestationSubnets.from_subnet_ids([0, 5, 63]) + encoded = original.encode_bytes() + decoded = AttestationSubnets.decode_bytes(encoded) + assert decoded.subscribed_subnets() == original.subscribed_subnets() + + def test_length_constant(self) -> None: + """LENGTH constant is 64.""" + assert AttestationSubnets.LENGTH == 64 + class TestSyncCommitteeSubnets: """Tests for SyncCommitteeSubnets bitvector.""" @@ -128,3 +170,61 @@ def test_is_subscribed_raises_for_negative_id(self) -> None: subnets = SyncCommitteeSubnets.none() with pytest.raises(ValueError, match="must be 0-3"): subnets.is_subscribed(-1) + + def test_from_subnet_ids_specific(self) -> None: + """from_subnet_ids() creates specific subscriptions.""" + subnets = SyncCommitteeSubnets.from_subnet_ids([0, 2]) + assert subnets.is_subscribed(0) + assert not subnets.is_subscribed(1) + assert subnets.is_subscribed(2) + assert not subnets.is_subscribed(3) + + def test_from_subnet_ids_empty_list(self) -> None: + """from_subnet_ids with empty list creates no subscriptions.""" + subnets = SyncCommitteeSubnets.from_subnet_ids([]) + assert subnets.subscription_count() == 0 + + def test_from_subnet_ids_with_duplicates(self) -> None: + """from_subnet_ids handles duplicates correctly.""" + subnets = SyncCommitteeSubnets.from_subnet_ids([1, 1, 1, 3]) + assert subnets.subscription_count() == 2 + assert subnets.subscribed_subnets() == [1, 3] + + def test_from_subnet_ids_invalid(self) -> None: + """from_subnet_ids() raises for invalid subnet IDs.""" + with pytest.raises(ValueError, match="must be 0-3"): + SyncCommitteeSubnets.from_subnet_ids([4]) + + with pytest.raises(ValueError, match="must be 0-3"): + SyncCommitteeSubnets.from_subnet_ids([-1]) + + def test_subscribed_subnets(self) -> None: + """subscribed_subnets() returns correct list.""" + subnets = SyncCommitteeSubnets.from_subnet_ids([1, 3]) + assert subnets.subscribed_subnets() == [1, 3] + + def test_subscription_count(self) -> None: + """subscription_count() returns correct count.""" + subnets = SyncCommitteeSubnets.from_subnet_ids([0, 2, 3]) + assert subnets.subscription_count() == 3 + + def test_encode_bytes_empty(self) -> None: + """Empty subscriptions serialize to 1 zero byte.""" + subnets = SyncCommitteeSubnets.none() + assert subnets.encode_bytes() == b"\x00" + + def test_encode_bytes_all(self) -> None: + """All subscriptions serialize to 0x0f (lower 4 bits set).""" + subnets = SyncCommitteeSubnets.all() + assert subnets.encode_bytes() == b"\x0f" + + def test_decode_bytes_roundtrip(self) -> None: + """Encode then decode produces equivalent result.""" + original = SyncCommitteeSubnets.from_subnet_ids([0, 2]) + encoded = original.encode_bytes() + decoded = SyncCommitteeSubnets.decode_bytes(encoded) + assert decoded.subscribed_subnets() == original.subscribed_subnets() + + def test_length_constant(self) -> None: + """LENGTH constant is 4.""" + assert SyncCommitteeSubnets.LENGTH == 4 From 4304f8e1f51ac22ed33151daa98bc0f85c185768 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Thu, 29 Jan 2026 18:14:23 +0100 Subject: [PATCH 6/6] fix test file --- .../subspecs/networking/enr/test_eth2.py | 20 ++----------------- 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/tests/lean_spec/subspecs/networking/enr/test_eth2.py b/tests/lean_spec/subspecs/networking/enr/test_eth2.py index 8cf0e31e..b04737b3 100644 --- a/tests/lean_spec/subspecs/networking/enr/test_eth2.py +++ b/tests/lean_spec/subspecs/networking/enr/test_eth2.py @@ -9,7 +9,6 @@ AttestationSubnets, SyncCommitteeSubnets, ) -from lean_spec.subspecs.networking.enr.eth2 import AttestationSubnets, SyncCommitteeSubnets from lean_spec.types import Uint64 from lean_spec.types.byte_arrays import Bytes4 @@ -36,11 +35,6 @@ def test_no_scheduled_fork_factory(self) -> None: assert data.fork_digest == digest assert data.next_fork_version == version assert data.next_fork_epoch == FAR_FUTURE_EPOCH - data = Eth2Data.no_scheduled_fork(digest) - - assert data.fork_digest == digest - assert data.next_fork_version == digest - assert data.next_fork_epoch == Uint64(2**64 - 1) def test_eth2_data_immutable(self) -> None: """Eth2Data is immutable (frozen).""" @@ -92,10 +86,6 @@ def test_subscribed_subnets_list(self) -> None: def test_invalid_subnet_id_in_from_subnet_ids(self) -> None: """from_subnet_ids() raises for invalid subnet IDs.""" - with pytest.raises(ValueError, match="must be 0-63"): - AttestationSubnets.from_subnet_ids([64]) - - with pytest.raises(ValueError, match="must be 0-63"): with pytest.raises(ValueError): AttestationSubnets.from_subnet_ids([64]) @@ -106,10 +96,10 @@ def test_invalid_subnet_id_in_is_subscribed(self) -> None: """is_subscribed() raises for invalid subnet IDs.""" subnets = AttestationSubnets.none() - with pytest.raises(ValueError, match="must be 0-63"): + with pytest.raises(ValueError): subnets.is_subscribed(64) - with pytest.raises(ValueError, match="must be 0-63"): + with pytest.raises(ValueError): subnets.is_subscribed(-1) def test_from_subnet_ids_empty_list(self) -> None: @@ -145,12 +135,6 @@ def test_length_constant(self) -> None: """LENGTH constant is 64.""" assert AttestationSubnets.LENGTH == 64 - with pytest.raises(ValueError): - subnets.is_subscribed(64) - - with pytest.raises(ValueError): - subnets.is_subscribed(-1) - class TestSyncCommitteeSubnets: """Tests for SyncCommitteeSubnets bitvector."""