In [0]:
!pip install joblibspark
!pip install category_encoders

In [0]:
import pandas as pd
import numpy as np
from category_encoders.hashing import HashingEncoder
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import make_scorer
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import TimeSeriesSplit
from sklearn.utils import parallel_backend  
from sklearn.inspection import permutation_importance
import warnings
warnings.filterwarnings("ignore")
from joblib import Parallel, delayed, parallel_backend, Memory
from joblibspark import register_spark
from sklearn.utils import parallel_backend  
from pyspark.sql.types import *
import datetime

In [0]:
def get_mat_level_data(df):  #takes in a spark dataframe
  #convert all col names to lowercase
  for col in df.columns:
      df = df.withColumnRenamed(col, col.lower())

  #convert to pandas for easier manip
  df = df.toPandas()

  #convert 'week' col into datetime object so can subset by date easier later
  df['week'] = pd.to_datetime(df['week'])

  #retain only cols required for modelling
  cols_to_drop = ['pl2_business_id', 'min_date', 'month', 'cust_lev6', 'trend']
  df.drop(cols_to_drop, axis='columns', inplace=True)
  
  #extract year 
  df['year'] = df['week'].apply(lambda x: str(x.year))
  
  #one-hot encode year col
  year_dummies_df = pd.get_dummies(df['year'])
  df = pd.concat([df, year_dummies_df], axis=1)
  df.drop(['year'], axis='columns', inplace=True)

  #extract week number of year
  df['week_num_in_year'] = df['week'].apply(lambda x: x.week)

  #hash encode week number since it is a high-cardinality (50+ levels) categorical variable
  he = HashingEncoder(n_components=30, cols=['week_num_in_year'])  #reduce dimensionality of newly created cols from 50+ to 30. see microsoft word doc documentation for motivation
  df = he.fit_transform(df)
  
  #groupby material first
  df_material = list(df.groupby('material'))

  #instantiate empty dfs
  all_promo = pd.DataFrame(columns=df.columns)
  all_nopromo = pd.DataFrame(columns=[col for col in df.columns if 'sales' not in col])
  
  for i in df_material:
    mat_df = i[1]
  
    #output 2 things: timeseries with promo and timeseries without promo
    with_promo = mat_df[mat_df['promo'] == 'YES'].reset_index(drop=True)
    without_promo = mat_df[mat_df['promo'] == 'NO'].reset_index(drop=True)
    with_promo.drop(['promo'], axis='columns', inplace=True)
    without_promo.drop(['promo'], axis='columns', inplace=True)

    #drop rows in with_promo that have all sales cols as null, move them to without_promo instead
    sales_cols = [col for col in df.columns if 'sales' in col]
    sales_cols_nulls = with_promo[sales_cols].isnull().apply(lambda row: row.sum(), axis=1)  
    rows_to_keep = sales_cols_nulls[sales_cols_nulls < len(sales_cols)].index
    rows_to_throw = sales_cols_nulls[sales_cols_nulls == len(sales_cols)].index

    without_promo = pd.concat([without_promo, with_promo.reindex(index=rows_to_throw)])
    with_promo = with_promo.reindex(index=rows_to_keep)

    #drop sales cols in without_promo since all are null
    without_promo.drop(sales_cols, axis='columns', inplace=True)
  
    #add to all_promo and all_no_promo
    all_promo = pd.concat([all_promo, with_promo])
    all_nopromo = pd.concat([all_nopromo, without_promo])
  
  #do groupby and aggregate
  groupby_cols = ['material', 'col_0', 'col_1', 'col_2', 'col_3', 'col_4', 'col_5', 'col_6', 'col_7',
       'col_8', 'col_9', 'col_10', 'col_11', 'col_12', 'col_13', 'col_14',
       'col_15', 'col_16', 'col_17', 'col_18', 'col_19', 'col_20', 'col_21',
       'col_22', 'col_23', 'col_24', 'col_25', 'col_26', 'col_27', 'col_28',
       'col_29', 'week', 'valentines', 'valentines_lag1', 'thanksgiving', 'thanksgiving_lag1',
       'thanksgiving_lag2', 'halloween', 'halloween_lag1', 'easter',
       'easter_lag1', 'easter_lag2', 'easter_lag3', 'christmas',
       'christmas_lag1', 'christmas_lag2', 'christmas_lag3',
       'independence_day', 'independence_day_lag1', 'independence_day_lag2',
       'independence_day_lag3', 'playoffs', 'playoffs_lag1', 'superbowl',
       'superbowl_lag1', 'newyear', 'm1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7',
       'm8', 'm9', 'm10', 'm11', 'm12', '2016', '2017', '2018', '2019',
       '2020', '2021', '2022', '2023']

  with_promo_agg = all_promo.groupby(groupby_cols).agg({col: 'sum' for col in all_promo.columns if ('order' in col) | ('sales' in col)}).reset_index()

  without_promo_agg = all_nopromo.groupby(groupby_cols).agg({col: 'sum' for col in all_nopromo.columns if 'order' in col}).reset_index()
  
  return list(with_promo_agg.groupby('material')), list(without_promo_agg.groupby('material'))

