diff --git a/docker/Dockerfile.wallet_server b/docker/Dockerfile.wallet_server
index a3ed8b60e9..185a184a85 100644
--- a/docker/Dockerfile.wallet_server
+++ b/docker/Dockerfile.wallet_server
@@ -20,6 +20,7 @@ RUN apt-get update && \
       python3-dev \
       python3-pip \
       python3-wheel \
+      python3-cffi \
       python3-setuptools && \
     update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
     rm -rf /var/lib/apt/lists/*
diff --git a/docker/docker-compose-wallet-server.yml b/docker/docker-compose-wallet-server.yml
index 548386a2db..0c58b8e25e 100644
--- a/docker/docker-compose-wallet-server.yml
+++ b/docker/docker-compose-wallet-server.yml
@@ -18,14 +18,18 @@ services:
       - "wallet_server:/database"
     environment:
       - DAEMON_URL=http://lbry:lbry@127.0.0.1:9245
+      - MAX_QUERY_WORKERS=4
+      - CACHE_MB=1024
+      - CACHE_ALL_TX_HASHES=
+      - CACHE_ALL_CLAIM_TXOS=
+      - MAX_SEND=1000000000000000000
+      - MAX_RECEIVE=1000000000000000000
+      - MAX_SESSIONS=100000
+      - HOST=0.0.0.0
       - TCP_PORT=50001
       - PROMETHEUS_PORT=2112
-      - QUERY_TIMEOUT_MS=3000 # how long search queries allowed to run before cancelling, in milliseconds
-      - TRENDING_ALGORITHMS=variable_decay
-      - MAX_SEND=10000000000000 # deprecated. leave it high until its removed
-      - MAX_SUBS=1000000000000 # deprecated. leave it high until its removed
       - FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8
-      - BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 e4e230b131082f6b10c8f7994bbb83f29e8e6fb9
+      - BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6
   es01:
     image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
     container_name: es01
diff --git a/docker/wallet_server_entrypoint.sh b/docker/wallet_server_entrypoint.sh
index 1f87927ed2..86cd60dd1c 100755
--- a/docker/wallet_server_entrypoint.sh
+++ b/docker/wallet_server_entrypoint.sh
@@ -6,7 +6,7 @@ set -euo pipefail
 
 SNAPSHOT_URL="${SNAPSHOT_URL:-}" #off by default. latest snapshot at https://lbry.com/snapshot/wallet
 
-if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/claims.db ]]; then
+if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/lbry-leveldb ]]; then
   files="$(ls)"
   echo "Downloading wallet snapshot from $SNAPSHOT_URL"
   wget --no-verbose --trust-server-names --content-disposition "$SNAPSHOT_URL"
@@ -20,6 +20,6 @@ if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/claims.db ]]; then
   rm "$filename"
 fi
 
-/home/lbry/.local/bin/lbry-hub-elastic-sync /database/claims.db
+/home/lbry/.local/bin/lbry-hub-elastic-sync
 echo 'starting server'
 /home/lbry/.local/bin/lbry-hub "$@"
diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index 66cb6af02a..fa25006a4e 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -367,6 +367,7 @@ async def check_and_advance_blocks(self, raw_blocks):
                 await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
                                                          self.db.filtered_streams, self.db.filtered_channels)
                 await self.db.search_index.update_trending_score(self.activation_info_to_send_es)
+                await self._es_caught_up()
                 self.db.search_index.clear_caches()
                 self.touched_claims_to_send_es.clear()
                 self.removed_claims_to_send_es.clear()
@@ -1620,6 +1621,7 @@ async def backup_block(self):
         else:
             self.tx_count = self.db.tx_counts[-1]
         self.height -= 1
+
         # self.touched can include other addresses which is
         # harmless, but remove None.
         self.touched_hashXs.discard(None)
@@ -1649,8 +1651,15 @@ async def backup_block(self):
         self.db.last_flush = now
         self.db.last_flush_tx_count = self.db.fs_tx_count
 
-        await self.run_in_thread_with_lock(self.db.prefix_db.rollback, self.height + 1)
+        def rollback():
+            self.db.prefix_db.rollback(self.height + 1)
+            self.db.es_sync_height = self.height
+            self.db.write_db_state()
+            self.db.prefix_db.unsafe_commit()
+
+        await self.run_in_thread_with_lock(rollback)
         self.clear_after_advance_or_reorg()
+        self.db.assert_db_state()
 
         elapsed = self.db.last_flush - start_time
         self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. '
@@ -1713,6 +1722,17 @@ async def _process_prefetched_blocks(self):
                 self.logger.exception("error while processing txs")
                 raise
 
+    async def _es_caught_up(self):
+        self.db.es_sync_height = self.height
+
+        def flush():
+            assert len(self.db.prefix_db._op_stack) == 0
+            self.db.write_db_state()
+            self.db.prefix_db.unsafe_commit()
+            self.db.assert_db_state()
+
+        await self.run_in_thread_with_lock(flush)
+
     async def _first_caught_up(self):
         self.logger.info(f'caught up to height {self.height}')
         # Flush everything but with first_sync->False state.
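
The block processor changes above introduce an es_sync_height that is stamped into the on-disk db state (and committed under the same lock as block flushes) once the ElasticSearch writes for a block have gone through, both on advance (_es_caught_up) and on reorg rollback. A standalone sketch of that bookkeeping, using hypothetical names rather than the hub's real classes:

# Sketch only: HubState, advance_block and es_caught_up are illustrative stand-ins,
# not classes from lbry.wallet.server.
from dataclasses import dataclass


@dataclass
class HubState:
    height: int = 0          # tip height the database has advanced to
    es_sync_height: int = 0  # last height whose claims were flushed to ElasticSearch


def advance_block(state: HubState) -> None:
    # block processing advances the database first...
    state.height += 1


def es_caught_up(state: HubState) -> None:
    # ...and only after the ES bulk writes succeed is the ES height recorded,
    # mirroring the new `await self._es_caught_up()` call in the diff above.
    state.es_sync_height = state.height


if __name__ == '__main__':
    state = HubState()
    advance_block(state)
    es_caught_up(state)
    assert state.es_sync_height == state.height == 1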
diff --git a/lbry/wallet/server/cli.py b/lbry/wallet/server/cli.py
index d99512b93a..74a3d092ac 100644
--- a/lbry/wallet/server/cli.py
+++ b/lbry/wallet/server/cli.py
@@ -1,7 +1,6 @@
 import logging
 import traceback
 import argparse
-import importlib
 
 from lbry.wallet.server.env import Env
 from lbry.wallet.server.server import Server
@@ -10,27 +9,19 @@ def get_argument_parser():
     parser = argparse.ArgumentParser(
         prog="lbry-hub"
     )
-    parser.add_argument("spvserver", type=str, help="Python class path to SPV server implementation.",
-                        nargs="?", default="lbry.wallet.server.coin.LBC")
+    Env.contribute_to_arg_parser(parser)
     return parser
 
 
-def get_coin_class(spvserver):
-    spvserver_path, coin_class_name = spvserver.rsplit('.', 1)
-    spvserver_module = importlib.import_module(spvserver_path)
-    return getattr(spvserver_module, coin_class_name)
-
-
 def main():
     parser = get_argument_parser()
     args = parser.parse_args()
-    coin_class = get_coin_class(args.spvserver)
     logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
     logging.info('lbry.server starting')
     logging.getLogger('aiohttp').setLevel(logging.WARNING)
     logging.getLogger('elasticsearch').setLevel(logging.WARNING)
     try:
-        server = Server(Env(coin_class))
+        server = Server(Env.from_arg_parser(args))
         server.run()
     except Exception:
         traceback.print_exc()
diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py
index e7a8b58af9..c762920ef6 100644
--- a/lbry/wallet/server/db/elasticsearch/search.py
+++ b/lbry/wallet/server/db/elasticsearch/search.py
@@ -42,8 +42,7 @@ def __init__(self, got_version, expected_version):
 class SearchIndex:
     VERSION = 1
 
-    def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200,
-                 half_life=0.4, whale_threshold=10000, whale_half_life=0.99):
+    def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
         self.search_timeout = search_timeout
         self.sync_timeout = 600  # wont hit that 99% of the time, but can hit on a fresh import
         self.search_client: Optional[AsyncElasticsearch] = None
@@ -54,9 +53,6 @@ def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhos
         self.search_cache = LRUCache(2 ** 17)
         self._elastic_host = elastic_host
         self._elastic_port = elastic_port
-        self._trending_half_life = half_life
-        self._trending_whale_threshold = whale_threshold
-        self._trending_whale_half_life = whale_half_life
 
     async def get_index_version(self) -> int:
         try:
diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py
index 7fd76e64b9..35941b61a1 100644
--- a/lbry/wallet/server/db/elasticsearch/sync.py
+++ b/lbry/wallet/server/db/elasticsearch/sync.py
@@ -1,27 +1,28 @@
+import os
 import argparse
 import asyncio
 import logging
 from elasticsearch import AsyncElasticsearch
 from elasticsearch.helpers import async_streaming_bulk
 from lbry.wallet.server.env import Env
-from lbry.wallet.server.coin import LBC
 from lbry.wallet.server.leveldb import LevelDB
 from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch
 from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS
 
 
-async def get_recent_claims(blocks: int, index_name='claims', db=None):
-    env = Env(LBC)
+async def get_recent_claims(env, index_name='claims', db=None):
     need_open = db is None
     db = db or LevelDB(env)
-    if need_open:
-        await db.open_dbs()
     try:
+        if need_open:
+            await db.open_dbs()
+        db_state = db.prefix_db.db_state.get()
+        if db_state.es_sync_height == db_state.height:
+            return
         cnt = 0
-        state = db.prefix_db.db_state.get()
         touched_claims = set()
         deleted_claims = set()
-        for height in range(state.height - blocks + 1, state.height + 1):
+        for height in range(db_state.es_sync_height, db_state.height + 1):
             touched_or_deleted = db.prefix_db.touched_or_deleted.get(height)
             touched_claims.update(touched_or_deleted.touched_claims)
             deleted_claims.update(touched_or_deleted.deleted_claims)
@@ -47,19 +48,25 @@ async def get_recent_claims(blocks: int, index_name='claims', db=None):
            else:
                logging.warning("could not sync claim %s", touched.hex())
            if cnt % 10000 == 0:
-                print(f"{cnt} claims sent")
-    print("sent %i claims, deleted %i" % (len(touched_claims), len(deleted_claims)))
+                logging.info("%i claims sent to ES", cnt)
+
+        db.es_sync_height = db.db_height
+        db.write_db_state()
+        db.prefix_db.unsafe_commit()
+        db.assert_db_state()
+
+        logging.info("finished sending %i claims to ES, deleted %i", cnt, len(deleted_claims))
    finally:
        if need_open:
            db.close()
 
 
-async def get_all_claims(index_name='claims', db=None):
-    env = Env(LBC)
+async def get_all_claims(env, index_name='claims', db=None):
     need_open = db is None
     db = db or LevelDB(env)
     if need_open:
         await db.open_dbs()
+    logging.info("Fetching claims to send ES from leveldb")
     try:
         cnt = 0
         async for claim in db.all_claims_producer():
@@ -72,40 +79,32 @@ async def get_all_claims(index_name='claims', db=None):
             }
             cnt += 1
             if cnt % 10000 == 0:
-                print(f"{cnt} claims sent")
+                logging.info("sent %i claims to ES", cnt)
     finally:
         if need_open:
             db.close()
 
 
-async def make_es_index(index=None):
-    env = Env(LBC)
-    if index is None:
-        index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
-
+async def make_es_index_and_run_sync(env: Env, clients=32, force=False, db=None, index_name='claims'):
+    index = SearchIndex(env.es_index_prefix, elastic_host=env.elastic_host, elastic_port=env.elastic_port)
+    logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
     try:
-        return await index.start()
+        created = await index.start()
     except IndexVersionMismatch as err:
         logging.info(
             "dropping ES search index (version %s) for upgrade to version %s", err.got_version,
             err.expected_version
         )
         await index.delete_index()
         await index.stop()
-        return await index.start()
+        created = await index.start()
     finally:
         index.stop()
-
-
-async def run_sync(index_name='claims', db=None, clients=32, blocks=0):
-    env = Env(LBC)
-    logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
     es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
-    if blocks > 0:
-        blocks = min(blocks, 200)
-        logging.info("Resyncing last %i blocks", blocks)
-        claim_generator = get_recent_claims(blocks, index_name=index_name, db=db)
+    if force or created:
+        claim_generator = get_all_claims(env, index_name=index_name, db=db)
     else:
-        claim_generator = get_all_claims(index_name=index_name, db=db)
+        claim_generator = get_recent_claims(env, index_name=index_name, db=db)
+
     try:
         async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False):
             if not ok:
@@ -122,17 +121,14 @@ def run_elastic_sync():
     logging.info('lbry.server starting')
 
     parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync")
-    # parser.add_argument("db_path", type=str)
     parser.add_argument("-c", "--clients", type=int, default=32)
-    parser.add_argument("-b", "--blocks", type=int, default=0)
     parser.add_argument("-f", "--force", default=False, action='store_true')
+    Env.contribute_to_arg_parser(parser)
     args = parser.parse_args()
+    env = Env.from_arg_parser(args)
 
-    # if not args.force and not os.path.exists(args.db_path):
-    #     logging.info("DB path doesnt exist")
-    #     return
-
-    if not args.force and not asyncio.run(make_es_index()):
-        logging.info("ES is already initialized")
+    if not os.path.exists(os.path.join(args.db_dir, 'lbry-leveldb')):
+        logging.info("DB path doesn't exist, nothing to sync to ES")
         return
-    asyncio.run(run_sync(clients=args.clients, blocks=args.blocks))
+
+    asyncio.run(make_es_index_and_run_sync(env, clients=args.clients, force=args.force))
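
The rewritten sync entry point above folds make_es_index and run_sync into a single make_es_index_and_run_sync: a freshly created index or an explicit --force triggers a full dump via get_all_claims, while otherwise get_recent_claims replays only the blocks between the stored es_sync_height and the current height (and returns early when the two already match). A simplified, self-contained sketch of that decision, where choose_sync_range is a hypothetical helper rather than part of the module:

# Illustrative only: mirrors the branch in make_es_index_and_run_sync / get_recent_claims.
def choose_sync_range(index_created: bool, force: bool, es_sync_height: int, db_height: int) -> str:
    if force or index_created:
        return 'send all claims'
    if es_sync_height == db_height:
        return 'nothing to do'
    return f'send claims touched in blocks {es_sync_height}..{db_height}'


if __name__ == '__main__':
    print(choose_sync_range(index_created=True, force=False, es_sync_height=0, db_height=1000))
    print(choose_sync_range(index_created=False, force=False, es_sync_height=990, db_height=1000))
    print(choose_sync_range(index_created=False, force=False, es_sync_height=1000, db_height=1000))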
"--force", default=False, action='store_true') + Env.contribute_to_arg_parser(parser) args = parser.parse_args() + env = Env.from_arg_parser(args) - # if not args.force and not os.path.exists(args.db_path): - # logging.info("DB path doesnt exist") - # return - - if not args.force and not asyncio.run(make_es_index()): - logging.info("ES is already initialized") + if not os.path.exists(os.path.join(args.db_dir, 'lbry-leveldb')): + logging.info("DB path doesnt exist, nothing to sync to ES") return - asyncio.run(run_sync(clients=args.clients, blocks=args.blocks)) + + asyncio.run(make_es_index_and_run_sync(env, clients=args.clients, force=args.force)) diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index d71b941b4b..c264016fb8 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -534,6 +534,7 @@ class DBState(typing.NamedTuple): hist_flush_count: int comp_flush_count: int comp_cursor: int + es_sync_height: int class ActiveAmountPrefixRow(PrefixRow): @@ -1521,7 +1522,7 @@ def pack_item(cls, claim_hash, amount): class DBStatePrefixRow(PrefixRow): prefix = DB_PREFIXES.db_state.value - value_struct = struct.Struct(b'>32sLL32sLLBBlll') + value_struct = struct.Struct(b'>32sLL32sLLBBlllL') key_struct = struct.Struct(b'') key_part_lambdas = [ @@ -1539,15 +1540,19 @@ def unpack_key(cls, key: bytes): @classmethod def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, - comp_cursor: int) -> bytes: + comp_cursor: int, es_sync_height: int) -> bytes: return super().pack_value( genesis, height, tx_count, tip, utxo_flush_count, wall_time, 1 if first_sync else 0, db_version, hist_flush_count, - comp_flush_count, comp_cursor + comp_flush_count, comp_cursor, es_sync_height ) @classmethod def unpack_value(cls, data: bytes) -> DBState: + if len(data) == 94: + # TODO: delete this after making a new snapshot - 10/20/21 + # migrate in the es_sync_height if it doesnt exist + data += data[32:36] return DBState(*super().unpack_value(data)) @classmethod diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index 099c2b48ef..e59bbcdf35 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -121,14 +121,14 @@ def append_op(self, op: RevertableOp): elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored: # there is a value and we're not deleting it in this op # check that a delete for the stored value is in the stack - raise OpStackIntegrity(f"delete {op}") + raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}") elif op.is_delete and not has_stored_val: raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}") elif op.is_delete and stored_val != op.value: raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") except OpStackIntegrity as err: if op.key[:1] in self._unsafe_prefixes: - log.error(f"skipping over integrity error: {err}") + log.debug(f"skipping over integrity error: {err}") else: raise err self._items[op.key].append(op) diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index ff9e2a3161..a109abf763 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -13,7 +13,7 @@ from ipaddress import ip_address from lbry.wallet.server.util import class_logger -from lbry.wallet.server.coin import Coin +from 
diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py
index ff9e2a3161..a109abf763 100644
--- a/lbry/wallet/server/env.py
+++ b/lbry/wallet/server/env.py
@@ -13,7 +13,7 @@
 
 from ipaddress import ip_address
 from lbry.wallet.server.util import class_logger
-from lbry.wallet.server.coin import Coin
+from lbry.wallet.server.coin import Coin, LBC, LBCTestNet, LBCRegTest
 import lbry.wallet.server.util as lib_util
 
 
@@ -28,79 +28,84 @@ class Env:
 
     class Error(Exception):
         pass
 
-    def __init__(self, coin=None):
+    def __init__(self, coin=None, db_dir=None, daemon_url=None, host=None, rpc_host=None, elastic_host=None,
+                 elastic_port=None, loop_policy=None, max_query_workers=None, websocket_host=None, websocket_port=None,
+                 chain=None, es_index_prefix=None, es_mode=None, cache_MB=None, reorg_limit=None, tcp_port=None,
+                 udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None, rpc_port=None,
+                 prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None,
+                 allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
+                 payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
+                 session_timeout=None, drop_client=None, description=None, daily_fee=None,
+                 database_query_timeout=None, db_max_open_files=512):
         self.logger = class_logger(__name__, self.__class__.__name__)
-        self.allow_root = self.boolean('ALLOW_ROOT', False)
-        self.host = self.default('HOST', 'localhost')
-        self.rpc_host = self.default('RPC_HOST', 'localhost')
-        self.elastic_host = self.default('ELASTIC_HOST', 'localhost')
-        self.elastic_port = self.integer('ELASTIC_PORT', 9200)
-        self.loop_policy = self.set_event_loop_policy()
+
+        self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
+        self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
+        self.db_max_open_files = db_max_open_files
+
+        self.host = host if host is not None else self.default('HOST', 'localhost')
+        self.rpc_host = rpc_host if rpc_host is not None else self.default('RPC_HOST', 'localhost')
+        self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
+        self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
+        self.loop_policy = self.set_event_loop_policy(
+            loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None)
+        )
         self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
-        self.db_dir = self.required('DB_DIRECTORY')
-        self.db_engine = self.default('DB_ENGINE', 'leveldb')
-        # self.trending_algorithms = [
-        #     trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending
-        # ]
-        self.trending_half_life = math.log2(0.1 ** (1 / (3 + self.integer('TRENDING_DECAY_RATE', 48)))) + 1
-        self.trending_whale_half_life = math.log2(0.1 ** (1 / (3 + self.integer('TRENDING_WHALE_DECAY_RATE', 24)))) + 1
-        self.trending_whale_threshold = float(self.integer('TRENDING_WHALE_THRESHOLD', 10000)) * 1E8
-
-        self.max_query_workers = self.integer('MAX_QUERY_WORKERS', 4)
-        self.individual_tag_indexes = self.boolean('INDIVIDUAL_TAG_INDEXES', True)
-        self.track_metrics = self.boolean('TRACK_METRICS', False)
-        self.websocket_host = self.default('WEBSOCKET_HOST', self.host)
-        self.websocket_port = self.integer('WEBSOCKET_PORT', None)
-        self.daemon_url = self.required('DAEMON_URL')
+        self.max_query_workers = max_query_workers if max_query_workers is not None else self.integer('MAX_QUERY_WORKERS', 4)
+        self.websocket_host = websocket_host if websocket_host is not None else self.default('WEBSOCKET_HOST', self.host)
+        self.websocket_port = websocket_port if websocket_port is not None else self.integer('WEBSOCKET_PORT', None)
         if coin is not None:
             assert issubclass(coin, Coin)
             self.coin = coin
         else:
-            coin_name = self.required('COIN').strip()
-            network = self.default('NET', 'mainnet').strip()
-            self.coin = Coin.lookup_coin_class(coin_name, network)
-        self.es_index_prefix = self.default('ES_INDEX_PREFIX', '')
-        self.es_mode = self.default('ES_MODE', 'writer')
-        self.cache_MB = self.integer('CACHE_MB', 4096)
-        self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
+            chain = chain if chain is not None else self.default('NET', 'mainnet').strip().lower()
+            if chain == 'mainnet':
+                self.coin = LBC
+            elif chain == 'testnet':
+                self.coin = LBCTestNet
+            else:
+                self.coin = LBCRegTest
+        self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
+        self.es_mode = es_mode if es_mode is not None else self.default('ES_MODE', 'writer')
+        self.cache_MB = cache_MB if cache_MB is not None else self.integer('CACHE_MB', 1024)
+        self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
         # Server stuff
-        self.tcp_port = self.integer('TCP_PORT', None)
-        self.udp_port = self.integer('UDP_PORT', self.tcp_port)
-        self.ssl_port = self.integer('SSL_PORT', None)
+        self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
+        self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port)
+        self.ssl_port = ssl_port if ssl_port is not None else self.integer('SSL_PORT', None)
         if self.ssl_port:
-            self.ssl_certfile = self.required('SSL_CERTFILE')
-            self.ssl_keyfile = self.required('SSL_KEYFILE')
-        self.rpc_port = self.integer('RPC_PORT', 8000)
-        self.prometheus_port = self.integer('PROMETHEUS_PORT', 0)
-        self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
-        self.banner_file = self.default('BANNER_FILE', None)
-        self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)
-        self.anon_logs = self.boolean('ANON_LOGS', False)
-        self.log_sessions = self.integer('LOG_SESSIONS', 3600)
-        self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False)
-        self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False)
-        self.cache_all_claim_txos = self.boolean('CACHE_ALL_CLAIM_TXOS', False)
-        self.country = self.default('COUNTRY', 'US')
+            self.ssl_certfile = ssl_certfile if ssl_certfile is not None else self.required('SSL_CERTFILE')
+            self.ssl_keyfile = ssl_keyfile if ssl_keyfile is not None else self.required('SSL_KEYFILE')
+        self.rpc_port = rpc_port if rpc_port is not None else self.integer('RPC_PORT', 8000)
+        self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
+        self.max_subscriptions = max_subscriptions if max_subscriptions is not None else self.integer('MAX_SUBSCRIPTIONS', 10000)
+        self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None)
+        # self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)
+        self.anon_logs = anon_logs if anon_logs is not None else self.boolean('ANON_LOGS', False)
+        self.log_sessions = log_sessions if log_sessions is not None else self.integer('LOG_SESSIONS', 3600)
+        self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False)
+        self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
+        self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
+        self.country = country if country is not None else self.default('COUNTRY', 'US')
         # Peer discovery
         self.peer_discovery = self.peer_discovery_enum()
         self.peer_announce = self.boolean('PEER_ANNOUNCE', True)
         self.peer_hubs = self.extract_peer_hubs()
-        self.force_proxy = self.boolean('FORCE_PROXY', False)
-        self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost')
-        self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)
+        # self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost')
+        # self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)
         # The electrum client takes the empty string as unspecified
-        self.payment_address = self.default('PAYMENT_ADDRESS', '')
-        self.donation_address = self.default('DONATION_ADDRESS', '')
+        self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '')
+        self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS', '')
         # Server limits to help prevent DoS
-        self.max_send = self.integer('MAX_SEND', 1000000)
-        self.max_receive = self.integer('MAX_RECEIVE', 1000000)
-        self.max_subs = self.integer('MAX_SUBS', 250000)
-        self.max_sessions = self.sane_max_sessions()
-        self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
-        self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
-        self.drop_client = self.custom("DROP_CLIENT", None, re.compile)
-        self.description = self.default('DESCRIPTION', '')
-        self.daily_fee = self.string_amount('DAILY_FEE', '0')
+        self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000)
+        self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000)
+        # self.max_subs = self.integer('MAX_SUBS', 250000)
+        self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions()
+        # self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
+        self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600)
+        self.drop_client = drop_client if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile)
+        self.description = description if description is not None else self.default('DESCRIPTION', '')
+        self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0')
 
         # Identities
         clearnet_identity = self.clearnet_identity()
@@ -108,7 +113,8 @@ def __init__(self, coin=None):
         self.identities = [identity
                            for identity in (clearnet_identity, tor_identity)
                            if identity is not None]
-        self.database_query_timeout = float(self.integer('QUERY_TIMEOUT_MS', 3000)) / 1000.0
+        self.database_query_timeout = database_query_timeout if database_query_timeout is not None else \
+            (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
 
     @classmethod
     def default(cls, envvar, default):
@@ -160,9 +166,9 @@ def obsolete(cls, envvars):
         if bad:
             raise cls.Error(f'remove obsolete environment variables {bad}')
 
-    def set_event_loop_policy(self):
-        policy_name = self.default('EVENT_LOOP_POLICY', None)
-        if not policy_name:
+    @classmethod
+    def set_event_loop_policy(cls, policy_name: str = None):
+        if not policy_name or policy_name == 'default':
             import asyncio
             return asyncio.get_event_loop_policy()
         elif policy_name == 'uvloop':
@@ -171,7 +177,7 @@ def set_event_loop_policy(self):
             loop_policy = uvloop.EventLoopPolicy()
             asyncio.set_event_loop_policy(loop_policy)
             return loop_policy
-        raise self.Error(f'unknown event loop policy "{policy_name}"')
+        raise cls.Error(f'unknown event loop policy "{policy_name}"')
 
     def cs_host(self, *, for_rpc):
         """Returns the 'host' argument to pass to asyncio's create_server
@@ -280,3 +286,99 @@ def peer_discovery_enum(self):
 
     def extract_peer_hubs(self):
         return [hub.strip() for hub in self.default('PEER_HUBS', '').split(',') if hub.strip()]
+
+    @classmethod
+    def contribute_to_arg_parser(cls, parser):
+        parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb',
+                            default=cls.default('DB_DIRECTORY', None))
+        parser.add_argument('--daemon_url',
+                            help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip>',
+                            default=cls.default('DAEMON_URL', None))
+        parser.add_argument('--db_max_open_files', type=int, default=512,
+                            help='number of files leveldb can have open at a time')
+        parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
+                            help='Interface for hub server to listen on')
+        parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001),
+                            help='TCP port to listen on for hub server')
+        parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001),
+                            help='UDP port to listen on for hub server')
+        parser.add_argument('--rpc_host', default=cls.default('RPC_HOST', 'localhost'), type=str,
+                            help='Listening interface for admin rpc')
+        parser.add_argument('--rpc_port', default=cls.integer('RPC_PORT', 8000), type=int,
+                            help='Listening port for admin rpc')
+        parser.add_argument('--websocket_host', default=cls.default('WEBSOCKET_HOST', 'localhost'), type=str,
+                            help='Listening interface for websocket')
+        parser.add_argument('--websocket_port', default=cls.integer('WEBSOCKET_PORT', None), type=int,
+                            help='Listening port for websocket')
+
+        parser.add_argument('--ssl_port', default=cls.integer('SSL_PORT', None), type=int,
+                            help='SSL port to listen on for hub server')
+        parser.add_argument('--ssl_certfile', default=cls.default('SSL_CERTFILE', None), type=str,
+                            help='Path to SSL cert file')
+        parser.add_argument('--ssl_keyfile', default=cls.default('SSL_KEYFILE', None), type=str,
+                            help='Path to SSL key file')
+        parser.add_argument('--reorg_limit', default=cls.integer('REORG_LIMIT', 200), type=int, help='Max reorg depth')
+        parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
+                            help='elasticsearch host')
+        parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
+                            help='elasticsearch port')
+        parser.add_argument('--es_mode', default=cls.default('ES_MODE', 'writer'), type=str,
+                            choices=['reader', 'writer'])
+        parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
+        parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str,
+                            choices=['default', 'uvloop'])
+        parser.add_argument('--max_query_workers', type=int, default=cls.integer('MAX_QUERY_WORKERS', 4),
+                            help='number of threads used by the request handler to read the database')
+        parser.add_argument('--cache_MB', type=int, default=cls.integer('CACHE_MB', 1024),
+                            help='size of the leveldb lru cache, in megabytes')
+        parser.add_argument('--cache_all_tx_hashes', type=bool,
+                            help='Load all tx hashes into memory. This will make address subscriptions and sync, '
+                                 'resolve, transaction fetching, and block sync all faster at the expense of higher '
+                                 'memory usage')
+        parser.add_argument('--cache_all_claim_txos', type=bool,
+                            help='Load all claim txos into memory. This will make address subscriptions and sync, '
+                                 'resolve, transaction fetching, and block sync all faster at the expense of higher '
+                                 'memory usage')
+        parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
+                            help='port for hub prometheus metrics to listen on, disabled by default')
+        parser.add_argument('--max_subscriptions', type=int, default=cls.integer('MAX_SUBSCRIPTIONS', 10000),
+                            help='max subscriptions per connection')
+        parser.add_argument('--banner_file', type=str, default=cls.default('BANNER_FILE', None),
+                            help='path to file containing banner text')
+        parser.add_argument('--anon_logs', type=bool, default=cls.boolean('ANON_LOGS', False),
+                            help="don't log ip addresses")
+        parser.add_argument('--allow_lan_udp', type=bool, default=cls.boolean('ALLOW_LAN_UDP', False),
+                            help='reply to hub UDP ping messages from LAN ip addresses')
+        parser.add_argument('--country', type=str, default=cls.default('COUNTRY', 'US'), help='')
+        parser.add_argument('--max_send', type=int, default=cls.default('MAX_SEND', 1000000), help='')
+        parser.add_argument('--max_receive', type=int, default=cls.default('MAX_RECEIVE', 1000000), help='')
+        parser.add_argument('--max_sessions', type=int, default=cls.default('MAX_SESSIONS', 1000), help='')
+        parser.add_argument('--session_timeout', type=int, default=cls.default('SESSION_TIMEOUT', 600), help='')
+        parser.add_argument('--drop_client', type=str, default=cls.default('DROP_CLIENT', None), help='')
+        parser.add_argument('--description', type=str, default=cls.default('DESCRIPTION', ''), help='')
+        parser.add_argument('--daily_fee', type=float, default=cls.default('DAILY_FEE', 0.0), help='')
+        parser.add_argument('--payment_address', type=str, default=cls.default('PAYMENT_ADDRESS', ''), help='')
+        parser.add_argument('--donation_address', type=str, default=cls.default('DONATION_ADDRESS', ''), help='')
+        parser.add_argument('--chain', type=str, default=cls.default('NET', 'mainnet'),
+                            help="Which chain to use, default is mainnet")
+        parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000),
+                            help="elasticsearch query timeout")
+
+    @classmethod
+    def from_arg_parser(cls, args):
+        return cls(
+            db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
+            host=args.host, rpc_host=args.rpc_host, elastic_host=args.elastic_host, elastic_port=args.elastic_port,
+            loop_policy=args.loop_policy, max_query_workers=args.max_query_workers, websocket_host=args.websocket_host,
+            websocket_port=args.websocket_port, chain=args.chain, es_index_prefix=args.es_index_prefix,
+            es_mode=args.es_mode, cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
+            udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile,
+            ssl_keyfile=args.ssl_keyfile, rpc_port=args.rpc_port, prometheus_port=args.prometheus_port,
+            max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs,
+            log_sessions=None, allow_lan_udp=args.allow_lan_udp,
+            cache_all_tx_hashes=args.cache_all_tx_hashes, cache_all_claim_txos=args.cache_all_claim_txos,
+            country=args.country, payment_address=args.payment_address, donation_address=args.donation_address,
+            max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions,
+            session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description,
+            daily_fee=args.daily_fee, database_query_timeout=(args.query_timeout_ms / 1000)
+        )
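
Throughout the reworked Env above, every setting follows the same shape: an explicit keyword (supplied by from_arg_parser from the parsed CLI namespace) wins, otherwise the environment variable is consulted, otherwise the built-in default applies. A minimal sketch of that precedence, assuming a hypothetical resolve_setting helper that is not part of the codebase:

# Illustrative only: shows the precedence behind
# "x if x is not None else self.integer('X', default)".
import os


def resolve_setting(explicit_value, env_var: str, default: int) -> int:
    if explicit_value is not None:
        return explicit_value          # value passed in (e.g. from the CLI) wins
    raw = os.environ.get(env_var)
    return int(raw) if raw is not None else default


if __name__ == '__main__':
    os.environ['MAX_QUERY_WORKERS'] = '8'
    assert resolve_setting(None, 'MAX_QUERY_WORKERS', 4) == 8   # env var beats the default
    assert resolve_setting(2, 'MAX_QUERY_WORKERS', 4) == 2      # explicit value beats the env var
    assert resolve_setting(None, 'MISSING_VAR', 4) == 4         # default as the last resort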
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index d33b1016bf..dddf3f1fbb 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -33,7 +33,7 @@
 from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
 from lbry.wallet.server.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, HubDB
 from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
-from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
+from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow
 from lbry.wallet.transaction import OutputScript
 from lbry.schema.claim import Claim, guess_stream_type
 from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
@@ -87,6 +87,8 @@ def __init__(self, env):
         self.hist_comp_flush_count = -1
         self.hist_comp_cursor = -1
 
+        self.es_sync_height = 0
+
         # blocking/filtering dicts
         blocking_channels = self.env.default('BLOCKING_CHANNEL_IDS', '').split(' ')
         filtering_channels = self.env.default('FILTERING_CHANNEL_IDS', '').split(' ')
@@ -106,8 +108,6 @@ def __init__(self, env):
         self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace='wallet_server')
         self.last_flush = time.time()
 
-        self.logger.info(f'using {self.env.db_engine} for DB backend')
-
         # Header merkle cache
         self.merkle = Merkle()
         self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
@@ -125,9 +125,7 @@ def __init__(self, env):
         # Search index
         self.search_index = SearchIndex(
             self.env.es_index_prefix, self.env.database_query_timeout,
-            elastic_host=env.elastic_host, elastic_port=env.elastic_port,
-            half_life=self.env.trending_half_life, whale_threshold=self.env.trending_whale_threshold,
-            whale_half_life=self.env.trending_whale_half_life
+            elastic_host=env.elastic_host, elastic_port=env.elastic_port
         )
 
         self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
@@ -502,18 +500,8 @@ def get_claim_metadata(self, tx_hash, nout):
             script.parse()
             return Claim.from_bytes(script.values['claim'])
         except:
-            self.logger.error(
-                "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
-                (raw or b'').hex()
-            )
-            return
-
-    def _prepare_claim_for_sync(self, claim_hash: bytes):
-        claim = self._fs_get_claim_by_hash(claim_hash)
-        if not claim:
-            print("wat")
+            self.logger.error("claim parsing for ES failed with tx: %s", tx_hash[::-1].hex())
             return
-        return self._prepare_claim_metadata(claim_hash, claim)
 
     def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
         metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
@@ -556,19 +544,10 @@ def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
                 ).outputs[reposted_claim.position]
                 reposted_script = OutputScript(reposted_claim_txo.pk_script)
                 reposted_script.parse()
-            except:
-                self.logger.error(
-                    "repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
-                    raw_reposted_claim_tx.hex()
-                )
-                return
-            try:
                 reposted_metadata = Claim.from_bytes(reposted_script.values['claim'])
             except:
-                self.logger.error(
-                    "reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
-                    raw_reposted_claim_tx.hex()
-                )
+                self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
+                                  reposted_tx_hash[::-1].hex(), claim_hash.hex())
                 return
         if reposted_metadata:
             if reposted_metadata.is_stream:
@@ -850,7 +829,8 @@ async def open_dbs(self):
         self.prefix_db = HubDB(
             os.path.join(self.env.db_dir, 'lbry-leveldb'), cache_mb=self.env.cache_MB,
-            reorg_limit=self.env.reorg_limit, max_open_files=512
+            reorg_limit=self.env.reorg_limit, max_open_files=self.env.db_max_open_files,
+            unsafe_prefixes={DBStatePrefixRow.prefix}
         )
         self.logger.info(f'opened db: lbry-leveldb')
@@ -1082,7 +1062,8 @@ def write_db_state(self):
         self.prefix_db.db_state.stage_put((), (
             self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, self.utxo_flush_count,
             int(self.wall_time), self.first_sync, self.db_version,
-            self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor
+            self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
+            self.es_sync_height
             )
         )
@@ -1124,11 +1105,12 @@ def read_db_state(self):
 
     def assert_db_state(self):
         state = self.prefix_db.db_state.get()
-        assert self.db_version == state.db_version
-        assert self.db_height == state.height
-        assert self.db_tx_count == state.tx_count
-        assert self.db_tip == state.tip
-        assert self.first_sync == state.first_sync
+        assert self.db_version == state.db_version, f"{self.db_version} != {state.db_version}"
+        assert self.db_height == state.height, f"{self.db_height} != {state.height}"
+        assert self.db_tx_count == state.tx_count, f"{self.db_tx_count} != {state.tx_count}"
+        assert self.db_tip == state.tip, f"{self.db_tip} != {state.tip}"
+        assert self.first_sync == state.first_sync, f"{self.first_sync} != {state.first_sync}"
+        assert self.es_sync_height == state.es_sync_height, f"{self.es_sync_height} != {state.es_sync_height}"
 
     async def all_utxos(self, hashX):
         """Return all UTXOs for an address sorted in no particular order."""
diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py
index 4f7930c051..139a0bf0bb 100644
--- a/tests/integration/blockchain/test_wallet_server_sessions.py
+++ b/tests/integration/blockchain/test_wallet_server_sessions.py
@@ -5,7 +5,7 @@
 from lbry.error import ServerPaymentFeeAboveMaxAllowedError
 from lbry.wallet.network import ClientSession
 from lbry.wallet.rpc import RPCError
-from lbry.wallet.server.db.elasticsearch.sync import run_sync, make_es_index
+from lbry.wallet.server.db.elasticsearch.sync import make_es_index_and_run_sync
 from lbry.wallet.server.session import LBRYElectrumX
 from lbry.testcase import IntegrationTestCase, CommandTestCase
 from lbry.wallet.orchstr8.node import SPVNode
@@ -95,16 +95,17 @@ async def test_es_sync_utility(self):
         await self.generate(1)
         self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
         db = self.conductor.spv_node.server.db
+        env = self.conductor.spv_node.server.env
+
         await db.search_index.delete_index()
         db.search_index.clear_caches()
         self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
         await db.search_index.stop()
-        self.assertTrue(await make_es_index(db.search_index))
 
         async def resync():
             await db.search_index.start()
             db.search_index.clear_caches()
-            await run_sync(index_name=db.search_index.index, db=db)
+            await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True)
             self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
 
         self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
@@ -114,9 +115,12 @@ async def resync():
         # this time we will test a migration from unversioned to v1
         await db.search_index.sync_client.indices.delete_template(db.search_index.index)
         await db.search_index.stop()
-        self.assertTrue(await make_es_index(db.search_index))
+
+        await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True)
         await db.search_index.start()
+        await resync()
+
         self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
 
 
 class TestHubDiscovery(CommandTestCase):
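
A closing note on the revertable.py and leveldb.py changes above: the db state row is rewritten in place through unsafe_commit() (by the elastic-sync tool, _es_caught_up and the reorg rollback), so staged operations against that key can legitimately disagree with what is on disk. Registering DBStatePrefixRow.prefix in unsafe_prefixes lets the op stack tolerate that, and the log level for such skips drops from error to debug. A rough, simplified illustration of the tolerance rule (this is not the real RevertableOpStack; the one-byte prefix and helper names are hypothetical):

import logging

log = logging.getLogger('revertable-sketch')


class IntegrityError(Exception):
    pass


def check_delete(prefix: bytes, stored_value: bytes, expected_value: bytes, unsafe_prefixes: set) -> None:
    # A delete is only consistent if the value it expects matches what is stored.
    if stored_value != expected_value:
        err = IntegrityError(f"delete of {prefix!r} expected {expected_value!r}, found {stored_value!r}")
        if prefix in unsafe_prefixes:
            log.debug("skipping over integrity error: %s", err)  # tolerated, as in append_op above
            return
        raise err


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    DB_STATE_PREFIX = b's'  # hypothetical one-byte prefix standing in for DBStatePrefixRow.prefix
    check_delete(DB_STATE_PREFIX, b'new-state', b'old-state', {DB_STATE_PREFIX})  # tolerated
    try:
        check_delete(b'c', b'new-claim', b'old-claim', set())
    except IntegrityError:
        print('non-unsafe prefixes still raise')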