Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- n/a
### Added
- A new API `FileRepository.upload_file` to upload a file to a Pulp repository

## [1.1.0] - 2019-07-03

Expand Down
81 changes: 81 additions & 0 deletions examples/upload-files
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/env python
import os
import logging
from argparse import ArgumentParser

from pubtools.pulplib import Client

# Logger used by this example script; main() sets its level (INFO, or DEBUG with --debug).
log = logging.getLogger("upload")


def upload(client, path, repo_id):
    """Upload a single file, or each regular file directly inside a directory.

    Args:
        client: object whose ``get_repository(repo_id)`` returns a future
            of a repository supporting ``upload_file``.
        path: path to a file or to a directory. Directories are not
            walked recursively; only their immediate regular files are
            uploaded. A path which is neither uploads nothing.
        repo_id: ID of the destination repository.
    """
    repo = client.get_repository(repo_id).result()

    pending = []
    if os.path.isdir(path):
        for entry in os.listdir(path):
            candidate = os.path.join(path, entry)
            if not os.path.isfile(candidate):
                # Subdirectories and other non-file entries are skipped.
                continue
            log.debug("Uploading %s to repo %s in threads", candidate, repo_id)
            pending.append(repo.upload_file(candidate))
    elif os.path.isfile(path):
        log.debug("Uploading %s to repo %s", path, repo_id)
        pending.append(repo.upload_file(path))

    # All uploads are started before any of them is awaited.
    for upload_f in pending:
        outcome = upload_f.result()
        log.debug("Import task finished:\n%s", outcome)

    log.info("Uploaded %s files to repository %s", len(pending), repo_id)


def make_client(args):
    """Build a Pulp client from parsed command-line arguments.

    Args:
        args: parsed arguments providing ``url``, ``username``,
            ``password`` and ``insecure``.

    Returns:
        A ``Client`` for ``args.url``. When a username was given, it is
        created with ``auth=(username, password)``, where the password
        comes from ``--password`` or, if that was not passed, from the
        ``PULP_PASSWORD`` environment variable. TLS verification is
        disabled when ``--insecure`` was passed.
    """
    auth = None

    if args.username:
        password = args.password
        if password is None:
            password = os.environ.get("PULP_PASSWORD")
        if not password:
            log.warning("No password provided for %s", args.username)
        # Use the resolved password (not args.password) so that the
        # PULP_PASSWORD fallback actually takes effect.
        auth = (args.username, password)

    return Client(args.url, auth=auth, verify=not args.insecure)


def main():
    """Parse command-line arguments and upload the requested path.

    ``--url``, ``--repo-id`` and ``--path`` must all be given (and be
    non-empty); otherwise the parser reports an error and exits.
    """
    log.setLevel(logging.INFO)
    logging.basicConfig(format="%(message)s", level=logging.INFO)

    parser = ArgumentParser(description="Upload files to Repository")
    parser.add_argument("--url", help="Pulp server URL")
    parser.add_argument("--repo-id", action="store")
    parser.add_argument("--path", action="store", help="Path to a file or a directory")
    parser.add_argument("--username", help="Pulp username")
    parser.add_argument(
        "--password", help="Pulp password (or set PULP_PASSWORD in env)"
    )
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--insecure", default=False, action="store_true")

    args = parser.parse_args()

    # Checked in this order; parser.error() exits on the first missing one.
    mandatory = (
        ("--url", args.url),
        ("--repo-id", args.repo_id),
        ("--path", args.path),
    )
    for flag, value in mandatory:
        if not value:
            parser.error("%s is required" % flag)

    if args.debug:
        logging.getLogger("pubtools.pulplib").setLevel(logging.DEBUG)
        log.setLevel(logging.DEBUG)

    pulp_client = make_client(args)
    return upload(pulp_client, args.path, args.repo_id)


