Skip to content

Commit

Permalink
Merge fcffe1b into 01cd95f
Browse files Browse the repository at this point in the history
  • Loading branch information
moodyjon committed Aug 11, 2022
2 parents 01cd95f + fcffe1b commit 7e07bac
Show file tree
Hide file tree
Showing 46 changed files with 184 additions and 185 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: ci
on: ["push", "pull_request"]
on: ["push", "pull_request", "workflow_dispatch"]

jobs:

Expand All @@ -10,7 +10,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
python-version: '3.9'
- name: extract pip cache
uses: actions/cache@v2
with:
Expand All @@ -34,7 +34,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
python-version: '3.9'
- name: set pip cache dir
id: pip-cache
run: echo "::set-output name=dir::$(pip cache dir)"
Expand Down Expand Up @@ -96,7 +96,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
python-version: '3.9'
- if: matrix.test == 'other'
run: |
sudo apt-get update
Expand Down Expand Up @@ -138,15 +138,15 @@ jobs:
strategy:
matrix:
os:
- ubuntu-18.04
- ubuntu-20.04
- macos-latest
- windows-latest
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
python-version: '3.9'
- id: os-name
uses: ASzc/change-string-case-action@v1
with:
Expand Down
4 changes: 2 additions & 2 deletions lbry/blob/blob_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ def __init__(
self.blob_completed_callback = blob_completed_callback
self.blob_directory = blob_directory
self.writers: typing.Dict[typing.Tuple[typing.Optional[str], typing.Optional[int]], HashBlobWriter] = {}
self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
self.writing: asyncio.Event = asyncio.Event(loop=self.loop)
self.verified: asyncio.Event = asyncio.Event()
self.writing: asyncio.Event = asyncio.Event()
self.readers: typing.List[typing.BinaryIO] = []
self.added_on = added_on or time.time()
self.is_mine = is_mine
Expand Down
8 changes: 4 additions & 4 deletions lbry/blob_exchange/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_timeout: typing.Optiona
self.buf = b''

# this is here to handle the race when the downloader is closed right as response_fut gets a result
self.closed = asyncio.Event(loop=self.loop)
self.closed = asyncio.Event()

def data_received(self, data: bytes):
if self.connection_manager:
Expand Down Expand Up @@ -111,7 +111,7 @@ async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClient
self.transport.write(msg)
if self.connection_manager:
self.connection_manager.sent_data(f"{self.peer_address}:{self.peer_port}", len(msg))
response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout, loop=self.loop)
response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout)
availability_response = response.get_availability_response()
price_response = response.get_price_response()
blob_response = response.get_blob_response()
Expand Down Expand Up @@ -151,7 +151,7 @@ async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClient
f" timeout in {self.peer_timeout}"
log.debug(msg)
msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
await asyncio.wait_for(self.writer.finished, self.peer_timeout)
# wait for the io to finish
await self.blob.verified.wait()
log.info("%s at %fMB/s", msg,
Expand Down Expand Up @@ -244,7 +244,7 @@ async def request_blob(loop: asyncio.AbstractEventLoop, blob: Optional['Abstract
try:
if not connected_protocol:
await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port),
peer_connect_timeout, loop=loop)
peer_connect_timeout)
connected_protocol = protocol
if blob is None or blob.get_is_verified() or not blob.is_writeable():
# blob is None happens when we are just opening a connection
Expand Down
8 changes: 4 additions & 4 deletions lbry/blob_exchange/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manag
self.failures: typing.Dict['KademliaPeer', int] = {}
self.connection_failures: typing.Set['KademliaPeer'] = set()
self.connections: typing.Dict['KademliaPeer', 'BlobExchangeClientProtocol'] = {}
self.is_running = asyncio.Event(loop=self.loop)
self.is_running = asyncio.Event()

def should_race_continue(self, blob: 'AbstractBlob'):
max_probes = self.config.max_connections_per_download * (1 if self.connections else 10)
Expand Down Expand Up @@ -64,8 +64,8 @@ async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1

