diff --git a/README.md b/README.md index b03ca7d..886a5db 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ python3 pilot_vault_events.py python3 daily_states_v2.py python3 daily_points_v2.py python3 aggregate_daily_points.py +python3 test/main_test.py ``` ## find_deployment_blocks.py diff --git a/daily_points_v2.py b/daily_points_v2.py index e63b684..0578f71 100644 --- a/daily_points_v2.py +++ b/daily_points_v2.py @@ -21,6 +21,7 @@ POINTS_PER_PILOT_VAULT_TOKEN_FOR_NFT = (142 * 1000) // 100 # 1.42 lp_balances_snapshot = {} +lp_balances_snapshot_start_block = 0 type Points = int @@ -110,7 +111,8 @@ def get_points(day_index) -> Dict[str, Points]: events = block_number_to_events[block_number] for event in events: user_state = process_event_above_user_state(event, user_state) - points = give_points_for_user_state(user_state, points) + if block_number > lp_balances_snapshot_start_block: + points = give_points_for_user_state(user_state, points) validate_end_state(day_index, user_state) return points @@ -139,5 +141,7 @@ def process_points(): ) if __name__ == "__main__": - lp_balances_snapshot = get_user_state("data/lp_balances_snapshot.json", "start_state") + lp_balances_snapshot_data_dir = "data/lp_balances_snapshot.json" + lp_balances_snapshot = get_user_state(lp_balances_snapshot_data_dir, "start_state") + lp_balances_snapshot_start_block = json.load(open(lp_balances_snapshot_data_dir, 'r'))["start_block"] process_points() diff --git a/find_daily_blocks.py b/find_daily_blocks.py index d487c50..dcc9613 100644 --- a/find_daily_blocks.py +++ b/find_daily_blocks.py @@ -1,10 +1,8 @@ #!/usr/bin/env python3 import json -import argparse from datetime import datetime, timezone -from web3 import Web3 import os -from utils.get_rpc import get_rpc +from utils.aggregated_w3_request import w3_instances, make_aggregated_call def get_min_deployment_block(): @@ -29,11 +27,11 @@ def get_min_deployment_block(): return min_block -def get_block(w3, num, cache): +def get_block(num, cache): """Fetch block with simple cache.""" if num in cache: return cache[num] - blk = w3.eth.get_block(num) + blk = make_aggregated_call(w3_instances, lambda w3: w3.eth.get_block(num)) cache[num] = blk return blk @@ -43,7 +41,7 @@ def get_block_date(block): return datetime.fromtimestamp(block["timestamp"], tz=timezone.utc).date() -def find_first_block_strictly_after_day(w3, start_block, latest_block, target_day): +def find_first_block_strictly_after_day(start_block, latest_block, target_day): """ Binary search for the smallest block number in [start_block, latest_block] whose UTC date is strictly greater than target_day. @@ -55,7 +53,7 @@ def find_first_block_strictly_after_day(w3, start_block, latest_block, target_da while lo < hi: mid = (lo + hi) // 2 - blk = get_block(w3, mid, cache) + blk = get_block(mid, cache) blk_day = get_block_date(blk) if blk_day <= target_day: @@ -70,33 +68,25 @@ def find_first_block_strictly_after_day(w3, start_block, latest_block, target_da return None # sanity check - blk = get_block(w3, lo, cache) + blk = get_block(lo, cache) if get_block_date(blk) > target_day: return lo return None def main(): - w3 = Web3(Web3.HTTPProvider(get_rpc())) - - is_connected = ( - w3.is_connected() if hasattr(w3, "is_connected") else w3.isConnected() - ) - if not is_connected: - raise RuntimeError("Cannot connect to RPC") - - latest_block = w3.eth.block_number + latest_block = make_aggregated_call(w3_instances, lambda w3: w3.eth.block_number) start_block = get_min_deployment_block() if start_block > latest_block: raise ValueError(f"start-block {start_block} is greater than latest block {latest_block}") # Get starting block and its day - start_blk = w3.eth.get_block(start_block) + start_blk = make_aggregated_call(w3_instances, lambda w3: w3.eth.get_block(start_block)) start_day = get_block_date(start_blk) # Get latest block and its day - latest_blk = w3.eth.get_block(latest_block) + latest_blk = make_aggregated_call(w3_instances, lambda w3: w3.eth.get_block(latest_block)) latest_day = get_block_date(latest_blk) print(f"Starting from block {start_block}, day = {start_day}") @@ -113,14 +103,14 @@ def main(): # Binary search for first block *after* this day first_after = find_first_block_strictly_after_day( - w3, current_search_start, latest_block, current_day + current_search_start, latest_block, current_day ) if first_after is None: # No next day found yet (we're at the latest day) # Use latest_block as the last block of current day last_block_same_day = latest_block - last_blk = get_block(w3, last_block_same_day, cache) + last_blk = get_block(last_block_same_day, cache) all_boundaries.append({ "day": str(current_day), @@ -139,8 +129,8 @@ def main(): break last_block_same_day = first_after - 1 - last_blk = get_block(w3, last_block_same_day, cache) - first_next_blk = get_block(w3, first_after, cache) + last_blk = get_block(last_block_same_day, cache) + first_next_blk = get_block(first_after, cache) next_day = get_block_date(first_next_blk) all_boundaries.append({ diff --git a/find_deployment_blocks.py b/find_deployment_blocks.py index 59b5618..61b405f 100644 --- a/find_deployment_blocks.py +++ b/find_deployment_blocks.py @@ -1,9 +1,10 @@ -from web3 import Web3 import json import sys import os from datetime import datetime, timezone from utils.get_rpc import get_rpc +from utils.aggregated_w3_request import w3_instances, make_aggregated_call +from web3 import Web3 def load_contract_addresses(): """Load contract addresses from config.json""" @@ -35,22 +36,21 @@ def load_contract_addresses(): sys.exit(1) -def has_contract_code(w3, address, block_number): +def has_contract_code(address, block_number): """Check if contract has code at a specific block""" try: - code = w3.eth.get_code(address, block_number) + code = make_aggregated_call(w3_instances, lambda w3: w3.eth.get_code(address, block_number)) return len(code) > 0 except Exception as e: print(f"Warning: Error checking code at block {block_number}: {e}") return False -def find_deployment_block(w3, address, start_block=0, end_block=None): +def find_deployment_block(address, start_block=0, end_block=None): """ Find the deployment block of a contract using binary search. Args: - w3: Web3 instance address: Contract address start_block: Starting block for search (default: 0) end_block: Ending block for search (default: latest block) @@ -59,12 +59,12 @@ def find_deployment_block(w3, address, start_block=0, end_block=None): Block number where contract was deployed, or None if not found """ if end_block is None: - end_block = w3.eth.block_number + end_block = make_aggregated_call(w3_instances, lambda w3: w3.eth.block_number) print(f" Searching for deployment block between {start_block} and {end_block}...") # First, check if contract exists at the end block - if not has_contract_code(w3, address, end_block): + if not has_contract_code(address, end_block): print(f" Error: Contract has no code at block {end_block}. Contract may not be deployed yet.") return None @@ -76,7 +76,7 @@ def find_deployment_block(w3, address, start_block=0, end_block=None): while left <= right: mid = (left + right) // 2 - if has_contract_code(w3, address, mid): + if has_contract_code(address, mid): # Contract exists at this block, search earlier result = mid right = mid - 1 @@ -87,10 +87,10 @@ def find_deployment_block(w3, address, start_block=0, end_block=None): return result -def get_block_info(w3, block_number): +def get_block_info(block_number): """Get block information including timestamp""" try: - block = w3.eth.get_block(block_number) + block = make_aggregated_call(w3_instances, lambda w3: w3.eth.get_block(block_number)) return { 'block_number': block_number, 'timestamp': block.timestamp, @@ -118,18 +118,8 @@ def main(): # Initialize Web3 connection print("\n2. Connecting to blockchain...") - try: - w3 = Web3(Web3.HTTPProvider(rpc_url)) - if not w3.is_connected(): - print(f"Error: Could not connect to RPC provider at {rpc_url}") - sys.exit(1) - print(" Connected successfully") - except Exception as e: - print(f"Error connecting to RPC provider: {e}") - sys.exit(1) - # Get latest block - latest_block = w3.eth.block_number + latest_block = make_aggregated_call(w3_instances, lambda w3: w3.eth.block_number) print(f" Latest block: {latest_block}") # Find deployment blocks @@ -139,9 +129,9 @@ def main(): # Find NFT contract deployment block print(f"\n NFT Contract ({addresses['nft']}):") - nft_deployment_block = find_deployment_block(w3, addresses['nft'], end_block=latest_block) + nft_deployment_block = find_deployment_block(addresses['nft'], end_block=latest_block) if nft_deployment_block: - nft_block_info = get_block_info(w3, nft_deployment_block) + nft_block_info = get_block_info(nft_deployment_block) results['nft'] = { 'address': addresses['nft'], 'deployment_block': nft_deployment_block, @@ -159,9 +149,9 @@ def main(): # Find Pilot Vault contract deployment block print(f"\n Pilot Vault Contract ({addresses['pilot_vault']}):") - vault_deployment_block = find_deployment_block(w3, addresses['pilot_vault'], end_block=latest_block) + vault_deployment_block = find_deployment_block(addresses['pilot_vault'], end_block=latest_block) if vault_deployment_block: - vault_block_info = get_block_info(w3, vault_deployment_block) + vault_block_info = get_block_info(vault_deployment_block) results['pilot_vault'] = { 'address': addresses['pilot_vault'], 'deployment_block': vault_deployment_block, diff --git a/nft_events.py b/nft_events.py index 68eeb0a..8cd4ffa 100644 --- a/nft_events.py +++ b/nft_events.py @@ -7,7 +7,11 @@ from datetime import datetime import time import sys -from utils.get_rpc import get_rpc +from utils.aggregated_w3_request import ( + create_contract_instances, + w3_instances, + make_aggregated_call, +) # ABI for Transfer event TRANSFER_EVENT_ABI = [ @@ -28,15 +32,15 @@ def get_nft_deployment_block(): """Get NFT block_number from deployment_blocks.json""" with open("data/deployment_blocks.json", "r") as f: deployment_data = json.load(f) - + nft_data = deployment_data.get("deployments", {}).get("nft") if not nft_data: raise ValueError("NFT deployment not found in deployment_blocks.json") - + block_number = nft_data.get("block_number") if block_number is None: raise ValueError("block_number not found for NFT in deployment_blocks.json") - + address = nft_data.get("address") return block_number, address @@ -46,11 +50,11 @@ def get_day_block_files(): days_blocks_dir = "data/days_blocks" if not os.path.exists(days_blocks_dir): raise ValueError(f"Directory {days_blocks_dir} not found") - + # Get all files matching pattern {index}_*.json pattern = os.path.join(days_blocks_dir, "*_*.json") files = glob.glob(pattern) - + # Extract index and sort file_data = [] for filepath in files: @@ -59,57 +63,69 @@ def get_day_block_files(): if match: index = int(match.group(1)) file_data.append((index, filepath)) - + # Sort by index file_data.sort(key=lambda x: x[0]) - + return file_data -def read_events_chunked(contract, start_block, end_block, chunk_size=10000): +def read_events_chunked(contracts, start_block, end_block, chunk_size=10000): """Read events in chunks to avoid RPC limits""" print(f" Fetching events from block {start_block} to {end_block}...") - - event = getattr(contract.events, "Transfer") + all_logs = [] current_block = start_block - + while current_block < end_block: chunk_end = min(current_block + chunk_size - 1, end_block) - + try: print(f" Fetching logs from block {current_block} to {chunk_end}...") - logs = event().get_logs(from_block=current_block, to_block=chunk_end) + logs = make_aggregated_call( + contracts, + lambda contract: contract.events.Transfer().get_logs( + from_block=current_block, to_block=chunk_end + ), + ) all_logs.extend(logs) print(f" Found {len(logs)} events in this chunk") - + # Small delay to avoid rate limiting time.sleep(0.1) - + except Exception as e: - print(f" Error fetching logs from block {current_block} to {chunk_end}: {e}") + print( + f" Error fetching logs from block {current_block} to {chunk_end}: {e}" + ) # Try smaller chunk size if we get an error if chunk_size > 1000: print(f" Retrying with smaller chunk size: {chunk_size // 2}") - return read_events_chunked(contract, start_block, end_block, chunk_size // 2) + return read_events_chunked( + contracts, start_block, end_block, chunk_size // 2 + ) else: - print("Could not fetch events from block {current_block} to {chunk_end}") + print( + "Could not fetch events from block {current_block} to {chunk_end}" + ) sys.exit(1) - + current_block = chunk_end + 1 - + return all_logs -def fetch_and_save_events(w3, contract, contract_address, start_block, end_block, output_file): +def fetch_and_save_events( + contracts, contract_address, start_block, end_block, output_file +): """Fetch transfer events and save to JSON file""" try: - logs = read_events_chunked(contract, start_block, end_block) + logs = read_events_chunked(contracts, start_block, end_block) if logs is None: logs = [] - + print(f" Total Transfer events: {len(logs)}") - + # Prepare data for JSON output events_data = [] for log in logs: @@ -118,10 +134,10 @@ def fetch_and_save_events(w3, contract, contract_address, start_block, end_block "transactionHash": log.transactionHash.hex(), "logIndex": log.logIndex, "args": dict(log.args), - "transactionIndex": log.transactionIndex + "transactionIndex": log.transactionIndex, } events_data.append(event_data) - + # Save to JSON file output_data = { "metadata": { @@ -130,97 +146,93 @@ def fetch_and_save_events(w3, contract, contract_address, start_block, end_block "startBlock": start_block, "endBlock": end_block, "totalEvents": len(events_data), - "exportedAt": datetime.now().isoformat() + "exportedAt": datetime.now().isoformat(), }, - "events": events_data + "events": events_data, } - - with open(output_file, 'w') as f: + + with open(output_file, "w") as f: json.dump(output_data, f, indent=2) - + print(f" Events saved to {output_file}") - + except Exception as e: print(f" Error reading events: {e}") def main(): - # Load RPC URL - # Initialize Web3 connection - print("Connecting to blockchain...") - w3 = Web3(Web3.HTTPProvider(get_rpc())) - is_connected = ( - w3.is_connected() if hasattr(w3, "is_connected") else w3.isConnected() - ) - if not is_connected: - raise RuntimeError("Cannot connect to RPC") - # Get NFT deployment block and address print("Reading deployment blocks...") deployment_block, nft_address = get_nft_deployment_block() print(f"NFT deployment block: {deployment_block}") print(f"NFT contract address: {nft_address}") - + # Get all day block files print("Reading day block files...") day_files = get_day_block_files() print(f"Found {len(day_files)} day block files") - + if not day_files: print("No day block files found. Exiting.") return - + # Setup contract print("Setting up contract...") contract_address = Web3.to_checksum_address(nft_address) - contract = w3.eth.contract(address=contract_address, abi=TRANSFER_EVENT_ABI) - + contracts = create_contract_instances( + w3_instances, contract_address, TRANSFER_EVENT_ABI + ) + # Create output directory output_dir = "data/events/nft" os.makedirs(output_dir, exist_ok=True) - + # Build ranges print("\nBuilding block ranges...") ranges = [] - + # First range: (deployment_block, last_block_of_day from file 0) if day_files: - with open(day_files[0][1], 'r') as f: + with open(day_files[0][1], "r") as f: day_data_0 = json.load(f) last_block_0 = day_data_0["last_block_of_day"]["number"] ranges.append((0, deployment_block, last_block_0)) print(f"Range 0: blocks {deployment_block} to {last_block_0} (inclusive)") - + # Subsequent ranges: (first_block_of_next_day from previous file, last_block_of_day from current file) for i in range(len(day_files) - 1): # Read previous file - with open(day_files[i][1], 'r') as f: + with open(day_files[i][1], "r") as f: prev_day_data = json.load(f) first_block_next = prev_day_data["first_block_of_next_day"]["number"] - + # Read current file - with open(day_files[i + 1][1], 'r') as f: + with open(day_files[i + 1][1], "r") as f: curr_day_data = json.load(f) last_block_curr = curr_day_data["last_block_of_day"]["number"] - + ranges.append((i + 1, first_block_next, last_block_curr)) - print(f"Range {i + 1}: blocks {first_block_next} to {last_block_curr} (inclusive)") - + print( + f"Range {i + 1}: blocks {first_block_next} to {last_block_curr} (inclusive)" + ) + # Fetch events for each range print(f"\nFetching transfer events for {len(ranges)} ranges...") for range_index, start_block, end_block in ranges: output_file = os.path.join(output_dir, f"{range_index}.json") - + # Skip if file already exists if os.path.exists(output_file): print(f"\nSkipping range {range_index}: file {output_file} already exists") continue - + print(f"\nProcessing range {range_index}: blocks {start_block} to {end_block}") - fetch_and_save_events(w3, contract, contract_address, start_block, end_block, output_file) - + fetch_and_save_events( + contracts, contract_address, start_block, end_block, output_file + ) + print(f"\nCompleted! Processed {len(ranges)} ranges.") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/pilot_vault_events.py b/pilot_vault_events.py index 5308386..fd9930a 100644 --- a/pilot_vault_events.py +++ b/pilot_vault_events.py @@ -8,6 +8,7 @@ import time import sys from utils.get_rpc import get_rpc +from utils.aggregated_w3_request import create_contract_instances, w3_instances, make_aggregated_call # ABI for Transfer event TRANSFER_EVENT_ABI = [ @@ -66,11 +67,10 @@ def get_day_block_files(): return file_data -def read_events_chunked(contract, start_block, end_block, chunk_size=10000): +def read_events_chunked(contracts, start_block, end_block, chunk_size=10000): """Read events in chunks to avoid RPC limits""" print(f" Fetching events from block {start_block} to {end_block}...") - event = getattr(contract.events, "Transfer") all_logs = [] current_block = start_block @@ -79,7 +79,7 @@ def read_events_chunked(contract, start_block, end_block, chunk_size=10000): try: print(f" Fetching logs from block {current_block} to {chunk_end}...") - logs = event().get_logs(from_block=current_block, to_block=chunk_end) + logs = make_aggregated_call(contracts, lambda contract: contract.events.Transfer().get_logs(from_block=current_block, to_block=chunk_end)) all_logs.extend(logs) print(f" Found {len(logs)} events in this chunk") @@ -91,7 +91,7 @@ def read_events_chunked(contract, start_block, end_block, chunk_size=10000): # Try smaller chunk size if we get an error if chunk_size > 1000: print(f" Retrying with smaller chunk size: {chunk_size // 2}") - return read_events_chunked(contract, start_block, end_block, chunk_size // 2) + return read_events_chunked(contracts, start_block, end_block, chunk_size // 2) else: print("Could not fetch events from block {current_block} to {chunk_end}") sys.exit(1) @@ -101,7 +101,7 @@ def read_events_chunked(contract, start_block, end_block, chunk_size=10000): return all_logs -def fetch_and_save_events(w3, contract, contract_address, start_block, end_block, output_file): +def fetch_and_save_events(contracts, contract_address, start_block, end_block, output_file): """Fetch transfer events and save to JSON file""" # Validate block range if start_block > end_block: @@ -125,7 +125,7 @@ def fetch_and_save_events(w3, contract, contract_address, start_block, end_block return try: - logs = read_events_chunked(contract, start_block, end_block) + logs = read_events_chunked(contracts, start_block, end_block) if logs is None: logs = [] @@ -168,15 +168,6 @@ def fetch_and_save_events(w3, contract, contract_address, start_block, end_block def main(): - # Initialize Web3 connection - print("Connecting to blockchain...") - w3 = Web3(Web3.HTTPProvider(get_rpc())) - is_connected = ( - w3.is_connected() if hasattr(w3, "is_connected") else w3.isConnected() - ) - if not is_connected: - raise RuntimeError("Cannot connect to RPC") - # Get pilot_vault deployment block and address print("Reading deployment blocks...") deployment_block, pilot_vault_address = get_pilot_vault_deployment_block() @@ -195,7 +186,7 @@ def main(): # Setup contract print("Setting up contract...") contract_address = Web3.to_checksum_address(pilot_vault_address) - contract = w3.eth.contract(address=contract_address, abi=TRANSFER_EVENT_ABI) + contracts = create_contract_instances(w3_instances, contract_address, TRANSFER_EVENT_ABI) # Create output directory output_dir = "data/events/pilot_vault" @@ -239,7 +230,7 @@ def main(): continue print(f"\nProcessing range {range_index}: blocks {start_block} to {end_block}") - fetch_and_save_events(w3, contract, contract_address, start_block, end_block, output_file) + fetch_and_save_events(contracts, contract_address, start_block, end_block, output_file) print(f"\nCompleted! Processed {len(ranges)} ranges.") diff --git a/test/test_nft_balance.py b/test/test_nft_balance.py index ced96b1..bae6cda 100644 --- a/test/test_nft_balance.py +++ b/test/test_nft_balance.py @@ -70,8 +70,8 @@ def get_onchain_data(w3, state_data, users_data, addresses, block_key): def validate_nft_balance_and_ownership( users_data, onchain_data, state_data, w3, addresses ): - users_data_items = sorted(list(users_data.items())) - onchain_data_items = sorted(list(onchain_data.items())) + users_data_items = sorted(list(users_data.items()), key=lambda x: x[0].lower()) + onchain_data_items = sorted(list(onchain_data.items()), key=lambda x: x[0].lower()) assert len(users_data_items) == len( onchain_data_items ), "Users data and onchain data length mismatch" diff --git a/test/test_transfer_events_structure.py b/test/test_transfer_events_structure.py index 67edb32..7a3d909 100644 --- a/test/test_transfer_events_structure.py +++ b/test/test_transfer_events_structure.py @@ -71,8 +71,8 @@ def _check_transfer_balances_if_needed( ).call(block_identifier=prev_event_block) assert ( user_balances[user] == balance - ), f"User {user} balance mismatch: {user_balances[user]} != {total_supply}" + ), f"User {user} balance mismatch: {user_balances[user]} != {total_supply} at block {prev_event_block}" assert total_supply == sum( user_balances.values() - ), f"Total supply mismatch: {total_supply} != {sum(user_balances.values())}" + ), f"Total supply mismatch: {total_supply} != {sum(user_balances.values())} at block {prev_event_block}" affected_users.clear() diff --git a/test/test_user_balance.py b/test/test_user_balance.py index 9e212b0..5717740 100644 --- a/test/test_user_balance.py +++ b/test/test_user_balance.py @@ -5,6 +5,7 @@ from web3 import Web3 from utils.compare_state_and_onchain_data import compare_state_and_onchain_data + use_state_file = ["../lp_balances_snapshot.json", "84.json"] use_state_file_index = 0 diff --git a/utils/aggregated_w3_request.py b/utils/aggregated_w3_request.py new file mode 100644 index 0000000..ba9e985 --- /dev/null +++ b/utils/aggregated_w3_request.py @@ -0,0 +1,95 @@ +from web3 import Web3 +from collections import defaultdict +from typing import Optional +import threading + +w3_instances = [ + Web3(Web3.HTTPProvider("https://mainnet.gateway.tenderly.co")), + Web3(Web3.HTTPProvider("https://ethereum-rpc.publicnode.com")), + Web3(Web3.HTTPProvider("https://eth.drpc.org")), +] + +class RequestResult: + def __init__(self, result, error): + self.result = result + self.error: Optional[Exception] = error + + def __eq__(self, other): + if not isinstance(other, RequestResult): + return False + # Deep comparison for dicts and lists, normal for primitives + if type(self.result) != type(other.result): + return False + # Compare content, not just address, via recursive structural equality for dicts/lists + def deep_equal(a, b): + if isinstance(a, dict) and isinstance(b, dict): + if set(a.keys()) != set(b.keys()): + return False + for k in a: + if not deep_equal(a[k], b[k]): + return False + return True + elif isinstance(a, list) and isinstance(b, list): + if len(a) != len(b): + return False + for x, y in zip(a, b): + if not deep_equal(x, y): + return False + return True + else: + return a == b + + if not deep_equal(self.result, other.result): + return False + return self.error == other.error + + def __hash__(self): + def deep_hash(obj): + if isinstance(obj, dict): + # Hash based only on its content, independent of order + return hash(frozenset((k, deep_hash(v)) for k, v in obj.items())) + elif isinstance(obj, list): + # Hash based on the hashes of its items + return hash(tuple(deep_hash(x) for x in obj)) + else: + return hash(obj) + result_hash = deep_hash(self.result) + error_hash = hash(self.error) + return hash((result_hash, error_hash)) + +def create_contract_instances(w3_instances, address, abi): + contract_instances = [] + address = Web3.to_checksum_address(address) + for w3_instance in w3_instances: + contract_instances.append(w3_instance.eth.contract(address=address, abi=abi)) + return contract_instances + +def return_result_or_raise(result_to_amount: dict[RequestResult, int]): + results_length = len(result_to_amount) + acceptable_amount = results_length // 2 + results_length % 2 + for result, amount in result_to_amount.items(): + if amount >= acceptable_amount: + if result.error is not None: + raise result.error + return result.result + + raise ValueError(f"No result found, results: {result_to_amount}") + +def make_call(i, results, instance, function): + try: + result = function(instance) + results[i] = RequestResult(result, None) + except Exception as e: + results[i] = RequestResult(None, e) + +def make_aggregated_call(instances, function): + results = [None] * len(instances) + results_amount = defaultdict(lambda: 0) + + threads = [threading.Thread(target=make_call, args=(i, results, instance, function)) for i, instance in enumerate(instances)] + for thread in threads: + thread.start() + for i, thread in enumerate(threads): + thread.join() + results_amount[results[i]] += 1 + return return_result_or_raise(results_amount) diff --git a/utils/get_rpc.py b/utils/get_rpc.py index a97d003..91ebc87 100644 --- a/utils/get_rpc.py +++ b/utils/get_rpc.py @@ -1,2 +1,2 @@ def get_rpc(): - return "https://ethereum-rpc.publicnode.com" \ No newline at end of file + return "https://mainnet.gateway.tenderly.co"