# Run the uploader only when this file is executed as a script.
if __name__ == "__main__":
main()
63 changes: 62 additions & 1 deletion pubtools/pulplib/_impl/client/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import logging
import threading
import hashlib
from functools import partial

import requests
Expand Down Expand Up @@ -56,6 +57,7 @@ class Client(object):
# Tunables, each overridable through an environment variable of the
# corresponding PUBTOOLS_PULPLIB_* name; values are parsed as integers.
_REQUEST_THREADS = int(os.environ.get("PUBTOOLS_PULPLIB_REQUEST_THREADS", "4"))
_PAGE_SIZE = int(os.environ.get("PUBTOOLS_PULPLIB_PAGE_SIZE", "2000"))
_TASK_THROTTLE = int(os.environ.get("PUBTOOLS_PULPLIB_TASK_THROTTLE", "200"))
# Maximum number of bytes read from a file and sent per upload request
# (defaults to 1 MiB).
_CHUNK_SIZE = int(os.environ.get("PUBTOOLS_PULPLIB_CHUNK_SIZE", 1024 * 1024))

# Policy used when deciding whether to retry operations.
# This is mainly provided here as a hook for autotests, so the policy can be
Expand Down Expand Up @@ -179,6 +181,29 @@ def search_repository(self, criteria=None):
response_f, lambda data: self._handle_page(Repository, search, data)
)

def _do_upload_file(self, upload_id, file_obj, name):
    """Upload the content of a file to Pulp, one chunk at a time.

    Args:
        upload_id: ID of the upload request receiving the chunks.
        file_obj: either a path, which is opened in binary mode and
            closed once the upload finishes, or a file-like object
            (anything with a ``close`` attribute), which is read from
            its current position and left open.
        name: name of the file; used for logging only.

    Returns:
        A future resolved with a ``(sha256 hexdigest, size)`` tuple
        once every chunk has been uploaded.
    """

    def do_next_upload(checksum, size):
        data = file_obj.read(self._CHUNK_SIZE)
        if data:
            checksum.update(data)
            # `size` is the number of bytes sent so far, which is also
            # the offset at which this chunk belongs. The next chunk is
            # only read after this one has been uploaded.
            return f_flat_map(
                self._do_upload(data, upload_id, size),
                lambda _: do_next_upload(checksum, size + len(data)),
            )
        # nothing more to upload, return checksum and size
        return f_return((checksum.hexdigest(), size))

    # hasattr() rather than `"close" in dir(...)`: dir() does not report
    # attributes provided through __getattr__, so wrappers delegating to
    # a real file would wrongly be treated as paths.
    is_file_object = hasattr(file_obj, "close")
    if not is_file_object:
        file_obj = open(file_obj, "rb")

    LOG.info("Uploading %s to Pulp", name)
    # Starting from an already-resolved future means an error raised by
    # the first read is reported through the returned future.
    upload_f = f_flat_map(f_return(), lambda _: do_next_upload(hashlib.sha256(), 0))

    if not is_file_object:
        # Only close a file which was opened here.
        upload_f.add_done_callback(lambda _: file_obj.close())
    return upload_f

def _publish_repository(self, repo, distributors_with_config):
tasks_f = f_return([])

Expand Down Expand Up @@ -290,7 +315,6 @@ def _do_search(self, resource_type, search):
url = os.path.join(self._url, "pulp/api/v2/{0}/search/".format(resource_type))

LOG.debug("Submitting %s search: %s", url, search)

return self._request_executor.submit(
self._do_request, method="POST", url=url, json=search
)
Expand All @@ -302,3 +326,40 @@ def _delete_resource(self, resource_type, resource_id):

LOG.debug("Queuing request to DELETE %s", url)
return self._task_executor.submit(self._do_request, method="DELETE", url=url)

