Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions oracle/oracle/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,6 @@ class GraphqlConsensusError(ConnectionError):
pass


def with_consensus(f):
def wrapper(*args, **kwargs):
try:
return f(*args, **kwargs)
except GraphqlConsensusError as e:
logger.error(f"There is no consensus in GraphQL query: {e}")
return

return wrapper


async def execute_single_gql_query(
subgraph_url: str, query: DocumentNode, variables: Dict
):
Expand Down
4 changes: 2 additions & 2 deletions oracle/oracle/distributor/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from web3 import Web3

from oracle.networks import NETWORKS
from oracle.oracle.clients import with_consensus
from oracle.oracle.utils import save
from oracle.settings import DISTRIBUTOR_VOTE_FILENAME

from ..eth1 import submit_vote
Expand Down Expand Up @@ -43,7 +43,7 @@ def __init__(self, network: str, oracle: LocalAccount) -> None:
"REWARD_TOKEN_CONTRACT_ADDRESS"
]

@with_consensus
@save
async def process(self, voting_params: DistributorVotingParameters) -> None:
"""Submits vote for the new merkle root and merkle proofs to the IPFS."""
from_block = voting_params["from_block"]
Expand Down
7 changes: 1 addition & 6 deletions oracle/oracle/eth1.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@
from web3.types import BlockNumber, Timestamp, Wei

from oracle.networks import NETWORKS
from oracle.oracle.clients import (
execute_single_gql_query,
execute_sw_gql_query,
with_consensus,
)
from oracle.oracle.clients import execute_single_gql_query, execute_sw_gql_query
from oracle.oracle.graphql_queries import (
FINALIZED_BLOCK_QUERY,
LATEST_BLOCK_QUERY,
Expand Down Expand Up @@ -98,7 +94,6 @@ async def has_synced_block(network: str, block_number: BlockNumber) -> bool:
return block_number <= int(result["_meta"]["block"]["number"])


@with_consensus
async def get_voting_parameters(
network: str, block_number: BlockNumber
) -> VotingParameters:
Expand Down
73 changes: 46 additions & 27 deletions oracle/oracle/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,42 @@

async def main() -> None:
oracle_accounts: Dict[str, LocalAccount] = await get_oracle_accounts()
# aiohttp session
session = aiohttp.ClientSession()
await init_checks(oracle_accounts, session)

# wait for interrupt
interrupt_handler = InterruptHandler()

# fetch ETH2 genesis
controllers = []
for network in ENABLED_NETWORKS:
genesis = await get_genesis(network, session)
oracle = oracle_accounts[network]
rewards_controller = RewardsController(
network=network,
aiohttp_session=session,
genesis_timestamp=int(genesis["genesis_time"]),
oracle=oracle,
)
distributor_controller = DistributorController(network, oracle)
validators_controller = ValidatorsController(network, oracle)
controllers.append(
(
interrupt_handler,
network,
rewards_controller,
distributor_controller,
validators_controller,
)
)

await asyncio.gather(*[process_network(*args) for args in controllers])

await session.close()


async def init_checks(oracle_accounts, session):
# try submitting test vote
for network, oracle in oracle_accounts.items():
logger.info(f"[{network}] Submitting test vote for account {oracle.address}...")
Expand All @@ -69,9 +104,6 @@ async def main() -> None:
]
logger.info(f"[{network}] Connected to graph nodes at {parsed_uris}")

# aiohttp session
session = aiohttp.ClientSession()

# check ETH2 API connection
for network in ENABLED_NETWORKS:
network_config = NETWORKS[network]
Expand All @@ -82,28 +114,16 @@ async def main() -> None:
)
logger.info(f"[{network}] Connected to ETH2 node at {parsed_uri}")

# wait for interrupt
interrupt_handler = InterruptHandler()

# fetch ETH2 genesis
controllers = []
for network in ENABLED_NETWORKS:
genesis = await get_genesis(network, session)
oracle = oracle_accounts[network]
rewards_controller = RewardsController(
network=network,
aiohttp_session=session,
genesis_timestamp=int(genesis["genesis_time"]),
oracle=oracle,
)
distributor_controller = DistributorController(network, oracle)
validators_controller = ValidatorsController(network, oracle)
controllers.append(
(network, rewards_controller, distributor_controller, validators_controller)
)

async def process_network(
interrupt_handler: InterruptHandler,
network: str,
rewards_ctrl: RewardsController,
distributor_ctrl: DistributorController,
validators_ctrl: ValidatorsController,
) -> None:
while not interrupt_handler.exit:
for (network, rewards_ctrl, distributor_ctrl, validators_ctrl) in controllers:
try:
# fetch current finalized ETH1 block data
finalized_block = await get_finalized_block(network)
current_block_number = finalized_block["block_number"]
Expand All @@ -119,7 +139,7 @@ async def main() -> None:
)
# there is no consensus
if not voting_parameters:
continue
return

await asyncio.gather(
# check and update staking rewards
Expand All @@ -136,12 +156,11 @@ async def main() -> None:
block_number=latest_block_number,
),
)
except BaseException as e:
logger.exception(e)

# wait until next processing time
await asyncio.sleep(ORACLE_PROCESS_INTERVAL)

await session.close()


if __name__ == "__main__":
if ENABLE_HEALTH_SERVER:
Expand Down
4 changes: 2 additions & 2 deletions oracle/oracle/rewards/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from web3.types import Timestamp, Wei

from oracle.networks import GNOSIS_CHAIN, NETWORKS
from oracle.oracle.clients import with_consensus
from oracle.oracle.eth1 import submit_vote
from oracle.oracle.rewards.types import RewardsVotingParameters, RewardVote
from oracle.oracle.utils import save
from oracle.settings import MGNO_RATE, REWARD_VOTE_FILENAME, WAD

from .eth1 import get_registered_validators_public_keys
Expand Down Expand Up @@ -50,7 +50,7 @@ def __init__(
self.deposit_token_symbol = NETWORKS[network]["DEPOSIT_TOKEN_SYMBOL"]
self.last_vote_total_rewards = None

@with_consensus
@save
async def process(
self,
voting_params: RewardsVotingParameters,
Expand Down
27 changes: 27 additions & 0 deletions oracle/oracle/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import asyncio
import logging
from functools import wraps

logger = logging.getLogger(__name__)


def save(func):
if asyncio.iscoroutinefunction(func):

@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except BaseException as e:
logger.exception(e)

else:

@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except BaseException as e:
logger.exception(e)

return wrapper
4 changes: 2 additions & 2 deletions oracle/oracle/validators/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from web3.types import Wei

from oracle.networks import GNOSIS_CHAIN, NETWORKS
from oracle.oracle.clients import with_consensus
from oracle.oracle.eth1 import submit_vote
from oracle.oracle.utils import save
from oracle.settings import MGNO_RATE, VALIDATOR_VOTE_FILENAME, WAD

from .eth1 import get_validators_deposit_root, select_validator
Expand All @@ -30,7 +30,7 @@ def __init__(self, network: str, oracle: LocalAccount) -> None:
self.validators_batch_size = NETWORKS[self.network]["VALIDATORS_BATCH_SIZE"]
self.last_validators_deposit_data = []

@with_consensus
@save
async def process(
self,
voting_params: ValidatorVotingParameters,
Expand Down