In [None]:
# default_exp inventory

In [None]:
#hide
# !pip install -q nbdev lambdasdk s3bz

In [None]:
#hide
# credentials passed to InventorySdk(user=..., pw=...) further down.
# NOTE(review): both are None here — presumably the underlying clients then fall back
# to ambient AWS credentials; confirm before relying on it.
USER=None
PW=None

# Inventory

> Upload and download inventory data from the Villa master backend

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export
from json.decoder import JSONDecodeError
from botocore.config import Config
from s3bz.s3bz import S3, Requests
from lambdasdk.lambdasdk import Lambda
from awsSchema.apigateway import Event, Response
import pandas as pd
from nicHelper.wrappers import add_class_method, add_method
from nicHelper.dictUtil import printDict
from io import BytesIO
from typing import List
import bz2, json, boto3, base64, logging, itertools , requests

In [41]:
#export
def union(*dicts):
    '''merge any number of dicts into one new dict; for duplicate keys the right-most dict wins'''
    merged = {}
    for dct in dicts:
        # later dicts overwrite earlier values but keep the key's first position
        merged.update(dct.items())
    return merged
class Endpoints:
  '''get endpoint names from branch name'''
  def __init__(self, branchName='manual-dev'):
    # branchName is appended to every resource name built below
    self.branchName = branchName

  def updateWithS3(self):
    '''name of the lambda that updates the inventory from an s3 object'''
    return f'update-inventory-s3-{self.branchName}'

  def inputS3(self):
    '''name of the input bucket'''
    return f'input-bucket-{self.branchName}'

  def querySingleProduct(self):
    '''name of the single product query lambda'''
    return f'single-product-query-inventory-{self.branchName}'

  def queryAll(self):
    '''name of the query-all lambda'''
    return f'query-all-inventory-{self.branchName}'

  def queryBranch(self):
    '''name of the branch query lambda'''
    return f'query-branch-inventory-{self.branchName}'

  def queryAll2(self):
    '''name of the second version of the query-all lambda'''
    return f'query-all-inventory2-{self.branchName}'
  
  
class InventorySdk:
  ''' interact with villa inventory database '''
  def __init__(self, branchName = 'dev', user = None, pw = None,
               region = 'ap-southeast-1'):
    '''
    branchName: suffix Endpoints uses to build the lambda and bucket names
    user, pw: credentials forwarded to the Lambda client and to S3.save
    region: region forwarded to the Lambda client
    '''
    self.branchName = branchName
    self.user = user
    self.pw = pw
    self.region = region
    self.lambdaClient = Lambda(user = user, pw = pw, region = region)
    self.endpoint = Endpoints(branchName = branchName)

  def updateWithS3(self, data:dict,
                   key:str = 'allProducts',
                   invocationType:str = 'Event'):
    '''
    save `data` to the input bucket under `key`, then invoke the update lambda on it

    returns the parsed lambda return when the invocation yields a result,
    True when the lambda gave nothing decodable back (JSONDecodeError),
    and None when the result is empty
    '''
    bucket = self.endpoint.inputS3()

    # save to s3
    S3.save(key = key,
            objectToSave = data,
            bucket = bucket,
            user = self.user, pw = self.pw)
    logging.info('saving to s3 completed')

    lambdaPayload = {
        'inputBucketName': bucket,
        'inputKeyName': key
    }
    logging.info(f'input to lambda is {lambdaPayload}')
    try:
      result = self.lambdaClient.invoke(functionName = self.endpoint.updateWithS3(),
                                        input = lambdaPayload,
                                        invocationType = invocationType)
      if result:
        return Response.getReturn(result)
      return None
    except JSONDecodeError:
      logging.warning('no return from function')
      return True

  def querySingleProduct(self, ib_prcode= None, functionName=None,
                         user=None, pw=None):
    '''
    query a single product

    ib_prcode: product code sent to the lambda
    functionName: lambda to call, defaults to the branch's single product endpoint
    user, pw: not used by this method; kept so existing callers keep working

    returns the inventory dict where every entry whose key is all digits is
    extended with ib_prcode and ib_brcode (the key); when the response cannot be
    parsed the raw lambda response is returned instead
    '''
    functionName = functionName or self.endpoint.querySingleProduct()
    payload = { "body": json.dumps({'ib_prcode': ib_prcode }) }
    response = self.lambdaClient.invoke(
        functionName = functionName, input = payload)
    try:
      inventory = json.loads(Response.from_dict(response).body)
      return {brcode: union(record, {'ib_prcode': ib_prcode, 'ib_brcode': brcode})
                      if brcode.isdigit() else record
              for brcode, record in inventory.items()}
    except Exception:
      # best effort: hand back whatever the lambda returned, but do not swallow
      # KeyboardInterrupt / SystemExit the way a bare except would
      logging.warning('could not parse single product response, returning it unparsed')
      return response

  def _downloadInventory(self, functionName, payload):
    '''
    invoke `functionName` with `payload` and follow the url in the response body

    returns (True, downloaded content) when the body has a url,
    otherwise logs the body as an error and returns (False, body)
    '''
    response = self.lambdaClient.invoke(
        functionName = functionName, input = payload)
    responseBody = json.loads(Response.from_dict(response).body)
    if 'url' in responseBody:
      return True, Requests.getContentFromUrl(responseBody['url'])
    logging.error(responseBody)
    return False, responseBody

  def queryAll(self, functionName = None):
    '''
    get the whole database

    entries are tagged with ib_prcode (outer key) and ib_brcode (inner key);
    outer or inner entries whose key is not all digits are passed through untouched.
    when the lambda returns no url, its response body is returned instead
    '''
    functionName = functionName or self.endpoint.queryAll()
    found, content = self._downloadInventory(functionName, {})
    if not found:
      return content

    def tagBranches(prcode, branches):
      return {brcode: union(record, {'ib_prcode': prcode, 'ib_brcode': brcode})
                      if brcode.isdigit() else record
              for brcode, record in branches.items()}

    return {prcode: tagBranches(prcode, branches) if prcode.isdigit() else branches
            for prcode, branches in content.items()}

  def queryBranch(self, branch = '1000', functionName = None):
    '''
    get the branch database

    every entry is tagged with ib_prcode (its key) and ib_brcode (`branch`).
    when the lambda returns no url, its response body is returned instead
    '''
    functionName = functionName or self.endpoint.queryBranch()
    payload = {'body': json.dumps({'branch': branch})}
    found, content = self._downloadInventory(functionName, payload)
    if not found:
      return content
    return {prcode: union(record, {'ib_prcode': prcode, 'ib_brcode': branch})
            for prcode, record in content.items()}

