Basic support of continuous sync #8

Open · wants to merge 1 commit into master
ethdrain.py: 75 changes (63 additions, 12 deletions)
@@ -18,9 +18,10 @@ class Ethdrain:
     data_store_classes = ()
 
     eth_url = "http://localhost:8545"
-    sem_size = 256
+    sem_size = 128
 
-    def __init__(self, block_range):
+    def __init__(self, block_range, block):
+        self.block = block
         self.block_range = block_range
         self.data_stores = list()
 
@@ -29,27 +30,42 @@ def load_datastore_classes(cls, *data_store_classes):
         cls.data_store_classes = data_store_classes
 
     @classmethod
-    def launch(cls, block_range):
+    def launch_history_sync(cls, block_range):
         """
         This class method will instantiate Ethdrain classes (one per process)
         and then instantiate and attach every available datastore to each of them
         """
-        inst = cls(block_range)
+        inst = cls(block_range, 0)
         for data_class in cls.data_store_classes:
             inst.data_stores.append(data_class())
-        inst.setup_process()
+        inst.setup_process_history_sync()
 
+    @classmethod
+    def launch_continuous_sync(cls, block):
+        """
+        This class method will instantiate Ethdrain classes (one per process)
+        and then instantiate and attach every available datastore to each of them
+        """
+        inst = cls([], block)
+        for data_class in cls.data_store_classes:
+            inst.data_stores.append(data_class())
+        inst.setup_process_continuous_sync()
+
-    def setup_process(self):
+    def setup_process_history_sync(self):
         loop = asyncio.get_event_loop()
-        future = asyncio.ensure_future(self.run(self.block_range))
+        future = asyncio.ensure_future(self.run_history_sync(self.block_range))
         loop.run_until_complete(future)
 
         # Now that everything has been "extracted", perform the "save" action
         for data_store in self.data_stores:
             msg = data_store.save()
             print("{}: {}".format(data_store.__class__.__name__, msg))
 
+    def setup_process_continuous_sync(self):
+        # print("setup_process_continuous_sync")
+        loop = asyncio.get_event_loop()
+        asyncio.ensure_future(self.indexNext())
+        loop.run_forever()
+
     async def fetch(self, session, block_nb):
         try:
@@ -63,13 +79,12 @@ async def fetch(self, session, block_nb):
             logging.error("block: " + str(block_nb))
             print("Issue with block {}:\n{}\n".format(block_nb, exception))
 
-
     async def sema_fetch(self, sem, session, block_nb):
         async with sem:
             await self.fetch(session, block_nb)
 
 
-    async def run(self, block_range):
+    async def run_history_sync(self, block_range):
         tasks = []
         sem = asyncio.Semaphore(self.__class__.sem_size)
 
@@ -83,6 +98,38 @@ async def run(self, block_range):
 
         await asyncio.gather(*tasks)
 
+    async def fetch2(self, session, block_nb):
+        # print("[fetch2] Fetching block number #{}".format(block_nb))
+        try:
+            async with session.post(self.__class__.eth_url,
+                                    data=Ethdrain.make_request(block_nb),
+                                    headers={"content-type": "application/json"}) as response:
+                return await response.json()
+
+        except (aiohttp.ClientError, asyncio.TimeoutError) as exception:
+            logging.error("block: " + str(block_nb))
+            print("Issue with block {}:\n{}\n".format(block_nb, exception))
+
+    async def indexNext(self):
+        try:
+            async with aiohttp.ClientSession() as session:
+                response = await self.fetch2(session, self.block)
+                if response["result"] is not None:
+                    for data_store in self.data_stores:
+                        data_store.actions = list()
+                        data_store.extract(response)
+                        msg = data_store.save()
+                        print("{}: {}".format(data_store.__class__.__name__, msg))
+                    self.block = self.block + 1
+                    return asyncio.ensure_future(self.indexNext())
+                else:
+                    # print("[indexNext] Waiting 5 seconds for block #{}".format(self.block))
+                    await asyncio.sleep(5)
+                    return asyncio.ensure_future(self.indexNext())
+        except (aiohttp.ClientError, asyncio.TimeoutError) as exception:
+            logging.error("block: " + str(self.block))
+            print("Issue with block {}:\n{}\n".format(self.block, exception))
+            return asyncio.ensure_future(self.indexNext())
+
     @staticmethod
     def make_request(block_nb, use_hex=True):
@@ -112,7 +159,7 @@ def chunks(lst, nb_chunks=250):
 # Ethereum RPC endpoint
 ETH_URL = "http://localhost:8545"
 # Size of multiprocessing Pool processing the chunks
-POOL_SIZE = mp.cpu_count() + 2
+POOL_SIZE = mp.cpu_count()
 
 BLOCK_WAIT = 10
 
@@ -123,12 +170,13 @@ def chunks(lst, nb_chunks=250):
                     help='What block to finish indexing. If nothing is provided, the latest one will be used.')
 parser.add_argument('-f', '--file', default=None,
                     help='Use an input file, each block number on a new line.')
-parser.add_argument('-u', '--esurl', default=ES_URL,
+parser.add_argument('-es', '--esurl', default=ES_URL,
                     help='The elasticsearch url and port. Accepts the same parameters as a normal Elasticsearch client.')
 parser.add_argument('-m', '--esmaxsize', default=ES_MAXSIZE,
                     help='The elasticsearch max chunk size.')
 parser.add_argument('-r', '--ethrpcurl', default=ETH_URL,
                     help='The Ethereum RPC node url and port.')
+
 args = parser.parse_args()
 
 # Setup all datastores
@@ -170,4 +218,7 @@ def chunks(lst, nb_chunks=250):
 Ethdrain.load_datastore_classes(ElasticDatastore)
 
 POOL = mp.Pool(POOL_SIZE)
-POOL.map(Ethdrain.launch, CHUNKS_ARR)
+POOL.map(Ethdrain.launch_history_sync, CHUNKS_ARR)
+print("[message] Ended history sync")
+block = args.end_block + 1
+Ethdrain.launch_continuous_sync(block)
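
Reviewer note: the continuous-sync path added here reduces to a poll-and-advance loop: request block N from the node; if a result comes back, run each datastore's extract()/save() and move on to N+1; otherwise sleep 5 seconds and ask for the same block again. Below is a minimal, self-contained sketch of that control flow, not a drop-in replacement for indexNext(): the names follow_chain and make_block_request are invented for the example, the eth_getBlockByNumber payload is an assumption about what ethdrain's make_request builds, and the datastore calls are stubbed out with a print.

import asyncio
import json

import aiohttp

ETH_URL = "http://localhost:8545"  # same default RPC endpoint as above


def make_block_request(block_nb):
    # Assumed payload: standard eth_getBlockByNumber with full tx objects
    return json.dumps({
        "jsonrpc": "2.0",
        "method": "eth_getBlockByNumber",
        "params": [hex(block_nb), True],
        "id": 1,
    })


async def follow_chain(block_nb):
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.post(ETH_URL,
                                    data=make_block_request(block_nb),
                                    headers={"content-type": "application/json"}) as resp:
                response = await resp.json()
            if response.get("result") is not None:
                # ethdrain proper would run data_store.extract()/save() here
                print("indexed block #{}".format(block_nb))
                block_nb += 1
            else:
                # Block not mined yet: wait, then retry the same number
                await asyncio.sleep(5)


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(follow_chain(4000000))  # start block is arbitrary

Behaviorally this matches the PR's indexNext(), with two incidental differences worth weighing in review: a plain while loop avoids scheduling a fresh future per block, and a single ClientSession is reused across polls instead of being reopened on every call.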