Commit

Added tests for file blobstore

colmaengus committed Oct 27, 2015
1 parent 9af2498 commit 963d1f1
Showing 3 changed files with 188 additions and 56 deletions.
164 changes: 164 additions & 0 deletions tests/test_file_blobstore.py
@@ -0,0 +1,164 @@
import pytest
import os
import errno

from subprocess import call

from wal_e.storage import StorageLayout
from wal_e import exception
from wal_e.operator.file_operator import FileBackup

from wal_e.blobstore.file import uri_put_file
from wal_e.blobstore.file import uri_get_file
from wal_e.blobstore.file import do_lzop_get
from wal_e.blobstore.file import write_and_return_error


def create_files(files):
    """Helper function to create a test directory structure.

    Each file's path is used as its contents."""
    for f in files:
        dir_path = os.path.dirname(f)
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        with open(f, "w") as fp:
            fp.write(f)


def test_valid_prefix():
    store = StorageLayout("file://localhost/tmp")
    assert store.is_file is True


def test_invalid_prefix():
    with pytest.raises(exception.UserException):
        StorageLayout("notfile://localhost/tmp")


def test_uri_put_file_writes_key_file(tmpdir):
    """Verify file upload writes the key's backing file"""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/src.txt"]
    create_files(file_list)
    with open(file_list[0], "r") as f:
        uri_put_file("", "file://localhost/" + base + "/dst.txt", f)

    with open(base + "/dst.txt", "r") as dstFile:
        assert dstFile.read() == file_list[0]


def test_uri_put_file_creates_key_dir(tmpdir):
    """Verify file upload creates missing key directories"""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/src.txt"]
    create_files(file_list)
    with open(file_list[0], "r") as f:
        uri_put_file("", "file://localhost/" + base + "/subdir/dst.txt", f)

    with open(base + "/subdir/dst.txt", "r") as dstFile:
        assert dstFile.read() == file_list[0]


def test_uri_get_file(tmpdir):
    """Verify file download"""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/src.txt"]
    create_files(file_list)
    file_contents = uri_get_file("", "file://localhost/" + base + "/src.txt")
    assert file_contents == file_list[0]


def test_bucket_list(tmpdir):
    """Verify bucket keys can be listed"""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/subdirfile.txt",
                 base + "/subdir/file.txt"]
    create_files(file_list)
    store = StorageLayout("file://localhost/" + base)
    backup = FileBackup(store, "", "")
    conn = backup.cinfo.connect("")
    bucket = conn.get_bucket("")
    result = bucket.list(base)
    assert len(result) == len(file_list)
    for f in file_list:
        matches = [x for x in result if x.path == f]
        assert len(matches) == 1
        assert hasattr(matches[0], 'size') is True
        assert hasattr(matches[0], 'last_modified') is True


def test_delete_keys(tmpdir):
    """Verify keys are deleted and bucket is trimmed"""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/subdir1/file.txt",
                 base + "/subdir2/file.txt"]
    create_files(file_list)
    store = StorageLayout("file://localhost/" + base)
    backup = FileBackup(store, "", "")
    conn = backup.cinfo.connect("")
    bucket = conn.get_bucket("")
    bucket.delete_keys(file_list)
    assert len(os.listdir(base)) == 0


def test_do_lzop_get(tmpdir):
    """Create a dummy lzo file and confirm it is downloaded and decompressed"""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/src.txt"]
    create_files(file_list)
    call(["lzop", base + "/src.txt"])
    do_lzop_get("", "file://localhost/" + base + "/src.txt.lzo",
                base + "/dst.txt", False, do_retry=True)

    with open(base + "/dst.txt", "r") as dstFile:
        assert dstFile.read() == file_list[0]


def test_do_lzop_get_missing_key(tmpdir):
    """Verify an IOError is raised for a missing key"""
    base = str(tmpdir.mkdir("base"))
    with pytest.raises(IOError) as e:
        do_lzop_get("", "file://localhost/" + base + "/src.txt.lzo",
                    base + "/dst.txt", False, do_retry=True)

    assert e.value.errno == errno.ENOENT


def test_write_and_return_error(tmpdir):
    """Verify None is returned in normal operation"""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/src.txt"]
    create_files(file_list)

    store = StorageLayout("file://localhost/" + base)
    backup = FileBackup(store, "", "")
    conn = backup.cinfo.connect("")
    bucket = conn.get_bucket("")
    f = open(base + "/dst.txt", "w")
    key = bucket.get_key(base + "/src.txt")

    result = write_and_return_error(key, f)
    assert result is None

    with open(base + "/dst.txt", "r") as dstFile:
        assert dstFile.read() == file_list[0]


def test_write_and_return_error_with_error(tmpdir):
    """Verify the exception is returned when the copy fails"""
    base = str(tmpdir.mkdir("base"))
    file_list = [base + "/src.txt"]
    create_files(file_list)

    store = StorageLayout("file://localhost/" + base)
    backup = FileBackup(store, "", "")
    conn = backup.cinfo.connect("")
    bucket = conn.get_bucket("")
    f = open(base + "/dst.txt", "w")
    key = bucket.get_key(base + "/missing.txt")

    with pytest.raises(IOError) as e:
        result = write_and_return_error(key, f)
        raise result

    assert e.value.errno == errno.ENOENT
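
Not part of the commit: the put/get round trip these tests exercise can be reproduced directly against the blobstore API. A minimal sketch, assuming wal-e is importable and using a hypothetical scratch directory /tmp/blobstore-demo:

import os

from wal_e.blobstore.file import uri_put_file, uri_get_file

base = "/tmp/blobstore-demo"  # hypothetical scratch directory
if not os.path.exists(base):
    os.makedirs(base)

src = os.path.join(base, "src.txt")
with open(src, "w") as fp:
    fp.write("hello")

# Store the open file under a file:// key, then fetch it back.
with open(src, "r") as fp:
    uri_put_file("", "file://localhost/" + base + "/dst.txt", fp)

assert uri_get_file("", "file://localhost/" + base + "/dst.txt") == "hello"
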
13 changes: 7 additions & 6 deletions wal_e/blobstore/file/calling_format.py
@@ -8,7 +8,7 @@ def remove_empty_dirs(path):
        for d in dirs:
            dir_path = os.path.join(root, d)
            if not os.listdir(dir_path):
-                os.removedirs(dir_path)
+                os.rmdir(dir_path)


def ensure_dir_exists(path):
@@ -27,11 +27,11 @@ def epoch_to_iso8601(timestamp):
    return datetime.utcfromtimestamp(timestamp).isoformat()


-class FileKey:
+class FileKey(object):
    def __init__(self, bucket, name):
        self.bucket = bucket
        self.name = name
-        self.path = os.path.normpath("/" + name)
+        self.path = os.path.join("/", name.strip("/"))
        if os.path.isfile(self.path):
            stat = os.stat(self.path)
            self.last_modified = epoch_to_iso8601(stat.st_mtime)
@@ -73,12 +73,13 @@ def get_key(self, name):

    def delete_keys(self, keys):
        for k in keys:
-            os.remove(k)
+            key_path = os.path.join("/", k.strip("/"))
+            os.remove(key_path)
        # deleting files can leave empty dirs => trim them
-        remove_empty_dirs(common_dir_path(keys))
+        common_path = os.path.join("/", common_dir_path(keys).strip("/"))
+        remove_empty_dirs(common_path)

    def list(self, prefix):
        # TODO: handle errors from missing path
        path = "/" + prefix
        file_paths = [os.path.join(root, f)
                      for root, dirs, files in os.walk(path) for f in files]
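
The two path changes above are related: key names can arrive with or without a leading slash, and os.path.join("/", name.strip("/")) canonicalizes both forms to the same absolute path, whereas the old normpath form could preserve a POSIX double-slash prefix. Likewise, os.rmdir prunes only the one directory the walk has verified is empty, while os.removedirs would also delete newly empty parents. A small illustration (my sketch for POSIX systems, not part of the commit):

import os
import os.path

# join/strip canonicalizes names with and without a leading slash:
assert os.path.join("/", "tmp/bucket/key.txt".strip("/")) == "/tmp/bucket/key.txt"
assert os.path.join("/", "/tmp/bucket/key.txt".strip("/")) == "/tmp/bucket/key.txt"

# The old form kept an implementation-defined double slash intact:
assert os.path.normpath("/" + "/tmp/bucket/key.txt") == "//tmp/bucket/key.txt"

# os.rmdir removes only the named empty directory; os.removedirs would
# keep deleting upward through newly empty parent directories as well.
os.makedirs("/tmp/demo/a/b")  # hypothetical scratch path
os.rmdir("/tmp/demo/a/b")     # /tmp/demo/a is left in place
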
67 changes: 17 additions & 50 deletions wal_e/blobstore/file/file_util.py
@@ -7,7 +7,6 @@
from wal_e import log_help
from wal_e.pipeline import get_download_pipeline
from wal_e.piper import PIPE
-from wal_e.retries import retry, retry_with_count

logger = log_help.WalELogger(__name__)

@@ -26,9 +25,6 @@ def uri_put_file(creds, uri, fp, content_encoding=None, conn=None):

    k = _uri_to_key(creds, uri, conn=conn)

-    if content_encoding is not None:
-        k.content_type = content_encoding
-
    k.set_contents_from_file(fp)
    return k

@@ -38,60 +34,31 @@ def uri_get_file(creds, uri, conn=None):
    return k.get_contents_as_string()


-def do_lzop_get(creds, url, path, decrypt, do_retry=True):
+def do_lzop_get(creds, url, path, decrypt, do_retry):
    """
-    Get and decompress a S3 URL
+    Get and decompress a URL
    This streams the content directly to lzop; the compressed version
    is never stored on disk.
    """
    assert url.endswith('.lzo'), 'Expect an lzop-compressed file'

-    def log_wal_fetch_failures_on_error(exc_tup, exc_processor_cxt):
-        def standard_detail_message(prefix=''):
-            return (prefix + ' There have been {n} attempts to fetch wal '
-                    'file {url} so far.'.format(n=exc_processor_cxt, url=url))
-        typ, value, tb = exc_tup
-        del exc_tup
-
-        # TODO: better error handling ?
-        logger.warning(
-            msg='retrying WAL file fetch from unexpected exception',
-            detail=standard_detail_message(
-                'The exception type is {etype} and its value is '
-                '{evalue} and its traceback is {etraceback}'
-                .format(etype=typ, evalue=value,
-                        etraceback=''.join(traceback.format_tb(tb)))))
-
-        # Help Python GC by resolving possible cycles
-        del tb
-
-    def download():
-        with files.DeleteOnError(path) as decomp_out:
-            key = _uri_to_key(creds, url)
-            with get_download_pipeline(PIPE, decomp_out.f, decrypt) as pl:
-                g = gevent.spawn(write_and_return_error, key, pl.stdin)
-
-                try:
-                    # Raise any exceptions from write_and_return_error
-                    exc = g.get()
-                    if exc is not None:
-                        raise exc
-                finally:
-                    pass
-
-            logger.info(
-                msg='completed download and decompression',
-                detail='Downloaded and decompressed "{url}" to "{path}"'
-                .format(url=url, path=path))
-        return True
-
-    if do_retry:
-        download = retry(
-            retry_with_count(log_wal_fetch_failures_on_error))(download)
-
-    return download()
+    with files.DeleteOnError(path) as decomp_out:
+        key = _uri_to_key(creds, url)
+        with get_download_pipeline(PIPE, decomp_out.f, decrypt) as pl:
+            g = gevent.spawn(write_and_return_error, key, pl.stdin)
+            exc = g.get()
+            if exc is not None:
+                raise exc
+
+        logger.info(
+            msg='completed download and decompression',
+            detail='Downloaded and decompressed "{url}" to "{path}"'
+            .format(url=url, path=path))
+
+    return True


def write_and_return_error(key, stream):
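
As the new body shows, do_lzop_get still accepts do_retry (presumably for signature compatibility with the other blobstores) but never consults it; any error from the greenlet is raised directly. Usage mirrors the new test_do_lzop_get, sketched here assuming lzop is on PATH and a hypothetical /tmp/demo/src.txt exists:

from subprocess import call

from wal_e.blobstore.file import do_lzop_get

base = "/tmp/demo"  # hypothetical scratch directory containing src.txt

# Compress the source in place (producing src.txt.lzo), then stream it
# back through the download/decompression pipeline into dst.txt.
call(["lzop", base + "/src.txt"])
do_lzop_get("", "file://localhost/" + base + "/src.txt.lzo",
            base + "/dst.txt", False, do_retry=True)
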