def _request_upload(self):
    """Queue a POST creating a new upload request.

    Returns:
        The future produced by the request executor. Callers in this
        codebase read an ``upload_id`` key from its result.
    """
    endpoint = os.path.join(self._url, "pulp/api/v2/content/uploads/")

    LOG.debug("Requesting upload id")
    request_kwargs = {"method": "POST", "url": endpoint}
    return self._request_executor.submit(self._do_request, **request_kwargs)

def _do_upload(self, data, upload_id, offset):
    """Queue a PUT of one chunk of content for an upload request.

    Args:
        data: the chunk to send as the request body.
        upload_id: ID of the upload request.
        offset: position of this chunk, placed in the request URL.

    Returns:
        The future produced by the request executor.
    """
    chunk_path = "pulp/api/v2/content/uploads/%s/%s/" % (upload_id, offset)
    request_kwargs = {
        "method": "PUT",
        "url": os.path.join(self._url, chunk_path),
        "data": data,
    }
    return self._request_executor.submit(self._do_request, **request_kwargs)

def _do_import(self, repo_id, upload_id, unit_type_id, unit_key):
    """Queue a POST importing previously uploaded content into a repository.

    Args:
        repo_id: ID of the repository receiving the content.
        upload_id: ID of the upload request holding the content.
        unit_type_id: type ID of the unit to create.
        unit_key: dict identifying the unit.

    Returns:
        The future produced by the task executor.
    """
    action_path = "pulp/api/v2/repositories/%s/actions/import_upload/" % repo_id
    url = os.path.join(self._url, action_path)

    payload = dict(
        unit_type_id=unit_type_id,
        upload_id=upload_id,
        unit_key=unit_key,
    )

    LOG.debug("Importing contents to repo %s with upload id %s", repo_id, upload_id)
    request_kwargs = {"method": "POST", "url": url, "json": payload}
    return self._task_executor.submit(self._do_request, **request_kwargs)

def _delete_upload_request(self, upload_id):
    """Queue a DELETE of the given upload request.

    Returns:
        The future produced by the request executor.
    """
    upload_path = "pulp/api/v2/content/uploads/%s/" % upload_id
    endpoint = os.path.join(self._url, upload_path)

    LOG.debug("Deleting upload request %s", upload_id)
    request_kwargs = {"method": "DELETE", "url": endpoint}
    return self._request_executor.submit(self._do_request, **request_kwargs)
54 changes: 53 additions & 1 deletion pubtools/pulplib/_impl/fake/client.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import random
import uuid
import threading
import hashlib

from collections import namedtuple

import six
from more_executors.futures import f_return, f_return_error
from more_executors.futures import f_return, f_return_error, f_flat_map

from pubtools.pulplib import Page, PulpException, Criteria, Task
from .. import compat_attr as attr

from .match import match_object

# Record type for a publish: the repository and the tasks generated.
Publish = namedtuple("Publish", ["repository", "tasks"])
# Record type for an upload: the repository, the tasks generated, the
# "name" from the imported unit key and its "checksum" (stored as sha256).
Upload = namedtuple("Upload", ["repository", "tasks", "name", "sha256"])


class FakeClient(object):
Expand All @@ -28,6 +30,7 @@ class FakeClient(object):
def __init__(self):
self._repositories = []
self._publish_history = []
self._upload_history = []
self._lock = threading.RLock()
self._uuidgen = random.Random()
self._uuidgen.seed(0)
Expand Down Expand Up @@ -79,6 +82,52 @@ def get_repository(self, repository_id):

return f_return(data[0])

