New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue#108] Refactor Storage API #119

Merged
merged 22 commits into from Jul 2, 2017

Make Future Serializable Again.

  • Loading branch information...
ooq committed Apr 12, 2017
commit ed2ba9043bcaac9307245888240348a597052f10
View
@@ -122,7 +122,7 @@ def invoke_with_keys(self, func_key, data_key, output_key,
host_job_meta.update(arg_dict)
fut = ResponseFuture(call_id, callset_id, host_job_meta, self.storage)
fut = ResponseFuture(call_id, callset_id, host_job_meta, self.storage_config)
fut._set_state(JobState.invoked)
@@ -276,8 +276,6 @@ def reduce(self, function, list_of_futures,
"""
#if self.invoker.TIME_LIMIT:
wait(list_of_futures, return_when=ALL_COMPLETED) # avoid race condition
for f in list_of_futures:
f.prepare_before_serialize()
def reduce_func(fut_list):
# FIXME speed this up for big reduce
View
@@ -38,7 +38,7 @@ class ResponseFuture(object):
"""
"""
GET_RESULT_SLEEP_SECS = 4
def __init__(self, call_id, callset_id, invoke_metadata, storage):
def __init__(self, call_id, callset_id, invoke_metadata, storage_config):
self.call_id = call_id
self.callset_id = callset_id
@@ -48,10 +48,7 @@ def __init__(self, call_id, callset_id, invoke_metadata, storage):
self.status_query_count = 0
self.storage_config = storage.get_storage_config()
# a storage object is not serializable. So set this to None before saving the futures.
# See wren.py::save_futures
self.storage = storage
self.storage_config = storage_config
def _set_state(self, new_state):
## FIXME add state machine
@@ -73,12 +70,7 @@ def done(self):
return False
return True
def get_storage_handler(self):
if self.storage == None:
self.storage = storage.Storage(self.storage_config)
return self.storage
def result(self, timeout=None, check_only=False, throw_except=True):
def result(self, timeout=None, check_only=False, throw_except=True, storage_handler=None):
"""
@@ -108,7 +100,10 @@ def result(self, timeout=None, check_only=False, throw_except=True):
else:
return None
call_status = self.get_storage_handler().get_call_status(self.callset_id, self.call_id)
if storage_handler is None:

This comment has been minimized.

@shivaram

shivaram Apr 17, 2017

Collaborator

Did we reach consensus on which model we want to support? Wasn't there also the option of passing in the storage_handler when constructing the future, which I think avoids some of the failure scenarios?

This comment has been minimized.

@ooq

ooq Apr 17, 2017

Collaborator

Yes, we (Eric and I) agreed that we want futures to be serializable. Storing the storage handler as a field of the future would make it unserializable. My previous solution was to have a save() method in which we first set future.storage_handler=None to make the future serializable. We then agreed that getting pickle.dump(futures) to work is also important.

storage_handler = storage.Storage(self.storage_config)
call_status = storage_handler.get_call_status(self.callset_id, self.call_id)
self.status_query_count += 1
@@ -121,7 +116,7 @@ def result(self, timeout=None, check_only=False, throw_except=True):
while call_status is None:
time.sleep(self.GET_RESULT_SLEEP_SECS)
call_status = self.storage.get_call_status(self.callset_id, self.call_id)
call_status = storage_handler.get_call_status(self.callset_id, self.call_id)
self.status_query_count += 1
self._invoke_metadata['status_done_timestamp'] = time.time()
@@ -151,7 +146,7 @@ def result(self, timeout=None, check_only=False, throw_except=True):
return None
call_output_time = time.time()
call_invoker_result = pickle.loads(self.get_storage_handler().get_call_output(
call_invoker_result = pickle.loads(storage_handler.get_call_output(
self.callset_id, self.call_id))
call_output_time_done = time.time()
@@ -199,6 +194,3 @@ def exception(self, timeout = None):
def add_done_callback(self, fn):
raise NotImplementedError()
def prepare_before_serialize(self):
self.storage = None
View
@@ -10,6 +10,7 @@
pickling_support.install()
from pywren.future import JobState
import pywren.storage as storage
ALL_COMPLETED = 1
ANY_COMPLETED = 2
@@ -84,7 +85,8 @@ def _wait(fs, THREADPOOL_SIZE):
callset_id = present_callsets.pop() # FIXME assume only one
f0 = not_done_futures[0] # This is a hack too
callids_done = f0.get_storage_handler().get_callset_status(callset_id)
storage_handler = storage.Storage(f0.storage_config)
callids_done = storage_handler.get_callset_status(callset_id)
callids_done = set(callids_done)
@@ -103,7 +105,7 @@ def _wait(fs, THREADPOOL_SIZE):
else:
fs_notdones.append(f)
def test(f):
f.result(throw_except=False)
f.result(throw_except=False, storage_handler=storage_handler)
pool = ThreadPool(THREADPOOL_SIZE)
pool.map(test, f_to_wait_on)
ProTip! Use n and p to navigate between commits in a pull request.