Skip to content

Commit

Permalink
#60 Run a single Coinbase Task for all pairs
Browse files Browse the repository at this point in the history
  • Loading branch information
pantunes committed Feb 25, 2024
1 parent 88c92b2 commit fa67906
Showing 1 changed file with 6 additions and 22 deletions.
28 changes: 6 additions & 22 deletions exchange_radar/producer/tasks/coinbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@


class CoinbaseTradesTask(Task):
async_client = None
async def task(self, symbols: tuple[str, ...]):
await asyncio.gather(self.process(symbols))

def process(self, symbol_or_symbols: str | tuple):
async def process(self, symbol_or_symbols: str | tuple):
class CustomClient(Client):
def on_message(self, message):
match message:
Expand All @@ -29,24 +30,7 @@ def on_message(self, message):
except Exception as error:
logger.error(f"ERROR: {error}")

CustomClient(self.loop, Channel("matches", list(symbol_or_symbols)))

while True:
try:
self.async_client = CustomClient(self.loop, Channel("matches", list(symbol_or_symbols)))
break
except Exception as error2:
logger.error(f"GENERAL ERROR: {error2}")
logger.error(f"Trying again in {ITER_SLEEP} seconds...")
asyncio.sleep(ITER_SLEEP)

def start(self, symbols: tuple[str, ...]):
logger.info("Starting Task...")
try:
self.process(symbol_or_symbols=symbols)
self.loop.run_forever()
except KeyboardInterrupt:
if self.async_client is not None:
self.loop.run_until_complete(self.async_client.close())
self.loop.close()
logger.info("Task was interrupted...")
finally:
pass
await asyncio.sleep(ITER_SLEEP)

0 comments on commit fa67906

Please sign in to comment.