In [1]:
%pip install -U data_utils
%pip install mlflow
%pip install -U catboost
%pip install -U rbfopt
%pip install -U prophet

In [2]:
from snowflake_client import SnowflakeClient
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import mlflow
import mlflow.pyfunc
import mlflow.sklearn
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
import cloudpickle
import time
from databricks import koalas as ks
from catboost import CatBoostRegressor
from prophet import Prophet
from pyspark.sql.types import *
from pyspark.sql.functions import current_date
import matplotlib.pyplot as plt
# import pyspark.pandas as ps

In [3]:
# Snowflake client used by every later cell (constructed with 'refined_spins').
client = SnowflakeClient('refined_spins')

# Display limits: up to 200 columns, and no cap on the number of rows.
for option_name, option_value in (
    ('display.max_columns', 200),
    ('display.max_rows', None),
):
    pd.set_option(option_name, option_value)

In [4]:
# Enable the Arrow setting on the Spark session before converting data.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Read the full ABT_SPINS table and wrap the result in a Koalas DataFrame.
ip_total_us = client.query("SELECT * FROM IMMUTA.REFINED_SPINS.ABT_SPINS ")
ip_pandas_total_us = ks.DataFrame(ip_total_us)

# Drop private-label rows, then keep only rows with a strictly positive SPPD
# (missing SPPD is treated as 0 and therefore removed as well).
is_not_private_label = ip_pandas_total_us['BRAND'] != 'PRIVATE LABEL'
ip_pandas = ip_pandas_total_us[is_not_private_label]
ip_pandas['SPPD'] = ip_pandas['SPPD'].fillna(0)
ip_pandas = ip_pandas[ip_pandas['SPPD'] > 0]

In [5]:
# Product attribute table, pulled into pandas.
prod = client.table("ABT_SPINS_PRODUCTS")

# These four columns are removed before the attributes are merged onto the
# sales data by UPC (presumably because the sales table already carries them,
# which would otherwise produce _x/_y duplicates -- confirm).
prodPandas = prod.toPandas().drop(
    columns=['MAIN_CATEGORY', 'SUBCATEGORY', 'BRAND', 'DESCRIPTION']
)

