From 0ea8ba72dd0384f9289f3cffd0f66a7e87eb8adf Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 6 May 2022 04:32:35 -0300 Subject: [PATCH 1/5] Env->ServerEnv from scribe changes --- lbry/wallet/orchstr8/node.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 9606c0a37a..a706ec9c9a 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -17,8 +17,9 @@ import urllib.request from uuid import uuid4 + try: - from scribe.env import Env + from scribe.hub.env import ServerEnv from scribe.hub.service import HubServerService from scribe.elasticsearch.service import ElasticSyncService from scribe.blockchain.service import BlockchainProcessorService @@ -260,7 +261,7 @@ async def start(self, lbcwallet_node: 'LBCWalletNode', extraconf=None): } if extraconf: conf.update(extraconf) - env = Env(**conf) + env = ServerEnv(**conf) self.writer = BlockchainProcessorService(env) self.server = HubServerService(env) self.es_writer = ElasticSyncService(env) From ea8adc53676cf6d616ad92f098c45734942271cb Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 9 May 2022 11:34:28 -0400 Subject: [PATCH 2/5] update scribe env and fix tests --- lbry/blob_exchange/server.py | 2 +- lbry/wallet/orchstr8/node.py | 38 ++++++++++++------- .../test_blockchain_reorganization.py | 7 ++-- tests/integration/blockchain/test_network.py | 10 ++--- .../blockchain/test_wallet_server_sessions.py | 4 +- .../takeovers/test_resolve_command.py | 24 ++++++++++++ 6 files changed, 61 insertions(+), 24 deletions(-) diff --git a/lbry/blob_exchange/server.py b/lbry/blob_exchange/server.py index 6e4950eeaa..8ee9212ff5 100644 --- a/lbry/blob_exchange/server.py +++ b/lbry/blob_exchange/server.py @@ -138,7 +138,7 @@ def data_received(self, data): try: request = BlobRequest.deserialize(self.buf + data) self.buf = remainder - except JSONDecodeError: + except (UnicodeDecodeError, JSONDecodeError): log.error("request from %s is not valid json (%i bytes): %s", self.peer_address_and_port, len(self.buf + data), '' if not data else binascii.hexlify(self.buf + data).decode()) self.close() diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index a706ec9c9a..1f0fd1bcae 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -18,14 +18,6 @@ from uuid import uuid4 -try: - from scribe.hub.env import ServerEnv - from scribe.hub.service import HubServerService - from scribe.elasticsearch.service import ElasticSyncService - from scribe.blockchain.service import BlockchainProcessorService -except ImportError: - pass - import lbry from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent from lbry.conf import KnownHubsList, Config @@ -33,6 +25,16 @@ log = logging.getLogger(__name__) +try: + from hub.herald.env import ServerEnv + from hub.scribe.env import BlockchainEnv + from hub.elastic_sync.env import ElasticEnv + from hub.herald.service import HubServerService + from hub.elastic_sync.service import ElasticSyncService + from hub.scribe.service import BlockchainProcessorService +except ImportError: + pass + def get_lbcd_node_from_ledger(ledger_module): return LBCDNode( @@ -257,14 +259,24 @@ async def start(self, lbcwallet_node: 'LBCWalletNode', extraconf=None): 'session_timeout': self.session_timeout, 'max_query_workers': 0, 'es_index_prefix': self.index_name, - 'chain': 'regtest' + 'chain': 'regtest', + 'index_address_status': False } if extraconf: conf.update(extraconf) - env = ServerEnv(**conf) - self.writer = BlockchainProcessorService(env) - self.server = HubServerService(env) - self.es_writer = ElasticSyncService(env) + self.writer = BlockchainProcessorService( + BlockchainEnv(db_dir=self.data_path, daemon_url=lbcwallet_node.rpc_url, + reorg_limit=100, max_query_workers=0, chain='regtest', index_address_status=False) + ) + self.server = HubServerService(ServerEnv(**conf)) + self.es_writer = ElasticSyncService( + ElasticEnv( + db_dir=self.data_path, reorg_limit=100, max_query_workers=0, chain='regtest', + elastic_notifier_port=self.elastic_notifier_port, + es_index_prefix=self.index_name, filtering_channel_ids=(extraconf or {}).get('filtering_channel_ids'), + blocking_channel_ids=(extraconf or {}).get('blocking_channel_ids') + ) + ) await self.writer.start() await self.es_writer.start() await self.server.start() diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index 10ae90e6e7..a48705c923 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -10,20 +10,21 @@ class BlockchainReorganizationTests(CommandTestCase): async def assertBlockHash(self, height): bp = self.conductor.spv_node.writer + reader = self.conductor.spv_node.server def get_txids(): return [ - bp.db.fs_tx_hash(tx_num)[0][::-1].hex() + reader.db.fs_tx_hash(tx_num)[0][::-1].hex() for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height]) ] block_hash = await self.blockchain.get_block_hash(height) self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) - self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) + self.assertEqual(block_hash, (await reader.db.fs_block_hashes(height, 1))[0][::-1].hex()) txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) - txs = await bp.db.get_transactions_and_merkles(txids) + txs = await reader.db.get_transactions_and_merkles(txids) block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index 2301105963..24cd5e8f47 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -1,12 +1,12 @@ import asyncio -import scribe +import hub from unittest.mock import Mock -from scribe.hub import HUB_PROTOCOL_VERSION -from scribe.hub.udp import StatusServer -from scribe.hub.session import LBRYElectrumX -from scribe.blockchain.network import LBCRegTest +from hub.herald import HUB_PROTOCOL_VERSION +from hub.herald.udp import StatusServer +from hub.herald.session import LBRYElectrumX +from hub.scribe.network import LBCRegTest from lbry.wallet.network import Network from lbry.wallet.orchstr8 import Conductor diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index d09c398f74..5cc7b3902f 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -1,7 +1,7 @@ import asyncio -from scribe.hub import HUB_PROTOCOL_VERSION -from scribe.hub.session import LBRYElectrumX +from hub.herald import HUB_PROTOCOL_VERSION +from hub.herald.session import LBRYElectrumX from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.wallet.network import ClientSession diff --git a/tests/integration/takeovers/test_resolve_command.py b/tests/integration/takeovers/test_resolve_command.py index ebef0f9178..5ffe182249 100644 --- a/tests/integration/takeovers/test_resolve_command.py +++ b/tests/integration/takeovers/test_resolve_command.py @@ -326,6 +326,30 @@ async def test_winning_by_effective_amount(self): await self.support_abandon(claim_id1) await self.assertResolvesToClaimId('@foo', claim_id2) + async def test_resolve_duplicate_name_in_channel(self): + db_resolve = self.conductor.spv_node.server.db.resolve + # first one remains winner unless something else changes + channel_id = self.get_claim_id(await self.channel_create('@foo')) + + file_path = self.create_upload_file(data=b'hi!') + tx = await self.daemon.jsonrpc_stream_create('duplicate', '0.1', file_path=file_path, allow_duplicate_name=True, channel_id=channel_id) + await self.ledger.wait(tx) + + first_claim = tx.outputs[0].claim_id + + file_path = self.create_upload_file(data=b'hi!') + tx = await self.daemon.jsonrpc_stream_create('duplicate', '0.1', file_path=file_path, allow_duplicate_name=True, channel_id=channel_id) + await self.ledger.wait(tx) + duplicate_claim = tx.outputs[0].claim_id + await self.generate(1) + + stream, channel, _, _ = await db_resolve(f"@foo:{channel_id}/duplicate:{first_claim}") + self.assertEqual(stream.claim_hash.hex(), first_claim) + self.assertEqual(channel.claim_hash.hex(), channel_id) + stream, channel, _, _ = await db_resolve(f"@foo:{channel_id}/duplicate:{duplicate_claim}") + self.assertEqual(stream.claim_hash.hex(), duplicate_claim) + self.assertEqual(channel.claim_hash.hex(), channel_id) + async def test_advanced_resolve(self): claim_id1 = self.get_claim_id( await self.stream_create('foo', '0.7', allow_duplicate_name=True)) From a391fe9fc74a31184aa25d1bf542b6982244c55c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 27 May 2022 09:44:54 -0400 Subject: [PATCH 3/5] scribe -> hub --- setup.py | 4 ++-- tox.ini | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 484104d0e0..314b38cc50 100644 --- a/setup.py +++ b/setup.py @@ -68,8 +68,8 @@ 'coverage', 'jsonschema==4.4.0', ], - 'scribe': [ - 'scribe @ git+https://github.com/lbryio/scribe.git@311db529a03de7fce43ed8579f51ac23a1a884ea' + 'hub': [ + 'hub@git+https://github.com/lbryio/hub.git@0901f67d89a17b403dd43aed939dd3f056af3d58' ] }, classifiers=[ diff --git a/tox.ini b/tox.ini index 2a2642c56c..b65cfa80d6 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ deps = coverage extras = test - scribe + hub torrent changedir = {toxinidir}/tests setenv = From ac7e94c6edc84117952366d480aa97130149c2f2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 27 May 2022 09:59:11 -0400 Subject: [PATCH 4/5] pylint --- lbry/wallet/orchstr8/node.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 1f0fd1bcae..a769ae20ba 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -273,7 +273,8 @@ async def start(self, lbcwallet_node: 'LBCWalletNode', extraconf=None): ElasticEnv( db_dir=self.data_path, reorg_limit=100, max_query_workers=0, chain='regtest', elastic_notifier_port=self.elastic_notifier_port, - es_index_prefix=self.index_name, filtering_channel_ids=(extraconf or {}).get('filtering_channel_ids'), + es_index_prefix=self.index_name, + filtering_channel_ids=(extraconf or {}).get('filtering_channel_ids'), blocking_channel_ids=(extraconf or {}).get('blocking_channel_ids') ) ) From 2313d309963e613ea6cdfd0c741203300fbf30e7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 27 May 2022 11:58:32 -0400 Subject: [PATCH 5/5] fix reconnect test --- lbry/wallet/network.py | 1 - setup.py | 2 +- tests/integration/blockchain/test_network.py | 3 ++- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 27c57dd1bc..d3c9879d98 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -348,7 +348,6 @@ async def network_loop(self): await self._keepalive_task if self._urgent_need_reconnect.is_set(): log.warning("urgent reconnect needed") - self._urgent_need_reconnect.clear() if self._keepalive_task and not self._keepalive_task.done(): self._keepalive_task.cancel() except asyncio.CancelledError: diff --git a/setup.py b/setup.py index 314b38cc50..8bbe35e93d 100644 --- a/setup.py +++ b/setup.py @@ -69,7 +69,7 @@ 'jsonschema==4.4.0', ], 'hub': [ - 'hub@git+https://github.com/lbryio/hub.git@0901f67d89a17b403dd43aed939dd3f056af3d58' + 'hub@git+https://github.com/lbryio/hub.git@76dd9c392b776a2823015762814f375794120076' ] }, classifiers=[ diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index 24cd5e8f47..bfbcb1f5f6 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -116,7 +116,7 @@ async def test_connection_drop_still_receives_events_after_reconnected(self): # disconnect and send a new tx, should reconnect and get it self.ledger.network.client.transport.close() self.assertFalse(self.ledger.network.is_connected) - await self.ledger.resolve([], 'derp') + await self.ledger.resolve([], ['derp']) sendtxid = await self.send_to_address_and_wait(address1, 1.1337, 1) self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine @@ -139,6 +139,7 @@ async def test_connection_drop_still_receives_events_after_reconnected(self): if not self.ledger.network.is_connected: await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=10.0) # omg, the burned cable still works! torba is fire proof! + await self.ledger.on_header.where(self.blockchain.is_expected_block) await self.ledger.network.get_transaction(sendtxid) async def test_timeout_then_reconnect(self):