Permalink
Browse files

Initial encryption support

  • Loading branch information...
1 parent 2f7cbfb commit 477607b4d2e4048fa8f9e6cf8a8f70c3965347c5 @russss committed Dec 20, 2011
Showing with 167 additions and 115 deletions.
  1. +11 −2 wal_e/cmd.py
  2. +7 −5 wal_e/operator/s3_operator.py
  3. +96 −0 wal_e/pipeline.py
  4. +53 −108 wal_e/worker/s3_worker.py
View
@@ -31,7 +31,7 @@ def gevent_monkey(*args, **kwargs):
from wal_e.operator import s3_operator
from wal_e.piper import popen_sp
from wal_e.worker.psql_worker import PSQL_BIN, psql_csv_run
-from wal_e.worker.s3_worker import LZOP_BIN, MBUFFER_BIN
+from wal_e.pipeline import LZOP_BIN, MBUFFER_BIN
# TODO: Make controllable from userland
log_help.configure(
@@ -137,6 +137,12 @@ def main(argv=None):
'Can also be defined via environment variable '
'WALE_S3_PREFIX')
+ parser.add_argument('--gpg-key-id',
+ help='GPG key ID to encrypt to. (Also needed when decrypting.) '
+ 'Can also be defined via environment variable '
+ 'WALE_GPG_KEY_ID')
+
+
subparsers = parser.add_subparsers(title='subcommands',
dest='subcommand')
@@ -272,7 +278,10 @@ def main(argv=None):
else:
aws_access_key_id = args.aws_access_key_id
- backup_cxt = s3_operator.S3Backup(aws_access_key_id, secret_key, s3_prefix)
+ # This will be None if we're not encrypting
+ gpg_key_id = args.gpg_key_id or os.getenv('WALE_GPG_KEY_ID')
+
+ backup_cxt = s3_operator.S3Backup(aws_access_key_id, secret_key, s3_prefix, gpg_key_id)
subcommand = args.subcommand
@@ -44,9 +44,10 @@ class S3Backup(object):
"""
def __init__(self,
- aws_access_key_id, aws_secret_access_key, s3_prefix):
+ aws_access_key_id, aws_secret_access_key, s3_prefix, gpg_key_id):
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
+ self.gpg_key_id = gpg_key_id
# Canonicalize the s3 prefix by stripping any trailing slash
self.s3_prefix = s3_prefix.rstrip('/')
@@ -206,7 +207,7 @@ def raise_walk_error(e):
total_size += tpart.total_member_size
uploads.append(pool.apply_async(
s3_worker.do_partition_put,
- [backup_s3_prefix, tpart, per_process_limit]))
+ [backup_s3_prefix, tpart, per_process_limit, self.gpg_key_id]))
finally:
while uploads:
uploads.pop().get()
@@ -282,7 +283,8 @@ def database_s3_fetch(self, pg_cluster_dir, backup_name, pool_size):
fetchers = []
for i in xrange(pool_size):
fetchers.append(s3_worker.BackupFetcher(
- s3_connections[i], layout, backup_info, pg_cluster_dir))
+ s3_connections[i], layout, backup_info, pg_cluster_dir,
+ (self.gpg_key_id is not None)))
assert len(fetchers) == pool_size
p = gevent.pool.Pool(size=pool_size)
@@ -381,7 +383,7 @@ def wal_s3_archive(self, wal_path):
'{0}/wal_{1}/{2}'.format(self.s3_prefix,
FILE_STRUCTURE_VERSION,
wal_file_name),
- wal_path)
+ wal_path, self.gpg_key_id)
def wal_s3_restore(self, wal_name, wal_destination):
"""
@@ -398,7 +400,7 @@ def wal_s3_restore(self, wal_name, wal_destination):
'{0}/wal_{1}/{2}.lzo'.format(self.s3_prefix,
FILE_STRUCTURE_VERSION,
wal_name),
- wal_destination)
+ wal_destination, (self.gpg_key_id is not None))
def delete_old_versions(self, dry_run):
obsolete_versions = ('004', '003', '002', '001', '000')
View
@@ -0,0 +1,96 @@
+from gevent import sleep
+
+from wal_e.exception import UserCritical
+from wal_e.piper import popen_sp, NonBlockPipeFileWrap, PIPE
+
+# Names of the external binaries the pipeline shells out to; each is
+# passed as argv[0] to popen_sp, so it is resolved via $PATH.
+MBUFFER_BIN = 'mbuffer'
+GPG_BIN = 'gpg'
+LZOP_BIN = 'lzop'
+
+# BUFSIZE_HT: Buffer Size, High Throughput
+#
+# This is set conservatively because small systems can end up being
+# unhappy with too much memory usage in buffers.
+
+BUFSIZE_HT = 128 * 8192
+
class Pipeline(object):
    """Represent a pipeline of commands.

    The first command's stdin and the last command's stdout are exposed,
    wrapped in NonBlockPipeFileWrap so reads/writes do not block the
    gevent event loop.
    """

    def __init__(self, commands):
        # Ordered list of PipelineCommand-like objects; data flows from
        # commands[0] through commands[-1].
        self.commands = commands

    @property
    def stdin(self):
        # NOTE(review): a fresh wrapper object is created on every
        # access; callers should fetch this once and reuse it.
        return NonBlockPipeFileWrap(self.commands[0].stdin)

    @property
    def stdout(self):
        return NonBlockPipeFileWrap(self.commands[-1].stdout)

    def finish(self):
        """Wait for every command in the pipeline to terminate.

        Propagates whatever each command's finish() raises on a
        non-graceful exit.
        """
        # A plain loop, not a list comprehension: finish() is called
        # purely for its side effects, so building and discarding a
        # list of None results was misleading.
        for command in self.commands:
            command.finish()
