In [None]:
#default_exp pdUtils

# pd Utils
Utility functions for pandas DataFrames and Series.

In [None]:
#export
import pandas as pd
from hashlib import sha1
from nicHelper.dictUtil import saveStringToFile, loadStringFromFile
from pynamodb.constants import BINARY
from pynamodb.attributes import Attribute, UnicodeAttribute
from pynamodb.models import Model
from beartype import beartype
from s3bz.s3bz import S3
import os, logging

In [None]:
#hide
logging.basicConfig(level = logging.WARNING)

In [None]:
from random import randint
from randstr import randstr
# 1000-row frame used by the timing cells further down (getDfHash, joblib.hash, testFeather):
# two integer columns filled by randint(1, 100000) plus six columns whose names and
# values both come from randstr
# NOTE(review): presumably randstr(n) returns a random string of length n — confirm
# NOTE(review): no random seed is set, so the frame differs on every run; if two randstr(5)
# keys ever collide, the later column silently replaces the earlier one in this dict literal
largeDf = pd.DataFrame({'hello':[randint(1,100000) for _ in range(1000)], 
                        'hello2':[randint(1,100000) for _ in range(1000)],
                        randstr(5):[randstr(30) for _ in range(1000)],
                        randstr(5):[randstr(30) for _ in range(1000)],
                        randstr(5):[randstr(30) for _ in range(1000)],
                        randstr(5):[randstr(30) for _ in range(1000)],
                        randstr(5):[randstr(30) for _ in range(1000)],
                        randstr(5):[randstr(30) for _ in range(1000)],
                       })

# getDfHash

In [None]:
#export
from io import BytesIO
from typing import Callable

@beartype
def getDfHash(df:pd.DataFrame, 
              hashingAlgorithm: Callable = lambda x: sha1(x).hexdigest())->str:
  '''
    get a hash of a pandas dataframe\n
    the dataframe is serialized to feather format in memory and the resulting
    bytes are passed to the hashing function (sha1 hex digest by default)\n
    inputs:\n
      df: pd.DataFrame: a pandas dataframe\n
      hashingAlgorithm: callable: a hashing function which takes bytes input\n
    response:\n
      string hash\n
  '''
  buffer:BytesIO = BytesIO()
  df.to_feather(buffer)
  # getvalue() returns the whole buffer regardless of the stream position.
  # read() at this point starts where the write finished and yields b'',
  # which makes every dataframe hash to the digest of empty input.
  return hashingAlgorithm(buffer.getvalue())


In [None]:
df = pd.DataFrame({'hello':[1,2,3,4,5,5]})
df

Unnamed: 0,hello
0,1
1,2
2,3
3,4
4,5
5,5


In [None]:
%%timeit
getDfHash(largeDf)

2.23 ms ± 79.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [None]:
%%timeit
import joblib
joblib.hash(largeDf)

19 ms ± 65.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [None]:
def testFeather(df):
  f:BytesIO = BytesIO()
  df.to_feather(f)
  
%timeit testFeather(largeDf)

2.25 ms ± 125 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


## Local cache and hash

In [None]:
#export
def saveLocalCache( data:pd.DataFrame, path:str = '/tmp/cache', 
                   saveHash:bool = True, force:bool = True):
  '''
    save cache of the dataframe to local location \n
    data:pd.DataFrame: dataframe to save \n
    path: str: path to save cache \n
    saveHash: bool: whether to save the hash digest (written to f'{path}.hash') \n
    force: bool: when False, skip saving if the stored hash already matches the data \n
    response: True when the save was skipped, otherwise the result of data.to_feather
  '''
  hashPath = f'{path}.hash'
  ##check cache
  # the stored hash is only trusted when both the cache file and its hash file
  # exist; a missing file is a cache miss, not an error
  if not force and os.path.exists(path) and os.path.exists(hashPath):
    if getDfHash(data) == loadLocalHash(hashPath):
      logging.debug('hash is the latest, skipping')
      return True
  # save cache
  logging.debug('saving cache')
  r = data.to_feather(path)
  ##save hash
  # written after the cache so that a failed cache write cannot leave behind
  # a hash claiming the cache is up to date
  if saveHash:
    saveLocalHash(data, path=hashPath)
  return r

def saveLocalHash( data:pd.DataFrame, path = '/tmp/cache.hash', force = False):
  '''
  compute the hash of the dataframe and write it to a local file \n
  data:pd.DataFrame: dataframe whose hash is saved \n
  path: str: file path the hash string is written to \n
  force: accepted but not used by this function \n
  response: whatever saveStringToFile returns
  '''
  return saveStringToFile(getDfHash(data), path)