In [42]:
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from random import randrange
from datetime import datetime
import boto3


## generate dummy data for testing

In [43]:
%%time
#Dummy Data
numberOfRows = 10

@dataclass_json
@dataclass
class Inventory:
  # one inventory row; every field is annotated as a string
  ib_prcode:str
  ib_brcode:str
  ib_cf_qty:str
  new_ib_vs_stock_cv:str

# (low, high) bounds handed to randrange for each field, listed in the order the values are drawn
dummyFieldBounds = {
    'ib_brcode' : (1000, 1030),
    'ib_prcode' : (10000, 100000),
    'ib_cf_qty' : (-10, 1000),
    'new_ib_vs_stock_cv' : (-10, 1000),
}

# round-trip every random row through the dataclass before turning it back into a dict
sampleLargeRandomInput = [
    Inventory.from_dict(
        {field: str(randrange(low, high, 1))
         for field, (low, high) in dummyFieldBounds.items()}
    ).to_dict()
    for _ in range(numberOfRows)
]
sampleLargeRandomInput[0]

CPU times: user 5.74 ms, sys: 1.26 ms, total: 7 ms
Wall time: 14.3 ms


{'ib_prcode': '54904',
 'ib_brcode': '1020',
 'ib_cf_qty': '30',
 'new_ib_vs_stock_cv': '85'}

## create main object

In [44]:
# client reused by every cell below; endpoint names are built from the 'dev-manual' branch
sdk = InventorySdk(user=USER, pw=PW, branchName = 'dev-manual')

## Upload the batch data using s3

### SampleData

In [45]:
# three sample rows; all values are strings and cprcode keeps its leading zeros
sampleInput = [
    {
        'cprcode': '0000009',
        'brcode': '1000',
        'ib_cf_qty': '50',
        'new_ib_vs_stock_cv': '27'
    },
    {
        'cprcode': '0000004',
        'brcode': '1000',
        'ib_cf_qty': '35',
        'new_ib_vs_stock_cv': '33'
    },
    {
        'cprcode': '0000003',
        'brcode': '1003',
        'ib_cf_qty': '36',
        'new_ib_vs_stock_cv': '33'
    }
]

def getDf(input_:List[dict])->pd.DataFrame:
  '''build a DataFrame from a list of row dicts (one dict per row, keys become the columns)'''
  return pd.DataFrame(input_)

df = getDf(sampleInput)
df

Unnamed: 0,cprcode,brcode,ib_cf_qty,new_ib_vs_stock_cv
0,9,1000,50,27
1,4,1000,35,33
2,3,1003,36,33


### uploading to s3