In [0]:
def get_all_predictions(biz_name, whether_promo, mat_timeseries_list):  
  location = f'{biz_name}_cachedir'
  memory = Memory(location=location, verbose=0)

  @memory.cache(verbose=0)  #store output of this function in memory for each i
  def for_i_in_all_timeseries_list(i):  
    material = i[0]
    timeseries_df = i[1]

    try:
      predictions_per_time_series = get_predictions_per_time_series(material, timeseries_df)
      
      if len(predictions_per_time_series) > 0:  #could have 0 rows then will get error when saving file to DBFS
        dbutils.fs.rm(f'dbfs:/FileStore/phase 2/all mat level/{biz_name}/{material}_{whether_promo}.csv', True)  #remove any file with same name alr there just in case, otherwise will get error when saving the file to DBFS
        spark.createDataFrame(predictions_per_time_series).coalesce(1).write.format('com.databricks.spark.csv').option('header', 'true').save(f'dbfs:/FileStore/phase 2/all mat level/{biz_name}/{material}_{whether_promo}.csv') 
        
      else:
        print(f'note {material}, {whether_promo}: no content to save!')

    except Exception as e:
      print(f'error: material={material}')
      print(e)

  #run the above helper fn in parallel
  with parallel_backend(backend='threading', n_jobs=-1):  
    Parallel(verbose=1000)(delayed(for_i_in_all_timeseries_list)(i) for i in mat_timeseries_list)

