Introducing asyncio downloaders #3129
Conversation
Force-pushed from b6f605a to df733fa.
Force-pushed from df733fa to c73b2ea.
Hello @bmbouter! Thanks for updating the PR. Cheers! There are no PEP8 issues in this Pull Request. 🍻 Comment last updated on September 18, 2017 at 21:38 UTC.
Force-pushed from c73b2ea to 62b19c3.
These docs build correctly. I published a built version here. I think the docs builder error is a docs builder problem.
Force-pushed from 0d5c6c8 to 0ee5a45.
Force-pushed from 93d0d7f to 9d9a1f7.
    writes to a random file in the current working directory or you can pass in your own file
    object. See the ``custom_file_object`` keyword argument for more details. Allowing the download
    instantiator to define the file to receive data allows the streamer to receive the data instead
    of having it written to disk.
Please document instance Attributes:.
Fixed in next push.
        expected_size (int): The number of bytes the download is expected to have.
        """
        if custom_file_object:
            self._writer = custom_file_object
Looks like self.path only exists when custom_file_object is not passed to __init__(). Suggest adding: self.path = None to ensure the instance always has the attribute.
(blocker)
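The requested fix could look like the sketch below; the class and the temp-file fallback are illustrative stand-ins for the real BaseDownloader, not the PR's actual code:

```python
import tempfile


class BaseDownloader:
    """Sketch: every instance gets a `path` attribute, even with a custom file object."""

    def __init__(self, url, custom_file_object=None):
        self.url = url
        self.path = None  # always defined; stays None when a custom file object is used
        if custom_file_object:
            self._writer = custom_file_object
        else:
            self._writer = tempfile.NamedTemporaryFile(delete=False)
            self.path = self._writer.name
```

With this shape, callers can always check `downloader.path is None` instead of guarding with `hasattr()`.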
Yeah, good catch. Fixed in next push along w/ some doc lines explaining the cases of None versus when it's actually a path.
    Validate all digests validate if ``expected_digests`` is set

    Raises:
        :class:`~pulpcore.plugin.download.asyncio.DigestValidationError`: When any of the
What value does the :class: syntax add? Seems autodoc already links the class by just specifying the type.
I think this is how Sphinx needs it. When I compile the docs without the :class: they don't link or render correctly (it's as if Sphinx doesn't recognize them). This is also the syntax they recommend, which has :class:, :attr:, :meth:, etc. I'm leaving this as-is.
        :attr:`~pulpcore.plugin.download.asyncio.BaseDownloader.artifact_attributes` property value.

    Returns:
        :class:`~pulpcore.plugin.download.asyncio.DownloadResult`
This method actually returns a coroutine, not a DownloadResult, and it's the coroutine that actually returns a DownloadResult, right? If so, I'm not sure how to document that, but this seems misleading.
I observed the same thing. I'm following what the aiohttp library does; they document the return value of coroutines as the final return value emitted from the coroutine itself. Of the two choices, only this one lets me express what the ultimate result will be. Otherwise, I'm not sure how to declare the result of the coroutine if I use the return value to declare that it is a coroutine. With that thinking, I'm going to leave it as-is. I am also going to check the three run() implementations to make sure their return values are consistent.
Oh yeah the other subclasses only refer to this docstring.
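The convention being settled on here, documenting a coroutine's Returns: as the value the awaited coroutine ultimately produces, reads like this sketch; DownloadResult is a stand-in namedtuple, not the real class:

```python
import asyncio
from collections import namedtuple

# Illustrative stand-in for the real DownloadResult.
DownloadResult = namedtuple('DownloadResult', ['url', 'size'])


class ExampleDownloader:
    def __init__(self, url):
        self.url = url

    async def run(self):
        """
        Run the downloader.

        This method is a coroutine.

        Returns:
            DownloadResult: what the awaited coroutine ultimately produces.
        """
        await asyncio.sleep(0)  # placeholder for real I/O
        return DownloadResult(url=self.url, size=0)
```

The "This method is a coroutine." line is the interim note mentioned above, needed until Sphinx renders coroutines distinctly.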
        if self._size != self.expected_size:
            raise SizeValidationError()

    async def run(self):
Assuming this returns a coroutine and does not actually run the download, I suggest another name like get_coroutine() or as_coroutine() would be more accurate.
In looking at other docs (like aiohttp, which has lots of coroutines), the method names of coroutines are the names you would expect with or without the coroutine pattern. This method runs the downloader, so I want to keep the name run. With that reasoning I'm going to leave it as-is.
Interestingly, as Sphinx improves so will our docs once this is merged and released: sphinx-doc/sphinx#1826. I am adding a line to each run() method so that, until this Sphinx improvement is merged, it will still be clear in the docs that it's a coroutine. In the code it should already be totally clear from the async keyword in the method definition.
    from pulpcore.exceptions import PulpException


    class DownloaderValidationError(PulpException):
Considering your change in tasking to log non-fatal PulpException with stack traces - these exceptions extending PulpException will cause validation exceptions to be logged with a stack trace. This seems undesirable. I think we only want to log stack traces when unhandled exceptions indicate something is broken.
The downloaders can raise these exceptions, but they are designed to be caught by the user and not to propagate all the way up. For direct usage of the downloaders the user will need to catch and handle these types of exceptions. I wrote some about that here.
In the event that the exception isn't caught, I think having a stack trace is what we want. Celery typically does log the stack trace of a fatal exception, but for some reason it wasn't logging PulpException-based exceptions. I think we want to log those stack traces too, since they are fatal task exceptions, and to be consistent with Celery's logging. What do you think given all ^?
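The usage pattern described here, where callers catch validation errors as an expected failure mode while anything uncaught surfaces as a fatal error, can be sketched with stand-in exception classes (these are not the real pulpcore definitions):

```python
class PulpException(Exception):
    """Stand-in for pulpcore's base exception."""


class DigestValidationError(PulpException):
    pass


class SizeValidationError(PulpException):
    pass


def attempt_download(download):
    """Call `download`; validation failures are expected and handled, not propagated."""
    try:
        return download()
    except (DigestValidationError, SizeValidationError):
        # Expected failure mode: handle it here rather than letting it
        # bubble up as a fatal task exception with a stack trace.
        return None
```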
    from .http import HttpDownloader


    class DownloadFactory(object):
All objects are new-style in python3. Don't need the (object).
Fixed in next push and throughout PR.
    This includes the computed digest values along with size information.
    """


    class BaseDownloader(object):
All objects are new-style in python3. Don't need the (object).
Fixed in next push and throughout PR.
        super().__init__("PLP0003")

    def __str__(self):
        return _("An Artifact failed validation due to checksum.")
What if the file being downloaded w/ validation is not an artifact? Misleading? Also, I think including the url would be helpful.
This is a leftover from a name change. Next push will update with s/An Artifact/A file/
We can't include any useful data (these exceptions are not very useful) until we resolve https://pulp.plan.io/issues/2988
        super().__init__("PLP0004")

    def __str__(self):
        return _("An Artifact failed validation due to size.")
What if the file being downloaded w/ validation is not an artifact? Misleading? Also, I think including the url would be helpful.
Same fix here as above.
    class DownloadFactory(object):
        """
        A factory for creating configured downloader objects that all share a single session.
Please document the instance Attributes:.
    >>> result = task.result()  # 'result' is a DownloadResult
    """

    def __init__(self, importer, download_class=HttpDownloader):
Shouldn't the default download_class depend on protocol?
    self._importer = importer
    self._download_class = download_class
    self._session = self._make_session_from_importer()
    self._protocol_map = {'https': self._http_or_https, 'http': self._http_or_https}
Shouldn't the _protocol_map support file:/// too?
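A scheme map that also covers file:// could be sketched like this; the builder methods and their return values are hypothetical placeholders, not the PR's code:

```python
from urllib.parse import urlparse


class DownloadFactorySketch:
    """Sketch: dispatch URL schemes, including file://, to builder methods."""

    def __init__(self):
        self._protocol_map = {
            'http': self._http_or_https,
            'https': self._http_or_https,
            'file': self._file,
        }

    def build(self, url):
        scheme = urlparse(url).scheme
        try:
            builder = self._protocol_map[scheme]
        except KeyError:
            raise ValueError('Unsupported scheme: {}'.format(scheme))
        return builder(url)

    def _http_or_https(self, url):
        return ('http-downloader', url)  # placeholder for HttpDownloader(...)

    def _file(self, url):
        return ('file-downloader', url)  # placeholder for FileDownloader(...)
```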
    sslcontext.load_cert_chain(
        self._importer.ssl_client_key.name,
        self._importer.ssl_client_certificate.name
    )
It's common for the certificate to contain the client cert and key. I think self._importer.ssl_client_key could be None.
What would you recommend here? The way it's written now, if ssl_client_key is None then these things won't be configured at all. I'm thinking this can be fixed later.
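One possible later fix, sketched here rather than prescribed: ssl.SSLContext.load_cert_chain() takes the certificate file first and accepts keyfile=None, in which case ssl looks for the private key inside the certificate file itself. The helper name is an assumption:

```python
import ssl


def configure_client_tls(sslcontext, client_cert_path, client_key_path=None):
    """
    Load the client certificate chain. If no separate key file is given,
    pass keyfile=None so ssl looks for the key bundled in the cert file.
    """
    if client_cert_path:
        sslcontext.load_cert_chain(client_cert_path, keyfile=client_key_path)
    return sslcontext
```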
| """ | ||
| self._importer = importer | ||
| self._download_class = download_class | ||
| self._session = self._make_session_from_importer() |
So, every downloader created by this factory will share the same session? So, the streamer will need to cache factories?
The streamer will use get_downloader() right? If so, the importer instance will provide the session management. That way all the downloaders generated from the same importer instance will share their session. If this doesn't sound good, let me know.
My concern is that the streamer will fetch the importer from the DB for each call. So, if the importer stores the factory as an instance attribute, the factory will not be shared across streamer requests. Unless I'm missing something.
Force-pushed from 9d9a1f7 to 5abf140.
        return aiohttp.ClientSession(connector=conn, **auth_options)

    def build(self, url, expected_digests=None, expected_size=None):
Did you mean to add **kwargs to build() and pass them to builder methods?
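The **kwargs passthrough the comment asks about might look like this sketch; the builder method and its return value are illustrative stand-ins:

```python
class FactoryWithKwargs:
    """Sketch: build() forwards extra keyword arguments to the builder method."""

    def build(self, url, expected_digests=None, expected_size=None, **kwargs):
        # Forward everything, including downloader-specific options in kwargs.
        return self._http_or_https(
            url,
            expected_digests=expected_digests,
            expected_size=expected_size,
            **kwargs
        )

    def _http_or_https(self, url, **kwargs):
        return (url, kwargs)  # placeholder for constructing a downloader
```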
| """ | ||
| with open(self._path, 'r') as f_handle: | ||
| while True: | ||
| chunk = await f_handle.read(1024) |
Suggest 1MB instead of 1KB. I don't think this will perform well.
    :class:`~pulpcore.plugin.download.asyncio.BaseDownloader`.
    """
    self.uri = uri
    kwargs['custom_file_object'] = open(os.devnull, 'w')
Why is the downloaded file always written to /dev/null?
It's because I didn't want the file importer to make a copy of the file. I realize now that it needs to make a copy, because otherwise when the original file is moved into place it will destroy the original file:// data. I'm going to remove this code now so that it makes a copy instead; that way the original data can be imported multiple times.
Force-pushed from 5abf140 to dfc9689.
        return len(self.urls) == len(self.finished_urls)

    @property
    def finished_downloads(self):
Please add docstring with Returns:.
(blocker)
I think it returns (id, {DeferredArtifact: DownloadResult}). Right?
Yes that is what it returns. This is added in the next push along with one for the done property too.
    def __next__(self):
        """
        Returns:
            tuple: The first element is the `id` first specified by the user. The second element is
I think this actually returns (id, {DeferredArtifact: DownloadResult}). Right?
It's a little odd that the input is Group(s) and the output is something group-like. What do you think about returning the finished Group and letting the user inspect the group?
Something like:

    for group in downloader:
        for deferred_artifact, download_result in group.finished_downloads:
            # do something

Or, maybe just go ahead and create the Artifact for them, since the group downloader is artifact oriented.
Something like:

    for group in downloader:
        for deferred_artifact, downloaded_artifact in group.downloaded_artifacts:
            # do something
I'm switching it to return Group and also to have it be schedule_group(group).
This is a good idea; in making the switch, a good amount of code was removed from the PR.
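The agreed interface, iterating finished Groups and inspecting each directly, would read roughly like this; Group and the iteration source are minimal stand-ins for the real classes:

```python
class Group:
    """Stand-in: a finished group exposing its id and download results."""

    def __init__(self, group_id, finished_downloads):
        self.id = group_id
        self.finished_downloads = finished_downloads  # {artifact: result}


def iterate_groups(groups):
    """Yield each group as it finishes; callers inspect the group directly."""
    for group in groups:
        yield group


groups = [Group('g1', {'artifact-a': 'result-a'})]
seen = []
for group in iterate_groups(groups):
    for deferred_artifact, download_result in group.finished_downloads.items():
        seen.append((group.id, deferred_artifact, download_result))
```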
    self.group_iterator = None
    self.downloads_not_done = set()
    self.groups_not_done = []
    self.urls = defaultdict(list)  # dict with url as the key and a list of Groups as the value
Using the defaultdict here is cool.
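For readers unfamiliar with the pattern: defaultdict(list) creates the empty list on first access, so Groups can be appended per URL without any membership checks. The URLs and group names below are illustrative:

```python
from collections import defaultdict

# url -> list of Groups interested in that url; missing keys start as [].
urls = defaultdict(list)
urls['http://example.com/a'].append('group1')
urls['http://example.com/a'].append('group2')
urls['http://example.com/b'].append('group1')
```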
    Usage:
        >>> import asyncio
        >>> loop = asyncio._get_running_loop()
Did you mean _get_running_loop()? It's named to be non-public.
Yeah, this was wrong. I updated it with the next push.
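For a docstring of that era, the public call would be asyncio.get_event_loop() rather than the private _get_running_loop(). The sketch below creates a loop explicitly so it runs regardless of thread or interpreter state:

```python
import asyncio

# In the docstring example the public call would be asyncio.get_event_loop();
# here we create a fresh loop explicitly so the sketch runs anywhere.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)


async def noop():
    return 'done'

result = loop.run_until_complete(noop())
loop.close()
```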
Force-pushed from a7e8764 to 72c9793.
    auth_options = {}
    if self._importer.basic_auth_user and self._importer.basic_auth_password:
        auth_options['auth'] = aiohttp.BasicAuth(
            username=self._importer.basic_auth_user,
s/username/login
class BasicAuth(namedtuple('BasicAuth', ['login', 'password', 'encoding'])):
(blocker)
Thanks. Fixed in next push.
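aiohttp's BasicAuth is a namedtuple whose first field is named login, which is why the username= keyword fails. The mirror below shows the field layout without requiring aiohttp to be installed:

```python
from collections import namedtuple

# Mirror of aiohttp's BasicAuth field layout (login, not username).
BasicAuth = namedtuple('BasicAuth', ['login', 'password', 'encoding'])
BasicAuth.__new__.__defaults__ = ('latin1',)  # encoding default, as in aiohttp

auth = BasicAuth(login='user', password='pass')
```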
Testing the group downloader: I observed some undesirable behavior. A failed download emitted an exception in a way that I could not catch and continue iteration. Also, I would expect that all groups be emitted, giving an indication of success for each group. (blocker)
Force-pushed from 4387fff to b94d651.
I pushed a small update regarding exception raising with the
Force-pushed from b94d651 to ba1d9fd.
Code reviewed without consideration of detailed requirements, overall approach or design.
Force-pushed from 44818d6 to 9c07188.
    .. _base-downloader:

    BaseDownloader
IIRC, in your review of the futures download PR, you explicitly requested that the Download base class be removed from the documentation because plugin writers were not expected to create custom downloaders or override behavior. Do you still feel that way? If so, should BaseDownloader be removed? Or should I submit a PR to include the futures base class as well?
Yes, thanks for bringing this up. I think opening a PR to include the futures base class would be ideal. I think the use cases that require subclassing are in the must-have list of requirements and since they are both ready we should include them sooner (like now). Sound ok?
Sounds good.
Force-pushed from 4f8be20 to 05b77ba.
Adds a HttpDownloader
- handles both synchronous and asynchronous downloads
- designed for subclassing to make custom downloaders
- size validation works
- digest validation works
- has a well-defined return interface, DownloadResult

Adds a FileDownloader

Adds a BaseDownloader

Reworks the Factory
- Updates the Factory to use HttpDownloader and FileDownloader
- Handles http, https, and file
- Fully configures HttpDownloader with all importer settings including: ssl client and server options, basic auth, proxy, proxy_auth
- Can be reused with custom downloaders

Adds a GroupDownloader
- Downloads files in parallel but only returns to the user when a group of files is fully downloaded
- Drivable with or without generators
- Constrains in-memory objects through generator use

General Updates
- Updates the importer to use the Factory
- Updates the docs to use docs from the new code

https://pulp.plan.io/issues/2951
closes pulp#2951
Force-pushed from 05b77ba to 1b36eee.