In [6]:
class AlphaModel():
    """Build the training frame for one aggregation level and fit a CatBoost model.

    The constructor reads the notebook-level ``ip_pandas`` (sales, Koalas) and
    ``prodPandas`` (product attributes, pandas) frames plus the Snowflake
    ``client``, restricts the data to ``LEVEL``, derives calendar columns, drops
    series with 12 or fewer rows and left-joins the Prophet feature table for
    that level. ``fit`` then trains ``CatBoostRegressor`` on ``log(TARGET)``.

    Parameters
    ----------
    TARGET : str
        Name of the column to model (its natural log is the training label).
    LEVEL : str
        One of ``"TOTAL US"``, ``"MARKET"`` or ``"REGION"``; compared against
        the ``MARKET_REGION`` column and used to pick the Prophet table.

    Raises
    ------
    ValueError
        If ``LEVEL`` is not one of the supported values.
    """

    # Prophet feature table to read for each supported level.
    _PROPHET_TABLES = {
        "TOTAL US": 'carina_prophet_features_TOTALUS_deploy',
        "MARKET": 'carina_prophet_features_MARKET_deploy',
        "REGION": 'carina_prophet_features_REGION_deploy',
    }
    # Columns identifying one series (market/region x channel x product).
    _SERIES_KEYS = ['MARKET_REGION', 'MARKET_REGION_NAME', 'CHANNEL', 'UPC']
    # A series must have MORE than this many rows to be kept for training.
    _MIN_WEEK_COUNT = 12

    def __init__(self, TARGET, LEVEL):
        # Fail fast: previously an unknown level skipped the level filter and
        # only crashed later with a NameError on `forecast`, after the merges.
        if LEVEL not in self._PROPHET_TABLES:
            raise ValueError(
                f"Unsupported LEVEL {LEVEL!r}; expected one of {list(self._PROPHET_TABLES)}"
            )

        self.TARGET = TARGET
        self.LEVEL = LEVEL

        ip = ip_pandas[ip_pandas['SPPD'] > 0]
        ip['DISCOUNT_PERC'] = ip['DISCOUNT_PERC'].replace(np.nan, 0)

        # Attach product attributes to every sales row.
        self.ip = ip.merge(ks.DataFrame(prodPandas), how="left", on=['UPC'])

        # Each supported LEVEL string is also the MARKET_REGION value to keep.
        self.ip = self.ip[self.ip['MARKET_REGION'] == self.LEVEL]

        # Attribute subset kept on the instance; columns absent from
        # prodPandas are silently skipped by DataFrame.filter.
        self.prod = prodPandas.filter(
            ['UPC', 'PRODUCT_TYPE', 'POSITIONING_GROUP', 'LABELED_ORGANIC', 'SUBCATEGORY', 'BRAND',
             'COMPANY', 'UNIT_OF_MEASURE', 'PACKAGING_TYPE_PRIMARY', 'FORM', 'LABELED_NON_GMO',
             'STORAGE', 'PACK_COUNT'])

        # Imputation: missing keys / external regressors become 0.
        self.ip['UPC'] = self.ip['UPC'].fillna(0)
        for column in ('C19_NYCASES', 'C19_DEATHS', 'INITIAL_CLAIMS'):
            self.ip[column] = self.ip[column].fillna(value=0).astype(float)

        # Calendar columns derived from the period end date.
        self.ip['TIMEPERIODENDDATE'] = ks.to_datetime(self.ip['TIMEPERIODENDDATE'])
        self.ip['WEEK'] = self.ip['TIMEPERIODENDDATE'].dt.week.astype(int)
        self.ip['MONTH'] = self.ip['TIMEPERIODENDDATE'].dt.month.astype(int)
        self.ip['YEAR'] = self.ip['TIMEPERIODENDDATE'].dt.year.astype(int)

        # Number of rows per series, used to drop short histories.
        week_count = self.ip.groupby(by=self._SERIES_KEYS, as_index=False)["WEEK"].count()
        week_count = week_count.rename(columns={'WEEK': 'WEEK_COUNT'})
        self.ip = self.ip.merge(week_count, on=self._SERIES_KEYS, how='left')
        self.ip = self.ip[self.ip['WEEK_COUNT'] > self._MIN_WEEK_COUNT]

        print(f"reading prophet inputs for {self.LEVEL}")
        forecast = ks.DataFrame(client.table(self._PROPHET_TABLES[self.LEVEL]))
        forecast = forecast.filter(
            items=['TIMEPERIODENDDATE', 'YHAT', 'MARKET_REGION', 'MARKET_REGION_NAME',
                   'CHANNEL', 'UPC', 'MIN_SPPD', 'MAX_SPPD'])

        # Index-on-index left join so every sales row keeps its place and gains
        # YHAT / MIN_SPPD / MAX_SPPD where a forecast row exists.
        join_keys = self._SERIES_KEYS + ['TIMEPERIODENDDATE']
        forecast = forecast.set_index(join_keys)
        self.ip = self.ip.set_index(join_keys)

        # Shapes before/after the join make accidental row duplication visible.
        print(self.ip.shape)
        self.ip = ks.merge(self.ip, forecast, left_index=True, right_index=True, how='left')
        print(self.ip.shape)

        # For deployment there is no held-out test split: every row is used.

    def fit(self):
        """Train ``self.model_base`` (CatBoost, MAPE loss) on ``log(self.TARGET)``.

        Side effects: ``self.ip`` is converted to a pandas frame (index reset)
        and cleaned in place; ``self.cats``, ``self.cat_features``,
        ``self.cols_prophet``, ``self.y``, ``self.X_prophet`` and
        ``self.model_base`` are set. Calling ``fit`` again reuses the already
        converted frame instead of failing on ``to_pandas``.
        """
        cats = ['UPC', 'PRODUCT_TYPE', 'POSITIONING_GROUP', 'LABELED_ORGANIC', 'SUBCATEGORY', 'BRAND',
                'COMPANY', 'UNIT_OF_MEASURE', 'PACKAGING_TYPE_PRIMARY', 'FORM', 'LABELED_NON_GMO',
                'STORAGE', 'PACK_COUNT', 'MARKET_REGION_NAME', 'MARKET_REGION', 'CHANNEL']
        self.cats = cats
        # Model inputs, in the order they are presented to CatBoost.
        self.cols_prophet = ['UPC', 'BASE_PRICE', 'DISCOUNT_PERC', 'AVGPCTACV', 'AVGPCTACVDISPLAYONLY',
                             'AVGPCTACVFEATUREONLY', 'AVGPCTACVFEATUREANDDISPLAY', 'AVGPCTACVTPR',
                             'C19_NYCASES', 'C19_DEATHS', 'INITIAL_CLAIMS', 'PACK_COUNT',
                             'PRODUCT_TYPE', 'POSITIONING_GROUP', 'LABELED_ORGANIC', 'SUBCATEGORY',
                             'BRAND', 'COMPANY', 'UNIT_OF_MEASURE', 'PACKAGING_TYPE_PRIMARY', 'FORM',
                             'LABELED_NON_GMO', 'STORAGE', 'MARKET_REGION_NAME', 'MARKET_REGION',
                             'CHANNEL', 'YHAT', 'INFLATION_RATE_ALL_ITEMS']

        print("max dATE for training")

        # Collect the Koalas frame once; a second fit() call finds pandas here.
        if not isinstance(self.ip, pd.DataFrame):
            self.ip = self.ip.to_pandas().reset_index()

        # Negative Prophet forecasts are replaced by the series' MIN_SPPD.
        self.ip['YHAT'] = np.where(self.ip['YHAT'] < 0, self.ip['MIN_SPPD'], self.ip['YHAT'])
        # Discounts: missing or negative -> 0; no discount implies no TPR ACV.
        self.ip['DISCOUNT_PERC'] = self.ip['DISCOUNT_PERC'].fillna(0)
        self.ip['DISCOUNT_PERC'] = np.where(self.ip['DISCOUNT_PERC'] < 0, 0, self.ip['DISCOUNT_PERC'])
        self.ip['AVGPCTACVTPR'] = np.where(self.ip['DISCOUNT_PERC'] <= 0, 0, self.ip['AVGPCTACVTPR'])

        # Label is the natural log of the target column.
        Y = np.log(self.ip[self.TARGET])
        self.y = Y

        # Feature matrix: missing values become 0, text columns become pandas
        # categoricals.
        self.X_prophet = self.ip.filter(items=self.cols_prophet).fillna(0)
        for c in self.X_prophet.columns:
            col_type = self.X_prophet[c].dtype
            if col_type == 'object' or col_type.name == 'category':
                self.X_prophet[c] = self.X_prophet[c].astype('category')

        self.cat_features = cats
        print("model start")
        print("cats:", cats)
        print("self.X_prophet", self.X_prophet.columns)
        print("self.ip", self.ip.columns)
        self.model_base = CatBoostRegressor(cat_features=cats,
                                            loss_function='MAPE', verbose=True)
        self.model_base.fit(self.X_prophet, Y)
        print("model end")

