diff --git a/oracle/oracle/clients.py b/oracle/oracle/clients.py index cfae181..0825685 100644 --- a/oracle/oracle/clients.py +++ b/oracle/oracle/clients.py @@ -30,17 +30,6 @@ class GraphqlConsensusError(ConnectionError): pass -def with_consensus(f): - def wrapper(*args, **kwargs): - try: - return f(*args, **kwargs) - except GraphqlConsensusError as e: - logger.error(f"There is no consensus in GraphQL query: {e}") - return - - return wrapper - - async def execute_single_gql_query( subgraph_url: str, query: DocumentNode, variables: Dict ): diff --git a/oracle/oracle/distributor/controller.py b/oracle/oracle/distributor/controller.py index c3a5eb6..0ed15a3 100644 --- a/oracle/oracle/distributor/controller.py +++ b/oracle/oracle/distributor/controller.py @@ -6,7 +6,7 @@ from web3 import Web3 from oracle.networks import NETWORKS -from oracle.oracle.clients import with_consensus +from oracle.oracle.utils import save from oracle.settings import DISTRIBUTOR_VOTE_FILENAME from ..eth1 import submit_vote @@ -43,7 +43,7 @@ def __init__(self, network: str, oracle: LocalAccount) -> None: "REWARD_TOKEN_CONTRACT_ADDRESS" ] - @with_consensus + @save async def process(self, voting_params: DistributorVotingParameters) -> None: """Submits vote for the new merkle root and merkle proofs to the IPFS.""" from_block = voting_params["from_block"] diff --git a/oracle/oracle/eth1.py b/oracle/oracle/eth1.py index 220e957..8e1b44b 100644 --- a/oracle/oracle/eth1.py +++ b/oracle/oracle/eth1.py @@ -11,11 +11,7 @@ from web3.types import BlockNumber, Timestamp, Wei from oracle.networks import NETWORKS -from oracle.oracle.clients import ( - execute_single_gql_query, - execute_sw_gql_query, - with_consensus, -) +from oracle.oracle.clients import execute_single_gql_query, execute_sw_gql_query from oracle.oracle.graphql_queries import ( FINALIZED_BLOCK_QUERY, LATEST_BLOCK_QUERY, @@ -98,7 +94,6 @@ async def has_synced_block(network: str, block_number: BlockNumber) -> bool: return block_number <= int(result["_meta"]["block"]["number"]) -@with_consensus async def get_voting_parameters( network: str, block_number: BlockNumber ) -> VotingParameters: diff --git a/oracle/oracle/main.py b/oracle/oracle/main.py index a7f2e53..3da26f2 100644 --- a/oracle/oracle/main.py +++ b/oracle/oracle/main.py @@ -45,7 +45,42 @@ async def main() -> None: oracle_accounts: Dict[str, LocalAccount] = await get_oracle_accounts() + # aiohttp session + session = aiohttp.ClientSession() + await init_checks(oracle_accounts, session) + + # wait for interrupt + interrupt_handler = InterruptHandler() + + # fetch ETH2 genesis + controllers = [] + for network in ENABLED_NETWORKS: + genesis = await get_genesis(network, session) + oracle = oracle_accounts[network] + rewards_controller = RewardsController( + network=network, + aiohttp_session=session, + genesis_timestamp=int(genesis["genesis_time"]), + oracle=oracle, + ) + distributor_controller = DistributorController(network, oracle) + validators_controller = ValidatorsController(network, oracle) + controllers.append( + ( + interrupt_handler, + network, + rewards_controller, + distributor_controller, + validators_controller, + ) + ) + + await asyncio.gather(*[process_network(*args) for args in controllers]) + + await session.close() + +async def init_checks(oracle_accounts, session): # try submitting test vote for network, oracle in oracle_accounts.items(): logger.info(f"[{network}] Submitting test vote for account {oracle.address}...") @@ -69,9 +104,6 @@ async def main() -> None: ] logger.info(f"[{network}] Connected to graph nodes at {parsed_uris}") - # aiohttp session - session = aiohttp.ClientSession() - # check ETH2 API connection for network in ENABLED_NETWORKS: network_config = NETWORKS[network] @@ -82,28 +114,16 @@ async def main() -> None: ) logger.info(f"[{network}] Connected to ETH2 node at {parsed_uri}") - # wait for interrupt - interrupt_handler = InterruptHandler() - - # fetch ETH2 genesis - controllers = [] - for network in ENABLED_NETWORKS: - genesis = await get_genesis(network, session) - oracle = oracle_accounts[network] - rewards_controller = RewardsController( - network=network, - aiohttp_session=session, - genesis_timestamp=int(genesis["genesis_time"]), - oracle=oracle, - ) - distributor_controller = DistributorController(network, oracle) - validators_controller = ValidatorsController(network, oracle) - controllers.append( - (network, rewards_controller, distributor_controller, validators_controller) - ) +async def process_network( + interrupt_handler: InterruptHandler, + network: str, + rewards_ctrl: RewardsController, + distributor_ctrl: DistributorController, + validators_ctrl: ValidatorsController, +) -> None: while not interrupt_handler.exit: - for (network, rewards_ctrl, distributor_ctrl, validators_ctrl) in controllers: + try: # fetch current finalized ETH1 block data finalized_block = await get_finalized_block(network) current_block_number = finalized_block["block_number"] @@ -119,7 +139,7 @@ async def main() -> None: ) # there is no consensus if not voting_parameters: - continue + return await asyncio.gather( # check and update staking rewards @@ -136,12 +156,11 @@ async def main() -> None: block_number=latest_block_number, ), ) + except BaseException as e: + logger.exception(e) - # wait until next processing time await asyncio.sleep(ORACLE_PROCESS_INTERVAL) - await session.close() - if __name__ == "__main__": if ENABLE_HEALTH_SERVER: diff --git a/oracle/oracle/rewards/controller.py b/oracle/oracle/rewards/controller.py index 1be6adb..b528f5b 100644 --- a/oracle/oracle/rewards/controller.py +++ b/oracle/oracle/rewards/controller.py @@ -10,9 +10,9 @@ from web3.types import Timestamp, Wei from oracle.networks import GNOSIS_CHAIN, NETWORKS -from oracle.oracle.clients import with_consensus from oracle.oracle.eth1 import submit_vote from oracle.oracle.rewards.types import RewardsVotingParameters, RewardVote +from oracle.oracle.utils import save from oracle.settings import MGNO_RATE, REWARD_VOTE_FILENAME, WAD from .eth1 import get_registered_validators_public_keys @@ -50,7 +50,7 @@ def __init__( self.deposit_token_symbol = NETWORKS[network]["DEPOSIT_TOKEN_SYMBOL"] self.last_vote_total_rewards = None - @with_consensus + @save async def process( self, voting_params: RewardsVotingParameters, diff --git a/oracle/oracle/utils.py b/oracle/oracle/utils.py new file mode 100644 index 0000000..09bc036 --- /dev/null +++ b/oracle/oracle/utils.py @@ -0,0 +1,27 @@ +import asyncio +import logging +from functools import wraps + +logger = logging.getLogger(__name__) + + +def save(func): + if asyncio.iscoroutinefunction(func): + + @wraps(func) + async def wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except BaseException as e: + logger.exception(e) + + else: + + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except BaseException as e: + logger.exception(e) + + return wrapper diff --git a/oracle/oracle/validators/controller.py b/oracle/oracle/validators/controller.py index df921d3..7f966e8 100644 --- a/oracle/oracle/validators/controller.py +++ b/oracle/oracle/validators/controller.py @@ -7,8 +7,8 @@ from web3.types import Wei from oracle.networks import GNOSIS_CHAIN, NETWORKS -from oracle.oracle.clients import with_consensus from oracle.oracle.eth1 import submit_vote +from oracle.oracle.utils import save from oracle.settings import MGNO_RATE, VALIDATOR_VOTE_FILENAME, WAD from .eth1 import get_validators_deposit_root, select_validator @@ -30,7 +30,7 @@ def __init__(self, network: str, oracle: LocalAccount) -> None: self.validators_batch_size = NETWORKS[self.network]["VALIDATORS_BATCH_SIZE"] self.last_validators_deposit_data = [] - @with_consensus + @save async def process( self, voting_params: ValidatorVotingParameters,