From d088d9f67a0247272fa4f39cf18eed6d0808568c Mon Sep 17 00:00:00 2001
From: Valery Litvin
Date: Wed, 12 Jul 2017 23:15:45 +0200
Subject: [PATCH] Added basic support for continuous sync

---
 ethdrain.py | 75 ++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 63 insertions(+), 12 deletions(-)

diff --git a/ethdrain.py b/ethdrain.py
index 1757385..a3e965b 100755
--- a/ethdrain.py
+++ b/ethdrain.py
@@ -18,9 +18,10 @@ class Ethdrain:
     data_store_classes = ()
 
     eth_url = "http://localhost:8545"
-    sem_size = 256
+    sem_size = 128
 
-    def __init__(self, block_range):
+    def __init__(self, block_range, block):
+        self.block = block
         self.block_range = block_range
         self.data_stores = list()
 
@@ -29,20 +30,30 @@ def load_datastore_classes(cls, *data_store_classes):
         cls.data_store_classes = data_store_classes
 
     @classmethod
-    def launch(cls, block_range):
+    def launch_history_sync(cls, block_range):
         """
         This class method will instanciate Ethdrain classes (one per process)
         and then instanciate and attach every datastore available to each on them
         """
-        inst = cls(block_range)
+        inst = cls(block_range, 0)
         for data_class in cls.data_store_classes:
             inst.data_stores.append(data_class())
-        inst.setup_process()
+        inst.setup_process_history_sync()
+    @classmethod
+    def launch_continuous_sync(cls, block):
+        """
+        This class method will instantiate a single Ethdrain class
+        and attach every available datastore to it
+        """
+        inst = cls([], block)
+        for data_class in cls.data_store_classes:
+            inst.data_stores.append(data_class())
+        inst.setup_process_continuous_sync()
 
-    def setup_process(self):
+    def setup_process_history_sync(self):
         loop = asyncio.get_event_loop()
-        future = asyncio.ensure_future(self.run(self.block_range))
+        future = asyncio.ensure_future(self.run_history_sync(self.block_range))
         loop.run_until_complete(future)
 
         # Now that everything has been "extracted", perform the "save" action
         for data_store in self.data_stores:
@@ -50,6 +61,11 @@ def setup_process(self):
             msg = data_store.save()
             print("{}: {}".format(data_store.__class__.__name__, msg))
 
+    def setup_process_continuous_sync(self):
+        # print("setup_process_continuous_sync")
+        loop = asyncio.get_event_loop()
+        asyncio.ensure_future(self.indexNext())
+        loop.run_forever()
 
     async def fetch(self, session, block_nb):
         try:
@@ -63,13 +79,12 @@ async def fetch(self, session, block_nb):
             logging.error("block: " + str(block_nb))
             print("Issue with block {}:\n{}\n".format(block_nb, exception))
 
-
     async def sema_fetch(self, sem, session, block_nb):
         async with sem:
             await self.fetch(session, block_nb)
 
-    async def run(self, block_range):
+    async def run_history_sync(self, block_range):
         tasks = []
 
         sem = asyncio.Semaphore(self.__class__.sem_size)
 
@@ -83,6 +98,38 @@ async def run(self, block_range):
 
         await asyncio.gather(*tasks)
 
+    async def fetch2(self, session, block_nb):
+        # print("[fetch2] Fetching block number #{}".format(block_nb))
+        try:
+            async with session.post(self.__class__.eth_url,
+                                    data=Ethdrain.make_request(block_nb),
+                                    headers={"content-type": "application/json"}) as response:
+                return await response.json()
+
+        except (aiohttp.ClientError, asyncio.TimeoutError) as exception:
+            logging.error("block: " + str(block_nb))
+            print("Issue with block {}:\n{}\n".format(block_nb, exception))
+
+    async def indexNext(self):
+        try:
+            async with aiohttp.ClientSession() as session:
+                response = await self.fetch2(session, self.block)
+                if response and response["result"] is not None:
+                    for data_store in self.data_stores:
+                        data_store.actions = list()
+                        data_store.extract(response)
+                        msg = data_store.save()
+                        print("{}: {}".format(data_store.__class__.__name__, msg))
+                    self.block = self.block + 1
+                    return asyncio.ensure_future(self.indexNext())
+                else:
+                    # print("[indexNext] Waiting block #{} for next 5 seconds".format(self.block + 1))
+                    await asyncio.sleep(5)
+                    return asyncio.ensure_future(self.indexNext())
+        except (aiohttp.ClientError, asyncio.TimeoutError) as exception:
+            logging.error("block: " + str(self.block))
+            print("Issue with block {}:\n{}\n".format(self.block, exception))
+            return asyncio.ensure_future(self.indexNext())
 
     @staticmethod
     def make_request(block_nb, use_hex=True):
@@ -112,7 +159,7 @@ def chunks(lst, nb_chunks=250):
     # Ethereum RPC endpoint
     ETH_URL = "http://localhost:8545"
     # Size of multiprocessing Pool processing the chunks
-    POOL_SIZE = mp.cpu_count() + 2
+    POOL_SIZE = mp.cpu_count()
 
     BLOCK_WAIT = 10
 
@@ -123,12 +170,13 @@ def chunks(lst, nb_chunks=250):
                         help='What block to finish indexing. If nothing is provided, the latest one will be used.')
     parser.add_argument('-f', '--file', default=None,
                         help='Use an input file, each block number on a new line.')
-    parser.add_argument('-u', '--esurl', default=ES_URL,
+    parser.add_argument('-es', '--esurl', default=ES_URL,
                         help='The elasticsearch url and port. Accepts all the same parameters needed as a normal Elasticsearch client expects.')
     parser.add_argument('-m', '--esmaxsize', default=ES_MAXSIZE,
                         help='The elasticsearch max chunk size.')
     parser.add_argument('-r', '--ethrpcurl', default=ETH_URL,
                         help='The Ethereum RPC node url and port.')
+
     args = parser.parse_args()
 
     # Setup all datastores
@@ -170,4 +218,7 @@ def chunks(lst, nb_chunks=250):
     Ethdrain.load_datastore_classes(ElasticDatastore)
 
     POOL = mp.Pool(POOL_SIZE)
-    POOL.map(Ethdrain.launch, CHUNKS_ARR)
+    POOL.map(Ethdrain.launch_history_sync, CHUNKS_ARR)
+    print("[message] Ended history sync")
+    block = args.end_block + 1
+    Ethdrain.launch_continuous_sync(block)
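
--
Note (not part of the patch): the continuous-sync loop above schedules each
poll by recursively chaining asyncio.ensure_future() calls from indexNext().
An equivalent and somewhat simpler shape is a single coroutine with a
`while True` loop. Below is a minimal, self-contained sketch of that polling
pattern, under the same assumptions the patch makes (default local RPC
endpoint, a standard eth_getBlockByNumber JSON-RPC call, a 5-second wait
when the block is not available yet); `handle_block` is a hypothetical
stand-in for the datastore extract/save work, not an ethdrain API:

    import asyncio
    import json

    import aiohttp

    ETH_URL = "http://localhost:8545"  # same default RPC endpoint as ethdrain.py

    def make_request(block_nb):
        # Standard eth_getBlockByNumber JSON-RPC payload for one block,
        # requesting full transaction objects (the True flag)
        return json.dumps({
            "jsonrpc": "2.0",
            "method": "eth_getBlockByNumber",
            "params": [hex(block_nb), True],
            "id": 1,
        })

    async def continuous_sync(start_block, handle_block):
        block_nb = start_block
        async with aiohttp.ClientSession() as session:
            while True:
                async with session.post(ETH_URL, data=make_request(block_nb),
                                        headers={"content-type": "application/json"}) as resp:
                    response = await resp.json()
                if response.get("result") is not None:
                    handle_block(response)  # e.g. extract + save through the datastores
                    block_nb += 1           # immediately try the next block
                else:
                    await asyncio.sleep(5)  # block not mined yet; retry in 5 seconds

    if __name__ == "__main__":
        # Example: follow the chain head starting from an arbitrary block number
        asyncio.get_event_loop().run_until_complete(
            continuous_sync(4100000, lambda resp: print(resp["result"]["number"])))

The while-loop form avoids growing a chain of futures and keeps the
block-number state local to one coroutine, but both shapes behave the same
under loop.run_forever(): the process never exits on its own once history
sync hands over to continuous sync.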