Skip to content

Commit

Permalink
Merge pull request #118 from ottonemo/feature/s3-persister
Browse files Browse the repository at this point in the history
Introduce AWS S3 persister
  • Loading branch information
alattner committed Sep 30, 2019
2 parents 52d0aed + 5901fe9 commit 20e369b
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -23,7 +23,7 @@ install:
- conda install -q python=$TRAVIS_PYTHON_VERSION
- pip install -r requirements.txt -r requirements-dev.txt
- pip install pandas==0.23.4 # for rpy2 compatibility
- pip install -e .[docs,testing]
- pip install -e .[docs,testing,S3]
script:
- travis_wait py.test --runslow
deploy:
Expand Down
46 changes: 46 additions & 0 deletions palladium/persistence.py
Expand Up @@ -657,3 +657,49 @@ def delete(self, version):

def upgrade(self, from_version=None, to_version=__version__):
return self.impl.upgrade(from_version, to_version)


class S3IO(FileLikeIO):
"""IO interface that glues palladium to S3 buckets."""
def __init__(self, **kwargs):
try:
import s3fs
except ImportError:
raise ImportError('S3IO needs the s3fs module to work correctly.')

self.fs = s3fs.S3FileSystem(anon=False)

def open(self, path, mode='r'):
return self.fs.open(path, mode=mode)

def exists(self, path):
return self.fs.exists(path)

def remove(self, path):
return self.fs.rm(path)


class S3(FileLike):
"""Persister that acts like a File persister but uses
S3 IO in the background. Expects the path of the bucket
in the path parameter.
To use it with palladium, just use this class as model
persister. For example, if you used the ``File`` persister
before, change your config as follows:
- '__factory__': 'palladium.persistence.File',
- 'path': 'models/mymodel-{version}',
+ '__factory__': 'palladium.persistence.S3',
+ 'path': 'your-s3-bucket/models/mymodel-{version}',
Note that the first part of the path denotes the s3 bucket.
Parameters
----------
path : str
The path to the bucket and file, e.g. ``'my-bucket/my-folder/my-file'``.
"""
def __init__(self, path):
super().__init__(path, S3IO())
143 changes: 143 additions & 0 deletions palladium/tests/test_persistence.py
@@ -1,8 +1,10 @@
import codecs
import copy
import gzip
import json
import os
import pickle
import posixpath
from threading import Thread
from unittest.mock import Mock
from unittest.mock import MagicMock
Expand Down Expand Up @@ -853,3 +855,144 @@ def test_proxy_upgrade(self, CachedUpdatePersister):
persister = CachedUpdatePersister(impl)
assert persister.upgrade("0.9", "1.0") is impl.upgrade.return_value
impl.upgrade.assert_called_with("0.9", "1.0")


class TestS3IO:
@pytest.fixture
def test_model_cls(self):
class TestModel:
def __init__(self, name, value):
self.name = name
self.value = str(value)
return TestModel

@pytest.fixture
def test_models(self, test_model_cls):
return [test_model_cls(f"model_{n}", n) for n in range(3)]

@pytest.fixture
def bucket_name(self):
return 'test-bucket'

@pytest.yield_fixture
def s3_io_filled(self, bucket_name, test_models, s3_io_cls):
import moto, boto3
with moto.mock_s3():
conn = boto3.resource('s3')
conn.create_bucket(Bucket=bucket_name)

s3 = boto3.client('s3')
for model in test_models:
s3.put_object(
Bucket=bucket_name,
Key=model.name,
Body=model.value,
)

yield s3_io_cls()

@pytest.fixture
def s3_io_cls(self):
from palladium.persistence import S3IO
return S3IO

def test_open(self, s3_io_filled, test_models, bucket_name):
for expected_model in test_models:
obj = s3_io_filled.open(posixpath.join(
bucket_name,
expected_model.name,
))

assert obj.read() == expected_model.value

def test_exists(self, s3_io_filled, test_models, bucket_name):
for expected_model in test_models:
assert s3_io_filled.exists(posixpath.join(
bucket_name,
expected_model.name,
))

def test_remove(self, s3_io_filled, test_models, bucket_name):
s3_io_filled.remove(posixpath.join(
bucket_name,
test_models[0].name,
))

assert not s3_io_filled.exists(posixpath.join(
bucket_name,
test_models[0].name,
))

for expected_model in test_models[1:]:
assert s3_io_filled.exists(posixpath.join(
bucket_name,
expected_model.name,
))



class TestS3:
@pytest.fixture
def s3_cls(self):
from palladium.persistence import S3
return S3

@pytest.yield_fixture
def s3_cls_with_bucket(self, bucket_name, s3_cls):
import moto, boto3
with moto.mock_s3():
conn = boto3.resource('s3')
conn.create_bucket(Bucket=bucket_name)

yield s3_cls

@pytest.fixture
def bucket_name(self):
return 'test-bucket'

@pytest.fixture
def dummy_model(self):
return Dummy(weight=3)

def test_write_read(self, dummy_model, bucket_name, s3_cls_with_bucket):
persister = s3_cls_with_bucket(posixpath.join(
bucket_name,
'mymodel-single-{version}',
))
persister.write(dummy_model)

assert persister.io.exists(posixpath.join(
bucket_name,
'mymodel-single-1.pkl.gz',
))

assert persister.io.exists(posixpath.join(
bucket_name,
'mymodel-single-metadata.json',
))

model = persister.read(version=1)
assert type(model) == type(dummy_model)

def test_successive_writes(self, dummy_model, bucket_name, s3_cls_with_bucket):
# NOTE: using the same file spec as with `test_write_read` fails,
# probably due to some internal inconsistency in moto.
persister = s3_cls_with_bucket(posixpath.join(
bucket_name,
'mymodel-successive-{version}',
))

dummy_model_1 = copy.copy(dummy_model)
dummy_model_1.weight = 8

dummy_model_2 = copy.copy(dummy_model)
dummy_model_2.weight = 9

persister.write(dummy_model_1)
persister.write(dummy_model_2)

read_model_1 = persister.read(version=1)
read_model_2 = persister.read(version=2)

assert read_model_1.weight == dummy_model_1.weight
assert read_model_2.weight == dummy_model_2.weight
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -61,6 +61,7 @@
'docs': docs_require,
'julia': ['julia'],
'R': ['rpy2'],
'S3': ['s3fs', 'moto'],
},
entry_points={
'console_scripts': [
Expand Down

0 comments on commit 20e369b

Please sign in to comment.