Skip to content

Commit

Permalink
threadpools for block processor and es sync reader
Browse files — browse the repository at this point in the history
  • Loading branch information
jackrobison committed Oct 14, 2021
1 parent f8c4664 commit 39d5078
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 71 deletions.
19 changes: 13 additions & 6 deletions lbry/wallet/server/block_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import typing
from bisect import bisect_right
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple
from prometheus_client import Gauge, Histogram
from collections import defaultdict
Expand Down Expand Up @@ -203,6 +204,8 @@ def __init__(self, env, db: 'LevelDB', daemon, shutdown_event: asyncio.Event):
self.env = env
self.db = db
self.daemon = daemon
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
self.shutdown_event = shutdown_event
self.coin = env.coin
Expand Down Expand Up @@ -299,7 +302,11 @@ async def claim_producer(self):

for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex()
async for claim in self.db.claims_producer(self.touched_claims_to_send_es):

to_update = await asyncio.get_event_loop().run_in_executor(
self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es
)
for claim in to_update:
yield 'update', claim

async def run_in_thread_with_lock(self, func, *args):
Expand All @@ -310,13 +317,12 @@ async def run_in_thread_with_lock(self, func, *args):
# consistent and not being updated elsewhere.
async def run_in_thread_locked():
async with self.state_lock:
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread_locked())

@staticmethod
async def run_in_thread(func, *args):
async def run_in_thread(self, func, *args):
async def run_in_thread():
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread())

async def check_and_advance_blocks(self, raw_blocks):
Expand Down Expand Up @@ -1746,5 +1752,6 @@ async def fetch_and_process_blocks(self, caught_up_event):
self.status_server.stop()
# Shut down block processing
self.logger.info('closing the DB for a clean shutdown...')
self._sync_reader_executor.shutdown(wait=True)
self._chain_executor.shutdown(wait=True)
self.db.close()
# self.executor.shutdown(wait=True)
63 changes: 27 additions & 36 deletions lbry/wallet/server/leveldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,45 +696,36 @@ async def all_claims_producer(self, batch_size=500_000):
yield meta
batch.clear()

async def claims_producer(self, claim_hashes: Set[bytes]):
loop = asyncio.get_event_loop()

def produce_claims(claims):
batch = []
_results = []

for claim_hash in claims:
if claim_hash not in self.claim_to_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
name = self.claim_to_txo[claim_hash].normalized_name
if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
claim_txo = self.claim_to_txo.get(claim_hash)
if not claim_txo:
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)

batch.sort(key=lambda x: x.tx_hash)
def claims_producer(self, claim_hashes: Set[bytes]):
batch = []
results = []

for claim in batch:
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if _meta:
_results.append(_meta)
return _results
for claim_hash in claim_hashes:
if claim_hash not in self.claim_to_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
name = self.claim_to_txo[claim_hash].normalized_name
if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
claim_txo = self.claim_to_txo.get(claim_hash)
if not claim_txo:
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)

if claim_hashes:
results = await loop.run_in_executor(None, produce_claims, claim_hashes)
batch.sort(key=lambda x: x.tx_hash)

for meta in results:
yield meta
for claim in batch:
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if _meta:
results.append(_meta)
return results

def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list)
Expand Down
2 changes: 1 addition & 1 deletion lbry/wallet/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def stop(self):

def run(self):
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(self.env.max_query_workers)
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
loop.set_default_executor(executor)

def __exit():
Expand Down
30 changes: 2 additions & 28 deletions lbry/wallet/server/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ async def _notify_sessions(self, height, touched, new_touched):
self.mempool_statuses.pop(hashX, None)

await asyncio.get_event_loop().run_in_executor(
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
)

if touched or new_touched or (height_changed and self.mempool_statuses):
Expand Down Expand Up @@ -775,10 +775,9 @@ class LBRYSessionManager(SessionManager):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.query_executor = None
self.websocket = None
# self.metrics = ServerLoadData()
self.metrics_loop = None
# self.metrics_loop = None
self.running = False
if self.env.websocket_host is not None and self.env.websocket_port is not None:
self.websocket = AdminWebSocket(self)
Expand All @@ -795,20 +794,13 @@ def __init__(self, *args, **kwargs):

async def start_other(self):
self.running = True
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
self.query_executor = ThreadPoolExecutor(max_workers=1)
else:
self.query_executor = ProcessPoolExecutor(
max_workers=self.env.max_query_workers or max(os.cpu_count(), 4)
)
if self.websocket is not None:
await self.websocket.start()

async def stop_other(self):
self.running = False
if self.websocket is not None:
await self.websocket.stop()
self.query_executor.shutdown()


class LBRYElectrumX(SessionBase):
Expand Down Expand Up @@ -971,24 +963,6 @@ async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]):
# else:
# return APICallMetrics(query_name)

async def run_in_executor(self, query_name, func, kwargs):
start = time.perf_counter()
try:
self.session_mgr.pending_query_metric.inc()
result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs
)
except asyncio.CancelledError:
raise
except Exception:
log.exception("dear devs, please handle this exception better")
self.session_mgr.db_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
else:
return base64.b64encode(result).decode()
finally:
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)

# async def run_and_cache_query(self, query_name, kwargs):
# start = time.perf_counter()
Expand Down

0 comments on commit 39d5078

Please sign in to comment.