diff --git a/docs/contributing/plugin_api/changeset.rst b/docs/contributing/plugin_api/changeset.rst new file mode 100644 index 0000000000..4dcd584786 --- /dev/null +++ b/docs/contributing/plugin_api/changeset.rst @@ -0,0 +1,47 @@ +pulpcore.plugin.changeset +========================= + +All classes documented here should be imported directly from +the ``pulpcore.plugin.changeset`` namespace. + +.. automodule:: pulpcore.plugin.changeset + +.. autoclass:: pulpcore.plugin.changeset.ChangeSet + :members: apply + + +New Content & Artifacts +----------------------- + +Classes used to define *new* content to be added to a repository. + + +.. autoclass:: pulpcore.plugin.changeset.RemoteContent + :members: artifacts + +.. autoclass:: pulpcore.plugin.changeset.RemoteArtifact + :members: content + + +Reporting +--------- + +Reports and Exceptions. + + +.. autoclass:: pulpcore.plugin.changeset.ChangeReport + :members: + +.. autoclass:: pulpcore.plugin.changeset.ChangeFailed + :show-inheritance: + :members: + + +Additional Tools +---------------- + +.. autoclass:: pulpcore.plugin.changeset.BatchIterator + :special-members: __len__, __iter__ + +.. autoclass:: pulpcore.plugin.changeset.SizedIterable + :special-members: __len__ diff --git a/docs/contributing/plugin_api/download.rst b/docs/contributing/plugin_api/download.rst new file mode 100644 index 0000000000..b1e9ac3828 --- /dev/null +++ b/docs/contributing/plugin_api/download.rst @@ -0,0 +1,97 @@ +pulpcore.plugin.download +======================== + +All classes documented here should be imported directly from +the ``pulpcore.plugin.download`` namespace. + +.. automodule:: pulpcore.plugin.download + + + +Single File +----------- + + +.. autoclass:: pulpcore.plugin.download.HttpDownload + :members: + :special-members: __call__ + +.. autoclass:: pulpcore.plugin.download.FileDownload + :members: + :special-members: __call__ + + +Multiple Files (concurrent) +--------------------------- + + +.. autoclass:: pulpcore.plugin.download.Batch + :members: download, shutdown + :special-members: __call__ + + +File Validation +--------------- + +.. autoclass:: pulpcore.plugin.download.SizeValidation + :members: + +.. autoclass:: pulpcore.plugin.download.DigestValidation + :members: + :special-members: __call__ + + +Writers +------- + +Downloading consists of two related operations. First, is reading the file content +from a remote source. Second, is writing those bits locally. The most typical case is +to write the bits to a file on the local filesystem and is accomplished using +a `FileWriter`. Another case, is to store the file content (text) in memory and then +inspect as a `str`. As a convenience, this may be done using the `BufferWriter`. + + +.. autoclass:: pulpcore.plugin.download.FileWriter + :members: + +.. autoclass:: pulpcore.plugin.download.BufferWriter + :members: + + +Settings +-------- + +.. autoclass:: pulpcore.download.SSL + :members: + +.. autoclass:: pulpcore.download.User + :members: + +.. autoclass:: pulpcore.download.Timeout + :members: + + +Errors +------ + +A download is successful unless an exception is raised. + +.. autoclass:: pulpcore.plugin.download.DownloadError + :show-inheritance: + :members: download, reason + +.. autoclass:: pulpcore.plugin.download.DownloadFailed + :show-inheritance: + :members: download, reason + +.. autoclass:: pulpcore.plugin.download.NotAuthorized + :show-inheritance: + :members: download, reason + +.. autoclass:: pulpcore.plugin.download.NotFound + :show-inheritance: + :members: download, reason + +.. 
autoclass:: pulpcore.plugin.download.ValidationError + :show-inheritance: + :members: download, reason diff --git a/docs/contributing/plugin_api/index.rst b/docs/contributing/plugin_api/index.rst index f57f2788fc..184bea99d7 100644 --- a/docs/contributing/plugin_api/index.rst +++ b/docs/contributing/plugin_api/index.rst @@ -11,6 +11,8 @@ Plugin API reaches stability with v1.0. For the latest version of the Plugin API models serializers viewsets + changeset + download .. automodule:: pulpcore.plugin :imported-members: diff --git a/platform/pulpcore/app/models/catalog.py b/platform/pulpcore/app/models/catalog.py index 1f82f91daf..a1bab55dd7 100644 --- a/platform/pulpcore/app/models/catalog.py +++ b/platform/pulpcore/app/models/catalog.py @@ -34,6 +34,9 @@ class DownloadCatalog(Model): artifact = models.ForeignKey(Artifact, on_delete=models.CASCADE) importer = models.ForeignKey(Importer, on_delete=models.CASCADE) + class Meta: + unique_together = ('artifact', 'importer') + def __str__(self): """ Human-readable representation of this model. @@ -42,5 +45,8 @@ def __str__(self): interface. """ - return _('{artifact} is retrievable at {url} by {importer}'.format( - artifact=self.artifact, url=self.url, importer=self.importer)) + return _( + '{artifact} is retrievable at {url} by {importer}'.format( + artifact=self.artifact, + url=self.url, + importer=self.importer)) diff --git a/platform/pulpcore/download/__init__.py b/platform/pulpcore/download/__init__.py new file mode 100644 index 0000000000..68ff9a522c --- /dev/null +++ b/platform/pulpcore/download/__init__.py @@ -0,0 +1,71 @@ +""" +This package provides objects used for downloading files. +The workhorse is the ``Download`` object which performs the task of downloading a single file. +A more natural name for this class might be a download `Task` but this might cause confusion +with celery tasks in Pulp. Here, a `download` is used as a noun and has a command-object +design pattern. That is, the download is callable. The Batch provides a way to perform multiple +downloads concurrently. + +The model:: + + |*-------1 Batch + Download |--------------------------------------------------| + ^ |1-----------------* Validation *-----------------1| Writer + | ^ ^ + -------|------ | | + | | | --------------------- ----------------- + | | | | | | | + HttpDownload | FtpDownload SizeValidation DigestValidation FileWriter BufferWriter + | + FileDownload + +Recipes: + + A single download. + + >>> + >>> download = HttpDownload('http://my-url', FileWriter('put-file-here')) + >>> download() + >>> # Go use the file. + >>> + + Multiple downloads concurrently. + + >>> + >>> downloads = [ + >>> HttpDownload('http://my-url0', FileWriter('put-file-here0')), + >>> FileDownload('file://my-url1', FileWriter('put-file-here1')), + >>> FtpDownload('ftp://my-url2', FileWriter('put-file-here2')), + >>> ] + >>> + >>> with Batch(downloads) as batch: + >>> for plan in batch(): + >>> try: + >>> plan.result() + >>> except Exception: + >>> # Failed + >>> else: + >>> # Use the downloaded file \o/ + >>> + + Download a text file to a use as a string. 
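+
+    The BufferWriter (shown below) holds the downloaded bytes in memory and its
+    read() returns them decoded as text.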
+ + >>> + >>> download = HttpDownload('http://my-url', BufferWriter()) + >>> download() + >>> document = download.writer.read() + >>> # Use the document + >>> + +""" + +from .batch import Batch # noqa +from .delegation import delegate # noqa +from .error import DownloadError, DownloadFailed, NotFound, NotAuthorized # noqa +from .ftp import FtpDownload # noqa +from .file import FileDownload # noqa +from .http import HttpDownload # noqa +from .settings import Settings, SSL, Timeout, User # noqa +from .single import Download # noqa +from .validation import ValidationError, SizeValidation, DigestValidation # noqa +from .writer import Writer, FileWriter, BufferWriter # noqa diff --git a/platform/pulpcore/download/batch.py b/platform/pulpcore/download/batch.py new file mode 100644 index 0000000000..0d0fb96eb7 --- /dev/null +++ b/platform/pulpcore/download/batch.py @@ -0,0 +1,509 @@ +from concurrent.futures import ThreadPoolExecutor +from gettext import gettext as _ +from logging import getLogger, DEBUG +from queue import Queue, Empty +from threading import Thread, current_thread + +from .single import Context + + +# The default queuing backlog. +BACKLOG = 1024 +# The default concurrency. +CONCURRENT = 4 + + +log = getLogger(__name__) + + +class Batch: + """ + Provides batching and concurrent execution of downloads. + + Attributes: + downloads (collections.abc.Iterable): An Iterable of downloads. + concurrent (int): The number of downloads to execute in concurrently. + iterator (PlanIterator): Used to iterate downloads as they complete. + context (SharedContext): A shared download context. + feeder (DownloadFeeder): Used to feed submit downloads to the executor. + is_shutdown (bool): Batch has been shutdown. + + Notes: + * The batch should be used as a context manager. + * Or, `shutdown()` must be called manually. + + Examples: + + >>> + >>> from pulpcore.download import Batch, HttpDownload, DownloadError + >>> + >>> url = 'http://content.org/dog.rpm' + >>> path = '/tmp/working/dog.rpm' + >>> downloads = (HttpDownload(url, path) for _ in range(10)) + >>> + >>> with Batch(downloads) as batch: + >>> for plan in batch(): + >>> try: + >>> plan.result() + >>> except DownloadError: + >>> # An error occurred. + >>> else: + >>> # Use the downloaded file \o/ + >>> + """ + + def __init__(self, downloads, concurrent=CONCURRENT, backlog=BACKLOG): + """ + Args: + downloads (collections.abc.Iterable): An Iterable of downloads. + concurrent (int): The number of downloads to execute in concurrently. + backlog (int): The number of downloads kept in memory. + + Raises: + ValueError: concurrent less than 2 or backlog is less than concurrent. + """ + super(Batch, self).__init__() + self.downloads = downloads + self.concurrent = concurrent + self.iterator = PlanIterator(backlog) + self.executor = BatchExecutor(concurrent=concurrent, backlog=backlog) + self.context = Context() + self.feeder = DownloadFeeder(self) + self.is_shutdown = False + + def download(self): + """ + Execute all of the downloads. + + Returns: + PlanIterator: A plan iterator. + The iterator will yield the download `Plan` in the order completed. + """ + log.debug(_('%(batch)s - download started'), {'batch': self}) + self.feeder.start() + return self.iterator + + def shutdown(self): + """ + End processing and shutdown the feeder and the thread pool. + + Notes: + This must be called to prevent leaking resources unless the Batch + is used as a context manager. + + Examples: + >>> + >>> with Batch(..) as batch: + >>> # ... 
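+            >>>
+            >>> # A sketch of manual shutdown when the context manager is not
+            >>> # used; `downloads` is assumed to be an iterable of Download objects.
+            >>> batch = Batch(downloads)
+            >>> try:
+            >>>     for plan in batch():
+            >>>         plan.result()
+            >>> finally:
+            >>>     batch.shutdown()
+            >>>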
+ """ + if self.is_shutdown: + return + self.is_shutdown = True + log.debug(_('%(batch)s - shutdown'), {'batch': self}) + self.feeder.shutdown() + self.executor.shutdown() + + def __call__(self): + """ + Execute all of the downloads. + Calls `download()`. + + Returns: + PlanIterator: A plan iterator. + The iterator will yield the download `Plan` in the order completed. + """ + return self.download() + + def __enter__(self): + return self + + def __exit__(self, *unused): + try: + self.shutdown() + except Exception: + if log.isEnabledFor(DEBUG): + log.exception(_('Batch shutdown failed.')) + + def __str__(self): + _id = str(id(self))[-4:] + return _('Batch: id={s} concurrent={c}').format(s=_id, c=self.concurrent) + + +class BatchExecutor(ThreadPoolExecutor): + """ + Batch thread pool executor. + """ + + def __init__(self, concurrent=CONCURRENT, backlog=BACKLOG): + """ + A thread pool executor tailored for the batch. + The worker queue size is restricted to limit memory footprint. + + Args: + concurrent (int): The number of downloads to execute in concurrently. + backlog (int): The number of downloads kept in memory. + + Raises: + ValueError: concurrent less than 2 or backlog is less than concurrent. + """ + super(BatchExecutor, self).__init__(max_workers=concurrent) + self._work_queue = Queue(maxsize=backlog) + if concurrent < 2: + raise ValueError(_('concurrent may not be < 2')) + if backlog < concurrent: + raise ValueError(_('backlog may not be < concurrent')) + + +class DownloadFeeder(Thread): + """ + Download feeder. + A thread used to feed each batched download into the executor. + May be interrupted and terminated by calling shutdown(). + + Attributes: + batch (Batch): A batch to feed. + total (int): The total number of downloads queued. + is_shutdown (bool): Feeder has been shutdown. + """ + + def __init__(self, batch): + super(DownloadFeeder, self).__init__(name='feeder') + self.batch = batch + self.daemon = True + self.total = 0 + self.is_shutdown = False + + @property + def iterator(self): + return self.batch.iterator + + @property + def executor(self): + return self.batch.executor + + @property + def downloads(self): + return self.batch.downloads + + @property + def context(self): + return self.batch.context + + def shutdown(self, wait=True): + """ + Shutdown. + Abort feeding and terminate. + + Args: + wait (bool): Wait for thread to terminate. + """ + if self.is_shutdown: + return + self.is_shutdown = True + if wait: + self.join() + + def run(self): + """ + Thread (main) loop. + Submit each download to the batch executor. + """ + try: + for download in self.downloads: + if self.is_shutdown: + return + log.debug( + _('%(feeder)s - feed #%(total)d url=%(url)s'), + { + 'feeder': self, + 'total': self.total, + 'url': download.url + }) + download.context = self.context + future = self.executor.submit(Plan(self.batch, download)) + future.add_done_callback(self.iterator.add) + self.total += 1 + except Exception as e: + log.exception(_('feeder failed.')) + self.iterator.raised(e) + self.total += 1 + finally: + self.done() + + def done(self): + """ + Done feeding downloads and need to update the iterator appropriately. + """ + if self.total: + self.iterator.total = self.total + else: + self.iterator.drain() + + def __str__(self): + _id = str(id(self))[-4:] + return _('DownloadFeeder: id={s} shutdown={a}').format(s=_id, a=self.is_shutdown) + + +class QueueIterator: + """ + A Queue iterator. + Each item in the queue is a tuple of: (code, payload). 
+ + Attributes: + queue (Queue): The input queue to be iterated. + iterated (int): The number of times `__next__()` was called. + total (int): The total number queued. A value of `-1` indicates + the total is not yet known. + """ + + NEXT = 'NEXT' + EXCEPTION = 'EXCEPTION' + END = 'END' + + def __init__(self, backlog=BACKLOG): + self.queue = Queue(maxsize=backlog) + self.iterated = 0 + self.total = -1 + + def put(self, code, payload=None, block=True): + """ + Enqueue a message. + + Args: + code (str): The message code. + payload (object): The message payload. + block (bool): Block when queue is full (default:True). + """ + log.debug( + _('%(iterator)s put: code=%(code)s payload=%(payload)s'), + { + 'iterator': self, + 'code': code, + 'payload': payload + }) + message = (code, payload) + self.queue.put(message, block=block) + + def add(self, payload): + """ + Add the next object to the input queue to be rendered by `__next__()`. + + Args: + payload: An object to be rendered by `__next__()`. + """ + self.put(self.NEXT, payload) + + def raised(self, exception): + """ + Add a fatal exception to the input queue. The exception has been raised by + the object providing the objects to be iterated. + + Args: + exception: An exception to be raised by `__next__()`. + """ + self.put(self.EXCEPTION, exception) + + def drain(self): + """: + Drain the input queue. + Add an message to the input queue that signals that the input + queue will always be empty. The object feeding the queue has nothing + to be iterated. + """ + log.debug(_('%(iterator)s - input drained'), {'iterator': self}) + while True: + try: + self.queue.get(block=False) + except Empty: + break + self.end() + + def end(self): + """ + Add an message to the input queue that marks the end of input. + """ + self.put(self.END) + + def __next__(self): + """ + Get the next enqueued object. + + Returns: + The next enqueued object. + + Raises: + StopIteration: when finished iterating. + """ + log.debug(_('%(iterator)s - next'), {'iterator': self}) + + if self.iterated == self.total: + raise StopIteration() + + code, payload = self.queue.get() + + log.debug( + _('%(iterator)s next: code=%(code)s payload=%(payload)s'), + { + 'iterator': self, + 'code': code, + 'payload': payload + }) + + # next + if code == self.NEXT: + self.iterated += 1 + return payload + # fatal + if code == self.EXCEPTION: + raise payload + # empty + if code == self.END: + raise StopIteration() + + def __iter__(self): + return self + + def __str__(self): + _id = str(id(self))[-4:] + description = _('Iterator: id={s} iterated={i}/{t}') + return description.format( + s=_id, + i=self.iterated, + t=self.total) + + +class FutureIterator(QueueIterator): + """ + A queue iterator that expects the payload to be a `concurrent.futures.Future`. + """ + + def __next__(self): + """ + Get the next future and propagate any raised exceptions. + + Returns: + The next `Future.result()` + + Raises: + Anything raised by the object executed. + """ + future = super(FutureIterator, self).__next__() + try: + return future.result() + except Exception as exception: + log.debug( + _('%(iterator)s - raising: %(exception)s'), + { + 'iterator': self, + 'exception': exception + }) + raise + + +class PlanIterator(FutureIterator): + """ + Batched download plan iterator. + """ + + def __next__(self): + """ + Get the next completed download plan and propagate any raised exceptions. + + Returns: + Plan: The next completed download execution plan. + + Raises: + Anything raised by the object executed. 
+ """ + while True: + download = super(PlanIterator, self).__next__() + if download: + return download + + +class Plan: + """ + Batch download execution plan. + The plan provides: + - Ensure self is returned in the future.result. + - Catch and store exceptions raised by the download. This ensure that + only fatal batch framework exceptions are raised during iteration. + + Attributes: + batch (pulpcore.download.Batch): The batch. + download (pulpcore.download.Download): The wrapped download. + executed (bool): Indicates the plan has been executed. + error (Exception): An exception raised by the download. + """ + + def __init__(self, batch, download): + """ + + Args: + batch (pulpcore.download.Batch): The batch. + download (pulpcore.download.Download): The wrapped download. + """ + self.batch = batch + self.download = download + self.executed = False + self.error = None + + def result(self): + """ + Get the execution result. + This **should** be called to ensure that error cases are properly handled. + + Returns: + pulpcore.download.Download: The planned download. + + Raises: + Exception: Any exception raised during the download. + """ + if self.error is None: + return self.download + else: + raise self.error + + def __call__(self): + """ + Execute the plan. + + Returns: + Plan: self + """ + if self.batch.is_shutdown: + return + + with self as download: + self.executed = True + try: + download() + except Exception as error: + self.error = error + return self + + def __enter__(self): + thread = current_thread() + log.debug( + _('%(download)s thread=%(thread)s - started'), + { + 'thread': thread.getName(), + 'download': self + }) + return self.download + + def __exit__(self, *unused): + thread = current_thread() + log.debug( + _('%(download)s thread=%(thread)s - end'), + { + 'thread': thread.getName(), + 'download': self + }) + + def __str__(self): + return _( + 'Plan: {download} executed: {executed} error: {error}'.format( + download=self.download, + executed=self.executed, + error=self.error)) diff --git a/platform/pulpcore/download/delegation.py b/platform/pulpcore/download/delegation.py new file mode 100644 index 0000000000..27b7428ce1 --- /dev/null +++ b/platform/pulpcore/download/delegation.py @@ -0,0 +1,97 @@ +class DelegateDecorator: + """ + Delegation decorator. + Any decorated method may be implemented by the download delegate. + + Attributes: + decorated: The decorated method (function). + enabled (bool): Dispatch to the delegate is enabled. + Used to prevent recursion when delegate calls back into the download. + download (pulpcore.download.Download): A download object. + + Examples: + >>> + >>> class ErrorHandler: + >>> def on_error(self, download, error): + >>> ... + >>> repaired = True # Fixed the problem so retry. + >>> return repaired + >>> + >>> download = ... # + >>> download.delegate = ErrorHandler() + >>> + + See Also: + [1] https://en.wikipedia.org/wiki/Delegation_(object-oriented_programming) + """ + + def __init__(self, decorated): + """ + Args: + decorated: The decorated method function. + """ + self.__doc__ = decorated.__doc__ + self.__repr__ = decorated.__repr__ + self.decorated = decorated + self.enabled = True + self.download = None + + @property + def name(self): + """ + The name of the decorated method. + + Returns: + (str): The method name. + """ + return self.decorated.__name__ + + def select_method(self): + """ + Select the actual method to be invoked. + + Returns: + The identically named method on the delegate when defined. 
+ Else, the decorated method's function. + """ + method = self.decorated + try: + if self.enabled: + method = getattr(self.download.delegate, self.name.lstrip('_')) + except AttributeError: + pass + return method + + def __call__(self, *args, **kwargs): + """ + Delegate the method call. + - find the appropriate method. + - disable self to prevent recursion. + - invoke the method. + - enable self + """ + method = self.select_method() + try: + self.enabled = False + return method(self.download, *args, **kwargs) + finally: + self.enabled = True + + def __get__(self, instance, owner): + """ + Using python descriptors to assign the download attribute when the + decorated method is referenced. + + Args: + instance (pulpcore.download.Download): The decorated instance. + owner (class): The decorated class. + + Returns: + Delegate: self + """ + self.download = instance + return self + + +# Alias lowercase. +delegate = DelegateDecorator diff --git a/platform/pulpcore/download/error.py b/platform/pulpcore/download/error.py new file mode 100644 index 0000000000..b4220d13a7 --- /dev/null +++ b/platform/pulpcore/download/error.py @@ -0,0 +1,44 @@ +from gettext import gettext as _ + + +class DownloadError(Exception): + """ + Base for all download related exceptions. + """ + pass + + +class DownloadFailed(DownloadError): + """ + Download failed. + + Attributes: + download (pulpcore.download.Download): The failed download. + reason (str): The reason it failed. + """ + + def __init__(self, download, reason=''): + """ + Args: + download (pulpcore.download.Download): The failed download. + reason (str): The reason it failed. + """ + self.download = download + self.reason = reason + + def __str__(self): + return _('{r} - Failed. Reason: {d}'.format(r=self.download, d=self.reason)) + + +class NotFound(DownloadFailed): + """ + Resource referenced by the URL was not found. + """ + pass + + +class NotAuthorized(DownloadFailed): + """ + Not authorized to access the resource referenced by the URL. + """ + pass diff --git a/platform/pulpcore/download/file.py b/platform/pulpcore/download/file.py new file mode 100644 index 0000000000..a108ce1153 --- /dev/null +++ b/platform/pulpcore/download/file.py @@ -0,0 +1,68 @@ +from errno import ENOENT, EPERM +from urllib.parse import urlparse + +from .error import DownloadFailed, NotFound, NotAuthorized +from .single import Download + + +# ERRNO mapped to standard exception. +ERROR = { + ENOENT: NotFound, + EPERM: NotAuthorized +} + + +class FileDownload(Download): + """ + Local File download. + Handles the file:// protocol. + + Attributes: + error (int): Status code. (0 = OK, else set to ERRNO) + + Examples: + >>> + >>> from pulpcore.download import DownloadError, FileDownload, FileWriter + >>> + >>> url = ... + >>> path = ... + >>> + >>> download = FileDownload(url, FileWriter(path)) + >>> + >>> try: + >>> download() + >>> except DownloadError: + >>> # An error occurred. + >>> else: + >>> # Go read the downloaded file \o/ + """ + + __slots__ = ('error',) + + def __init__(self, url, writer): + """ + Args: + url (str): A file download URL. + writer (Writer): An object used to store downloaded file. + """ + super(FileDownload, self).__init__(url, writer) + self.error = 0 + + def _send(self): + """ + Read the file. + + Raises: + DownloadFailed: The download failed and could not be repaired. 
+ """ + try: + path = urlparse(self.url).path + with open(path, 'rb') as fp: + while True: + buffer = fp.read(self.BLOCK) + if buffer: + self._write(buffer) + else: + break + except OSError as error: + raise ERROR.get(error.errno, DownloadFailed)(self, str(error)) diff --git a/platform/pulpcore/download/ftp.py b/platform/pulpcore/download/ftp.py new file mode 100644 index 0000000000..eecde7fb30 --- /dev/null +++ b/platform/pulpcore/download/ftp.py @@ -0,0 +1,58 @@ +from ftplib import FTP +from urllib.parse import urlparse + +from .single import Download + +from .settings import User + + +class FtpDownload(Download): + """ + FTP download object. + Handles the ftp:// protocol. + + Attributes: + user (pulpcore.download.User): User settings for basic authentication. + """ + + __slots__ = ('user',) + + def __init__(self, url, writer): + """ + Args: + url (str): A file download URL. + writer (Writer): An object used to store downloaded file. + """ + super(FtpDownload, self).__init__(url, writer) + self.user = User('anonymous', 'anonymous') + + def _send(self): + """ + Send the `RETR` (command). + This is the *main* method responsible for implementing the actual + download by sending a protocol specific download. The reply + is handled by on_reply(), on_succeeded() and on_error(). + + Raises: + DownloadFailed: The download failed and could not be repaired. + + Notes: + Must be implemented by subclass. + """ + with FTP() as ftp: + parsed_url = urlparse(self.url) + ftp.connect(host=parsed_url.netloc) + ftp.login( + user=self.user.name, + passwd=self.user.password) + ftp.retrbinary( + cmd='RETR {}'.format(parsed_url.path), + callback=self._write, + blocksize=self.BLOCK) + + def __str__(self): + base = super(FtpDownload, self).__str__() + return ' | '.join([ + base, + str(self.user), + ]) diff --git a/platform/pulpcore/download/http.py b/platform/pulpcore/download/http.py new file mode 100644 index 0000000000..78b55d4fa2 --- /dev/null +++ b/platform/pulpcore/download/http.py @@ -0,0 +1,238 @@ +from gettext import gettext as _ +from logging import getLogger +from http import HTTPStatus +from requests import Session +from requests.exceptions import SSLError + +from .error import DownloadFailed, NotAuthorized, NotFound +from .single import Download + +from .settings import Timeout, SSL, User + + +log = getLogger(__name__) + + +# HTTP codes mapped to standard exceptions. +ERROR = { + HTTPStatus.NOT_FOUND: NotFound, + HTTPStatus.UNAUTHORIZED: NotAuthorized +} + + +class HttpDownload(Download): + """ + An HTTP/HTTPS download. + + Attributes: + timeout (pulpcore.download.Timeout): Timeout settings. + user (pulpcore.download.User): User settings for basic-authentication. + ssl (pulpcore.download.SSL): SSL/TLS settings. + proxy_url (str): An optional proxy URL. + headers (dict): The optional HTTP headers. + + Examples: + >>> + >>> from pulpcore.download import DownloadError, HttpDownload, FileWriter + >>> + >>> url = 'http://..' + >>> path = ... + >>> download = HttpDownload(url, FileWriter(path)) + >>> + >>> try: + >>> download() + >>> except DownloadError: + >>> # An error occurred. + >>> else: + >>> # Go read the downloaded file \o/ + >>> + >>> url = 'https://..' + >>> path = .. 
+ >>> download = HttpDownload(url, FileWriter(path)) + >>> # optional settings + >>> download.ssl.ca_certificate='path-to-certificate', + >>> download.ssl.client_certificate='path-to-certificate', + >>> download.ssl.client_key='path-to-key', + >>> download.ssl.validation=True), + >>> download.proxy_url='http://user:password@gateway.org')) + >>> + >>> try: + >>> download() + >>> except DownloadError: + >>> # An error occurred. + >>> else: + >>> # Go read the downloaded file \o/ + >>> + + Notes: + The 'session' may be shared through the context.session. + """ + + __slots__ = ( + 'timeout', + 'user', + 'ssl', + 'proxy_url', + 'headers', + 'method' + ) + + def __init__(self, url, writer, method='GET'): + """ + Args: + url (str): A file download URL. + writer (Writer): An object used to store downloaded file. + method (str): The HTTP method (GET|HEAD). + """ + super(HttpDownload, self).__init__(url, writer) + self.timeout = Timeout() + self.user = User() + self.ssl = SSL() + self.proxy_url = None + self.headers = {} + self.method = self._find_method(method) + + @property + def status(self): + """ + The status code in the reply. + + Returns: + (int): The reply status. + + """ + try: + return self.reply.status_code + except AttributeError: + return 0 + + @property + def session(self): + """ + The `requests` session. + + Returns: + Session: The shared or newly created session. + + Notes: + The session can be shared between download but this needs to be + facilitated by a 3rd object by setting the `context` to be the same. + """ + with self.context as context: + try: + return context.session + except AttributeError: + session = Session() + context.session = session + return session + + def _find_method(self, name): + """ + Find an http method by name. + + Args: + name: The method name. see: METHODS. + + Returns: + instancemethod: When matched. + + Raises: + ValueError: When not matched. + """ + methods = { + 'GET': self._get, + 'HEAD': self._head, + } + try: + return methods[name.upper()] + except KeyError: + _list = '|'.join(sorted(methods.keys())) + msg = _('method must be: ({list})'.format(list=_list)) + raise ValueError(msg) + + def _settings(self): + """ + Get `requests` keyword options based on attributes. + + Returns: + dict: The options. + """ + options = { + 'stream': True, + 'timeout': (self.timeout.connect, self.timeout.read) + } + if self.user.name: + options['auth'] = (self.user.name, self.user.password) + if self.proxy_url: + options['proxies'] = { + 'http': self.proxy_url, + 'https': self.proxy_url, + } + if self.headers: + options['headers'] = self.headers + if self.ssl.ca_certificate: + options['verify'] = self.ssl.ca_certificate + if self.ssl.client_certificate: + options['cert'] = self.ssl.client_certificate + if self.ssl.client_key: + options['cert'] = (options['cert'], self.ssl.client_key) + + return options + + def _get(self): + """ + Send the HTTP `GET` download. + + Returns: + download.Response + + Raises: + SSLError: On SSL error. + """ + return self.session.get(self.url, **self._settings()) + + def _head(self): + """ + Send the HTTP `HEAD` download. + + Returns: + download.Response + + Raises: + SSLError: On SSL error. + """ + return self.session.head(self.url, **self._settings()) + + def _send(self): + """ + Send the HTTP download request. + + Raises: + DownloadFailed: The download failed and could not be repaired. 
+ """ + try: + self.reply = self.method() + if self.status != HTTPStatus.OK: + reason = _('HTTP [{n}]').format(n=self.status) + raise ERROR.get(self.status, DownloadFailed)(self, reason) + for buffer in self.reply.iter_content(chunk_size=self.BLOCK): + self._write(buffer) + except OSError as error: + # Cannot read certificate. + raise DownloadFailed(self, str(error)) + except SSLError as error: + # SSL handshake failed. + raise NotAuthorized(self, str(error)) + + def __str__(self): + base = super(HttpDownload, self).__str__() + http = _('proxy={p} headers={h}').format( + p=self.proxy_url, + h=self.headers) + return ' | '.join([ + base, + str(self.timeout), + str(self.ssl), + str(self.user), + http, + ]) diff --git a/platform/pulpcore/download/settings.py b/platform/pulpcore/download/settings.py new file mode 100644 index 0000000000..5990b55c4d --- /dev/null +++ b/platform/pulpcore/download/settings.py @@ -0,0 +1,181 @@ +""" +Settings represent optional and most likely protocol specific *configuration*. +They are modeled separately and using composition to provide the most flexibility +and re-usability. Each settings group optionally supports validation. +""" +import os + +from gettext import gettext as _ + + +class Settings: + """ + Download settings. + """ + + __slots__ = () + + def validate(self): + """ + Validate the settings. + + Raises: + ValueError: When validation fails. + """ + pass + + def _path_readable(self, attribute): + """ + Validate that the path (value) for specified attribute can be read. + + Args: + attribute (str): The name of an attribute to validate. + + Raises: + ValueError: when validation fails. + """ + path = getattr(self, attribute) + if not path: + return + if os.access(path, os.R_OK): + return + raise ValueError( + _('{a}: "{p}" not found or [READ] permission denied.').format( + a=attribute, + p=path)) + + def __str__(self): + return '' + + +class SSL(Settings): + """ + SSL/TLS Settings. + + Attributes: + ca_certificate (str): An optional absolute path to an PEM + encoded CA certificate. + client_certificate (str): An optional absolute path to an PEM + encoded client certificate. + client_key (str): An optional absolute path to an PEM encoded + client private key. + validation (bool): Validate the server SSL certificate. + """ + + __slots__ = ( + 'ca_certificate', + 'client_certificate', + 'client_key', + 'validation' + ) + + def __init__(self, + ca_certificate=None, + client_certificate=None, + client_key=None, + validation=True): + """ + Args: + ca_certificate (str): An optional absolute path to an PEM + encoded CA certificate. + client_certificate (str): An optional absolute path to an PEM + encoded client certificate. May also contain the private key. + client_key (str): An optional absolute path to an PEM encoded + client private key. + validation (bool): Validate the server SSL certificate. + + Raises: + ValueError: when validation fails. + """ + self.ca_certificate = ca_certificate + self.client_certificate = client_certificate + self.client_key = client_key + self.validation = validation + + def validate(self): + """ + Validate the certificate paths can be read. + + Raises: + ValueError: When validation fails. 
+ """ + attributes = ( + 'ca_certificate', + 'client_certificate', + 'client_key' + ) + for attr in attributes: + self._path_readable(attr) + + def __str__(self): + description = _('ssl: validation={v} CA={a} key={k} certificate={c}') + return description.format( + v=self.validation, + a=self.ca_certificate, + k=self.client_key, + c=self.client_certificate) + + +class User(Settings): + """ + User settings used for authentication/authorization. + + Attributes: + name (str): A username. + password (str): A password. + """ + + __slots__ = ( + 'name', + 'password' + ) + + def __init__(self, name=None, password=None): + """ + Args: + name (str): A username. + password (str): A password. + """ + self.name = name + self.password = password + + def __str__(self): + description = _('User: name={n} password={p}') + return description.format( + n=self.name, + p=self.password) + + +class Timeout(Settings): + """ + Timeout settings. + + Attributes: + connect (int): Connection timeout in seconds. + read (int): Read timeout in seconds. + """ + + # Default connect timeout (seconds). + CONNECT = 10 + # Default read timeout (seconds). + READ = 30 + + __slots__ = ( + 'connect', + 'read' + ) + + def __init__(self, connect=CONNECT, read=READ): + """ + Args: + connect (int): Connection timeout in seconds. + read (int): Read timeout in seconds. + """ + self.connect = connect + self.read = read + + def __str__(self): + description = _('timeout: connect={c} read={r}') + return description.format( + c=self.connect, + r=self.read) diff --git a/platform/pulpcore/download/single.py b/platform/pulpcore/download/single.py new file mode 100644 index 0000000000..1ba4cd693a --- /dev/null +++ b/platform/pulpcore/download/single.py @@ -0,0 +1,291 @@ +from gettext import gettext as _ +from logging import getLogger +from threading import RLock + + +from .delegation import delegate +from .error import DownloadFailed +from .validation import ValidationError + + +log = getLogger(__name__) + + +class Download: + """ + An ABSTRACT download object. + Represents a unit of work to download a file. A download is a + command-pattern object. + + Attributes: + url (str): A file download URL. + writer (Writer): An object used to store downloaded file. + validations (list): Collection of Validations to be applied. + context (Context): The download context. + retries (int): The total number of retries possible. + delegate (Delegate): A download delegate. + attachment: Arbitrary object attached to the download. + reply: The remote server reply. + + Any method decorated with `@delegate` may be forwarded to the `delegate` for + implementation. The delegate must define a public method with the same name + (without the `_` prefix). For example, for `_on_error()`, the delegate would + define a method named `on_error(self, download, error)`. Note: The `download` + is injected (passed) as the first argument. + + Notes: + The validations are applied in order listed. + Settings may be shared between downloads. + """ + + # Data transfer buffer size in bytes. + BLOCK = 102400 + # Number of retires on failed download. + RETRIES = 1 + + __slots__ = ( + 'url', + 'writer', + 'validations', + 'context', + 'retries', + 'delegate', + 'attachment', + 'reply' + ) + + def __init__(self, url, writer): + """ + Args: + url (str): A file download URL. + writer (Writer): An object used to store downloaded file. 
+ """ + super(Download, self).__init__() + self.url = url + self.writer = writer + self.validations = [] + self.context = Context() + self.retries = Download.RETRIES + self.delegate = None + self.attachment = None + self.reply = None + + def _prepare(self): + """ + Prepare to be executed. + """ + log.debug(_('Prepare: %(d)s'), {'d': self}) + self._on_prepare() + + def _send(self): + """ + Send the download. + This is the *main* method responsible for implementing the actual + download by sending a protocol specific download. + + Raises: + DownloadFailed: The download failed and could not be repaired. + + Notes: + Must be implemented by subclass. + """ + raise NotImplementedError() + + def _repair(self, error): + """ + The download has encountered an error. + Attempt to repair and retry using `_on_error()` depending on remaining + available retries. + + Args: + error (DownloadFailed): The raised exception. + + Raises: + DownloadFailed: The download failed and could not be repaired. + """ + log.debug(_('Repair: %(d)s'), {'d': self}) + try: + retries = self.retries + while retries: + retries -= 1 + repaired = self._on_error(error) + if not repaired: + break + try: + self._attempt() + except DownloadFailed: + continue + else: + return + if not retries: + self._on_failed() + raise error + except Exception: + log.exception(_('Repair failed: {d}').format(d=self)) + self._on_failed() + raise error + + def _write(self, bfr): + """ + Write downloaded content. + + Args: + bfr (bytes): A buffer to append. + """ + self.writer.append(bfr) + for validation in self.validations: + validation.update(bfr) + + def _attempt(self): + """ + Attempt to download. + + Raises: + DownloadFailed: The download failed. + ValidationError: On validation failed. + """ + log.debug(_('Attempt: %(d)s'), {'d': self}) + with self.writer: + self._send() + self._on_reply() + self._on_succeeded() + self._validate() + + def _validate(self): + """ + Apply validations. + + Raises: + ValidationError: On validation failed. + """ + for validation in self.validations: + log.debug( + _('Apply validation: %(validator)s'), + { + 'validator': validation + }) + try: + validation.apply() + except ValidationError as error: + log.info(_('Validation failed: %(error)s'), {'error': error}) + raise error + except Exception as unexpected: + log.exception(unexpected) + raise unexpected + + @delegate + def _on_prepare(self): + """ + Prepared to be executed. + """ + pass + + @delegate + def _on_reply(self): + """ + A reply has been received. + """ + pass + + @delegate + def _on_succeeded(self): + """ + The download has succeeded. + """ + pass + + @delegate + def _on_failed(self): + """ + The download has failed. + All efforts to repair and retry have failed. + """ + pass + + @delegate + def _on_error(self, error): + """ + The download has encountered an error. + This provides the (optional) delegate an opportunity to repair + and try the download. + + Args: + error (DownloadFailed): The raised exception. + + Returns: + (bool): True if repaired and may be retried. + """ + return False + + def __call__(self): + """ + Execute the download. + + Raises: + DownloadFailed: The download failed and could not be repaired. + ValidationError: On validation failed. 
+ """ + try: + self._prepare() + self._attempt() + except DownloadFailed as error: + self._repair(error) + + def __str__(self): + _id = str(id(self))[-4:] + description = _( + '{t}: id={id} url={u} writer={w}' + ' | repair: retries={r}') + return description.format( + t=type(self).__name__, + id=_id, + u=self.url, + w=self.writer, + r=self.retries) + + +class Context: + """ + A download context. + Each download has a reference to a context used to safely share resources + such as HTTP sessions and authentication tokens. + + Attributes: + properties (dict): Arbitrary properties. + _mutex (RLock): The object mutex. + + Examples: + >>> + >>> def get_token(self): + >>> with self.context as context: + >>> try: + >>> return context.token + >>> except KeyError: + >>> token = self.generate_token() + >>> context.token = token + >>> return token + >>> + """ + + def __init__(self, **properties): + """ + Args: + properties (dict): Initial properties. + """ + self.__dict__.update(properties) + self.__dict__['MUTEX'] = RLock() + + @property + def _mutex(self): + return self.__dict__['MUTEX'] + + def __setattr__(self, key, value): + with self._mutex: + super(Context, self).__setattr__(key, value) + + def __enter__(self): + self._mutex.acquire() + return self + + def __exit__(self, *unused): + self._mutex.release() diff --git a/platform/pulpcore/download/validation.py b/platform/pulpcore/download/validation.py new file mode 100644 index 0000000000..084c56e123 --- /dev/null +++ b/platform/pulpcore/download/validation.py @@ -0,0 +1,276 @@ +import hashlib + +from gettext import gettext as _ +from logging import getLogger + +from .error import DownloadError + + +log = getLogger(__name__) + + +class ValidationError(DownloadError): + """ + Downloaded file failed validation. + """ + pass + + +class MismatchError(ValidationError): + """ + Value mismatch. + An error related to a difference between and expected and actual value. + """ + + def __init__(self, expected, actual): + """ + Args: + expected: The expected value. + actual: The actual value. + """ + super(MismatchError, self).__init__() + self.expected = expected + self.actual = actual + + +class SizeMismatch(MismatchError): + """ + The file size does not match what is expected. + """ + + def __str__(self): + return _('File size mismatch: expected={e} actual={a}'.format( + e=self.expected, + a=self.actual)) + + +class DigestMismatch(MismatchError): + """ + The digest (checksum) did not match what was expected. + """ + + def __str__(self): + return _('Digest mismatch: expected={e} actual={a}'.format( + e=self.expected, + a=self.actual)) + + +class Validation: + """ + Validation. + + Attributes: + enforced (bool): Validation enforced. When enforced, the failed + validation results in raising a ValidationError. When not enforced, + only a warning is logged. + """ + + __slots__ = ('enforced',) + + def __init__(self, enforced=True): + """ + Args: + enforced (bool): Validation enforced. When enforced, the failed + validation results in raising a ValidationError. When not enforced, + only a warning is logged. + """ + self.enforced = enforced + + def update(self, bfr): + """ + Update collected information. + + Args: + bfr (str): The actual bytes downloaded. + + Notes: + Must be implemented by subclass. + """ + raise NotImplementedError() + + def apply(self): + """ + Apply the validation. + + Raises: + ValidationError: When validation has failed. + + Notes: + Must be implemented by subclass. 
+ """ + raise NotImplementedError() + + +class SizeValidation(Validation): + """ + Validate the size of the downloaded file matches what is expected. + + Attributes: + expected (int): The expected size in bytes. + actual (int): The actual size in bytes. + + Examples: + >>> + >>> from pulpcore.download import HttpDownload + >>> + >>> download = HttpDownload(...) + >>> download.validations.append(SizeValidation(100)) # Expected file size in bytes. + >>> + >>> try: + >>> download() + >>> except ValidationError: + >>> # validation failed. + >>> else: + >>> # Go read the downloaded file \o/ + >>> + """ + + __slots__ = ('expected', 'actual') + + def __init__(self, expected, enforced=True): + """ + Args: + expected (int): The expected file size in bytes. + enforced (bool): Validation enforced. + """ + super(SizeValidation, self).__init__(enforced) + self.expected = expected + self.actual = 0 + + def update(self, bfr): + """ + Update collected information. + + Args: + bfr (str): The actual bytes downloaded. + """ + self.actual += len(bfr) + + def apply(self): + """ + Apply the validation. + + Raises: + SizeMismatch: When validation has failed. + """ + if self.expected == self.actual: + return + error = SizeMismatch(expected=self.expected, actual=self.actual) + if self.enforced: + raise error + else: + log.warn(str(error)) + + def __str__(self): + return _( + 'SizeValidation: expected={e} actual={a}').format( + e=self.expected, + a=self.actual) + + +class DigestValidation(Validation): + """ + Validate the digest (checksum) of the downloaded file matches what is expected. + + Attributes: + algorithm (hashlib.Algorithm): The hash algorithm. + expected (int): The expected hex digest. + actual (int): The actual (calculated) hex digest. + + Examples: + >>> + >>> from pulpcore.download import HttpDownload, ValidationError + >>> + >>> download = HttpDownload(...) + >>> download.validations.append(DigestValidation('sha256', '..f17a599e4bf624a7c..')) + >>> + >>> try: + >>> download() + >>> except ValidationError: + >>> # validation failed. + >>> else: + >>> # Go read the downloaded file \o/ + >>> + """ + + __slots__ = ( + 'algorithm', + 'expected', + 'actual' + ) + + # ordered by strength + ALGORITHMS = ( + 'sha512', + 'sha384', + 'sha256', + 'sha224', + 'sha1', + 'md5', + ) + + @staticmethod + def _find_algorithm(name): + """ + Find the hash algorithm by name in hashlib. + + Args: + name: The algorithm name. + + Returns: + hashlib.Algorithm: The algorithm object. + + Raises: + ValueError: When not found. + """ + try: + return getattr(hashlib, name.lower())() + except AttributeError: + raise ValueError(_('Algorithm {n} not supported').format(n=name)) + + def __init__(self, algorithm, digest, enforced=True): + """ + Args: + algorithm (str): The hash algorithm. + digest (str): The expected digest. + enforced (bool): Validation enforced. + + Raises: + ValueError: When `algorithm` not supported by hashlib. + """ + super(DigestValidation, self).__init__(enforced) + self.algorithm = self._find_algorithm(algorithm) + self.expected = digest + self.actual = None + + def update(self, bfr): + """ + Update collected information. + + Args: + bfr (str): The actual bytes downloaded. + """ + self.algorithm.update(bfr) + self.actual = self.algorithm.hexdigest() + + def apply(self): + """ + Apply the validation. + + Raises: + DigestMismatch: When validation has failed. 
+ """ + if self.expected == self.actual: + return + error = DigestMismatch(expected=self.expected, actual=self.actual) + if self.enforced: + raise error + else: + log.warn(str(error)) + + def __str__(self): + return _( + 'DigestValidation: alg={al} expected={e} actual={a}').format( + al=self.algorithm, + e=self.expected, + a=self.actual) diff --git a/platform/pulpcore/download/writer.py b/platform/pulpcore/download/writer.py new file mode 100644 index 0000000000..dbe1c0ec4f --- /dev/null +++ b/platform/pulpcore/download/writer.py @@ -0,0 +1,168 @@ +import os +import errno + +from io import BytesIO +from logging import getLogger + + +log = getLogger(__name__) + + +class Writer: + """ + Downloaded content writer. + + Attributes: + fp: An open file-like object used for writing. + """ + + __slots__ = ('fp',) + + def __init__(self, fp=None): + """ + Args: + fp: An open file-like object used for writing. + """ + self.fp = fp + + @property + def is_open(self): + """ + Get whether the writer is open. + + Returns: + bool: True when open. + """ + return (self.fp is not None) and (not self.fp.closed) + + def open(self): + """ + Open for writing. + The directory tree is created as necessary. + """ + pass + + def append(self, buffer): + """ + Append (write) the buffer. + + Args: + buffer (bytes): A buffer to append. + + Returns: + int: The number of bytes appended. + """ + return self.fp.write(buffer) + + def close(self): + """ + Close the target file. + """ + pass + + def discard(self): + """ + Discard written content. + """ + pass + + def __enter__(self): + self.open() + return self + + def __exit__(self, *unused): + self.close() + + +class FileWriter(Writer): + """ + Downloaded content writer. + Write content to a file. + + Attributes: + path (str): The absolute path to the file. May be None. + """ + + __slots__ = ('path',) + + def __init__(self, path): + """ + Args: + path (str): The absolute path to the file. May be None. + """ + super(FileWriter, self).__init__() + self.path = path + + def open(self): + """ + Open for writing. + The directory tree is created as necessary. + """ + if self.is_open: + return + self._mkdir() + self.fp = open(self.path, 'wb') + + def close(self): + """ + Close the target file. + """ + if not self.is_open: + return + try: + self.fp.close() + except Exception: + log.exception(self) + finally: + self.fp = None + + def discard(self): + """ + Discard written content. + """ + os.unlink(self.path) + + def _mkdir(self): + """ + Create the directory as needed. + + Raises: + OSError: When the directory cannot be created. + """ + _dir = os.path.dirname(self.path) + if not _dir: + return + try: + os.makedirs(_dir) + except OSError as e: + if e.errno != errno.EEXIST: + raise + + def __str__(self): + return self.path + + +class BufferWriter(Writer): + """ + Buffer writer. + Used to store downloaded file content into a buffer. + """ + + def open(self): + """ + Create the buffer. + """ + self.fp = BytesIO() + + def read(self): + """ + Get the buffered content. + + Returns: + str: The buffered content. + """ + self.fp.seek(0) + return self.fp.read().decode('utf8', 'replace') + + def __str__(self): + return 'buffer' diff --git a/plugin/pulpcore/plugin/changeset/__init__.py b/plugin/pulpcore/plugin/changeset/__init__.py new file mode 100644 index 0000000000..f3aa86675c --- /dev/null +++ b/plugin/pulpcore/plugin/changeset/__init__.py @@ -0,0 +1,140 @@ +""" + +The ``ChangeSet`` is the primary object used by plugin writers to support the Importer. 
+It represents a set of changes to be made to the repository in terms of content +that needs to be added (additions) and content that needs to be removed (removals). +The term `Remote` is used to describe the repository that the importer is synchronizing +with. The term ``RemoteContent`` is content contained in the remote repository. The +term ``RemoteArtifact`` is an artifact associated with a `RemoteContent` unit. + +Plugin writers need to pass `additions` to the `ChangeSet` as a collection of +`RemoteContent`. In other words, `additions` are a collection of content in the remote +repository that is not in the (local) Pulp repository and it needs to be added. + +Plugin writers need to pass `removals` to the `ChangeSet` as a collection of `Content` +that has been fetched from the Pulp DB. In other words, `removals` are a collection +content that is in the Pulp (local) repository that is not in the remote repository. +Or, content that needs to be removed for any reason as determined by the importer. + +The `ChangeSet` is designed for `stream` processing. It is strongly encouraged that both +the `additions` and `removals` be a `generator` that is wrapped in a ``SizedIterable``. +Wrapping the generator in a `SizedIterable` provides the total number of items that the +generator will yield. This is needed for progress reporting. + +Once the `ChangeSet` is constructed, the `apply()` method is called which returns an +iterator of ``ChangeReport``. Due to the `streams processing` design of the `ChangeSet`, +the returned iterator **must** be iterated for work to be performed. The `ChangeReport` +contains: + +- The requested action (ADD|REMOVE). +- The content (model) instance that was added/removed. +- A list of any exceptions raised. + +The ChangeSet is used something like this: + +Examples: + >>> + >>> from django.db.models import Q + >>> from collections import namedtuple + >>> from pulpcore.plugin.changeset import ( + >>> ChangeSet, RemoteContent, RemoteArtifact, SizedIterable) + >>> from pulpcore.plugin.models import Artifact, Content, Importer, Repository + >>> + >>> + >>> Delta = namedtuple('Delta', ['additions', 'removals']) + >>> + >>> + >>> class Thing(Content): + >>> pass + >>> + >>> + >>> class ThingImporter(Importer): + >>> + >>> def _build_additions(self, delta, metadata): + >>> # Using the set of additions defined by the delta and the metadata, + >>> # build and yield the content that needs to be added. + >>> + >>> for thing in metadata: + >>> # Needed? + >>> if not thing.natural_key in delta.additions: + >>> continue + >>> # Create a concrete model instance for Thing content + >>> # using the (thing) metadata. + >>> model = Thing(...) + >>> # Create a remote content instance using the model. + >>> content = RemoteContent(model) + >>> # Create a remote artifact for each file associated with the remote content. + >>> for file in thing: + >>> # create the artifact model instance. + >>> model = Artifact(...) + >>> # Create an object that may be used to download the file. + >>> download = self.get_download(...) + >>> # Create the remote artifact with the model instance and the download. + >>> artifact = RemoteArtifact(model, download) + >>> # Add the remote artifact to the remote content. + >>> content.artifacts.add(artifact) + >>> yield content + >>> + >>> def _fetch_removals(self, delta): + >>> # Query the DB and yield any content in the repository + >>> # matching the natural key in the delta. 
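+    >>> # BatchIterator (used below) bounds the number of keys per query.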
+ >>> for natural_keys in BatchIterator(delta.removals): + >>> q = Q() + >>> for key in natural_keys: + >>> q |= Q(...) + >>> q_set = self.repository.content.filter(q) + >>> q_set = q_set.only('artifacts') + >>> for content in q_set: + >>> yield content + >>> + >>> def _find_delta(self, metadata, inventory): + >>> # Using the metadata and inventory (of content in the repository), + >>> # determine the set of content that needs to be added and the set of + >>> # content that needs to be removed. + >>> remote = set() + >>> for thing in metadata: + >>> remote.add(thing.natural_key()) + >>> additions = remote - inventory + >>> removals = inventory - remote + >>> return Delta(additions=additions, removals=removals) + >>> + >>> def _fetch_inventory(self): + >>> # Query the DB and find all `Thing` content in the repository. + >>> # The `inventory` is used for comparison and should only be the natural + >>> # key for each content. + >>> inventory = set() + >>> q_set = Thing.objects.filter(repositories=self.repository) + >>> q_set = q_set.only(*[f.name for f in Thing.natural_key_fields]) + >>> for content in (c.cast() for c in q_set): + >>> inventory.add(content.natural_key()) + >>> return inventory + >>> + >>> def _build_changeset(self): + >>> # Build a changeset. + >>> metadata = # + >>> inventory = self._fetch_inventory() + >>> delta = self.find_delta(metadata, inventory) + >>> additions = SizedIterable( + >>> self._build_additions(delta, metadata), + >>> len(delta.additions)) + >>> removals = SizedIterable( + >>> self._fetch_removals(delta), + >>> len(delta.removals)) + >>> return ChangeSet(self, additions=additions, removals=removals) + >>> + >>> def sync(self): + >>> changeset = self._build_changeset() + >>> for report in changeset.apply(): + >>> try: + >>> report.result() + >>> except ChangeFailed: + >>> # Failed + >>> else: + >>> # Succeeded + >>> +""" + +from .iterator import BatchIterator # noqa +from .main import ChangeSet, SizedIterable # noqa +from .model import RemoteContent, RemoteArtifact # noqa +from .report import ChangeReport, ChangeFailed # noqa diff --git a/plugin/pulpcore/plugin/changeset/iterator.py b/plugin/pulpcore/plugin/changeset/iterator.py new file mode 100644 index 0000000000..745d015337 --- /dev/null +++ b/plugin/pulpcore/plugin/changeset/iterator.py @@ -0,0 +1,239 @@ +import itertools + +from collections.abc import Iterable +from logging import getLogger + +from django.db.models import Q + +from pulpcore.download import Download + +from .model import RemoteArtifact + + +log = getLogger(__name__) + + +class BatchIterator(Iterable): + """ + Iterate large numbers of items in batches. + + Attributes: + iterable (Iterable): An iterable to batch. + batch (int): The size of each batch. + + Examples: + >>> + >>> numbers = [1, 2, 3, 4, 5, 6] + >>> for batch in BatchIterator(numbers, 3): + >>> repr(batch) + '(1, 2, 3)' + '(4, 5, 6)' + """ + + # Default batch size. + BATCH = 1000 + + def __init__(self, iterable, batch=BATCH): + """ + Args: + iterable (Iterable): An iterable to batch. + batch (int): The size of each batch. + """ + self.iterable = iterable + self.batch = batch + + def __iter__(self): + generator = (c for c in self.iterable) + while True: + batch = tuple(itertools.islice(generator, 0, self.batch)) + if batch: + yield batch + else: + return + + +class ContentIterator(Iterable): + """ + Iterate `RemoteContent` and replace associated content models + With models fetched from the DB (when found). + """ + + # Number of keys in each batched query. 
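+    # Content is collated by model and fetched with one filter() query per model,
+    # so each query carries at most BATCH natural keys.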
+ BATCH = 1024 + + @staticmethod + def _batch_q(content): + """ + Build a query for the specified batch of content. + + Args: + content (tuple): A batch of content. Each is: RemoteContent. + + Returns: + Q: The built query. + """ + q = Q() + for c in content: + q |= Q(**c.key) + return q + + def __init__(self, content): + """ + Args: + content (Iterable): An Iterable of RemoteContent. + """ + self.content = content + + def _collated_content(self): + """ + Collate each batch of content into lists by model. + + Yields: + dict: A dictionary of {model_class: [content,]} + Each content is: RemoteContent. + """ + for batch in BatchIterator(self.content, self.BATCH): + collated = {} + for content in batch: + _list = collated.setdefault(type(content.model), list()) + _list.append(content) + yield collated + + def _fetch(self): + """ + Fetch each batch of collated content. + + Yields: + tuple: (content, fetched). + The content is a list of RemoteContent. + The fetched is a dictionary of fetched content models keyed by natural key. + """ + for collated in self._collated_content(): + for model, content in collated.items(): + fields = {f.name for f in model.natural_key_fields} + q = self._batch_q(content) + q_set = model.objects.filter(q) + q_set = q_set.only(*fields) + fetched = {c.natural_key(): c for c in q_set} + yield (content, fetched) + + def _iter(self): + """ + The remote content is iterated and its model is replaced with a model + fetched from the DB (when found). This is batched to limit the memory + footprint. + + Yields: + RemoteContent: The transformed content. + """ + for batch, fetched in self._fetch(): + for content in batch: + natural_key = content.model.natural_key() + try: + model = fetched[natural_key] + except KeyError: + pass + else: + content.update(model) + yield content + + def __iter__(self): + return iter(self._iter()) + + +class DownloadIterator(Iterable): + """ + Iterate the content artifacts and yield the appropriate download object. + When downloading is deferred or the artifact is already downloaded, + A NopDownload is yielded. + """ + + def __init__(self, content, deferred=False): + """ + Args: + content (Iterable): An Iterable of RemoteContent. + deferred (bool): When true, downloading is deferred. + """ + self.content = content + self.deferred = deferred + + def _iter(self): + """ + Iterate the content artifacts and yield the appropriate download object. + When downloading is deferred or the artifact is already downloaded, + A NopDownload is yielded. + + Yields: + Download: The appropriate download object. + """ + for artifact in ArtifactIterator(self.content): + if self.deferred or artifact.model.downloaded: + download = NopDownload() + download.attachment = artifact + artifact.path = None + else: + download = artifact.download + yield download + + def __iter__(self): + return iter(self._iter()) + + +class ArtifactIterator(Iterable): + """ + Iterate the content and flatten the artifacts. + Ensure that at least (1) artifact is yielded for each content. + """ + + def __init__(self, content): + """ + Args: + content (Iterable): An Iterable of RemoteContent. + """ + self.content = content + + def _iter(self): + """ + Iterate the content and flatten the artifacts. + Ensure that at least (1) artifact is yielded for each content. When content + does not have any un-downloaded artifacts, A NopArtifact is yielded. + + Yields: + RemoteArtifact: The flattened artifacts. 
+ """ + for content in ContentIterator(self.content): + if content.artifacts: + for artifact in content.artifacts: + artifact.content = content + yield artifact + else: + artifact = RemoteArtifact(NopArtifact(), NopDownload()) + artifact.content = content + yield artifact + + def __iter__(self): + return iter(self._iter()) + + +class NopDownload(Download): + """ + A no-operation (NOP) download. + """ + + def __init__(self): + super(NopDownload, self).__init__('', None) + + def _send(self): + pass + + def __call__(self): + pass + + +class NopArtifact: + """ + No operation (NOP) artifact model. + """ + + def __init__(self): + self.downloaded = True diff --git a/plugin/pulpcore/plugin/changeset/main.py b/plugin/pulpcore/plugin/changeset/main.py new file mode 100644 index 0000000000..8242bc2e5e --- /dev/null +++ b/plugin/pulpcore/plugin/changeset/main.py @@ -0,0 +1,253 @@ +from collections.abc import Sized, Iterable +from gettext import gettext as _ +from logging import getLogger + +from django.db.utils import IntegrityError +from django.db import transaction + +from ..download import Batch +from ..models import ProgressBar, DownloadCatalog, RepositoryContent +from ..tasking import Task + +from .iterator import BatchIterator, DownloadIterator +from .report import ChangeReport + + +log = getLogger(__name__) + + +class ChangeSet: + """ + A set of changes to be applied to a repository. + + When applied, the following changes are made. + + For each content (unit) added: + + - All content artifacts are downloaded as needed (unless deferred=True). + - The content (model) is saved. + - All artifacts (models) are saved. + - Deferred download catalog entries are created for each artifact. + - The content (unit) is added to the repository. + + For each content (unit) removed: + + - Content removed from the repository. + - Deferred download catalog deleted for each artifact. + + Attributes: + importer (pulpcore.plugin.Importer): An importer. + additions (SizedIterable): The content to be added to the repository. + removals (SizedIterable): The content IDs to be removed. + deferred (bool): Downloading is deferred. When true, downloads are not performed + but content is still created and added to the repository. + + Examples: + >>> + >>> from pulpcore.plugin.changeset import ChangeSet, ChangeFailed + >>> + >>> changeset = ChangeSet(...) + >>> for report in changeset.apply(): + >>> try: + >>> report.result() + >>> except ChangeFailed: + >>> # failed. Decide what to do. + >>> else: + >>> # be happy + >>> + """ + + def __init__(self, importer, additions=(), removals=()): + """ + Args: + importer (pulpcore.plugin.Importer): An importer. + additions (SizedIterable): The content to be added to the repository. + removals (SizedIterable): The content IDs to be removed. + + Notes: + The content to be added may already exist but not be associated + to the repository. Existing content is fetched and used instead + of provided content models as needed. + """ + self.importer = importer + self.additions = additions + self.removals = removals + self.deferred = False + + @property + def repository(self): + """ + The repository being updated. + + Returns: + pulpcore.plugin.models.Repository: The repository being updated. + """ + return self.importer.repository + + def _add_content(self, content): + """ + Add the specified content to the repository. + Download catalog entries are created foreach artifact. + + Args: + content (RemoteContent): The content to be added. 
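+
+        Notes:
+            Duplicate rows are tolerated: an IntegrityError raised because the
+            association or catalog entry already exists is not treated as a failure.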
+ """ + try: + association = RepositoryContent( + repository=self.importer.repository, + content=content.model) + association.save() + except IntegrityError: + # Duplicate + pass + for artifact in content.artifacts: + log.info( + _('Cataloging artifact: %(a)s'), + { + 'a', artifact + }) + try: + entry = DownloadCatalog() + entry.importer = self.importer + entry.url = artifact.download.url + entry.artifact = artifact.model + entry.save() + except IntegrityError: + entry = DownloadCatalog.objects.filter( + importer=self.importer, + artifact=artifact) + entry.url = artifact.download.url + entry.save() + + def _remove_content(self, content): + """ + Remove content from the repository. + Download catalog entries are deleted foreach artifact. + + Args: + content (pulpcore.plugin.Content): A content model to be removed. + """ + q_set = DownloadCatalog.objects.filter( + importer=self.importer, + artifact__in=content.artifacts.all()) + q_set.delete() + q_set = RepositoryContent.objects.filter( + repository=self.repository, + content=content) + q_set.delete() + + def _apply_additions(self): + """ + Apply additions. + Content listed in `additions` is created (as needed) and added to the repository. + + Yields: + ChangeReport: A report for each content added. + """ + log.info( + _('Apply additions: repository=%(r)s.'), + { + 'r': self.repository.name + }) + with Batch(DownloadIterator(self.additions, self.deferred)) as batch: + with ProgressBar(message=_('Add Content'), total=len(self.additions)) as bar: + for plan in batch(): + artifact = plan.download.attachment + content = artifact.content + try: + plan.result() + except Exception as error: + task = Task() + task.append_non_fatal_error(error) + bar.increment() + report = ChangeReport(ChangeReport.ADDED, content.model) + report.error = plan.error + yield report + continue + artifact.settle() + content.settle() + if not content.settled: + continue + with transaction.atomic(): + self._add_content(content) + bar.increment() + report = ChangeReport(ChangeReport.ADDED, content.model) + yield report + + def _apply_removals(self): + """ + Apply removals. + Content listed in `removals` is removed from the repository. + + Yields: + ChangeReport: A report for each removed content. + """ + log.info( + _('Apply removals: repository=%(r)s.'), + { + 'r': self.repository.name + }) + with ProgressBar(message=_('Remove Content'), total=len(self.removals)) as bar: + for batch in BatchIterator(self.removals, 100): + with transaction.atomic(): + for content in batch: + self._remove_content(content) + bar.done += len(batch) + bar.save() + report = ChangeReport(ChangeReport.REMOVED, content) + yield report + + def apply(self): + """ + Apply the requested changes to the repository. + + Yields: + ChangeReport: For each change applied. + """ + log.info( + _('Apply ChangeSet: repository=%(r)s.'), + { + 'r': self.repository.name + }) + for report in self._apply_additions(): + yield report + for report in self._apply_removals(): + yield report + + +class SizedIterable(Sized, Iterable): + """ + A sized iterable. + + Attributes: + iterable (Iterable): An iterable. + length (int): The number of items expected to be yielded by the iterable. + + Examples: + >>> + >>> generator = (n for n in [1, 2, 3]) + >>> iterable = SizedIterable(generator, 3) + >>> len(iterable) + 3 + >>> list(iterable) + [1, 2, 3] + """ + + def __init__(self, iterable, length): + """ + Args: + iterable (Iterable): An iterable. + length (int): The number of items expected to be yielded by the iterable. 
+ """ + self._iterable = iterable + self._length = length + + def __len__(self): + """ + Returns: + The number of items expected to be yielded by the iterable. + """ + return self._length + + def __iter__(self): + return iter(self._iterable) diff --git a/plugin/pulpcore/plugin/changeset/model.py b/plugin/pulpcore/plugin/changeset/model.py new file mode 100644 index 0000000000..d8635437e8 --- /dev/null +++ b/plugin/pulpcore/plugin/changeset/model.py @@ -0,0 +1,207 @@ +from gettext import gettext as _ +from logging import getLogger + +from django.db.utils import IntegrityError +from django.db import transaction +from django.core.files import File + + +log = getLogger(__name__) + + +class Remote: + """ + Represents content related things contained within the remote repository. + + Attributes: + model (Model): A remote (wanted) model instance. + settled (bool): All matters are settled and the object is ready + to be (optionally created) and added to the repository. + """ + + __slots__ = ( + 'model', + 'settled' + ) + + def __init__(self, model): + """ + Args: + model (Model): A remote (wanted) model instance. + """ + self.model = model + self.settled = False + + def save(self): + """ + Save the model. + """ + self.model.save() + + +class RemoteContent(Remote): + """ + Represents content that is contained within the remote repository. + + Attributes: + model (pulpcore.plugin.Content): A content model instance. + artifacts (set): The set of related `RemoteArtifact`. + + Examples: + >>> + >>> from pulpcore.plugin.models import Content + >>> + >>> + >>> class Thing(Content): + >>> ... + >>> thing = Thing() # DB model instance. + >>> ... + >>> content = RemoteContent(thing) + >>> + """ + + __slots__ = ('artifacts',) + + def __init__(self, model): + """ + Args: + model (pulpcore.plugin.models.Content): A content model instance. + This instance will be used to store newly created content in the DB. + """ + super(RemoteContent, self).__init__(model) + self.artifacts = set() + + @property + def key(self): + """ + The natural key as a dictionary. + + Returns: + dict: The natural key. + """ + return {f.name: getattr(self.model, f.name) for f in self.model.natural_key_fields} + + def update(self, model): + """ + Update this `model` stored with the specified model that has been + fetched from the database. The artifacts are matched by `relative_path` + and their model object is replaced by the fetched model. + + Args: + model (pulpcore.plugin.Content): A fetched content model object. + """ + self.model = model + known = {a.model.relative_path: a for a in self.artifacts} + self.artifacts.clear() + for artifact in model.artifacts.all(): + try: + found = known[artifact.relative_path] + found.model = artifact + self.artifacts.add(found) + except KeyError: + log.error(_('Artifact not matched.')) + + def settle(self): + """ + Ensures that all prerequisite matters pertaining to adding the content + to a repository have been settled: + - All artifacts are settled. + - Content created. + - Artifacts created. + + Notes: + Called whenever an artifact has been downloaded. + """ + for artifact in self.artifacts: + if not artifact.settled: + return + self.save() + self.settled = True + + def save(self): + """ + Save the content and related artifacts in a single DB transaction. + Due to race conditions, the content may already exist raising an IntegrityError. + When this happens, the model is fetched and replaced. 
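+        The fetched model's artifacts are matched to the in-memory artifacts
+        by ``relative_path``.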
+ """ + is_duplicate = False + with transaction.atomic(): + try: + super(RemoteContent, self).save() + except IntegrityError: + is_duplicate = True + else: + for artifact in self.artifacts: + artifact.model.content = self.model + artifact.save() + if is_duplicate: + model = type(self.model) + content = model.objects.get(**self.key) + self.update(content) + + +class RemoteArtifact(Remote): + """ + Represents an artifact related to content that is contained within + the remote repository. + + Attributes: + download (pulpcore.download.Download): An object used to download the content. + content (RemoteContent): The associated remote content. + path (str): Absolute path to the downloaded file. May be (None) when + downloading is deferred or the artifact has already been downloaded. + + Examples: + >>> + >>> from pulpcore.plugin.models import Artifact + >>> + >>> model = Artifact(...) # DB model instance. + >>> download = ... + >>> ... + >>> artifact = RemoteArtifact(model, download) + >>> + """ + + __slots__ = ( + 'content', + 'download', + 'path' + ) + + def __init__(self, model, download): + """ + + Args: + model (pulpcore.plugin.models.Artifact): An artifact model instance. + This instance will be used to store a newly created or updated + artifact in the DB. + download (pulpcore.download.Download): A An object used to download the content. + """ + super(RemoteArtifact, self).__init__(model) + self.download = download + self.download.attachment = self + self.path = download.writer.path + self.content = None + + def settle(self): + """ + Ensures that all prerequisite matters pertaining to adding the artifact + to the DB have been settled: + + Notes: + Called whenever an artifact has been processed. + """ + self.settled = True + + def save(self): + """ + Update the DB model to store the downloaded file. + Then, save in the DB. + """ + if self.path: + self.model.file = File(open(self.path, mode='rb')) + self.model.downloaded = True + super(RemoteArtifact, self).save() + + def __hash__(self): + return hash(self.model.id) diff --git a/plugin/pulpcore/plugin/changeset/report.py b/plugin/pulpcore/plugin/changeset/report.py new file mode 100644 index 0000000000..2eb64b5b55 --- /dev/null +++ b/plugin/pulpcore/plugin/changeset/report.py @@ -0,0 +1,64 @@ +from gettext import gettext as _ + + +class ChangeReport: + """ + Report changes to a repository. + + Attributes: + action (str): The requested action (ADD|REMOVE). + content (pulpcore.plugin.Content): The affected content model. + error (Exception): An exception raised during plan execution. + """ + + # Actions + ADDED = 'ADD' + REMOVED = 'REMOVE' + + __slots__ = ( + 'action', + 'content', + 'error' + ) + + def __init__(self, action, content): + """ + Args: + action (str): The requested action (ADD|REMOVE). + content (pulpcore.plugin.Content): The affected content model. + """ + self.action = action + self.content = content + self.error = None + + def result(self): + """ + Get the execution result. + This **should** be called to ensure that error cases are properly handled. + + Returns: + pulpcore.plugin.Content: The affected content model. + + Raises: + ChangeFailed: Any exception raised during plan execution. + """ + if self.error is None: + return self.content + else: + raise ChangeFailed(str(self.error)) + + +class ChangeFailed(Exception): + """ + A requested change has failed. + """ + + def __init__(self, reason): + """ + Args: + reason (str): The reason the change failed. 
+ """ + self.reason = reason + + def __str__(self): + return _('Change Failed. Reason: {r}'.format(r=self.reason)) diff --git a/plugin/pulpcore/plugin/download/__init__.py b/plugin/pulpcore/plugin/download/__init__.py new file mode 100644 index 0000000000..43011ad402 --- /dev/null +++ b/plugin/pulpcore/plugin/download/__init__.py @@ -0,0 +1,59 @@ +""" +Provides classes to support the Importer downloading. Primarily for downloading +``metadata`` but ``may`` also be used to download content when not using the `ChangeSet`. + +Examples: + >>> + >>> from pulpcore.plugin.download import Batch, HttpDownload + >>> + >>> # One file. + >>> url = # based on feed URL. + >>> path = 'md' + >>> download = HttpDownload(url, FileWriter('md.txt')) + >>> try: + >>> download() + >>> except DownloadFailed: + >>> # Failed + >>> else: + >>> with open(path): + >>> # read the metadata + >>> + >>> # --- + >>> + >>> # Many files. + >>> downloads = [ + >>> HttpDownload(...), # file-1 + >>> HttpDownload(...), # file-2 + >>> HttpDownload(...), # file-3 + >>> ] + >>> with Batch(downloads) as batch: + >>> for plan in batch(): + >>> try: + >>> plan.result() + >>> except DownloadFailed: + >>> # Failed + >>> else: + >>> # Use the downloaded file \o/ + >>> # read the metadata files. + >>> +""" + +from pulpcore.download import ( # noqa: F401 + Batch, + BufferWriter, + DigestValidation, + Download, + DownloadFailed, + DownloadError, + FileDownload, + FileWriter, + HttpDownload, + NotAuthorized, + NotFound, + SizeValidation, + SSL, + Timeout, + User, + ValidationError) + +from .factory import Factory # noqa: F401 diff --git a/plugin/pulpcore/plugin/download/factory.py b/plugin/pulpcore/plugin/download/factory.py new file mode 100644 index 0000000000..f6e6d5365c --- /dev/null +++ b/plugin/pulpcore/plugin/download/factory.py @@ -0,0 +1,156 @@ +from gettext import gettext as _ +from urllib.parse import urlparse + +from pulpcore.download import (DigestValidation, HttpDownload, FileDownload, FileWriter, + SizeValidation) + + +class Factory: + """ + Factory for creating download object based on URL scheme. + + Attributes: + importer (pulpcore.plugin.models.Importer): An importer. + + Examples: + >>> + >>> importer = .. + >>> url = .. + >>> artifact = .. + >>> download = Factory(importer).build(url, artifact=artifact) + >>> + """ + + def __init__(self, importer): + """ + + Args: + importer (pulpcore.plugin.models.Importer): An importer. + """ + self.importer = importer + + def build(self, url, path=None, artifact=None): + """ + Build a downloader. + + Args: + url (str): The download URL. + path (str): The optional absolute path to where the downloaded file is to be stored. + artifact (pulpcore.app.models.Artifact): An optional artifact. + + Returns: + pulpcore.download.Download: A download object configured using the + attributes of the importer. + """ + if (not path) and (not artifact): + raise ValueError(_('Either "path" or "artifact" is required.')) + + try: + builder = Factory.BUILDER[urlparse(url).scheme.lower()] + except KeyError: + raise ValueError(_('URL: {u} not supported.'.format(u=url))) + else: + return builder(self, url, path, artifact) + + def _file(self, url, path=None, artifact=None): + """ + Build a download for file:// URLs. + + Args: + url (str): The download URL. + path (str): The optional absolute path to where the downloaded file is to be stored. + artifact (pulpcore.app.models.Artifact): An optional artifact. 
+ + Returns: + FileDownload: + """ + if artifact: + _path = artifact.relative_path + else: + _path = path + download = FileDownload(url, FileWriter(_path)) + self._add_validation(download, artifact) + return download + + def _http(self, url, path=None, artifact=None): + """ + Build a download for http:// URLs. + + Args: + url (str): The download URL. + path (str): The optional absolute path to where the downloaded file is to be stored. + artifact (pulpcore.app.models.Artifact): An optional artifact. + + Returns: + HttpDownload: An http download. + """ + if artifact: + _path = artifact.relative_path + else: + _path = path + download = HttpDownload(url, FileWriter(_path)) + download.user.name = self.importer.basic_auth_user + download.user.password = self.importer.basic_auth_password + self._add_validation(download, artifact) + return download + + def _https(self, url, path=None, artifact=None): + """ + Build a download for https:// URLs. + + Args: + url (str): The download URL. + path (str): The optional absolute path to where the downloaded file is to be stored. + artifact (pulpcore.app.models.Artifact): An optional artifact. + + Returns: + HttpDownload: An https download. + """ + if artifact: + _path = artifact.relative_path + else: + _path = path + download = HttpDownload(url, FileWriter(_path)) + download.ssl.ca_certificate = self.importer.ssl_validation, + download.ssl.client_certificate = self.importer.ssl_client_certificate, + download.ssl.client_key = self.importer.ssl_client_key, + download.ssl.validation = self.importer.ssl_validation + download.user.name = self.importer.basic_auth_user + download.user.password = self.importer.basic_auth_password + download.proxy_url = self.importer.proxy_url + self._add_validation(download, artifact) + return download + + def _add_validation(self, download, artifact): + """ + Add validations based on the artifact. + + Args: + download (pulpcore.download.Download): A download object. + artifact (pulpcore.app.models.Artifact): A content artifact. + """ + if not artifact: + return + if not self.importer.validate: + return + if artifact.size: + validation = SizeValidation(artifact.size) + download.validations.append(validation) + for algorithm in DigestValidation.ALGORITHMS: + try: + digest = getattr(artifact, algorithm) + if not digest: + continue + except AttributeError: + continue + else: + validation = DigestValidation(algorithm, digest) + download.validations.append(validation) + break + + # Map URLs scheme to builder method. + BUILDER = { + 'file': _file, + 'http': _http, + 'https': _https, + } diff --git a/plugin/pulpcore/plugin/models/__init__.py b/plugin/pulpcore/plugin/models/__init__.py index b5cefc736b..088e8e88a6 100644 --- a/plugin/pulpcore/plugin/models/__init__.py +++ b/plugin/pulpcore/plugin/models/__init__.py @@ -1,7 +1,10 @@ # Models are exposed selectively in the versioned plugin API. # Any models defined in the pulpcore.plugin namespace should probably be proxy models. 
+ from pulpcore.app.models import ( # NOQA - Content, ProgressBar, ProgressSpinner, Repository, RepositoryContent) + Artifact, Content, DownloadCatalog, ProgressBar, ProgressSpinner, Repository, + RepositoryContent) + -from .publisher import Publisher -from .importer import Importer +from .publisher import Publisher # noqa +from .importer import Importer # noqa diff --git a/plugin/pulpcore/plugin/models/importer.py b/plugin/pulpcore/plugin/models/importer.py index 8f12569e99..5caf3d434c 100644 --- a/plugin/pulpcore/plugin/models/importer.py +++ b/plugin/pulpcore/plugin/models/importer.py @@ -1,5 +1,7 @@ from pulpcore.app.models import Importer as PlatformImporter +from pulpcore.plugin.download import Factory + class Importer(PlatformImporter): """ @@ -42,3 +44,25 @@ def sync(self): class Meta: abstract = True + + def get_download(self, url, destination, artifact=None): + """ + Get an appropriate download object based on the URL that is fully configured using + the importer attributes. When an artifact is specified, the download is tailored + for the artifact. Plugin writers are expected to override when additional + configuration is needed or when another class of download is required. + + Args: + + url (str): The download URL. + destination (str): The absolute path to where the downloaded file is to be stored. + artifact (pulpcore.app.models.Artifact): An optional artifact. + + Returns: + pulpcore.download.Download: The appropriate download object. + + Notes: + This method supports plugins downloading metadata and the + `streamer` downloading artifacts. + """ + return Factory(self).build(url, destination, artifact)
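
A minimal sketch of how a plugin's importer might use ``get_download()`` to fetch its
metadata. ``ThingImporter``, ``metadata_url`` and the destination path are illustrative
only and are not part of this change:

    >>> from pulpcore.plugin.download import DownloadFailed
    >>> from pulpcore.plugin.models import Importer
    >>>
    >>>
    >>> class ThingImporter(Importer):
    >>>
    >>>     def _fetch_metadata(self):
    >>>         path = '/tmp/metadata.xml'  # illustrative destination
    >>>         # The download is configured from the importer's settings by the
    >>>         # Factory (e.g. basic auth, plus SSL and proxy settings for https URLs).
    >>>         download = self.get_download(self.metadata_url, path)
    >>>         try:
    >>>             download()
    >>>         except DownloadFailed:
    >>>             # Failed; decide how to report or retry.
    >>>             raise
    >>>         # The file was written to the destination given above.
    >>>         with open(path) as fp:
    >>>             return fp.read()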