In [None]:
#default_exp s3

# S3Cache

## Deals with everything that reads from and writes to the S3 cache for the database

In [None]:
#hide
import pickle, os

# Pin this notebook to the dev resources so the test cells below never touch
# another environment by accident.
os.environ.update({
  'DATABASE_TABLE_NAME': 'product-table-dev-manual',
  'REGION': 'ap-southeast-1',
  'INVENTORY_BUCKET_NAME': 'product-bucket-dev-manual',
  'INPUT_BUCKET_NAME': 'input-product-bucket-dev-manual',
  'DAX_ENDPOINT': 'longtermcluster.vuu7lr.clustercfg.dax.apse1.cache.amazonaws.com:8111',
})
# SECURITY: the LINEKEY token used to be hardcoded here. Secrets must not be
# committed to the notebook: export LINEKEY in the shell (or use a secrets
# manager) before starting the kernel, and rotate the previously committed token.
REGION = 'ap-southeast-1'

In [None]:
#export
from s3bz.s3bz import S3
from nicHelper.wrappers import add_method, add_class_method, add_static_method
from nicHelper.dictUtil import stripDict, printDict, hashDict, saveStringToFile, loadStringFromFile, saveDictToFile, loadDictFromFile
from nicHelper.exception import errorString
import nicHelper.pdUtils as pdUtils
from dict_hash import dict_hash, sha256
from base64 import b64encode, b64decode
from pandas.util import hash_pandas_object
from hashlib import sha1
import pandas as pd
import os, logging

In [None]:
# Test-only cell (no #export marker): ProductDatabase is needed to build the Tester class below.
from villaProductDatabase.database import ProductDatabase
import logging
# INFO level so the INFO:root lines emitted during the test cells appear in the cell output.
logging.basicConfig(level=logging.INFO)

longtermcluster.vuu7lr.clustercfg.dax.apse1.cache.amazonaws.com:8111
longtermcluster.vuu7lr.clustercfg.dax.apse1.cache.amazonaws.com:8111
longtermcluster.vuu7lr.clustercfg.dax.apse1.cache.amazonaws.com:8111
longtermcluster.vuu7lr.clustercfg.dax.apse1.cache.amazonaws.com:8111


In [None]:
#export
import os
# Default on-disk locations of the cached database and of its hash.
DBHASHLOCATION = '/mnt/efs/database.hash'
DBCACHELOCATION = '/mnt/efs/database.cache'
# Deployment configuration read from the environment (None when unset).
DATABASE_TABLE_NAME = os.environ.get('DATABASE_TABLE_NAME')
INVENTORY_BUCKET_NAME = os.environ.get('INVENTORY_BUCKET_NAME')
INPUT_BUCKET_NAME = os.environ.get('INPUT_BUCKET_NAME')
REGION = os.environ.get('REGION') or 'ap-southeast-1'
# NOTE(review): these read the 'USER' and 'PW' variables; 'USER' is usually the
# OS login name on POSIX systems -- confirm this is the intended variable.
ACCESS_KEY_ID = os.environ.get('USER') or None
SECRET_ACCESS_KEY = os.environ.get('PW') or None
LINEKEY= os.environ.get('LINEKEY')
# Default S3 key under which the whole-database cache is stored.
ALLDATAKEY = 'allData'

try:
  DAX_ENDPOINT = os.environ['DAX_ENDPOINT']
  print(DAX_ENDPOINT)
except KeyError as e:
  # Always define the name: code that does `[DAX_ENDPOINT] if DAX_ENDPOINT else None`
  # would otherwise raise NameError when the variable is not configured.
  DAX_ENDPOINT = None
  print(f'dax endpoint missing {e}')
  

longtermcluster.vuu7lr.clustercfg.dax.apse1.cache.amazonaws.com:8111


In [None]:
#export
class S3Cache:
  '''Empty holder class; the cache class methods in this file are attached to it with add_class_method.'''