In [7]:
def predict(model, input_data, attributes):
    """Score ``input_data`` with a fitted model and return SPPD predictions.

    Parameters
    ----------
    model : object
        Fitted model exposing ``predict(frame)``; its output is treated as
        log-scale and exponentiated.
    input_data : pandas.DataFrame
        Rows to score. Must contain ``UPC``, ``TIMEPERIODENDDATE`` (parsed with
        format ``%Y/%m/%d``), ``C19_NYCASES``, ``C19_DEATHS``,
        ``INITIAL_CLAIMS``, ``DISCOUNT_PERC``, ``AVGPCTACV`` and
        ``AVGPCTACVTPR``. The caller's frame is not modified.
    attributes : pandas.DataFrame
        Product attributes keyed by ``UPC``; must provide ``PACK_COUNT``.

    Returns
    -------
    numpy.ndarray
        One prediction per row after the left merge with ``attributes``;
        rows whose ``AVGPCTACV`` is 0 are forced to 0.

    Notes
    -----
    * Only the training feature columns are passed to ``model.predict``;
      ``MIN_SPPD`` used to be appended to that frame although the model was
      not trained on it.
    * The former "replace negative predictions with MIN_SPPD" steps were
      dropped: ``np.exp`` never returns a negative number, so they could not
      fire, and their only effect was a ``KeyError`` when ``MIN_SPPD`` was
      absent from ``input_data``.
    """
    # Same columns, same order, as the feature list used for training.
    feature_cols = ['UPC', 'BASE_PRICE', 'DISCOUNT_PERC', 'AVGPCTACV', 'AVGPCTACVDISPLAYONLY',
                    'AVGPCTACVFEATUREONLY', 'AVGPCTACVFEATUREANDDISPLAY', 'AVGPCTACVTPR',
                    'C19_NYCASES', 'C19_DEATHS', 'INITIAL_CLAIMS', 'PACK_COUNT',
                    'PRODUCT_TYPE', 'POSITIONING_GROUP', 'LABELED_ORGANIC', 'SUBCATEGORY',
                    'BRAND', 'COMPANY', 'UNIT_OF_MEASURE', 'PACKAGING_TYPE_PRIMARY', 'FORM',
                    'LABELED_NON_GMO', 'STORAGE', 'MARKET_REGION_NAME', 'MARKET_REGION',
                    'CHANNEL', 'YHAT', 'INFLATION_RATE_ALL_ITEMS']
    # SUBCATEGORY / BRAND are deliberately not taken from the attribute table,
    # so the merge cannot create suffixed duplicates of columns that
    # input_data may already carry.
    attribute_cols = ['UPC', 'PRODUCT_TYPE', 'POSITIONING_GROUP', 'LABELED_ORGANIC',
                      'COMPANY', 'UNIT_OF_MEASURE', 'PACKAGING_TYPE_PRIMARY', 'FORM',
                      'LABELED_NON_GMO', 'STORAGE', 'PACK_COUNT']

    test = input_data.copy()
    # Parsed for validation only; the column is not a model feature.
    test['TIMEPERIODENDDATE'] = pd.to_datetime(test['TIMEPERIODENDDATE'], format="%Y/%m/%d")

    # Attach product attributes.
    test = test.merge(attributes.filter(items=attribute_cols), how="left", on=['UPC'])

    # Same imputations as applied to the training data.
    for column in ('C19_NYCASES', 'C19_DEATHS', 'INITIAL_CLAIMS'):
        test[column] = test[column].fillna(value=0).astype(float)
    # Fill BEFORE casting to str: training encodes a missing PACK_COUNT as 0,
    # whereas casting NaN straight to str produced the category 'nan'.
    test['PACK_COUNT'] = test['PACK_COUNT'].fillna(0).astype(str)

    # Restrict to model features; columns missing from the input are skipped.
    features = test.filter(items=feature_cols).fillna(0)
    for c in features.columns:
        col_type = features[c].dtype
        if col_type == 'object' or col_type.name == 'category':
            features[c] = features[c].astype('category')

    # Negative discounts count as no discount; no discount implies no TPR ACV.
    features['DISCOUNT_PERC'] = np.where(features['DISCOUNT_PERC'] < 0, 0, features['DISCOUNT_PERC'])
    features['AVGPCTACVTPR'] = np.where(features['DISCOUNT_PERC'] <= 0, 0, features['AVGPCTACVTPR'])

    # Model output is on the log scale.
    pred = np.exp(model.predict(features))

    # No distribution (AVGPCTACV == 0) means no sales.
    pred = np.where(features['AVGPCTACV'] == 0, 0, pred)

    return pred



