New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue#108] Refactor Storage API #119

Merged
merged 22 commits into from Jul 2, 2017

Fix tests.

  • Loading branch information...
ooq committed Jun 14, 2017
commit 5679816a1f7e6e5720dbfe181ea50638da0e25e0
Copy path View file
@@ -25,7 +25,7 @@
from pywren.cloudpickle import serialize
from pywren.future import ResponseFuture, JobState
from pywren.wait import *
from pywren.storage import storage_utils, create_keys
from pywren.storage import storage_utils
logger = logging.getLogger(__name__)
@@ -126,7 +126,8 @@ def invoke_with_keys(self, func_key, data_key, output_key,
host_job_meta.update(arg_dict)
fut = ResponseFuture(call_id, callset_id, host_job_meta, self.storage_config)
storage_path = storage_utils.get_storage_path(self.storage_config)

This comment has been minimized.

@shivaram

shivaram Jun 30, 2017

Collaborator

nit: This can probably be done at the beginning where we construct storage_config ?

fut = ResponseFuture(call_id, callset_id, host_job_meta, storage_path)
fut._set_state(JobState.invoked)
@@ -216,7 +217,7 @@ def invoke(data_str, callset_id, call_id, func_key,
host_job_meta,
agg_data_key = None, data_byte_range=None ):
data_key, output_key, status_key \
= create_keys(self.storage.prefix, callset_id, call_id)
= storage_utils.create_keys(self.storage.prefix, callset_id, call_id)
host_job_meta['job_invoke_timestamp'] = time.time()
Copy path View file
@@ -15,7 +15,7 @@
from multiprocessing.pool import ThreadPool
import time
from pywren.executor import *
import pywren.storage as storage
from pywren.storage import storage, storage_utils
import logging
import botocore
import glob2
@@ -38,7 +38,7 @@ class ResponseFuture(object):
"""
"""
GET_RESULT_SLEEP_SECS = 4
def __init__(self, call_id, callset_id, invoke_metadata, storage_config):
def __init__(self, call_id, callset_id, invoke_metadata, storage_path):
self.call_id = call_id
self.callset_id = callset_id
@@ -48,7 +48,7 @@ def __init__(self, call_id, callset_id, invoke_metadata, storage_config):
self.status_query_count = 0
self.storage_config = storage_config
self.storage_path = storage_path
def _set_state(self, new_state):
## FIXME add state machine
@@ -101,7 +101,11 @@ def result(self, timeout=None, check_only=False, throw_except=True, storage_hand
return None
if storage_handler is None:

This comment has been minimized.

@shivaram

shivaram Apr 17, 2017

Collaborator

Did we reach consensus on what the model we want to support is ? There was the other option of passing in the storage_handler while constructing the future that I think avoids some of the failure scenarios ?

This comment has been minimized.

@ooq

ooq Apr 17, 2017

Collaborator

Yes, we (Eric and I) agreed that we want futures to be serializable. Storing storage handler as a field of future will make it un-serializable, my previous solution was to have a save() method in which we first set future.storage_handler=None to make future serializable. We then agree that having pickle.dump(futures) to work is also important.

storage_handler = storage.Storage(self.storage_config)
storage_config = wrenconfig.extract_storage_config(wrenconfig.default())
storage_handler = storage.Storage(storage_config)
storage_utils.check_storage_path(storage_handler.get_storage_config(), self.storage_path)

This comment has been minimized.

@shivaram

shivaram Jun 19, 2017

Collaborator

Nice !

call_status = storage_handler.get_call_status(self.callset_id, self.call_id)
Copy path View file
@@ -3,8 +3,13 @@ def __init__(self, key):
msg = "No such key {} found in storage.".format(key)
super(StorageNoSuchKeyError, self).__init__(msg)
class StorageOutputNotFoundError(Exception):
def __init__(self, callset_id, call_id):
msg = "Output for {} {} not found in storage.".format(callset_id, call_id)
super(StorageNoSuchKeyError, self).__init__(msg)
super(StorageOutputNotFoundError, self).__init__(msg)
class StorageConfigMismatchError(Exception):
def __init__(self, current_path, prev_path):
msg = "The data is stored at {}, but current storage is configured at {}.".format(
prev_path, current_path)
super(StorageConfigMismatchError, self).__init__(msg)
Copy path View file
@@ -1,3 +1,4 @@
from __future__ import absolute_import
import json
from .storage_utils import *
@@ -58,7 +59,7 @@ def get_callset_status(self, callset_id):
# this in scheduler refactoring.
callset_prefix = os.path.join(self.prefix, callset_id)
keys = self.service_handler.list_keys_with_prefix(callset_prefix)
suffix = storage_utils.status_key_suffix
suffix = status_key_suffix
status_keys = [k for k in keys if suffix in k]
call_ids = [k[len(callset_prefix)+1:].split("/")[0] for k in status_keys]
return call_ids
@@ -70,7 +71,7 @@ def get_call_status(self, callset_id, call_id):
:param call_id: call ID of the call
:return: A dictionary containing call's status, or None if no updated status
"""
status_key = storage_utils.create_status_key(self.prefix, callset_id, call_id)
status_key = create_status_key(self.prefix, callset_id, call_id)
try:
data = self.service_handler.get_object(status_key)
return json.loads(data.decode('ascii'))
@@ -84,7 +85,7 @@ def get_call_output(self, callset_id, call_id):
:param call_id: call ID of the call
:return: Output of the call.
"""
output_key = storage_utils.create_output_key(self.prefix, callset_id, call_id)
output_key = create_output_key(self.prefix, callset_id, call_id)
try:
return self.service_handler.get_object(output_key)
except StorageNoSuchKeyError:
@@ -100,7 +101,7 @@ def get_runtime_info(runtime_config):
if runtime_config['runtime_storage'] != 's3':
raise NotImplementedError(("Storing runtime in non-S3 storage is not " +
"supported yet").format(runtime_config['runtime_storage']))
config = []
config = dict()
config['bucket'] = runtime_config['s3_bucket']
handler = S3Service(config)
Copy path View file
@@ -1,12 +1,13 @@
import os
from .exceptions import *
func_key_suffix = "func.json"
agg_data_key_suffix = "aggdata.pickle"
data_key_suffix = "data.pickle"
output_key_suffix = "output.pickle"
status_key_suffix = "status.json"
def create_func_key(prefix, callset_id):
"""
Create function key
@@ -76,3 +77,14 @@ def create_keys(prefix, callset_id, call_id):
return data_key, output_key, status_key
def get_storage_path(config):
if config['storage_service'] != 's3':
raise NotImplementedError(("Using {} as storage service is" +
"not supported yet").format(config['storage_service']))
return [config['storage_service'], config['service_config']['bucket'], config['storage_prefix']]
def check_storage_path(config, prev_path):
current_path = get_storage_path(config)
if current_path != prev_path:
raise StorageConfigMismatchError(current_path, prev_path)
Copy path View file
@@ -11,6 +11,7 @@
from pywren.future import JobState
import pywren.storage as storage
import pywren.wrenconfig as wrenconfig
ALL_COMPLETED = 1
ANY_COMPLETED = 2
@@ -83,9 +84,9 @@ def _wait(fs, THREADPOOL_SIZE):
# get the list of all objects in this callset
callset_id = present_callsets.pop() # FIXME assume only one
f0 = not_done_futures[0] # This is a hack too
storage_handler = storage.Storage(f0.storage_config)
storage_config = wrenconfig.extract_storage_config(wrenconfig.default())
storage_handler = storage.Storage(storage_config)
callids_done = storage_handler.get_callset_status(callset_id)
callids_done = set(callids_done)
ProTip! Use n and p to navigate between commits in a pull request.