Skip to content

Commit

Permalink
Merge 782f4bf into ddbbb6f
Browse files Browse the repository at this point in the history
  • Loading branch information
cristi-zz committed Oct 28, 2021
2 parents ddbbb6f + 782f4bf commit ee185a7
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 16 deletions.
8 changes: 6 additions & 2 deletions lbry/blob_exchange/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from lbry.utils import cache_concurrent
from lbry.blob_exchange.client import request_blob
from lbry.dht.node import get_kademlia_peers_from_hosts
if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.dht.node import Node
Expand Down Expand Up @@ -133,11 +134,14 @@ def close(self):
protocol.close()


async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', node: 'Node',
async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', dht_node: 'Node',
blob_hash: str) -> 'AbstractBlob':
search_queue = asyncio.Queue(loop=loop, maxsize=config.max_connections_per_download)
search_queue.put_nowait(blob_hash)
peer_queue, accumulate_task = node.accumulate_peers(search_queue)
peer_queue, accumulate_task = dht_node.accumulate_peers(search_queue)
fixed_peers = None if not config.fixed_peers else await get_kademlia_peers_from_hosts(config.fixed_peers)
if fixed_peers:
loop.call_later(config.fixed_peer_delay, peer_queue.put_nowait, fixed_peers)
downloader = BlobDownloader(loop, config, blob_manager, peer_queue)
try:
return await downloader.download_blob(blob_hash)
Expand Down
7 changes: 7 additions & 0 deletions lbry/dht/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,10 @@ def accumulate_peers(self, search_queue: asyncio.Queue,
) -> typing.Tuple[asyncio.Queue, asyncio.Task]:
queue = peer_queue or asyncio.Queue(loop=self.loop)
return queue, self.loop.create_task(self._accumulate_peers_for_value(search_queue, queue))


async def get_kademlia_peers_from_hosts(peer_list: typing.List[typing.Tuple[str, int]]) -> typing.List['KademliaPeer']:
peer_address_list = [(await resolve_host(url, port, proto='tcp'), port) for url, port in peer_list]
kademlia_peer_list = [make_kademlia_peer(None, address, None, tcp_port=port, allow_localhost=True)
for address, port in peer_address_list]
return kademlia_peer_list
19 changes: 6 additions & 13 deletions lbry/stream/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import logging
import binascii

from lbry.dht.peer import make_kademlia_peer
from lbry.dht.node import get_kademlia_peers_from_hosts
from lbry.error import DownloadSDTimeoutError
from lbry.utils import resolve_host, lru_cache_concurrent
from lbry.utils import lru_cache_concurrent
from lbry.stream.descriptor import StreamDescriptor
from lbry.blob_exchange.downloader import BlobDownloader
if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -48,26 +48,19 @@ async def cached_read_blob(blob_info: 'BlobInfo') -> bytes:
self.cached_read_blob = cached_read_blob

async def add_fixed_peers(self):
def _delayed_add_fixed_peers():
def _add_fixed_peers(fixed_peers):
self.peer_queue.put_nowait(fixed_peers)
self.added_fixed_peers = True
self.peer_queue.put_nowait([
make_kademlia_peer(None, address, None, tcp_port=port, allow_localhost=True)
for address, port in addresses
])

if not self.config.fixed_peers:
return
addresses = [
(await resolve_host(url, port, proto='tcp'), port)
for url, port in self.config.fixed_peers
]
if 'dht' in self.config.components_to_skip or not self.node or not \
len(self.node.protocol.routing_table.get_peers()) > 0:
self.fixed_peers_delay = 0.0
else:
self.fixed_peers_delay = self.config.fixed_peer_delay

self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _delayed_add_fixed_peers)
fixed_peers = await get_kademlia_peers_from_hosts(self.config.fixed_peers)
self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers)

async def load_descriptor(self, connection_id: int = 0):
# download or get the sd blob
Expand Down
35 changes: 34 additions & 1 deletion tests/unit/blob_exchange/test_transfer_blob.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import asyncio
import tempfile
from io import BytesIO
from unittest import mock

import shutil
import os
import copy

from lbry.blob_exchange.serialization import BlobRequest
from lbry.testcase import AsyncioTestCase
from lbry.conf import Config
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.extras.daemon.daemon import Daemon
from lbry.blob.blob_manager import BlobManager
from lbry.blob_exchange.server import BlobServer, BlobServerProtocol
from lbry.blob_exchange.client import request_blob
from lbry.dht.peer import PeerManager, make_kademlia_peer

from lbry.dht.node import Node

# import logging
# logging.getLogger("lbry").setLevel(logging.DEBUG)
Expand Down Expand Up @@ -326,3 +329,33 @@ async def sendfile(writer):
with self.assertRaises(asyncio.CancelledError):
await request_blob(self.loop, client_blob, self.server_from_client.address,
self.server_from_client.tcp_port, 2, 3)

async def test_download_blob_using_jsonrpc_blob_get(self):
blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed"
mock_blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
await self._add_blob_to_server(blob_hash, mock_blob_bytes)

# setup RPC Daemon
daemon_config = copy.deepcopy(self.client_config)
daemon_config.fixed_peers = [(self.server_from_client.address, self.server_from_client.tcp_port)]
daemon = Daemon(daemon_config)

mock_node = mock.Mock(spec=Node)

def _mock_accumulate_peers(q1, q2=None):
async def _task():
pass
q2 = q2 or asyncio.Queue(loop=self.loop)
return q2, self.loop.create_task(_task())

mock_node.accumulate_peers = _mock_accumulate_peers
with mock.patch('lbry.extras.daemon.componentmanager.ComponentManager.all_components_running',
return_value=True):
with mock.patch('lbry.extras.daemon.daemon.Daemon.dht_node', new_callable=mock.PropertyMock) \
as daemon_mock_dht:
with mock.patch('lbry.extras.daemon.daemon.Daemon.blob_manager', new_callable=mock.PropertyMock) \
as daemon_mock_blob_manager:
daemon_mock_dht.return_value = mock_node
daemon_mock_blob_manager.return_value = self.client_blob_manager
result = await daemon.jsonrpc_blob_get(blob_hash)
self.assertIsNotNone(result)

0 comments on commit ee185a7

Please sign in to comment.