def loadLocalCache( path = '/tmp/cache', throw = True):
  '''
  load cache of the dataframe from local location \n
  path: str: path to load cache \n
  throw: bool: when True raise if the cache file is missing,
    when False return None instead \n
  response: the cached pd.DataFrame, or None when the file is missing and throw is False
  '''
  if not os.path.exists(path):
    if throw:
      raise Exception('cache doesnt exist')
    # throw=False: report the miss to the caller instead of letting
    # pd.read_feather fail on the missing file
    return None
  return pd.read_feather(path)
def loadLocalHash( path = '/tmp/cache.hash'):
  '''
  read the stored dataframe hash from a local file \n
  path: str: file path of the hash \n
  raises Exception when nothing exists at path
  '''
  if os.path.exists(path):
    return loadStringFromFile(path)
  raise Exception('hash doesnt exist')

In [None]:
%time saveLocalCache(df,force = True)
%time saveLocalHash(df)
%time print(loadLocalHash())
%time loadLocalCache()

CPU times: user 2.47 ms, sys: 0 ns, total: 2.47 ms
Wall time: 3.09 ms
CPU times: user 1.15 ms, sys: 0 ns, total: 1.15 ms
Wall time: 983 µs
da39a3ee5e6b4b0d3255bfef95601890afd80709
CPU times: user 244 µs, sys: 0 ns, total: 244 µs
Wall time: 173 µs
CPU times: user 2.99 ms, sys: 10 µs, total: 3 ms
Wall time: 3.35 ms


Unnamed: 0,hello
0,1
1,2
2,3
3,4
4,5
5,5


## Remote cache and hash

In [None]:
#export
def saveRemoteHash(data:pd.DataFrame, key='', bucket='', **kwargs):
  '''
  compute the hash of the dataframe and store it in an S3 bucket \n
  the hash is saved as {'hash': <hash string>} under the key f'{key}-hash' \n
  data:pd.DataFrame: dataframe whose hash is saved \n
  key: str: the name (key) of the dataframe in the S3 bucket \n
  bucket: str: the name of the bucket to store the hash \n
  kwargs: forwarded to S3.save
  '''
  digest = getDfHash(data)
  remoteKey = f'{key}-hash'
  print(f'hashKey is {remoteKey}')
  print('saving hash to s3')
  S3.save(key=remoteKey, objectToSave={'hash': digest}, bucket=bucket, **kwargs)
  print(f'saved hash {digest}')
  

def saveRemoteCache(data:pd.DataFrame, key = '', 
                    bucket = '', localCachePath='/tmp/cache', localHashPath='/tmp/hash', **kwargs):
  '''
  save cache and hash of the dataframe to both local location and S3 bucket \n
  data:pd.DataFrame: dataframe to save \n
  key: str: the name (key) of the dataframe to be saved in the S3 bucket \n
  bucket: str: the name of the bucket to store the dataframe \n
  localCachePath: str:path to save cache locally \n
  localHashPath: str: path to save hash locally \n
  kwargs: forwarded to S3.saveFile and to saveRemoteHash
  '''
  saveLocalCache(data=data, path = localCachePath)
  saveLocalHash(data=data, path = localHashPath)
  # upload the dataframe before its hash so that a failed upload cannot leave
  # a remote hash describing data which never arrived
  S3.saveFile(key=key, path=localCachePath, bucket=bucket, **kwargs)
  # forward kwargs so the hash is written with the same S3 options as the file
  # NOTE(review): assumes S3.save accepts the same keyword options as S3.saveFile — confirm
  saveRemoteHash(data=data, key = key, bucket=bucket, **kwargs)
  
def loadRemoteCache(key='', bucket='', localCachePath='/tmp/tmpPath', **kwargs):
  '''
  load cache of the dataframe from S3 bucket \n
  key: str: the name (key) of the dataframe to be loaded from the S3 bucket \n
  bucket: str: the name of the bucket to load the dataframe \n
  localCachePath: str: local file the object is downloaded to before it is read;
    pass a distinct path per caller to avoid overwriting a shared download file \n
  kwargs: forwarded to S3.loadFile \n
  response: the dataframe read from the downloaded feather file
  '''
  S3.loadFile(key, path=localCachePath, bucket=bucket, **kwargs)
  return pd.read_feather(localCachePath)

def loadRemoteHash(key='', bucket='', **kwargs):
  '''
  fetch the stored hash of a dataframe from an S3 bucket \n
  the hash is read from the 'hash' entry of the object stored under f'{key}-hash' \n
  key: str: the name (key) of the dataframe in the S3 bucket \n
  bucket: str: the name of the bucket holding the hash \n
  kwargs: forwarded to S3.load \n
  response: the stored hash
  '''
  remoteKey = f'{key}-hash'
  print(f'loading hashkey {remoteKey}')
  storedObject = S3.load(remoteKey, bucket=bucket, **kwargs)
  remoteHash = storedObject.get('hash')
  print(f'loaded hash is {remoteHash}')
  return remoteHash

