diff --git a/CHANGELOG.md b/CHANGELOG.md index b8f518f9..92d0689a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -- n/a +### Added +- A new API `FileRepository.upload_file` to upload a file to Pulp repository ## [1.1.0] - 2019-07-03 diff --git a/examples/upload-files b/examples/upload-files new file mode 100755 index 00000000..2d6f8f7f --- /dev/null +++ b/examples/upload-files @@ -0,0 +1,81 @@ +#!/usr/bin/env python +import os +import logging +from argparse import ArgumentParser + +from pubtools.pulplib import Client + +log = logging.getLogger("upload") + + +def upload(client, path, repo_id): + repo = client.get_repository(repo_id).result() + + uploads = [] + if os.path.isdir(path): + for file in os.listdir(path): + file = os.path.join(path, file) + if os.path.isfile(file): + log.debug("Uploading %s to repo %s in threads", file, repo_id) + uploads.append(repo.upload_file(file)) + elif os.path.isfile(path): + log.debug("Uploading %s to repo %s", path, repo_id) + uploads.append(repo.upload_file(path)) + + for up in uploads: + result = up.result() + log.debug("Import task finished:\n%s", result) + + log.info("Uploaded %s files to repository %s", len(uploads), repo_id) + + +def make_client(args): + auth = None + + if args.username: + password = args.password + if password is None: + password = os.environ.get("PULP_PASSWORD") + if not password: + log.warning("No password provided for %s", args.username) + auth = (args.username, args.password) + + return Client(args.url, auth=auth, verify=not args.insecure) + + +def main(): + log.setLevel(logging.INFO) + logging.basicConfig(format="%(message)s", level=logging.INFO) + + parser = ArgumentParser(description="Upload files to Repository") + parser.add_argument("--url", help="Pulp server URL") + parser.add_argument("--repo-id", action="store") + parser.add_argument("--path", action="store", help="Path to a file or a directory") + parser.add_argument("--username", help="Pulp username") + parser.add_argument( + "--password", help="Pulp password (or set PULP_PASSWORD in env)" + ) + parser.add_argument("--debug", action="store_true") + parser.add_argument("--insecure", default=False, action="store_true") + + p = parser.parse_args() + + if not p.url: + parser.error("--url is required") + + if not p.repo_id: + parser.error("--repo-id is required") + + if not p.path: + parser.error("--path is required") + + if p.debug: + logging.getLogger("pubtools.pulplib").setLevel(logging.DEBUG) + log.setLevel(logging.DEBUG) + + client = make_client(p) + return upload(client, p.path, p.repo_id) + + +if __name__ == "__main__": + main() diff --git a/pubtools/pulplib/_impl/client/client.py b/pubtools/pulplib/_impl/client/client.py index 22ad688c..5c65e616 100644 --- a/pubtools/pulplib/_impl/client/client.py +++ b/pubtools/pulplib/_impl/client/client.py @@ -1,6 +1,7 @@ import os import logging import threading +import hashlib from functools import partial import requests @@ -56,6 +57,7 @@ class Client(object): _REQUEST_THREADS = int(os.environ.get("PUBTOOLS_PULPLIB_REQUEST_THREADS", "4")) _PAGE_SIZE = int(os.environ.get("PUBTOOLS_PULPLIB_PAGE_SIZE", "2000")) _TASK_THROTTLE = int(os.environ.get("PUBTOOLS_PULPLIB_TASK_THROTTLE", "200")) + _CHUNK_SIZE = int(os.environ.get("PUBTOOLS_PULPLIB_CHUNK_SIZE", 1024 * 1024)) # Policy used when deciding whether to retry operations. # This is mainly provided here as a hook for autotests, so the policy can be @@ -179,6 +181,29 @@ def search_repository(self, criteria=None): response_f, lambda data: self._handle_page(Repository, search, data) ) + def _do_upload_file(self, upload_id, file_obj, name): + def do_next_upload(checksum, size): + data = file_obj.read(self._CHUNK_SIZE) + if data: + checksum.update(data) + return f_flat_map( + self._do_upload(data, upload_id, size), + lambda _: do_next_upload(checksum, size + len(data)), + ) + # nothing more to upload, return checksum and size + return f_return((checksum.hexdigest(), size)) + + is_file_object = "close" in dir(file_obj) + if not is_file_object: + file_obj = open(file_obj, "rb") + + LOG.info("Uploading %s to Pulp", name) + upload_f = f_flat_map(f_return(), lambda _: do_next_upload(hashlib.sha256(), 0)) + + if not is_file_object: + upload_f.add_done_callback(lambda _: file_obj.close()) + return upload_f + def _publish_repository(self, repo, distributors_with_config): tasks_f = f_return([]) @@ -290,7 +315,6 @@ def _do_search(self, resource_type, search): url = os.path.join(self._url, "pulp/api/v2/{0}/search/".format(resource_type)) LOG.debug("Submitting %s search: %s", url, search) - return self._request_executor.submit( self._do_request, method="POST", url=url, json=search ) @@ -302,3 +326,40 @@ def _delete_resource(self, resource_type, resource_id): LOG.debug("Queuing request to DELETE %s", url) return self._task_executor.submit(self._do_request, method="DELETE", url=url) + + def _request_upload(self): + url = os.path.join(self._url, "pulp/api/v2/content/uploads/") + + LOG.debug("Requesting upload id") + return self._request_executor.submit(self._do_request, method="POST", url=url) + + def _do_upload(self, data, upload_id, offset): + url = os.path.join( + self._url, "pulp/api/v2/content/uploads/%s/%s/" % (upload_id, offset) + ) + + return self._request_executor.submit( + self._do_request, method="PUT", url=url, data=data + ) + + def _do_import(self, repo_id, upload_id, unit_type_id, unit_key): + url = os.path.join( + self._url, "pulp/api/v2/repositories/%s/actions/import_upload/" % repo_id + ) + + body = { + "unit_type_id": unit_type_id, + "upload_id": upload_id, + "unit_key": unit_key, + } + + LOG.debug("Importing contents to repo %s with upload id %s", repo_id, upload_id) + return self._task_executor.submit( + self._do_request, method="POST", url=url, json=body + ) + + def _delete_upload_request(self, upload_id): + url = os.path.join(self._url, "pulp/api/v2/content/uploads/%s/" % upload_id) + + LOG.debug("Deleting upload request %s", upload_id) + return self._request_executor.submit(self._do_request, method="DELETE", url=url) diff --git a/pubtools/pulplib/_impl/fake/client.py b/pubtools/pulplib/_impl/fake/client.py index 049e31a0..34ab5755 100644 --- a/pubtools/pulplib/_impl/fake/client.py +++ b/pubtools/pulplib/_impl/fake/client.py @@ -1,11 +1,12 @@ import random import uuid import threading +import hashlib from collections import namedtuple import six -from more_executors.futures import f_return, f_return_error +from more_executors.futures import f_return, f_return_error, f_flat_map from pubtools.pulplib import Page, PulpException, Criteria, Task from .. import compat_attr as attr @@ -13,6 +14,7 @@ from .match import match_object Publish = namedtuple("Publish", ["repository", "tasks"]) +Upload = namedtuple("Upload", ["repository", "tasks", "name", "sha256"]) class FakeClient(object): @@ -28,6 +30,7 @@ class FakeClient(object): def __init__(self): self._repositories = [] self._publish_history = [] + self._upload_history = [] self._lock = threading.RLock() self._uuidgen = random.Random() self._uuidgen.seed(0) @@ -79,6 +82,52 @@ def get_repository(self, repository_id): return f_return(data[0]) + def _do_upload_file(self, upload_id, file_obj, name): + # pylint: disable=unused-argument + is_file_obj = "close" in dir(file_obj) + if not is_file_obj: + file_obj = open(file_obj, "rb") + + def do_next_upload(checksum, size): + data = file_obj.read(1024 * 1024) + if data: + checksum.update(data) + size += len(data) + return do_next_upload(checksum, size) + return f_return((checksum.hexdigest(), size)) + + out = f_flat_map(f_return(), lambda _: do_next_upload(hashlib.sha256(), 0)) + + if not is_file_obj: + out.add_done_callback(lambda _: file_obj.close()) + + return out + + def _request_upload(self): + upload_request = { + "_href": "/pulp/api/v2/content/uploads/%s/" % self._next_request_id(), + "upload_id": "%s" % self._next_request_id(), + } + + return f_return(upload_request) + + def _do_import(self, repo_id, upload_id, unit_type_id, unit_key): + # pylint: disable=unused-argument + repo_f = self.get_repository(repo_id) + if repo_f.exception(): + # Repo can't be found, let that exception propagate + return repo_f + + repo = repo_f.result() + + task = Task(id=self._next_task_id(), completed=True, succeeded=True) + + self._upload_history.append( + Upload(repo, [task], unit_key["name"], unit_key["checksum"]) + ) + + return f_return([task]) + def _delete_resource(self, resource_type, resource_id): if resource_type == "repositories": return self._delete_repository(resource_id) @@ -128,3 +177,6 @@ def _next_task_id(self): with self._lock: next_raw_id = self._uuidgen.randint(0, 2 ** 128) return str(uuid.UUID(int=next_raw_id)) + + def _next_request_id(self): + return self._next_task_id() diff --git a/pubtools/pulplib/_impl/fake/controller.py b/pubtools/pulplib/_impl/fake/controller.py index 1d7a783d..92ac42fa 100644 --- a/pubtools/pulplib/_impl/fake/controller.py +++ b/pubtools/pulplib/_impl/fake/controller.py @@ -79,3 +79,22 @@ def publish_history(self): of this publish """ return self.client._publish_history[:] + + @property + def upload_history(self): + """A list of upload tasks triggered via this client. + + Each element of this list is a named tuple with the following attributes, + in order: + + ``repository``: + :class:`~pubtools.pulplib.Repository` for which upload was triggered + ``tasks``: + list of :class:`~pubtools.pulplib.Task` generated as a result + of this upload + ``name`` (str): + the remote path used + ``sha256`` (str): + checksum of the file uploaded + """ + return self.client._upload_history[:] diff --git a/pubtools/pulplib/_impl/model/repository/file.py b/pubtools/pulplib/_impl/model/repository/file.py index 0926888a..5b77ea61 100644 --- a/pubtools/pulplib/_impl/model/repository/file.py +++ b/pubtools/pulplib/_impl/model/repository/file.py @@ -1,8 +1,17 @@ +import os +import logging + +from more_executors.futures import f_flat_map, f_map + from .base import Repository, repo_type from ..attr import pulp_attrib +from ..common import DetachedException from ... import compat_attr as attr +LOG = logging.getLogger("pubtools.pulplib") + + @repo_type("iso-repo") @attr.s(kw_only=True, frozen=True) class FileRepository(Repository): @@ -20,3 +29,79 @@ class FileRepository(Repository): ) mutable_urls = attr.ib(default=attr.Factory(lambda: ["PULP_MANIFEST"]), type=list) + + def upload_file(self, file_obj, relative_url=None): + """Upload a file to this repository. + + Args: + file_obj (str, file object) + If it's a string, then it's the path of file to upload. + Else, it ought to be a + `file-like object `_. + + + relative_url (str) + Path that should be used in remote repository, can either + be a path to a directory or a path to a file, e.g: + - if relative_url is 'foo/bar/' and file_obj has name 'f.txt', + the resulting remote path wll be 'foo/bar/f.txt'. + - if relative_url is 'foo/bar/f.txt', no matter what the + name of file_obj is, the remote path is 'foo/bar/f.txt'. + + If omitted, the local name of the file will be used. Or, + if file_obj is a file object without a `name` attribute, + passing `relative_url` is mandatory. + + Returns: + Future[list of :class:`~pubtools.pulplib.Task`] + A future which is resolved when import succeeds. + + The future contains the task to import uploaded content + to repository. + + Raises: + DetachedException + If this instance is not attached to a Pulp client. + + .. versionadded:: 1.2.0 + """ + if not self._client: + raise DetachedException() + + relative_url = self._get_relative_url(file_obj, relative_url) + name = os.path.basename(relative_url) + + # request upload id and wait for it + upload_id = self._client._request_upload().result()["upload_id"] + + upload_complete_f = self._client._do_upload_file(upload_id, file_obj, name) + + import_complete_f = f_flat_map( + upload_complete_f, + lambda upload: self._client._do_import( + self.id, + upload_id, + "iso", + {"name": relative_url, "checksum": upload[0], "size": upload[1]}, + ), + ) + + f_map( + import_complete_f, lambda _: self._client._delete_upload_request(upload_id) + ) + + return import_complete_f + + def _get_relative_url(self, file_obj, relative_url): + is_file_object = "close" in dir(file_obj) + if not is_file_object: + name = os.path.basename(file_obj) + if not relative_url: + relative_url = name + elif relative_url.endswith("/"): + relative_url = os.path.join(relative_url, name) + elif is_file_object and (not relative_url or relative_url.endswith("/")): + msg = "%s is missing a name attribute and relative_url was not provided" + raise ValueError(msg % file_obj) + + return relative_url diff --git a/setup.py b/setup.py index 7cc90ec7..f42baf6c 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ def get_requirements(): setup( name="pubtools-pulplib", - version="1.1.0", + version="1.2.0", packages=find_packages(exclude=["tests"]), package_data={"pubtools.pulplib._impl.schema": ["*.yaml"]}, url="https://github.com/release-engineering/pubtools-pulplib", diff --git a/tests/fake/test_fake_upload.py b/tests/fake/test_fake_upload.py new file mode 100644 index 00000000..8f12306e --- /dev/null +++ b/tests/fake/test_fake_upload.py @@ -0,0 +1,78 @@ +import sys + +import pytest + +from pubtools.pulplib import FakeController, FileRepository, PulpException + + +def test_can_upload(tmpdir): + """repo.upload_file() succeeds with fake client and populates upload_history.""" + controller = FakeController() + + controller.insert_repository(FileRepository(id="repo1")) + + client = controller.client + repo1 = client.get_repository("repo1").result() + + somefile = tmpdir.join("some-file.txt") + somefile.write(b"there is some binary data:\x00\x01\x02") + + upload_f = repo1.upload_file(str(somefile)) + + # The future should resolve successfully + tasks = upload_f.result() + + # The task should be successful. + assert tasks[0].succeeded + + # The change should be reflected in the controller's upload history + history = controller.upload_history + + digest = "fad3fc1e6d583b2003ec0a5273702ed8fcc2504271c87c40d9176467ebe218cb" + assert len(history) == 1 + assert history[0].repository == repo1 + assert history[0].tasks == tasks + assert history[0].name == somefile.basename + assert history[0].sha256 == digest + + +def test_upload_nonexistent_file_raises(): + """repo.upload_file() with nonexistent file fails with fake client""" + controller = FakeController() + + controller.insert_repository(FileRepository(id="repo1")) + + client = controller.client + repo1 = client.get_repository("repo1").result() + + # If file's not found, Python 2 raises IOError and Python 3 raises + # FileNotFoundError. The latter one is not defined in Python 2. + if sys.version_info < (3,): + exception = IOError + else: + exception = FileNotFoundError + with pytest.raises(exception): + upload_f = repo1.upload_file("nonexistent_file") + + +def test_upload_repo_absent_raises(tmpdir): + controller = FakeController() + + controller.insert_repository(FileRepository(id="repo1")) + + client = controller.client + repo1 = client.get_repository("repo1").result() + + somefile = tmpdir.join("some-file.txt") + somefile.write(b"there is some binary data:\x00\x01\x02") + + repo_copy1 = client.get_repository("repo1").result() + repo_copy2 = client.get_repository("repo1").result() + + # if repo's deleted + assert repo_copy1.delete().result() + + exception = repo_copy2.upload_file(str(somefile)).exception() + + assert isinstance(exception, PulpException) + assert "repo1 not found" in str(exception) diff --git a/tests/repository/test_upload.py b/tests/repository/test_upload.py new file mode 100644 index 00000000..f4bfa66a --- /dev/null +++ b/tests/repository/test_upload.py @@ -0,0 +1,132 @@ +import logging +import time +import pytest +import json +import io + +from pubtools.pulplib import ( + Repository, + FileRepository, + Task, + DetachedException, + TaskFailedException, +) + + +def test_upload_detached(): + """upload_file raises if called on a detached repo""" + with pytest.raises(DetachedException): + FileRepository(id="some-repo").upload_file("some-file") + + +def test_upload_file(client, requests_mocker, tmpdir, caplog): + """test upload a file to a repo in pulp""" + + logging.getLogger().setLevel(logging.INFO) + caplog.set_level(logging.INFO) + + repo_id = "repo1" + repo = FileRepository(id=repo_id) + repo.__dict__["_client"] = client + + client._CHUNK_SIZE = 20 + + request_body = { + "_href": "/pulp/api/v2/content/uploads/cfb1fed0-752b-439e-aa68-fba68eababa3/", + "upload_id": "cfb1fed0-752b-439e-aa68-fba68eababa3", + } + upload_id = request_body["upload_id"] + + import_report = { + "result": {}, + "error": {}, + "spawned_tasks": [{"_href": "/pulp/api/v2/tasks/task1/", "task_id": "task1"}], + } + + tasks_report = [{"task_id": "task1", "state": "finished"}] + + somefile = tmpdir.join("some-file.txt") + somefile.write(b"there is some binary data:\x00\x01\x02") + + requests_mocker.post( + "https://pulp.example.com/pulp/api/v2/content/uploads/", json=request_body + ) + requests_mocker.put( + "https://pulp.example.com/pulp/api/v2/content/uploads/%s/0/" % upload_id, + json=[], + ) + requests_mocker.put( + "https://pulp.example.com/pulp/api/v2/content/uploads/%s/20/" % upload_id, + json=[], + ) + requests_mocker.post( + "https://pulp.example.com/pulp/api/v2/repositories/%s/actions/import_upload/" + % repo_id, + json=import_report, + ) + requests_mocker.post( + "https://pulp.example.com/pulp/api/v2/tasks/search/", json=tasks_report + ) + requests_mocker.delete( + "https://pulp.example.com/pulp/api/v2/content/uploads/%s/" % upload_id, json=[] + ) + + assert repo.upload_file(str(somefile)).result() == [ + Task(id="task1", succeeded=True, completed=True) + ] + + # the 6th request call might not be done in time, try 1000 + # times with .01 sec sleep before next try. + for i in range(1000): + time.sleep(0.01) + try: + assert requests_mocker.call_count == 6 + except AssertionError: + if i != 999: + continue + else: + raise + else: + break + + # 4th call should be import, check if right unit_key's passed + import_request = requests_mocker.request_history[3].json() + import_unit_key = { + u"name": somefile.basename, + u"checksum": u"fad3fc1e6d583b2003ec0a5273702ed8fcc2504271c87c40d9176467ebe218cb", + u"size": 29, + } + assert import_request["unit_key"] == import_unit_key + + messages = caplog.messages + # task's spawned and completed + assert "Created Pulp task: task1" in messages + assert "Pulp task completed: task1" in messages + + +@pytest.mark.parametrize( + "relative_url,expected", + [ + ("some/path/", "some/path/some-file.txt"), + ("some/path/foo.txt", "some/path/foo.txt"), + ], +) +def test_get_relative_url(tmpdir, relative_url, expected): + somefile = tmpdir.join("some-file.txt") + repo = FileRepository(id="some-repo") + result = repo._get_relative_url(str(somefile), relative_url) + + assert result == expected + + +def test_get_relative_url_with_file_object(tmpdir): + repo = FileRepository(id="some-repo") + file_obj = io.StringIO() + + with pytest.raises(ValueError): + repo._get_relative_url(file_obj, None) + + with pytest.raises(ValueError): + repo._get_relative_url(file_obj, "some/path/") + + assert repo._get_relative_url(file_obj, "path/foo.txt") == "path/foo.txt"