async def new_peer_or_finished(self):
active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')
active_tasks = list(self.active_connections.values()) + [asyncio.create_task(asyncio.sleep(1))]
await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')

def cleanup_active(self):
if not self.active_connections and not self.connections:
Expand Down Expand Up @@ -126,7 +126,7 @@ def close(self):

async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', dht_node: 'Node',
blob_hash: str) -> 'AbstractBlob':
search_queue = asyncio.Queue(loop=loop, maxsize=config.max_connections_per_download)
search_queue = asyncio.Queue(maxsize=config.max_connections_per_download)
search_queue.put_nowait(blob_hash)
peer_queue, accumulate_task = dht_node.accumulate_peers(search_queue)
fixed_peers = None if not config.fixed_peers else await get_kademlia_peers_from_hosts(config.fixed_peers)
Expand Down
12 changes: 6 additions & 6 deletions lbry/blob_exchange/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager',
self.idle_timeout = idle_timeout
self.transfer_timeout = transfer_timeout
self.server_task: typing.Optional[asyncio.Task] = None
self.started_listening = asyncio.Event(loop=self.loop)
self.started_listening = asyncio.Event()
self.buf = b''
self.transport: typing.Optional[asyncio.Transport] = None
self.lbrycrd_address = lbrycrd_address
self.peer_address_and_port: typing.Optional[str] = None
self.started_transfer = asyncio.Event(loop=self.loop)
self.transfer_finished = asyncio.Event(loop=self.loop)
self.started_transfer = asyncio.Event()
self.transfer_finished = asyncio.Event()
self.close_on_idle_task: typing.Optional[asyncio.Task] = None

async def close_on_idle(self):
while self.transport:
try:
await asyncio.wait_for(self.started_transfer.wait(), self.idle_timeout, loop=self.loop)
await asyncio.wait_for(self.started_transfer.wait(), self.idle_timeout)
except asyncio.TimeoutError:
log.debug("closing idle connection from %s", self.peer_address_and_port)
return self.close()
Expand Down Expand Up @@ -101,7 +101,7 @@ async def handle_request(self, request: BlobRequest):
log.debug("send %s to %s:%i", blob_hash, peer_address, peer_port)
self.started_transfer.set()
try:
sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout, loop=self.loop)
sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout)
if sent and sent > 0:
self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent)
log.info("sent %s (%i bytes) to %s:%i", blob_hash, sent, peer_address, peer_port)
Expand Down Expand Up @@ -157,7 +157,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager',
self.loop = loop
self.blob_manager = blob_manager
self.server_task: typing.Optional[asyncio.Task] = None
self.started_listening = asyncio.Event(loop=self.loop)
self.started_listening = asyncio.Event()
self.lbrycrd_address = lbrycrd_address
self.idle_timeout = idle_timeout
self.transfer_timeout = transfer_timeout
Expand Down
2 changes: 1 addition & 1 deletion lbry/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def _update(self):

while True:
last = time.perf_counter()
await asyncio.sleep(0.1, loop=self.loop)
await asyncio.sleep(0.1)
self._status['incoming_bps'].clear()
self._status['outgoing_bps'].clear()
now = time.perf_counter()
Expand Down
6 changes: 2 additions & 4 deletions lbry/dht/blob_announcer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@ async def _run_consumer(self):
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err:
self.announcements_sent_metric.labels(peers=0, error=True).inc()
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))

