Skip to content

Commit

Permalink
#153 - [trader] Optimize main loop by waiting till soonest epoch (#155)
Browse files Browse the repository at this point in the history
* Wait till soonest epoch

* Test functionality and cleanup load agent test

* Increase max tries to 10

* Add docstring
  • Loading branch information
trizin committed Sep 8, 2023
1 parent ffcd1a0 commit e0591e0
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
31 changes: 25 additions & 6 deletions pdr_backend/trader/test/test_trader_agent.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
from pathlib import Path
from unittest.mock import Mock, patch

import pytest
Expand Down Expand Up @@ -74,39 +76,49 @@ def test_process_block_at_feed():
trader_config.get_contracts.return_value = {"0x123": predictoor_contract}

agent = TraderAgent(trader_config, custom_trader)
agent.prev_traded_epochs_per_feed.clear()
agent.prev_traded_epochs_per_feed["0x123"] = []

# epoch_s_left = 60 - 55 = 5, so we should not trade
# because it's too close to the epoch end
agent._process_block_at_feed("0x123", 55)
s_till_epoch_end = agent._process_block_at_feed("0x123", 55)
assert len(agent.prev_traded_epochs_per_feed["0x123"]) == 0
assert s_till_epoch_end == 5

# epoch_s_left = 60 + 60 - 80 = 40, so we should not trade
agent._process_block_at_feed("0x123", 80)
s_till_epoch_end = agent._process_block_at_feed("0x123", 80)
assert len(agent.prev_traded_epochs_per_feed["0x123"]) == 1
assert s_till_epoch_end == 40

# but not again, because we've already traded this epoch
agent._process_block_at_feed("0x123", 80)
s_till_epoch_end = agent._process_block_at_feed("0x123", 80)
assert len(agent.prev_traded_epochs_per_feed["0x123"]) == 1
assert s_till_epoch_end == 40

# but we should trade again in the next epoch
predictoor_contract.get_current_epoch.return_value = 2
agent._process_block_at_feed("0x123", 20)
s_till_epoch_end = agent._process_block_at_feed("0x123", 140)
assert len(agent.prev_traded_epochs_per_feed["0x123"]) == 2
assert s_till_epoch_end == 40

# prediction is empty, so no trading
predictoor_contract.get_current_epoch.return_value = 3
predictoor_contract.get_agg_predval.side_effect = Exception(
"An error occurred while getting agg_predval."
)
agent._process_block_at_feed("0x123", 20)
s_till_epoch_end = agent._process_block_at_feed("0x123", 20)
assert len(agent.prev_traded_epochs_per_feed["0x123"]) == 2
assert s_till_epoch_end == 40

# default trader
agent = TraderAgent(trader_config)
agent.prev_traded_epochs_per_feed.clear()
agent.prev_traded_epochs_per_feed["0x123"] = []
predictoor_contract.get_agg_predval.return_value = (1, 3)
predictoor_contract.get_agg_predval.side_effect = None
agent._process_block_at_feed("0x123", 20)
s_till_epoch_end = agent._process_block_at_feed("0x123", 20)
assert len(agent.prev_traded_epochs_per_feed["0x123"]) == 1
assert s_till_epoch_end == 40


def test_save_and_load_cache():
Expand Down Expand Up @@ -139,3 +151,10 @@ def test_save_and_load_cache():
assert agent_new.prev_traded_epochs_per_feed["0x1"] == [3]
assert agent_new.prev_traded_epochs_per_feed["0x2"] == [6]
assert agent_new.prev_traded_epochs_per_feed["0x3"] == [66]
cache_dir_path = (
Path(os.path.dirname(os.path.abspath(__file__))).parent.parent
/ "util/.test_cache"
)
for item in cache_dir_path.iterdir():
item.unlink()
cache_dir_path.rmdir()
34 changes: 25 additions & 9 deletions pdr_backend/trader/trader_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pdr_backend.util.cache import Cache


MAX_TRIES = 5
MAX_TRIES = 10


# pylint: disable=too-many-instance-attributes
Expand Down Expand Up @@ -81,11 +81,27 @@ def take_step(self):
self.prev_block_number = block_number
self.prev_block_timestamp = block["timestamp"]

s_till_epoch_ends = []

# do work at new block
for addr in self.feeds:
self._process_block_at_feed(addr, block["timestamp"])

def _process_block_at_feed(self, addr: str, timestamp: int, tries: int = 0):
s_till_epoch_ends.append(
self._process_block_at_feed(addr, block["timestamp"])
)

sleep_time = min(s_till_epoch_ends) - 1
print(f"-- Soonest epoch is in {sleep_time} seconds, waiting... --")
time.sleep(sleep_time)

def _process_block_at_feed(self, addr: str, timestamp: int, tries: int = 0) -> int:
"""
@param:
addr - contract address of the feed
timestamp - timestamp/epoch to process
[tries] - number of attempts made in case of an error, 0 by default
@return:
epoch_s_left - number of seconds left till the epoch end
"""
feed, predictoor_contract = self.feeds[addr], self.contracts[addr]

s_per_epoch = feed.seconds_per_epoch
Expand All @@ -105,28 +121,28 @@ def _process_block_at_feed(self, addr: str, timestamp: int, tries: int = 0):
and epoch == self.prev_traded_epochs_per_feed[addr][-1]
):
print(" Done feed: already traded this epoch")
return
return epoch_s_left

if epoch_s_left < self.config.trader_min_buffer:
print(" Done feed: not enough time left in epoch")
return
return epoch_s_left

try:
prediction = predictoor_contract.get_agg_predval((epoch + 1) * s_per_epoch)
except Exception as e:
if tries < MAX_TRIES:
print(" Could not get aggpredval, trying again in a second")
time.sleep(1)
self._process_block_at_feed(addr, timestamp, tries + 1)
return
return self._process_block_at_feed(addr, timestamp, tries + 1)
print(" Done feed: aggpredval not available, an error occured:", e)
return
return epoch_s_left

print(f"Got {prediction}.")

self._get_trader(feed, prediction)
self.prev_traded_epochs_per_feed[addr].append(epoch)
self.save_previous_epochs()
return epoch_s_left


def get_trader(feed: Feed, prediction: Tuple[float, float]):
Expand Down

0 comments on commit e0591e0

Please sign in to comment.