[Issue#108] Refactor Storage API #119

Merged
merged 22 commits on Jul 2, 2017
@@ -2,6 +2,9 @@
import boto3
import json
from pywren.storage.storage_utils import create_func_key

@shivaram (Collaborator) commented on Jun 19, 2017:

Sort this and put it at the bottom? Or we can do this with the style check PR.

@ooq (Collaborator) commented on Jun 20, 2017:

Let's do this in the style check.

try:
from six.moves import cPickle as pickle
except:
@@ -14,15 +17,16 @@
import glob2
import os
import pywren.s3util as s3util
import pywren.version as version
import pywren.wrenconfig as wrenconfig
import pywren.wrenutil as wrenutil
import pywren.runtime as runtime
import pywren.storage as storage
from pywren.serialize import cloudpickle, serialize
from pywren.serialize import create_mod_data
from pywren.future import ResponseFuture, JobState
from pywren.wait import *
from pywren.storage import storage_utils
logger = logging.getLogger(__name__)
@@ -32,50 +36,38 @@ class Executor(object):
Theoretically will allow for cross-AZ invocations
"""
def __init__(self, aws_region, s3_bucket, s3_prefix,
invoker, runtime_s3_bucket, runtime_s3_key, job_max_runtime):
self.aws_region = aws_region
self.s3_bucket = s3_bucket
self.s3_prefix = s3_prefix
self.session = botocore.session.get_session()
def __init__(self, invoker, config, job_max_runtime):
self.invoker = invoker
self.s3client = self.session.create_client('s3', region_name = aws_region)
self.job_max_runtime = job_max_runtime
self.runtime_bucket = runtime_s3_bucket
self.runtime_key = runtime_s3_key
self.runtime_meta_info = runtime.get_runtime_info(runtime_s3_bucket, runtime_s3_key)
if not runtime.runtime_key_valid(self.runtime_meta_info):
raise Exception("The indicated runtime: s3://{}/{} is not approprite for this python version".format(runtime_s3_bucket, runtime_s3_key))
self.config = config
self.storage_config = wrenconfig.extract_storage_config(self.config)

@apengwin (Contributor) commented on Jun 21, 2017:

Can we formalize what config looks like? Also, it might be easier to have just one self.config instead of both self.storage_config and self.config.

@ooq (Collaborator) commented on Jun 21, 2017:

I have wanted to formalize/document config for quite some time. For now, this is structured/parsed in a way that is backward-compatible with .pywren_config. Let's formalize config in a separate PR.
As for self.storage_config, the idea is to make the config more "hierarchical" and only expose the parameters that are relevant to each class.
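For readers following this thread: the diff never shows the full config shape, so the sketch below only illustrates the "hierarchical" layout described above. The 'account' and 'runtime' keys are visible elsewhere in this PR (config['runtime'], config['account']['aws_region']); the 'storage' section name and all values are assumptions, and the function body is a guess at what wrenconfig.extract_storage_config might do, not its actual implementation.

```python
# Illustrative only -- not the formal config schema (that is deferred to a later PR).
example_config = {
    'account': {'aws_region': 'us-west-2'},                  # used by get_logs below
    'runtime': {'s3_bucket': 'pywren-runtimes',              # passed to get_runtime_info
                's3_key': 'pywren.runtime.tar.gz'},
    'storage': {'backend': 's3',                             # hypothetical section name
                'bucket': 'my-pywren-bucket',
                'prefix': 'pywren.jobs'},
}

def extract_storage_config(config):
    """Guess at the idea behind wrenconfig.extract_storage_config: hand classes
    like Storage only the subsection they need, not the whole config."""
    return dict(config['storage'])
```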

@apengwin (Contributor) commented on Jun 21, 2017:

This is kind of petty, but it's confusing to have multiple config variables that are all different, so should we change it to something like storage_info?

self.storage = storage.Storage(self.storage_config)
self.runtime_meta_info = runtime.get_runtime_info(config['runtime'])
if 'preinstalls' in self.runtime_meta_info:
logger.info("using serializer with meta-supplied preinstalls")
self.serializer = serialize.SerializeIndependent(self.runtime_meta_info['preinstalls'])
else:
self.serializer = serialize.SerializeIndependent()
def put_data(self, s3_data_key, data_str,
def put_data(self, data_key, data_str,
callset_id, call_id):
# put on s3 -- FIXME right now this takes 2x as long
self.s3client.put_object(Bucket = s3_data_key[0],
Key = s3_data_key[1],
Body = data_str)
self.storage.put_data(data_key, data_str)
logger.info("call_async {} {} data upload complete {}".format(callset_id, call_id,
data_key))
logger.info("call_async {} {} s3 upload complete {}".format(callset_id, call_id, s3_data_key))
def invoke_with_keys(self, s3_func_key, s3_data_key, s3_output_key,
s3_status_key,
def invoke_with_keys(self, func_key, data_key, output_key,
status_key,
callset_id, call_id, extra_env,
extra_meta, data_byte_range, use_cached_runtime,
host_job_meta, job_max_runtime,
overwrite_invoke_args = None):
# Pick a runtime url if we have shards. If not the handler will construct it
# using s3_bucket and s3_key
# Pick a runtime url if we have shards.
# If not the handler will construct it
runtime_url = ""
if ('urls' in self.runtime_meta_info and
isinstance(self.runtime_meta_info['urls'], list) and
@@ -85,18 +77,17 @@ def invoke_with_keys(self, s3_func_key, s3_data_key, s3_output_key,
random.seed()
runtime_url = random.choice(self.runtime_meta_info['urls'])
arg_dict = {'func_key' : s3_func_key,
'data_key' : s3_data_key,
'output_key' : s3_output_key,
'status_key' : s3_status_key,
arg_dict = {'storage_config' : self.storage.get_storage_config(),
'func_key' : func_key,
'data_key' : data_key,
'output_key' : output_key,
'status_key' : status_key,
'callset_id': callset_id,
'job_max_runtime' : job_max_runtime,
'data_byte_range' : data_byte_range,
'call_id' : call_id,
'use_cached_runtime' : use_cached_runtime,
'runtime_s3_bucket' : self.runtime_bucket,
'runtime_s3_key' : self.runtime_key,
'runtime' : self.config['runtime'],
'pywren_version' : version.__version__,
'runtime_url' : runtime_url }
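The important shift in arg_dict is that the invocation payload now carries storage_config, so the Lambda-side handler can rebuild a storage handler instead of relying on hard-coded S3 fields. A hedged sketch of the consuming side follows; storage.Storage(config) appears in this PR, but get_func/get_data and the overall handler shape are assumptions.

```python
import pickle
from pywren import storage

def handle_invocation(event):
    """Hypothetical worker-side sketch, not pywren's actual handler."""
    storage_handler = storage.Storage(event['storage_config'])
    func = pickle.loads(storage_handler.get_func(event['func_key']))  # get_func: assumed name
    data = pickle.loads(storage_handler.get_data(event['data_key']))  # get_data: assumed name
    result = func(data)
    storage_handler.put_data(event['output_key'], pickle.dumps(result))
```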
@@ -135,9 +126,8 @@ def invoke_with_keys(self, s3_func_key, s3_data_key, s3_output_key,
host_job_meta.update(arg_dict)
fut = ResponseFuture(call_id, callset_id, host_job_meta,
self.s3_bucket, self.s3_prefix,
self.aws_region)
storage_path = storage_utils.get_storage_path(self.storage_config)

@shivaram (Collaborator) commented on Jun 30, 2017:

nit: This can probably be done at the beginning, where we construct storage_config?
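To make the nit concrete, here is a sketch of the suggested alternative (not what this PR merged): compute storage_path once in __init__, next to storage_config, and reuse it from invoke_with_keys.

```python
import pywren.wrenconfig as wrenconfig
import pywren.storage as storage
from pywren.storage import storage_utils

class Executor(object):
    # Sketch of the suggestion above, for illustration only.
    def __init__(self, invoker, config, job_max_runtime):
        self.invoker = invoker
        self.job_max_runtime = job_max_runtime
        self.config = config
        self.storage_config = wrenconfig.extract_storage_config(self.config)
        self.storage = storage.Storage(self.storage_config)
        # Computed once here; invoke_with_keys would then pass self.storage_path
        # to ResponseFuture instead of recomputing it per call.
        self.storage_path = storage_utils.get_storage_path(self.storage_config)
```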

fut = ResponseFuture(call_id, callset_id, host_job_meta, storage_path)
fut._set_state(JobState.invoked)
@@ -162,7 +152,7 @@ def map(self, func, iterdata, extra_env = None, extra_meta = None,
"""
# FIXME work with an actual iterable instead of just a list
data_all_as_one : upload the data as a single s3 object; fewer
data_all_as_one : upload the data as a single object; fewer
tcp transactions (good) but potentially higher latency for workers (bad)
use_cached_runtime : if runtime has been cached, use that. When set
@@ -176,27 +166,25 @@ def map(self, func, iterdata, extra_env = None, extra_meta = None,
host_job_meta = {}
pool = ThreadPool(invoke_pool_threads)
callset_id = s3util.create_callset_id()
callset_id = wrenutil.create_callset_id()
data = list(iterdata)

@shivaram (Collaborator) commented on Jun 30, 2017:

Strange that this shows up in the diff: how was it working before without this line?

@apengwin (Contributor) commented on Jul 1, 2017:

s3util.create_callset_id is just a wrapper around wrenutil.create_callset_id.
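For reference, the relationship described here would look roughly like the sketch below. Only the wrapper relationship comes from the comment; the uuid-based body is an assumption.

```python
# pywren/wrenutil.py (sketch; the uuid implementation is a guess)
import uuid

def create_callset_id():
    return uuid.uuid4().hex

# pywren/s3util.py (sketch of the thin wrapper the comment describes)
import pywren.wrenutil as wrenutil

def create_callset_id():
    return wrenutil.create_callset_id()
```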

@apengwin (Contributor) commented on Jul 1, 2017:

The data = list(iterdata) line does appear twice here. @ooq?

[screenshot attached: 2017-06-30, 11:00 pm]

### pickle func and all data (to capture module dependencies
func_and_data_ser, mod_paths = self.serializer([func] + data)
func_str = func_and_data_ser[0]
data_strs = func_and_data_ser[1:]
data_size_bytes = sum(len(x) for x in data_strs)
s3_agg_data_key = None
host_job_meta['aggregated_data_in_s3'] = False
agg_data_key = None
host_job_meta['agg_data'] = False
host_job_meta['data_size_bytes'] = data_size_bytes
if data_size_bytes < wrenconfig.MAX_AGG_DATA_SIZE and data_all_as_one:
s3_agg_data_key = s3util.create_agg_data_key(self.s3_bucket,
self.s3_prefix, callset_id)
agg_data_key = storage_utils.create_agg_data_key(self.storage.prefix, callset_id)
agg_data_bytes, agg_data_ranges = self.agg_data(data_strs)
agg_upload_time = time.time()
self.s3client.put_object(Bucket = s3_agg_data_key[0],
Key = s3_agg_data_key[1],
Body = agg_data_bytes)
host_job_meta['agg_data_in_s3'] = True
self.storage.put_data(agg_data_key, agg_data_bytes)
host_job_meta['agg_data'] = True
host_job_meta['data_upload_time'] = time.time() - agg_upload_time
host_job_meta['data_upload_timestamp'] = time.time()
else:
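For context: when data_all_as_one is set and the total payload is under MAX_AGG_DATA_SIZE, the per-call pickled inputs are concatenated into one object and each worker later reads only its slice via data_byte_range. The agg_data body is not shown in this diff, so the sketch below is a guess at the idea, not the actual method.

```python
def agg_data(data_strs):
    """Sketch only: concatenate pickled payloads and record (start, end)
    byte offsets so each call can fetch just its own range."""
    ranges = []
    pos = 0
    for payload in data_strs:
        ranges.append((pos, pos + len(payload) - 1))  # inclusive end, HTTP-Range style
        pos += len(payload)
    return b"".join(data_strs), ranges
```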
@@ -217,38 +205,33 @@ def map(self, func, iterdata, extra_env = None, extra_meta = None,
host_job_meta['func_module_str_len'] = len(func_module_str)
func_upload_time = time.time()
s3_func_key = s3util.create_func_key(self.s3_bucket, self.s3_prefix,
callset_id)
self.s3client.put_object(Bucket = s3_func_key[0],
Key = s3_func_key[1],
Body = func_module_str)
func_key = create_func_key(self.storage.prefix, callset_id)
self.storage.put_func(func_key, func_module_str)
host_job_meta['func_upload_time'] = time.time() - func_upload_time
host_job_meta['func_upload_timestamp'] = time.time()
def invoke(data_str, callset_id, call_id, s3_func_key,
def invoke(data_str, callset_id, call_id, func_key,
host_job_meta,
s3_agg_data_key = None, data_byte_range=None ):
s3_data_key, s3_output_key, s3_status_key \
= s3util.create_keys(self.s3_bucket,
self.s3_prefix,
callset_id, call_id)
agg_data_key = None, data_byte_range=None ):
data_key, output_key, status_key \
= storage_utils.create_keys(self.storage.prefix, callset_id, call_id)
host_job_meta['job_invoke_timestamp'] = time.time()
if s3_agg_data_key is None:
if agg_data_key is None:
data_upload_time = time.time()
self.put_data(s3_data_key, data_str,
self.put_data(data_key, data_str,
callset_id, call_id)
data_upload_time = time.time() - data_upload_time
host_job_meta['data_upload_time'] = data_upload_time
host_job_meta['data_upload_timestamp'] = time.time()
data_key = s3_data_key
data_key = data_key
else:
data_key = s3_agg_data_key
data_key = agg_data_key
return self.invoke_with_keys(s3_func_key, data_key,
s3_output_key,
s3_status_key,
return self.invoke_with_keys(func_key, data_key,
output_key,
status_key,
callset_id, call_id, extra_env,
extra_meta, data_byte_range,
use_cached_runtime, host_job_meta.copy(),
@@ -261,13 +244,13 @@ def invoke(data_str, callset_id, call_id, s3_func_key,
call_id = "{:05d}".format(i)
data_byte_range = None
if s3_agg_data_key is not None:
if agg_data_key is not None:
data_byte_range = agg_data_ranges[i]
cb = pool.apply_async(invoke, (data_strs[i], callset_id,
call_id, s3_func_key,
call_id, func_key,
host_job_meta.copy(),
s3_agg_data_key,
agg_data_key,
data_byte_range))
logger.info("map {} {} apply async".format(callset_id, call_id))
@@ -308,7 +291,7 @@ def reduce_func(fut_list):
def get_logs(self, future, verbose=True):
logclient = boto3.client('logs', region_name=self.aws_region)
logclient = boto3.client('logs', region_name=self.config['account']['aws_region'])
log_group_name = future.run_status['log_group_name']
@@ -14,8 +14,8 @@
import enum
from multiprocessing.pool import ThreadPool
import time
from pywren import s3util
from pywren.executor import *
from pywren.storage import storage, storage_utils
import logging
import botocore
import glob2
@@ -38,20 +38,18 @@ class ResponseFuture(object):
"""
"""
GET_RESULT_SLEEP_SECS = 4
def __init__(self, call_id, callset_id, invoke_metadata,
s3_bucket, s3_prefix, aws_region):
def __init__(self, call_id, callset_id, invoke_metadata, storage_path):
self.call_id = call_id
self.callset_id = callset_id
self._state = JobState.new
self.s3_bucket = s3_bucket
self.s3_prefix = s3_prefix
self.aws_region = aws_region
self._invoke_metadata = invoke_metadata.copy()
self.status_query_count = 0
self.storage_path = storage_path
def _set_state(self, new_state):
## FIXME add state machine
self._state = new_state
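The FIXME above asks for a real state machine. A minimal guard could look like the sketch below; the transition table, and the existence of states beyond new and invoked, are assumptions.

```python
from pywren.future import JobState

# Hypothetical transition table -- only new and invoked appear in this diff.
_VALID_TRANSITIONS = {
    JobState.new: {JobState.invoked},
    JobState.invoked: {JobState.running, JobState.success, JobState.error},
    JobState.running: {JobState.success, JobState.error},
}

def _set_state(self, new_state):
    allowed = _VALID_TRANSITIONS.get(self._state, set())
    if new_state not in allowed:
        raise ValueError("invalid state transition: {} -> {}".format(self._state, new_state))
    self._state = new_state
```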
@@ -72,8 +70,7 @@ def done(self):
return False
return True
def result(self, timeout=None, check_only=False, throw_except=True):
def result(self, timeout=None, check_only=False, throw_except=True, storage_handler=None):
"""
@@ -103,10 +100,14 @@ def result(self, timeout=None, check_only=False, throw_except=True):
else:
return None
if storage_handler is None:

@shivaram (Collaborator) commented on Apr 17, 2017:

Did we reach consensus on what model we want to support? There was the other option of passing in the storage_handler while constructing the future, which I think avoids some of the failure scenarios.

@ooq (Collaborator) commented on Apr 17, 2017:

Yes, we (Eric and I) agreed that we want futures to be serializable. Storing the storage handler as a field of the future would make it un-serializable; my previous solution was to have a save() method that first sets future.storage_handler = None to make the future serializable. We then agreed that having pickle.dump(futures) just work is also important.
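To illustrate the trade-off being discussed: a live storage handler wraps a botocore client, which cannot be pickled, so keeping it on the future breaks pickle.dump(futures). The sketch below contrasts this PR's design (futures hold only plain data plus storage_path) with the earlier save()-based workaround mentioned above; neither class is pywren code.

```python
import pickle

class PlainFutureSketch(object):
    """This PR's approach: only picklable fields on the future."""
    def __init__(self, call_id, callset_id, storage_path):
        self.call_id = call_id
        self.callset_id = callset_id
        self.storage_path = storage_path      # plain data; pickle.dumps(self) just works

class FutureWithHandlerSketch(object):
    """The earlier alternative: carry the handler, strip it before saving."""
    def __init__(self, call_id, storage_handler):
        self.call_id = call_id
        self.storage_handler = storage_handler  # wraps a botocore client -> not picklable

    def save(self, path):
        self.storage_handler = None              # make the object picklable again
        with open(path, 'wb') as f:
            pickle.dump(self, f)
```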

storage_config = wrenconfig.extract_storage_config(wrenconfig.default())
storage_handler = storage.Storage(storage_config)
storage_utils.check_storage_path(storage_handler.get_storage_config(), self.storage_path)

@shivaram (Collaborator) commented on Jun 19, 2017:

Nice!
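What is being applauded here: the future records where its results live (storage_path), and check_storage_path refuses to read results through a handler whose config points somewhere else. The sketch below shows one plausible shape for that check; the actual path format lives in pywren.storage.storage_utils and may differ.

```python
# Hypothetical path format -- only the two function signatures appear in this PR.
def get_storage_path(storage_config):
    return [storage_config['backend'], storage_config['bucket'], storage_config['prefix']]

def check_storage_path(storage_config, prev_path):
    current = get_storage_path(storage_config)
    if current != prev_path:
        raise ValueError("this call was stored at {}, but the current storage "
                         "config points to {}".format(prev_path, current))
```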

call_status = s3util.get_call_status(self.callset_id, self.call_id,
AWS_S3_BUCKET = self.s3_bucket,
AWS_S3_PREFIX = self.s3_prefix)
call_status = storage_handler.get_call_status(self.callset_id, self.call_id)
self.status_query_count += 1
@@ -119,9 +120,7 @@ def result(self, timeout=None, check_only=False, throw_except=True):
while call_status is None:
time.sleep(self.GET_RESULT_SLEEP_SECS)
call_status = s3util.get_call_status(self.callset_id, self.call_id,
AWS_S3_BUCKET = self.s3_bucket,
AWS_S3_PREFIX = self.s3_prefix)
call_status = storage_handler.get_call_status(self.callset_id, self.call_id)
self.status_query_count += 1
self._invoke_metadata['status_done_timestamp'] = time.time()
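result() now polls storage_handler.get_call_status until the worker has written a status object. One plausible implementation over S3 is sketched below; the key layout and class name are assumptions, and only the get_call_status(callset_id, call_id) signature comes from this diff.

```python
import json
import botocore.exceptions

class StorageStatusSketch(object):
    """Hypothetical get_call_status over S3; not pywren's implementation."""
    def __init__(self, s3_client, bucket, prefix):
        self.client = s3_client
        self.bucket = bucket
        self.prefix = prefix

    def get_call_status(self, callset_id, call_id):
        key = "{}/{}/{}/status.json".format(self.prefix, callset_id, call_id)
        try:
            obj = self.client.get_object(Bucket=self.bucket, Key=key)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] in ('NoSuchKey', '404'):
                return None   # worker has not written its status yet; caller sleeps and retries
            raise
        return json.loads(obj['Body'].read().decode('utf-8'))
```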
@@ -151,10 +150,9 @@ def result(self, timeout=None, check_only=False, throw_except=True):
return None
call_output_time = time.time()
call_invoker_result = pickle.loads(s3util.get_call_output(self.callset_id,
self.call_id,
AWS_S3_BUCKET = self.s3_bucket,
AWS_S3_PREFIX = self.s3_prefix))
call_invoker_result = pickle.loads(storage_handler.get_call_output(
self.callset_id, self.call_id))
call_output_time_done = time.time()
self._invoke_metadata['download_output_time'] = call_output_time_done - call_output_time
@@ -2,24 +2,26 @@
import json
import sys
def get_runtime_info(bucket, key):
import pywren.storage as storage
def get_runtime_info(runtime_config):
"""
Download runtime information from S3 at deserialize
Download runtime information from storage at deserialize
"""
s3 = boto3.resource('s3')
runtime_meta = storage.get_runtime_info(runtime_config)
runtime_meta_key = key.replace(".tar.gz", ".meta.json")
json_str = s3.meta.client.get_object(Bucket=bucket, Key=runtime_meta_key)['Body'].read()
runtime_meta = json.loads(json_str.decode("ascii"))
if not runtime_valid(runtime_meta):
raise Exception(("The indicated runtime: {} "
+ "is not approprite for this python version.")
.format(runtime_config))
return runtime_meta
def version_str(version_info):
return "{}.{}".format(version_info[0], version_info[1])
def runtime_key_valid(runtime_meta):
return runtime_valid(runtime_meta)
def runtime_valid(runtime_meta):
"""
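The diff is truncated inside runtime_valid. Given version_str above, the check presumably compares the Python version recorded in the runtime metadata with the local interpreter; the sketch below is a guess, and the 'python_ver' key name in particular is not taken from this PR.

```python
import sys

def version_str(version_info):
    return "{}.{}".format(version_info[0], version_info[1])

def runtime_valid(runtime_meta):
    """Sketch of the truncated check: does the runtime match this interpreter?"""
    return runtime_meta.get('python_ver') == version_str(sys.version_info)  # key name assumed
```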