async def _announce(self, batch_size: typing.Optional[int] = 10):
while batch_size:
if not self.node.joined.is_set():
await self.node.joined.wait()
await asyncio.sleep(60, loop=self.loop)
await asyncio.sleep(60)
if not self.node.protocol.routing_table.get_peers():
log.warning("No peers in DHT, announce round skipped")
continue
Expand All @@ -59,7 +57,7 @@ async def _announce(self, batch_size: typing.Optional[int] = 10):
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
while len(self.announce_queue) > 0:
log.info("%i blobs to announce", len(self.announce_queue))
await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)])
announced = list(filter(None, self.announced))
if announced:
await self.storage.update_last_announced_blobs(announced)
Expand Down
10 changes: 5 additions & 5 deletions lbry/dht/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout,
split_buckets_under_index)
self.listening_port: asyncio.DatagramTransport = None
self.joined = asyncio.Event(loop=self.loop)
self.joined = asyncio.Event()
self._join_task: asyncio.Task = None
self._refresh_task: asyncio.Task = None
self._storage = storage
Expand Down Expand Up @@ -115,7 +115,7 @@ async def announce_blob(self, blob_hash: str) -> typing.List[bytes]:
for peer in peers:
log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port)
stored_to_tup = await asyncio.gather(
*(self.protocol.store_to_peer(hash_value, peer) for peer in peers), loop=self.loop
*(self.protocol.store_to_peer(hash_value, peer) for peer in peers)
)
stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
if stored_to:
Expand Down Expand Up @@ -189,14 +189,14 @@ def peers_from_urls(urls: typing.Optional[typing.List[typing.Tuple[bytes, str, i
for address, udp_port in known_node_urls or []
]))
except socket.gaierror:
await asyncio.sleep(30, loop=self.loop)
await asyncio.sleep(30)
continue

self.protocol.peer_manager.reset()
self.protocol.ping_queue.enqueue_maybe_ping(*seed_peers, delay=0.0)
await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32)

await asyncio.sleep(1, loop=self.loop)
await asyncio.sleep(1)

def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
Expand Down Expand Up @@ -280,7 +280,7 @@ async def put_into_result_queue_after_pong(_peer):
def accumulate_peers(self, search_queue: asyncio.Queue,
peer_queue: typing.Optional[asyncio.Queue] = None
) -> typing.Tuple[asyncio.Queue, asyncio.Task]:
queue = peer_queue or asyncio.Queue(loop=self.loop)
queue = peer_queue or asyncio.Queue()
return queue, self.loop.create_task(self._accumulate_peers_for_value(search_queue, queue))


Expand Down
2 changes: 1 addition & 1 deletion lbry/dht/protocol/iterative_find.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key)

self.iteration_queue = asyncio.Queue(loop=self.loop)
self.iteration_queue = asyncio.Queue()

self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
self.iteration_count = 0
Expand Down
8 changes: 4 additions & 4 deletions lbry/dht/protocol/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ async def _process(self): # send up to 1 ping per second
del self._pending_contacts[peer]
self.maybe_ping(peer)
break
await asyncio.sleep(1, loop=self._loop)
await asyncio.sleep(1)

def start(self):
assert not self._running
Expand Down Expand Up @@ -314,10 +314,10 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.ping_queue = PingQueue(self.loop, self)
self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
self.rpc_timeout = rpc_timeout
self._split_lock = asyncio.Lock(loop=self.loop)
self._split_lock = asyncio.Lock()
self._to_remove: typing.Set['KademliaPeer'] = set()
self._to_add: typing.Set['KademliaPeer'] = set()
self._wakeup_routing_task = asyncio.Event(loop=self.loop)
self._wakeup_routing_task = asyncio.Event()
self.maintaing_routing_task: typing.Optional[asyncio.Task] = None

@functools.lru_cache(128)
Expand Down Expand Up @@ -443,7 +443,7 @@ async def routing_table_task(self):
while self._to_add:
async with self._split_lock:
await self._add_peer(self._to_add.pop())
await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1, loop=self.loop), loop=self.loop)
await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1))
self._wakeup_routing_task.clear()

def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
Expand Down
2 changes: 1 addition & 1 deletion lbry/extras/daemon/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __lt__(self, other):
def running(self):
return self._running

async def get_status(self):
async def get_status(self): # pylint: disable=no-self-use
return

