### Objective
The objective of this notebook is to calculate the trend and seasonality components using Prophet at the level provided in the config (data_processing > feature_engineering > prophet_based > higher_level_si_trend_creation).

In [0]:
import yaml
import inspect
import glob
import numpy as np
import pandas as pd
from prophet import Prophet
from prophet.make_holidays import make_holidays_df
from distutils.command.config import config
from tqdm.auto import tqdm
from datetime import timedelta
from datetime import datetime
from sklearn.metrics import mean_absolute_error,mean_squared_error
import os
import logging
import dotsi

In [0]:
# Getting the default settings of hyperparameters
def get_default_args(func) -> dict:
    """Collect the declared default value of every parameter of a callable.

    Parameters
    ----------
    func : callable
        The constructor (or any other callable) of the algorithm whose
        signature should be inspected (Eg: Prophet,SARIMAX)

    Returns
    -------
    dict
        Maps each parameter name to its default value. Parameters declared
        without a default are mapped to None, and a parameter literally
        named 'self' is left out.
    """
    no_default = inspect.Parameter.empty
    defaults = {}
    for name, param in inspect.signature(func).parameters.items():
        if name == 'self':
            continue
        defaults[name] = None if param.default is no_default else param.default
    return defaults

In [0]:
# Default values for the hyperparameters in Prophet, keyed by constructor
# parameter name (parameters declared without a default are stored as None).
default_hpps = get_default_args(Prophet)

#### Broadcast helper functions
These functions help persist the data on all the workers so that it can be used inside the UDF during distributed processing.

In [0]:
def broadcast_holidays(
    config_holidays: dict,
    year_list: list =[2018, 2019, 2020, 2021, 2022],
    country_name: str ="US",
    holiday_lower_window: int =7,
    holiday_upper_window: int =7,
) -> pd.DataFrame:
    """Function to return the dataframe of holidays for the given time period using Prophet's make_holidays_df()

    Parameters
    ----------
    config_holidays : dict
        the additional holidays provided by the user in the config file, as
        ``{holiday_name: {'ds': date or list of dates}}``. May be None.
    year_list : list, optional
        the list of years for which we need the holidays , by default [2018, 2019, 2020, 2021, 2022]
    country_name : str, optional
        Name of the country based on which holidays can be decided, by default "US"
    holiday_lower_window : int, optional
        lower limit of the window; stored negated in the 'lower_window' column, by default 7
    holiday_upper_window : int, optional
        upper limit of the window, stored in the 'upper_window' column, by default 7

    Returns
    -------
    pd.DataFrame
        Returns a de-duplicated dataframe of holidays for the given time period
        with the columns of make_holidays_df() plus 'lower_window' and 'upper_window'
    """
    holidays = make_holidays_df(year_list, country_name)
    # Add window
    holidays['lower_window'] = -holiday_lower_window
    holidays['upper_window'] = holiday_upper_window

    # Adding additional holidays: collect every frame first and concatenate once
    holiday_frames = [holidays]
    if config_holidays is not None:
        for ad_hol in config_holidays.keys():
            raw_dates = config_holidays[ad_hol]['ds']
            # A single date (not wrapped in a list) would otherwise make the
            # DataFrame constructor fail with "all scalar values"
            if not pd.api.types.is_list_like(raw_dates):
                raw_dates = [raw_dates]
            holiday_dates = pd.to_datetime(raw_dates)
            if len(holiday_dates) == 0:
                # Nothing to add for a holiday configured without dates
                continue
            holiday_frames.append(pd.DataFrame({'holiday': ad_hol,
                                                'ds': holiday_dates,
                                                'lower_window': -holiday_lower_window,
                                                'upper_window': holiday_upper_window}))
    holidays = pd.concat(holiday_frames)

    # Dropping duplicates if exists any
    holidays = holidays.drop_duplicates().reset_index(drop = True)
    return holidays

#### Processing Config file
The dependent variable, date variable, modeling granularity and other related modeling details are provided in the form of a config file. Each TS algorithm and the related hyperparameter values to be tried should be given in the config.yml file.

In [0]:
%run ../0_Config.ipynb

