From 2a60b1c6cdbf1c669ef6066f5179ac7032ee97a0 Mon Sep 17 00:00:00 2001
From: Allan Peng
Date: Fri, 28 Jul 2017 01:18:53 -0700
Subject: [PATCH 03/15] Some fixes. More work later.

---
 pywren/wrenhandler.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py
index 631651f..2ef8d7e 100644
--- a/pywren/wrenhandler.py
+++ b/pywren/wrenhandler.py
@@ -9,6 +9,9 @@ import tarfile
 import time
 import traceback
 
+import platform
+import shutil
+
 from threading import Thread
 
 import boto3
@@ -112,10 +115,10 @@ def aws_lambda_handler(event, context):
 
 def get_server_info():
 
-    server_info = {'uname' : subprocess.check_output("uname -a", shell=True).decode("ascii")}
+    server_info = {'uname' : " ".join(platform.uname()),
+                   'cpuinfo': platform.processor()}
     if os.path.exists("/proc"):
-        server_info.update({'/proc/cpuinfo': open("/proc/cpuinfo", 'r').read(),
-                            '/proc/meminfo': open("/proc/meminfo", 'r').read(),
+        server_info.update({'/proc/meminfo': open("/proc/meminfo", 'r').read(),
                             '/proc/self/cgroup': open("/proc/self/cgroup", 'r').read(),
                             '/proc/cgroups': open("/proc/cgroups", 'r').read()})
 
@@ -181,8 +184,8 @@ def generic_handler(event, context_dict):
     KS = get_key_size(s3_client, s3_bucket, data_key)
 
     if not event['use_cached_runtime']:
-        subprocess.check_output("rm -Rf {}/*".format(RUNTIME_LOC), shell=True)
-
+        shutil.rmtree(RUNTIME_LOC, True)
+        os.mkdir(RUNTIME_LOC)
 
     # get the input and save to disk
     # FIXME here is we where we would attach the "canceled" metadata
@@ -231,8 +234,6 @@ def generic_handler(event, context_dict):
             fid.write(b64str_to_bytes(m_data))
             fid.close()
         logger.info("Finished writing {} module files".format(len(d['module_data'])))
-        logger.debug(subprocess.check_output("find {}".format(PYTHON_MODULE_PATH), shell=True))
-        logger.debug(subprocess.check_output("find {}".format(os.getcwd()), shell=True))
 
     response_status['runtime_s3_key_used'] = runtime_s3_key_used
     response_status['runtime_s3_bucket_used'] = runtime_s3_bucket_used
From cf3fa21e217d3fd7a1ebd55101860a459fb65f10 Mon Sep 17 00:00:00 2001
From: Allan Peng
Date: Fri, 28 Jul 2017 10:52:15 -0700
Subject: [PATCH 04/15] fix path variable, use platform when possible. /proc
 is weird.

---
 pywren/wrenhandler.py | 45 +++++++++++++++++++++++++++++--------------
 1 file changed, 31 insertions(+), 14 deletions(-)

diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py
index 2ef8d7e..14fcd43 100644
--- a/pywren/wrenhandler.py
+++ b/pywren/wrenhandler.py
@@ -13,8 +13,6 @@
 
 from threading import Thread
 
-import boto3
-import botocore
 
 if sys.version_info > (3, 0):
     from queue import Queue, Empty # pylint: disable=import-error
@@ -26,9 +24,24 @@
     from Queue import Queue, Empty # pylint: disable=import-error
     import wrenutil # pylint: disable=relative-import
     import version # pylint: disable=relative-import
 
-PYTHON_MODULE_PATH = "/tmp/pymodules"
-CONDA_RUNTIME_DIR = "/tmp/condaruntime"
-RUNTIME_LOC = "/tmp/runtimes"
+if sys.platform == 'win32':
+    TEMP = "D:\local\Temp"
+    PATH_DELIMETER = ";"
+
+elif sys.platform.startswith('linux'):
+    TEMP = "/tmp"
+    PATH_DELIMETER = ":"
+    import boto3
+    import botocore
+
+else:
+    raise NotImplementedError(("Using {} based cloud is not supported " +
+                               "yet.").format(sys.platform))
+
+PYTHON_MODULE_PATH = os.path.join(TEMP, "pymodules")
+CONDA_RUNTIME_DIR = os.path.join(TEMP, "condaruntime")
+RUNTIME_LOC = os.path.join(TEMP, "runtimes")
+
 
 logger = logging.getLogger(__name__)
 
@@ -116,12 +129,12 @@ def get_server_info():
 
     server_info = {'uname' : " ".join(platform.uname()),
                    'cpuinfo': platform.processor()}
 
+
     if os.path.exists("/proc"):
         server_info.update({'/proc/meminfo': open("/proc/meminfo", 'r').read(),
                             '/proc/self/cgroup': open("/proc/self/cgroup", 'r').read(),
                             '/proc/cgroups': open("/proc/cgroups", 'r').read()})
 
-
     return server_info
 
 def generic_handler(event, context_dict):
@@ -155,9 +168,9 @@ def generic_handler(event, context_dict):
     start_time = time.time()
     response_status['start_time'] = start_time
 
-    func_filename = "/tmp/func.pickle"
-    data_filename = "/tmp/data.pickle"
-    output_filename = "/tmp/output.pickle"
+    func_filename = os.path.join(TEMP, "func.pickle")
+    data_filename = os.path.join(TEMP, "data.pickle")
+    output_filename = os.path.join(TEMP, "output.pickle")
 
     runtime_s3_bucket = event['runtime']['s3_bucket']
     runtime_s3_key = event['runtime']['s3_key']
@@ -205,7 +218,7 @@ def generic_handler(event, context_dict):
     data_fid.close()
 
     data_download_time = time.time() - start_time
-    logger.info("data data download complete, took {:3.2f} sec".format(data_download_time))
+    logger.info("data download complete, took {:3.2f} sec".format(data_download_time))
     response_status['data_download_time'] = data_download_time
 
     # now split
@@ -217,7 +230,12 @@ def generic_handler(event, context_dict):
         m_path = os.path.dirname(m_filename)
 
         if len(m_path) > 0 and m_path[0] == "/":
+            #wait but what if the client is using windows .
             m_path = m_path[1:]
+
+        if sys.platform == 'win32':
+            m_path = os.path.join(*filter(lambda x: len(x) > 0, m_path.split("/")))
+
         to_make = os.path.join(PYTHON_MODULE_PATH, m_path)
         #print "to_make=", to_make, "m_path=", m_path
         try:
@@ -228,7 +246,6 @@ def generic_handler(event, context_dict):
             else:
                 raise e
         full_filename = os.path.join(to_make, os.path.basename(m_filename))
-        #print "creating", full_filename
         fid = open(full_filename, 'wb')
         fid.write(b64str_to_bytes(m_data))
         fid.close()
@@ -246,14 +263,14 @@ def generic_handler(event, context_dict):
     jobrunner_path = os.path.join(cwd, "jobrunner.py")
 
     extra_env = event.get('extra_env', {})
-    extra_env['PYTHONPATH'] = "{}:{}".format(os.getcwd(), PYTHON_MODULE_PATH)
+    extra_env['PYTHONPATH'] = "{}{}{}".format(os.getcwd(), PATH_DELIMETER, PYTHON_MODULE_PATH)
 
     call_id = event['call_id']
     callset_id = event['callset_id']
     response_status['call_id'] = call_id
     response_status['callset_id'] = callset_id
 
-    CONDA_PYTHON_PATH = "/tmp/condaruntime/bin"
+    CONDA_PYTHON_PATH = os.path.join(CONDA_RUNTIME_DIR, "bin")
    CONDA_PYTHON_RUNTIME = os.path.join(CONDA_PYTHON_PATH, "python")
 
     cmdstr = "{} {} {} {} {}".format(CONDA_PYTHON_RUNTIME,
@@ -270,7 +287,7 @@ def generic_handler(event, context_dict):
     local_env["OMP_NUM_THREADS"] = "1"
     local_env.update(extra_env)
 
-    local_env['PATH'] = "{}:{}".format(CONDA_PYTHON_PATH, local_env.get("PATH", ""))
+    local_env['PATH'] = "{}{}{}".format(CONDA_PYTHON_PATH, PATH_DELIMETER, local_env.get("PATH", ""))
 
     logger.debug("command str=%s", cmdstr)
     # This is copied from http://stackoverflow.com/a/17698359/4577954
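Patch 04 pins the per-platform constants by hand: a temp root (D:\local\Temp looks like an Azure-style Windows sandbox path; /tmp on Linux) and a PATH separator. For comparison, the standard library already exposes both; a minimal sketch of the stdlib equivalent, assuming no host-specific override is actually required:

    import os
    import tempfile

    # os.pathsep is ";" on Windows and ":" on POSIX hosts.
    PATH_DELIMETER = os.pathsep
    # tempfile.gettempdir() honours TMP/TEMP on Windows and falls back to /tmp on Linux.
    TEMP = tempfile.gettempdir()

    PYTHON_MODULE_PATH = os.path.join(TEMP, "pymodules")
    CONDA_RUNTIME_DIR = os.path.join(TEMP, "condaruntime")
    RUNTIME_LOC = os.path.join(TEMP, "runtimes")

Hardcoding only stays necessary if the execution environment pins its writable directory somewhere gettempdir() cannot see.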
From a0c36324ba410c80d560ef50e74fda5ebd25927f Mon Sep 17 00:00:00 2001
From: Allan Peng
Date: Fri, 28 Jul 2017 14:50:59 -0700
Subject: [PATCH 05/15] always get server info.

---
 pywren/wrenhandler.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py
index 14fcd43..38fcccb 100644
--- a/pywren/wrenhandler.py
+++ b/pywren/wrenhandler.py
@@ -334,18 +334,17 @@ def consume_stdout(stdout, queue):
 
         response_status['stdout'] = stdout.decode("ascii")
 
-
         response_status['exec_time'] = time.time() - setup_time
         response_status['end_time'] = end_time
 
         response_status['host_submit_time'] = event['host_submit_time']
-        response_status['server_info'] = get_server_info()
 
         response_status.update(context_dict)
     except Exception as e:
         # internal runtime exceptions
         response_status['exception'] = str(e)
         response_status['exception_args'] = e.args
+        response_status['server_info'] = get_server_info()
         response_status['exception_traceback'] = traceback.format_exc()
     finally:
         # creating new client in case the client has not been created
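After this patch, get_server_info() runs on the failure path, so it must not itself raise on hosts without /proc. A hedged sketch of a fully guarded variant of the function built up in patches 03-05 (same keys; missing files are simply skipped):

    import os
    import platform

    def get_server_info():
        # platform.uname() returns a tuple of strings on both Windows and Linux
        server_info = {'uname': " ".join(platform.uname()),
                       'cpuinfo': platform.processor()}
        for proc_file in ["/proc/meminfo", "/proc/self/cgroup", "/proc/cgroups"]:
            if os.path.exists(proc_file):
                with open(proc_file, 'r') as f:
                    server_info[proc_file] = f.read()
        return server_info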
From a9b295a3fa61c3307d9e55d5eec449d47d2b3f45 Mon Sep 17 00:00:00 2001
From: Allan Peng
Date: Wed, 9 Aug 2017 11:14:43 -0700
Subject: [PATCH 06/15] os.setsid doesn't work in windows

---
 pywren/wrenhandler.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py
index 38fcccb..651dba8 100644
--- a/pywren/wrenhandler.py
+++ b/pywren/wrenhandler.py
@@ -249,8 +249,8 @@ def generic_handler(event, context_dict):
             fid = open(full_filename, 'wb')
             fid.write(b64str_to_bytes(m_data))
             fid.close()
-            logger.info("Finished writing {} module files".format(len(d['module_data'])))
 
+        logger.info("Finished writing {} module files".format(len(d['module_data'])))
     response_status['runtime_s3_key_used'] = runtime_s3_key_used
     response_status['runtime_s3_bucket_used'] = runtime_s3_bucket_used
 
@@ -292,6 +292,8 @@ def generic_handler(event, context_dict):
     logger.debug("command str=%s", cmdstr)
     # This is copied from http://stackoverflow.com/a/17698359/4577954
     # reasons for setting process group: http://stackoverflow.com/a/4791612
+
+    # os.setsid doesn't work in windows
     process = subprocess.Popen(cmdstr, shell=True, env=local_env, bufsize=1,
                                stdout=subprocess.PIPE, preexec_fn=os.setsid)
 
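The comment added here is the crux of patch 08 below: preexec_fn=os.setsid exists only on POSIX. On Windows the closest analogue is a process-creation flag; a sketch of the usual split, assuming the goal is to be able to signal the child and all of its descendants on timeout:

    import os
    import subprocess
    import sys

    def launch(cmdstr, local_env):
        if sys.platform == 'win32':
            # CREATE_NEW_PROCESS_GROUP lets a CTRL_BREAK_EVENT reach the whole group.
            return subprocess.Popen(cmdstr, shell=True, env=local_env, bufsize=1,
                                    stdout=subprocess.PIPE,
                                    creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # os.setsid puts the child in a fresh session so os.killpg can reap it.
        return subprocess.Popen(cmdstr, shell=True, env=local_env, bufsize=1,
                                stdout=subprocess.PIPE, preexec_fn=os.setsid)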
From 2f9a7335e196db4e21ac1d2d4abbab6c599d50f5 Mon Sep 17 00:00:00 2001
From: Allan Peng
Date: Wed, 9 Aug 2017 11:27:21 -0700
Subject: [PATCH 07/15] change runtime_s3 variable names to generic runtime
 variable

---
 pywren/wrenhandler.py | 28 ++++++++++++++--------------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py
index 651dba8..54892a2 100644
--- a/pywren/wrenhandler.py
+++ b/pywren/wrenhandler.py
@@ -57,7 +57,7 @@ def get_key_size(s3client, bucket, key):
     else:
         raise e
 
-def download_runtime_if_necessary(s3_client, runtime_s3_bucket, runtime_s3_key):
+def download_runtime_if_necessary(s3_client, runtime_bucket, runtime_key):
     """
     Download the runtime if necessary
 
@@ -66,8 +66,8 @@ def download_runtime_if_necessary(s3_client, runtime_bucket, runtime_key):
 
     # get runtime etag
-    runtime_meta = s3_client.head_object(Bucket=runtime_s3_bucket,
-                                         Key=runtime_s3_key)
+    runtime_meta = s3_client.head_object(Bucket=runtime_bucket,
+                                         Key=runtime_key)
     # etags have strings (double quotes) on each end, so we strip those
     ETag = str(runtime_meta['ETag'])[1:-1]
     logger.debug("The etag is ={}".format(ETag))
@@ -96,8 +96,8 @@ def download_runtime_if_necessary(s3_client, runtime_bucket, runtime_key):
 
     os.makedirs(runtime_etag_dir)
 
-    res = s3_client.get_object(Bucket=runtime_s3_bucket,
-                               Key=runtime_s3_key)
+    res = s3_client.get_object(Bucket=runtime_bucket,
+                               Key=runtime_key)
 
     condatar = tarfile.open(
         mode="r:gz",
@@ -172,15 +172,15 @@ def generic_handler(event, context_dict):
     data_filename = os.path.join(TEMP, "data.pickle")
     output_filename = os.path.join(TEMP, "output.pickle")
 
-    runtime_s3_bucket = event['runtime']['s3_bucket']
-    runtime_s3_key = event['runtime']['s3_key']
+    runtime_bucket = event['runtime']['s3_bucket']
+    runtime_key = event['runtime']['s3_key']
     if event.get('runtime_url'):
         # NOTE(shivaram): Right now we only support S3 urls.
-        runtime_s3_bucket_used, runtime_s3_key_used = wrenutil.split_s3_url(
+        runtime_bucket_used, runtime_key_used = wrenutil.split_s3_url(
             event['runtime_url'])
     else:
-        runtime_s3_bucket_used = runtime_s3_bucket
-        runtime_s3_key_used = runtime_s3_key
+        runtime_bucket_used = runtime_bucket
+        runtime_key_used = runtime_key
 
     job_max_runtime = event.get("job_max_runtime", 290) # default for lambda
 
@@ -251,11 +251,11 @@ def generic_handler(event, context_dict):
     fid.close()
     logger.info("Finished writing {} module files".format(len(d['module_data'])))
 
-    response_status['runtime_s3_key_used'] = runtime_s3_key_used
-    response_status['runtime_s3_bucket_used'] = runtime_s3_bucket_used
+    response_status['runtime_key_used'] = runtime_key_used
+    response_status['runtime_bucket_used'] = runtime_bucket_used
 
-    runtime_cached = download_runtime_if_necessary(s3_client, runtime_s3_bucket_used,
-                                                   runtime_s3_key_used)
+    runtime_cached = download_runtime_if_necessary(s3_client, runtime_bucket_used,
+                                                   runtime_key_used)
     logger.info("Runtime ready, cached={}".format(runtime_cached))
     response_status['runtime_cached'] = runtime_cached
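Patch 07 leans on wrenutil.split_s3_url, which is not part of this series. Assuming the usual s3://bucket/key convention, a hypothetical stand-in (the real wrenutil may differ) behaves like:

    def split_s3_url(s3_url):
        # "s3://my-bucket/path/to/key" -> ("my-bucket", "path/to/key")
        if not s3_url.startswith("s3://"):
            raise ValueError("not an s3 url: {}".format(s3_url))
        bucket, _, key = s3_url[len("s3://"):].partition("/")
        return bucket, key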
From 5552368db5e50248ea6a859873833e9626e4bc9a Mon Sep 17 00:00:00 2001
From: Allan Peng
Date: Wed, 9 Aug 2017 12:00:22 -0700
Subject: [PATCH 08/15] more compatibility

---
 pywren/wrenhandler.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py
index 54892a2..682adbd 100644
--- a/pywren/wrenhandler.py
+++ b/pywren/wrenhandler.py
@@ -43,6 +43,7 @@
 RUNTIME_LOC = os.path.join(TEMP, "runtimes")
 
 
+
 logger = logging.getLogger(__name__)
 
 PROCESS_STDOUT_SLEEP_SECS = 2
@@ -190,7 +191,7 @@ def generic_handler(event, context_dict):
     response_status['status_key'] = status_key
 
     KS = get_key_size(s3_client, s3_bucket, data_key)
-    #logger.info("bucket=", s3_bucket, "key=", data_key, "status: ", KS, "bytes" )
+
 
     while KS is None:
         logger.warning("WARNING COULD NOT GET FIRST KEY")
@@ -231,14 +231,13 @@ def generic_handler(event, context_dict):
         m_path = os.path.dirname(m_filename)
 
         if len(m_path) > 0 and m_path[0] == "/":
-            #wait but what if the client is using windows .
             m_path = m_path[1:]
 
         if sys.platform == 'win32':
             m_path = os.path.join(*filter(lambda x: len(x) > 0, m_path.split("/")))
 
         to_make = os.path.join(PYTHON_MODULE_PATH, m_path)
-        #print "to_make=", to_make, "m_path=", m_path
+
         try:
             os.makedirs(to_make)
         except OSError as e:
@@ -294,8 +294,12 @@ def generic_handler(event, context_dict):
     # reasons for setting process group: http://stackoverflow.com/a/4791612
 
     # os.setsid doesn't work in windows
+    if sys.platfrom == 'win32':
+        preexec = None
+    else:
+        preexec = os.setsid
     process = subprocess.Popen(cmdstr, shell=True, env=local_env, bufsize=1,
-                               stdout=subprocess.PIPE, preexec_fn=os.setsid)
+                               stdout=subprocess.PIPE, preexec_fn=preexec)
     logger.info("launched process")
 
     def consume_stdout(stdout, queue):

From cfea3f3323da68f3640417a5d384d5438b339364 Mon Sep 17 00:00:00 2001
From: Allan Peng
Date: Wed, 9 Aug 2017 18:10:22 -0700
Subject: [PATCH 09/15] extend storage

---
 pywren/storage/s3_backend.py | 15 +++++++++++++++
 pywren/storage/storage.py    | 13 +++++++++++++
 pywren/wrenhandler.py        |  7 +------
 3 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/pywren/storage/s3_backend.py b/pywren/storage/s3_backend.py
index 5c369b5..04b0264 100644
--- a/pywren/storage/s3_backend.py
+++ b/pywren/storage/s3_backend.py
@@ -14,6 +14,21 @@ def __init__(self, s3config):
         self.s3client = self.session.create_client(
             's3', config=botocore.client.Config(max_pool_connections=200))
 
+    def head_object(self, key):
+        """
+        Get object metadata from S3 with a key. Throws StorageNoSuchKeyError if the given key does not exist.
+        :param key: key of the object
+        :return: Metadata of the object
+        :rtype: dict
+        """
+        try:
+            return self.s3client.head_object(self.s3_bucket, key)
+        except botocore.exceptions.ClientError as e:
+            if e.response['Error']['Code'] == "404":
+                raise StorageNoSuchKeyError(key)
+            else:
+                raise e
+
     def put_object(self, key, data):
         """
         Put an object in S3. Override the object if the key already exists.
diff --git a/pywren/storage/storage.py b/pywren/storage/storage.py
index 810ceb7..c2de9d9 100644
--- a/pywren/storage/storage.py
+++ b/pywren/storage/storage.py
@@ -33,6 +33,19 @@ def get_storage_config(self):
         """
         return self.storage_config
 
+    def get_head(self, key):
+        """
+        Retrieves metadata for given key.
+        :return: dict
+        """
+        return self.backend_handler.head_object(key)
+
+    def get_object(self, key):
+        """
+        Retrieves object for given key.
+        """
+        return self.backend_handler.get_object(key)
+
     def put_data(self, key, data):
         """
         Put input data into storage.
diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py index c6d7a31..1b5aa15 100644 --- a/pywren/wrenhandler.py +++ b/pywren/wrenhandler.py @@ -34,11 +34,6 @@ import boto3 import botocore -else: - from Queue import Queue, Empty - import wrenutil - import version - else: raise NotImplementedError(("Using {} based cloud is not supported " + "yet.").format(sys.platform)) @@ -298,7 +293,7 @@ def generic_handler(event, context_dict): # reasons for setting process group: http://stackoverflow.com/a/4791612 # os.setsid doesn't work in windows - if sys.platfrom == 'win32': + if sys.platform == 'win32': preexec = None else: preexec = os.setsid From 0de18290bb50b13ac927d268e66ae849134f0db6 Mon Sep 17 00:00:00 2001 From: Allan Peng Date: Wed, 9 Aug 2017 18:35:04 -0700 Subject: [PATCH 10/15] move range query to storage backend --- pywren/storage/s3_backend.py | 8 +++++-- pywren/wrenhandler.py | 46 ++++++++++++++++++------------------ 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/pywren/storage/s3_backend.py b/pywren/storage/s3_backend.py index 04b0264..94f8e6f 100644 --- a/pywren/storage/s3_backend.py +++ b/pywren/storage/s3_backend.py @@ -39,7 +39,7 @@ def put_object(self, key, data): """ self.s3client.put_object(Bucket=self.s3_bucket, Key=key, Body=data) - def get_object(self, key): + def get_object(self, key, data_byte_range = None): """ Get object from S3 with a key. Throws StorageNoSuchKeyError if the given key does not exist. :param key: key of the object @@ -47,7 +47,11 @@ def get_object(self, key): :rtype: str/bytes """ try: - r = self.s3client.get_object(Bucket=self.s3_bucket, Key=key) + if data_byte_range != None: + range_str = 'bytes={}-{}'.format(*data_byte_range) + else: + range_str = None + r = self.s3client.get_object(Bucket=self.s3_bucket, Key=key, Range = range_str) data = r['Body'].read() return data except botocore.exceptions.ClientError as e: diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py index 1b5aa15..7543b84 100644 --- a/pywren/wrenhandler.py +++ b/pywren/wrenhandler.py @@ -13,7 +13,6 @@ from threading import Thread - if sys.version_info > (3, 0): from queue import Queue, Empty # pylint: disable=import-error from . 
import wrenutil # pylint: disable=relative-import @@ -28,15 +27,12 @@ TEMP = "D:\local\Temp" PATH_DELIMETER = ";" -elif sys.platform.startswith('linux'): +else: TEMP = "/tmp" PATH_DELIMETER = ":" import boto3 import botocore -else: - raise NotImplementedError(("Using {} based cloud is not supported " + - "yet.").format(sys.platform)) PYTHON_MODULE_PATH = os.path.join(TEMP, "pymodules") CONDA_RUNTIME_DIR = os.path.join(TEMP, "condaruntime") @@ -46,9 +42,10 @@ PROCESS_STDOUT_SLEEP_SECS = 2 -def get_key_size(s3client, bucket, key): + +def get_key_size(storage_client, key): try: - a = s3client.head_object(Bucket=bucket, Key=key) + a = storage_client.head_object(Bucket=bucket, Key=key) return a['ContentLength'] except botocore.exceptions.ClientError as e: if e.response['Error']['Code'] == "404": @@ -122,7 +119,19 @@ def aws_lambda_handler(event, context): 'log_group_name' : context.log_group_name, 'log_stream_name' : context.log_stream_name, } - return generic_handler(event, context_dict) + + backend_config = { + 'bucket': event['storage_config']['backend_config']['bucket'] + } + + storage_config = { + 'storage_prefix' : event['storage_config']['storage_prefix'], + 'storage_backend' : 's3', + 'backend_config': backend_config + } + + storage_handler = Storage(storage_config) + return generic_handler(event, context_dict, storage_handler) def get_server_info(): @@ -136,7 +145,7 @@ def get_server_info(): return server_info -def generic_handler(event, context_dict): +def generic_handler(event, context_dict, storage_client): """ context_dict is generic infromation about the context that we are running in, provided by the scheduler @@ -147,9 +156,7 @@ def generic_handler(event, context_dict): if event['storage_config']['storage_backend'] != 's3': raise NotImplementedError(("Using {} as storage backend is not supported " + "yet.").format(event['storage_config']['storage_backend'])) - s3_client = boto3.client("s3") s3_transfer = boto3.s3.transfer.S3Transfer(s3_client) - s3_bucket = event['storage_config']['backend_config']['bucket'] logger.info("invocation started") @@ -188,12 +195,12 @@ def generic_handler(event, context_dict): response_status['output_key'] = output_key response_status['status_key'] = status_key - KS = get_key_size(s3_client, s3_bucket, data_key) + KS = get_key_size(s3_client, data_key) while KS is None: logger.warning("WARNING COULD NOT GET FIRST KEY") - KS = get_key_size(s3_client, s3_bucket, data_key) + KS = get_key_size(storage_client, data_key) if not event['use_cached_runtime']: shutil.rmtree(RUNTIME_LOC, True) os.mkdir(RUNTIME_LOC) @@ -209,9 +216,7 @@ def generic_handler(event, context_dict): if data_byte_range is None: s3_transfer.download_file(s3_bucket, data_key, data_filename) else: - range_str = 'bytes={}-{}'.format(*data_byte_range) - dres = s3_client.get_object(Bucket=s3_bucket, Key=data_key, - Range=range_str) + dres = storage_client.get_object(data_key, data_byte_range) data_fid = open(data_filename, 'wb') data_fid.write(dres['Body'].read()) data_fid.close() @@ -229,10 +234,10 @@ def generic_handler(event, context_dict): m_path = os.path.dirname(m_filename) if len(m_path) > 0 and m_path[0] == "/": - #wait but what if the client is using windows . m_path = m_path[1:] if sys.platform == 'win32': + #change backslash to forward slash. 
m_path = os.path.join(*filter(lambda x: len(x) > 0, m_path.split("/"))) to_make = os.path.join(PYTHON_MODULE_PATH, m_path) @@ -339,12 +344,9 @@ def consume_stdout(stdout, queue): logger.debug("output uploaded to %s %s", s3_bucket, output_key) end_time = time.time() - response_status['stdout'] = stdout.decode("ascii") - response_status['exec_time'] = time.time() - setup_time response_status['end_time'] = end_time - response_status['host_submit_time'] = event['host_submit_time'] response_status.update(context_dict) @@ -355,9 +357,7 @@ def consume_stdout(stdout, queue): response_status['server_info'] = get_server_info() response_status['exception_traceback'] = traceback.format_exc() finally: - # creating new client in case the client has not been created - boto3.client("s3").put_object(Bucket=s3_bucket, Key=status_key, - Body=json.dumps(response_status)) + storage_client.put_object(status_key, json.dumps(response_status)) if __name__ == "__main__": From 8d60c443c9e9ac705823c944c4dd728ba4684908 Mon Sep 17 00:00:00 2001 From: Allan Peng Date: Thu, 10 Aug 2017 12:29:31 -0700 Subject: [PATCH 11/15] fix range_str --- pywren/storage/s3_backend.py | 4 ++-- pywren/storage/storage.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pywren/storage/s3_backend.py b/pywren/storage/s3_backend.py index 94f8e6f..f565e96 100644 --- a/pywren/storage/s3_backend.py +++ b/pywren/storage/s3_backend.py @@ -49,9 +49,9 @@ def get_object(self, key, data_byte_range = None): try: if data_byte_range != None: range_str = 'bytes={}-{}'.format(*data_byte_range) + r = self.s3client.get_object(Bucket=self.s3_bucket, Key=key, Range = range_str) else: - range_str = None - r = self.s3client.get_object(Bucket=self.s3_bucket, Key=key, Range = range_str) + r = self.s3client.get_object(Bucket=self.s3_bucket, Key=key) data = r['Body'].read() return data except botocore.exceptions.ClientError as e: diff --git a/pywren/storage/storage.py b/pywren/storage/storage.py index c2de9d9..8319734 100644 --- a/pywren/storage/storage.py +++ b/pywren/storage/storage.py @@ -36,6 +36,7 @@ def get_storage_config(self): def get_head(self, key): """ Retrieves metadata for given key. 
+        The metadata dict must have ContentLength and ETag as keys
         :return: dict
         """
         return self.backend_handler.head_object(key)

From b8e4c4506b57df6ce1ecd537e7a6db51f81d4186 Mon Sep 17 00:00:00 2001
From: Allan Peng
Date: Thu, 10 Aug 2017 12:35:29 -0700
Subject: [PATCH 12/15] doc

---
 pywren/wrenhandler.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py
index 7543b84..07b54bb 100644
--- a/pywren/wrenhandler.py
+++ b/pywren/wrenhandler.py
@@ -120,6 +120,7 @@ def aws_lambda_handler(event, context):
         'log_stream_name' : context.log_stream_name,
     }
 
+    #construct storage handler based on s3 backend
     backend_config = {
         'bucket': event['storage_config']['backend_config']['bucket']
     }
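The comment added in patch 12 marks the one place where the handler still assumes S3. The event's storage_config is the whole contract between the handler and the Storage wrapper; a sketch of its expected shape, with placeholder values (the real prefix and bucket come from the caller's pywren config):

    storage_config = {
        'storage_prefix': 'pywren.jobs',                        # placeholder prefix
        'storage_backend': 's3',                                # only backend wired up here
        'backend_config': {'bucket': 'example-pywren-bucket'},  # placeholder bucket
    }
    # storage_handler = Storage(storage_config)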
""" - return self.backend_handler.get_object(key) + return self.backend_handler.get_object(key, data_byte_range) def put_data(self, key, data): """ diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py index 07b54bb..0924e6f 100644 --- a/pywren/wrenhandler.py +++ b/pywren/wrenhandler.py @@ -22,6 +22,7 @@ from Queue import Queue, Empty # pylint: disable=import-error import wrenutil # pylint: disable=relative-import import version # pylint: disable=relative-import + import storage as Storage if sys.platform == 'win32': TEMP = "D:\local\Temp" @@ -154,10 +155,11 @@ def generic_handler(event, context_dict, storage_client): response_status = {'exception': None} try: - if event['storage_config']['storage_backend'] != 's3': + storage_backend = event['storage_config']['storage_backend'] + if storage_backend != 's3': raise NotImplementedError(("Using {} as storage backend is not supported " + "yet.").format(event['storage_config']['storage_backend'])) - s3_transfer = boto3.s3.transfer.S3Transfer(s3_client) +# s3_transfer = boto3.s3.transfer.S3Transfer(s3_client) logger.info("invocation started") @@ -179,8 +181,9 @@ def generic_handler(event, context_dict, storage_client): data_filename = os.path.join(TEMP, "data.pickle") output_filename = os.path.join(TEMP, "output.pickle") - runtime_bucket = event['runtime']['s3_bucket'] - runtime_key = event['runtime']['s3_key'] + if storage_backend == 's3': + runtime_bucket = event['runtime']['s3_bucket'] + runtime_key = event['runtime']['s3_key'] if event.get('runtime_url'): # NOTE(shivaram): Right now we only support S3 urls. runtime_bucket_used, runtime_key_used = wrenutil.split_s3_url( @@ -208,19 +211,19 @@ def generic_handler(event, context_dict, storage_client): # get the input and save to disk # FIXME here is we where we would attach the "canceled" metadata - s3_transfer.download_file(s3_bucket, func_key, func_filename) + +# s3_transfer.download_file(s3_bucket, func_key, func_filename) + func_data = storage_client.get_object(func_key) + with open(func_filename, 'wb') as f: + f.write(func_data) func_download_time = time.time() - start_time response_status['func_download_time'] = func_download_time logger.info("func download complete, took {:3.2f} sec".format(func_download_time)) - if data_byte_range is None: - s3_transfer.download_file(s3_bucket, data_key, data_filename) - else: - dres = storage_client.get_object(data_key, data_byte_range) - data_fid = open(data_filename, 'wb') - data_fid.write(dres['Body'].read()) - data_fid.close() + with open(data_filename, 'wb') as data_fid: + data_data = storage_client.get_object(data_key, data_byte_range) + data_filename.write(data_dat) data_download_time = time.time() - start_time logger.info("data download complete, took {:3.2f} sec".format(data_download_time)) @@ -340,9 +343,11 @@ def consume_stdout(stdout, queue): logger.info("command execution finished") - s3_transfer.upload_file(output_filename, s3_bucket, - output_key) - logger.debug("output uploaded to %s %s", s3_bucket, output_key) +# s3_transfer.upload_file(output_filename, s3_bucket, +# output_key) + output_d = open(output_filename).read() + storage_client.put_data(output_key, output_d) + logger.debug("output uploaded to %s %s", storage_client.config['backend_config']['bucket'], output_key) end_time = time.time() response_status['stdout'] = stdout.decode("ascii") From d566a312a4f1b4b698f95cfab6209024493e5bea Mon Sep 17 00:00:00 2001 From: Allan Peng Date: Mon, 14 Aug 2017 18:53:08 -0700 Subject: [PATCH 14/15] imports are hard to fix --- 
pywren/scripts/pywrencli.py | 4 +++- pywren/wrenhandler.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pywren/scripts/pywrencli.py b/pywren/scripts/pywrencli.py index e985f8f..81546ed 100644 --- a/pywren/scripts/pywrencli.py +++ b/pywren/scripts/pywrencli.py @@ -211,11 +211,13 @@ def deploy_lambda(ctx, update_if_exists=True): for f in ['wrenutil.py', 'wrenconfig.py', 'wrenhandler.py', 'version.py', 'jobrunner.py', 'wren.py', 'storage/storage.py', - 'storage/s3_backend.py', 'storage/storage_utils.py', 'storage/__init__.py']: + 'storage/s3_backend.py', 'storage/storage_utils.py']: f = os.path.abspath(os.path.join(module_dir, f)) a = os.path.relpath(f, SOURCE_DIR + "/..") zipfile_obj.write(f, arcname=a) + a = os.path.relpath("storage/__init__.py", SOURCE_DIR + "/..") + zipfile_obj.writestr("", arcname=a) zipfile_obj.close() #open("/tmp/deploy.zip", 'w').write(file_like_object.getvalue()) diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py index 0924e6f..6eaef93 100644 --- a/pywren/wrenhandler.py +++ b/pywren/wrenhandler.py @@ -22,7 +22,7 @@ from Queue import Queue, Empty # pylint: disable=import-error import wrenutil # pylint: disable=relative-import import version # pylint: disable=relative-import - import storage as Storage + from storage.storage import Storage if sys.platform == 'win32': TEMP = "D:\local\Temp" From aa0aded55ab190ba8a1d05753a8e69d6e139921a Mon Sep 17 00:00:00 2001 From: Allan Peng Date: Mon, 14 Aug 2017 19:07:40 -0700 Subject: [PATCH 15/15] stuff works? --- pywren/invokers.py | 2 +- pywren/queues.py | 2 +- pywren/scripts/pywrencli.py | 7 +++++-- pywren/storage/s3_backend.py | 2 +- pywren/storage/storage.py | 2 +- pywren/wrenhandler.py | 15 ++++++++------- 6 files changed, 17 insertions(+), 13 deletions(-) diff --git a/pywren/invokers.py b/pywren/invokers.py index e8d74d4..9ccec8a 100644 --- a/pywren/invokers.py +++ b/pywren/invokers.py @@ -5,7 +5,7 @@ import botocore import botocore.session -from pywren import local +#from pywren import local SOURCE_DIR = os.path.dirname(os.path.abspath(__file__)) diff --git a/pywren/queues.py b/pywren/queues.py index 58e1017..f8a77fa 100644 --- a/pywren/queues.py +++ b/pywren/queues.py @@ -5,7 +5,7 @@ import time import boto3 -from pywren import local +#from pywren import local SOURCE_DIR = os.path.dirname(os.path.abspath(__file__)) diff --git a/pywren/scripts/pywrencli.py b/pywren/scripts/pywrencli.py index 81546ed..f196867 100644 --- a/pywren/scripts/pywrencli.py +++ b/pywren/scripts/pywrencli.py @@ -211,13 +211,13 @@ def deploy_lambda(ctx, update_if_exists=True): for f in ['wrenutil.py', 'wrenconfig.py', 'wrenhandler.py', 'version.py', 'jobrunner.py', 'wren.py', 'storage/storage.py', - 'storage/s3_backend.py', 'storage/storage_utils.py']: + 'storage/s3_backend.py', 'storage/storage_utils.py', 'storage/exceptions.py']: f = os.path.abspath(os.path.join(module_dir, f)) a = os.path.relpath(f, SOURCE_DIR + "/..") zipfile_obj.write(f, arcname=a) a = os.path.relpath("storage/__init__.py", SOURCE_DIR + "/..") - zipfile_obj.writestr("", arcname=a) + zipfile_obj.writestr(a, "") zipfile_obj.close() #open("/tmp/deploy.zip", 'w').write(file_like_object.getvalue()) @@ -578,3 +578,6 @@ def cleanup_all(ctx, force): def main(): return cli() # pylint: disable=no-value-for-parameter + +if __name__ == '__main__': + main() diff --git a/pywren/storage/s3_backend.py b/pywren/storage/s3_backend.py index f565e96..0f6a88f 100644 --- a/pywren/storage/s3_backend.py +++ b/pywren/storage/s3_backend.py @@ -22,7 +22,7 @@ def 
head_object(self, key):
         :rtype: dict
         """
         try:
-            return self.s3client.head_object(self.s3_bucket, key)
+            return self.s3client.head_object(Bucket = self.s3_bucket, Key = key)
         except botocore.exceptions.ClientError as e:
             if e.response['Error']['Code'] == "404":
                 raise StorageNoSuchKeyError(key)
diff --git a/pywren/storage/storage.py b/pywren/storage/storage.py
index 47aa8c7..da9f000 100644
--- a/pywren/storage/storage.py
+++ b/pywren/storage/storage.py
@@ -33,7 +33,7 @@ def get_storage_config(self):
         """
         return self.storage_config
 
-    def get_head(self, key):
+    def head_object(self, key):
         """
         Retrieves metadata for given key.
         The metadata dict must have ContentLength and ETag as keys
         :return: dict
diff --git a/pywren/wrenhandler.py b/pywren/wrenhandler.py
index 6eaef93..3009a40 100644
--- a/pywren/wrenhandler.py
+++ b/pywren/wrenhandler.py
@@ -46,7 +46,7 @@
 def get_key_size(storage_client, key):
     try:
-        a = storage_client.head_object(Bucket=bucket, Key=key)
+        a = storage_client.head_object(key)
         return a['ContentLength']
     except botocore.exceptions.ClientError as e:
         if e.response['Error']['Code'] == "404":
@@ -54,13 +54,14 @@
         else:
             raise e
 
-def download_runtime_if_necessary(s3_client, runtime_bucket, runtime_key):
+def download_runtime_if_necessary(runtime_bucket, runtime_key):
     """
     Download the runtime if necessary
 
     return True if cached, False if not (download occured)
 
     """
+    s3_client = boto3.client("s3")
 
     # get runtime etag
     runtime_meta = s3_client.head_object(Bucket=runtime_bucket,
                                          Key=runtime_key)
@@ -199,7 +200,7 @@ def generic_handler(event, context_dict, storage_client):
     response_status['output_key'] = output_key
     response_status['status_key'] = status_key
 
-    KS = get_key_size(s3_client, data_key)
+    KS = get_key_size(storage_client, data_key)
 
     while KS is None:
         logger.warning("WARNING COULD NOT GET FIRST KEY")
         KS = get_key_size(storage_client, data_key)
     if not event['use_cached_runtime']:
         shutil.rmtree(RUNTIME_LOC, True)
         os.mkdir(RUNTIME_LOC)
@@ -223,7 +224,7 @@ def generic_handler(event, context_dict, storage_client):
 
     with open(data_filename, 'wb') as data_fid:
         data_data = storage_client.get_object(data_key, data_byte_range)
-        data_filename.write(data_dat)
+        data_fid.write(data_data)
 
     data_download_time = time.time() - start_time
     logger.info("data download complete, took {:3.2f} sec".format(data_download_time))
@@ -262,7 +263,7 @@ def generic_handler(event, context_dict, storage_client):
     response_status['runtime_key_used'] = runtime_key_used
     response_status['runtime_bucket_used'] = runtime_bucket_used
 
-    runtime_cached = download_runtime_if_necessary(s3_client, runtime_bucket_used,
+    runtime_cached = download_runtime_if_necessary(runtime_bucket_used,
                                                    runtime_key_used)
     logger.info("Runtime ready, cached={}".format(runtime_cached))
     response_status['runtime_cached'] = runtime_cached
@@ -347,7 +348,7 @@ def consume_stdout(stdout, queue):
 #                            output_key)
     output_d = open(output_filename, 'rb').read()
     storage_client.put_data(output_key, output_d)
-    logger.debug("output uploaded to %s %s", storage_client.config['backend_config']['bucket'], output_key)
+    logger.debug("output uploaded to %s %s", storage_client.storage_config['backend_config']['bucket'], output_key)
 
     end_time = time.time()
     response_status['stdout'] = stdout.decode("ascii")
@@ -363,7 +364,7 @@ def consume_stdout(stdout, queue):
         response_status['server_info'] = get_server_info()
         response_status['exception_traceback'] = traceback.format_exc()
     finally:
-        storage_client.put_object(status_key, json.dumps(response_status))
+        storage_client.put_data(status_key, json.dumps(response_status))
 
 
 if __name__ == "__main__":
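Taken together, patches 09-15 leave the handler talking to persistence only through the Storage wrapper. A usage sketch of the resulting interface (key and bucket names are placeholders; the import path assumes the storage package ships inside the Lambda zip as in patch 13):

    from storage.storage import Storage

    storage = Storage({'storage_prefix': 'pywren.jobs',
                       'storage_backend': 's3',
                       'backend_config': {'bucket': 'example-pywren-bucket'}})

    meta = storage.head_object('callset/00000/data.pickle')   # dict with 'ContentLength' and 'ETag'
    blob = storage.get_object('callset/00000/data.pickle', data_byte_range=(0, 1023))
    storage.put_data('callset/00000/status.json', '{"exception": null}')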