def _do_upload_file(self, upload_id, file_obj, name):
    """Simulate an upload by reading the file and computing its digest.

    Nothing is sent anywhere; the content is only read in order to
    produce the same result shape as a real upload.

    Args:
        upload_id: unused.
        file_obj: either a path, which is opened in binary mode and
            closed afterwards, or a file-like object (anything with a
            ``close`` attribute), which is left open.
        name: unused.

    Returns:
        A future resolved with a ``(sha256 hexdigest, size)`` tuple.
    """
    # pylint: disable=unused-argument

    # hasattr() rather than `"close" in dir(...)`: dir() does not report
    # attributes provided through __getattr__.
    is_file_obj = hasattr(file_obj, "close")
    if not is_file_obj:
        file_obj = open(file_obj, "rb")

    def read_all(_):
        # A loop rather than one recursive call per chunk, so that large
        # files cannot exhaust the interpreter's recursion limit.
        checksum = hashlib.sha256()
        size = 0
        while True:
            data = file_obj.read(1024 * 1024)
            if not data:
                break
            checksum.update(data)
            size += len(data)
        return f_return((checksum.hexdigest(), size))

    # Running the read inside f_flat_map means a read error is reported
    # through the returned future rather than raised from this call.
    out = f_flat_map(f_return(), read_all)

    if not is_file_obj:
        # Only close a file which was opened here.
        out.add_done_callback(lambda _: file_obj.close())

    return out

def _request_upload(self):
    """Simulate creation of an upload request.

    Returns:
        A resolved future holding a dict with the new ``upload_id`` and
        an ``_href`` referring to that same upload.
    """
    # Generate the ID once so that `_href` and `upload_id` agree; using
    # two separate IDs made the href point at a different upload.
    upload_id = "%s" % self._next_request_id()
    upload_request = {
        "_href": "/pulp/api/v2/content/uploads/%s/" % upload_id,
        "upload_id": upload_id,
    }

    return f_return(upload_request)

def _do_import(self, repo_id, upload_id, unit_type_id, unit_key):
    """Simulate importing uploaded content into a repository.

    A completed, successful task is generated and the upload is
    recorded in the upload history.

    Args:
        repo_id: ID of the target repository.
        upload_id: unused.
        unit_type_id: unused.
        unit_key: dict; its ``name`` and ``checksum`` are recorded.

    Returns:
        A resolved future holding a list with the generated task, or
        the failed repository lookup future if the lookup failed.
    """
    # pylint: disable=unused-argument
    lookup_f = self.get_repository(repo_id)
    if lookup_f.exception():
        # Repo can't be found, let that exception propagate
        return lookup_f

    repository = lookup_f.result()
    import_task = Task(id=self._next_task_id(), completed=True, succeeded=True)

    # The history entry gets its own list, separate from the one returned.
    record = Upload(
        repository, [import_task], unit_key["name"], unit_key["checksum"]
    )
    self._upload_history.append(record)

    return f_return([import_task])

def _delete_resource(self, resource_type, resource_id):
if resource_type == "repositories":
return self._delete_repository(resource_id)
Expand Down Expand Up @@ -128,3 +177,6 @@ def _next_task_id(self):
with self._lock:
next_raw_id = self._uuidgen.randint(0, 2 ** 128)
return str(uuid.UUID(int=next_raw_id))

def _next_request_id(self):
    """Return a new request ID, drawn from the task ID generator."""
    request_id = self._next_task_id()
    return request_id
