Skip to content

Commit

Permalink
_process_raw_data refactored to create market objects and call market…
Browse files Browse the repository at this point in the history
….closed_market when closed

marketrecorder strategy updated to use process_closed_market and load catalogue data on closure
  • Loading branch information
liampauling committed Aug 3, 2020
1 parent b1b0791 commit ece3e46
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 11 deletions.
36 changes: 27 additions & 9 deletions examples/strategies/marketrecorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,8 @@ def process_raw_data(self, publish_time, data):
json.dumps({"op": "mcm", "clk": None, "pt": publish_time, "mc": [data]})
+ "\n"
)
if (
"marketDefinition" in data
and data["marketDefinition"]["status"] == "CLOSED"
):
self._on_market_closed(data)

def _on_market_closed(self, data: dict) -> None:
def process_closed_market(self, market, data: dict) -> None:
market_id = data.get(self.MARKET_ID_LOOKUP)
if market_id in self._loaded_markets:
if self._force_update:
Expand Down Expand Up @@ -91,7 +86,7 @@ def _on_market_closed(self, data: dict) -> None:
zip_file_dir = self._zip_file(file_dir, market_id)

# core load code
self._load(zip_file_dir, market_definition)
self._load(market, zip_file_dir, market_definition)

# clean up
self._clean_up()
Expand All @@ -110,7 +105,7 @@ def _zip_file(self, file_dir: str, market_id: str) -> str:
)
return zip_file_directory

def _load(self, zip_file_dir: str, market_definition: dict) -> None:
def _load(self, market, zip_file_dir: str, market_definition: dict) -> None:
pass

def _clean_up(self) -> None:
Expand Down Expand Up @@ -154,7 +149,7 @@ def add(self) -> None:
super().add()
self.s3.head_bucket(Bucket=self._bucket) # validate bucket/access

def _load(self, zip_file_dir: str, market_definition: dict) -> None:
def _load(self, market, zip_file_dir: str, market_definition: dict) -> None:
# note this will block the main handler queue during upload
# todo create background worker instead?
event_type_id = (
Expand All @@ -179,3 +174,26 @@ def _load(self, zip_file_dir: str, market_definition: dict) -> None:
logger.info("%s successfully loaded to s3" % zip_file_dir)
except (BotoCoreError, Exception) as e:
logger.error("Error loading to s3: %s" % e)

# upload marketCatalogue data
if self.context.get("load_market_catalogue", True):
if market.market_catalogue is None:
logger.warning(
"No marketCatalogue data available for %s" % market.market_id
)
return
try:
self.s3.put_object(
Body=market.market_catalogue.json(),
Bucket=self._bucket,
Key=os.path.join(
"marketdata",
"marketCatalogue",
"{0}.json".format(market.market_id),
),
)
logger.info(
"%s successfully loaded marketCatalogue to s3" % market.market_id
)
except (BotoCoreError, Exception) as e:
logger.error("Error loading to s3: %s" % e)
18 changes: 18 additions & 0 deletions flumine/baseflumine.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,27 @@ def _remove_market(self, market: Market) -> None:
def _process_raw_data(self, event: events.RawDataEvent) -> None:
stream_id, publish_time, data = event.event
for datum in data:
market_id = datum["id"]
market = self.markets.markets.get(market_id)
if market is None:
market = self._add_market(market_id, None)
elif market.closed:
self.markets.add_market(market_id, market)

if (
"marketDefinition" in datum
and datum["marketDefinition"]["status"] == "CLOSED"
):
market.close_market()
_closed = True
else:
_closed = False

for strategy in self.strategies:
if stream_id in strategy.stream_ids:
strategy.process_raw_data(publish_time, datum)
if _closed:
strategy.process_closed_market(market, datum)

def _process_market_catalogues(self, event: events.MarketCatalogueEvent) -> None:
for market_catalogue in event.event:
Expand Down
18 changes: 16 additions & 2 deletions tests/test_baseflumine.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,24 @@ def test__remove_market(self, _):
mock_markets.remove_market.assert_called_with(mock_market.market_id)
mock_middleware.remove_market.assert_called_with(mock_market)

def test__process_raw_data(self):
@mock.patch("flumine.baseflumine.BaseFlumine._add_market")
def test__process_raw_data(self, mock__add_market):
mock_event = mock.Mock()
mock_event.event = (12, 12345, {})
mock_event.event = (12, 12345, [{"id": "1.23"}])
self.base_flumine._process_raw_data(mock_event)
mock__add_market.assert_called_with("1.23", None)

@mock.patch("flumine.baseflumine.BaseFlumine._add_market")
def test__process_raw_data_closed(self, mock__add_market):
mock_event = mock.Mock()
mock_event.event = (
12,
12345,
[{"id": "1.23", "marketDefinition": {"status": "CLOSED"}}],
)
self.base_flumine._process_raw_data(mock_event)
mock__add_market.assert_called_with("1.23", None)
mock__add_market().close_market.assert_called()

@mock.patch("flumine.baseflumine.events")
@mock.patch("flumine.baseflumine.BaseFlumine.log_control")
Expand Down

0 comments on commit ece3e46

Please sign in to comment.