Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Commit

Permalink
Speeds up the unit file path migrations
Browse files Browse the repository at this point in the history
Removes the empty directory purge phase of the 2.8 migrations, which was taking
some users many hours when done over NFS.

Introduces multi-threaded concurrency for the bulk of the migration's work.

https://pulp.plan.io/issues/2118
fixes #2118
  • Loading branch information
mhrivnak committed Aug 2, 2016
1 parent 60a6934 commit caddb22
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 48 deletions.
16 changes: 16 additions & 0 deletions docs/user-guide/release-notes/2.10.x.rst
Expand Up @@ -18,3 +18,19 @@ New Features

* RPM, ISO, and Docker repositories can be published using the new rsync distributors. These
distributors allow Pulp users to rsync repositories to remote servers.

Upgrade
-------

Action required: If you are upgrading from 2.8.2 or earlier directly to 2.10.0 or later, you will
find that many empty directories are present in /var/lib/pulp/content/. Several migrations that moved
unit files to new locations were optimized for performance, specifically when operating on NFS, and
that required removing a directory pruning stage that was taking some users many hours. Instead,
you can now perform that removal separately from the migration system. Alternatively, Pulp will run
normally with the empty directories in place if you do not wish to spend time running the removal.

To execute the removal, which may take a long time over NFS, run this command::

$ sudo -u apache find /var/lib/pulp/content/ -type d -empty \
-not -path "/var/lib/pulp/content/units/*" -delete

118 changes: 100 additions & 18 deletions server/pulp/plugins/migration/standard_storage_path.py
Expand Up @@ -17,17 +17,24 @@ def migrate(*args, **kwargs):
migration.add(plan)
migration()
"""
import contextlib
import logging
import os
import shutil

import Queue
import threading
from collections import namedtuple
from gettext import gettext as _
from hashlib import sha256
from itertools import chain

from pulp.plugins.util.misc import mkdir
from pulp.server.config import config


_log = logging.getLogger(__name__)


# stored in the Batch
Item = namedtuple(
'Item',
Expand All @@ -41,6 +48,54 @@ def migrate(*args, **kwargs):
)


@contextlib.contextmanager
def defer(func):
    """
    Context manager that runs a callback when its block is left.

    The callback runs whether the block finishes normally or raises, much like
    contextlib.closing but for an arbitrary callable instead of an object's close() method.

    :param func: A function to call with no arguments.
    :type func: function
    """
    try:
        yield None
    finally:
        # runs on both the normal and the exceptional exit path
        func()


@contextlib.contextmanager
def threader(target, num_threads):
    """
    Starts the specified number of threads using the target function. Creates a Queue.Queue object,
    passes it to the target as the only argument, and yields that queue.

    The code using this as a context manager must add items to the queue in the code block whose
    context is being managed. At the end of the block, this manager will add None to the queue
    num_threads times. It will then call queue.join(). This shutdown sequence also runs when the
    managed block raises, so the workers are told to quit instead of being left blocked on the
    queue; the exception propagates once the queue has been drained.

    :param target: A function to run in a thread. It should take items out of the queue, do
                   something with each, and call queue.task_done() once for every item it
                   retrieves (including the None sentinel). When the value retrieved from the
                   queue is None, the thread should gracefully exit.
    :type target: function
    :param num_threads: The number of threads to start
    :type num_threads: int
    """
    # 500 is an arbitrary limit to control memory use.
    queue = Queue.Queue(500)
    for _ in range(num_threads):
        worker = threading.Thread(target=target, args=[queue])
        worker.daemon = True
        worker.start()

    try:
        yield queue
    finally:
        # sentinel value telling workers to quit; one per worker, sent even on error
        for _ in range(num_threads):
            queue.put(None)

        # wait until every queued entry (sentinels included) has been acknowledged
        queue.join()


class Batch(object):
"""
A batch of units to be migrated.
Expand Down Expand Up @@ -90,9 +145,27 @@ def _relink(self):
Foreach symlink found, find the *new* path using the plan. Then,
re-create the symlink with the updated target.
"""
root = Migration.publish_dir()
for path, directories, files in os.walk(root):
for name in chain(files, directories):
with threader(self._relink_worker, 4) as queue:
root = Migration.publish_dir()
for path, directories, files in os.walk(root):
for name in chain(files, directories):
queue.put((path, name))

def _relink_worker(self, queue):
"""
Worker function to be run in a thread. It takes Item instances from the queue and
relinks each.
:param queue: a queue with items that are Item instances. When the fetched item is None,
this function will return.
:type queue: Queue.Queue
"""
while True:
job = queue.get()
with defer(queue.task_done):
if job is None:
break
path, name = job
abs_path = os.path.join(path, name)
try:
target = os.readlink(abs_path)
Expand All @@ -115,8 +188,25 @@ def _migrate(self):
1. Move the content files.
2. Update the unit in the DB.
"""
for item in self.items:
item.plan.migrate(item.unit_id, item.storage_path, item.new_path)
with threader(self._migrate_worker, 4) as queue:
for item in self.items:
queue.put(item)

