New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue#108] Refactor Storage API #119

Merged
merged 22 commits into from Jul 2, 2017

Test fix. Rename storage backend.

  • Loading branch information...
ooq committed Jun 21, 2017
commit e364ed0e43c5fe49d2e5967385cea41aefd26269
Copy path View file
@@ -22,7 +22,8 @@
import pywren.wrenutil as wrenutil
import pywren.runtime as runtime
import pywren.storage as storage
from pywren.cloudpickle import serialize
from pywren.serialize import cloudpickle, serialize
from pywren.serialize import create_mod_data
from pywren.future import ResponseFuture, JobState
from pywren.wait import *
from pywren.storage import storage_utils
@@ -67,7 +68,6 @@ def invoke_with_keys(self, func_key, data_key, output_key,
# Pick a runtime url if we have shards.
# If not the handler will construct it
# TODO: we should always set the url, so our code here is S3-independent
runtime_url = ""
if ('urls' in self.runtime_meta_info and
isinstance(self.runtime_meta_info['urls'], list) and
@@ -166,12 +166,8 @@ def map(self, func, iterdata, extra_env = None, extra_meta = None,
host_job_meta = {}
pool = ThreadPool(invoke_pool_threads)
<<<<<<< 3d14c30966836bb1613f17ee224e0f6c283e40d9
callset_id = s3util.create_callset_id()
=======
callset_id = wrenutil.create_callset_id()
data = list(iterdata)

This comment has been minimized.

@shivaram

shivaram Jun 30, 2017

Collaborator

Strange that this shows up in the diff - how was it working before without this line?

This comment has been minimized.

@apengwin

apengwin Jul 1, 2017

Contributor

s3util.create_callset_id is just a wrapper around wrenutil.create_callset_id

This comment has been minimized.

@apengwin

apengwin Jul 1, 2017

Contributor

The data=list(iterdata) line does appear twice here. @ooq ?

screen shot 2017-06-30 at 11 00 16 pm

>>>>>>> Refactor storage API.
### pickle func and all data (to capture module dependencies)
func_and_data_ser, mod_paths = self.serializer([func] + data)
@@ -1,7 +1,7 @@
import botocore
from .exceptions import *
class S3Service(object):
class S3Backend(object):
"""
A wrap-up around S3 boto3 APIs.
"""
Copy path View file
@@ -3,26 +3,26 @@
from .storage_utils import *
from .exceptions import *
from .s3_service import S3Service
from .s3_backend import S3Backend
class Storage(object):
"""
A Storage object is used by executors and other components to access underlying storage service
A Storage object is used by executors and other components to access underlying storage backend
without exposing the implementation details.
Currently we only support S3 as the underlying service.
We plan to support other services in the future.
Currently we only support S3 as the underlying backend.
We plan to support other storage backends in the future.
"""
def __init__(self, config):
self.storage_config = config
self.prefix = config['storage_prefix']
self.service = config['storage_service']
if config['storage_service'] == 's3':
self.service_handler = S3Service(config['service_config'])
self.backend_type = config['storage_backend']
if config['storage_backend'] == 's3':
self.backend_handler = S3Backend(config['backend_config'])
else:
raise NotImplementedError(("Using {} as storage service is" +
"not supported yet").format(config['storage_service']))
raise NotImplementedError(("Using {} as storage backend is" +
"not supported yet").format(config['storage_backend']))
def get_storage_config(self):
"""
@@ -38,7 +38,7 @@ def put_data(self, key, data):
:param data: data content
:return: None
"""
return self.service_handler.put_object(key, data)
return self.backend_handler.put_object(key, data)
def put_func(self, key, func):
"""
@@ -47,7 +47,7 @@ def put_func(self, key, func):
:param data: serialized function
:return: None
"""
return self.service_handler.put_object(key, func)
return self.backend_handler.put_object(key, func)
def get_callset_status(self, callset_id):

This comment has been minimized.

@shivaram

shivaram Apr 28, 2017

Collaborator

This should probably be renamed. It doesn't really return the status, right? I'd call it something like get_calls_in_callset?

"""
@@ -58,7 +58,7 @@ def get_callset_status(self, callset_id):
# TODO: a better API for this is to return status for all calls in the callset. We'll fix
# this in scheduler refactoring.
callset_prefix = os.path.join(self.prefix, callset_id)
keys = self.service_handler.list_keys_with_prefix(callset_prefix)
keys = self.backend_handler.list_keys_with_prefix(callset_prefix)
suffix = status_key_suffix
status_keys = [k for k in keys if suffix in k]
call_ids = [k[len(callset_prefix)+1:].split("/")[0] for k in status_keys]
@@ -73,7 +73,7 @@ def get_call_status(self, callset_id, call_id):
"""
status_key = create_status_key(self.prefix, callset_id, call_id)
try:
data = self.service_handler.get_object(status_key)
data = self.backend_handler.get_object(status_key)
return json.loads(data.decode('ascii'))
except StorageNoSuchKeyError:
return None
@@ -87,7 +87,7 @@ def get_call_output(self, callset_id, call_id):
"""
output_key = create_output_key(self.prefix, callset_id, call_id)
try:
return self.service_handler.get_object(output_key)
return self.backend_handler.get_object(output_key)
except StorageNoSuchKeyError:
raise StorageOutputNotFoundError(callset_id, call_id)
@@ -103,7 +103,7 @@ def get_runtime_info(runtime_config):
"supported yet").format(runtime_config['runtime_storage']))
config = dict()
config['bucket'] = runtime_config['s3_bucket']
handler = S3Service(config)
handler = S3Backend(config)
key = runtime_config['s3_key'].replace(".tar.gz", ".meta.json")
json_str = handler.get_object(key)
@@ -78,10 +78,10 @@ def create_keys(prefix, callset_id, call_id):
def get_storage_path(config):
if config['storage_service'] != 's3':
raise NotImplementedError(("Using {} as storage service is" +
"not supported yet").format(config['storage_service']))
return [config['storage_service'], config['service_config']['bucket'], config['storage_prefix']]
if config['storage_backend'] != 's3':
raise NotImplementedError(("Using {} as storage backend is" +
"not supported yet").format(config['storage_backend']))
return [config['storage_backend'], config['backend_config']['bucket'], config['storage_prefix']]
def check_storage_path(config, prev_path):
Copy path View file
@@ -69,20 +69,20 @@ def default():
raise ValueError("could not find configuration file")
config_data = load(config_filename)
config_data['storage_service'] = 's3'
config_data['storage_backend'] = 's3'
config_data['storage_prefix'] = config_data['s3']['pywren_prefix']
config_data['runtime']['runtime_storage'] = 's3'
return config_data
def extract_storage_config(config):
storage_config = dict()
storage_config['storage_service'] = config['storage_service']
storage_config['storage_backend'] = config['storage_backend']
storage_config['storage_prefix'] = config['storage_prefix']
if storage_config['storage_service'] == 's3':
storage_config['service_config'] = {}
storage_config['service_config']['bucket'] = config['s3']['bucket']
storage_config['service_config']['region'] = config['account']['aws_region']
if storage_config['storage_backend'] == 's3':
storage_config['backend_config'] = {}
storage_config['backend_config']['bucket'] = config['s3']['bucket']
storage_config['backend_config']['region'] = config['account']['aws_region']
return storage_config
basic_role_policy = {
Copy path View file
@@ -134,12 +134,12 @@ def generic_handler(event, context_dict):
try:
response_status = {'exception' : None}
if event['storage_config']['storage_service'] != 's3':
raise NotImplementedError(("Using {} as storage service is not supported " +
"yet.").format(event['storage_config']['storage_service']))
if event['storage_config']['storage_backend'] != 's3':
raise NotImplementedError(("Using {} as storage backend is not supported " +
"yet.").format(event['storage_config']['storage_backend']))
s3_client = boto3.client("s3")
s3_transfer = boto3.s3.transfer.S3Transfer(s3_client)
s3_bucket = event['storage_config']['service_config']['bucket']
s3_bucket = event['storage_config']['backend_config']['bucket']
logger.info("invocation started")
ProTip! Use n and p to navigate between commits in a pull request.