In [0]:
# Build the results folder path for this notebook and create it on first use
output_directory = app_config['output_dir_path']
root_dir, category = "Data_Processing", "higher_level_trend_si"
algo_path = os.path.join(output_directory, root_dir, category)

_algo_path_exists = os.path.exists(algo_path)
if not _algo_path_exists:
    # Only announce the folder when it had to be created
    os.makedirs(algo_path)
    print(algo_path)

#### Broadcasting the required variables
Variables suffixed with "_conf" are taken from the config file

In [0]:
# Prophet's defaults are used as the hyperparameter set; the config-driven
# alternative is kept below for reference.
# hyperparameters_conf = dict(app_config["Algorithms"]["Prophet"]["Hyperparameters"])
hyperparameters_conf = default_hpps

# Config sections that several settings below are read from
_prophet_conf = app_config["data_processing"]['feature_engineering']['prophet_based']
_si_trend_conf = _prophet_conf["higher_level_si_trend_creation"]

# Level at which trend/seasonality is computed, and the modeling level
granularity_conf = _si_trend_conf["granularity"]
modeling_granularity_conf = app_config["modeling_granularity"]

# Names of the dependent variable and date columns in the input data
dv_config = _si_trend_conf["sales_variable"]
ds_config = app_config["date_var"]

# Broadcasting: every shared value is wrapped so it can be read through `.value`
if _prophet_conf["Holidays"]["include_holidays"] == True:
    aa = _prophet_conf["Holidays"]
    holidays_broadcast = dotsi.Dict({
        "value": broadcast_holidays(
            aa['additional_holidays'],
            aa['years'],
            aa['country'],
            aa['holiday_lower_window'],
            aa['holiday_upper_window'],
        )
    })
else:
    holidays_broadcast = dotsi.Dict({"value": None})

broadcast_granularity = dotsi.Dict({"value": granularity_conf})
broadcast_hyper_parameters = dotsi.Dict({"value": hyperparameters_conf})
broadcast_future_weeks = dotsi.Dict({"value": _prophet_conf["future_n_datapoints"]})

#### Pandas UDF for creating trend
The UDF gets executed in multiple worker nodes to parallelize the process. All the broadcasted variables are accessed within the UDF as and when required