In [None]:
class Tester(S3Cache, ProductDatabase):
  '''Notebook-only class combining the S3Cache methods with ProductDatabase for the test cells.'''
  class Meta:
    # table and region are taken straight from the environment
    region = os.environ['REGION']
    table_name = os.environ['DATABASE_TABLE_NAME']
    billing_mode = 'PAY_PER_REQUEST'
    # the same DAX endpoint serves reads and writes; None when no endpoint is set
    dax_write_endpoints = [DAX_ENDPOINT] if DAX_ENDPOINT else None
    dax_read_endpoints = [DAX_ENDPOINT] if DAX_ENDPOINT else None
  

## save and load local hash

In [None]:
#export
@add_class_method(S3Cache)
def loadFromCache(cls, key=ALLDATAKEY, localCache=DBCACHELOCATION, 
                  localHash=DBHASHLOCATION, bucket=INVENTORY_BUCKET_NAME):
  ''' load the cached database, preferring the local copy when it is in sync

  key: remote cache key passed to pdUtils.loadRemoteHash / loadRemoteCache
  localCache: path of the local cache file
  localHash: path of the local hash file
  bucket: bucket passed to the remote pdUtils calls

  returns the local cache when the local and remote hashes match, otherwise the
  remote cache; an empty pd.DataFrame when the remote cache cannot be loaded
  '''
  ## check for local object and its hash
  if os.path.exists(localCache) and os.path.exists(localHash):
    try:
      # keep the hash values in their own names: localHash must stay a path
      # because it is reused below when the local hash is rewritten
      localHashValue = pdUtils.loadLocalHash(path=localHash)
      logging.debug(f'localHash is {localHashValue}')
      remoteHashValue = pdUtils.loadRemoteHash(key=key, bucket=bucket)
      logging.debug(f'remoteHash is {remoteHashValue}')

      if localHashValue == remoteHashValue:
        print('data is still in sync, using local cache')
        return pdUtils.loadLocalCache(path=localCache)
      print('remote hash is not the same, load remote cache')
    except Exception as e:
      # best effort: any local problem falls through to the remote copy
      print(f'local loading error{e}, loading remote hash')
  ### load from remote cache
  try:
    db = pdUtils.loadRemoteCache(key=key, bucket=bucket)
  except Exception as e:
    print(f'locding remtoe failed {e} returning blank df')
    return pd.DataFrame()
  ### save to local cache
  try:
    pdUtils.saveLocalCache(db, path=localCache)
    pdUtils.saveLocalHash(db, path=localHash)
  except Exception as e:
    # refreshing the local copy is only an optimisation; never throw away
    # remote data that was loaded successfully
    print(f'saving local cache failed {e}, returning remote data')
  return db

In [None]:
%%time
db = Tester.loadFromCache()
# Tuples compare lexicographically, so `db.shape > (10,10)` would accept e.g. (11, 0);
# check every dimension explicitly instead.
assert all(dim > 10 for dim in db.shape), f'cache is unexpectedly small: {db.shape}'

INFO:root:using accelerate endpoint


loading hashkey allData-hash


INFO:root:object exists, loading
INFO:root:using accelerate endpoint
INFO:root:using accelerate endpoint


loaded hash is PzfrumW4Vib/5yh3/4UtOHZI88U=
remote hash is not the same, load remote cache
CPU times: user 1.2 s, sys: 257 ms, total: 1.46 s
Wall time: 2.32 s


In [None]:
# None of these names exist, so this exercises the fallback where the remote
# load fails and a blank DataFrame is returned.
Tester.loadFromCache(
  key='doestExist',
  localCache='doesntExist',
  localHash='doesntExist',
)

INFO:root:using accelerate endpoint


locding remtoe failed An error occurred (404) when calling the HeadObject operation: Not Found returning blank df


## Save and load remote hash

In [None]:
#export