In [0]:
def get_predictions_per_time_series(material, timeseries_df):
  #create df skeleton to store predictions later
  all_predictions_df = pd.DataFrame(columns=['material', 'date', 'forecast week', 'lag weeks', 'actual', 'prediction'])
  
  #there are 11 forecast periods
  all_forecast_weeks = ['26 May 2019', '23 Jun 2019', '28 Jul 2019', '25 Aug 2019', '29 Sep 2019', '27 Oct 2019', '24 Nov 2019', '30 Dec 2019', '29 Dec 2019', '26 Jan 2020', '23 Feb 2020']
  
  #tune rf and select features to be used for all 11 forecasting periods for this timeseries. use the largest amnt of training data to do these
  overall_train = timeseries_df[timeseries_df['week'] < '13 Feb 2022']  #103 weeks after last forecast period of 23 Feb 2020
  overall_test = timeseries_df[timeseries_df['week'] >= '13 Feb 2022']  #overall_test not used, only instantiated bc clean_up_train_test() fn below needs it as argument too
  
  #clean up overall_train and overall_test for fitting rf
  overall_train, overall_test = clean_up_train_test(overall_train, overall_test)
    
  #tune rf
  initial_rf, opt_min_samples_leaf, opt_min_samples_split, opt_max_depth, initial_val_dpa = tune_rf(overall_train)

  #select features based on tuned rf
  features_selected_list, opt_rf = select_features(initial_rf, opt_min_samples_leaf, opt_min_samples_split, opt_max_depth, initial_val_dpa, overall_train) 
  
  #get 11 forecasts for this timeseries
  for week in all_forecast_weeks:  
    #subset data into training and test
    train = timeseries_df[timeseries_df['week'] < week]
    test = timeseries_df[(timeseries_df['week'] >= week) & (timeseries_df['week'] <= datetime.datetime.strptime(week, '%d %b %Y') + datetime.timedelta(weeks=103))]  #test set to obtain predictions for is 103 weeks ahead
    
    if (len(train) > 0) & (len(test) > 0): 
      #organize this forecast period's predictions to be added on to all_predictions_df later
      per_period_predictions_df = pd.DataFrame(columns=['material', 'date', 'forecast week', 'lag weeks', 'actual', 'prediction'])
      per_period_predictions_df['date'] = test['week']
      per_period_predictions_df['lag weeks'] = round(per_period_predictions_df['date'].apply(lambda date: (date-datetime.datetime.strptime(week, '%d %b %Y')).days / 7)) 
      per_period_predictions_df['actual'] = test['order_quantity']
      per_period_predictions_df.dropna(subset=['actual'], inplace=True)  #drop rows with null in actual cause we won't be predicting for them. note that having null in actual does not refer to 0 value, it refers to the actual NAs
      per_period_predictions_df['material'] = material
      per_period_predictions_df['forecast week'] = week

      #clean up train and test for fitting rf
      train, test = clean_up_train_test(train, test)

      #get predictions for test set
      predictions = get_predictions_per_forecast_period(train, test, opt_rf, features_selected_list)

      #add predictions to per_period_predictions_df
      per_period_predictions_df['prediction'] = predictions

      #add per_period_predictions_df to all_predictions_df
      all_predictions_df = pd.concat([all_predictions_df, per_period_predictions_df], axis=0)
      
    else:
      pass  
    
  return all_predictions_df

In [0]:
def clean_up_train_test(train, test):
  #remove all rows with no actual order quantity
  test.dropna(subset=['order_quantity'], inplace=True)  #we won't waste time predicting for rows without an actual to compare to
  train.dropna(subset=['order_quantity'], inplace=True)  #we can't learn from rows without target
    
  #drop cols with >40% of rows being nulls bc the col is useless
  for col in train.columns:
    if train[col].isnull().sum() >= 0.4*len(train):
      train.drop([col], axis='columns', inplace=True)
      test.drop([col], axis='columns', inplace=True)  #must drop for test also!

  if train.isnull().sum().sum() != 0:  #if there still exists nulls in train after doing the above
    train.fillna(method='ffill', axis='index', inplace=True)  #use forward fill imputation 
    if train.isnull().sum().sum() != 0:  #if there still exists nulls, which is the case if the very first value starts with null so forward fill imputation does not work
      train.fillna(method='bfill', axis='index', inplace=True)  #do backward fill imputation 
      if train.isnull().sum().sum() != 0:  ##if there still exists nulls, which is the case if both the very first value and the very last value start with null so both forward fill and backward fill imputation do not work
          train = train.apply(lambda col: col.fillna(col.mean()), axis=0)  #do mean imputation 
          
  #repeat the above for nulls in test set
  if test.isnull().sum().sum() != 0:  
    test.fillna(method='ffill', axis='index', inplace=True)
    if test.isnull().sum().sum() != 0:  
      test.fillna(method='bfill', axis='index', inplace=True)  
      if test.isnull().sum().sum() != 0:  
          test = test.apply(lambda col: col.fillna(col.mean()), axis=0)  
  
  #subset to only features needed for modelling
  train.drop(['material', 'week'], axis='columns', inplace=True)
  test.drop(['material', 'week'], axis='columns', inplace=True)
  
  return train, test

