Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Portability #155

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion pywren/invokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import botocore
import botocore.session
from pywren import local
#from pywren import local

SOURCE_DIR = os.path.dirname(os.path.abspath(__file__))

Expand Down
2 changes: 1 addition & 1 deletion pywren/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time

import boto3
from pywren import local
#from pywren import local

SOURCE_DIR = os.path.dirname(os.path.abspath(__file__))

Expand Down
8 changes: 7 additions & 1 deletion pywren/scripts/pywrencli.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,14 @@ def deploy_lambda(ctx, update_if_exists=True):
module_dir = os.path.join(SOURCE_DIR, "../")

for f in ['wrenutil.py', 'wrenconfig.py', 'wrenhandler.py',
'version.py', 'jobrunner.py', 'wren.py']:
'version.py', 'jobrunner.py', 'wren.py', 'storage/storage.py',
'storage/s3_backend.py', 'storage/storage_utils.py', 'storage/exceptions.py']:
f = os.path.abspath(os.path.join(module_dir, f))
a = os.path.relpath(f, SOURCE_DIR + "/..")

zipfile_obj.write(f, arcname=a)
a = os.path.relpath("storage/__init__.py", SOURCE_DIR + "/..")
zipfile_obj.writestr(a, "")
zipfile_obj.close()
#open("/tmp/deploy.zip", 'w').write(file_like_object.getvalue())

Expand Down Expand Up @@ -575,3 +578,6 @@ def cleanup_all(ctx, force):

def main():
return cli() # pylint: disable=no-value-for-parameter

if __name__ == '__main__':
main()
23 changes: 21 additions & 2 deletions pywren/storage/s3_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ def __init__(self, s3config):
self.s3client = self.session.create_client(
's3', config=botocore.client.Config(max_pool_connections=200))

def head_object(self, key):
"""
Get object metadata from S3 with a key. Throws StorageNoSuchKeyError if the given key does not exist.
:param key: key of the object
:return: Data of the object
:rtype: str/bytes
"""
try:
return self.s3client.head_object(Bucket = self.s3_bucket, Key = key)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "NoSuchKey":
raise StorageNoSuchKeyError(key)
else:
raise e

def put_object(self, key, data):
"""
Put an object in S3. Override the object if the key already exists.
Expand All @@ -24,15 +39,19 @@ def put_object(self, key, data):
"""
self.s3client.put_object(Bucket=self.s3_bucket, Key=key, Body=data)

def get_object(self, key):
def get_object(self, key, data_byte_range = None):
"""
Get object from S3 with a key. Throws StorageNoSuchKeyError if the given key does not exist.
:param key: key of the object
:return: Data of the object
:rtype: str/bytes
"""
try:
r = self.s3client.get_object(Bucket=self.s3_bucket, Key=key)
if data_byte_range != None:
range_str = 'bytes={}-{}'.format(*data_byte_range)
r = self.s3client.get_object(Bucket=self.s3_bucket, Key=key, Range = range_str)
else:
r = self.s3client.get_object(Bucket=self.s3_bucket, Key=key)
data = r['Body'].read()
return data
except botocore.exceptions.ClientError as e:
Expand Down
14 changes: 14 additions & 0 deletions pywren/storage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ def get_storage_config(self):
"""
return self.storage_config

def head_object(self, key):
"""
Retrieves metadata for given key.
The metadata dict must have Contentlength and Etag as keys
:return: dict
"""
return self.backend_handler.head_object(key)

def get_object(self, key, data_byte_range = None):
"""
Retrieves object for given key.
"""
return self.backend_handler.get_object(key, data_byte_range)

def put_data(self, key, data):
"""
Put input data into storage.
Expand Down