[Issue#108] Refactor Storage API #119

Merged
merged 22 commits on Jul 2, 2017

Better separating storage and service.

ooq committed Apr 9, 2017
commit 228d9d1646c38d90de2705eb84a09d4af575a6e1
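
For orientation, the layering this commit introduces, condensed from the diff below: a backend-agnostic Storage wrapper owns the key layout and delegates raw object operations to a service handler, of which S3Service is currently the only implementation. A minimal sketch, not the literal file contents:

    # Sketch only: condensed from the s3_service / storage changes in this commit.
    import botocore.session

    class S3Service(object):
        """Talks to S3 and nothing else; receives only the 's3' sub-config."""
        def __init__(self, s3config):
            self.s3_bucket = s3config['bucket']
            self.s3client = botocore.session.get_session().create_client('s3')

        def put_object(self, key, body):
            self.s3client.put_object(Bucket=self.s3_bucket, Key=key, Body=body)

        def get_object(self, key):
            return self.s3client.get_object(Bucket=self.s3_bucket, Key=key)['Body'].read()

    class Storage(object):
        """Backend-agnostic: owns key construction, delegates I/O to the handler."""
        def __init__(self, config):
            self.prefix = config['storage_prefix']
            if config['storage_service'] == 's3':
                self.service_handler = S3Service(config['s3'])
            else:
                raise NotImplementedError(config['storage_service'])

        def put_object(self, key, data):
            return self.service_handler.put_object(key, data)
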
@@ -36,8 +36,9 @@ def __init__(self, invoker, config, job_max_runtime):
self.job_max_runtime = job_max_runtime
self.config = config
self.storage = storage.get_storage(config)
self.runtime_meta_info = runtime.get_runtime_info(config)
self.storage_config = wrenconfig.extract_storage_config(self.config)


apengwin (Contributor) commented Jun 21, 2017

Can we formalize what config looks like? Also, it might be easier to have just one self.config instead of both self.storage_config and self.config


ooq (Collaborator) commented Jun 21, 2017

I have wanted to formalize/document config for quite some time. For now, this is structured/parsed in a way that is backward-compatible with .pywren_config. Let's formalize config in a separate PR.
As for self.storage_config, the idea is to make the config more "hierarchical" and only expose the parameters that are relevant to each class.
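
A hypothetical illustration of that hierarchical shape, where each class only sees its own sub-dictionary. The key names mirror default()/extract_storage_config() further down in this diff; the concrete values and import paths are made up for the example:

    # Illustrative values only; structure matches what default()/extract_storage_config() build.
    from pywren import wrenconfig, storage  # assumes the package layout at this commit

    config = {
        'storage_service': 's3',
        'storage_prefix': 'pywren.jobs',
        's3': {'bucket': 'my-pywren-bucket'},
    }

    storage_config = wrenconfig.extract_storage_config(config)  # just the storage-related slice
    storage_handler = storage.Storage(storage_config)           # never sees lambda/account settings
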


apengwin (Contributor) commented Jun 21, 2017

This is kind of petty, but it's confusing to have multiple config variables that are all different, so should we change to something like storage_info?

self.storage = storage.Storage(self.storage_config)
self.runtime_meta_info = self.storage.get_runtime_info(config['runtime'])
if 'preinstalls' in self.runtime_meta_info:
@@ -72,8 +73,8 @@ def invoke_with_keys(self, func_key, data_key, output_key,
random.seed()
runtime_url = random.choice(self.runtime_meta_info['urls'])
arg_dict = {'func_key' : func_key,
arg_dict = {'storage_info' : self.storage.get_storage_info(),


shivaram (Collaborator) commented Apr 28, 2017

This is the main thing that we discussed the other day -- can we see if there is a better way to serialize / send information from the driver to the workers? One possibility could be that we send a config object or the result of extract_storage_config and then recreate the StorageHandler on the worker side?
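
A rough sketch of that alternative: put the output of extract_storage_config into the invocation payload and rebuild the handler on the worker, instead of sending a pre-digested storage_info dict. Payload keys other than the ones visible in this diff are assumptions:

    # Driver side (sketch): ship the storage config itself in the event payload.
    arg_dict = {'storage_config': wrenconfig.extract_storage_config(self.config),
                'func_key': func_key,
                'data_key': data_key,
                'output_key': output_key,
                'status_key': status_key}

    # Worker side (sketch, inside generic_handler): recreate the same handler,
    # so driver and worker share one code path for buckets and keys.
    storage_handler = storage.Storage(event['storage_config'])
    func_module_str = storage_handler.get_object(event['func_key'])
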

'func_key' : func_key,
'data_key' : data_key,
'output_key' : output_key,
'status_key' : status_key,
@@ -181,7 +182,7 @@ def map(self, func, iterdata, extra_env = None, extra_meta = None,
agg_data_key = self.storage.create_agg_data_key(callset_id)


shivaram (Collaborator) commented Apr 28, 2017

so one more thought -- all these utility functions to create keys, can we stick them into a StorageUtils or something like that? These functions shouldn't change based on storage backend, is that right?
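
One possible shape for that: a small module of pure key-construction helpers with no backend dependency, so every service handler can reuse them. The module and function signatures here are hypothetical; the key layout is copied from create_keys() below:

    # storage_utils.py (hypothetical) -- key layout only, no knowledge of S3 or any other backend.
    import os

    def create_keys(prefix, callset_id, call_id):
        base = os.path.join(prefix, callset_id, call_id)
        return (os.path.join(base, "data.pickle"),
                os.path.join(base, "output.pickle"),
                os.path.join(base, "status.json"))

    def create_func_key(prefix, callset_id):
        return os.path.join(prefix, callset_id, "func.json")

    def create_agg_data_key(prefix, callset_id):
        return os.path.join(prefix, callset_id, "aggdata.pickle")
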

agg_data_bytes, agg_data_ranges = self.agg_data(data_strs)
agg_upload_time = time.time()
self.storage.put_object(agg_data_key[1], agg_data_bytes)
self.storage.put_object(agg_data_key, agg_data_bytes)
host_job_meta['agg_data'] = True
host_job_meta['data_upload_time'] = time.time() - agg_upload_time
host_job_meta['data_upload_timestamp'] = time.time()
@@ -204,7 +205,7 @@ def map(self, func, iterdata, extra_env = None, extra_meta = None,
func_upload_time = time.time()
func_key = self.storage.create_func_key(callset_id)
self.storage.put_object(func_key[1], func_module_str)
self.storage.put_object(func_key, func_module_str)
host_job_meta['func_upload_time'] = time.time() - func_upload_time
host_job_meta['func_upload_timestamp'] = time.time()
def invoke(data_str, callset_id, call_id, func_key,
@@ -48,7 +48,7 @@ def __init__(self, call_id, callset_id, invoke_metadata, storage):
self.status_query_count = 0
# FIXME: we now put a reference of storage in future, can we still serialize futures?
self.storage = storage
#self.storage = storage
def _set_state(self, new_state):
## FIXME add state machine
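
On the FIXME above about keeping a storage reference in the future: a common pattern is to drop the live handle when pickling and rebuild it from its config on unpickling. A hypothetical sketch, assuming the future also keeps the storage config around; nothing in this diff implements this:

    # Hypothetical additions to the future class; not part of this PR.
    def __getstate__(self):
        state = self.__dict__.copy()
        state['storage'] = None          # botocore clients generally do not pickle
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        if self.storage is None and getattr(self, 'storage_config', None) is not None:
            self.storage = storage.Storage(self.storage_config)
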
@@ -10,7 +10,7 @@ def get_runtime_info(config):
"""
Download runtime information from S3 at deserialize
"""
runtime_meta = storage.get_storage(config).get_runtime_info()
runtime_meta = storage.Storage(config).get_runtime_info()
if not runtime_valid(runtime_meta):
raise Exception("The indicated runtime: {} "
@@ -6,84 +6,47 @@
class S3Service(object):
def __init__(self, config):
self.config = config
self.aws_region = config['account']['aws_region']
FUNCTION_NAME = config['lambda']['function_name']
self.s3_bucket = config['s3']['bucket']
self.s3_prefix = config['s3']['pywren_prefix']
self.runtime_bucket = config['runtime']['s3_bucket']
self.runtime_key = config['runtime']['s3_key']
self.prefix = config['s3']['pywren_prefix']
def __init__(self, s3config):
self.s3_bucket = s3config['bucket']
self.session = botocore.session.get_session()
self.s3client = self.session.create_client('s3', region_name=self.aws_region)
self.s3client = self.session.create_client('s3')
def get_storage_location(self):
return self.s3_bucket
def put_object(self, key, body):
self.s3client.put_object(Bucket=self.s3_bucket, Key=key, Body=body)
def create_keys(self, callset_id, call_id):
data_key = (self.s3_bucket, os.path.join(self.prefix, callset_id, call_id, "data.pickle"))
output_key = (self.s3_bucket, os.path.join(self.prefix, callset_id, call_id, "output.pickle"))
status_key = (self.s3_bucket, os.path.join(self.prefix, callset_id, call_id, "status.json"))
return data_key, output_key, status_key
def create_func_key(self, callset_id):
func_key = (self.s3_bucket, os.path.join(self.prefix, callset_id, "func.json"))
return func_key
def create_agg_data_key(self, callset_id):
func_key = (self.s3_bucket, os.path.join(self.prefix, callset_id, "aggdata.pickle"))
return func_key
def get_callset_done(self, callset_id):
key_prefix = os.path.join(self.prefix, callset_id)
paginator = self.s3client.get_paginator('list_objects_v2')
operation_parameters = {'Bucket': self.s3_bucket,
'Prefix': key_prefix}
page_iterator = paginator.paginate(**operation_parameters)
status_keys = []
for page in page_iterator:
for item in page['Contents']:
object_name = item['Key']
if "status.json" in object_name:
status_keys.append(object_name)
def get_object(self, key):
r = self.s3client.get_object(Bucket = self.s3_bucket, Key = key)
data = r['Body'].read()
return data
call_ids = [k[len(key_prefix)+1:].split("/")[0] for k in status_keys]
return call_ids
def get_callset_status(self, callset_prefix, status_suffix):
paginator = self.s3client.get_paginator('list_objects_v2')
operation_parameters = {'Bucket': self.s3_bucket,
'Prefix': callset_prefix}
page_iterator = paginator.paginate(**operation_parameters)
def get_call_status(self, callset_id, call_id):
s3_data_key, s3_output_key, s3_status_key = self.create_keys(callset_id, call_id)
status_keys = []
for page in page_iterator:
for item in page['Contents']:
object_name = item['Key']
if status_suffix in object_name:
status_keys.append(object_name)
call_ids = [k[len(callset_prefix)+1:].split("/")[0] for k in status_keys]
return call_ids
def get_call_status(self, s3_status_key):
try:
r = self.s3client.get_object(Bucket = self.s3_bucket, Key = s3_status_key[1])
result_json = r['Body'].read()
return json.loads(result_json.decode('ascii'))
data = self.get_object(s3_status_key)
return json.loads(data.decode('ascii'))
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "NoSuchKey":
return None
else:
raise e
def get_call_output(self, callset_id, call_id):
s3_data_key, s3_output_key, s3_status_key = self.create_keys(callset_id, call_id)
r = self.s3client.get_object(Bucket = self.s3_bucket, Key = s3_output_key[1])
return r['Body'].read()
def get_runtime_info(self):
runtime_meta_key = self.runtime_key.replace(".tar.gz", ".meta.json")
json_str = self.s3client.get_object(Bucket=self.runtime_bucket,
Key=runtime_meta_key)['Body'].read()
runtime_meta = json.loads(json_str.decode("ascii"))
return runtime_meta
def get_call_output(self, s3_output_key):
return self.get_object(s3_output_key)
@@ -1,12 +1,67 @@
import os
import json
from s3_service import S3Service
def get_storage(config):
'''
Get the function service based on configuration.
Currently we only support AWS Lambda.
Google Cloud Functions and Azure are to be supported in the future.
:return: A handler for the function service.
'''
return S3Service(config)
class Storage(object):
def __init__(self, config):
self.prefix = config['storage_prefix']
self.service = config['storage_service']
if config['storage_service'] == 's3':
self.service_handler = S3Service(config['s3'])
else:
raise NotImplementedError(("Using {} as storage service is" +
"not supported yet").format(config['storage_service']))
def get_storage_info(self):
info = dict()
info['service'] = self.service
info['location'] = self.service_handler.get_storage_location()
return info
def put_object(self, key, data):
return self.service_handler.put_object(key, data)
def get_object(self, key):
return self.service_handler.get_object(key)
def create_keys(self, callset_id, call_id):
data_key = os.path.join(self.prefix, callset_id, call_id, "data.pickle")


shivaram (Collaborator) commented Apr 17, 2017

All of these suffixes are hardcoded right now. This is probably not a problem for this PR, but we should come back and refactor this.


ooq (Collaborator) commented Apr 17, 2017

ditto
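
If that refactor happens, one option is to hoist the suffixes into shared constants so that key construction and the status listing use a single definition. The constant names are hypothetical; the literals are the ones currently inlined in create_keys():

    DATA_SUFFIX = "data.pickle"
    OUTPUT_SUFFIX = "output.pickle"
    STATUS_SUFFIX = "status.json"

    def create_keys(self, callset_id, call_id):
        base = os.path.join(self.prefix, callset_id, call_id)
        return (os.path.join(base, DATA_SUFFIX),
                os.path.join(base, OUTPUT_SUFFIX),
                os.path.join(base, STATUS_SUFFIX))
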

output_key = os.path.join(self.prefix, callset_id, call_id, "output.pickle")
status_key = os.path.join(self.prefix, callset_id, call_id, "status.json")
return data_key, output_key, status_key
def create_func_key(self, callset_id):


shivaram (Collaborator) commented Apr 28, 2017

This is related to the previous comment -- I feel like we should only have the abstract methods in this class so it is very clear what needs to be implemented for a new implementation?
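
A sketch of that idea: an explicit abstract interface listing exactly what a new backend must implement, with the key-layout logic kept out of it. The class name and method set are guesses based on what Storage currently calls on S3Service in this diff:

    # Hypothetical interface; not part of this PR.
    import abc

    class StorageService(abc.ABC):
        @abc.abstractmethod
        def get_storage_location(self):
            raise NotImplementedError

        @abc.abstractmethod
        def put_object(self, key, body):
            raise NotImplementedError

        @abc.abstractmethod
        def get_object(self, key):
            raise NotImplementedError

        @abc.abstractmethod
        def get_callset_status(self, callset_prefix, status_suffix):
            raise NotImplementedError
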

func_key = os.path.join(self.prefix, callset_id, "func.json")
return func_key
def create_agg_data_key(self, callset_id):
agg_data_key = os.path.join(self.prefix, callset_id, "aggdata.pickle")
return agg_data_key
def get_callset_status(self, callset_id):


shivaram (Collaborator) commented Apr 28, 2017

This should probably be renamed. It doesn't really return the status, right? I'd call it something like get_calls_in_callset?

callset_prefix = os.path.join(self.prefix, callset_id)
status_suffix = "status.json"
return self.service_handler.get_callset_status(callset_prefix, status_suffix)
def get_call_status(self, callset_id, call_id):
_, _, status_key = self.create_keys(callset_id, call_id)
return self.service_handler.get_call_status(status_key)
def get_call_output(self, callset_id, call_id):
_, output_key, _ = self.create_keys(callset_id, call_id)
return self.service_handler.get_call_output(output_key)
def get_runtime_info(self, runtime_config):
if runtime_config['runtime_storage'] != 's3':
raise NotImplementedError(("Storing runtime in non-S3 storage is not " +
"supported yet").format(runtime_config['runtime_storage']))
bucket = runtime_config['s3_bucket']
key = runtime_config['s3_key'].replace(".tar.gz", ".meta.json")
handler = S3Service({"bucket":bucket})
json_str = handler.get_object(key)
runtime_meta = json.loads(json_str.decode("ascii"))
return runtime_meta
@@ -84,7 +84,7 @@ def _wait(fs, THREADPOOL_SIZE):
callset_id = present_callsets.pop() # FIXME assume only one
f0 = not_done_futures[0] # This is a hack too
callids_done = f0.storage.get_callset_done(callset_id)
callids_done = f0.storage.get_callset_status(callset_id)
callids_done = set(callids_done)
@@ -69,9 +69,21 @@ def default():
raise ValueError("could not find configuration file")
config_data = load(config_filename)
config_data['storage_service'] = 's3'
config_data['storage_prefix'] = config_data['s3']['pywren_prefix']
config_data['runtime']['runtime_storage'] = 's3'
return config_data
def extract_storage_config(config):
storage_config = dict()
storage_config['storage_service'] = config['storage_service']
storage_config['storage_prefix'] = config['storage_prefix']
if storage_config['storage_service'] == 's3':
storage_config['s3'] = {}
storage_config['s3']['bucket'] = config['s3']['bucket']
return storage_config
basic_role_policy = {
"Version": "2012-10-17",
"Statement": [{
@@ -135,7 +135,12 @@ def generic_handler(event, context_dict):
try:
response_status = {'exception' : None}
s3 = boto3.resource('s3')
logger.error(event)
if event['storage_info']['service'] != 's3':
raise NotImplementedError(("Using {} as storage service is not supported " +
"yet.").format(event['storage_info']['service']))
s3 = boto3.resource("s3")
s3_bucket = event['storage_info']['location']
logger.info("invocation started")
@@ -173,22 +178,19 @@ def generic_handler(event, context_dict):
response_status['output_key'] = output_key
response_status['status_key'] = status_key
b, k = data_key
KS = get_key_size(b, k)
#logger.info("bucket=", b, "key=", k, "status: ", KS, "bytes" )
KS = get_key_size(s3_bucket, data_key)
#logger.info("bucket=", s3_bucket, "key=", data_key, "status: ", KS, "bytes" )
while KS is None:
logger.warn("WARNING COULD NOT GET FIRST KEY" )
KS = get_key_size(b, k)
KS = get_key_size(s3_bucket, data_key)
if not event['use_cached_runtime'] :
subprocess.check_output("rm -Rf {}/*".format(RUNTIME_LOC), shell=True)
# get the input and save to disk
# FIXME here is we where we would attach the "canceled" metadata
s3.meta.client.download_file(func_key[0], func_key[1], func_filename)
s3.meta.client.download_file(s3_bucket, func_key, func_filename)
func_download_time = time.time() - start_time
response_status['func_download_time'] = func_download_time
@@ -198,7 +200,7 @@ def generic_handler(event, context_dict):
s3.meta.client.download_file(data_key[0], data_key[1], data_filename)
else:
range_str = 'bytes={}-{}'.format(*data_byte_range)
dres = s3.meta.client.get_object(Bucket=data_key[0], Key=data_key[1],
dres = s3.meta.client.get_object(Bucket=s3_bucket, Key=data_key,
Range=range_str)
data_fid = open(data_filename, 'wb')
data_fid.write(dres['Body'].read())
@@ -311,9 +313,9 @@ def consume_stdout(stdout, queue):
logger.info("command execution finished")
s3.meta.client.upload_file(output_filename, output_key[0],
output_key[1])
logger.debug("output uploaded to %s %s", output_key[0], output_key[1])
s3.meta.client.upload_file(output_filename, s3_bucket,
output_key)
logger.debug("output uploaded to %s %s", s3_bucket, output_key)
end_time = time.time()
@@ -333,8 +335,8 @@ def consume_stdout(stdout, queue):
response_status['exception_args'] = e.args
response_status['exception_traceback'] = traceback.format_exc()
finally:
s3.meta.client.put_object(Bucket=status_key[0], Key=status_key[1],
logger.error(response_status)
s3.meta.client.put_object(Bucket=s3_bucket, Key=status_key,
Body=json.dumps(response_status))