Skip to content

Commit

Permalink
Merge 69297ea into 6258651
Browse files Browse the repository at this point in the history
  • Loading branch information
moodyjon committed Dec 27, 2022
2 parents 6258651 + 69297ea commit 81996c7
Show file tree
Hide file tree
Showing 27 changed files with 90 additions and 91 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: ci
on: ["push", "pull_request"]
on: ["push", "pull_request", "workflow_dispatch"]

jobs:

Expand All @@ -10,7 +10,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
python-version: '3.9'
- name: extract pip cache
uses: actions/cache@v2
with:
Expand All @@ -34,7 +34,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
python-version: '3.9'
- name: set pip cache dir
id: pip-cache
run: echo "::set-output name=dir::$(pip cache dir)"
Expand All @@ -60,7 +60,7 @@ jobs:
- if: startsWith(runner.os, 'linux') != true
env:
HOME: /tmp
run: coverage run --source=lbry -m unittest tests/unit/test_conf.py
run: coverage run --source=lbry -m unittest -vv tests/unit/test_conf.py tests/unit/core/test_utils.py
- name: submit coverage report
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand Down Expand Up @@ -96,7 +96,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
python-version: '3.9'
- if: matrix.test == 'other'
run: |
sudo apt-get update
Expand Down Expand Up @@ -138,15 +138,15 @@ jobs:
strategy:
matrix:
os:
- ubuntu-18.04
- ubuntu-20.04
- macos-latest
- windows-latest
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
python-version: '3.9'
- id: os-name
uses: ASzc/change-string-case-action@v1
with:
Expand Down
2 changes: 1 addition & 1 deletion lbry/blob_exchange/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1

async def new_peer_or_finished(self):
active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
active_tasks = list(self.active_connections.values()) + [asyncio.create_task(asyncio.sleep(1))]
await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')

def cleanup_active(self):
Expand Down
2 changes: 0 additions & 2 deletions lbry/dht/blob_announcer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ async def _run_consumer(self):
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err:
self.announcements_sent_metric.labels(peers=0, error=True).inc()
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))

async def _announce(self, batch_size: typing.Optional[int] = 10):
Expand Down
2 changes: 1 addition & 1 deletion lbry/extras/daemon/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __lt__(self, other):
def running(self):
return self._running

async def get_status(self):
async def get_status(self): # pylint: disable=no-self-use
return

async def start(self):
Expand Down
4 changes: 2 additions & 2 deletions lbry/extras/daemon/componentmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async def start(self):
component._setup() for component in stage if not component.running
]
if needing_start:
await asyncio.wait(needing_start)
await asyncio.wait(map(asyncio.create_task, needing_start))
self.started.set()

async def stop(self):
Expand All @@ -131,7 +131,7 @@ async def stop(self):
component._stop() for component in stage if component.running
]
if needing_stop:
await asyncio.wait(needing_stop)
await asyncio.wait(map(asyncio.create_task, needing_stop))

def all_components_running(self, *component_names):
"""
Expand Down
4 changes: 1 addition & 3 deletions lbry/extras/daemon/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ async def start(self):
log.info('Done setting up file manager')

async def stop(self):
self.file_manager.stop()
await self.file_manager.stop()


class BackgroundDownloaderComponent(Component):
Expand Down Expand Up @@ -560,8 +560,6 @@ async def _maintain_redirects(self):
self.upnp = await UPnP.discover(loop=self.component_manager.loop)
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.warning("upnp discovery failed: %s", err)
self.upnp = None

Expand Down
3 changes: 2 additions & 1 deletion lbry/extras/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,8 @@ async def handle_old_jsonrpc(self, request):
content_type='application/json'
)

async def handle_metrics_get_request(self, request: web.Request):
@staticmethod
async def handle_metrics_get_request(request: web.Request):
try:
return web.Response(
text=prom_generate_latest().decode(),
Expand Down
2 changes: 0 additions & 2 deletions lbry/extras/daemon/exchange_rate_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ async def get_rate(self):
self.rate = ExchangeRate(self.market, rate, int(time.time()))
self.last_check = time.time()
return self.rate
except asyncio.CancelledError:
raise
except asyncio.TimeoutError:
log.warning("Timed out fetching exchange rate from %s.", self.name)
except json.JSONDecodeError as e:
Expand Down
2 changes: 1 addition & 1 deletion lbry/extras/daemon/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ def _save_claims(transaction):

await self.db.run(_save_claims)
if update_file_callbacks:
await asyncio.wait(update_file_callbacks)
await asyncio.wait(map(asyncio.create_task, update_file_callbacks))
if claim_id_to_supports:
await self.save_supports(claim_id_to_supports)

Expand Down
8 changes: 3 additions & 5 deletions lbry/file/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ async def start(self):
await manager.started.wait()
self.started.set()

def stop(self):
async def stop(self):
for manager in self.source_managers.values():
# fixme: pop or not?
manager.stop()
await manager.stop()
self.started.clear()

@cache_concurrent
Expand Down Expand Up @@ -99,8 +99,6 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag
except asyncio.TimeoutError:
raise ResolveTimeoutError(uri)
except Exception as err:
if isinstance(err, asyncio.CancelledError):
raise
log.exception("Unexpected error resolving stream:")
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
if 'error' in resolved_result:
Expand Down Expand Up @@ -249,7 +247,7 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
raise error
except Exception as err: # forgive data timeout, don't delete stream
except (Exception, asyncio.CancelledError) as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
if isinstance(err, expected):
Expand Down
2 changes: 1 addition & 1 deletion lbry/file/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def stop(self, finished: bool = False):
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
raise NotImplementedError()

def stop_tasks(self):
async def stop_tasks(self):
raise NotImplementedError()

def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
Expand Down
10 changes: 5 additions & 5 deletions lbry/file/source_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: '
def add(self, source: ManagedDownloadSource):
self._sources[source.identifier] = source

def remove(self, source: ManagedDownloadSource):
async def remove(self, source: ManagedDownloadSource):
if source.identifier not in self._sources:
return
self._sources.pop(source.identifier)
source.stop_tasks()
await source.stop_tasks()

async def initialize_from_database(self):
raise NotImplementedError()
Expand All @@ -72,18 +72,18 @@ async def start(self):
await self.initialize_from_database()
self.started.set()

def stop(self):
async def stop(self):
while self._sources:
_, source = self._sources.popitem()
source.stop_tasks()
await source.stop_tasks()
self.started.clear()

async def create(self, file_path: str, key: Optional[bytes] = None,
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource:
raise NotImplementedError()

async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
self.remove(source)
await self.remove(source)
if delete_file and source.output_file_exists:
os.remove(source.full_path)

Expand Down
1 change: 1 addition & 0 deletions lbry/stream/background_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async def download_blobs(self, sd_hash):
except ValueError:
return
except asyncio.CancelledError:
log.debug("Cancelled background downloader")
raise
except Exception:
log.error("Unexpected download error on background downloader")
Expand Down
11 changes: 6 additions & 5 deletions lbry/stream/managed_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def stop(self, finished: bool = False):
Stop any running save/stream tasks as well as the downloader and update the status in the database
"""

