first pass at file blobstore
colmaengus committed Oct 27, 2015
1 parent 399e0a8 commit 6cddec1
Showing 16 changed files with 412 additions and 10 deletions.
3 changes: 3 additions & 0 deletions wal_e/blobstore/__init__.py
100644 → 100755
@@ -12,4 +12,7 @@ def get_blobstore(layout):
     elif layout.is_swift:
         from wal_e.blobstore import swift
         blobstore = swift
+    elif layout.is_file:
+        from wal_e.blobstore import file
+        blobstore = file
     return blobstore
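For context, a minimal sketch of how this new branch is exercised (not part of the commit; it assumes the layout object is a wal_e.storage.base.StorageLayout built from a file:// prefix, per the scheme changes later in this diff):

    from wal_e.blobstore import get_blobstore
    from wal_e.storage.base import StorageLayout

    # A file:// prefix now resolves to the local-filesystem blobstore module.
    layout = StorageLayout('file://localhost/var/backups/wal-e')
    blobstore = get_blobstore(layout)  # wal_e.blobstore.file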
13 changes: 13 additions & 0 deletions wal_e/blobstore/file/__init__.py
@@ -0,0 +1,13 @@
from wal_e.blobstore.file.file_credentials import Credentials
from wal_e.blobstore.file.file_util import do_lzop_get
from wal_e.blobstore.file.file_util import uri_get_file
from wal_e.blobstore.file.file_util import uri_put_file
from wal_e.blobstore.file.file_util import write_and_return_error

__all__ = [
    'Credentials',
    'do_lzop_get',
    'uri_get_file',
    'uri_put_file',
    'write_and_return_error',
]
Empty file added wal_e/blobstore/file/bucket.py
91 changes: 91 additions & 0 deletions wal_e/blobstore/file/calling_format.py
@@ -0,0 +1,91 @@
import datetime
import os


def remove_empty_dirs(path):
    """Remove empty directories under the given path."""
    for root, dirs, files in os.walk(path):
        for d in dirs:
            dir_path = os.path.join(root, d)
            if not os.listdir(dir_path):
                os.removedirs(dir_path)


def ensure_dir_exists(path):
    """Create the parent directory of path if it does not already exist."""
    dir_path = os.path.dirname(path)
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)


def common_dir_path(args, sep='/'):
    """Return the deepest directory path common to all given paths."""
    return os.path.commonprefix(args).rpartition(sep)[0]


class FileKey(object):
    def __init__(self, bucket, name):
        self.bucket = bucket
        self.name = name
        self.path = os.path.normpath("/" + name)
        if os.path.isfile(self.path):
            stat = os.stat(self.path)
            self.last_modified = datetime.datetime.utcfromtimestamp(
                stat.st_mtime).strftime("%Y-%m-%dT%H:%M:%S")
            self.size = stat.st_size

    def get_contents_as_string(self):
        # Blob contents are binary (e.g. lzop-compressed), so read in
        # binary mode.
        with open(self.path, 'rb') as fp:
            return fp.read()

    def set_contents_from_file(self, fp):
        ensure_dir_exists(self.path)
        with open(self.path, 'wb') as f:
            while True:
                piece = fp.read(1024)
                if not piece:
                    break
                f.write(piece)
        self.size = os.path.getsize(self.path)

    def get_contents_to_file(self, fp):
        with open(self.path, 'rb') as f:
            while True:
                piece = f.read(1024)
                if not piece:
                    break
                fp.write(piece)


class Bucket(object):
    def __init__(self, name):
        self.name = name

    def get_key(self, name):
        return FileKey(bucket=self, name=name)

    def delete_keys(self, keys):
        for k in keys:
            os.remove(k)
        # Deleting files can leave empty directories behind, so trim them.
        remove_empty_dirs(common_dir_path(keys))

    def list(self, prefix):
        # TODO: handle errors from missing path
        path = "/" + prefix
        file_paths = [os.path.join(root, f)
                      for root, dirs, files in os.walk(path)
                      for f in files]
        # Convert the file paths to FileKey instances.
        return [FileKey(bucket=self, name=f) for f in file_paths]


class Connection(object):

    def get_bucket(self, name, validate=False):
        return Bucket(name)


def connect(creds):
    return Connection()
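A hedged round-trip sketch of these primitives (not part of the commit; the paths are illustrative, and keys are rooted at "/" per FileKey.path above):

    import io

    from wal_e.blobstore.file.calling_format import Bucket

    bucket = Bucket('localhost')

    # set_contents_from_file streams in 1 KiB pieces and creates parent
    # directories on demand via ensure_dir_exists.
    key = bucket.get_key('tmp/wal-e-demo/greeting')
    key.set_contents_from_file(io.BytesIO(b'hello'))
    assert key.get_contents_as_string() == b'hello'

    # delete_keys unlinks the files, then prunes directories they emptied.
    bucket.delete_keys(['/tmp/wal-e-demo/greeting'])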
3 changes: 3 additions & 0 deletions wal_e/blobstore/file/file_credentials.py
@@ -0,0 +1,3 @@
class Credentials(object):
    """Placeholder: the local file system needs no credentials.

    Kept to match the interface of the other blobstores' Credentials.
    """
    def __init__(self):
        pass
103 changes: 103 additions & 0 deletions wal_e/blobstore/file/file_util.py
@@ -0,0 +1,103 @@
from urlparse import urlparse
import socket
import traceback
import gevent

from . import calling_format
from wal_e import files
from wal_e import log_help
from wal_e.pipeline import get_download_pipeline
from wal_e.piper import PIPE
from wal_e.retries import retry, retry_with_count

logger = log_help.WalELogger(__name__)


def _uri_to_key(creds, uri, conn=None):
    assert uri.startswith('file://')
    url_tup = urlparse(uri)
    bucket_name = url_tup.netloc
    if conn is None:
        conn = calling_format.connect(creds)
    return conn.get_bucket(bucket_name).get_key(url_tup.path)


def uri_put_file(creds, uri, fp, content_encoding=None, conn=None):
    assert fp.tell() == 0

    k = _uri_to_key(creds, uri, conn=conn)

    if content_encoding is not None:
        k.content_type = content_encoding

    k.set_contents_from_file(fp)
    return k


def uri_get_file(creds, uri, conn=None):
    k = _uri_to_key(creds, uri, conn=conn)
    return k.get_contents_as_string()


def do_lzop_get(creds, url, path, decrypt, do_retry=True):
    """
    Get and decompress a file:// URL

    This streams the content directly to lzop; the compressed version
    is never stored on disk.
    """
    assert url.endswith('.lzo'), 'Expect an lzop-compressed file'

    def log_wal_fetch_failures_on_error(exc_tup, exc_processor_cxt):
        def standard_detail_message(prefix=''):
            return (prefix + ' There have been {n} attempts to fetch wal '
                    'file {url} so far.'.format(n=exc_processor_cxt, url=url))

        typ, value, tb = exc_tup
        del exc_tup

        # TODO: better error handling?
        logger.warning(
            msg='retrying WAL file fetch from unexpected exception',
            detail=standard_detail_message(
                'The exception type is {etype} and its value is '
                '{evalue} and its traceback is {etraceback}'.format(
                    etype=typ, evalue=value,
                    etraceback=''.join(traceback.format_tb(tb)))))

        # Help Python GC by resolving possible cycles
        del tb

    def download():
        with files.DeleteOnError(path) as decomp_out:
            key = _uri_to_key(creds, url)
            with get_download_pipeline(PIPE, decomp_out.f, decrypt) as pl:
                g = gevent.spawn(write_and_return_error, key, pl.stdin)

                # Raise any exceptions returned by write_and_return_error.
                exc = g.get()
                if exc is not None:
                    raise exc

            logger.info(
                msg='completed download and decompression',
                detail='Downloaded and decompressed "{url}" to "{path}"'
                .format(url=url, path=path))
        return True

    if do_retry:
        download = retry(
            retry_with_count(log_wal_fetch_failures_on_error))(download)

    return download()


def write_and_return_error(key, stream):
    try:
        key.get_contents_to_file(stream)
        stream.flush()
    except Exception as e:
        return e
    finally:
        stream.close()
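A hedged usage sketch for the URI helpers (not part of the commit; the file:// URI is illustrative, and Credentials is the empty placeholder defined above):

    import io

    from wal_e.blobstore.file import Credentials, uri_get_file, uri_put_file

    creds = Credentials()
    uri = 'file://localhost/tmp/wal-e-demo/segment'

    # uri_put_file asserts the source is positioned at offset zero.
    uri_put_file(creds, uri, io.BytesIO(b'payload'))
    assert uri_get_file(creds, uri) == b'payload'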
27 changes: 20 additions & 7 deletions wal_e/cmd.py
@@ -6,7 +6,6 @@
"""
import sys


def gevent_monkey(*args, **kwargs):
import gevent.monkey
gevent.monkey.patch_os()
@@ -207,10 +206,15 @@ def build_parser():
                         'WALE_S3_PREFIX.')
 
     parser.add_argument('--wabs-prefix',
-                        help='Storage prefix to run all commands against. '
+                        help='Windows Azure Blob Service prefix to run all commands against. '
                         'Can also be defined via environment variable '
                         'WALE_WABS_PREFIX.')
 
+    parser.add_argument('--file-prefix',
+                        help='Storage prefix to run all commands against. '
+                        'Can also be defined via environment variable '
+                        'WALE_FILE_PREFIX.')
+
     parser.add_argument(
         '--gpg-key-id',
         help='GPG key ID to encrypt to. (Also needed when decrypting.) '
@@ -416,17 +420,17 @@ def s3_instance_profile(args):

 def configure_backup_cxt(args):
     # Try to find some WAL-E prefix to store data in.
-    prefix = (args.s3_prefix or args.wabs_prefix
+    prefix = (args.s3_prefix or args.wabs_prefix or args.file_prefix
               or os.getenv('WALE_S3_PREFIX') or os.getenv('WALE_WABS_PREFIX')
-              or os.getenv('WALE_SWIFT_PREFIX'))
+              or os.getenv('WALE_SWIFT_PREFIX') or os.getenv('WALE_FILE_PREFIX'))
 
     if prefix is None:
         raise UserException(
             msg='no storage prefix defined',
             hint=(
-                'Either set one of the --wabs-prefix or --s3-prefix options or'
-                ' define one of the WALE_WABS_PREFIX, WALE_S3_PREFIX, or '
-                'WALE_SWIFT_PREFIX environment variables.'
+                'Either set one of the --wabs-prefix, --s3-prefix, or '
+                '--file-prefix options, or define one of the WALE_WABS_PREFIX, '
+                'WALE_S3_PREFIX, WALE_SWIFT_PREFIX, or WALE_FILE_PREFIX '
+                'environment variables.'
             )
         )

@@ -483,6 +487,13 @@ def configure_backup_cxt(args):
             os.getenv('SWIFT_AUTH_VERSION', '2'),
         )
         return SwiftBackup(store, creds, gpg_key_id)
+    elif store.is_file:
+        from wal_e.blobstore import file
+        from wal_e.operator.file_operator import FileBackup
+
+        creds = file.Credentials()
+        return FileBackup(store, creds, gpg_key_id)
+
     else:
         raise UserCritical(
             msg='no unsupported blob stores should get here',
@@ -653,3 +664,5 @@ def just_error(*args, **kwargs):
         msg='An unprocessed exception has avoided all error handling',
         detail=''.join(traceback.format_exception(*sys.exc_info())))
     sys.exit(2)
+
+if __name__ == "__main__": main()
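A hedged simulation of the extended prefix-resolution order (not part of the commit; argparse.Namespace merely stands in for the parsed-arguments object, with attribute names matching the flags above):

    import os
    from argparse import Namespace

    args = Namespace(s3_prefix=None, wabs_prefix=None, file_prefix=None)
    os.environ['WALE_FILE_PREFIX'] = 'file://localhost/var/backups/wal-e'

    # Flags win over environment variables; WALE_FILE_PREFIX is consulted last.
    prefix = (args.s3_prefix or args.wabs_prefix or args.file_prefix
              or os.getenv('WALE_S3_PREFIX') or os.getenv('WALE_WABS_PREFIX')
              or os.getenv('WALE_SWIFT_PREFIX') or os.getenv('WALE_FILE_PREFIX'))
    assert prefix == 'file://localhost/var/backups/wal-e'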
1 change: 1 addition & 0 deletions wal_e/operator/backup.py
100644 → 100755
@@ -44,6 +44,7 @@ def new_connection(self):
         return self.cinfo.connect(self.creds)
 
     def backup_list(self, query, detail):
+
         """
         Lists base backups and basic information about them
14 changes: 14 additions & 0 deletions wal_e/operator/file_operator.py
@@ -0,0 +1,14 @@
from wal_e.blobstore.file import calling_format
from wal_e.operator.backup import Backup
from wal_e.worker.file import file_worker


class FileBackup(Backup):
    """
    Performs local file system uploads of PostgreSQL WAL files and clusters
    """

    def __init__(self, layout, creds, gpg_key_id):
        super(FileBackup, self).__init__(layout, creds, gpg_key_id)
        self.cinfo = calling_format
        self.worker = file_worker
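A hedged sketch of the wiring this operator provides (not part of the commit; it assumes the layout class in wal_e/storage/base.py is named StorageLayout):

    from wal_e.blobstore.file import Credentials
    from wal_e.operator.file_operator import FileBackup
    from wal_e.storage.base import StorageLayout

    layout = StorageLayout('file://localhost/var/backups/wal-e')
    backup = FileBackup(layout, Credentials(), gpg_key_id=None)

    # new_connection (inherited from Backup) defers to
    # calling_format.connect via self.cinfo, set in __init__ above.
    conn = backup.new_connection()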
9 changes: 6 additions & 3 deletions wal_e/storage/base.py
100644 → 100755
@@ -117,7 +117,7 @@ def next_larger(self):

 OBSOLETE_VERSIONS = frozenset(('004', '003', '002', '001', '000'))
 
-SUPPORTED_STORE_SCHEMES = ('s3', 'wabs', 'swift')
+SUPPORTED_STORE_SCHEMES = ('s3', 'wabs', 'swift', 'file')
 
 
 # Exhaustively enumerates all possible metadata about a backup. These
@@ -222,10 +222,10 @@ def __init__(self, prefix, version=CURRENT_VERSION):

         if url_tup.scheme not in SUPPORTED_STORE_SCHEMES:
             raise wal_e.exception.UserException(
-                msg='bad S3, Windows Azure Blob Storage, or OpenStack Swift '
+                msg='bad S3, Windows Azure Blob Storage, OpenStack Swift, '
+                    'or File System '
                 'URL scheme passed',
                 detail=('The scheme {0} was passed when "s3", "wabs", or '
-                        '"swift" was expected.'.format(url_tup.scheme)))
+                        '"swift" or "file" was expected.'.format(url_tup.scheme)))

         for scheme in SUPPORTED_STORE_SCHEMES:
             setattr(self, 'is_%s' % scheme, scheme == url_tup.scheme)
@@ -322,4 +322,7 @@ def get_backup_info(layout, **kwargs):
     elif layout.is_swift:
         from wal_e.storage.swift_storage import SwiftBackupInfo
         bi = SwiftBackupInfo(**kwargs)
+    elif layout.is_file:
+        from wal_e.storage.file_storage import FileBackupInfo
+        bi = FileBackupInfo(**kwargs)
     return bi
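A hedged check of the new scheme registration (not part of the commit; assumes the enclosing class shown above is wal_e.storage.base.StorageLayout):

    from wal_e.storage.base import StorageLayout

    layout = StorageLayout('file://localhost/pg-backups')

    # The setattr loop above defines one is_<scheme> flag per entry in
    # SUPPORTED_STORE_SCHEMES; exactly one of them is True.
    assert layout.is_file
    assert not (layout.is_s3 or layout.is_wabs or layout.is_swift)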
8 changes: 8 additions & 0 deletions wal_e/storage/file_storage.py
@@ -0,0 +1,8 @@
from wal_e.storage.base import BackupInfo


class FileBackupInfo(BackupInfo):
    def load_detail(self, conn):
        # The local file system has no extra detail to fetch lazily;
        # just mark the details as loaded.
        if self._details_loaded:
            return

        self._details_loaded = True
Empty file modified wal_e/worker/base.py
100644 → 100755
13 changes: 13 additions & 0 deletions wal_e/worker/file/__init__.py
@@ -0,0 +1,13 @@
from wal_e.worker.file.file_deleter import Deleter
from wal_e.worker.file.file_worker import BackupFetcher
from wal_e.worker.file.file_worker import BackupList
from wal_e.worker.file.file_worker import DeleteFromContext
from wal_e.worker.file.file_worker import TarPartitionLister

__all__ = [
    'Deleter',
    'TarPartitionLister',
    'BackupFetcher',
    'BackupList',
    'DeleteFromContext',
]
27 changes: 27 additions & 0 deletions wal_e/worker/file/file_deleter.py
@@ -0,0 +1,27 @@
from wal_e import exception
from wal_e import retries
from wal_e.worker.base import _Deleter


class Deleter(_Deleter):

    @retries.retry()
    def _delete_batch(self, page):
        # Check that all keys are in the same bucket; this code is not
        # designed to deal with fast deletion of keys from multiple
        # buckets at the same time, and not checking this could result
        # in deleting similarly named keys from the wrong bucket.
        #
        # In wal-e's use, homogeneity of the bucket retaining the keys
        # is presumed to be always the case.
        bucket_name = page[0].bucket.name
        for key in page:
            if key.bucket.name != bucket_name:
                raise exception.UserCritical(
                    msg='submitted keys are not part of the same bucket',
                    detail=('The clashing bucket names are {0} and {1}.'
                            .format(key.bucket.name, bucket_name)),
                    hint='This should be reported as a bug.')

        bucket = page[0].bucket
        bucket.delete_keys([key.name for key in page])
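The bucket-homogeneity invariant described in the comment can be seen in isolation with the file backend's Bucket (a hedged illustration, not part of the commit; it restates the loop above rather than calling the retry-wrapped _delete_batch):

    from wal_e.blobstore.file.calling_format import Bucket

    page = [Bucket('bucket-a').get_key('tmp/x'),
            Bucket('bucket-b').get_key('tmp/y')]

    # Same check as _delete_batch: every key must share one bucket name.
    bucket_name = page[0].bucket.name
    mixed = any(key.bucket.name != bucket_name for key in page)
    assert mixed  # this page would be rejected with UserCritical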