async def start(self):
Expand Down
6 changes: 3 additions & 3 deletions lbry/extras/daemon/componentmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, conf: Config, analytics_manager=None, skip_components=None,
self.analytics_manager = analytics_manager
self.component_classes = {}
self.components = set()
self.started = asyncio.Event(loop=self.loop)
self.started = asyncio.Event()
self.peer_manager = peer_manager or PeerManager(asyncio.get_event_loop_policy().get_event_loop())

for component_name, component_class in self.default_component_classes.items():
Expand Down Expand Up @@ -118,7 +118,7 @@ async def start(self):
component._setup() for component in stage if not component.running
]
if needing_start:
await asyncio.wait(needing_start)
await asyncio.wait(map(asyncio.create_task, needing_start))
self.started.set()

async def stop(self):
Expand All @@ -131,7 +131,7 @@ async def stop(self):
component._stop() for component in stage if component.running
]
if needing_stop:
await asyncio.wait(needing_stop)
await asyncio.wait(map(asyncio.create_task, needing_stop))

def all_components_running(self, *component_names):
"""
Expand Down
8 changes: 3 additions & 5 deletions lbry/extras/daemon/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ async def start(self):
log.info('Done setting up file manager')

async def stop(self):
self.file_manager.stop()
await self.file_manager.stop()


class BackgroundDownloaderComponent(Component):
Expand Down Expand Up @@ -558,7 +558,7 @@ async def _repeatedly_maintain_redirects(self, now=True):
while True:
if now:
await self._maintain_redirects()
await asyncio.sleep(360, loop=self.component_manager.loop)
await asyncio.sleep(360)

async def _maintain_redirects(self):
# setup the gateway if necessary
Expand All @@ -567,8 +567,6 @@ async def _maintain_redirects(self):
self.upnp = await UPnP.discover(loop=self.component_manager.loop)
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.warning("upnp discovery failed: %s", err)
self.upnp = None

Expand Down Expand Up @@ -680,7 +678,7 @@ async def stop(self):
log.info("Removing upnp redirects: %s", self.upnp_redirects)
await asyncio.wait([
self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
], loop=self.component_manager.loop)
])
if self._maintain_redirects_task and not self._maintain_redirects_task.done():
self._maintain_redirects_task.cancel()

Expand Down
5 changes: 3 additions & 2 deletions lbry/extras/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,8 @@ async def handle_old_jsonrpc(self, request):
content_type='application/json'
)

async def handle_metrics_get_request(self, request: web.Request):
@staticmethod
async def handle_metrics_get_request(request: web.Request):
try:
return web.Response(
text=prom_generate_latest().decode(),
Expand Down Expand Up @@ -4976,7 +4977,7 @@ async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
if not is_valid_blobhash(blob_hash):
# TODO: use error from lbry.error
raise Exception("invalid blob hash")
peer_q = asyncio.Queue(loop=self.component_manager.loop)
peer_q = asyncio.Queue()
if self.component_manager.has_component(TRACKER_ANNOUNCER_COMPONENT):
tracker = self.component_manager.get_component(TRACKER_ANNOUNCER_COMPONENT)
tracker_peers = await tracker.get_kademlia_peer_list(bytes.fromhex(blob_hash))
Expand Down
2 changes: 0 additions & 2 deletions lbry/extras/daemon/exchange_rate_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ async def get_rate(self):
self.rate = ExchangeRate(self.market, rate, int(time.time()))
self.last_check = time.time()
return self.rate
except asyncio.CancelledError:
raise
except asyncio.TimeoutError:
log.warning("Timed out fetching exchange rate from %s.", self.name)
except json.JSONDecodeError as e:
Expand Down
2 changes: 1 addition & 1 deletion lbry/extras/daemon/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ def _save_claims(transaction):

await self.db.run(_save_claims)
if update_file_callbacks:
await asyncio.wait(update_file_callbacks)
await asyncio.wait(map(asyncio.create_task, update_file_callbacks))
if claim_id_to_supports:
await self.save_supports(claim_id_to_supports)

Expand Down
Loading

0 comments on commit 7e07bac

Please sign in to comment.