Merge pull request #119 from pywren/refactor_storage
[Issue#108] Refactor Storage API
ooq committed Jul 2, 2017
2 parents 3d14c30 + aa8906b commit 1fe6c9b
Showing 18 changed files with 455 additions and 252 deletions.
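Before the file-by-file diff, a rough picture of the new shape of the API: the executor and futures no longer carry S3-specific parameters (`s3_bucket`, `s3_prefix`, `aws_region`); instead, storage settings are extracted from the pywren config and wrapped in a `Storage` handler. The sketch below is inferred from the diff and is not part of the commit; it assumes `wrenconfig.default()` returns a config with valid storage and runtime sections.

```python
# Minimal sketch (inferred from this diff, not part of the commit).
import pywren.wrenconfig as wrenconfig
import pywren.storage as storage

config = wrenconfig.default()                               # load the pywren config
storage_config = wrenconfig.extract_storage_config(config)  # backend-agnostic storage settings
storage_handler = storage.Storage(storage_config)           # replaces the per-executor boto3 S3 client
```

The `Executor` constructor shrinks accordingly, from `Executor(aws_region, s3_bucket, s3_prefix, invoker, runtime_s3_bucket, runtime_s3_key, job_max_runtime)` to `Executor(invoker, config, job_max_runtime)`.
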
120 changes: 51 additions & 69 deletions pywren/executor.py
@@ -2,6 +2,9 @@

import boto3
import json

from pywren.storage.storage_utils import create_func_key

try:
from six.moves import cPickle as pickle
except:
@@ -14,15 +17,16 @@
import glob2
import os

import pywren.s3util as s3util
import pywren.version as version
import pywren.wrenconfig as wrenconfig
import pywren.wrenutil as wrenutil
import pywren.runtime as runtime
import pywren.storage as storage
from pywren.serialize import cloudpickle, serialize
from pywren.serialize import create_mod_data
from pywren.future import ResponseFuture, JobState
from pywren.wait import *
from pywren.storage import storage_utils

logger = logging.getLogger(__name__)

@@ -32,50 +36,38 @@ class Executor(object):
Theoretically will allow for cross-AZ invocations
"""

def __init__(self, aws_region, s3_bucket, s3_prefix,
invoker, runtime_s3_bucket, runtime_s3_key, job_max_runtime):
self.aws_region = aws_region
self.s3_bucket = s3_bucket
self.s3_prefix = s3_prefix

self.session = botocore.session.get_session()
def __init__(self, invoker, config, job_max_runtime):
self.invoker = invoker
self.s3client = self.session.create_client('s3', region_name = aws_region)
self.job_max_runtime = job_max_runtime

self.runtime_bucket = runtime_s3_bucket
self.runtime_key = runtime_s3_key
self.runtime_meta_info = runtime.get_runtime_info(runtime_s3_bucket, runtime_s3_key)
if not runtime.runtime_key_valid(self.runtime_meta_info):
raise Exception("The indicated runtime: s3://{}/{} is not approprite for this python version".format(runtime_s3_bucket, runtime_s3_key))
self.config = config
self.storage_config = wrenconfig.extract_storage_config(self.config)
self.storage = storage.Storage(self.storage_config)
self.runtime_meta_info = runtime.get_runtime_info(config['runtime'])


if 'preinstalls' in self.runtime_meta_info:
logger.info("using serializer with meta-supplied preinstalls")
self.serializer = serialize.SerializeIndependent(self.runtime_meta_info['preinstalls'])
else:
self.serializer = serialize.SerializeIndependent()

def put_data(self, s3_data_key, data_str,
def put_data(self, data_key, data_str,
callset_id, call_id):

# put on s3 -- FIXME right now this takes 2x as long
self.storage.put_data(data_key, data_str)
logger.info("call_async {} {} data upload complete {}".format(callset_id, call_id,
data_key))

self.s3client.put_object(Bucket = s3_data_key[0],
Key = s3_data_key[1],
Body = data_str)

logger.info("call_async {} {} s3 upload complete {}".format(callset_id, call_id, s3_data_key))


def invoke_with_keys(self, s3_func_key, s3_data_key, s3_output_key,
s3_status_key,
def invoke_with_keys(self, func_key, data_key, output_key,
status_key,
callset_id, call_id, extra_env,
extra_meta, data_byte_range, use_cached_runtime,
host_job_meta, job_max_runtime,
overwrite_invoke_args = None):

# Pick a runtime url if we have shards. If not the handler will construct it
# using s3_bucket and s3_key
# Pick a runtime url if we have shards.
# If not the handler will construct it
runtime_url = ""
if ('urls' in self.runtime_meta_info and
isinstance(self.runtime_meta_info['urls'], list) and
@@ -85,18 +77,17 @@ def invoke_with_keys(self, s3_func_key, s3_data_key, s3_output_key,
random.seed()
runtime_url = random.choice(self.runtime_meta_info['urls'])


arg_dict = {'func_key' : s3_func_key,
'data_key' : s3_data_key,
'output_key' : s3_output_key,
'status_key' : s3_status_key,
arg_dict = {'storage_config' : self.storage.get_storage_config(),
'func_key' : func_key,
'data_key' : data_key,
'output_key' : output_key,
'status_key' : status_key,
'callset_id': callset_id,
'job_max_runtime' : job_max_runtime,
'data_byte_range' : data_byte_range,
'call_id' : call_id,
'use_cached_runtime' : use_cached_runtime,
'runtime_s3_bucket' : self.runtime_bucket,
'runtime_s3_key' : self.runtime_key,
'runtime' : self.config['runtime'],
'pywren_version' : version.__version__,
'runtime_url' : runtime_url }

@@ -135,9 +126,8 @@ def invoke_with_keys(self, s3_func_key, s3_data_key, s3_output_key,

host_job_meta.update(arg_dict)

fut = ResponseFuture(call_id, callset_id, host_job_meta,
self.s3_bucket, self.s3_prefix,
self.aws_region)
storage_path = storage_utils.get_storage_path(self.storage_config)
fut = ResponseFuture(call_id, callset_id, host_job_meta, storage_path)

fut._set_state(JobState.invoked)

@@ -162,7 +152,7 @@ def map(self, func, iterdata, extra_env = None, extra_meta = None,
"""
# FIXME work with an actual iterable instead of just a list
data_all_as_one : upload the data as a single s3 object; fewer
data_all_as_one : upload the data as a single object; fewer
tcp transactions (good) but potentially higher latency for workers (bad)
use_cached_runtime : if runtime has been cached, use that. When set
@@ -176,27 +166,24 @@ def map(self, func, iterdata, extra_env = None, extra_meta = None,
host_job_meta = {}

pool = ThreadPool(invoke_pool_threads)
callset_id = s3util.create_callset_id()
callset_id = wrenutil.create_callset_id()

### pickle func and all data (to capture module dependencies
func_and_data_ser, mod_paths = self.serializer([func] + data)

func_str = func_and_data_ser[0]
data_strs = func_and_data_ser[1:]
data_size_bytes = sum(len(x) for x in data_strs)
s3_agg_data_key = None
host_job_meta['aggregated_data_in_s3'] = False
agg_data_key = None
host_job_meta['agg_data'] = False
host_job_meta['data_size_bytes'] = data_size_bytes

if data_size_bytes < wrenconfig.MAX_AGG_DATA_SIZE and data_all_as_one:
s3_agg_data_key = s3util.create_agg_data_key(self.s3_bucket,
self.s3_prefix, callset_id)
agg_data_key = storage_utils.create_agg_data_key(self.storage.prefix, callset_id)
agg_data_bytes, agg_data_ranges = self.agg_data(data_strs)
agg_upload_time = time.time()
self.s3client.put_object(Bucket = s3_agg_data_key[0],
Key = s3_agg_data_key[1],
Body = agg_data_bytes)
host_job_meta['agg_data_in_s3'] = True
self.storage.put_data(agg_data_key, agg_data_bytes)
host_job_meta['agg_data'] = True
host_job_meta['data_upload_time'] = time.time() - agg_upload_time
host_job_meta['data_upload_timestamp'] = time.time()
else:
@@ -217,38 +204,33 @@ def map(self, func, iterdata, extra_env = None, extra_meta = None,
host_job_meta['func_module_str_len'] = len(func_module_str)

func_upload_time = time.time()
s3_func_key = s3util.create_func_key(self.s3_bucket, self.s3_prefix,
callset_id)
self.s3client.put_object(Bucket = s3_func_key[0],
Key = s3_func_key[1],
Body = func_module_str)
func_key = create_func_key(self.storage.prefix, callset_id)
self.storage.put_func(func_key, func_module_str)
host_job_meta['func_upload_time'] = time.time() - func_upload_time
host_job_meta['func_upload_timestamp'] = time.time()
def invoke(data_str, callset_id, call_id, s3_func_key,
def invoke(data_str, callset_id, call_id, func_key,
host_job_meta,
s3_agg_data_key = None, data_byte_range=None ):
s3_data_key, s3_output_key, s3_status_key \
= s3util.create_keys(self.s3_bucket,
self.s3_prefix,
callset_id, call_id)
agg_data_key = None, data_byte_range=None ):
data_key, output_key, status_key \
= storage_utils.create_keys(self.storage.prefix, callset_id, call_id)

host_job_meta['job_invoke_timestamp'] = time.time()

if s3_agg_data_key is None:
if agg_data_key is None:
data_upload_time = time.time()
self.put_data(s3_data_key, data_str,
self.put_data(data_key, data_str,
callset_id, call_id)
data_upload_time = time.time() - data_upload_time
host_job_meta['data_upload_time'] = data_upload_time
host_job_meta['data_upload_timestamp'] = time.time()

data_key = s3_data_key
data_key = data_key
else:
data_key = s3_agg_data_key
data_key = agg_data_key

return self.invoke_with_keys(s3_func_key, data_key,
s3_output_key,
s3_status_key,
return self.invoke_with_keys(func_key, data_key,
output_key,
status_key,
callset_id, call_id, extra_env,
extra_meta, data_byte_range,
use_cached_runtime, host_job_meta.copy(),
@@ -261,13 +243,13 @@ def invoke(data_str, callset_id, call_id, s3_func_key,
call_id = "{:05d}".format(i)

data_byte_range = None
if s3_agg_data_key is not None:
if agg_data_key is not None:
data_byte_range = agg_data_ranges[i]

cb = pool.apply_async(invoke, (data_strs[i], callset_id,
call_id, s3_func_key,
call_id, func_key,
host_job_meta.copy(),
s3_agg_data_key,
agg_data_key,
data_byte_range))

logger.info("map {} {} apply async".format(callset_id, call_id))
@@ -308,7 +290,7 @@ def reduce_func(fut_list):
def get_logs(self, future, verbose=True):


logclient = boto3.client('logs', region_name=self.aws_region)
logclient = boto3.client('logs', region_name=self.config['account']['aws_region'])


log_group_name = future.run_status['log_group_name']
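The executor-side pattern in the hunks above — create keys via `storage_utils`, then upload through the handler — replaces the old `s3util.create_keys` plus raw `put_object` calls. A condensed sketch of that flow, not part of the commit; the callset/call ids and payloads are placeholders:

```python
# Sketch only: mirrors the key handling in executor.py above.
import pywren.wrenconfig as wrenconfig
import pywren.storage as storage
from pywren.storage import storage_utils
from pywren.storage.storage_utils import create_func_key

storage_handler = storage.Storage(wrenconfig.extract_storage_config(wrenconfig.default()))

callset_id = "example-callset"   # placeholder id
call_id = "00000"                # placeholder id

func_key = create_func_key(storage_handler.prefix, callset_id)
data_key, output_key, status_key = storage_utils.create_keys(
    storage_handler.prefix, callset_id, call_id)

storage_handler.put_func(func_key, b"<pickled function and modules>")
storage_handler.put_data(data_key, b"<pickled call argument>")
```
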
34 changes: 16 additions & 18 deletions pywren/future.py
@@ -14,8 +14,8 @@
import enum
from multiprocessing.pool import ThreadPool
import time
from pywren import s3util
from pywren.executor import *
from pywren.storage import storage, storage_utils
import logging
import botocore
import glob2
@@ -38,20 +38,18 @@ class ResponseFuture(object):
"""
"""
GET_RESULT_SLEEP_SECS = 4
def __init__(self, call_id, callset_id, invoke_metadata,
s3_bucket, s3_prefix, aws_region):
def __init__(self, call_id, callset_id, invoke_metadata, storage_path):

self.call_id = call_id
self.callset_id = callset_id
self._state = JobState.new
self.s3_bucket = s3_bucket
self.s3_prefix = s3_prefix
self.aws_region = aws_region

self._invoke_metadata = invoke_metadata.copy()

self.status_query_count = 0

self.storage_path = storage_path

def _set_state(self, new_state):
## FIXME add state machine
self._state = new_state
@@ -72,8 +70,7 @@ def done(self):
return False
return True


def result(self, timeout=None, check_only=False, throw_except=True):
def result(self, timeout=None, check_only=False, throw_except=True, storage_handler=None):
"""
@@ -103,10 +100,14 @@ def result(self, timeout=None, check_only=False, throw_except=True):
else:
return None

if storage_handler is None:
storage_config = wrenconfig.extract_storage_config(wrenconfig.default())
storage_handler = storage.Storage(storage_config)

storage_utils.check_storage_path(storage_handler.get_storage_config(), self.storage_path)

call_status = s3util.get_call_status(self.callset_id, self.call_id,
AWS_S3_BUCKET = self.s3_bucket,
AWS_S3_PREFIX = self.s3_prefix)

call_status = storage_handler.get_call_status(self.callset_id, self.call_id)

self.status_query_count += 1

@@ -119,9 +120,7 @@

while call_status is None:
time.sleep(self.GET_RESULT_SLEEP_SECS)
call_status = s3util.get_call_status(self.callset_id, self.call_id,
AWS_S3_BUCKET = self.s3_bucket,
AWS_S3_PREFIX = self.s3_prefix)
call_status = storage_handler.get_call_status(self.callset_id, self.call_id)

self.status_query_count += 1
self._invoke_metadata['status_done_timestamp'] = time.time()
@@ -151,10 +150,9 @@ def result(self, timeout=None, check_only=False, throw_except=True):
return None

call_output_time = time.time()
call_invoker_result = pickle.loads(s3util.get_call_output(self.callset_id,
self.call_id,
AWS_S3_BUCKET = self.s3_bucket,
AWS_S3_PREFIX = self.s3_prefix))
call_invoker_result = pickle.loads(storage_handler.get_call_output(
self.callset_id, self.call_id))

call_output_time_done = time.time()
self._invoke_metadata['download_output_time'] = call_output_time_done - call_output_time

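With the future now carrying only a `storage_path`, result retrieval can share one handler across many futures instead of re-deriving S3 coordinates per call. The sketch below mirrors the fallback that `result()` uses when no handler is passed; it is not part of the commit, and `futures` is a placeholder for the list an executor's `map` would return:

```python
# Sketch only: reuse one Storage handler across many futures.
import pywren.wrenconfig as wrenconfig
from pywren.storage import storage

storage_config = wrenconfig.extract_storage_config(wrenconfig.default())
handler = storage.Storage(storage_config)

futures = []  # placeholder: normally the futures returned by executor.map(...)
results = [f.result(storage_handler=handler) for f in futures]
```
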
20 changes: 11 additions & 9 deletions pywren/runtime.py
@@ -2,24 +2,26 @@
import json
import sys

def get_runtime_info(bucket, key):
import pywren.storage as storage


def get_runtime_info(runtime_config):
"""
Download runtime information from S3 at deserialize
Download runtime information from storage at deserialize
"""
s3 = boto3.resource('s3')
runtime_meta = storage.get_runtime_info(runtime_config)

runtime_meta_key = key.replace(".tar.gz", ".meta.json")

json_str = s3.meta.client.get_object(Bucket=bucket, Key=runtime_meta_key)['Body'].read()
runtime_meta = json.loads(json_str.decode("ascii"))
if not runtime_valid(runtime_meta):
raise Exception(("The indicated runtime: {} "
+ "is not approprite for this python version.")
.format(runtime_config))

return runtime_meta


def version_str(version_info):
return "{}.{}".format(version_info[0], version_info[1])

def runtime_key_valid(runtime_meta):
return runtime_valid(runtime_meta)

def runtime_valid(runtime_meta):
"""
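Runtime metadata now flows through the storage layer as well: `get_runtime_info` takes the `runtime` section of the config rather than an explicit bucket/key pair, and raises if the runtime does not match the local Python version. A hedged usage sketch (not part of the commit; the exact contents of `config['runtime']` are whatever the pywren config defines):

```python
# Sketch only: look up runtime metadata via the refactored API.
import pywren.wrenconfig as wrenconfig
import pywren.runtime as runtime

config = wrenconfig.default()
runtime_meta = runtime.get_runtime_info(config['runtime'])  # raises if incompatible
print(runtime_meta.get('preinstalls', []))  # packages preinstalled in the runtime
```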