@add_class_method(S3Cache)
def saveRemoteCache(cls ,db:pd.DataFrame, key= ALLDATAKEY,
                   bucket = INVENTORY_BUCKET_NAME, localCachePath=DBCACHELOCATION,
                   localHashPath=DBHASHLOCATION):
  ''' save db to the remote cache and upload a zlib-compressed json copy

  db: dataframe to store
  key: remote cache key; the compressed json copy is stored under '<key>-json.zl'
  bucket: target bucket
  localCachePath, localHashPath: forwarded unchanged to pdUtils.saveRemoteCache

  returns None
  '''
  # imported here because this exported cell used zlib without importing it,
  # which raised NameError once the function ran outside the notebook session
  import zlib
  pdUtils.saveRemoteCache(data=db, key= key, 
                          bucket=bucket, localCachePath=localCachePath, 
                          localHashPath=localHashPath)
  # second representation: json (orient='split'), zlib-compressed
  jsonDb = db.to_json(orient='split')
  zlibArc = zlib.compress(jsonDb.encode())
  # written to a scratch file first because S3.saveFile is given a path
  tmpPath = '/tmp/zlibJsonCache.zl'
  with open(tmpPath, 'wb') as f:
    f.write(zlibArc)
  S3.saveFile(key=f'{key}-json.zl',path=tmpPath,bucket=bucket)
  


In [None]:
%%time
Tester.saveRemoteCache(db)
# what was just saved must hash the same as what loadFromCache now returns
savedHash = pdUtils.getDfHash(db)
reloadedHash = pdUtils.getDfHash(Tester.loadFromCache())
assert savedHash == reloadedHash

INFO:root:using accelerate endpoint
INFO:root:data was saved to s3
INFO:root:using accelerate endpoint


hashKey is allData-hash
saving hash to s3
saved hash e706d6d6b6a9a7e831045ed1e859e7fb24bd5e4b


INFO:root:using accelerate endpoint
INFO:root:using accelerate endpoint


loading hashkey allData-hash


INFO:root:object exists, loading
INFO:root:using accelerate endpoint


loaded hash is e706d6d6b6a9a7e831045ed1e859e7fb24bd5e4b
data is still in sync, using local cache
CPU times: user 3.92 s, sys: 457 ms, total: 4.38 s
Wall time: 5.94 s


## Reset S3 Cache

In [None]:
#export
@add_class_method(S3Cache)
def resetS3Cache(cls, bucketName= INVENTORY_BUCKET_NAME, key = 'allData', limit=10000, **kwargs):
  ''' rebuild the s3 cache from a scan of the database

  bucketName: bucket the cache is written to
  key: cache key the data is saved under
  limit: forwarded to cls.scanDb
  kwargs: accepted for call compatibility and ignored

  returns True once saveRemoteCache has returned
  '''
  ###### get all data
  items = cls.scanDb(limit=limit)
  db:pd.DataFrame = cls.toDf(items)
  print(f'{db.shape} changes to update')
  # forward the caller's target: these arguments used to be dropped, so the
  # cache was always written to the default key and bucket
  cls.saveRemoteCache(db, key=key, bucket=bucketName)
  return True

In [None]:
%%time
# Runs resetS3Cache with its default bucket, key and limit; the %%time above reports the cost.
Tester.resetS3Cache()

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials


In [None]:
# Scratch cell: serialises db to json (orient='split'); the next cell repeats this assignment under %time.
jsonDb = db.to_json(orient='split')

In [None]:
# Timing experiment: each step below is timed separately with %time.
import zlib, json
# dataframe -> json string
%time jsonDb = db.to_json(orient='split')
# json string -> zlib-compressed bytes
%time zlibArc = zlib.compress(jsonDb.encode())
# compressed bytes -> json string
%time zlibObject = zlib.decompress(zlibArc).decode()
# json string -> python object
%time dbDict = json.loads(zlibObject)

CPU times: user 379 ms, sys: 39.4 ms, total: 418 ms
Wall time: 417 ms
CPU times: user 1.41 s, sys: 14.7 ms, total: 1.42 s
Wall time: 1.46 s
CPU times: user 126 ms, sys: 63.3 ms, total: 189 ms
Wall time: 189 ms
CPU times: user 848 ms, sys: 74.8 ms, total: 923 ms
Wall time: 920 ms


In [None]:
import sys
# sys.getsizeof of the json string first, then of the zlib archive
for payload in (jsonDb, zlibArc):
  print(sys.getsizeof(payload))

45878793
6645115
