Skip to content

Commit

Permalink
Implement file storage
Browse files Browse the repository at this point in the history
Now, `file:///` URIs can be used to store on a file system.

Includes the following original commits by @colmaengus (Colm Aengus
Murphy <colmaengus@gmail.com>), rebased onto 1.1.0b1 by
charles@dyfis.net:

- First pass at file blobstore (6cddec1)
- Fixed style check errors (4dada39)
- Fixed permissions (9af2498)
- Added tests for file blobstore (963d1f1)
- Fixed code check and added file store to README (69905a1)
  • Loading branch information
colmaengus authored and fdr committed Jun 19, 2017
1 parent 63dcd81 commit c12cf97
Show file tree
Hide file tree
Showing 17 changed files with 566 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CONTRIBUTORS
Expand Up @@ -10,10 +10,12 @@ Bo Shi <bs@alum.mit.edu>
Brian Oldfield <brian@oldfield.io>
Brian Rosner <brosner@gmail.com>
Carlo Cabanilla <carlo.cabanilla@gmail.com>
Charles Duffy <charles@dyfis.net>
Chris Armstrong <chris@opdemand.com>
Christian Pedersen <chripede@gmail.com>
Christophe Pettus <cpettus@pgexperts.com>
Christopher Weibel <christopher.weibel@gmail.com>
Colm Aengus Murphy <colmaengus@gmail.com>
Dan Milstein <milstein.dan@gmail.com>
Dan Robinson <dan@heapanalytics.com>
Daniel Farina <daniel@heroku.com>
Expand Down
5 changes: 5 additions & 0 deletions README.rst 100644 → 100755
Expand Up @@ -114,6 +114,11 @@ Optional Variables:
``internalURL`` on object stores like Rackspace Cloud Files in order
to use the internal network.

