Commit
Teach exporter to understand, validate, and respect chunk_size= parameter.

Since we can now have multiple output files, replaced the filename/sha256
columns with output_file_info, a JSONField holding a dictionary of
filename: hash pairs.
closes #6736
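For illustration, the new output_file_info field holds a mapping shaped like the following (paths are patterned on the docs example in this commit; the digests are hypothetical placeholders):

{
    "/tmp/exports/export-780822a4-d280-4ed0-a53c-382a887576a6-20200522_2325.tar.gz.0000": "<sha256 hex digest>",
    "/tmp/exports/export-780822a4-d280-4ed0-a53c-382a887576a6-20200522_2325.tar.gz.0001": "<sha256 hex digest>"
}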
ggainey authored and daviddavis committed May 26, 2020
1 parent 3b94b84 commit 44bb062
Showing 11 changed files with 267 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGES/6736.feature
@@ -0,0 +1 @@
Taught export how to split the export file into chunks of chunk_size bytes.
1 change: 1 addition & 0 deletions CHANGES/6736.removal
@@ -0,0 +1 @@
Replaced PulpExport filename/sha256 fields with output_file_info, a '<filename>': '<hash>' dictionary.
26 changes: 26 additions & 0 deletions docs/workflows/import-export.rst
@@ -210,6 +210,32 @@ accomplish this by setting the ``full`` parameter on the ``/exports/`` invocation
This results in an export of all content-entities, but only :term:`Artifacts<Artifact>`
that have been **added** since the `last_export` of the same Exporter.

Exporting Chunked Files
-----------------------

By default, PulpExport streams data into a single ``.tar.gz`` file. Since :term:`Repositories<Repository>`
can contain many artifacts and a lot of content, the result can be a file too large to be
copied to transport media. In this case, you can specify a maximum file size, and the
export process will chunk the tar.gz into a series of files no larger than that.

You accomplish this by setting the ``chunk_size`` parameter to the desired maximum number of bytes. This
parameter takes an integer number of bytes, or a size with units of B, KB, MB, GB, or TB. Files appear in
the ``Exporter.path`` directory, with a four-digit sequence-number suffix::

http POST :/pulp/api/v3/exporters/core/pulp/1ddbe6bf-a6c3-4a88-8614-ad9511d21b94/exports/ chunk_size="10KB"
{
"task": "/pulp/api/v3/tasks/da3350f7-0102-4dd5-81e0-81becf3ffdc7/"
}
ls -l /tmp/exports/
total 76
10K export-780822a4-d280-4ed0-a53c-382a887576a6-20200522_2325.tar.gz.0000
10K export-780822a4-d280-4ed0-a53c-382a887576a6-20200522_2325.tar.gz.0001
10K export-780822a4-d280-4ed0-a53c-382a887576a6-20200522_2325.tar.gz.0002
10K export-780822a4-d280-4ed0-a53c-382a887576a6-20200522_2325.tar.gz.0003
10K export-780822a4-d280-4ed0-a53c-382a887576a6-20200522_2325.tar.gz.0004
10K export-780822a4-d280-4ed0-a53c-382a887576a6-20200522_2325.tar.gz.0005
2.3K export-780822a4-d280-4ed0-a53c-382a887576a6-20200522_2325.tar.gz.0006
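
Because the suffixes are zero-padded, the chunks sort lexically, so they can be
reassembled in order before use. A minimal sketch (not part of this commit; paths
follow the example above)::

import glob

# Concatenate the numbered chunks back into a single tar.gz.
chunks = sorted(glob.glob("/tmp/exports/export-*.tar.gz.*"))
with open("/tmp/exports/reassembled.tar.gz", "wb") as out:
    for chunk in chunks:
        with open(chunk, "rb") as f:
            out.write(f.read())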

Updating an Exporter
--------------------

27 changes: 27 additions & 0 deletions pulpcore/app/migrations/0032_export_to_chunks.py
@@ -0,0 +1,27 @@
# Generated by Django 2.2.11 on 2020-05-22 18:31

import django.contrib.postgres.fields.jsonb
from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
('core', '0031_import_export_validate_params'),
]

operations = [
migrations.RemoveField(
model_name='pulpexport',
name='filename',
),
migrations.RemoveField(
model_name='pulpexport',
name='sha256',
),
migrations.AddField(
model_name='pulpexport',
name='output_file_info',
field=django.contrib.postgres.fields.jsonb.JSONField(null=True),
),
]
4 changes: 2 additions & 2 deletions pulpcore/app/models/exporter.py
@@ -163,8 +163,8 @@ class PulpExport(Export):

tarfile = None
validated_versions = None
sha256 = models.CharField(max_length=64, null=True)
filename = models.CharField(max_length=4096, null=True)
validated_chunk_size = None
output_file_info = JSONField(null=True)

def export_tarfile_path(self):
"""
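For orientation, the path returned by export_tarfile_path() (body elided above) follows the naming convention visible in the docs example. A hypothetical sketch of that convention only, not the actual implementation:

import os

# Hypothetical illustration: <Exporter.path>/export-<uuid>-<YYYYMMDD_HHMM>.tar.gz
def example_tarfile_path(exporter_path, export_uuid, timestamp):
    return os.path.join(exporter_path, "export-{}-{}.tar.gz".format(export_uuid, timestamp))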
52 changes: 41 additions & 11 deletions pulpcore/app/serializers/exporter.py
@@ -1,4 +1,5 @@
import os
import re
from gettext import gettext as _

from rest_framework import serializers
@@ -109,12 +110,9 @@ class PulpExportSerializer(ExportSerializer):
Serializer for PulpExports.
"""

sha256 = serializers.CharField(
help_text=_("The SHA-256 checksum of the exported .tar.gz."), read_only=True,
)

filename = serializers.CharField(
help_text=_("The full-path filename of the exported .tar.gz."), read_only=True,
output_file_info = serializers.JSONField(
help_text=_("Dictionary of filename: sha256hash entries for export-output-file(s)"),
read_only=True,
)

dry_run = serializers.BooleanField(
@@ -136,6 +134,15 @@
write_only=True,
)

chunk_size = serializers.CharField(
help_text=_(
"Chunk export-tarfile into pieces of chunk_size bytes."
+ "Recognizes units of B/KB/MB/GB/TB."
),
required=False,
write_only=True,
)

def validate_versions(self, versions):
"""
If specifying repo-versions explicitly, must provide a version for each exporter-repository
@@ -146,8 +153,8 @@ def validate_versions(self, versions):
if num_repos != len(versions):
raise serializers.ValidationError(
_(
"Number of versions does not match the number of Repositories for the owning "
+ "Exporter!"
"Number of versions ({}) does not match the number of Repositories ({}) for "
+ "the owning Exporter!"
).format(len(versions), num_repos)
)

@@ -159,18 +166,41 @@ def validate_versions(self, versions):
_(
"Requested RepositoryVersions must belong to the Repositories named by the "
+ "Exporter!"
).format(exporter_repos, version_repos)
)
)
return versions

@staticmethod
def _parse_size(size):
try:
# based on https://stackoverflow.com/a/42865957/2002471
units = {"B": 1, "KB": 2 ** 10, "MB": 2 ** 20, "GB": 2 ** 30, "TB": 2 ** 40}
size = size.upper()
if not re.match(r" ", size):
size = re.sub(r"([KMGT]?B)", r" \1", size)
number, unit = [string.strip() for string in size.split()]
return int(float(number) * units[unit])
except ValueError:
raise serializers.ValidationError(
_("chunk_size '{}' is not valid (valid units are B/KB/MB/GB/TB)").format(size)
)

def validate_chunk_size(self, chunk_size):
the_size = self._parse_size(chunk_size)
if the_size <= 0:
raise serializers.ValidationError(
_("Chunk size {} is not greater than zero!").format(the_size)
)
return the_size

class Meta:
model = models.PulpExport
fields = ExportSerializer.Meta.fields + (
"sha256",
"filename",
"full",
"dry_run",
"versions",
"chunk_size",
"output_file_info",
)


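To make the size grammar concrete, here is a standalone sketch mirroring the _parse_size() logic above, with a few worked values:

import re

UNITS = {"B": 1, "KB": 2 ** 10, "MB": 2 ** 20, "GB": 2 ** 30, "TB": 2 ** 40}

def parse_size(size):
    # "10KB" -> "10 KB" -> 10 * 1024, as in the serializer helper above
    size = size.upper()
    if not re.match(r" ", size):
        size = re.sub(r"([KMGT]?B)", r" \1", size)
    number, unit = [part.strip() for part in size.split()]
    return int(float(number) * UNITS[unit])

assert parse_size("10KB") == 10240
assert parse_size("1.5MB") == 1572864
assert parse_size("2GB") == 2147483648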
138 changes: 89 additions & 49 deletions pulpcore/app/tasks/export.py
@@ -1,10 +1,13 @@
import hashlib
import logging
import os
import subprocess
import tarfile

from distutils.util import strtobool
from gettext import gettext as _
from glob import glob
from pathlib import Path
from pkg_resources import get_distribution

from pulpcore.app.models import (
@@ -132,7 +135,7 @@ def pulp_export(the_export):
1) Spit out all Artifacts, ArtifactResource.json, and RepositoryResource.json
2) Spit out all *resource JSONs in per-repo-version directories
3) Compute and store the sha256 and filename of the resulting tar.gz
3) Compute and store the sha256 and filename of the resulting tar.gz/chunks
Args:
the_export (models.PulpExport): PulpExport instance
@@ -141,62 +144,99 @@
ValidationError: When path is not in the ALLOWED_EXPORT_PATHS setting,
OR path exists and is not a directory
"""

pulp_exporter = the_export.exporter
the_export.task = Task.current()

tarfile_fp = the_export.export_tarfile_path()
os.makedirs(pulp_exporter.path, exist_ok=True)
rslts = {}

if the_export.validated_chunk_size:
# write it into chunks
with subprocess.Popen(
[
"split",
"-a",
"4",
"-b",
str(the_export.validated_chunk_size),
"-d",
"-",
tarfile_fp + ".",
],
stdin=subprocess.PIPE,
) as split_process:
with tarfile.open(tarfile_fp, "w|gz", fileobj=split_process.stdin) as tar:
_do_export(pulp_exporter, tar, the_export)

# compute the hashes
paths = [str(Path(p)) for p in glob(tarfile_fp + ".*")]
for a_file in paths:
a_hash = _compute_hash(a_file)
rslts[a_file] = a_hash
else:
# write into the file
with tarfile.open(tarfile_fp, "w:gz") as tar:
_do_export(pulp_exporter, tar, the_export)
# compute the hash
tarfile_hash = _compute_hash(tarfile_fp)
rslts[tarfile_fp] = tarfile_hash

# store the outputfile/hash info
the_export.output_file_info = rslts
# save the export
the_export.save()
# mark it as 'last'
pulp_exporter.last_export = the_export
# save the exporter
pulp_exporter.save()

with tarfile.open(tarfile_fp, "w:gz") as tar:
the_export.tarfile = tar
CreatedResource.objects.create(content_object=the_export)
versions_to_export = _get_versions_to_export(pulp_exporter, the_export)
plugin_version_info = _get_versions_info(pulp_exporter)

do_incremental = _incremental_requested(the_export)

# list-of-previous-versions, or None
if do_incremental:
prev_versions = [
er.content_object
for er in ExportedResource.objects.filter(export=pulp_exporter.last_export).all()
]
else:
prev_versions = None

vers_match = _version_match(versions_to_export, prev_versions)

# Gather up versions and artifacts
artifacts = []
for version in versions_to_export:
# Check version-content to make sure we're not being asked to export an on_demand repo
content_artifacts = ContentArtifact.objects.filter(content__in=version.content)
if content_artifacts.filter(artifact=None).exists():
RuntimeError(_("Remote artifacts cannot be exported."))

if do_incremental:
vers_artifacts = version.artifacts.difference(vers_match[version].artifacts).all()
else:
vers_artifacts = version.artifacts.all()
artifacts.extend(vers_artifacts)

# export plugin-version-info
export_versions(the_export, plugin_version_info)
# Export the top-level entities (artifacts and repositories)
# Note: we've already handled "what about incrementals" when building the 'artifacts' list
export_artifacts(the_export, artifacts)
# Export the repository-version data, per-version
for version in versions_to_export:
export_content(the_export, version)
ExportedResource.objects.create(export=the_export, content_object=version)

def _compute_hash(filename):
sha256_hash = hashlib.sha256()
with open(tarfile_fp, "rb") as f:
with open(filename, "rb") as f:
# Read and update hash string value in blocks of 4K
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
the_export.sha256 = sha256_hash.hexdigest()
the_export.filename = tarfile_fp
the_export.save()
pulp_exporter.last_export = the_export
pulp_exporter.save()
return sha256_hash.hexdigest()


def _do_export(pulp_exporter, tar, the_export):
the_export.tarfile = tar
CreatedResource.objects.create(content_object=the_export)
versions_to_export = _get_versions_to_export(pulp_exporter, the_export)
plugin_version_info = _get_versions_info(pulp_exporter)
do_incremental = _incremental_requested(the_export)
# list-of-previous-versions, or None
if do_incremental:
prev_versions = [
er.content_object
for er in ExportedResource.objects.filter(export=pulp_exporter.last_export).all()
]
else:
prev_versions = None
vers_match = _version_match(versions_to_export, prev_versions)
# Gather up versions and artifacts
artifacts = []
for version in versions_to_export:
# Check version-content to make sure we're not being asked to export
# an on_demand repo
content_artifacts = ContentArtifact.objects.filter(content__in=version.content)
if content_artifacts.filter(artifact=None).exists():
raise RuntimeError(_("Remote artifacts cannot be exported."))

if do_incremental:
vers_artifacts = version.artifacts.difference(vers_match[version].artifacts).all()
else:
vers_artifacts = version.artifacts.all()
artifacts.extend(vers_artifacts)
# export plugin-version-info
export_versions(the_export, plugin_version_info)
# Export the top-level entities (artifacts and repositories)
# Note: we've already handled "what about incrementals" when building the 'artifacts' list
export_artifacts(the_export, artifacts)
# Export the repository-version data, per-version
for version in versions_to_export:
export_content(the_export, version)
ExportedResource.objects.create(export=the_export, content_object=version)
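
The chunked branch above never materializes the full tarball on disk: tarfile writes gzip output straight into split's stdin, and split cuts the stream into numbered pieces as the bytes arrive. A minimal standalone sketch of the same technique (output prefix and payload directory are hypothetical):

import subprocess
import tarfile

# Stream a gzip-compressed tar through GNU split, producing
# out.tar.gz.0000, out.tar.gz.0001, ... of at most 10240 bytes each.
with subprocess.Popen(
    ["split", "-a", "4", "-b", "10240", "-d", "-", "out.tar.gz."],
    stdin=subprocess.PIPE,
) as split_process:
    with tarfile.open(mode="w|gz", fileobj=split_process.stdin) as tar:
        tar.add("some_directory")  # hypothetical payload

Exiting the Popen context closes split's stdin, so split sees EOF and flushes the final (possibly short) chunk.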
1 change: 1 addition & 0 deletions pulpcore/app/viewsets/exporter.py
@@ -127,6 +127,7 @@ def create(self, request, exporter_pk):
# Invoke the export
export = PulpExport.objects.create(exporter=exporter, params=request.data)
export.validated_versions = serializer.validated_data.get("versions", None)
export.validated_chunk_size = serializer.validated_data.get("chunk_size", None)

result = enqueue_with_reservation(pulp_export, [exporter], kwargs={"the_export": export})

20 changes: 17 additions & 3 deletions pulpcore/tests/functional/api/using_plugin/test_pulpexport.py
@@ -202,9 +202,10 @@ def test_export(self):
export = self._gen_export(exporter)
self.assertIsNotNone(export)
self.assertEqual(len(exporter.repositories), len(export.exported_resources))
self.assertIsNotNone(export.filename)
self.assertIsNotNone(export.sha256)
self.assertFalse("//" in export.filename)
self.assertIsNotNone(export.output_file_info)
for an_export_filename in export.output_file_info.keys():
self.assertFalse("//" in an_export_filename)

finally:
self._delete_exporter(exporter)

@@ -352,3 +353,16 @@ def test_incremental(self):
self._gen_export(exporter, body)
finally:
self._delete_exporter(exporter)

def test_chunking(self):
a_repo = self.repo_api.create(gen_repo())
self.addCleanup(self.client.delete, a_repo.pulp_href)
(exporter, body) = self._create_exporter(use_repos=[a_repo], cleanup=False)
try:
body = {"chunk_size": "250B"}
export = self._gen_export(exporter, body)
info = export.output_file_info
self.assertIsNotNone(info)
self.assertTrue(len(info) > 1)
finally:
self._delete_exporter(exporter)
5 changes: 2 additions & 3 deletions pulpcore/tests/functional/api/using_plugin/test_pulpimport.py
@@ -191,9 +191,8 @@ def test_import(self):
importer = self.importer_api.create(body)
self.addCleanup(self.importer_api.delete, importer.pulp_href)

import_response = self.imports_api.create(
importer.pulp_href, {"path": self.export.filename}
)
filenames = list(self.export.output_file_info.keys())
import_response = self.imports_api.create(importer.pulp_href, {"path": filenames[0]})
monitor_task(import_response.task)
task = self.client.get(import_response.task)
resources = task["created_resources"]
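Because output_file_info records a sha256 per output file, a consumer can verify an export before importing it. A sketch reusing the 4 KB block-read loop from _compute_hash above (the verify_export helper is hypothetical):

import hashlib

def verify_export(output_file_info):
    # output_file_info: {"<filename>": "<sha256>", ...} as stored by the export task
    for path, expected in output_file_info.items():
        sha = hashlib.sha256()
        with open(path, "rb") as f:
            for block in iter(lambda: f.read(4096), b""):
                sha.update(block)
        if sha.hexdigest() != expected:
            raise ValueError("checksum mismatch for {}".format(path))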