In [0]:
def tune_rf(train): 
  feature_cols = [col for col in train.columns if col != 'order_quantity']
  X_train = train[feature_cols]
  y_train = train['order_quantity']

  rf = RandomForestRegressor(n_estimators=300, random_state=15)  
  param_grid = {'min_samples_leaf': [2, 5, 8], 'min_samples_split': [5, 10, 15], 'max_depth': [5, 8, 11, 14, 17], 'criterion': ['mae', 'mse']}  #tune the most important hyperparameters in a random forest model  
  
  #make custom scoring metric: DPA
  def dpa(y_true, y_pred):
    abs_diff = np.abs(y_pred - y_true)
    if y_true.sum() != 0:
      return 1 - abs_diff.sum()/y_true.sum()
    else:
      return 1 - abs_diff.sum()

  dpa_scorer = make_scorer(dpa, greater_is_better=True)

  timeseries_cv = TimeSeriesSplit(n_splits=5)  
  rs_rf = RandomizedSearchCV(rf, param_grid, cv=timeseries_cv, scoring=dpa_scorer, refit=True, return_train_score=True, random_state=15, n_iter=150)

  register_spark()
  with parallel_backend('spark', n_jobs=20):  #can't put n_jobs too high bc alr got outer parallelization going on in get_all_predictions() function, will hit max spark worker capacity and obtain error
    rs_rf.fit(X_train, y_train)  

  #get best hyperparams
  best_params = rs_rf.best_params_   
  opt_min_samples_leaf = best_params['min_samples_leaf']
  opt_min_samples_split = best_params['min_samples_split']
  opt_max_depth = best_params['max_depth']
  opt_criterion = best_params['criterion']
  
  #get mean validation DPA 
  val_dpa = rs_rf.best_score_

  return rs_rf, opt_min_samples_leaf, opt_min_samples_split, opt_max_depth, opt_criterion, val_dpa  

In [0]:
def select_features(rs_rf, opt_min_samples_leaf, opt_min_samples_split, opt_max_depth, opt_criterion, initial_val_dpa, train):
  feature_cols = [col for col in train.columns if col != 'order_quantity']
  X_train = train[feature_cols]
  y_train = train['order_quantity']
  
  pi = permutation_importance(rs_rf, X_train, y_train, n_repeats=5, random_state=15)  
  all_imptances = list(pi.importances_mean)  #this result is not ordered, so we order it below
  feature_impt_dic = {}
  for num in range(len(all_imptances)):
    feature = X_train.columns[num]
    impt = all_imptances[num]
    feature_impt_dic[feature] = impt
  feature_impt_dic_sorted = dict(sorted(feature_impt_dic.items(), key=lambda tup: tup[1], reverse=True))

#test feature subsets of: (all features, 85, 75, 65, 55, 35)
  num_features_to_test = [85, 75, 65, 55, 35]
  
  #create df skeleton to store feature selection results
  feature_selection_results = pd.DataFrame(columns=['features selected', 'validation DPA'])

  for num_features in num_features_to_test:
    try:  #need try-except bc nopromo df might not have 85 features since it does not have any sales columns as features
      selected_features = list(feature_impt_dic_sorted.keys())[:num_features]
      num_total_features = len(X_train.columns)

      #tune rf on selected features
      selected_features.append('order_quantity')  #need 'selected_features' to have target to pass new train & test into tune_rf()
      val_dpa = train_rf_per_feature_subset(train[selected_features], opt_min_samples_leaf, opt_min_samples_split, opt_max_depth, opt_criterion)

      #add results to results df
      selected_features.pop()  #remove target from selected_features list that was just added from above
      feature_selection_results.loc[num_features] = [selected_features, val_dpa]  
      
    except Exception as e:  
      print(e)
      continue
      
  #last comparison: using no threshold = no feature selection at all
  feature_selection_results.loc[num_total_features] = [list(X_train.columns), initial_val_dpa]
    
  #decide which number of features is the best to use
  max_validation_DPA = feature_selection_results['validation DPA'].max() 
  
  #get rf and features selected associated with best threshold
  best_num_features = feature_selection_results.index[feature_selection_results['validation DPA'] == max_validation_DPA].tolist()[0]
  opt_rf = all_rfs[best_num_features]
  features_selected_list = feature_selection_results.loc[best_num_features]['features selected']
  
  return features_selected_list

