In [1]:
# default_exp s3bz

# S3bz
> Save and load JSON-serialisable objects to and from Amazon S3 as bz2-compressed blobs.

In [2]:
#hide
from nbdev.showdoc import *

In [3]:
#export
from botocore.config import Config
import bz2, json, boto3, logging

In [4]:
#export
class S3:
  '''
  Helpers for keeping JSON-serialisable objects in S3 as bz2-compressed blobs.

  Every method builds its own client through `S3.s3`, so the connection
  options accepted there (region, user, pw, accelerate) can be passed as
  keyword arguments to any of the other methods.
  '''
  @staticmethod
  def s3(region = 'ap-southeast-1', user = None, pw = None, accelerate = True, **kwargs):
    '''
    create and return s3 client

    region: AWS region name handed to boto3
    user: AWS access key id (None is passed through to boto3 unchanged)
    pw: AWS secret access key (None is passed through to boto3 unchanged)
    accelerate: value for the client's `use_accelerate_endpoint` option
    kwargs: accepted and ignored, so callers can forward a shared option dict
    '''
    # A conditional instead of tuple indexing: truthy non-bool values (e.g. 2)
    # no longer raise IndexError while building the log message.
    endpoint = 'accelerate' if accelerate else 'standard'
    logging.info(f'using {endpoint} endpoint')
    config = Config(s3={"use_accelerate_endpoint": accelerate,
                        "addressing_style": "virtual"})
    s3 = boto3.client(
        's3',
        aws_access_key_id= user,
        aws_secret_access_key= pw,
        region_name = region,
        config = config
      )
    return s3

  @classmethod
  def save(cls,  key, objectToSave, bucket = '',**kwargs):
    '''
    save an object to s3

    The object is serialised with json.dumps, utf-8 encoded, bz2-compressed
    and uploaded under `key` in `bucket`.

    returns: True when the upload answered with HTTP 200
    raises: RuntimeError when the response carries any other status code
    '''
    s3 = cls.s3(**kwargs)
    compressedData = bz2.compress(json.dumps(objectToSave).encode())
    result = s3.put_object(Body=compressedData, Bucket=bucket, Key=key)
    status = result['ResponseMetadata']['HTTPStatusCode']
    if status != 200:
      # The previous code raised an undefined name (`Error`), which surfaced
      # as a NameError and hid the real cause.
      raise RuntimeError(f'saving {key} to s3 failed with HTTP status {status}')
    # Only report success once the status has actually been checked.
    logging.info('data was saved to s3')
    return True

  @classmethod
  def exist(cls, key, bucket, **kwargs):
    '''
    check whether an object stored under exactly `key` exists in `bucket`

    returns: True if the listing for Prefix=key contains an entry whose Key
             equals `key`, otherwise False
    '''
    response = cls.s3(**kwargs).list_objects(
        Bucket=bucket , Prefix=key )
    # Prefix=key also matches longer keys ('a' matches 'a.json'); require an
    # exact match so load/presign do not act on an object that is not there.
    return any(item.get('Key') == key for item in response.get('Contents', []))

  @classmethod
  def load(cls, key, bucket='', **kwargs):
    '''
    load an object previously written by `save`

    returns: the decoded object, or {} when `key` does not exist in `bucket`
    raises: ValueError when the object exists but its body is missing or empty
    '''
    if not cls.exist(key, bucket, **kwargs):
      logging.info('object doesnt exist')
      return {}
    logging.info('object exists, loading')
    s3 = cls.s3(**kwargs)
    requestResult =  s3.get_object(
                  Bucket = bucket,
                  Key = key
                )
    body = requestResult.get('Body', None)
    # Read the whole stream. Iterating the body yields it chunk by chunk, so
    # next(body) returned only the first chunk and truncated larger objects.
    allItemsByte = body.read() if body is not None else None
    if not allItemsByte: raise ValueError('all data does not exist in the database')
    allItems = json.loads(bz2.decompress(allItemsByte).decode())
    return allItems

  @classmethod
  def presign(cls, key, expiry = 1000, bucket = '',**kwargs):
    '''
    create a presigned `get_object` url for `key` in `bucket`

    expiry: forwarded as ExpiresIn (seconds, per the boto3 documentation)
    returns: the url, or the string 'object doesnt exist' when the key is
             absent (kept as a return value, not an exception, for existing
             callers)
    '''
    if not cls.exist(key,bucket=bucket,**kwargs): return 'object doesnt exist'
    s3 = cls.s3(**kwargs)
    result = s3.generate_presigned_url(
        'get_object',
          Params={'Bucket': bucket,
                  'Key': key},
        ExpiresIn=expiry)
    return result



In [5]:
#hide
# Export the project's notebooks to the Python library; the call prints one
# "Converted <notebook>" line per notebook it processes.
from nbdev.export import *
notebook2script()

Converted index.ipynb.
Converted s3bz.ipynb.


In [6]:
#hide
# Run the nbdev command-line tools from inside the notebook: first rebuild the
# library, then the docs. With --mk_readme True, index.ipynb is also converted
# to README.md.
!nbdev_build_lib
!nbdev_build_docs --mk_readme True

Converted index.ipynb.
Converted s3bz.ipynb.
No notebooks were modified
converting /Users/nic/pip/s3bz/index.ipynb to README.md