In [0]:
def get_forecast_UDF(df_data: pd.DataFrame)-> pd.DataFrame:
    """Fit Prophet on the data of one granularity group and return its fitted values,
    trend and seasonal components, optionally followed by future periods.

    The function reads the module-level settings `broadcast_granularity`,
    `broadcast_hyper_parameters`, `holidays_broadcast` and `broadcast_future_weeks`
    through their `.value` attribute.

    Parameters
    ----------
    df_data : pd.DataFrame
        The data of a single group; must contain 'ds', 'y' and the granularity columns

    Returns
    -------
    pd.DataFrame
        On success: the fitted history (future_weeks = 0) followed by the requested
        future periods (future_weeks = 1, y = 0) with the columns ds, yhat, yhat_upper,
        yhat_lower, trend, yearly, weekly, daily, y, the granularity columns (as str),
        future_weeks and status = 'success'.
        On failure: a single row carrying the granularity values of the group and the
        error message in 'status'; all other columns are empty.
    """
    try:
        df_data = df_data.sort_values(by=['ds'],ascending=True)

        # broadcast_granularity
        broadcast_gran = broadcast_granularity.value

        # Start from Prophet's own defaults and overlay the configured hyperparameters
        hp_config = broadcast_hyper_parameters.value
        def_args = get_default_args(Prophet)
        for x in list(hp_config):
            def_args[x] = hp_config[x]

        if holidays_broadcast.value is not None:
            def_args["holidays"] = holidays_broadcast.value

        # All three seasonal components are always requested
        def_args['yearly_seasonality'] = True
        def_args['weekly_seasonality'] = True
        def_args['daily_seasonality'] = True

        # Calling the Prophet constructor with the hyperparameters of interest
        m = Prophet(**def_args)
        m.fit(df_data)
        forecast_pd = m.predict(df_data)

        # Any seasonal component missing from the prediction is reported as 0
        seasonal_cols = ['yearly','weekly','daily']
        seasonal_cols_zero = list(set(seasonal_cols) - set(forecast_pd.columns))
        for col in seasonal_cols_zero:
            forecast_pd[col] = 0

        output_cols = ['ds', 'yhat', 'yhat_upper','yhat_lower','trend']+seasonal_cols
        results_pd = forecast_pd[output_cols]
        results_pd = pd.merge(results_pd, df_data[['y','ds']+broadcast_gran], how = "left",on = "ds")
        results_pd['future_weeks'] = 0

        # Infering frequency from the last three distinct dates
        history_dates = pd.to_datetime(pd.Series(results_pd['ds'].unique(), name='ds')).sort_values()
        freq = pd.infer_freq(history_dates.tail(3))
        # returns None if inference failed
        if freq is None:
            raise Exception('Unable to infer `freq`')

        # making future week
        future_weeks = broadcast_future_weeks.value
        if(future_weeks>0):
            future = m.make_future_dataframe(periods=future_weeks, freq=freq, include_history=False)
            forecast = m.predict(future)
            for col in seasonal_cols_zero:
                forecast[col] = 0
            # Explicit copy so the assignments below do not write into a slice of `forecast`
            forecast1 = forecast[output_cols].copy()
            forecast1['y'] = 0
            forecast1['future_weeks'] = 1
            fin_df = pd.concat([results_pd,forecast1],ignore_index=True)
        else:
            fin_df = results_pd.copy()

        # Future rows have no granularity values yet: carry them forward from the history
        # (.ffill() replaces the deprecated fillna(method='ffill'))
        fin_df[broadcast_gran] = fin_df[broadcast_gran].ffill()
        # To adhere to defined schema
        for x in broadcast_gran:
            fin_df[x] = fin_df[x].astype(str)

        fin_df['status'] = 'success'
        return fin_df

    except Exception as e:
        # Best effort: report the failure as a one-row frame instead of aborting the run.
        # The column list must be flat; a nested list would create MultiIndex columns
        # that do not line up with the frames returned on success.
        gran_cols = list(broadcast_granularity.value)
        error_cols = ['ds', 'y', 'yhat','yhat_upper','yhat_lower','trend','yearly','weekly','daily','future_weeks','status'] \
                     + gran_cols
        results_pd = pd.DataFrame(columns = error_cols, index = range(1))
        first_row = df_data[gran_cols].head(1).reset_index(drop = True)
        for x in gran_cols:
            results_pd[x] = first_row[x]
            results_pd[x] = results_pd[x].astype(str)
        results_pd['status'] = str(e)
        return results_pd

In [0]:
def data_loading(path: str) -> pd.DataFrame:
    """Load a .csv/.parquet file, or every .csv/.parquet file of a folder, into one dataframe

    Parameters
    ----------
    path : str
        Path of a single .csv/.parquet file or of a folder containing such files.
        For a folder, .csv files take precedence: .parquet files are only read
        when the folder holds no .csv file.

    Returns
    -------
    pd.DataFrame
        The content of the file, or the row-wise concatenation (with a fresh index)
        of all matching files of the folder, read in sorted file-name order

    Raises
    ------
    AssertionError
        If the path does not exist or no .csv/.parquet file is found
    """
    unsupported_msg = "Only .csv or .parquet file types are supported"

    if os.path.isdir(path):
        # Group the file names by their real extension so that sidecar files such as
        # 'part-0.csv.crc' or 'data.csv.bak' are never mistaken for data files.
        # Sorting makes the row order independent of the file-system listing order.
        files_by_ext = {}
        for file_name in sorted(os.listdir(path)):
            files_by_ext.setdefault(os.path.splitext(file_name)[1], []).append(file_name)

        if ".csv" in files_by_ext:
            frames = [pd.read_csv(os.path.join(path, file_name))
                      for file_name in files_by_ext[".csv"]]
        elif ".parquet" in files_by_ext:
            frames = [pd.read_parquet(os.path.join(path, file_name), engine='pyarrow')
                      for file_name in files_by_ext[".parquet"]]
        else:
            # Raised explicitly (not `assert False`) so the check survives `python -O`
            raise AssertionError(unsupported_msg)
        return pd.concat(frames, ignore_index = True)

    if os.path.isfile(path):
        file_type = os.path.splitext(path)[1]
        if file_type == ".csv":
            return pd.read_csv(path)
        if file_type == ".parquet":
            return pd.read_parquet(path, engine='pyarrow')
        raise AssertionError(unsupported_msg)

    raise AssertionError('Path specified is not correct')

