Improve object-replicator startup time.
The object replicator checks each partition directory to ensure it's
really a directory and not a zero-byte file. This was happening in
collect_jobs(), which is the first thing that the object replicator
does.

The effect was that, at startup, the object-replicator process would
list each "objects" or "objects-N" directory on each object device,
then stat() every single thing in there. On devices with lots of
partitions, this made the replicator take a long time before it did
anything useful.

If you have a cluster with a too-high part_power plus some failing
disks elsewhere, you can easily get thousands of partition directories
on each disk. If you've got 36 disks per node, that turns into a very
long wait for the object replicator to do anything. Worse yet, if you
add in a configuration management system that pushes new rings every
couple of hours, the object replicator can spend the vast majority of
its time collecting jobs, then only spend a short time doing useful
work before the ring changes and it has to start all over again.

This commit moves the stat() call (os.path.isfile) to the loop that
processes jobs. In a complete pass, the total work done is about the
same, but the replicator starts doing useful work much sooner.

Change-Id: I5ed4cd09dde514ec7d1e74afe35feaab0cf28a10
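
In short, the change defers the per-partition stat() from job collection to job processing. Below is a minimal, self-contained Python sketch of that pattern; the collect_jobs/process_job names, the job dict layout, and the demo paths are illustrative assumptions, not the actual Swift code (the real change is in the diff further down).

import os
import tempfile


def collect_jobs(objects_dir):
    # Cheap startup: just list the partition entries; no per-entry stat() here.
    return [{'path': os.path.join(objects_dir, name)}
            for name in os.listdir(objects_dir)]


def process_job(job):
    # The stat() (os.path.isfile) now happens per job, spread over the pass.
    try:
        if os.path.isfile(job['path']):
            # A (probably zero-byte) file sitting where a partition
            # directory should be: remove it and skip the job.
            os.remove(job['path'])
            return
    except OSError:
        return
    # ... real replication work for the partition would go here ...


if __name__ == '__main__':
    # Tiny demo: one real partition directory and one zero-byte impostor.
    objects = tempfile.mkdtemp()
    os.mkdir(os.path.join(objects, '0'))
    open(os.path.join(objects, '1'), 'w').close()
    for job in collect_jobs(objects):
        process_job(job)
    print(sorted(os.listdir(objects)))  # -> ['0']

The total number of stat() calls over a complete pass stays roughly the same; they are just no longer front-loaded before any useful replication work starts.
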
smerritt committed Dec 8, 2014
1 parent cc2f0f4 commit ba8114a
Showing 2 changed files with 57 additions and 62 deletions.
swift/obj/replicator.py (22 changes: 12 additions & 10 deletions)
@@ -432,14 +432,6 @@ def process_repl(self, policy, jobs, ips):
             for partition in os.listdir(obj_path):
                 try:
                     job_path = join(obj_path, partition)
-                    if isfile(job_path):
-                        # Clean up any (probably zero-byte) files where a
-                        # partition should be.
-                        self.logger.warning(
-                            'Removing partition directory '
-                            'which was a file: %s', job_path)
-                        os.remove(job_path)
-                        continue
                     part_nodes = obj_ring.get_part_nodes(int(partition))
                     nodes = [node for node in part_nodes
                              if node['id'] != local_dev['id']]
@@ -451,8 +443,7 @@ def process_repl(self, policy, jobs, ips):
                              policy_idx=policy.idx,
                              partition=partition,
                              object_ring=obj_ring))
-
-                except (ValueError, OSError):
+                except ValueError:
                     continue
 
     def collect_jobs(self):
@@ -508,6 +499,17 @@ def replicate(self, override_devices=None, override_partitions=None):
                     self.logger.info(_("Ring change detected. Aborting "
                                        "current replication pass."))
                     return
+                try:
+                    if isfile(job['path']):
+                        # Clean up any (probably zero-byte) files where a
+                        # partition should be.
+                        self.logger.warning(
+                            'Removing partition directory '
+                            'which was a file: %s', job['path'])
+                        os.remove(job['path'])
+                        continue
+                except OSError:
+                    continue
                 if job['delete']:
                     self.run_pool.spawn(self.update_deleted, job)
                 else:
test/unit/obj/test_replicator.py (97 changes: 45 additions & 52 deletions)
@@ -321,64 +321,57 @@ def test_collect_jobs_handoffs_first(self):
         self.assertTrue(jobs[0]['delete'])
         self.assertEquals('1', jobs[0]['partition'])
 
-    def test_collect_jobs_removes_zbf(self):
-        """
-        After running xfs_repair, a partition directory could become a
-        zero-byte file. If this happens, collect_jobs() should clean it up and
-        *not* create a job which will hit an exception as it tries to listdir()
-        a file.
-        """
-        # Surprise! Partition dir 1 is actually a zero-byte-file
-        part_1_path = os.path.join(self.objects, '1')
-        rmtree(part_1_path)
-        with open(part_1_path, 'w'):
+    def test_replicator_skips_bogus_partition_dirs(self):
+        # A directory in the wrong place shouldn't crash the replicator
+        rmtree(self.objects)
+        rmtree(self.objects_1)
+        os.mkdir(self.objects)
+        os.mkdir(self.objects_1)
+
+        os.mkdir(os.path.join(self.objects, "burrito"))
+        jobs = self.replicator.collect_jobs()
+        self.assertEqual(len(jobs), 0)
+
+    def test_replicator_removes_zbf(self):
+        # After running xfs_repair, a partition directory could become a
+        # zero-byte file. If this happens, the replicator should clean it
+        # up, log something, and move on to the next partition.
+
+        # Surprise! Partition dir 1 is actually a zero-byte file.
+        pol_0_part_1_path = os.path.join(self.objects, '1')
+        rmtree(pol_0_part_1_path)
+        with open(pol_0_part_1_path, 'w'):
             pass
-        self.assertTrue(os.path.isfile(part_1_path))  # sanity check
-        part_1_path_1 = os.path.join(self.objects_1, '1')
-        rmtree(part_1_path_1)
-        with open(part_1_path_1, 'w'):
+        self.assertTrue(os.path.isfile(pol_0_part_1_path))  # sanity check
+
+        # Policy 1's partition dir 1 is also a zero-byte file.
+        pol_1_part_1_path = os.path.join(self.objects_1, '1')
+        rmtree(pol_1_part_1_path)
+        with open(pol_1_part_1_path, 'w'):
             pass
-        self.assertTrue(os.path.isfile(part_1_path_1))  # sanity check
-        jobs = self.replicator.collect_jobs()
-        jobs_to_delete = [j for j in jobs if j['delete']]
-        jobs_by_pol_part = {}
-        for job in jobs:
-            jobs_by_pol_part[str(job['policy_idx']) + job['partition']] = job
-        self.assertEquals(len(jobs_to_delete), 0)
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['00']['nodes']], [1, 2])
-        self.assertFalse('1' in jobs_by_pol_part)
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['02']['nodes']], [2, 3])
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['03']['nodes']], [3, 1])
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['10']['nodes']], [1, 2])
-        self.assertFalse('1' in jobs_by_pol_part)
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['12']['nodes']], [2, 3])
-        self.assertEquals(
-            [node['id'] for node in jobs_by_pol_part['13']['nodes']], [3, 1])
-        for part in ['00', '02', '03']:
-            for node in jobs_by_pol_part[part]['nodes']:
-                self.assertEquals(node['device'], 'sda')
-            self.assertEquals(jobs_by_pol_part[part]['path'],
-                              os.path.join(self.objects, part[1:]))
-        self.assertFalse(os.path.exists(part_1_path))
-        expected = sorted(self.replicator.logger.log_dict['warning'])
+        self.assertTrue(os.path.isfile(pol_1_part_1_path))  # sanity check
+
+        # Don't delete things in collect_jobs(); all the stat() calls would
+        # make replicator startup really slow.
+        self.replicator.collect_jobs()
+        self.assertTrue(os.path.exists(pol_0_part_1_path))
+        self.assertTrue(os.path.exists(pol_1_part_1_path))
+
+        # After a replication pass, the files should be gone
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            self.replicator.run_once()
+
+        self.assertFalse(os.path.exists(pol_0_part_1_path))
+        self.assertFalse(os.path.exists(pol_1_part_1_path))
+
+        logged_warnings = sorted(self.replicator.logger.log_dict['warning'])
         self.assertEquals(
             (('Removing partition directory which was a file: %s',
-              part_1_path), {}), expected[1])
-        # policy 1
-        for part in ['10', '12', '13']:
-            for node in jobs_by_pol_part[part]['nodes']:
-                self.assertEquals(node['device'], 'sda')
-            self.assertEquals(jobs_by_pol_part[part]['path'],
-                              os.path.join(self.objects_1, part[1:]))
-        self.assertFalse(os.path.exists(part_1_path_1))
+              pol_1_part_1_path), {}), logged_warnings[0])
         self.assertEquals(
             (('Removing partition directory which was a file: %s',
-              part_1_path_1), {}), expected[0])
+              pol_0_part_1_path), {}), logged_warnings[1])
 
     def test_delete_partition(self):
         with mock.patch('swift.obj.replicator.http_connect',
