diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 3005085b58..141d5c6b1e 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -1351,6 +1351,8 @@ def _get_cumulative_update_ops(self, height: int): set(self.removed_active_support_amount_by_claim.keys()) ).union( set(self.activated_support_amount_by_claim.keys()) + ).union( + set(self.pending_support_amount_change.keys()) ).difference( self.removed_claim_hashes ) @@ -1407,9 +1409,13 @@ def _get_cumulative_update_ops(self, height: int): self.db.prefix_db.effective_amount.stage_put( (name, new_effective_amount, tx_num, position), (touched,) ) - self._add_claim_activation_change_notification( - touched.hex(), height, prev_effective_amount, new_effective_amount - ) + if touched in self.claim_hash_to_txo or touched in self.removed_claim_hashes \ + or touched in self.pending_support_amount_change: + # exclude sending notifications for claims/supports that activated but + # weren't added/spent in this block + self._add_claim_activation_change_notification( + touched.hex(), height, prev_effective_amount, new_effective_amount + ) for channel_hash, count in self.pending_channel_counts.items(): if count != 0: diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index c264016fb8..4dbfe707ef 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -1470,7 +1470,7 @@ class ChannelCountPrefixRow(PrefixRow): ] @classmethod - def pack_key(cls, channel_hash: int): + def pack_key(cls, channel_hash: bytes): return super().pack_key(channel_hash) @classmethod @@ -1558,10 +1558,10 @@ def unpack_value(cls, data: bytes) -> DBState: @classmethod def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, - comp_cursor: int): + comp_cursor: int, es_sync_height: int): return cls.pack_key(), cls.pack_value( genesis, height, tx_count, tip, utxo_flush_count, wall_time, first_sync, db_version, hist_flush_count, - comp_flush_count, comp_cursor + comp_flush_count, comp_cursor, es_sync_height ) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 426778d07a..4203ac5059 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -213,7 +213,7 @@ def _prepare_resolve_result(self, tx_num: int, position: int, claim_hash: bytes, support_amount = self.get_support_amount(claim_hash) claim_amount = self.get_cached_claim_txo(claim_hash).amount - effective_amount = support_amount + claim_amount + effective_amount = self.get_effective_amount(claim_hash) channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position) reposted_claim_hash = self.get_repost(claim_hash) short_url = self.get_short_claim_id_url(name, normalized_name, claim_hash, root_tx_num, root_position) @@ -679,7 +679,7 @@ async def all_claims_producer(self, batch_size=500_000): if claim: batch.append(claim) if len(batch) == batch_size: - batch.sort(key=lambda x: x.tx_hash) + batch.sort(key=lambda x: x.tx_hash) # sort is to improve read-ahead hits for claim in batch: meta = self._prepare_claim_metadata(claim.claim_hash, claim) if meta: diff --git a/tests/integration/takeovers/test_resolve_command.py b/tests/integration/takeovers/test_resolve_command.py index 57112cf929..42e7e3fc02 100644 --- a/tests/integration/takeovers/test_resolve_command.py +++ b/tests/integration/takeovers/test_resolve_command.py @@ -20,6 +20,25 @@ class ClaimStateValue(NamedTuple): class BaseResolveTestCase(CommandTestCase): + def assertMatchESClaim(self, claim_from_es, claim_from_db): + self.assertEqual(claim_from_es['claim_hash'][::-1].hex(), claim_from_db.claim_hash.hex()) + self.assertEqual(claim_from_es['claim_id'], claim_from_db.claim_hash.hex()) + self.assertEqual(claim_from_es['activation_height'], claim_from_db.activation_height) + self.assertEqual(claim_from_es['last_take_over_height'], claim_from_db.last_takeover_height) + self.assertEqual(claim_from_es['tx_id'], claim_from_db.tx_hash[::-1].hex()) + self.assertEqual(claim_from_es['tx_nout'], claim_from_db.position) + self.assertEqual(claim_from_es['amount'], claim_from_db.amount) + self.assertEqual(claim_from_es['effective_amount'], claim_from_db.effective_amount) + + def assertMatchDBClaim(self, expected, claim): + self.assertEqual(expected['claimId'], claim.claim_hash.hex()) + self.assertEqual(expected['validAtHeight'], claim.activation_height) + self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height) + self.assertEqual(expected['txId'], claim.tx_hash[::-1].hex()) + self.assertEqual(expected['n'], claim.position) + self.assertEqual(expected['amount'], claim.amount) + self.assertEqual(expected['effectiveAmount'], claim.effective_amount) + async def assertResolvesToClaimId(self, name, claim_id): other = await self.resolve(name) if claim_id is None: @@ -57,19 +76,17 @@ async def assertMatchWinningClaim(self, name): expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name) claim = stream if stream else channel + await self._assertMatchClaim(expected, claim) + return claim + + async def _assertMatchClaim(self, expected, claim): + self.assertMatchDBClaim(expected, claim) claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( claim_id=claim.claim_hash.hex() ) self.assertEqual(len(claim_from_es[0]), 1) - self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim.claim_hash.hex()) - self.assertEqual(expected['claimId'], claim.claim_hash.hex()) - self.assertEqual(expected['validAtHeight'], claim.activation_height) - self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height) - self.assertEqual(expected['txId'], claim.tx_hash[::-1].hex()) - self.assertEqual(expected['n'], claim.position) - self.assertEqual(expected['amount'], claim.amount) - self.assertEqual(expected['effectiveAmount'], claim.effective_amount) - return claim + self.assertMatchESClaim(claim_from_es[0][0], claim) + self._check_supports(claim.claim_hash.hex(), expected['supports'], claim_from_es[0][0]['support_amount']) async def assertMatchClaim(self, claim_id, is_active_in_lbrycrd=True): expected = json.loads(await self.blockchain._cli_cmnd('getclaimbyid', claim_id)) @@ -78,58 +95,61 @@ async def assertMatchClaim(self, claim_id, is_active_in_lbrycrd=True): if not expected: self.assertIsNone(claim) return - self.assertEqual(expected['claimId'], claim.claim_hash.hex()) - self.assertEqual(expected['validAtHeight'], claim.activation_height) - self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height) - self.assertEqual(expected['txId'], claim.tx_hash[::-1].hex()) - self.assertEqual(expected['n'], claim.position) - self.assertEqual(expected['amount'], claim.amount) - self.assertEqual(expected['effectiveAmount'], claim.effective_amount) + self.assertMatchDBClaim(expected, claim) else: self.assertDictEqual({}, expected) - claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( claim_id=claim.claim_hash.hex() ) self.assertEqual(len(claim_from_es[0]), 1) self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim.claim_hash.hex()) - self.assertEqual(claim_from_es[0][0]['claim_id'], claim.claim_hash.hex()) - self.assertEqual(claim_from_es[0][0]['activation_height'], claim.activation_height) - self.assertEqual(claim_from_es[0][0]['last_take_over_height'], claim.last_takeover_height) - self.assertEqual(claim_from_es[0][0]['tx_id'], claim.tx_hash[::-1].hex()) - self.assertEqual(claim_from_es[0][0]['tx_nout'], claim.position) - self.assertEqual(claim_from_es[0][0]['amount'], claim.amount) - self.assertEqual(claim_from_es[0][0]['effective_amount'], claim.effective_amount) + self.assertMatchESClaim(claim_from_es[0][0], claim) + self._check_supports( + claim.claim_hash.hex(), expected.get('supports', []), claim_from_es[0][0]['support_amount'], + is_active_in_lbrycrd + ) return claim async def assertMatchClaimIsWinning(self, name, claim_id): self.assertEqual(claim_id, (await self.assertMatchWinningClaim(name)).claim_hash.hex()) - await self.assertMatchClaim(claim_id) await self.assertMatchClaimsForName(name) - async def assertMatchClaimsForName(self, name): - expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name)) - + def _check_supports(self, claim_id, lbrycrd_supports, es_support_amount, is_active_in_lbrycrd=True): + total_amount = 0 db = self.conductor.spv_node.server.bp.db - def check_supports(claim_id, lbrycrd_supports): - for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))): + for i, (tx_num, position, amount) in enumerate(db.get_supports(bytes.fromhex(claim_id))): + total_amount += amount + if is_active_in_lbrycrd: support = lbrycrd_supports[i] self.assertEqual(support['txId'], db.prefix_db.tx_hash.get(tx_num, deserialize_value=False)[::-1].hex()) self.assertEqual(support['n'], position) self.assertEqual(support['height'], bisect_right(db.tx_counts, tx_num)) self.assertEqual(support['validAtHeight'], db.get_activation(tx_num, position, is_support=True)) + self.assertEqual(total_amount, es_support_amount, f"lbrycrd support amount: {total_amount} vs es: {es_support_amount}") + + async def assertMatchClaimsForName(self, name): + expected = json.loads(await self.blockchain._cli_cmnd('getclaimsforname', name)) + db = self.conductor.spv_node.server.bp.db # self.assertEqual(len(expected['claims']), len(db_claims.claims)) # self.assertEqual(expected['lastTakeoverHeight'], db_claims.lastTakeoverHeight) + last_takeover = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))['lastTakeoverHeight'] for c in expected['claims']: - check_supports(c['claimId'], c['supports']) - claim_hash = bytes.fromhex(c['claimId']) - self.assertEqual(c['validAtHeight'], db.get_activation( - db.prefix_db.tx_num.get(bytes.fromhex(c['txId'])[::-1]).tx_num, c['n'] - )) - self.assertEqual(c['effectiveAmount'], db.get_effective_amount(claim_hash)) + c['lastTakeoverHeight'] = last_takeover + claim_id = c['claimId'] + claim_hash = bytes.fromhex(claim_id) + claim = db._fs_get_claim_by_hash(claim_hash) + self.assertMatchDBClaim(c, claim) + + claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( + claim_id=c['claimId'] + ) + self.assertEqual(len(claim_from_es[0]), 1) + self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), c['claimId']) + self.assertMatchESClaim(claim_from_es[0][0], claim) + self._check_supports(c['claimId'], c['supports'], claim_from_es[0][0]['support_amount']) class ResolveCommand(BaseResolveTestCase): @@ -1176,10 +1196,12 @@ async def test_block_takeover_with_delay_1_support(self): self.assertNotEqual(first_claim_id, second_claim_id) # takeover should not have happened yet await self.assertMatchClaimIsWinning(name, first_claim_id) - await self.generate(8) - await self.assertMatchClaimIsWinning(name, first_claim_id) + for _ in range(8): + await self.generate(1) + await self.assertMatchClaimIsWinning(name, first_claim_id) # prevent the takeover by adding a support one block before the takeover happens await self.support_create(first_claim_id, bid='1.0') + await self.assertMatchClaimIsWinning(name, first_claim_id) # one more block until activation await self.generate(1) await self.assertMatchClaimIsWinning(name, first_claim_id) @@ -1322,27 +1344,35 @@ async def test_remove_controlling_support(self): first_support_tx = await self.daemon.jsonrpc_support_create(first_claim_id, '0.9') await self.ledger.wait(first_support_tx) await self.assertMatchClaimIsWinning(name, first_claim_id) - await self.generate(320) # give the first claim long enough for a 10 block takeover delay - await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.assertNameState(527, name, first_claim_id, last_takeover_height=207, non_winning_claims=[]) # make a second claim which will take over the name second_claim_id = (await self.stream_create(name, '0.1', allow_duplicate_name=True))['outputs'][0]['claim_id'] - self.assertNotEqual(first_claim_id, second_claim_id) + await self.assertNameState(528, name, first_claim_id, last_takeover_height=207, non_winning_claims=[ + ClaimStateValue(second_claim_id, activation_height=538, active_in_lbrycrd=False) + ]) + second_claim_support_tx = await self.daemon.jsonrpc_support_create(second_claim_id, '1.5') await self.ledger.wait(second_claim_support_tx) await self.generate(1) # neither the second claim or its support have activated yet - await self.assertMatchClaimIsWinning(name, first_claim_id) - + await self.assertNameState(529, name, first_claim_id, last_takeover_height=207, non_winning_claims=[ + ClaimStateValue(second_claim_id, activation_height=538, active_in_lbrycrd=False) + ]) await self.generate(9) # claim activates, but is not yet winning - await self.assertMatchClaimIsWinning(name, first_claim_id) - + await self.assertNameState(538, name, first_claim_id, last_takeover_height=207, non_winning_claims=[ + ClaimStateValue(second_claim_id, activation_height=538, active_in_lbrycrd=True) + ]) await self.generate(1) # support activates, takeover happens - await self.assertMatchClaimIsWinning(name, second_claim_id) + await self.assertNameState(539, name, second_claim_id, last_takeover_height=539, non_winning_claims=[ + ClaimStateValue(first_claim_id, activation_height=207, active_in_lbrycrd=True) + ]) await self.daemon.jsonrpc_txo_spend(type='support', claim_id=second_claim_id, blocking=True) await self.generate(1) # support activates, takeover happens - await self.assertMatchClaimIsWinning(name, first_claim_id) + await self.assertNameState(540, name, first_claim_id, last_takeover_height=540, non_winning_claims=[ + ClaimStateValue(second_claim_id, activation_height=538, active_in_lbrycrd=True) + ]) async def test_claim_expiration(self): name = 'derp'