#### Load data

In [0]:
if app_config["data_processing"]["outlier_treatment_needed"] == True:
    # Use the most recent outlier-treatment export, judged by the timestamp in its file name
    outlier_dir = app_config['output_dir_path']+"/Data_Processing/Outlier_treatment"
    outlier_op_files = [
        file.replace(".csv","")
        for file in os.listdir(outlier_dir)
        if "Outlier_treatment_results (" in file
    ]
    version_dates = [
        datetime.strptime(x.split('(')[1].replace(')',''), '%Y-%m-%d-%H-%M-%S')
        for x in outlier_op_files
    ]
    max_date = max(version_dates).strftime('%Y-%m-%d-%H-%M-%S')
    req_file_name = [x for x in outlier_op_files if max_date in x]
    outlier_op_file_path = os.path.join(outlier_dir, req_file_name[0] + ".csv")

    # Reading the data
    df = pd.read_csv(outlier_op_file_path)
else:
    # No outlier treatment: read the raw input as configured
    df = data_loading(app_config["input_file_path"])

# Rename the date and dependent-variable columns to 'ds' and 'y', parse the dates
# and treat the modeling granularity columns as strings
df = df.rename(columns = {ds_config:"ds", dv_config:"y"})
df['ds'] = pd.to_datetime(df['ds'],format = app_config["date_format_pandas"])
df[modeling_granularity_conf] = df[modeling_granularity_conf].astype(str)
    

# Date window for the trend creation; when it is not configured the full span of the data is used
_window_conf = app_config["data_processing"]['feature_engineering']['prophet_based']["higher_level_si_trend_creation"]
start_date, end_date = _window_conf["start_date"], _window_conf["end_date"]

if min(len(start_date), len(end_date)) > 1:
    # Both limits are configured: parse them and keep only the rows inside the window
    start_date = datetime.strptime(start_date, app_config["date_format_pandas"])
    end_date = datetime.strptime(end_date, app_config["date_format_pandas"])
    print(df.shape)
    df = df[df['ds'].between(start_date, end_date)]
else:
    start_date, end_date = df['ds'].min(), df['ds'].max()

# Keep only the combinations whose history starts on start_date and ends on end_date
gran = list(set(granularity_conf+modeling_granularity_conf))
temp = df.groupby(gran)['ds'].agg(min_ds = 'min', max_ds = 'max').reset_index()
_covers_window = temp["min_ds"].eq(start_date) & temp["max_ds"].eq(end_date)
temp2 = temp[_covers_window]

# Totals before and after the filter, used for the summary printed below
tot_pdts = len(temp)
tot_sum = df['y'].sum()
df = df.merge(temp2,on=gran,how='right')
rem_sum = df['y'].sum()

_dropped_pdts = tot_pdts-temp2.shape[0]
_dropped_share = np.round((tot_sum-rem_sum)/tot_sum*100,1)
print("Out of "+ str(tot_pdts) + " combinations, "+str(_dropped_pdts) + " combinations are getting dropped bcz of the date filters")
print("Dropped products conntributed around " + str(_dropped_share) + "%")

# Aggregating the data at granularity level
df1 = df.groupby(granularity_conf + ['ds']).agg(y = ('y','sum')).reset_index()

# One Prophet fit per granularity combination. Grouping on the columns themselves
# (instead of a key made by gluing their string values together) keeps combinations
# such as ("1", "23") and ("12", "3") apart. sort=False keeps the groups in order of
# first appearance in df1.
forecast_frames = [
    get_forecast_UDF(group_df)
    for _, group_df in df1.groupby(granularity_conf, sort=False)
]

# Concatenate once at the end; an empty input still yields an (empty) dataframe
results_s = pd.concat(forecast_frames) if forecast_frames else pd.DataFrame()

#### Exporting higher level trend results

In [0]:
# Write the results next to earlier runs; the timestamp in the file name keeps versions apart
_export_stamp = datetime.today().strftime('%Y-%m-%d-%H-%M-%S')
_export_path = algo_path+"/higher_level_trend_si_results ("+_export_stamp+").csv"
results_s.to_csv(_export_path, index = False)