[Issue#108] Refactor Storage API #119
Okay, I'll make changes per my discussion with @ericmjonas yesterday. Just for the record, below is a performance graph for #109.
Done.
Thanks @ooq - I did one pass and had some inline comments.
@@ -103,10 +100,10 @@ def result(self, timeout=None, check_only=False, throw_except=True):
else:
return None

if storage_handler is None:
Did we reach consensus on which model we want to support? There was the other option of passing in the storage_handler while constructing the future, which I think avoids some of the failure scenarios.
Yes, we (Eric and I) agreed that we want futures to be serializable. Storing the storage handler as a field of the future would make it unserializable. My previous solution was to have a save() method in which we first set future.storage_handler=None to make the future serializable. We then agreed that having pickle.dump(futures) work is also important.
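To make the serialization point concrete, here is a minimal sketch rather than the PyWren implementation. `FakeFuture`, `FakeStorageHandler`, `get_result` and `_FAKE_RESULTS` are hypothetical names; the handler holds a thread lock only to mimic a client object that cannot be pickled. The future stores plain data and receives (or builds) a handler when `result()` is called, mirroring the `if storage_handler is None:` check in the diff above.

```python
import pickle
import threading

# Hypothetical stored results, keyed by call ID (illustration only).
_FAKE_RESULTS = {"00000": 1, "00001": 4}


class FakeStorageHandler:
    """Hypothetical handler that owns an unpicklable resource."""

    def __init__(self):
        self._lock = threading.Lock()  # thread locks cannot be pickled

    def get_result(self, call_id):
        with self._lock:
            return _FAKE_RESULTS[call_id]


class FakeFuture:
    """Hypothetical future that stores only plain data, no handler."""

    def __init__(self, call_id):
        self.call_id = call_id

    def result(self, storage_handler=None):
        # Build a default handler on demand when none is passed in.
        if storage_handler is None:
            storage_handler = FakeStorageHandler()
        return storage_handler.get_result(self.call_id)


futures = [FakeFuture("00000"), FakeFuture("00001")]

# Because the futures hold no handler, a plain pickle round trip works.
restored = pickle.loads(pickle.dumps(futures))
print([f.result() for f in restored])
```

Had the handler been stored as a field, `pickle.dumps(futures)` would raise a `TypeError` because of the lock.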
pywren/runtime.py
Outdated
import pywren.storage as storage
import pywren.wrenconfig as wrenconfig

# FIXME separate runtime code with S3
Can you clarify what this fixme means ?
Oh, this has been fixed. Will remove.
pywren/runtime.py
Outdated
s3 = boto3.resource('s3')
if storage_handler is None:
storage_config = wrenconfig.extract_storage_config(wrenconfig.default())
storage_handler = storage.Storage(storage_config)
Similar to above, is there a case where we can't pass in a Storage object?
This can happen in tests, etc.
pywren/storage/s3_service.py
Outdated
"""
Get object from S3 with a key.
:param key: key of the object
:return: Data of the object
Can we specify the type of the return object ? i.e. is it a byte array or an iterator etc ?
Sure.
pywren/storage/storage.py
Outdated
:param call_id: call's ID
:return: data_key, output_key, status_key
"""
data_key = os.path.join(self.prefix, callset_id, call_id, "data.pickle")
all of these suffixes are hardcoded right now. This is probably not a problem for this PR, but we should come back and refactor this
ditto
pywren/wren.py
Outdated

standalone_executor = remote_executor


def save_futures_to_string(futures):
are we adding these functions in this PR ? Not sure it relates to the storage refactoring ? Can we move this out and discuss this separately ?
makes sense.
@@ -37,7 +36,17 @@

PROCESS_STDOUT_SLEEP_SECS = 2

def download_runtime_if_necessary(s3conn, runtime_s3_bucket, runtime_s3_key):
def get_key_size(s3client, bucket, key):
just to clarify this is s3 specific right now ?
yes.
@shivaram @ericmjonas Comments addressed.
@ooq Apologies for the delay. I just had a couple of high level comments that I think would be good to think about.
pywren/executor.py
Outdated
'data_key' : s3_data_key,
'output_key' : s3_output_key,
'status_key' : s3_status_key,
arg_dict = {'storage_info' : self.storage.get_storage_info(),
This is the main thing that we discussed the other day -- can we see if there is a better way to serialize / send information from the driver to the workers? One possibility could be that we send a config object or the result of extract_storage_config and then recreate the StorageHandler on the worker side?
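A rough sketch of that suggestion, under stated assumptions: only a plain config dict crosses the driver/worker boundary, and the worker rebuilds its own handler from it. `InMemoryStorage`, `driver_payload` and `worker` are hypothetical names for illustration; the `storage_prefix` and `storage_service` keys follow the ones used elsewhere in this PR's diff.

```python
import json


class InMemoryStorage:
    """Hypothetical handler; stands in for the real Storage class."""

    def __init__(self, config):
        self.prefix = config["storage_prefix"]
        self.service = config["storage_service"]


def driver_payload(storage_config, call_id):
    # Only plain, JSON-serializable data is sent to the worker.
    return json.dumps({"storage_config": storage_config, "call_id": call_id})


def worker(payload):
    event = json.loads(payload)
    # The worker recreates its own handler from the config it received.
    storage = InMemoryStorage(event["storage_config"])
    return "{}/{}".format(storage.prefix, event["call_id"])


payload = driver_payload(
    {"storage_service": "s3", "storage_prefix": "pywren.jobs"}, "00000"
)
print(worker(payload))
```

The driver never has to serialize a live client object this way, at the cost of each worker constructing its own handler.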
pywren/executor.py
Outdated
host_job_meta['data_size_bytes'] = data_size_bytes

if data_size_bytes < wrenconfig.MAX_AGG_DATA_SIZE and data_all_as_one:
s3_agg_data_key = s3util.create_agg_data_key(self.s3_bucket,
self.s3_prefix, callset_id)
agg_data_key = self.storage.create_agg_data_key(callset_id)
So one more thought -- all these utility functions to create keys, can we stick them into a StorageUtils or something like that? These functions shouldn't change based on storage backend, is that right?
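As a sketch of what backend-independent key helpers could look like: the functions below are plain module-level functions that take the prefix explicitly. The function names and signatures are assumptions for illustration; only the `data.pickle`, `status.json` and `aggdata.pickle` suffixes come from the diff hunks in this PR (the output key suffix is not shown there, so it is left out). The sketch also swaps `os.path.join` for `posixpath.join` so keys always use `/` regardless of platform, which is a deliberate change from the diff.

```python
import posixpath


def create_call_keys(prefix, callset_id, call_id):
    """Return (data_key, status_key) for one call; hypothetical helper."""
    data_key = posixpath.join(prefix, callset_id, call_id, "data.pickle")
    status_key = posixpath.join(prefix, callset_id, call_id, "status.json")
    return data_key, status_key


def create_agg_data_key(prefix, callset_id):
    """Return the key of the aggregated data object for a callset."""
    return posixpath.join(prefix, callset_id, "aggdata.pickle")


data_key, status_key = create_call_keys("pywren.jobs", "callset-1", "00000")
agg_key = create_agg_data_key("pywren.jobs", "callset-1")
```

Since none of these functions touch a backend, they can be unit-tested without any S3 access.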
pywren/storage/storage.py
Outdated
status_key = os.path.join(self.prefix, callset_id, call_id, "status.json")
return data_key, output_key, status_key

def create_func_key(self, callset_id):
This is related to the previous comment -- I feel like we should only have the abstract methods in this class so it is very clear what needs to be implemented for a new implementation ?
@@ -46,7 +55,7 @@ def download_runtime_if_necessary(s3conn, runtime_s3_bucket, runtime_s3_key):
"""

# get runtime etag
runtime_meta = s3conn.meta.client.head_object(Bucket=runtime_s3_bucket,
runtime_meta = s3_client.head_object(Bucket=runtime_s3_bucket,
It would be good to think about which parts of wrenhandler cannot be handled by our existing storage API. I guess right now this includes head_object, download_file and upload_file?
Did we reach any longer term consensus on this ? If not we can talk about it this week
pywren/storage/storage.py
Outdated
agg_data_key = os.path.join(self.prefix, callset_id, "aggdata.pickle")
return agg_data_key

def get_callset_status(self, callset_id):
This should probably be renamed. It doesn't really return the status, right? I'd call it something like get_calls_in_callset?
@ooq Can you rebase with the latest fix?
Most of the changes look good. I think putting the storage path check in there and also the storage backend API separation have resolved most of the big questions I had. I had some small comments and we can merge this once tests pass?
@@ -2,6 +2,9 @@

import boto3
import json

from pywren.storage.storage_utils import create_func_key
sort this and put it at the bottom ? Or we can do this with the style check PR
Let's do this in style check.
pywren/executor.py
Outdated
# using s3_bucket and s3_key
# Pick a runtime url if we have shards.
# If not the handler will construct it
# TODO: we should always set the url, so our code here is S3-independent
Not sure I follow this comment ?
This TODO is actually fixed. Removing this comment.
storage_config = wrenconfig.extract_storage_config(wrenconfig.default())
storage_handler = storage.Storage(storage_config)

storage_utils.check_storage_path(storage_handler.get_storage_config(), self.storage_path)
Nice !
pywren/storage/storage.py
Outdated
self.prefix = config['storage_prefix']
self.service = config['storage_service']
if config['storage_service'] == 's3':
self.service_handler = S3Service(config['service_config'])
can we call this storage backend ? might be better than service_handler. Also it might be better to have an explicit interface for storage backend - that makes it easier to code / test ?
StorageBackend sounds better.
I looked into the Python way of defining an interface, and an Abstract Base Class seems to be the go-to method. The benefit of having an interface class for coding/testing is probably very limited in a dynamic language, as we probably won't get a NotImplementedError from the IDE or at install time. Given that, is it really worth adding more non-Pythonic code?
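For reference on the trade-off, here is a small sketch of what an ABC-based interface would and would not check. The class and method names (`StorageBackend`, `put_object`, `get_object`, `DictBackend`, `IncompleteBackend`, `can_instantiate`) are hypothetical and not taken from the PR. An ABC gives no install-time or IDE-level guarantee, but Python does raise a `TypeError` when an incomplete subclass is instantiated, rather than failing later at the first call to a missing method.

```python
from abc import ABC, abstractmethod


class StorageBackend(ABC):
    """Hypothetical interface; the method names are illustrative only."""

    @abstractmethod
    def put_object(self, key, data):
        """Store data (bytes) under key."""

    @abstractmethod
    def get_object(self, key):
        """Return the bytes stored under key."""


class DictBackend(StorageBackend):
    """In-memory backend, handy as a test double."""

    def __init__(self):
        self._objects = {}

    def put_object(self, key, data):
        self._objects[key] = data

    def get_object(self, key):
        return self._objects[key]


class IncompleteBackend(StorageBackend):
    """Forgets to implement get_object."""

    def put_object(self, key, data):
        pass


def can_instantiate(cls):
    # Abstract methods left unimplemented cause TypeError at construction.
    try:
        cls()
        return True
    except TypeError:
        return False


print(can_instantiate(DictBackend), can_instantiate(IncompleteBackend))
```

Whether that earlier failure is worth the extra class is the judgment call raised above; the in-memory backend is the more tangible benefit for tests.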
Thanks for the comments @shivaram. The PR is updated.
@apengwin Can you also take a look at this?
if not runtime.runtime_key_valid(self.runtime_meta_info):
raise Exception("The indicated runtime: s3://{}/{} is not approprite for this python version".format(runtime_s3_bucket, runtime_s3_key))
self.config = config
self.storage_config = wrenconfig.extract_storage_config(self.config)
Can we formalize what config looks like? Also, it might be easier to have just one self.config instead of both self.storage_config and self.config.
I have wanted to formalize/document config for quite some time. For now, this is structured/parsed in a way to be backward-compatible with .pywren_config. Let's formalize config in a separate PR.
As for self.storage_config, the idea is to make the config more "hierarchical" and only expose the parameters that are relevant to each class.
This is kind of petty, but it's confusing to have multiple config variables that are all different, so should we change to something like storage_info?
@@ -17,6 +17,8 @@ class SimpleAsync(unittest.TestCase):
Test sqs dispatch but with local runner
"""
def setUp(self):
Should this be set_up?
We do have some inconsistent formatting now, and it needs to be fixed.
For the purpose of this PR, can you just review the changes?
Thanks @ooq -- Sorry for the delay in reviewing. I had a couple of small nits, but otherwise this looks good. Let's merge this and we can fix things like style as follow-ups.
fut = ResponseFuture(call_id, callset_id, host_job_meta,
self.s3_bucket, self.s3_prefix,
self.aws_region)
storage_path = storage_utils.get_storage_path(self.storage_config)
nit: This can probably be done at the beginning where we construct storage_config ?
pywren/executor.py
Outdated
@@ -176,27 +166,25 @@ def map(self, func, iterdata, extra_env = None, extra_meta = None,
host_job_meta = {}

pool = ThreadPool(invoke_pool_threads)
callset_id = s3util.create_callset_id()
callset_id = wrenutil.create_callset_id()
data = list(iterdata)
Strange that this shows up in the diff - how was it working before without this line ?
s3util.create_callset_id is just a wrapper around wrenutil.create_callset_id.
The data=list(iterdata) line does appear twice here. @ooq ?
This PR:
1, creates a Storage class as a middle layer to separate PyWren core and S3.
2, creates S3Service that wraps S3 boto3 APIs. In S3Service, reuses the same S3Client per #109.
3, adds pywren.save_futures_to_string, pywren.load_futures_from_string, pywren.save_futures_to_file, pywren.load_futures_from_file.
4, in wrenhandler.py, use #23.
5, removes s3util.py.
6, configuration: each object/method should only be exposed to configuration that is needed for that object/method. For example, the config file used to create Storage should be generated by wrenconfig.extract_storage_config(config); config['runtime'] should be passed to runtime.get_runtime_info(config['runtime']) instead of config.
8, no new test is added.
@shivaram @ericmjonas
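The configuration point in item 6 can be sketched as follows. This is an illustration under assumptions, not the actual `wrenconfig` code: the top-level config layout, the bucket and runtime values, and the function name `extract_storage_config_sketch` are all hypothetical. Only the output keys (`storage_service`, `storage_prefix`, `service_config`) follow the names used in the diff.

```python
def extract_storage_config_sketch(config):
    """Return only the storage-related part of a full config (hypothetical)."""
    service = config["storage_service"]
    return {
        "storage_service": service,
        "storage_prefix": config["storage_prefix"],
        # Copy the backend-specific section so callers cannot mutate the original.
        "service_config": dict(config[service]),
    }


# Hypothetical full config; the real .pywren_config layout may differ.
full_config = {
    "storage_service": "s3",
    "storage_prefix": "pywren.jobs",
    "s3": {"bucket": "example-bucket"},
    "runtime": {"s3_bucket": "example-runtimes", "s3_key": "example-runtime.tar.gz"},
}

storage_config = extract_storage_config_sketch(full_config)
runtime_config = full_config["runtime"]  # what the runtime code would receive
```

Each consumer then sees only its own slice: storage code never reads the runtime section, and runtime code never reads storage settings.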