# Train one baseline model per aggregation level and register each in MLflow
# together with its prediction function, product attributes and feature
# importances.
import os
import tempfile

from data_utils.model import log_predict

# Product attributes are identical for every level: read the table once
# instead of once per iteration.
attributes = client.table("ABT_SPINS_PRODUCTS").toPandas()

for level in ['TOTAL US', 'MARKET', 'REGION']:
    alpha_model = AlphaModel('SPPD', level)
    alpha_model.fit()
    catboost_model = alpha_model.model_base

    # Experiment / run / registry names use the level without spaces.
    level_tag = level.replace(' ', '')
    mlflow.set_experiment(f'/spins/baseline_model_{level_tag}_sp')

    feature_importance_df = pd.DataFrame({
        "variable": catboost_model.feature_names_,
        "importance": catboost_model.feature_importances_,
    })

    # The CSV goes to a throwaway directory rather than the filesystem root;
    # the artifact keeps the same file name inside the run.
    with tempfile.TemporaryDirectory() as tmp_dir, \
            mlflow.start_run(run_name=f'spins_refresh_baseline_model_{level_tag}'):
        importance_path = os.path.join(tmp_dir, "feature_importance_df.csv")
        feature_importance_df.to_csv(importance_path)
        log_predict(predict, catboost_model, mlflow.catboost,
                    artifacts={'attributes': attributes},
                    registered_model_name=f"spins_baseline_model_{level_tag}_sp")
        mlflow.log_artifact(importance_path)




In [8]:
# # Need to finalize the form - it's an attribute that gets generated in the __init__ call.



# #comment for deploy -- 
# # prophet = client.table("carina_prophet_features_TOTALUS_deploy").toPandas()

# #ip_pandas should be predict call -- model_base_totalUS is the same model as called--- 

# ip_pandas = ip_pandas[ip_pandas['MARKET_REGION'] == 'TOTAL US'].toPandas()
# ip_pandas['WEEK_COUNT'] = ip_pandas.groupby(['MARKET_REGION','MARKET_REGION_NAME','CHANNEL','UPC'])['WEEK'].transform('count')
# ip_pandas = ip_pandas[ip_pandas['WEEK_COUNT'] > 12]


# # Make sure the model is always the same.
# # The predict call is commented out while generating the variable importance, but it is needed for deployment.

# y_pred = predict(model_base_totalUS, ip_pandas, prodPandas, prophet)