In [None]:
#hide
testKey = 'testKey'
testBucket = 'villa-clipboard'
%time saveRemoteCache(df, key = testKey, bucket = testBucket)
print(loadRemoteHash(testKey, testBucket))
%timeit loadRemoteCache(key = testKey, bucket = testBucket)

hashKey is testKey-hash
saving hash to s3
saved hash da39a3ee5e6b4b0d3255bfef95601890afd80709
CPU times: user 94.6 ms, sys: 11.5 ms, total: 106 ms
Wall time: 264 ms
loading hashkey testKey-hash
loaded hash is da39a3ee5e6b4b0d3255bfef95601890afd80709
da39a3ee5e6b4b0d3255bfef95601890afd80709
125 ms ± 3.03 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


## PynamoDB attributes

In [None]:
#export
class PandasDataFrameAttribute(Attribute):
  '''
  pynamodb attribute which stores a pandas dataframe as feather-encoded binary
  '''
  attr_type = BINARY
  def serialize(self, value: pd.DataFrame)->bytes:
    '''encode the dataframe to feather format and return the raw bytes'''
    bio = BytesIO()
    value.to_feather(bio)
    return bio.getvalue()
  def deserialize(self, value: bytes)->pd.DataFrame:
    '''decode feather bytes, as produced by serialize, back into a dataframe'''
    # read_feather needs a file-like object, so wrap the stored bytes in a buffer
    return pd.read_feather(BytesIO(value))

In [None]:
class Database(Model):
  '''example pynamodb model holding one dataframe per branch code'''
  class Meta:
    # table_name and region are left empty in this example
    table_name = ''
    region = ''
    billing_mode='PAY_PER_REQUEST'
    
  # hash key of the table
  brcode = UnicodeAttribute(hash_key=True, default = '')
  # dataframe payload, handled by PandasDataFrameAttribute
  data = PandasDataFrameAttribute()
  
# NOTE(review): sys is not used in the visible cells
import sys
# build a model instance locally and display its dataframe attribute
df = pd.DataFrame({'cprcode':['1234', '12345'], 'quantity':[123, 345]})
db = Database(brcode='1234', data = df)
db.data

Unnamed: 0,cprcode,quantity
0,1234,123
1,12345,345


In [None]:
#export
# class PandasSeriesAttribute(Attribute):
#   attr_type = BINARY
#   def serialize(self, value: pd.Series)->bin:
#     bio = BytesIO()
#     df = s.to_frame()
#     value.to_feather(bio)
#     data:bin = bio.getvalue()
#     return data
#   def deserialize(self, value: bin)->pd.DataFrame:
#     bio = BytesIo(bin)
#     df: pd.DataFrame = pd.read_feather(bio)
#     return df

In [None]:
#export
from nicHelper.schema import getTypes

In [None]:
# schema url used by getTypes here and by the forceType example below
url = 'https://raw.githubusercontent.com/thanakijwanavit/villaMasterSchema/dev-manual/inventory/inventory.yaml'


# sample inventory record; the code and quantity fields are given as strings
# so that forceType has something to convert
inv = {
                  'iprcode': '0000009',
                  'brcode': '1000',
                  'ib_cf_qty': '50',
                  'new_ib_vs_stock_cv': '27',
                  'onlineflag': True
                }
# show the column -> python type mapping derived from the schema
getTypes(url)

{'iprcode': int,
 'brcode': int,
 'ib_cf_qty': int,
 'new_ib_vs_stock_cv': int,
 'onlineflag': bool}

In [None]:
#export
def forceType(url:str, df:pd.DataFrame, defaultType=str)->pd.DataFrame:
  '''
  force the data type in the dataframe to be the data type specified in the schema \n
  url: str: the url of the schema \n
  df: pd.DataFrame: a pandas dataframe \n
  defaultType: the type applied to columns which the schema does not specify,
    default = str; pass None to leave such columns unchanged \n
  response: a new dataframe with the converted columns
  '''
  typeDict = getTypes(url)
  typeList = {}
  for col in df.columns:
    # columns missing from the schema (or mapped to None) fall back to defaultType
    colType = typeDict.get(col) or defaultType
    # never hand a None dtype to astype; such columns are simply left as they are
    if colType is not None:
      typeList[col] = colType
  print(typeList)
  df = df.astype(typeList)
  print(df.dtypes)
  return df

In [None]:
forceType(url, pd.DataFrame([inv]))

{'iprcode': <class 'int'>, 'brcode': <class 'int'>, 'ib_cf_qty': <class 'int'>, 'new_ib_vs_stock_cv': <class 'int'>, 'onlineflag': <class 'bool'>}
iprcode               int64
brcode                int64
ib_cf_qty             int64
new_ib_vs_stock_cv    int64
onlineflag             bool
dtype: object


Unnamed: 0,iprcode,brcode,ib_cf_qty,new_ib_vs_stock_cv,onlineflag
0,9,1000,50,27,True
