Merge 2488316 into 6bef09a
jackrobison committed Oct 21, 2021
2 parents (6bef09a + 2488316), commit 40bef61
Showing 12 changed files with 271 additions and 170 deletions.
1 change: 1 addition & 0 deletions docker/Dockerfile.wallet_server
@@ -20,6 +20,7 @@ RUN apt-get update && \
python3-dev \
python3-pip \
python3-wheel \
python3-cffi \
python3-setuptools && \
update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
rm -rf /var/lib/apt/lists/*
14 changes: 9 additions & 5 deletions docker/docker-compose-wallet-server.yml
@@ -18,14 +18,18 @@ services:
- "wallet_server:/database"
environment:
- DAEMON_URL=http://lbry:lbry@127.0.0.1:9245
- MAX_QUERY_WORKERS=4
- CACHE_MB=1024
- CACHE_ALL_TX_HASHES=
- CACHE_ALL_CLAIM_TXOS=
- MAX_SEND=1000000000000000000
- MAX_RECEIVE=1000000000000000000
- MAX_SESSIONS=100000
- HOST=0.0.0.0
- TCP_PORT=50001
- PROMETHEUS_PORT=2112
- QUERY_TIMEOUT_MS=3000 # how long search queries are allowed to run before being cancelled, in milliseconds
- TRENDING_ALGORITHMS=variable_decay
- MAX_SEND=10000000000000 # deprecated; leave it high until it's removed
- MAX_SUBS=1000000000000 # deprecated; leave it high until it's removed
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 e4e230b131082f6b10c8f7994bbb83f29e8e6fb9
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
container_name: es01
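
The new environment variables are consumed by the hub's Env configuration at startup. A minimal sketch of how such settings could be parsed, assuming an empty assignment (e.g. CACHE_ALL_TX_HASHES=) leaves a flag disabled; this mirrors the compose file above and is not the hub's actual Env implementation:

import os

def env_int(name: str, default: int) -> int:
    # illustrative helper, not the hub's Env API
    value = os.environ.get(name, '').strip()
    return int(value) if value else default

def env_flag(name: str) -> bool:
    # an empty assignment such as CACHE_ALL_TX_HASHES= keeps the flag off
    return bool(os.environ.get(name, '').strip())

max_query_workers = env_int('MAX_QUERY_WORKERS', 4)
cache_mb = env_int('CACHE_MB', 1024)
cache_all_tx_hashes = env_flag('CACHE_ALL_TX_HASHES')
query_timeout_ms = env_int('QUERY_TIMEOUT_MS', 3000)
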
4 changes: 2 additions & 2 deletions docker/wallet_server_entrypoint.sh
@@ -6,7 +6,7 @@ set -euo pipefail

SNAPSHOT_URL="${SNAPSHOT_URL:-}" # off by default; latest snapshot at https://lbry.com/snapshot/wallet

if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/claims.db ]]; then
if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/lbry-leveldb ]]; then
files="$(ls)"
echo "Downloading wallet snapshot from $SNAPSHOT_URL"
wget --no-verbose --trust-server-names --content-disposition "$SNAPSHOT_URL"
@@ -20,6 +20,6 @@ if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/claims.db ]]; then
rm "$filename"
fi

/home/lbry/.local/bin/lbry-hub-elastic-sync /database/claims.db
/home/lbry/.local/bin/lbry-hub-elastic-sync
echo 'starting server'
/home/lbry/.local/bin/lbry-hub "$@"
22 changes: 21 additions & 1 deletion lbry/wallet/server/block_processor.py
@@ -367,6 +367,7 @@ async def check_and_advance_blocks(self, raw_blocks):
await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
self.db.filtered_streams, self.db.filtered_channels)
await self.db.search_index.update_trending_score(self.activation_info_to_send_es)
await self._es_caught_up()
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
@@ -1620,6 +1621,7 @@ async def backup_block(self):
else:
self.tx_count = self.db.tx_counts[-1]
self.height -= 1

# self.touched can include other addresses which is
# harmless, but remove None.
self.touched_hashXs.discard(None)
@@ -1649,8 +1651,15 @@ async def backup_block(self):
self.db.last_flush = now
self.db.last_flush_tx_count = self.db.fs_tx_count

await self.run_in_thread_with_lock(self.db.prefix_db.rollback, self.height + 1)
def rollback():
self.db.prefix_db.rollback(self.height + 1)
self.db.es_sync_height = self.height
self.db.write_db_state()
self.db.prefix_db.unsafe_commit()

await self.run_in_thread_with_lock(rollback)
self.clear_after_advance_or_reorg()
self.db.assert_db_state()

elapsed = self.db.last_flush - start_time
self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. '
@@ -1713,6 +1722,17 @@ async def _process_prefetched_blocks(self):
self.logger.exception("error while processing txs")
raise

async def _es_caught_up(self):
self.db.es_sync_height = self.height

def flush():
assert len(self.db.prefix_db._op_stack) == 0
self.db.write_db_state()
self.db.prefix_db.unsafe_commit()
self.db.assert_db_state()

await self.run_in_thread_with_lock(flush)

async def _first_caught_up(self):
self.logger.info(f'caught up to height {self.height}')
# Flush everything but with first_sync->False state.
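
Both the reorg path (the rollback closure in backup_block) and the steady-state path (_es_caught_up) now persist es_sync_height in the same unsafe_commit that updates the rest of the DB state, so a crash cannot leave the on-disk checkpoint out of step with what was actually flushed. A hedged sketch of the run_in_thread_with_lock pattern both rely on, assuming an asyncio lock plus the default thread-pool executor:

import asyncio

async def run_in_thread_with_lock(lock: asyncio.Lock, fn):
    # serialize blocking LevelDB work behind the state lock and keep it
    # off the event loop; a sketch of the pattern, not the class's method
    async with lock:
        return await asyncio.get_running_loop().run_in_executor(None, fn)
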
13 changes: 2 additions & 11 deletions lbry/wallet/server/cli.py
@@ -1,7 +1,6 @@
import logging
import traceback
import argparse
import importlib
from lbry.wallet.server.env import Env
from lbry.wallet.server.server import Server

@@ -10,27 +9,19 @@ def get_argument_parser():
parser = argparse.ArgumentParser(
prog="lbry-hub"
)
parser.add_argument("spvserver", type=str, help="Python class path to SPV server implementation.",
nargs="?", default="lbry.wallet.server.coin.LBC")
Env.contribute_to_arg_parser(parser)
return parser


def get_coin_class(spvserver):
spvserver_path, coin_class_name = spvserver.rsplit('.', 1)
spvserver_module = importlib.import_module(spvserver_path)
return getattr(spvserver_module, coin_class_name)


def main():
parser = get_argument_parser()
args = parser.parse_args()
coin_class = get_coin_class(args.spvserver)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
logging.info('lbry.server starting')
logging.getLogger('aiohttp').setLevel(logging.WARNING)
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
try:
server = Server(Env(coin_class))
server = Server(Env.from_arg_parser(args))
server.run()
except Exception:
traceback.print_exc()
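
With the positional spvserver class path (and its importlib lookup) removed, configuration now flows through Env.contribute_to_arg_parser and Env.from_arg_parser. A minimal sketch of that pattern with a couple of illustrative settings (the real Env defines many more, and the exact environment variable names here are assumptions):

import argparse
import os

class Env:
    def __init__(self, db_dir, max_query_workers):
        self.db_dir = db_dir
        self.max_query_workers = max_query_workers

    @classmethod
    def contribute_to_arg_parser(cls, parser: argparse.ArgumentParser):
        # defaults fall back to environment variables, so docker-compose
        # settings and CLI flags share one code path
        parser.add_argument('--db_dir', type=str, default=os.environ.get('DB_DIRECTORY'))
        parser.add_argument('--max_query_workers', type=int,
                            default=int(os.environ.get('MAX_QUERY_WORKERS', '4')))

    @classmethod
    def from_arg_parser(cls, args):
        return cls(db_dir=args.db_dir, max_query_workers=args.max_query_workers)
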
6 changes: 1 addition & 5 deletions lbry/wallet/server/db/elasticsearch/search.py
@@ -42,8 +42,7 @@ def __init__(self, got_version, expected_version):
class SearchIndex:
VERSION = 1

def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200,
half_life=0.4, whale_threshold=10000, whale_half_life=0.99):
def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
self.search_timeout = search_timeout
self.sync_timeout = 600 # won't hit that 99% of the time, but can hit on a fresh import
self.search_client: Optional[AsyncElasticsearch] = None
@@ -54,9 +53,6 @@ def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost',
self.search_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host
self._elastic_port = elastic_port
self._trending_half_life = half_life
self._trending_whale_threshold = whale_threshold
self._trending_whale_half_life = whale_half_life

async def get_index_version(self) -> int:
try:
70 changes: 33 additions & 37 deletions lbry/wallet/server/db/elasticsearch/sync.py
@@ -1,27 +1,28 @@
import os
import argparse
import asyncio
import logging
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk
from lbry.wallet.server.env import Env
from lbry.wallet.server.coin import LBC
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch
from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS


async def get_recent_claims(blocks: int, index_name='claims', db=None):
env = Env(LBC)
async def get_recent_claims(env, index_name='claims', db=None):
need_open = db is None
db = db or LevelDB(env)
if need_open:
await db.open_dbs()
try:
if need_open:
await db.open_dbs()
db_state = db.prefix_db.db_state.get()
if db_state.es_sync_height == db_state.height:
return
cnt = 0
state = db.prefix_db.db_state.get()
touched_claims = set()
deleted_claims = set()
for height in range(state.height - blocks + 1, state.height + 1):
for height in range(db_state.es_sync_height, db_state.height + 1):
touched_or_deleted = db.prefix_db.touched_or_deleted.get(height)
touched_claims.update(touched_or_deleted.touched_claims)
deleted_claims.update(touched_or_deleted.deleted_claims)
@@ -47,19 +48,25 @@ async def get_recent_claims(blocks: int, index_name='claims', db=None):
else:
logging.warning("could not sync claim %s", touched.hex())
if cnt % 10000 == 0:
print(f"{cnt} claims sent")
print("sent %i claims, deleted %i" % (len(touched_claims), len(deleted_claims)))
logging.info("%i claims sent to ES", cnt)

db.es_sync_height = db.db_height
db.write_db_state()
db.prefix_db.unsafe_commit()
db.assert_db_state()

logging.info("finished sending %i claims to ES, deleted %i", cnt, len(touched_claims), len(deleted_claims))
finally:
if need_open:
db.close()
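
get_recent_claims no longer takes a blocks count; it resumes from the es_sync_height checkpoint stored alongside the DB state. For example, with es_sync_height == 1000 and height == 1005, the loop above revisits heights 1000 through 1005 inclusive; re-sending claims from the checkpoint height is harmless because the same documents are simply re-indexed:

# the height range walked for that example state
assert list(range(1000, 1005 + 1)) == [1000, 1001, 1002, 1003, 1004, 1005]
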


async def get_all_claims(index_name='claims', db=None):
env = Env(LBC)
async def get_all_claims(env, index_name='claims', db=None):
need_open = db is None
db = db or LevelDB(env)
if need_open:
await db.open_dbs()
logging.info("Fetching claims to send ES from leveldb")
try:
cnt = 0
async for claim in db.all_claims_producer():
@@ -72,40 +79,32 @@ async def get_all_claims(index_name='claims', db=None):
}
cnt += 1
if cnt % 10000 == 0:
print(f"{cnt} claims sent")
logging.info("sent %i claims to ES", cnt)
finally:
if need_open:
db.close()


async def make_es_index(index=None):
env = Env(LBC)
if index is None:
index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)

async def make_es_index_and_run_sync(env: Env, clients=32, force=False, db=None, index_name='claims'):
index = SearchIndex(env.es_index_prefix, elastic_host=env.elastic_host, elastic_port=env.elastic_port)
logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
try:
return await index.start()
created = await index.start()
except IndexVersionMismatch as err:
logging.info(
"dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
)
await index.delete_index()
await index.stop()
return await index.start()
created = await index.start()
finally:
await index.stop()


async def run_sync(index_name='claims', db=None, clients=32, blocks=0):
env = Env(LBC)
logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
if blocks > 0:
blocks = min(blocks, 200)
logging.info("Resyncing last %i blocks", blocks)
claim_generator = get_recent_claims(blocks, index_name=index_name, db=db)
if force or created:
claim_generator = get_all_claims(env, index_name=index_name, db=db)
else:
claim_generator = get_all_claims(index_name=index_name, db=db)
claim_generator = get_recent_claims(env, index_name=index_name, db=db)
try:
async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False):
if not ok:
@@ -122,17 +121,14 @@ def run_elastic_sync():

logging.info('lbry.server starting')
parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync")
# parser.add_argument("db_path", type=str)
parser.add_argument("-c", "--clients", type=int, default=32)
parser.add_argument("-b", "--blocks", type=int, default=0)
parser.add_argument("-f", "--force", default=False, action='store_true')
Env.contribute_to_arg_parser(parser)
args = parser.parse_args()
env = Env.from_arg_parser(args)

# if not args.force and not os.path.exists(args.db_path):
# logging.info("DB path doesnt exist")
# return

if not args.force and not asyncio.run(make_es_index()):
logging.info("ES is already initialized")
if not os.path.exists(os.path.join(args.db_dir, 'lbry-leveldb')):
logging.info("DB path doesnt exist, nothing to sync to ES")
return
asyncio.run(run_sync(clients=args.clients, blocks=args.blocks))

asyncio.run(make_es_index_and_run_sync(env, clients=args.clients, force=args.force))
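
make_es_index_and_run_sync folds the old make_es_index and run_sync into one entry point: a full resend happens when the index was just created (including recreation after a version mismatch) or when --force is passed; otherwise only claims touched since es_sync_height are sent. The branch, distilled:

def choose_claim_generator(force: bool, created: bool) -> str:
    # mirrors the `if force or created` branch above
    return 'get_all_claims' if force or created else 'get_recent_claims'

assert choose_claim_generator(force=True, created=False) == 'get_all_claims'
assert choose_claim_generator(force=False, created=True) == 'get_all_claims'
assert choose_claim_generator(force=False, created=False) == 'get_recent_claims'
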
11 changes: 8 additions & 3 deletions lbry/wallet/server/db/prefixes.py
@@ -534,6 +534,7 @@ class DBState(typing.NamedTuple):
hist_flush_count: int
comp_flush_count: int
comp_cursor: int
es_sync_height: int


class ActiveAmountPrefixRow(PrefixRow):
@@ -1521,7 +1522,7 @@ def pack_item(cls, claim_hash, amount):

class DBStatePrefixRow(PrefixRow):
prefix = DB_PREFIXES.db_state.value
value_struct = struct.Struct(b'>32sLL32sLLBBlll')
value_struct = struct.Struct(b'>32sLL32sLLBBlllL')
key_struct = struct.Struct(b'')

key_part_lambdas = [
@@ -1539,15 +1540,19 @@ def unpack_key(cls, key: bytes):
@classmethod
def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
comp_cursor: int) -> bytes:
comp_cursor: int, es_sync_height: int) -> bytes:
return super().pack_value(
genesis, height, tx_count, tip, utxo_flush_count,
wall_time, 1 if first_sync else 0, db_version, hist_flush_count,
comp_flush_count, comp_cursor
comp_flush_count, comp_cursor, es_sync_height
)

@classmethod
def unpack_value(cls, data: bytes) -> DBState:
if len(data) == 94:
# TODO: delete this after making a new snapshot - 10/20/21
# migrate in the es_sync_height if it doesn't exist; data[32:36] is the packed `height` field
data += data[32:36]
return DBState(*super().unpack_value(data))

@classmethod
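
The struct change and the 94-byte branch in unpack_value fit together: the old DBState layout packs to 94 bytes, and the trailing 'L' for es_sync_height brings it to 98. The migration seeds the new field from the packed height, which sits at bytes 32:36, immediately after the 32-byte genesis hash. A quick check:

import struct

assert struct.calcsize(b'>32sLL32sLLBBlll') == 94   # pre-upgrade records
assert struct.calcsize(b'>32sLL32sLLBBlllL') == 98  # with es_sync_height
# appending data[32:36] to a 94-byte record copies the packed `height`
# field, so an old snapshot resumes with es_sync_height == height
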
4 changes: 2 additions & 2 deletions lbry/wallet/server/db/revertable.py
@@ -121,14 +121,14 @@ def append_op(self, op: RevertableOp):
elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored:
# there is a value and we're not deleting it in this op
# check that a delete for the stored value is in the stack
raise OpStackIntegrity(f"delete {op}")
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}")
elif op.is_delete and not has_stored_val:
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
elif op.is_delete and stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
except OpStackIntegrity as err:
if op.key[:1] in self._unsafe_prefixes:
log.error(f"skipping over integrity error: {err}")
log.debug(f"skipping over integrity error: {err}")
else:
raise err
self._items[op.key].append(op)
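
The reworded messages separate the two delete failure modes: deleting a key that is not stored at all versus deleting with a value that does not match what is stored. A toy sketch of the invariant, using a plain dict in place of the real op stack:

stored = {b'claim': b'v1'}

def check_delete(key: bytes, value: bytes) -> None:
    # mirrors the checks above, minus the pending-op bookkeeping
    if key not in stored:
        raise ValueError(f"db op tries to delete nonexistent key: {key!r}")
    if stored[key] != value:
        raise ValueError(f"db op tries to delete with incorrect value: {key!r}")

check_delete(b'claim', b'v1')  # ok: names the exact stored value
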
