# Data Engineering for Hourly Productivity Dashboard

- Step 1: Collect hourly data (product info, time data, pick details) from the read-only database
- Step 2: Transform the data into daily and hourly reports
- Step 3: Upload the transformed data into the staging database
- Step 4: Schedule the task to run every hour

In [None]:
#Import Libraries
import psycopg2
import pandas as pd
import numpy as np
import time
import os
import datetime
from datetime import timedelta
from sqlalchemy import create_engine
from apscheduler.schedulers.blocking import BlockingScheduler

In [None]:
# strptime/strftime layout for 'YYYY-MM-DD HH:MM:SS' timestamp strings.
datetime_format = "%Y-%m-%d %H:%M:%S"

In [None]:
# Product details: one row per productid. When orderworkflow.fc_productdetails
# holds several rows for a product, row_number() keeps the one with the lowest
# subcatid (ties are broken arbitrarily by the database).
ProductDetailsQuery=('''
select productid, brandid, productcatid, subcatid
from (
select productid, brandid, productcatid, subcatid 
,row_number()over (partition by productid order by subcatid) as rno
from orderworkflow.fc_productdetails
) unique_productid
where rno=1
''')

In [None]:
#Getting product details exclusively
def get_product_detail():
    """Fetch one row of product attributes per productid from the read-only DB.

    Runs ``ProductDetailsQuery`` against the ``india_reportsdata`` database.

    Returns:
        pandas.DataFrame with the columns selected by ProductDetailsQuery
        (productid, brandid, productcatid, subcatid).
    """
    # NOTE(review): host and credentials are hard-coded; move them to config/env.
    conn = psycopg2.connect(
        host="13.234.208.5",
        database="india_reportsdata",
        user="username",
        password="password")
    try:
        return pd.read_sql(ProductDetailsQuery, conn)
    finally:
        # Close explicitly so the hourly scheduler does not accumulate open
        # server connections between runs.
        conn.close()

In [None]:
# In/out log events for one warehouse. Each module is mapped to a process, and
# each event carries the user's previous status (prev_to).
# read_query() substitutes these placeholders:
#   '2020-11-05 00:00:00' -> start, '2020-11-05 11:59:59' -> end, 'whracks10' -> schema.
# Each module name is listed once, in the CASE. Rows mapped to 'Others' are
# removed by the outer WHERE, and window functions are evaluated after WHERE,
# so lag() sees exactly the rows of the tracked modules.
time_query = ('''
SELECT emailaddress, status, actiontime, logid, warehouse, process
     , lag(status) OVER (PARTITION BY emailaddress ORDER BY logid) AS prev_to
FROM (
    SELECT emailaddress, status, actiontime, logid
         , 'whracks10' AS warehouse
         , CASE WHEN modulename IN ('wave_picking', 'orderbasepicking', 'b2c_orderbasepicking',
                                    'b2b_orderbasepicking', 'mark_bin_close', 'rack_wise_wave_picking')
                     THEN 'Picking'
                WHEN modulename IN ('qc_shipping', 'qc-shipping')
                     THEN 'QC'
                WHEN modulename IN ('b2c_pick_completed_orders', 'b2c_not_found_list')
                     THEN 'ConsoPicking'
                WHEN modulename IN ('b2c_consolidate_orders', 'b2b_consolidate_orders', 'b2c_add_excess_items',
                                    'b2b_add_excess_items', 'b2b_consolidate_orders_b2b')
                     THEN 'ConsoRacking'
                WHEN modulename IN ('bin/box_packing', 'b2b_packing')
                     THEN 'Packing'
                ELSE 'Others' END AS process
    FROM whracks10.userworkdetails_log
    WHERE actiontime BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
      AND status IN ('in', 'out')
) AS tracked
WHERE process <> 'Others'
ORDER BY process, emailaddress, logid
''')