In [47]:
#export
@add_method(InventorySdk)
def uploadDf(self, df:pd.DataFrame, key:str ='1000', invApi:str = '2y9nzxkuyk')->requests.Response:
  '''
  upload a DataFrame as a feather file through a presigned url

  df: frame to upload, serialised with DataFrame.to_feather
  key: key requested from the presign endpoint, also used as the uploaded file name
  invApi: api gateway id the presign url is built from

  returns the requests.Response of the upload POST; the status is not checked here

  NOTE(review): the presign url hard-codes ap-southeast-1 and ignores self.region — confirm this is intended
  '''
  def getPresignedUrl()->dict:
    # ask the inventory api for a presigned upload target for `key`
    url = f'https://{invApi}.execute-api.ap-southeast-1.amazonaws.com/Prod/presign'
    presignResponse:requests.Response = requests.post(url, json = { "key": key } )
    return presignResponse.json()

  def dfToByte()->bytes:
    # to_feather writes into the buffer; its own return value is not needed
    buffer = BytesIO()
    df.to_feather(buffer)
    return buffer.getvalue()

  def uploadFile(inputByte:bytes)->requests.Response:
    presigned = getPresignedUrl()
    print('signed url is ')
    printDict(presigned)
    files = {'file': (key , BytesIO(inputByte))}
    # the presigned fields go in the form data, the file content in the multipart part
    return requests.post(url = presigned['url'], data = presigned['fields'] , files = files)

  ##### main
  return uploadFile(dfToByte())

In [None]:
#hide
# upload the sample frame under the default key ('1000')
sdk.uploadDf(df, invApi = '2y9nzxkuyk')

signed url is 
url : https://in
fields
 key : 1000
 AWSAccessKeyId : ASIAVX4Z5T
 x-amz-security-token : IQoJb3JpZ2
 policy : eyJleHBpcm
 signature : aDeTypH/LL


<Response [204]>

In [50]:
#hide
# NOTE(review): the frame read from the csv is never assigned, so it is discarded;
# the upload below still sends the sample `df` built earlier — confirm which one was intended
pd.read_csv('sampleData/inventory.csv',index_col=0).reset_index(drop=True)
sdk.uploadDf(df, invApi = '2y9nzxkuyk')

signed url is 
url : https://in
fields
 key : 1000
 AWSAccessKeyId : ASIAVX4Z5T
 x-amz-security-token : IQoJb3JpZ2
 policy : eyJleHBpcm
 signature : dI1a0e2mtU


<Response [204]>

### trigger ingestion

In [51]:
#export
@add_method(InventorySdk)
def ingestData(self, functionName= 'trigger-ingestion-dev-manual', key='1000', dtype='feather'):
  '''
  invoke the ingestion lambda for an uploaded object

  functionName: lambda to invoke
  key, dtype: sent to the lambda as the event payload

  returns whatever the lambda client returns
  '''
  payload = Event.getInput({'key':key, 'dtype':dtype})
  return self.lambdaClient.invoke(functionName=functionName, input=payload)

In [52]:
# trigger ingestion for key '1000'; functionName and dtype ('feather') keep their defaults
sdk.ingestData(key = '1000')