+
+
class PipelineCommand(object):
    """One external process participating in a Pipeline.

    Concrete filters call start() from their own __init__ to launch the
    actual binary; this base class holds the shared process plumbing.
    """

    def __init__(self, stdin=PIPE, stdout=PIPE):
        # Intentionally a no-op: subclasses invoke start() themselves
        # with their specific argv and these stream arguments.
        pass

    def start(self, command, stdin, stdout):
        """Launch *command* with the given streams attached."""
        self._command = command
        self._process = popen_sp(command, stdin=stdin, stdout=stdout,
                                 bufsize=BUFSIZE_HT, close_fds=True)

    @property
    def stdin(self):
        return self._process.stdin

    @property
    def stdout(self):
        return self._process.stdout

    @property
    def returncode(self):
        return self._process.returncode

    def finish(self):
        """Block (cooperatively) until the process exits.

        Raises UserCritical when the process terminates with a non-zero
        exit status.
        """
        # Poll rather than calling wait() immediately so gevent can
        # schedule other greenlets while the child is still running.
        while self._process.poll() is None:
            sleep(0.1)

        status = self._process.wait()

        if self.stdout is not None:
            self.stdout.close()

        # Both pipe ends must be fully torn down by this point.
        assert self.stdin is None or self.stdin.closed
        assert self.stdout is None or self.stdout.closed

        if status != 0:
            raise UserCritical(
                msg='pipeline process did not exit gracefully',
                detail='"{0}" had terminated with the exit status {1}.'
                .format(" ".join(self._command), status))
+
+
class LZOCompressionFilter(PipelineCommand):
    """Compress the input stream with lzop, emitting to stdout."""

    def __init__(self, stdin=PIPE, stdout=PIPE):
        argv = [LZOP_BIN, '--stdout']
        self.start(argv, stdin, stdout)
+
+
class LZODecompressionFilter(PipelineCommand):
    """Decompress an lzop stream read from stdin ('-') to stdout."""

    def __init__(self, stdin=PIPE, stdout=PIPE):
        argv = [LZOP_BIN, '-d', '--stdout', '-']
        self.start(argv, stdin, stdout)
+
+
class GPGEncryptionFilter(PipelineCommand):
    """Encrypt the stream to the recipient *key* with gpg.

    '-z 0' disables gpg's internal compression — presumably because the
    data arriving here is already lzop-compressed; confirm against the
    surrounding pipeline construction.
    """

    def __init__(self, key, stdin=PIPE, stdout=PIPE):
        argv = [GPG_BIN, '-e', '-z', '0', '-r', key]
        self.start(argv, stdin, stdout)
+
+
class GPGDecryptionFilter(PipelineCommand):
    """Decrypt a gpg stream; '-q' suppresses informational output."""

    def __init__(self, stdin=PIPE, stdout=PIPE):
        argv = [GPG_BIN, '-d', '-q']
        self.start(argv, stdin, stdout)
Oops, something went wrong.

0 comments on commit 477607b

Please sign in to comment.