File System
'''''''''''

* WALE_FILE_PREFIX (e.g. ``file://localhost/backups/pg``)

.. IMPORTANT::
Ensure that all writing servers have different _PREFIXes set.
   Reuse of a value between two writing databases will likely cause
Expand Down
164 changes: 164 additions & 0 deletions tests/test_file_blobstore.py
@@ -0,0 +1,164 @@
import pytest
import os
import errno

from subprocess import call

from wal_e.storage import StorageLayout
from wal_e import exception
from wal_e.operator.file_operator import FileBackup

from wal_e.blobstore.file import uri_put_file
from wal_e.blobstore.file import uri_get_file
from wal_e.blobstore.file import do_lzop_get
from wal_e.blobstore.file import write_and_return_error


def create_files(files):
    """Create every file listed in *files*, making parent dirs as needed.

    Each file's own path string is written (UTF-8 encoded) as its
    contents, which lets the tests verify transfers byte-for-byte.
    """
    for path in files:
        # exist_ok avoids the check-then-create race of the original
        # os.path.exists()/os.makedirs() pair.
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as fp:
            fp.write(path.encode("utf-8"))


def test_valid_prefix():
    """A file:// prefix is recognized as file-system storage."""
    layout = StorageLayout("file://localhost/tmp")
    assert layout.is_file is True


def test_invalid_prefix():
    """A non-file scheme is rejected with a UserException."""
    bad_prefix = "notfile://localhost/tmp"
    with pytest.raises(exception.UserException):
        StorageLayout(bad_prefix)


def test_uri_put_file_writes_key_file(tmpdir):
    """Uploading a stream materializes the destination key file."""
    base = str(tmpdir.mkdir("base"))
    src = base + "/src.txt"
    create_files([src])
    with open(base + "/src.txt", "rb") as f:
        uri_put_file("", "file://localhost/" + base + "/dst.txt", f)

    with open(base + "/dst.txt", "rb") as dst_file:
        # create_files wrote the source path as the file's contents.
        assert dst_file.read() == src.encode('utf-8')


def test_uri_put_file_creates_key_dir(tmpdir):
    """Verify upload creates any missing subdirectory for the key."""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/src.txt"]
    create_files(file_list)
    with open(file_list[0], "rb") as f:
        uri_put_file("", "file://localhost/" + base + "/subdir/dst.txt", f)

    # was "subdir//dst.txt": harmless on POSIX but a typo nonetheless
    with open(base + "/subdir/dst.txt", "rb") as dst_file:
        assert dst_file.read() == file_list[0].encode('utf-8')


def test_uri_get_file(tmpdir):
    """Verify file download"""
    base = str(tmpdir.mkdir("base"))
    src = base + "/src.txt"
    create_files([src])
    fetched = uri_get_file("", "file://localhost/" + src)
    # create_files wrote the source path as the file's contents.
    assert fetched == src.encode('utf-8')


def test_bucket_list(tmpdir):
    """Verify bucket keys can be listed recursively."""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/subdirfile.txt",
                 base + "/subdir/file.txt"]
    create_files(file_list)
    store = StorageLayout("file://localhost/" + base)
    backup = FileBackup(store, "", "")
    conn = backup.cinfo.connect("")
    bucket = conn.get_bucket("")
    result = bucket.list(base)
    assert len(result) == len(file_list)
    for f in file_list:
        matches = [x for x in result if x.path == f]
        assert len(matches) == 1
        # Keys for existing files must carry stat-derived metadata.
        # (was `hasattr(...) is True`; hasattr already returns a bool)
        assert hasattr(matches[0], 'size')
        assert hasattr(matches[0], 'last_modified')


def test_delete_keys(tmpdir):
    """Verify keys are deleted and bucket is trimmed"""
    base = str(tmpdir.mkdir("base"))
    paths = [base + "/subdir1/file.txt",
             base + "/subdir2/file.txt"]
    create_files(paths)
    store = StorageLayout("file://localhost/" + base)
    bucket = FileBackup(store, "", "").cinfo.connect("").get_bucket("")
    bucket.delete_keys(paths)
    # Both files and their now-empty parent dirs should be gone.
    assert os.listdir(base) == []


def test_do_lzop_get(tmpdir):
    """Create a dummy .lzo file and confirm it is downloaded/decompressed."""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/src.txt"]
    create_files(file_list)
    # Fail fast with a clear message if lzop is unavailable or errors,
    # rather than a confusing missing-.lzo failure below.
    assert call(["lzop", base + "/src.txt"]) == 0
    do_lzop_get("", "file://localhost/" + base + "/src.txt.lzo",
                base + "/dst.txt", False, do_retry=True)

    with open(base + "/dst.txt", "rb") as dst_file:
        assert dst_file.read() == file_list[0].encode('utf-8')


def test_do_lzop_get_missing_key(tmpdir):
    """Verify exception is raised for missing key"""
    base = str(tmpdir.mkdir("base"))
    missing_uri = "file://localhost/" + base + "/src.txt.lzo"
    with pytest.raises(IOError) as excinfo:
        do_lzop_get("", missing_uri,
                    base + "/dst.txt", False, do_retry=True)

    assert excinfo.value.errno == errno.ENOENT


def test_write_and_return_error(tmpdir):
    """Verify None as result in normal operation"""
    base = str(tmpdir.mkdir("base"))
    src = base + "/src.txt"
    create_files([src])

    store = StorageLayout("file://localhost/" + base)
    bucket = FileBackup(store, "", "").cinfo.connect("").get_bucket("")
    key = bucket.get_key(src)

    # write_and_return_error closes the stream itself; the `with` here
    # is belt-and-braces and a no-op on an already-closed file.
    with open(base + "/dst.txt", "wb") as dst:
        assert write_and_return_error(key, dst) is None

    with open(base + "/dst.txt", "rb") as dst_file:
        assert dst_file.read() == src.encode('utf-8')


def test_write_and_return_error_with_error(tmpdir):
    """Verify exception as result in error operation"""
    base = str(tmpdir.mkdir("base"))
    create_files([base + "/src.txt"])

    store = StorageLayout("file://localhost/" + base)
    bucket = FileBackup(store, "", "").cinfo.connect("").get_bucket("")
    # Key points at a path that was never created.
    key = bucket.get_key(base + "/missing.txt")

    with open(base + "/dst.txt", "wb") as dst:
        with pytest.raises(IOError) as excinfo:
            # The helper returns (not raises) the exception; re-raise it
            # so pytest.raises can inspect it.
            err = write_and_return_error(key, dst)
            raise err

    assert excinfo.value.errno == errno.ENOENT
3 changes: 3 additions & 0 deletions wal_e/blobstore/__init__.py 100644 → 100755
Expand Up @@ -15,4 +15,7 @@ def get_blobstore(layout):
elif layout.is_gs:
from wal_e.blobstore import gs
blobstore = gs
elif layout.is_file:
from wal_e.blobstore import file
blobstore = file
return blobstore
13 changes: 13 additions & 0 deletions wal_e/blobstore/file/__init__.py
@@ -0,0 +1,13 @@
from wal_e.blobstore.file.file_credentials import Credentials
from wal_e.blobstore.file.file_util import do_lzop_get
from wal_e.blobstore.file.file_util import uri_get_file
from wal_e.blobstore.file.file_util import uri_put_file
from wal_e.blobstore.file.file_util import write_and_return_error

__all__ = [
'Credentials',
'do_lzop_get',
'uri_get_file',
'uri_put_file',
'write_and_return_error',
]
87 changes: 87 additions & 0 deletions wal_e/blobstore/file/calling_format.py
@@ -0,0 +1,87 @@
import os
import shutil
from datetime import datetime, timezone


def remove_empty_dirs(path):
    """Remove every empty directory below *path* (but never *path* itself).

    Walks bottom-up (topdown=False) so a directory whose only contents
    were empty subdirectories is itself removed in the same pass; the
    original top-down walk left such "nested empty" directories behind.
    """
    for root, dirs, _files in os.walk(path, topdown=False):
        for name in dirs:
            dir_path = os.path.join(root, name)
            if not os.listdir(dir_path):
                os.rmdir(dir_path)


def ensure_dir_exists(path):
    """Create the parent directory of *path* if it does not already exist.

    Uses makedirs(..., exist_ok=True), which is immune to the race
    between an existence check and the mkdir call when several writers
    create the same directory concurrently.
    """
    dir_path = os.path.dirname(path)
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)


def common_dir_path(args, sep='/'):
    """Return the deepest directory prefix shared by all paths in *args*.

    os.path.commonprefix compares character-by-character, so the result
    is truncated back to the last *sep* boundary to land on a directory.
    """
    shared = os.path.commonprefix(args)
    return shared.rpartition(sep)[0]


def epoch_to_iso8601(timestamp):
    """Convert a POSIX timestamp to a naive-UTC ISO 8601 string.

    datetime.utcfromtimestamp is deprecated (since Python 3.12); convert
    via an aware UTC datetime and strip the tzinfo to keep the historical
    output format (no "+00:00" suffix).
    """
    return (datetime.fromtimestamp(timestamp, tz=timezone.utc)
            .replace(tzinfo=None)
            .isoformat())


class FileKey(object):
    """A stored object addressed by *name*, backed by a file under "/".

    Mirrors the small subset of a blobstore Key interface used by WAL-E.
    ``size`` and ``last_modified`` are set only when the backing file
    already exists at construction time; callers probe them with
    hasattr() (see the bucket-list test).
    """

    def __init__(self, bucket, name):
        self.bucket = bucket
        self.name = name
        # Normalize to an absolute path rooted at "/".
        self.path = os.path.join("/", name.strip("/"))
        if os.path.isfile(self.path):
            st = os.stat(self.path)
            self.last_modified = epoch_to_iso8601(st.st_mtime)
            self.size = st.st_size

    def get_contents_as_string(self):
        """Return the entire file contents as bytes."""
        with open(self.path, 'rb') as fp:
            return fp.read()

    def set_contents_from_file(self, fp):
        """Write stream *fp* to this key, creating parent directories."""
        ensure_dir_exists(self.path)
        with open(self.path, 'wb') as f:
            shutil.copyfileobj(fp, f)
        # Plain assignment; the original setattr() call was gratuitous.
        self.size = os.path.getsize(self.path)

    def get_contents_to_file(self, fp):
        """Stream the file contents into the open file object *fp*."""
        with open(self.path, 'rb') as f:
            shutil.copyfileobj(f, fp)


class Bucket(object):
    """A named container of FileKeys, mirroring a blobstore bucket."""

    def __init__(self, name):
        self.name = name

    def get_key(self, name):
        """Return a FileKey handle for *name* (the file need not exist)."""
        return FileKey(bucket=self, name=name)

    def delete_keys(self, keys):
        """Delete each path in *keys*, then prune now-empty directories."""
        for key_name in keys:
            os.remove(os.path.join("/", key_name.strip("/")))
        # Deletion can leave empty directory husks behind; trim them
        # starting from the deepest directory shared by all deleted keys.
        shared_root = os.path.join("/", common_dir_path(keys).strip("/"))
        remove_empty_dirs(shared_root)

    def list(self, prefix):
        """Recursively list all keys under *prefix* as FileKey objects."""
        top = "/" + prefix
        keys = []
        for root, _dirs, files in os.walk(top):
            keys.extend(FileKey(bucket=self, name=os.path.join(root, f))
                        for f in files)
        return keys


class Connection(object):
    """Minimal stand-in for a blobstore connection object."""

    def get_bucket(self, name, validate=False):
        """Return a Bucket for *name*; *validate* is accepted and ignored."""
        return Bucket(name)


def connect(creds):
    """Return a Connection; *creds* are unused for local file storage."""
    return Connection()
3 changes: 3 additions & 0 deletions wal_e/blobstore/file/file_credentials.py
@@ -0,0 +1,3 @@
class Credentials(object):
    """Placeholder credentials: local file storage requires no auth.

    Presumably exists so the file backend exposes the same surface as
    the other blobstore backends — confirm against their modules.
    """

    def __init__(self):
        pass
69 changes: 69 additions & 0 deletions wal_e/blobstore/file/file_util.py
@@ -0,0 +1,69 @@
from urllib.parse import urlparse
import gevent

from . import calling_format
from wal_e import files
from wal_e import log_help
from wal_e.pipeline import get_download_pipeline
from wal_e.piper import PIPE

logger = log_help.WalELogger(__name__)


def _uri_to_key(creds, uri, conn=None):
assert uri.startswith('file://')
url_tup = urlparse(uri)
bucket_name = url_tup.netloc
if conn is None:
conn = calling_format.connect(creds)
return conn.get_bucket(bucket_name).get_key(url_tup.path)


def uri_put_file(creds, uri, fp, content_type=None, conn=None):
    """Store the contents of stream *fp* at *uri*; return the key.

    *content_type* is accepted for interface parity with the other
    blobstores and ignored by the file backend.
    """
    # The stream must be at its start so the whole payload is copied.
    assert fp.tell() == 0

    key = _uri_to_key(creds, uri, conn=conn)
    key.set_contents_from_file(fp)
    return key


def uri_get_file(creds, uri, conn=None):
    """Return the bytes stored at *uri*."""
    key = _uri_to_key(creds, uri, conn=conn)
    return key.get_contents_as_string()


def do_lzop_get(creds, url, path, decrypt, do_retry):
    """Download the lzop-compressed *url* and decompress it to *path*.

    The content is streamed straight into the lzop decompression
    pipeline; the compressed version is never stored on disk.

    :param creds: blobstore credentials passed through to the connection
    :param url: a ``file://`` URI that must end in ``.lzo``
    :param path: local destination for the decompressed output
    :param decrypt: passed to get_download_pipeline; presumably enables
        decryption in the pipeline — confirm against its definition
    :param do_retry: accepted but not consulted in this body — likely
        interface parity with other blobstores; verify against callers
    :returns: True on success
    """
    assert url.endswith('.lzo'), 'Expect an lzop-compressed file'

    # DeleteOnError is expected to remove the partial output if anything
    # below raises, so a failed transfer leaves no truncated *path*.
    with files.DeleteOnError(path) as decomp_out:
        key = _uri_to_key(creds, url)
        with get_download_pipeline(PIPE, decomp_out.f, decrypt) as pl:
            # Feed the key's bytes into the pipeline's stdin from a
            # separate greenlet so the pipeline can consume concurrently.
            g = gevent.spawn(write_and_return_error, key, pl.stdin)

            # write_and_return_error returns (rather than raises) any
            # exception from the writer greenlet; re-raise it here so it
            # surfaces in this caller's context.
            exc = g.get()
            if exc is not None:
                raise exc

        logger.info(
            msg='completed download and decompression',
            detail='Downloaded and decompressed "{url}" to "{path}"'
            .format(url=url, path=path))

    return True


def write_and_return_error(key, stream):
    """Copy *key*'s contents into *stream*, returning any exception.

    Intended to run in a greenlet: instead of raising, any exception is
    returned so the spawning code can re-raise it in its own context.
    The stream is always closed, even on failure.
    """
    error = None
    try:
        key.get_contents_to_file(stream)
        stream.flush()
    except Exception as e:
        error = e
    finally:
        stream.close()
    return error

0 comments on commit c12cf97

Please sign in to comment.