{'body': '{"cprcode":{"0":"0000009","1":"0000004","2":"0000003"},"brcode":{"0":"1000","1":"1000","2":"1003"},"ib_cf_qty":{"0":"50","1":"35","2":"36"},"new_ib_vs_stock_cv":{"0":"27","1":"33","2":"33"}}',
 'statusCode': 200,
 'headers': {'Access-Control-Allow-Headers': '*',
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Methods': '*'}}

### The whole flow

In [55]:
%%time
key = 'test'
r = sdk.uploadDf(df, key = key)
# stop before ingestion when the upload failed; report the raw body because the
# presigned S3 POST answers errors with XML, which r.json() cannot decode
if r.status_code >= 400: raise Exception(r.text)
sdk.ingestData(key = key)

signed url is 
url : https://in
fields
 key : test
 AWSAccessKeyId : ASIAVX4Z5T
 x-amz-security-token : IQoJb3JpZ2
 policy : eyJleHBpcm
 signature : YmbHC9NOOM
CPU times: user 32.3 ms, sys: 0 ns, total: 32.3 ms
Wall time: 422 ms


{'body': '{"cprcode":{"0":"0000009","1":"0000004","2":"0000003"},"brcode":{"0":"1000","1":"1000","2":"1003"},"ib_cf_qty":{"0":"50","1":"35","2":"36"},"new_ib_vs_stock_cv":{"0":"27","1":"33","2":"33"}}',
 'statusCode': 200,
 'headers': {'Access-Control-Allow-Headers': '*',
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Methods': '*'}}

## Query a branch

In [None]:
#export
@add_method(InventorySdk)
def branchQuery(self, brcode:str, cprcodes:List[int]=[])->pd.DataFrame:
  '''
  query one branch and return the result as a DataFrame

  brcode: branch code sent to the lambda
  cprcodes: product codes sent along with it; each one is cast to int first

  raises Exception with the raw lambda return when it carries an errorMessage
  '''
  requestBody = {
    'brcode': brcode,
    'cprcodes': list(map(int, cprcodes)),
    'format' : 'feather'
  }
  payload = Event.getInput(requestBody)
  rawReturn = self.lambdaClient.invoke(
      functionName=self.endpoint.queryBranch(), input = payload)
  if rawReturn.get('errorMessage'):
    raise Exception(rawReturn)
  # the body carries a url to the feather file holding the result
  resultUrl = Response.parseBody(rawReturn)['url']
  return pd.read_feather(resultUrl)
  

In [None]:
# cprcodes are cast to int before being sent, so '0000047' goes out as 47
sdk.branchQuery('1000', ['0000047'])

Unnamed: 0,cprcode,brcode,ib_cf_qty,new_ib_vs_stock_cv
0,47,1000,10,10


## Query All

In [None]:
#export
@add_method(InventorySdk)
def queryAll2(self, format_ = 'feather'):
  '''
  get the whole database as a DataFrame

  format_: format requested from the lambda; 'feather' is read with pd.read_feather,
           anything else is read with pd.read_json(orient='split')

  raises Exception when the lambda returns an errorMessage, no statusCode,
  or a statusCode above 300
  '''
  r = self.lambdaClient.invoke(
      functionName = self.endpoint.queryAll2(),
      input = Event.getInput({'format': format_ } ))
  # a failed invocation comes back as an error payload without statusCode; report it
  # with the payload instead of a bare KeyError, the same way branchQuery does
  statusCode = r.get('statusCode')
  if r.get('errorMessage') or statusCode is None or statusCode > 300 :
    raise Exception(f'error getting database url {r}')
  url = Response.parseBody(r)['url']
  print('succesfully get url, returning pandas')
  if format_ == 'feather':
    return pd.read_feather(url)
  return pd.read_json(url, orient='split', dtype= 'str')
  

In [None]:
%%time
# default format_ is 'feather'
sdk.queryAll2()

succesfully get url, returning pandas
CPU times: user 16.1 ms, sys: 6 µs, total: 16.2 ms
Wall time: 247 ms


Unnamed: 0,cprcode,brcode,ib_cf_qty,new_ib_vs_stock_cv
0,1234,test,123,123
1,12345,test,345,345
2,4,1000,35,33
3,9,1000,39,39
4,12,1000,39,39
5,26,1000,9,9
6,28,1000,13,13
7,33,1000,7,7
8,36,1000,22,22
9,38,1000,143,143


In [None]:
%%time
# same query through the json branch (pd.read_json with orient='split')
sdk.queryAll2(format_='json')

succesfully get url, returning pandas
CPU times: user 19.5 ms, sys: 4.54 ms, total: 24.1 ms
Wall time: 212 ms


Unnamed: 0,cprcode,brcode,ib_cf_qty,new_ib_vs_stock_cv
0,1234,test,123,123
1,12345,test,345,345
2,4,1000,35,33
3,9,1000,39,39
4,12,1000,39,39
5,26,1000,9,9
6,28,1000,13,13
7,33,1000,7,7
8,36,1000,22,22
9,38,1000,143,143


In [None]:
%%time
# legacy whole-database query; only the first two (key, value) pairs are shown
# NOTE(review): the recorded output is a ResourceNotFoundException for
# query-all-inventory-dev-manual — confirm the endpoint still exists on this branch
result = sdk.queryAll()
list(result.items())[:2]

ResourceNotFoundException: An error occurred (ResourceNotFoundException) when calling the Invoke operation: Function not found: arn:aws:lambda:ap-southeast-1:394922924679:function:query-all-inventory-dev-manual

## Query single product 2

In [None]:
#export 
@add_method(InventorySdk)
def querySingleProduct2(self, cprcode = '1234'):
  '''
  return the rows of the whole database whose cprcode equals `cprcode`

  the full database is fetched with queryAll2 and filtered locally;
  raises Exception('database not found') when it comes back empty
  '''
  allRows:pd.DataFrame = self.queryAll2()
  if allRows.empty:
    raise Exception('database not found')
  matches = allRows['cprcode']==cprcode
  return allRows[matches]

In [None]:
# filters the full queryAll2 result for rows whose cprcode equals '1234'
sdk.querySingleProduct2('1234')

succesfully get url, returning pandas


Unnamed: 0,cprcode,brcode,ib_cf_qty,new_ib_vs_stock_cv
0,1234,test,123,123


## schema
```
key:str # key is ib_prcode
  ib_cf_qty: int
  new_ib_vs_stock_cv: int
  lastUpdate: float
  ib_brcode: str
  ib_prcode: str
```