@staticmethod
def _migrate_worker(queue):
    """
    Worker function to be run in a thread. It migrates each entry it takes from the queue.

    Every entry is migrated by calling its plan's migrate() with the entry's unit_id,
    storage_path and new_path. queue.task_done() is called once for each value fetched,
    including the None sentinel and entries whose migration raised.

    :param queue: a queue of entries exposing plan, unit_id, storage_path and new_path
                  attributes. When the fetched item is None, this function will return.
    :type queue: Queue.Queue
    """
    while True:
        entry = queue.get()
        try:
            if entry is None:
                return
            entry.plan.migrate(entry.unit_id, entry.storage_path, entry.new_path)
        finally:
            # acknowledge the fetched value no matter how this iteration ended
            queue.task_done()

def __call__(self):
"""
Expand Down Expand Up @@ -284,17 +374,6 @@ def publish_dir():
"""
return os.path.join(Migration.storage_dir(), 'published')

@staticmethod
def _prune():
    """
    Remove every empty directory under the content storage tree.

    The tree is walked bottom-up, so a directory is examined only after its
    subdirectories have been visited (and possibly removed).
    """
    content_root = Migration.content_dir()
    for directory, _subdirs, _filenames in os.walk(content_root, topdown=False):
        # re-list the directory rather than trusting the walk results
        if os.listdir(directory):
            continue
        os.rmdir(directory)

def __init__(self):
self.plans = []

Expand All @@ -319,7 +398,10 @@ def __call__(self):
if len(batch) >= Batch.LIMIT:
batch()
batch()
self._prune()
_log.info(_('*** To remove empty directories, consider running the following command. It '
'may take a long time over NFS.'))
_log.info('$ sudo -u apache find /var/lib/pulp/content/ -type d -empty '
'-not -path "/var/lib/pulp/content/units/*" -delete')


class Unit(object):
Expand Down
30 changes: 0 additions & 30 deletions server/test/unit/plugins/migration/test_standard_storage_path.py
Expand Up @@ -341,36 +341,6 @@ def test_publish_dir(self, storage_dir):
# validation
self.assertEqual(path, os.path.join(storage_dir.return_value, 'published'))

@patch('os.rmdir')
@patch('os.walk')
@patch('os.listdir')
@patch(MODULE + '.Migration.content_dir')
def test_prune(self, content_dir, listdir, walk, rmdir):
    """
    Only the directories that list as empty are removed by Migration._prune().
    """
    # paths ending with '_' are reported as empty; all others have entries
    listdir.side_effect = lambda path: [] if path.endswith('_') else [1, 2]
    walk.return_value = [
        ('r', ['d1', 'd2'], ['f1', 'f2']),
        ('d1_', [], []),
        ('d2', ['d3'], []),
        ('d4_', [], []),
    ]

    # test
    Migration._prune()

    # validation
    walk.assert_called_once_with(content_dir.return_value, topdown=False)
    expected_removals = [call('d1_'), call('d4_')]
    self.assertEqual(rmdir.call_args_list, expected_removals)

def test_add(self):
plan = Mock()

Expand Down

0 comments on commit caddb22

Please sign in to comment.