Skip to content

Commit

Permalink
skip loading tx/claim caches in the elastic sync script when not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
jackrobison committed Oct 22, 2021
1 parent 48505c2 commit 5014ef9
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 13 deletions.
4 changes: 3 additions & 1 deletion lbry/wallet/server/block_processor.py
Expand Up @@ -1754,11 +1754,13 @@ async def fetch_and_process_blocks(self, caught_up_event):

self._caught_up_event = caught_up_event
try:
await self.db.open_dbs()
self.db.open_db()
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count
self.status_server.set_height(self.db.fs_height, self.db.db_tip)
await self.db.initialize_caches()
await self.db.search_index.start()
await asyncio.wait([
self.prefetcher.main_loop(self.height),
self._process_prefetched_blocks()
Expand Down
12 changes: 7 additions & 5 deletions lbry/wallet/server/db/elasticsearch/sync.py
Expand Up @@ -15,14 +15,15 @@ async def get_recent_claims(env, index_name='claims', db=None):
db = db or LevelDB(env)
try:
if need_open:
await db.open_dbs()
db_state = db.prefix_db.db_state.get()
if db_state.es_sync_height == db_state.height:
db.open_db()
if db.es_sync_height == db.db_height or db.db_height <= 0:
return
if need_open:
await db.initialize_caches()
cnt = 0
touched_claims = set()
deleted_claims = set()
for height in range(db_state.es_sync_height, db_state.height + 1):
for height in range(db.es_sync_height, db.db_height + 1):
touched_or_deleted = db.prefix_db.touched_or_deleted.get(height)
touched_claims.update(touched_or_deleted.touched_claims)
deleted_claims.update(touched_or_deleted.deleted_claims)
Expand Down Expand Up @@ -65,7 +66,8 @@ async def get_all_claims(env, index_name='claims', db=None):
need_open = db is None
db = db or LevelDB(env)
if need_open:
await db.open_dbs()
db.open_db()
await db.initialize_caches()
logging.info("Fetching claims to send ES from leveldb")
try:
cnt = 0
Expand Down
10 changes: 4 additions & 6 deletions lbry/wallet/server/leveldb.py
Expand Up @@ -823,7 +823,7 @@ def estimate_timestamp(self, height: int) -> int:
return struct.unpack('<I', self.headers[height][100:104])[0]
return int(160.6855883050695 * height)

async def open_dbs(self):
def open_db(self):
if self.prefix_db and not self.prefix_db.closed:
return

Expand Down Expand Up @@ -856,20 +856,16 @@ async def open_dbs(self):
self.logger.error(msg)
raise RuntimeError(msg)
self.logger.info(f'flush count: {self.hist_flush_count:,d}')

self.utxo_flush_count = self.hist_flush_count

# Read TX counts (requires meta directory)
async def initialize_caches(self):
await self._read_tx_counts()
await self._read_headers()
if self.env.cache_all_claim_txos:
await self._read_claim_txos()
if self.env.cache_all_tx_hashes:
await self._read_tx_hashes()

# start search index
await self.search_index.start()

def close(self):
self.prefix_db.close()

Expand Down Expand Up @@ -1082,6 +1078,7 @@ def read_db_state(self):
self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1
self.hist_db_version = max(self.DB_VERSIONS)
self.es_sync_height = 0
else:
self.db_version = state.db_version
if self.db_version not in self.DB_VERSIONS:
Expand All @@ -1102,6 +1099,7 @@ def read_db_state(self):
self.hist_comp_flush_count = state.comp_flush_count
self.hist_comp_cursor = state.comp_cursor
self.hist_db_version = state.db_version
self.es_sync_height = state.es_sync_height

def assert_db_state(self):
state = self.prefix_db.db_state.get()
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/other/test_chris45.py
Expand Up @@ -199,5 +199,6 @@ async def test_no_this_is_not_a_test_its_an_adventure(self):
# He closes and opens the wallet server databases to see how horribly they break
db = self.conductor.spv_node.server.db
db.close()
await db.open_dbs()
db.open_db()
await db.initialize_caches()
# They didn't! (error would be AssertionError: 276 vs 266 (264 counts) on startup)

0 comments on commit 5014ef9

Please sign in to comment.