Skip to content

Commit

Permalink
Make parallel upload pass on missing segments
Browse files Browse the repository at this point in the history
This code was designed under the supposition that the
"pg_xlog/archive_status" directory would always have corresponding
"pg_xlog" files.  There are a couple of documented cases where this is
not true, at least one involving pg_upgrade.

Instead of crashing the process in such a case and preventing progress,
allow parallel uploads not explicitly requested by Postgres to succeed.
  • Loading branch information
fdr committed Dec 28, 2014
1 parent 92f15c6 commit 730b6f2
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 19 deletions.
54 changes: 54 additions & 0 deletions tests/test_blackbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,60 @@ def test_wal_push_fetch(pg_xlog, tmpdir, config):
assert tmpdir.join('.wal-e', 'prefetch', seg_name).check(file=1)


def test_wal_push_parallel(pg_xlog, config, monkeypatch):
    """Exercise a parallel wal-push when one staged segment is missing.

    Three segments are marked ``.ready``, then the segment file for the
    second one is removed.  A parallel push started from the first
    segment is expected to log the "begin", "skip" and "complete"
    states, while an explicit push of the missing segment is expected
    to exit with status 1.
    """
    from wal_e.worker import upload

    # Keep the real logging function so the recorder can delegate to it.
    original_info = upload.logger.info

    class ActionRecorder(object):
        """Stand-in for ``logger.info`` that notes (action, state) pairs."""

        def __init__(self):
            self.actions = set()

        def __call__(self, *args, **kwargs):
            fields = kwargs['structured']
            self.actions.add((fields['action'], fields['state']))
            return original_info(*args, **kwargs)

    recorder = ActionRecorder()
    monkeypatch.setattr(upload.logger, 'info', recorder)

    def seg_name(*parts):
        # Zero-pad every part to eight characters and concatenate them.
        return ''.join(str(part).zfill(8) for part in parts)

    segments = [seg_name(1, 1, number) for number in range(1, 4)]

    for name in segments:
        pg_xlog.touch(name, '.ready')

    # Leave the second segment with *only* its ready file, so that
    # parallel-push is exercised against a missing pg_xlog file.
    missing = segments[1]
    pg_xlog.seg(missing).remove()

    # With this much parallelism the push should attempt every staged
    # wal segment, not just the one named on the command line.
    config.main('wal-push', '-p8', 'pg_xlog/' + segments[0])

    # All three states must have been logged, the "skip" state in
    # particular.
    expected = set(('push-wal', state)
                   for state in ('begin', 'skip', 'complete'))
    assert recorder.actions == expected

    # Explicitly requesting the upload of a segment that does not exist
    # must fail.
    #
    # NB: pytest.raises would be the usual tool, but with it e.value
    # was *sometimes* an integer and sometimes the SystemExit value;
    # the builtin try/except construct appears reliable by comparison.
    exit_code = None
    try:
        config.main('wal-push', '-p8', 'pg_xlog/' + missing)
    except SystemExit as e:
        exit_code = e.code

    # Also fails when no SystemExit was raised (exit_code stays None).
    assert exit_code == 1


def test_wal_fetch_non_existent(tmpdir, config):
# Recall file and check for equality.
download_file = tmpdir.join('TEST-DOWNLOADED')
Expand Down
20 changes: 15 additions & 5 deletions wal_e/operator/backup.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import sys
import os
import json
import errno
import functools
import gevent
import gevent.pool
import itertools
import json
import os
import sys

from cStringIO import StringIO
from wal_e import log_help
Expand Down Expand Up @@ -279,8 +280,17 @@ def wal_archive(self, wal_path, concurrency=1):
group.start(other_segment)
started += 1

# Wait for uploads to finish.
group.join()
try:
# Wait for uploads to finish.
group.join()
except EnvironmentError as e:
if e.errno == errno.ENOENT:
print e
raise UserException(
msg='could not find file for wal-push',
detail=('The operating system reported: {0} {1}'
.format(e.strerror, repr(e.filename))))
raise

def wal_restore(self, wal_name, wal_destination, prefetch_max):
"""
Expand Down
42 changes: 28 additions & 14 deletions wal_e/worker/upload.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import errno
import socket
import tempfile
import time
Expand Down Expand Up @@ -38,20 +39,33 @@ def __call__(self, segment):
'prefix': self.layout.path_prefix,
'state': 'begin'})

# Upload and record the rate at which it happened.
kib_per_second = do_lzop_put(self.creds, url, segment.path,
self.gpg_key_id)

logger.info(msg='completed archiving to a file ',
detail=('Archiving to "{url}" complete at '
'{kib_per_second}KiB/s. '
.format(url=url, kib_per_second=kib_per_second)),
structured={'action': 'push-wal',
'key': url,
'rate': kib_per_second,
'seg': segment.name,
'prefix': self.layout.path_prefix,
'state': 'complete'})
structured_template = {'action': 'push-wal',
'key': url,
'seg': segment.name,
'prefix': self.layout.path_prefix}

try:
# Upload and record the rate at which it happened.
kib_per_second = do_lzop_put(self.creds, url, segment.path,
self.gpg_key_id)
except EnvironmentError as e:
if not segment.explicit and e.errno == errno.ENOENT:
structured = dict(state='skip', **structured_template)
logger.info(msg='skip parallel archiving of a file',
detail=('The segment {0} did not exist.'
.format(segment.path)),
structured=structured)
else:
raise
else:
structured = dict(rate=str(kib_per_second), state='complete',
**structured_template)
logger.info(msg='completed archiving to a file',
detail=('Archiving to "{url}" complete at '
'{kib_per_second}KiB/s.'
.format(url=url,
kib_per_second=kib_per_second)),
structured=structured)

return segment

Expand Down

0 comments on commit 730b6f2

Please sign in to comment.