19 changes: 19 additions & 0 deletions pubtools/pulplib/_impl/fake/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,22 @@ def publish_history(self):
of this publish
"""
return self.client._publish_history[:]

@property
def upload_history(self):
    """The uploads triggered via this client, as a list.

    A new list is returned on each access. Every element is a named
    tuple with these attributes, in order:

    ``repository``:
        :class:`~pubtools.pulplib.Repository` for which upload was triggered
    ``tasks``:
        list of :class:`~pubtools.pulplib.Task` generated as a result
        of this upload
    ``name`` (str):
        the remote path used
    ``sha256`` (str):
        checksum of the file uploaded
    """
    recorded = self.client._upload_history
    return list(recorded)
85 changes: 85 additions & 0 deletions pubtools/pulplib/_impl/model/repository/file.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import os
import logging

from more_executors.futures import f_flat_map, f_map

from .base import Repository, repo_type
from ..attr import pulp_attrib
from ..common import DetachedException
from ... import compat_attr as attr


# Module logger, registered under the "pubtools.pulplib" name.
LOG = logging.getLogger("pubtools.pulplib")


@repo_type("iso-repo")
@attr.s(kw_only=True, frozen=True)
class FileRepository(Repository):
Expand All @@ -20,3 +29,79 @@ class FileRepository(Repository):
)

mutable_urls = attr.ib(default=attr.Factory(lambda: ["PULP_MANIFEST"]), type=list)

def upload_file(self, file_obj, relative_url=None):
    """Upload a file to this repository.

    Args:
        file_obj (str, file object)
            Either the path of the file to upload, or a
            `file-like object <https://docs.python.org/3/glossary.html#term-file-object>`_
            to read the content from.

        relative_url (str)
            Path to use in the remote repository. It may point to a
            directory or to a file, e.g:
            - with relative_url 'foo/bar/' and a file_obj named 'f.txt',
              the resulting remote path will be 'foo/bar/f.txt'.
            - with relative_url 'foo/bar/f.txt', the remote path is
              'foo/bar/f.txt' regardless of the name of file_obj.

            If omitted, the local name of the file will be used. Or,
            if file_obj is a file object without a `name` attribute,
            passing `relative_url` is mandatory.

    Returns:
        Future[list of :class:`~pubtools.pulplib.Task`]
            A future which is resolved when import succeeds.

            The future contains the task to import uploaded content
            to repository.

    Raises:
        DetachedException
            If this instance is not attached to a Pulp client.

    .. versionadded:: 1.2.0
    """
    if not self._client:
        raise DetachedException()

    relative_url = self._get_relative_url(file_obj, relative_url)
    name = os.path.basename(relative_url)

    # This call blocks until Pulp has handed out an upload id.
    upload_request = self._client._request_upload().result()
    upload_id = upload_request["upload_id"]

    def import_upload(upload):
        # `upload` is the (checksum, size) pair produced by the upload.
        unit_key = {"name": relative_url, "checksum": upload[0], "size": upload[1]}
        return self._client._do_import(self.id, upload_id, "iso", unit_key)

    def delete_upload_request(_):
        return self._client._delete_upload_request(upload_id)

    uploaded_f = self._client._do_upload_file(upload_id, file_obj, name)
    imported_f = f_flat_map(uploaded_f, import_upload)

    # Clean-up of the upload request is chained onto the import, but its
    # outcome is not part of the future handed back to the caller.
    f_map(imported_f, delete_upload_request)

    return imported_f

def _get_relative_url(self, file_obj, relative_url):
    """Work out the remote path to use for an uploaded file.

    Args:
        file_obj: a path, or a file-like object (anything with a
            ``close`` attribute).
        relative_url: requested remote path; may be empty/None, a
            directory (ending with "/") or a complete path to a file.

    Returns:
        ``relative_url`` unchanged if it is a complete path to a file;
        otherwise the base name of the local file, joined onto
        ``relative_url`` if that names a directory.

    Raises:
        ValueError: if a name is needed but ``file_obj`` is a file
            object without a usable (non-empty string) ``name``.
    """
    if relative_url and not relative_url.endswith("/"):
        # Already a complete path to a file; no local name is needed.
        return relative_url

    # hasattr() rather than `"close" in dir(...)`: dir() does not report
    # attributes provided through __getattr__.
    if hasattr(file_obj, "close"):
        # Honor the documented contract: a file object's own name is
        # used when it has one. The name must be a string, since some
        # file objects carry a file descriptor number or None instead.
        local_name = getattr(file_obj, "name", None)
        if not isinstance(local_name, str) or not local_name:
            msg = "%s is missing a name attribute and relative_url was not provided"
            raise ValueError(msg % (file_obj,))
    else:
        local_name = file_obj

    name = os.path.basename(local_name)
    if not relative_url:
        return name
    return os.path.join(relative_url, name)
Loading