In [0]:
def train_rf_per_feature_subset(train, opt_min_samples_leaf, opt_min_samples_split, opt_max_depth, opt_criterion):
  feature_cols = [col for col in train.columns if col != 'order_quantity']
  X_train = train[feature_cols]
  y_train = train['order_quantity']

  rf = RandomForestRegressor(n_estimators=300, random_state=15, criterion='mae')  
  param_grid = {'min_samples_leaf': [opt_min_samples_leaf], 'min_samples_split': [opt_min_samples_split], 'max_depth': [opt_max_depth], 'criterion': [opt_criterion]} 
  timeseries_cv = TimeSeriesSplit(n_splits=5) 

  #make custom scoring metric: DPA
  def dpa(y_true, y_pred):
    abs_diff = np.abs(y_pred - y_true)
    if y_true.sum() != 0:
      return 1 - abs_diff.sum()/y_true.sum()
    else:
      return 1 - abs_diff.sum()
  dpa_scorer = make_scorer(dpa, greater_is_better=True)
  
  rs_rf = RandomizedSearchCV(rf, param_grid, cv=timeseries_cv, scoring=dpa_scorer, refit=True, n_iter=1)  #use randomized search with only n_iter=1 because we have only 1 hyperparameter combination passed into the parameter grid for tuning. however, we want to use randomized search still because it has a built-in cross-validation function, so we can obtain the mean validation DPA for the feature subset we are testing. this mean validation DPA is more likely to be closer to the true validation DPA of using this feature subset on new unseen data hence we want to obtain it
  
  register_spark()
  with parallel_backend('spark', n_jobs=20):  #can't put n_jobs too high bc alr got outer parallelization going on in get_all_predictions() function, will hit max spark worker capacity
    rs_rf.fit(X_train, y_train)  
    
  #get mean validation DPA
  val_dpa = rs_rf.best_score_
  
  return val_dpa

In [0]:
def get_predictions_per_forecast_period(train, test, opt_min_samples_leaf, opt_min_samples_split, opt_max_depth, opt_criterion, features_selected_list):
  feature_cols = [col for col in train.columns if col != 'order_quantity']
  X_train = train[feature_cols]
  y_train = train['order_quantity']
  X_test = test[feature_cols]  
  
  #fit rf on training set
  rf = RandomForestRegressor(n_estimators=300, random_state=15, min_samples_leaf=opt_min_samples_leaf, min_samples_split=opt_min_samples_split, max_depth=opt_max_depth, criterion=opt_criterion)
  
  register_spark()
  with parallel_backend('spark', n_jobs=20):  #can't put n_jobs too high bc alr got outer parallelization going on in get_all_predictions() function, will hit max spark worker capacity
    rf.fit(X_train, y_train)  
    predictions = rf.predict(X_test[features_selected_list])
  
  return predictions

In [0]:
#check that all material have a file and the file has things in it
#run once for promo_mat_timeseries_list and once for nopromo_mat_timeseries_list

def check_get_all_predictions(biz_name, whether_promo, mat_timeseries_list):
  schema = StructType([StructField('material', IntegerType(), True), StructField('date', TimestampType(), True), StructField('forecast week', StringType(), True), StructField('lag weeks', DoubleType(), True), StructField('actual', DoubleType(), True), StructField('prediction', DoubleType(), True)])
  
  mat_errors = []
  for i in mat_timeseries_list:
    material = i[0]

    try: 
      df = spark.read.format("csv") \
        .schema(schema) \
        .option("header", "true") \
        .option("sep", ",") \
        .load(f"/FileStore/phase 2/all mat level/{biz_name}/{material}_{whether_promo}.csv").toPandas()   #tests that file exists
      
      df.iloc[0]  #tests that file contains required results if it exists

    except Exception as e:
      print(material)
      print(e)
      mat_errors.append(material)
    
  return mat_errors

In [0]:
#re-run materials that had errors above
#run once for promo_mat_timeseries_list and once for nopromo_mat_timeseries_list