self.stop_tasks()
await self.stop_tasks()
if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)

Expand Down Expand Up @@ -279,7 +279,7 @@ async def _save_file(self, output_path: str):
log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
self.sd_hash[:6], self.full_path)
await self.blob_manager.storage.set_saved_file(self.stream_hash)
except Exception as err:
except (Exception, asyncio.CancelledError) as err:
if os.path.isfile(output_path):
log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
os.remove(output_path)
Expand Down Expand Up @@ -324,12 +324,13 @@ async def save_file(self, file_name: Optional[str] = None, download_directory: O
await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
except asyncio.TimeoutError:
log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
self.stop_tasks()
await self.stop_tasks()
await self.update_status(ManagedStream.STATUS_STOPPED)

def stop_tasks(self):
async def stop_tasks(self):
if self.file_output_task and not self.file_output_task.done():
self.file_output_task.cancel()
await asyncio.gather(self.file_output_task, return_exceptions=True)
self.file_output_task = None
while self.streaming_responses:
req, response = self.streaming_responses.pop()
Expand Down Expand Up @@ -366,7 +367,7 @@ async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]:
return sent
except ConnectionError:
return sent
except (OSError, Exception) as err:
except (OSError, Exception, asyncio.CancelledError) as err:
if isinstance(err, asyncio.CancelledError):
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
elif isinstance(err, OSError):
Expand Down
11 changes: 5 additions & 6 deletions lbry/stream/stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ async def initialize_from_database(self):
async def reflect_streams(self):
try:
return await self._reflect_streams()
except asyncio.CancelledError:
raise
except Exception:
log.exception("reflector task encountered an unexpected error!")

Expand Down Expand Up @@ -198,8 +196,8 @@ async def start(self):
await super().start()
self.re_reflect_task = self.loop.create_task(self.reflect_streams())

def stop(self):
super().stop()
async def stop(self):
await super().stop()
if self.resume_saving_task and not self.resume_saving_task.done():
self.resume_saving_task.cancel()
if self.re_reflect_task and not self.re_reflect_task.done():
Expand All @@ -226,7 +224,8 @@ def reflect_stream(self, stream: ManagedStream, server: Optional[str] = None,
)
return task

async def _retriable_reflect_stream(self, stream, host, port):
@staticmethod
async def _retriable_reflect_stream(stream, host, port):
sent = await stream.upload_to_reflector(host, port)
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
stream.reflector_progress = 0
Expand Down Expand Up @@ -261,7 +260,7 @@ async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool
return
if source.identifier in self.running_reflector_uploads:
self.running_reflector_uploads[source.identifier].cancel()
source.stop_tasks()
await source.stop_tasks()
if source.identifier in self.streams:
del self.streams[source.identifier]
blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
Expand Down
6 changes: 3 additions & 3 deletions lbry/torrent/torrent_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def torrent_name(self):
def bt_infohash(self):
return self.identifier

def stop_tasks(self):
async def stop_tasks(self):
pass

@property
Expand Down Expand Up @@ -118,8 +118,8 @@ async def initialize_from_database(self):
async def start(self):
await super().start()

def stop(self):
super().stop()
async def stop(self):
await super().stop()
log.info("finished stopping the torrent manager")

async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
Expand Down
2 changes: 1 addition & 1 deletion lbry/wallet/coinselection.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def random_draw(self, txos: List[OutputEffectiveAmountEstimator],
_) -> List[OutputEffectiveAmountEstimator]:
""" Accumulate UTXOs at random until there is enough to cover the target. """
target = self.target + self.cost_of_change
self.random.shuffle(txos, self.random.random)
self.random.shuffle(txos, random=self.random.random) # pylint: disable=deprecated-argument
selection = []
amount = 0
for coin in txos:
Expand Down

0 comments on commit 81996c7

Please sign in to comment.