In [None]:
def get_datetime():
    """Return the ``(start_date, end_date)`` window of hours still to process.

    start_date is one hour after the latest ``timestamp`` already stored in
    "Master Hourly Productivity Data". end_date is one second before the start
    of the current hour. Both are 'YYYY-MM-DD HH:MM:SS' strings, the form
    read_query() substitutes into the SQL templates.

    Raises:
        ValueError: if the hourly table has no rows, so no start can be derived.
    """
    # NOTE(review): host and credentials are hard-coded; move them to config/env.
    conn1 = psycopg2.connect(
        host="13.232.130.170",
        database="whracks2",
        user="username",
        password="password")
    try:
        df = pd.read_sql('''SELECT max(timestamp) FROM "Master Hourly Productivity Data" ''', conn1)
    finally:
        conn1.close()

    last_loaded = df.iloc[0]['max']
    if pd.isnull(last_loaded):
        raise ValueError(
            '"Master Hourly Productivity Data" has no timestamp; cannot derive start_date')
    start_date = (pd.Timestamp(last_loaded) + pd.Timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S')

    # Floor "now" to the hour directly. Round-tripping through
    # str()/strptime('...%f') raised ValueError whenever microsecond was 0,
    # because str() then omits the fractional part.
    current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
    end_date = (current_hour - datetime.timedelta(seconds=1)).strftime('%Y-%m-%d %H:%M:%S')

    return start_date, end_date


In [None]:
def read_query(query,warehouse_id, start_date, end_date):
    """Run a SQL template for one warehouse and window on the read-only reports DB.

    The templates are written for warehouse 10 and the sample window
    2020-11-05 00:00:00 .. 11:59:59. These literals are replaced textually:

    * '2020-11-05 00:00:00' -> start_date
    * '2020-11-05 11:59:59' -> end_date
    * 'whracks10'           -> 'whracks' + warehouse_id
    * 'whracksarchive10'    -> 'whracksarchive' + warehouse_id

    Args:
        query: SQL template containing the placeholders above.
        warehouse_id: warehouse number such as '10'; ints are accepted too.
        start_date, end_date: window bounds, normally 'YYYY-MM-DD HH:MM:SS'
            strings. Other values are converted with str().

    Returns:
        pandas.DataFrame holding the query result.
    """
    suffix = str(warehouse_id)
    # NOTE(review): the SQL is assembled by text substitution. That is only
    # acceptable because every input comes from this script, not from users.
    sql = (query
           .replace('2020-11-05 00:00:00', str(start_date))
           .replace('2020-11-05 11:59:59', str(end_date))
           .replace('whracks10', 'whracks' + suffix)
           .replace('whracksarchive10', 'whracksarchive' + suffix))

    # NOTE(review): host and credentials are hard-coded; move them to config/env.
    conn = psycopg2.connect(
            host="13.234.208.5",
            database="india_reportsdata",
            user="username",
            password="password")
    try:
        return pd.read_sql(sql, conn)
    finally:
        # One connection is opened per query call; close it so repeated calls
        # do not leave connections open on the server.
        conn.close()

In [None]:
def get_time_data(start_date, end_date, warehouses=('2', '7', '9', '10', '11', '12')):
    """Compute working seconds per warehouse, process, hour and user.

    Runs ``time_query`` for every warehouse and drops repeated statuses. Each
    in/out event is bucketed into its clock hour, and the time each user spent
    clocked in within that hour is derived from the event offsets.

    Args:
        start_date, end_date: window bounds passed to read_query().
        warehouses: warehouse ids to query (default 2, 7, 9, 10, 11 and 12).
            Must not be empty.

    Returns:
        DataFrame with columns warehouse, process, timestamp (start of the
        hour), emailaddress and working_sec.
    """
    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
    frames = [read_query(time_query, wh, start_date, end_date) for wh in warehouses]
    df = pd.concat(frames, ignore_index=True)

    # Keep an event only when its status differs from the user's previous
    # status (prev_to, from SQL lag()). This drops repeated 'in'/'in' or
    # 'out'/'out' events.
    df = df[df['status'] != df['prev_to']].drop(columns=['logid', 'prev_to'])

    # Hour bucket of each event and the event's offset (seconds) into that hour.
    # floor() also copes with fractional seconds, which strptime(str(x)) did not.
    actiontime = pd.to_datetime(df['actiontime'])
    df['timestamp'] = actiontime.dt.floor('h')
    df['diff_in_sec'] = (actiontime - df['timestamp']).dt.total_seconds()

    is_in = df['status'] == 'in'
    is_out = df['status'] == 'out'
    time_gb = (
        df.assign(
            in_time=np.where(is_in, df['diff_in_sec'], 0),
            out_time=np.where(is_out, df['diff_in_sec'], 0),
        )
        .groupby(['warehouse', 'process', 'timestamp', 'emailaddress'])
        .agg({'in_time': 'sum', 'out_time': 'sum'})
        .reset_index()
    )
    time_gb['working_sec'] = time_gb['out_time'] - time_gb['in_time']

    # For an alternating in/out sequence, a negative total means the hour ended
    # with an 'in' (the user was still clocked in). Adding the interval length
    # counts the time from that 'in' to the end of the hour.
    interval_time = 3600  # one hour, in seconds
    time_gb.loc[time_gb['working_sec'] < 0, 'working_sec'] += interval_time

    return time_gb.drop(columns=['in_time', 'out_time'])

In [None]:
# Picking quantities for one warehouse, one row per pick-detail record.
# read_query() substitutes these placeholders:
#   '2020-11-05 00:00:00' -> start, '2020-11-05 11:59:59' -> end,
#   'whracks10' / 'whracksarchive10' -> warehouse schemas.
# A task qualifies when its [startdate, enddate] overlaps the window. The first
# branch looks tasks up in the archive taskmaster, the second in the live one.
# The upper bound on lastmodifieddate is inclusive: the window ends at HH:59:59
# and the next run starts at the following HH:00:00, so '<' lost those rows.
# NOTE(review): both branches read the *archive* detail table. A taskid present
# in both taskmasters would be counted twice by UNION ALL; confirm this is intended.
pick_detail_query = ('''
SELECT "emailaddress", "businesstype", "quantity", "processquantity", "notfoundquantity"
     , productid, lastmodifieddate, 'Picking' AS process, 'whracks10' AS warehouse
FROM whracksarchive10.orderitemsrackpickdetails
WHERE ("processquantity" > 0 OR "notfoundquantity" > 0)
  AND taskid IN (
        SELECT DISTINCT "taskid"
        FROM whracksarchive10.taskmaster
        WHERE "tasktype" = 'Picking'
          AND (   ("startdate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
                   OR "enddate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59')
               OR ('2020-11-05 00:00:00' BETWEEN "startdate" AND enddate
                   OR '2020-11-05 11:59:59' BETWEEN "startdate" AND enddate))
      )
  AND "lastmodifieddate" <= '2020-11-05 11:59:59'
  AND "lastmodifieddate" >= '2020-11-05 00:00:00'

UNION ALL

SELECT "emailaddress", "businesstype", "quantity", "processquantity", "notfoundquantity"
     , productid, lastmodifieddate, 'Picking' AS process, 'whracks10' AS warehouse
FROM whracksarchive10.orderitemsrackpickdetails
WHERE ("processquantity" > 0 OR "notfoundquantity" > 0)
  AND taskid IN (
        SELECT DISTINCT "taskid"
        FROM whracks10.taskmaster
        WHERE "tasktype" = 'Picking'
          AND (   ("startdate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
                   OR "enddate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59')
               OR ('2020-11-05 00:00:00' BETWEEN "startdate" AND enddate
                   OR '2020-11-05 11:59:59' BETWEEN "startdate" AND enddate))
      )
  AND "lastmodifieddate" <= '2020-11-05 11:59:59'
  AND "lastmodifieddate" >= '2020-11-05 00:00:00'
''')

In [None]:
# ConsoPicking quantities for one warehouse (tasks of type 'ConsoOut').
# Placeholders are substituted by read_query() as in the other detail queries.
# Both branches must be labelled 'ConsoPicking'. The second branch used to say
# 'Picking', which moved consolidation quantities into the Picking figures.
# The upper bound on lastmodifieddate is inclusive, so rows stamped exactly at
# the window end (HH:59:59) are not lost between hourly runs.
# NOTE(review): both branches read the archive detail table; confirm intended.
conso_pick_detail_query = ('''
SELECT "emailaddress", "businesstype", "quantity", "processquantity", "notfoundquantity"
     , productid, lastmodifieddate, 'ConsoPicking' AS process, 'whracks10' AS warehouse
FROM whracksarchive10.orderitemsrackconsodetails
WHERE ("processquantity" > 0 OR "notfoundquantity" > 0)
  AND taskid IN (
        SELECT DISTINCT "taskid"
        FROM whracksarchive10.taskmaster
        WHERE "tasktype" = 'ConsoOut'
          AND (   ("startdate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
                   OR "enddate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59')
               OR ('2020-11-05 00:00:00' BETWEEN "startdate" AND enddate
                   OR '2020-11-05 11:59:59' BETWEEN "startdate" AND enddate))
      )
  AND "lastmodifieddate" <= '2020-11-05 11:59:59'
  AND "lastmodifieddate" >= '2020-11-05 00:00:00'

UNION ALL

SELECT "emailaddress", "businesstype", "quantity", "processquantity", "notfoundquantity"
     , productid, lastmodifieddate, 'ConsoPicking' AS process, 'whracks10' AS warehouse
FROM whracksarchive10.orderitemsrackconsodetails
WHERE ("processquantity" > 0 OR "notfoundquantity" > 0)
  AND taskid IN (
        SELECT DISTINCT "taskid"
        FROM whracks10.taskmaster
        WHERE "tasktype" = 'ConsoOut'
          AND (   ("startdate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
                   OR "enddate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59')
               OR ('2020-11-05 00:00:00' BETWEEN "startdate" AND enddate
                   OR '2020-11-05 11:59:59' BETWEEN "startdate" AND enddate))
      )
  AND "lastmodifieddate" <= '2020-11-05 11:59:59'
  AND "lastmodifieddate" >= '2020-11-05 00:00:00'
''')

In [None]:
# ConsoRacking quantities for one warehouse (tasks of type 'Conso' with a
# rack-in task name). Placeholders are substituted by read_query().
# Both branches must be labelled 'ConsoRacking'. The second branch used to say
# 'Picking', which inflated the Picking figures.
# The upper bound on lastmodifieddate is inclusive, so rows stamped exactly at
# the window end (HH:59:59) are not lost between hourly runs.
# NOTE(review): this reads orderitemsrackpickdetails, the same table as the
# Picking query; confirm it is the right detail table for racking.
conso_rack_detail_query = ('''
SELECT "emailaddress", "businesstype", "quantity", "processquantity", "notfoundquantity"
     , productid, lastmodifieddate, 'ConsoRacking' AS process, 'whracks10' AS warehouse
FROM whracksarchive10.orderitemsrackpickdetails
WHERE ("processquantity" > 0 OR "notfoundquantity" > 0)
  AND taskid IN (
        SELECT DISTINCT "taskid"
        FROM whracksarchive10.taskmaster
        WHERE "tasktype" = 'Conso'
          AND taskname IN ('B2CExcessRackIn', 'B2BExcessRackIn', 'B2BConsoRackIn', 'B2CConsoRackIn')
          AND (   ("startdate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
                   OR "enddate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59')
               OR ('2020-11-05 00:00:00' BETWEEN "startdate" AND enddate
                   OR '2020-11-05 11:59:59' BETWEEN "startdate" AND enddate))
      )
  AND "lastmodifieddate" <= '2020-11-05 11:59:59'
  AND "lastmodifieddate" >= '2020-11-05 00:00:00'

UNION ALL

SELECT "emailaddress", "businesstype", "quantity", "processquantity", "notfoundquantity"
     , productid, lastmodifieddate, 'ConsoRacking' AS process, 'whracks10' AS warehouse
FROM whracksarchive10.orderitemsrackpickdetails
WHERE ("processquantity" > 0 OR "notfoundquantity" > 0)
  AND taskid IN (
        SELECT DISTINCT "taskid"
        FROM whracks10.taskmaster
        WHERE "tasktype" = 'Conso'
          AND taskname IN ('B2CExcessRackIn', 'B2BExcessRackIn', 'B2BConsoRackIn', 'B2CConsoRackIn')
          AND (   ("startdate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
                   OR "enddate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59')
               OR ('2020-11-05 00:00:00' BETWEEN "startdate" AND enddate
                   OR '2020-11-05 11:59:59' BETWEEN "startdate" AND enddate))
      )
  AND "lastmodifieddate" <= '2020-11-05 11:59:59'
  AND "lastmodifieddate" >= '2020-11-05 00:00:00'
''')

In [None]:
# QC quantities for one warehouse (tasks of type 'QCShipping').
# Placeholders are substituted by read_query() as in the other detail queries.
# Both branches must be labelled 'QC'. The second branch used to say 'Picking',
# which moved QC quantities into the Picking figures.
# The upper bound on lastmodifieddate is inclusive, so rows stamped exactly at
# the window end (HH:59:59) are not lost between hourly runs.
# NOTE(review): both branches read the archive detail table; confirm intended.
QC_detail_query = ('''
SELECT "emailaddress", "businesstype", "quantity", "processquantity", "notfoundquantity"
     , productid, lastmodifieddate, 'QC' AS process, 'whracks10' AS warehouse
FROM whracksarchive10.orderitemsrackqcdetails
WHERE ("processquantity" > 0 OR "notfoundquantity" > 0)
  AND taskid IN (
        SELECT DISTINCT "taskid"
        FROM whracksarchive10.taskmaster
        WHERE "tasktype" = 'QCShipping'
          AND (   ("startdate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
                   OR "enddate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59')
               OR ('2020-11-05 00:00:00' BETWEEN "startdate" AND enddate
                   OR '2020-11-05 11:59:59' BETWEEN "startdate" AND enddate))
      )
  AND "lastmodifieddate" <= '2020-11-05 11:59:59'
  AND "lastmodifieddate" >= '2020-11-05 00:00:00'

UNION ALL

SELECT "emailaddress", "businesstype", "quantity", "processquantity", "notfoundquantity"
     , productid, lastmodifieddate, 'QC' AS process, 'whracks10' AS warehouse
FROM whracksarchive10.orderitemsrackqcdetails
WHERE ("processquantity" > 0 OR "notfoundquantity" > 0)
  AND taskid IN (
        SELECT DISTINCT "taskid"
        FROM whracks10.taskmaster
        WHERE "tasktype" = 'QCShipping'
          AND (   ("startdate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
                   OR "enddate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59')
               OR ('2020-11-05 00:00:00' BETWEEN "startdate" AND enddate
                   OR '2020-11-05 11:59:59' BETWEEN "startdate" AND enddate))
      )
  AND "lastmodifieddate" <= '2020-11-05 11:59:59'
  AND "lastmodifieddate" >= '2020-11-05 00:00:00'
''')

In [None]:
# Packing quantities for one warehouse (tasks of type 'Packing').
# Placeholders are substituted by read_query() as in the other detail queries.
# Both branches must be labelled 'Packing'. The second branch used to say
# 'Picking', which moved packing quantities into the Picking figures.
# The upper bound on lastmodifieddate is inclusive, so rows stamped exactly at
# the window end (HH:59:59) are not lost between hourly runs.
# NOTE(review): both branches read the archive detail table; confirm intended.
Packing_detail_query = ('''
SELECT "emailaddress", "businesstype", "quantity", "processquantity", "notfoundquantity"
     , productid, lastmodifieddate, 'Packing' AS process, 'whracks10' AS warehouse
FROM whracksarchive10.packingbinitemdetails
WHERE ("processquantity" > 0 OR "notfoundquantity" > 0)
  AND taskid IN (
        SELECT DISTINCT "taskid"
        FROM whracksarchive10.taskmaster
        WHERE "tasktype" = 'Packing'
          AND (   ("startdate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
                   OR "enddate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59')
               OR ('2020-11-05 00:00:00' BETWEEN "startdate" AND enddate
                   OR '2020-11-05 11:59:59' BETWEEN "startdate" AND enddate))
      )
  AND "lastmodifieddate" <= '2020-11-05 11:59:59'
  AND "lastmodifieddate" >= '2020-11-05 00:00:00'

UNION ALL

SELECT "emailaddress", "businesstype", "quantity", "processquantity", "notfoundquantity"
     , productid, lastmodifieddate, 'Packing' AS process, 'whracks10' AS warehouse
FROM whracksarchive10.packingbinitemdetails
WHERE ("processquantity" > 0 OR "notfoundquantity" > 0)
  AND taskid IN (
        SELECT DISTINCT "taskid"
        FROM whracks10.taskmaster
        WHERE "tasktype" = 'Packing'
          AND (   ("startdate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59'
                   OR "enddate" BETWEEN '2020-11-05 00:00:00' AND '2020-11-05 11:59:59')
               OR ('2020-11-05 00:00:00' BETWEEN "startdate" AND enddate
                   OR '2020-11-05 11:59:59' BETWEEN "startdate" AND enddate))
      )
  AND "lastmodifieddate" <= '2020-11-05 11:59:59'
  AND "lastmodifieddate" >= '2020-11-05 00:00:00'
''')

In [None]:
def get_pick_data(start_date, end_date, warehouses=('2', '7', '9', '10', '11', '12')):
    """Collect pick-detail rows for every process and warehouse in the window.

    For each warehouse, in order, runs the Picking, ConsoPicking, ConsoRacking,
    QC and Packing detail queries through read_query() and stacks the results.

    Args:
        start_date, end_date: window bounds passed to read_query().
        warehouses: warehouse ids to query (default 2, 7, 9, 10, 11 and 12).

    Returns:
        DataFrame with the columns selected by the detail queries, or an empty
        DataFrame when ``warehouses`` is empty.
    """
    detail_queries = (
        pick_detail_query,
        conso_pick_detail_query,
        conso_rack_detail_query,
        QC_detail_query,
        Packing_detail_query,
    )
    frames = [
        read_query(query, wh, start_date, end_date)
        for wh in warehouses
        for query in detail_queries
    ]
    if not frames:
        return pd.DataFrame()
    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
    return pd.concat(frames, ignore_index=True)

In [None]:
def create_productivity_rawdata(df_product, time_gb, df):
    """Combine pick details, product attributes and working time per user-hour.

    Args:
        df_product: product attributes keyed by ``productid``; must contain
            ``productcatid``.
        time_gb: working seconds per warehouse/process/timestamp/emailaddress.
        df: raw pick-detail rows with lastmodifieddate, processquantity,
            notfoundquantity, businesstype, productid, warehouse, process and
            emailaddress.

    Returns:
        DataFrame with one row per warehouse/process/hour/user. It holds the
        summed quantities, ``working_sec``, the short warehouse code and
        ``QTY`` = Quantity - Consumable_Qty.

    The caller's ``df`` is not modified.
    """
    # Hour bucket of each pick row. assign() returns a new frame, so the
    # caller's DataFrame does not gain a column.
    df = df.assign(timestamp=pd.to_datetime(df['lastmodifieddate']).dt.floor('h'))

    # Attach product attributes (brand/category) to every pick row.
    df = pd.merge(df, df_product, how='left', on='productid')

    # Compare category ids numerically, so '7', 7 and 7.0 all match. An int
    # column becomes float once the left merge introduces NaN.
    # NOTE(review): 7 / 999 feed BabyGear_Qty / Consumable_Qty; confirm the ids.
    category = pd.to_numeric(df['productcatid'], errors='coerce')
    qty = df['processquantity']
    pick_gb = (
        df.assign(
            Quantity=qty,
            totalnotfound=df['notfoundquantity'],
            B2B_Qty=np.where(df['businesstype'] == 'B2B', qty, 0),
            B2C_Qty=np.where(df['businesstype'] == 'B2C', qty, 0),
            BabyGear_Qty=np.where(category == 7, qty, 0),
            Consumable_Qty=np.where(category == 999, qty, 0),
        )
        .groupby(['warehouse', 'process', 'timestamp', 'emailaddress'])
        .agg({'Quantity': 'sum',
              'totalnotfound': 'sum',
              'B2B_Qty': 'sum',
              'B2C_Qty': 'sum',
              'BabyGear_Qty': 'sum',
              'Consumable_Qty': 'sum'})
        .reset_index()
    )

    # Join the pick aggregates with the working time of the same user-hour.
    df_final = pd.merge(pick_gb, time_gb, how='left',
                        on=['warehouse', 'process', 'timestamp', 'emailaddress'])

    interval_time = 3600  # one hour, in seconds
    # User-hours with picks but no in/out events get a full hour. Assign the
    # result back, because fillna(inplace=True) on an attribute does nothing
    # under pandas Copy-on-Write.
    df_final['working_sec'] = df_final['working_sec'].fillna(interval_time)
    # User-hours with a positive Quantity but zero computed working time are
    # also credited a full hour.
    df_final.loc[(df_final['Quantity'] > 0) & (df_final['working_sec'] == 0),
                 'working_sec'] = interval_time

    # Map schema names to short warehouse codes (whole-value replacement).
    df_final['warehouse'] = df_final['warehouse'].replace({
        'whracks2': 'CHKN',
        'whracks7': 'BLR',
        'whracks9': 'HRN',
        'whracks10': 'PNQ',
        'whracks11': 'FRQ',
        'whracks12': 'INDR',
    })

    # Reported quantity excludes consumables.
    df_final['QTY'] = df_final['Quantity'] - df_final['Consumable_Qty']
    return df_final

In [None]:
def create_final_report(df):
    """Aggregate per-user-hour productivity rows into hourly and daily reports.

    Args:
        df: rows with warehouse, process, timestamp, emailaddress, Quantity,
            Consumable_Qty and working_sec columns.

    Returns:
        ``(df_hr, df_dly)``:

        * df_hr  columns: warehouse, process, timestamp, QTY, EPH, Distinct_User
        * df_dly columns: warehouse, process, date, QTY, Distinct_User, EPH

        QTY is Quantity minus Consumable_Qty. EPH is QTY per working hour
        (QTY / working_sec * 3600). Distinct_User is the number of distinct
        emailaddress values in the group.

    The caller's ``df`` is not modified.
    """
    # Recompute QTY here so callers do not have to supply it.
    base = df.assign(QTY=df['Quantity'] - df['Consumable_Qty'])
    aggregations = {'QTY': 'sum', 'working_sec': 'sum', 'emailaddress': 'nunique'}

    # Hourly report.
    df_hr = base.groupby(['warehouse', 'process', 'timestamp']).agg(aggregations).reset_index()
    df_hr['EPH'] = df_hr['QTY'] / df_hr['working_sec'] * 3600
    df_hr['Distinct_User'] = df_hr['emailaddress']
    df_hr = df_hr.drop(columns=['working_sec', 'emailaddress'])

    # Daily report, bucketed by calendar date of the hourly timestamp. The date
    # goes on a derived frame rather than being added to the caller's df.
    daily = base.assign(date=pd.to_datetime(base['timestamp']).dt.date)
    df_dly = daily.groupby(['warehouse', 'process', 'date']).agg(aggregations).reset_index()
    df_dly['Distinct_User'] = df_dly['emailaddress']
    df_dly['EPH'] = df_dly['QTY'] / df_dly['working_sec'] * 3600
    df_dly = df_dly.drop(columns=['working_sec', 'emailaddress'])

    return df_hr, df_dly

In [None]:
def upload_productivity_report():
    """Run one ETL cycle: extract the pending hours, transform them, and load them.

    get_datetime() computes the pending window. Product, time and pick data are
    then pulled from the read-only DB and turned into hourly and daily reports,
    which are appended to "Master Daily Productivity Data" and
    "Master Hourly Productivity Data" on the staging database.

    NOTE(review): every run appends its own daily rows, so a calendar date can
    get several partial daily rows; Distinct_User covers only that run's window.
    """
    start_time = time.time()  # To report total execution time

    start_date, end_date = get_datetime()
    print("Running Programme for start_date: {} and end_date: {} ".format(start_date, end_date))
    df_product = get_product_detail()
    time_gb = get_time_data(start_date, end_date)
    df = get_pick_data(start_date, end_date)
    df = create_productivity_rawdata(df_product, time_gb, df)
    df_hr, df_dly = create_final_report(df)

    # Staging database. NOTE(review): host and credentials are placeholders.
    engine = create_engine('postgresql://username:password@XX.XXX.XXX.XXX/whracks2')
    try:
        # Write both tables in one transaction. get_datetime() resumes from the
        # hourly table's max(timestamp). If the daily insert succeeded but the
        # hourly one failed, the next run would append the same daily rows again.
        with engine.begin() as conn:
            df_dly.to_sql('Master Daily Productivity Data', conn, if_exists='append', index=False)
            df_hr.to_sql('Master Hourly Productivity Data', conn, if_exists='append', index=False)
    finally:
        # Release pooled connections; a new engine is created every run.
        engine.dispose()

    print("TOTAL EXECUTION TIME: {} seconds ".format(time.time() - start_time))
    

## Scheduler
#### Main job - run `upload_productivity_report` every hour to load productivity data into the staging database

In [None]:
# Run upload_productivity_report once every hour. The guard stops a plain import
# of this module (e.g. to reuse its functions) from entering the blocking
# scheduler loop. In a notebook or a `python script.py` run, __name__ is
# "__main__", so the scheduler still starts there.
if __name__ == "__main__":
    scheduler = BlockingScheduler()
    scheduler.add_job(upload_productivity_report, 'interval', hours=1)
    try:
        scheduler.start()  # blocks until the process is interrupted
    except (KeyboardInterrupt, SystemExit):
        # Stopping the scheduler by interrupt is the normal way to end it.
        pass

In [None]:
#Trial
#start_date,end_date=get_datetime()
#start_date