def run_errors(biz_name, whether_promo, all_mat_errors, mat_timeseries_list):
  for material in all_mat_errors:
    for i in mat_timeseries_list:
      if i[0] == material:
        mat_df = i[1]
        break

    try:
        predictions_per_time_series = get_predictions_per_time_series(material, mat_df)

        if len(predictions_per_time_series) > 0:
          dbutils.fs.rm(f'dbfs:/FileStore/phase 2/all mat level/{biz_name}/{material}_{whether_promo}.csv', True)  #remove any file with same name alr there just in case, otherwise will get error when saving the file to DBFS
          spark.createDataFrame(predictions_per_time_series).coalesce(1).write.format('com.databricks.spark.csv').option('header', 'true').save(f'dbfs:/FileStore/phase 2/all mat level/{biz_name}/{material}_{whether_promo}.csv') 

        else:
          print(f'note {material}, {whether_promo}: no content to save!')
          continue

    except Exception as e:
      print(material)
      print(e)

In [0]:
def get_required_result(biz_name, promo_mat_timeseries_list, nopromo_mat_timeseries_list): 
  schema = StructType([StructField('material', IntegerType(), True), StructField('date', TimestampType(), True), StructField('forecast week', StringType(), True), StructField('lag weeks', DoubleType(), True), StructField('actual', DoubleType(), True), StructField('prediction', DoubleType(), True)])

  all_predictions = pd.DataFrame(columns=['material', 'date', 'forecast week', 'lag weeks', 'actual', 'prediction'])
  
  for timeseries in promo_mat_timeseries_list:
    material = timeseries[0]
    
    try:
      df = spark.read.format("csv") \
            .schema(schema) \
            .option("header", "true") \
            .option("sep", ",") \
            .load(f"/FileStore/phase 2/all mat level/{biz_name}/{material}_promo.csv").toPandas()

      all_predictions = pd.concat([all_predictions, df])
    except Exception as e:  #some exceptions are okay bc some materials have no content to be saved!
      print(material)
      print(e)
      
  for timeseries in nopromo_mat_timeseries_list:
    material = timeseries[0]
    
    try:
      df = spark.read.format("csv") \
            .schema(schema) \
            .option("header", "true") \
            .option("sep", ",") \
            .load(f"/FileStore/phase 2/all mat level/{biz_name}/{material}_nopromo.csv").toPandas()

      all_predictions = pd.concat([all_predictions, df])
    except Exception as e:  #some exceptions are okay bc some materials have no content to be saved!
      print(material)
      print(e)
    
  result = all_predictions.groupby(['material', 'date', 'forecast week', 'lag weeks'])[['actual', 'prediction']].apply(sum).reset_index(inplace=False)
      
  #save result to dbfs
  dbutils.fs.rm(f'dbfs:/FileStore/phase 2/all mat level/{biz_name}/predictions_final.csv', True)  #remove any file with same name alr there just in case, otherwise will get error when saving the file to DBFS
  spark.createDataFrame(result).coalesce(1).write.format('com.databricks.spark.csv').option('header', 'true').save(f'dbfs:/FileStore/phase 2/all mat level/{biz_name}/predictions_final.csv')
  
  return spark.createDataFrame(result)  #to save when displayed

In [0]:
#example of the flow of using the above functions, on USR business

biz_name = 'usr'

promo_mat_timeseries_list, nopromo_mat_timeseries_list =  get_mat_level_data(usr)  #note: usr is a spark dataframe of the file sales_history_sell_out_usr.csv   

get_all_predictions(biz_name, 'promo', promo_mat_timeseries_list)  
promo_mat_errors = check_get_all_predictions(biz_name, 'promo', promo_mat_timeseries_list)
run_errors(biz_name, 'promo', promo_mat_errors, promo_mat_timeseries_list)

get_all_predictions(biz_name, 'nopromo', nopromo_mat_timeseries_list)  
nopromo_mat_errors = check_get_all_predictions(biz_name, 'nopromo', nopromo_mat_timeseries_list)
run_errors(biz_name, 'nopromo', nopromo_mat_errors, nopromo_mat_timeseries_list)

result = get_required_result(biz_name, promo_mat_timeseries_list, nopromo_mat_timeseries_list)