The following script builds a FBProphet forecast for uplift modelling used for Geo based AB testing. In particular it is used for incrementality testing of marketing channels; marketing campaigns with the use of TV or other offline channels. 

The script takes as inputs:
- Forecast parameters filled in by the managers in the GoogleSheet file. Parameters are imported using Google API. 
- Historical data of at least 3 years for each forecast. The data is imported from connected database (Vertica in this case) using SQL builder. 
The script provides as outputs:
- Factual data for the given dates
- Foreacsted (simulated) data for the given dates
- Error of the model (accuracy)
- Visualisation


The script consists of the following: 
1. Libraries import 
2. Functions activation
3. Forecast input parameters downloading
4. Main code (one iteration for each forecast)
    1. Data uploading
    2. Data preprocessing
    3. Exogenous variable selection
    4. FBProphet parameters tuning 
    5. Model fitting 
    6. Visualisation 
5. Export of the results to DataBase 

# 1. Libraries import

In [None]:
# Supporting libraries
import pandas as pd
import numpy as np
pd.options.plotting.backend = "plotly"
import vertica_python
import inspect
import datetime 
from tqdm.notebook import tqdm
import ast
import re
import itertools
from plotly.subplots import make_subplots
import os
import json

# Google Sheets APIs (to download the forecast parameters)
import gspread

# FBProphet Libraries for forecasting & CausalImpact for covariates selection
import prophet as fp 
from prophet.plot import add_changepoints_to_plot
from prophet.diagnostics import cross_validation
from prophet.diagnostics import performance_metrics
from causalimpact import CausalImpact

# Writes the time of starting the script
with open('run_time.txt', 'a') as myFile:
    myFile.write('\n -----------------------------Started at ' + str(datetime.datetime.now())+' -----------------------------')

# Import Kats libraries
from kats.consts import TimeSeriesData

from kats.detectors.outlier import OutlierDetector
from kats.detectors.cusum_detection import CUSUMDetector

In [None]:
# import credentials for vertica access with password from access_file.txt
with open ('/home/aavoronkin/Projects/Forecasts/access_file.txt') as f:
    password = f.read()
credentials = {'host':'vertica-dwh-proxy',
 'port':5435,
 'user':'aavoronkin',               
 'password':password,
 'database':'DWH',
 'read_timeout':6000,
 'unicode_error':'strict',
 'ssl':False,
 'connection_timeout':6000}

# import regional_dict with locations and parent locations in order to later exclude common locations 
#  and parent_locations from covariates (e.g. exclude 'Омск' if 'Омская область' is already added)
regional_dict_query=inspect.cleandoc(
    '''
SELECT cl1.locationname as locationname,
       case when cl2.locationname = 'Россия' then cl1.locationname
            else cl2.locationname end as parentlocationname
FROM   dma.current_locations cl1 join dma.current_locations cl2
        ON cl1.parentlocation_id = cl2.location_id
WHERE  cl1.city_population_group != 'under_100k'
    ''')
with vertica_python.connect(**credentials) as connection:
    regional_dict = pd.read_sql(regional_dict_query, connection)
    
# Holidays table    
# import holdays table from dict.calendar and then transform it into table for FBProphet model
  
holidays = pd.read_csv('holidays.csv', parse_dates=['ds'])
holidays=holidays.set_index('ds', drop=False)

# adjust the holidays according to changes due-to covid
holidays.at[datetime.datetime(2021, 5, 1), 'upper_window'] = int(8)
holidays.at[datetime.datetime(2021, 11, 4), 'upper_window'] = int(3)
holidays.at[datetime.datetime(2021, 11, 4), 'lower_window'] = int(-4)
holidays=holidays.append({'holiday': 'covid', 'ds':datetime.date(2020, 3, 15), 
                 'lower_window':0, 'upper_window':145}, ignore_index=True)
holidays = holidays.append({'holiday': 'february_crisis', 'ds': datetime.date(2022, 2, 24),
                                    'lower_window': 0, 'upper_window': 56}, ignore_index=True)
holidays = holidays.append({'holiday': 'sep_22_draft', 'ds': datetime.date(2022, 9, 20),
                                    'lower_window': 0, 'upper_window': 21}, ignore_index=True)

# adjust NY holidays to start a little earlier
holidays.loc[holidays[holidays.holiday=='NY'].index,'lower_window']=-4

# 2. Functions activation

In [None]:
# Function that extracts the forecast parameters either from google sheets (custom_forecast_parameters) or from 
#  vertica (dict.custom_forecast_parameters)
def params_extractor():
    gc = gspread.service_account(filename='/home/aavoronkin/Projects/keys/mrk-cft-api-17cd0731d265.json')
    sh=gc.open_by_key('1pPHeWQuCLfUF_XAFaDXFAldNiaVtpLcwE5AsUOqOO68')
    data=sh.worksheet('custom_forecast_parameters').get()
    params_gs=pd.DataFrame(data[1:], columns=data[0])
    print("COMPLETE: Data copied")
    params_gs=params_gs.astype({'forecast_start_date': 'datetime64[ns]', 'forecast_end_date': 'datetime64[ns]',
                            'custom_sql': 'object'}) 
    return params_gs


# SQL-script builder function that uses dma.regional_ab_metrics and its metrics (list of metrics through link:
# http://stash.msk.avito.ru/projects/BI/repos/regional-ab-tests/browse/configs/legit_regional_ab_metrics_and_dimensions.json)
def reg_sql_builder(
    target_region,
    metric_name,
    vertical,
    log_cat,
    excluded_locations
    ):
    sql = '''select 
        event_date as ds
        '''
    sql = sql + ',case when locationname in (' + target_region \
        + ") then 'target' else locationname end as region"
    sql = sql \
        + '''
        ,sum(metric_value) as y
        from DMA.regional_ab_metrics ab
        join dma.current_locations cl using (location_id)
        where user_segment_market = 'any'
            and is_user_asd = 'any'
            and cl.isactive
            and cl.City_Population_Group != 'under_100k'
            and platform_id in (0,-1) 
            and event_date>=date_trunc('year',TIMESTAMPADD(year,-4,CURRENT_TIMESTAMP))::date
            '''
    if bool(re.search('ny',vertical))==False:
        sql = sql \
            + "and vertical_id in (select distinct vertical_id from dma.current_microcategories where vertical in (" \
            + vertical + "))"
    else:
        sql = sql + '\n            and vertical_id = -1'
    if  bool(re.search('ny',log_cat))==False:
        sql = sql \
            + '\n            and logical_category_id in (select distinct logical_category_id from dma.current_microcategories where logical_category in (' \
            + log_cat + '))'
    else:
        sql = sql + '\n            and logical_category_id = -1'
    sql = sql + "\n            and metric_name = '" + metric_name + "'"
    sql = sql \
        + '\n            and ab.location_id not in (select location_id from dma.current_locations where isactive and (ParentLocation_id in (select parentlocation_id from dma.current_locations where LocationName in (' \
        + target_region + ',' + excluded_locations \
        + ')) and locationname not in (' + target_region \
        + ') and ParentLocation_id!=2885))'
    sql = sql \
        + '\n            and cl.parentlocation_id not in (select location_id from dma.current_locations where locationname in (' \
        + target_region + ',' + excluded_locations + '))'
    if 'Санкт-Петербург' \
        in excluded_locations:
        sql = sql \
            + "\n            and locationname != 'Санкт-Петербург'"
    if 'Москва' \
        in excluded_locations:
        sql = sql \
            + "\n            and locationname != 'Москва'"
    sql = sql \
        + '''
        and locationname != 'все регионы'
 group by 1,2
        union all
        select 
        event_date as ds'''
    sql = sql + '\n        ,case when clp.locationname in (' \
        + target_region \
        + ") then 'target' else clp.locationname end as region"
    sql = sql \
        + '''
        ,sum(metric_value) as y
        from DMA.regional_ab_metrics ab
        join dma.current_locations cl using (location_id)
        join dma.current_locations clp on  cl.parentlocation_id=clp.location_id
        where user_segment_market = 'any'
            and is_user_asd = 'any'
            and cl.isactive
            and platform_id in (0,-1) 
            and event_date>=date_trunc('year',TIMESTAMPADD(year,-4,CURRENT_TIMESTAMP))::date
            '''
    if  bool(re.search('ny',vertical))==False:
        sql = sql \
            + "and vertical_id in (select distinct vertical_id from dma.current_microcategories where vertical in (" \
            + vertical + "))"
    else:
        sql = sql + '\n            and vertical_id = -1'
    if  bool(re.search('ny',log_cat))==False:
        sql = sql \
            + '\n            and logical_category_id in (select distinct logical_category_id from dma.current_microcategories where logical_category in (' \
            + log_cat + '))'
    else:
        sql = sql + '\n            and logical_category_id = -1'
    sql = sql + "\n            and metric_name = '" + metric_name + "'"
    sql = sql \
        + '\n            and ab.location_id not in (select location_id from dma.current_locations where isactive and (ParentLocation_id in (select parentlocation_id from dma.current_locations where LocationName in (' \
        + target_region + ',' + excluded_locations \
        + ')) and ParentLocation_id!=2885))'
    sql = sql \
        + "\n            and clp.locationname not in ('все регионы', 'Москва','Санкт-Петербург'," \
        + excluded_locations + ')'
    sql = sql + '''\n group by 1,2'''
    return sql


# SQL-script builder function that uses big10 and its metrics
def big10_sql_builder(metric_name,
        vertical,
        log_cat,
        ):
    
    sql = '''select
     metric_date as ds
     , 'target' as region
     '''
    sql = sql \
     + '''
    ,sum(metric_value) as y
    from dma.big10_abs
    where True
    and is_asd = -1
    and user_segment_market = 'any'
    and asd_user_group_id=-1
    and metric_date>=date_trunc('year',TIMESTAMPADD(year,-4,CURRENT_TIMESTAMP))::date'''
    sql = sql \
     + "\n    and  metric_name = '" \
     + metric_name + "'"
    sql = sql \
     + "\n and vertical_id in (select distinct vertical_id from dma.current_microcategories where vertical in (" \
     + vertical + "))"
    if  bool(re.search('ny',log_cat))==False:
        sql = sql \
         + "\n    and logical_category_id in (select distinct logical_category_id from dma.current_microcategories where logical_category in (" \
         + log_cat + "))"
    else:
        sql = sql + ''
    sql = sql + '''\n and target_type = 'fact' '''
    sql = sql + '''\n group by 1,2'''
    return sql

In [None]:
# Covariates (features) selection function based on CausalImpact algorithm
def ci_feat_selection (data,forecast_start_date,forecast_horizon, sign_level, num_vars):
    data_fs=data_train.copy(deep=True)
    # target variable should be placed in first column 
    data_fs.insert(0, 'y_',data_train.y)
    data_fs = data_fs.drop(columns=['y'])
    data_fs = data_fs.loc[:,data_fs.columns[data_fs.columns.str.contains('dumm')==False]]
    data_fs=data_fs.set_index('ds')
    # pre-period is train period, i.e. period from the earliest date in whole data till start of the campaign
    pre_period = [data_fs.index[0], forecast_start_date-datetime.timedelta(days=forecast_horizon+2)]
    # post-period is test period, i.e. period from the start of the campaign till its end
    post_period = [forecast_start_date-datetime.timedelta(days=forecast_horizon+1),
                  forecast_start_date-datetime.timedelta(days=1)] 
    ci = CausalImpact(
        data_fs, pre_period, post_period,
        model_args={"prior_level_sd":None, "alpha":0.01})
    ci.run()
    # build a dataframe with covariates as first column and their p-values (significance) as second
    pval_df = pd.DataFrame({'col_names':data_fs.columns[1:], 'pval':ci.model.pvalues[2:]}).reset_index(drop=True)
    # make a column locationname which deletes the prefix (e.g. 'x_Омск' -> 'Омск')
    pval_df['locationname']=pval_df['col_names'].apply(lambda x: x[3:])
    # include only the covariates with p-value of less than 5 percent
    pval_df=pval_df[pval_df.pval<sign_level]
    # drop the locations with the same parent location (in order to account for cointegration) and then sort the 
    # table from lowest p-value to highest and leave only top num_vars covariates
    pval_df=pval_df.merge(regional_dict, how='left', on='locationname').sort_values(by=['pval']).drop_duplicates(subset=['parentlocationname'])[:num_vars]
    sign_vars=list(pval_df.col_names)
    sign_vars
    return sign_vars
     

# Function that adds a dummy for after-Covid period 
def dummy_var(ds, first_date, last_date='2099-12-31'):
    first_date = datetime.datetime.strptime(first_date, "%Y-%m-%d")
    last_date = datetime.datetime.strptime(last_date, "%Y-%m-%d")
    if (ds >= first_date) & (ds <= last_date):
        return 1
    else:
        return 0
    

# Cross-valiadation function that uses window equal to forecast horizon. 
# Holidays may be excluded from error calculation. DF resulted from CV may be returned.
def cv_fbprophet(
    model,
    forecast_horizon,
    drop_holidays=False,
    generate_df=False
    ):
    df_cv = cross_validation(model, initial='365.25 days',
                             period='{} days'.format((forecast_horizon//2) if forecast_horizon>=90 else forecast_horizon),
                             horizon='{} days'.format(forecast_horizon),
                             parallel='processes')
    if drop_holidays:
        rows_to_drop = []
        date_list = []
        for j in range(len(holidays.ds)):
            numdays = holidays.upper_window[j] \
                - holidays.lower_window[j]
            upper_w = holidays.ds[j] \
                + datetime.timedelta(days=int(holidays.upper_window[j]))
            holiday_list = [upper_w - datetime.timedelta(days=x)
                            for x in range(numdays + 1)]
            date_list.append(holiday_list)
        rows_to_drop = [item for sublist in date_list for item in
                        sublist]
        df_cv = df_cv[df_cv.ds.isin(rows_to_drop) == False]
    df_cv['mape'] = abs(df_cv['y'] - df_cv['yhat']) / df_cv['y']    
    res = performance_metrics(df_cv, rolling_window=1)
    df_cv['y_horizon'] = 0
    df_cv['yhat_horizon'] = 0
    df_cv.loc[df_cv.index[np.arange(len(df_cv)) % forecast_horizon == 0],
              'y_horizon'] = df_cv.y.rolling(window=40).sum()
    df_cv.loc[df_cv.index[np.arange(len(df_cv)) % forecast_horizon == 0],
              'yhat_horizon'] = df_cv.yhat.rolling(window=40).sum()
    df_cv['mtape'] = abs(df_cv['y_horizon'] - df_cv['yhat_horizon']) \
        / df_cv['y_horizon']
    mtape = np.mean(df_cv['mtape'])        
    pape_80 = np.percentile(df_cv['mape'], 80)
    res['pape_80'] = pape_80
    res['mtape'] = mtape
    if generate_df == False:
        return res[['mape', 'mdape', 'pape_80', 'mtape']]
    else:
        return (res[['mape', 'mdape', 'pape_80', 'mtape']], df_cv)

# Function that checks the data to correspond to the parameters passed
def raw_data_checker(data_raw, target_region):
    if (custom_sql not in empty_list) or (bool(re.search('ny',target_region))):
        pass_check='pass'
    else:
        for target in target_region.split(','):
            target=re.sub(r'[^a-zA-Z0-9]', '', target)
            if target not in list(set(data_raw.region)):
                pass_check='pass'
            else:
                pass_check='fail_no_target'   
    return pass_check

# Function that quotes the data in order to plug it into SQL-builder
def quoter (ex):
    if ex is None:
        return ""
    else:
        ex=ex.split(',')
        ex=["'"+elem+"'" for elem in ex]
        s=','
        return s.join(ex) 
    
# Function that adds prefix to column names
def name_prefix_col (df, prefx):
    dict_col={}
    for col in list(df.columns):
        if col not in ['y','ds']:
            dict_col[col]=prefx+str(col)
    df=df.rename(columns=dict_col) 
    return df

def fix_outliers(df, start_ds, end_ds = None):
    start_ds=datetime.datetime.strptime(start_ds, '%Y-%m-%d')
    if end_ds==None:
        df = df[df.ds != start_ds].reset_index(drop=True)
    else:
        end_ds=datetime.datetime.strptime(end_ds, '%Y-%m-%d')
        df = df[(df.ds < start_ds)|(df.ds > end_ds)].reset_index(drop=True)
    return df

def target_preprocessing(df_orig,forecast_start_date, df_holidays=holidays,hist_w=45,scan_w=60,stp=15):
    df = df_orig.copy(deep=True)
    holidays_NY = holidays[holidays.holiday.str.contains('NY', regex=True)].reset_index(drop=True)
    df['holiday']=0
    length=len(df)
    for date in range(0,(length-1)):
        for j in range(len(holidays_NY.ds)):
            if (holidays_NY.ds[j]+ datetime.timedelta(days=int(holidays_NY.lower_window[j])))<=\
            df.ds[date]<=(holidays_NY.ds[j]+ datetime.timedelta(days=int(holidays_NY.upper_window[j]))):
                df.loc[date,'holiday']=1  
    holiday_index = df[df.holiday==1].index

    # outliers detection
    outlier_ts = df.rename(columns = {'y':'value', 'ds': 'time'})
    outlier_ts = TimeSeriesData(outlier_ts.loc[:,['time','value']])    
    ts_outlierDetection = OutlierDetector(outlier_ts, 'multiplicative', iqr_mult=2.5) # call OutlierDetector
    ts_outlierDetection.detector()
    ts_outliers_interpolated = ts_outlierDetection.remover(interpolate = True)
    ts_outliers_interpolated = ts_outliers_interpolated.to_dataframe()
    df['y'] = ts_outliers_interpolated['y_0']

    # changepoints detection
    ex = df[['ds','holiday','y']]
    ex['y'] = ex['y'].rolling(window=7,min_periods=1).mean()
    df_multi_cps = ex.rename(columns = {'ds':'time'})
    multi_cp_ts = TimeSeriesData(df_multi_cps.loc[:,['time','y']])
    historical_window = hist_w
    scan_window = scan_w
    step = stp
    changepoints = []
    n = len(df_multi_cps)
    for end_idx in range(historical_window + scan_window, n, step):
        if not holiday_index.isin(range(end_idx - (historical_window + scan_window), end_idx)).any(): 
            tsd = multi_cp_ts[end_idx - (historical_window + scan_window) : end_idx]
            changepoints += CUSUMDetector(tsd).detector(interest_window=[historical_window, historical_window + scan_window],
                                                       threshold=1e-4,delta_std_ratio=3)
    list_sign=[]
    for i in range(len(changepoints)):
        if len(changepoints) > 0:
            if (i!=len(changepoints)-1) & (changepoints[i].start_time<forecast_start_date):
                if ((changepoints[i+1].start_time-changepoints[i].start_time).days>14):
                    list_sign.append(i)
            elif (i==len(changepoints)-1) & (changepoints[i].start_time<forecast_start_date):
                list_sign.append(i)
    changepoints = list( changepoints[i] for i in list_sign)
    for i in range(0,len(changepoints)):
        if len(changepoints) > 0:
            df['dummy_cp_{}'.format(i)] = df['ds'].apply(lambda x: dummy_var(x,changepoints[i].start_time.strftime("%Y-%m-%d")))

    del df['holiday']
    return (df, changepoints)

def nan_fill(df):
    # Drop columns if they consist of NA for more than 50%
    df.dropna(axis='columns', thresh=df.shape[0]//2,inplace=True)
    
    # Drop columns if median value is below 100
    df = df.loc[:,['ds']+list(df.median()[(df.median()>100)].index)]    

    # Fill NA data with interpolated values since CausalImpact cant be implemented with missing values
    list_na=list(df.columns[df.isnull().any()])
    for col in list_na:
        df[col].interpolate(method='polynomial', order=2, inplace=True)
    list_na = list(df.columns[df.isnull().any()])
    for col in list_na:
        df[col].interpolate(method='backfill', inplace=True)
        df[col].interpolate(method='ffill', inplace=True)
    return df


# 3. Forecast input parameters downloading

In [None]:
# Create the table with Forecast initial parameters from google-sheet and before calculated parameters
# Import the parameters filled in by managers from Google sheet or vertica
params_gs = params_extractor()
params_gs=params_gs.replace([''], [None])

# Read the saved file with the FBProphet pre-determined parameters and pre-determined control locations
fbprophet_params=pd.read_csv('fbprophet_params_v5.csv')
# Change the type to match with Parameters from Google-Sheet
fbprophet_params=fbprophet_params.astype({'forecast_start_date': 'datetime64[ns]', 'forecast_end_date': 'datetime64[ns]',
                        'custom_sql': 'object'})
fbprophet_params = fbprophet_params.where(pd.notnull(fbprophet_params), None)
fbprophet_params = fbprophet_params.replace({np.nan: None})

# Join GS parameters with FBP parameters based on the keys 
keys=['target_location','metric_name', 'vertical', 'log_cat', 
      'excluded_locations','control_locations', 'custom_sql','forecast_name']
fbprophet_cols=['target_location','metric_name', 'vertical', 'log_cat','excluded_locations','control_locations', 
                'custom_sql','forecast_name','fbprophet_params','ci_control_locations','ci_covariates']
params_all=params_gs.merge(fbprophet_params[fbprophet_cols], how='left', on=keys)
params_all=params_all.where(pd.notnull(params_all), None)
params_all = params_all.replace({np.nan: None})
params_all=params_all.drop_duplicates().reset_index(drop=True)

non_regab_metrics=[ 'autoteka_reports',
 'cpa_calls',
 'installs',
 'new_buyers',
 'new_cookie_buyers',
 'new_cookie_buyers_28',
 'orders_confirmed',
 'paid_orders']

big10_metrics=['buyers']

big10_filter=(params_all.target_location=='Any') & (params_all.metric_name.isin(big10_metrics))
regab_filter=(params_all.target_location!='Any') & (params_all.metric_name.isin(non_regab_metrics)==False)
custom_sql_filter=params_all.custom_sql.isnull()==False
params_all=params_all[(custom_sql_filter | regab_filter | big10_filter)]
params_all=params_all.sort_values(by='fbprophet_params',na_position='first')

empty_list=[None,""," ","' '","''",np.nan,'nan']

current_date_filter = (datetime.datetime.today() > params_all['forecast_start_date']) \
    & (datetime.datetime.today() < params_all['forecast_end_date'] + datetime.timedelta(days=28))
empty_param_filter = ((params_all['fbprophet_params'].isin(empty_list)) & (datetime.datetime.today() > params_all['forecast_start_date']))
params_all_filter = current_date_filter|empty_param_filter


# 4. Main code (one iteration for each forecast)

In [None]:
indices_excluded=[]
indices2run=[elem for elem in list(params_all[params_all_filter].index) if elem not in indices_excluded]


# indices2run=[860]


In [None]:
# Create an empty DF to fill it with the forecasts 
res_agg = pd.DataFrame(columns = ['forecast_name', 'vertical', 'log_cat', 'target_region', 'ds', 'fact',
                                  'forecast', 'error_mape', 'error_pape80', 'start_date', 'end_date',
                                  'metric_name', 'download_time','error_mtape'])
error_type = 'mape'

# Iterate through each forecast parameter combination
print('Total of ', params_all[params_all_filter].shape[0], 'forecasts are being run.')
num_forecast=0
for (index, row) in params_all[params_all.index.isin(indices2run)].iterrows(): 
    num_forecast+=1
    upper_bound = row['forecast_end_date'] + datetime.timedelta(days=28)
    forecast_name = row['forecast_name']
    target_region = quoter(row['target_location'])
    # indicator if there are any control locations
    if row['target_location'] in ['Any', 'any']:
        control_location_ind = 0
    else:
        control_location_ind = 1
    forecast_start_date = row['forecast_start_date']
    forecast_end_date = row['forecast_end_date']
    metric_name = row['metric_name']
    vertical = quoter(row['vertical'])
    log_cat = quoter(row['log_cat'])
    excluded_locations = quoter(row['excluded_locations'])
    # if there are no control_locations set in Google Sheets, then control_locations are extracted from 
    # pre-calculated by CausalImpact locations (which are zero in case of 'Any' region or first calculation of forecast)
    if row['control_locations'] in empty_list:
        if row['ci_control_locations'] in empty_list:
            ci_control_locations_ind = 1
        else:
            ci_control_locations_ind = 0
            control_locations=row['ci_control_locations']
    else:
        ci_control_locations_ind = 0
        control_locations=row['control_locations']
    excluded_locations = quoter(row['excluded_locations'])
    platform = row['platform']
    custom_sql = row['custom_sql']
    # import the pre-calculated model_params in the form of dict
    if row['fbprophet_params'] in empty_list:
        model_params = None
    else:
        model_params = ast.literal_eval(row['fbprophet_params'])
    covariates_sql = row['covariates_sql']
    if covariates_sql in empty_list:
        covariates_ind = 0
    else:
        covariates_ind = 1
    ci_covariates=row['ci_covariates']
    #check if covariates have already been calculated before
    if row['ci_covariates'] in empty_list:
        ci_covariates_ind = 1
    else:
        ci_covariates_ind = 0

    print('Forecast with index: ', index,' (',num_forecast, ' out of ', params_all[params_all_filter].shape[0],') Forecast name: ', forecast_name, '; Log Cat: ',log_cat, '; Target region: ',target_region, 'Metric: ', metric_name)             

    try:


        print('--------------------------- Data Uploading -----------------------------')
        if custom_sql in empty_list:
            if (target_region == "'Any'" or target_region =='Any'):
                sql_query=big10_sql_builder(metric_name,vertical,log_cat)
            else:    
                sql_query=reg_sql_builder(target_region,metric_name,vertical,log_cat,excluded_locations)
        else:
            sql_query=custom_sql
        print('######## SQL query is built ########')
        # Download the data from vertica
        query = inspect.cleandoc(sql_query)
        with vertica_python.connect(**credentials) as connection:
            data_raw = pd.read_sql(query, connection, parse_dates=['ds'])
        # Download the covariates data from vertica
        if covariates_ind==1:
            query = inspect.cleandoc(covariates_sql)
            with vertica_python.connect(**credentials) as connection:
                covariates_data = pd.read_sql(query, connection, parse_dates=['ds'])
                # Rename covariates to co_{name of the covariate}                        
                covariates_data = name_prefix_col(covariates_data, 'co_')
        print('######## Data is extracted ########')        
        data_raw = data_raw.sort_values('ds').reset_index(drop = True)
        # Check the data for errors
        if raw_data_checker(data_raw,target_region)!='pass':
            print('Something is wrong with SQL query. Error: ', raw_data_checker(data_raw,target_region))
            status_txt="\nerror in forecast with Index: {index}, Forecast name: {forecast_name}; \
                        Log Cat: {log_cat}; Target region: {target_region}; Metric: {metric_name}".format(index = index,
                                                                              forecast_name=forecast_name,
                                                                              log_cat=log_cat,
                                                                              target_region=target_region,
                                                                              metric_name=metric_name)
            with open('run_time.txt', 'a') as myFile:
                myFile.write(status_txt + ' '+ str(raw_data_checker(data_raw,target_region)))
            pass

        print('--------------------------- Data Preprocessing -----------------------------')            
        # Create the pivot table out of raw_data (from [ds;value;regions] to [ds;value;region_1, region_2, ...]
        data=pd.pivot_table(data_raw,index=['ds'],columns=['region'],values=['y'],aggfunc=np.sum)
        data=data.y.reset_index()
        # Rename target region to y and rest of the regions to cl_{name of the region}        
        data=data.rename(columns={'target': "y"})
        data = name_prefix_col(data, 'cl_')
        # Merge covariates data with forecasted data
        if covariates_ind==1:        
            data=data.merge(covariates_data, how='left', on='ds')        

        # Drop columns with 50% of nulls and interpolate nulls in others
        data = nan_fill(data)                 

        # Subset the train data, i.e. data for period before campaign start
        data_train=data[data.ds<forecast_start_date]

        # Outliers and changepoints detection
        (data_train,changepoints) = target_preprocessing(data_train,forecast_start_date)   

        print('--------------------------- Exogenous Variables Selection -----------------------------')
        global forecast_horizon
        if (forecast_end_date-forecast_start_date).days > 90:
            forecast_horizon=90
        else:
            forecast_horizon=(forecast_end_date-forecast_start_date).days                
        sign_vars=[]
        if ((control_location_ind==1) or (covariates_ind==1)) and (len(data.columns)>2):
            if (ci_control_locations_ind==1) and (ci_covariates_ind==1):
                print('######## Causal Impact Running ########')
                sign_vars=ci_feat_selection(data_train, forecast_start_date, forecast_horizon, sign_level=0.01, num_vars=7)
                # store selected variables in the control_locations variable
                ci_covariates = []
                ci_control_locations=[]
                for control in sign_vars:
                    if 'cl_' in control:
                        ci_control_locations.append(control)
                    else:
                        ci_covariates.append(control)
                #save selected control_locations in params_all file                    
                if not ci_control_locations==False:
                    ci_control_locations = ','.join(ci_control_locations)
                    params_all.loc[index, 'ci_control_locations']=ci_control_locations
                if not ci_covariates==False:
                    ci_covariates = ','.join(ci_covariates)
                    params_all.loc[index, 'ci_covariates']=ci_covariates
            else:
                if ci_control_locations_ind==0:
                    sign_vars=sign_vars+['cl_'+elem if 'cl_' not in elem else elem for elem in control_locations.split(',')]
                if ci_covariates_ind==0:
                    sign_vars=sign_vars+ci_covariates.split(',')

        data_train['cap'] = max(data.y)*1.2
        data_train['floor'] = min(data.y)*0.8
        data['cap'] = max(data.y)*1.2
        data['floor'] = min(data.y)*0.8 
        data_train['after_february'] = data_train['ds'].apply(lambda x: dummy_var(x,'2022-04-24'))
        data['after_february'] = data['ds'].apply(lambda x: dummy_var(x,'2022-04-21'))
        data_train['after_corona'] = data_train['ds'].apply(lambda x: dummy_var(x,'2020-07-08'))
        data['after_corona'] = data['ds'].apply(lambda x: dummy_var(x,'2020-07-08'))
         

        print('--------------------------- Model Parameters Selection -----------------------------')
        if model_params in empty_list:
            param_grid_trend = {'changepoint_prior_scale':[0.05,0.1,0.5],
                                'changepoint_range':[0.8,0.9,0.95],
                               'growth':['linear']}
            param_grid_season = {'seasonality_mode':['multiplicative','additive'],
                               'seasonality_prior_scale':[0.01,0.1,1.0,10.0]}
            param_grid_holidays = {'holidays_prior_scale':[0.01,0.1,1.0,10.0]}
            param_list=[param_grid_season,param_grid_trend,param_grid_holidays]
            results = []
            param_set={}
            # Create combinations of parameters
            for param_grid in param_list:
                param_iter = itertools.product(*param_grid.values())
                params_list =[]
                for param in param_iter:
                    params_list.append(param)
                params_df = pd.DataFrame(params_list, columns=list(param_grid.keys()))
                for param in tqdm(params_df.values):
                    param_dict = dict(zip(params_df.keys(), param))
                    all_param = {**param_dict, **param_set}
                    m = fp.Prophet(**all_param, holidays = holidays)
                    m.add_regressor('after_corona', mode=all_param['seasonality_mode'])
                    m.add_regressor('after_february', mode=all_param['seasonality_mode'])
#                     m.add_regressor('after_draft', mode=all_param['seasonality_mode'])  
                    if (control_location_ind==1) or (covariates_ind==1):
                        for i in sign_vars:
                            m.add_regressor(i, mode=all_param['seasonality_mode'])
                    for i in range(0,len(changepoints)):
                        if len(changepoints) > 0:
                            m.add_regressor('dummy_cp_%s' %i, mode=all_param['seasonality_mode'])      
                    m.fit(data_train)
                    df_p = cv_fbprophet(m, forecast_horizon)
                    df_p = df_p[[error_type]]
                    print('params: ',all_param,'; error_mape: ',df_p[error_type][0])
                    df_p['params'] = str(all_param)
                    df_p = df_p.loc[:, ['params', error_type]]
                    results.append(df_p)
                    if df_p[error_type][0]<0.02:
                        break
                results_df = pd.concat(results).sort_values(by=[error_type]).reset_index(drop=True)
                best_params = ast.literal_eval(results_df.params[0])
                param_set = {**param_set,**best_params}
            params_res=param_set
            params_all.loc[index, 'fbprophet_params']=str(params_res)
            params_all.to_csv('fbprophet_params_v5.csv',index=False)
        else:
            params_res = model_params

        print('--------------------------- Model Fitting -----------------------------')
        lower_bound=forecast_start_date- datetime.timedelta(days=45)
        upper_bound=forecast_end_date+ datetime.timedelta(days=28)
        #Model fit
        m = fp.Prophet(holidays = holidays, **params_res)
        m.add_regressor('after_corona', mode=params_res['seasonality_mode'])
        m.add_regressor('after_february', mode=params_res['seasonality_mode'])          
        if (control_location_ind==1) or (covariates_ind==1):
            for i in sign_vars:
                m.add_regressor(i, mode=params_res['seasonality_mode'])
        for i in range(0,len(changepoints)):
            if len(changepoints) > 0:
                m.add_regressor('dummy_cp_%s' %i, mode=params_res['seasonality_mode'])
        m.fit(data_train)
        #Model forecast
        if datetime.datetime.today()<upper_bound:
            periods = (datetime.datetime.today() - forecast_start_date).days-1
        else:
            periods = (upper_bound - forecast_start_date).days
        future = m.make_future_dataframe(periods=periods)
        future['after_corona'] = future['ds'].apply(lambda x: dummy_var(x,'2020-07-08'))
        future['after_february'] = future['ds'].apply(lambda x: dummy_var(x,'2022-04-24'))       
        for i in range(0,len(changepoints)):
            if len(changepoints) > 0:
                future['dummy_cp_%s' %i] = future['ds'].apply(lambda x: dummy_var(x,changepoints[i].start_time.strftime("%Y-%m-%d")))
        if (control_location_ind==1) or (covariates_ind==1):        
            for i in sign_vars:
                future = future.merge(data[['ds']+[i]], on='ds',how ='left')
        for col in future.columns[(future.isnull().sum()>0)]:
            future[col] = future.set_index('ds')[col].interpolate(method='time').reset_index()[col]
        future['cap']=max(data.y)*1.2
        future['floor'] = min(data.y)*0.8

        forecast = m.predict(future)

        fig_fb = m.plot(forecast)
        a = add_changepoints_to_plot(fig_fb.gca(), m, forecast)

        res_cv=cv_fbprophet(m,forecast_horizon, drop_holidays=True)

        print('--------------------------- Results Preparation -----------------------------')
        forecast=forecast[['ds','yhat']]
        forecast = forecast.sort_values('ds').reset_index(drop = True)
        res_df = data.merge(forecast, how = 'right', on = 'ds')
        res_df=res_df[['ds','y','yhat']]
        res_df['error_mape']=np.round(res_cv['mape'][0],4)
        res_df['error_mtape']=np.round(res_cv['mtape'][0],4)
        res_df['error_pape80']=np.round(res_cv['pape_80'][0],4)
        res_df['start_date']=forecast_start_date
        res_df['end_date']=forecast_end_date
        res_df.insert(loc=0, column='forecast_name', value=forecast_name)
        res_df.insert(loc=1, column='vertical', value=vertical)
        res_df.insert(loc=2, column='log_cat', value=log_cat)
        res_df.insert(loc=3, column='target_region', value=target_region)
        res_df.insert(loc=4, column='metric_name', value=metric_name)    
        res_df=res_df.rename(columns={'y':'fact', 'yhat':'forecast'})
        res_df=res_df[(res_df.ds>lower_bound) & (res_df.ds<upper_bound)]
        # Store results in single DataFrame
        res_agg=res_agg.append(res_df).reset_index(drop=True)
        # Export first-time calculated parameters for later iterations           

    except Exception as exc:
        print(exc)
        print('error in forecast building')
        status_txt="\nerror ({exc}) in forecast with Index: {index}, Forecast name: {forecast_name}; \
            Log Cat: {log_cat}; Target region: {target_region}; Metric: {metric_name}".format(exc=exc,
                                                                                    index = index,
                                                                                  forecast_name=forecast_name,
                                                                                  log_cat=log_cat,
                                                                                  target_region=target_region,
                                                                                  metric_name=metric_name)
        with open('run_time.txt', 'a') as myFile:
            myFile.write(status_txt)
        pass

# Add the time of final calculation
res_agg['download_time']=datetime.datetime.now().replace(microsecond=0)      
col_order=['forecast_name', 'vertical', 'log_cat', 'target_region', 'ds', 'fact','forecast', 'error_mape', 'error_pape80', 'start_date', 'end_date','metric_name', 'download_time','error_mtape']
res_agg[col_order].to_csv('aav_custom_forecast_v5.csv', index=False)

# 5. Export of the results to DataBase

In [None]:
!sudo cp aav_custom_forecast_v5.csv /mnt/storage/staging/csv-delta/dict.marketing_custom_forecast[ds,forecast_name].csv

In [None]:
# Writes the time of starting the script
with open('run_time.txt', 'a') as myFile:
    myFile.write('\n ----------------------------- Finished at ' + str(datetime.datetime.now()) + ' -----------------------------')

# 6. Visualisation

In [None]:
import plotly.express as px
from plotly.subplots import make_subplots


res_agg_plot=res_agg[(res_agg.forecast_name=='goods_2023_03_business')&(res_agg.target_region.str.contains("Челябинская"))]


lower_bound=res_agg_plot.start_date- datetime.timedelta(days=30)
upper_bound=res_agg_plot.end_date+ datetime.timedelta(days=30)

campaign_dates_filter = (res_agg_plot['ds']>=res_agg_plot.start_date) & (res_agg_plot['ds']<=upper_bound)

pre_post_dates_filter = (res_agg_plot['ds']>=lower_bound) & (res_agg_plot['ds']<=upper_bound)

res_agg_plot['daily_uplift']=res_agg_plot['fact']-res_agg_plot['forecast']
res_agg_plot['cum_uplift_pre']=res_agg_plot[pre_post_dates_filter&campaign_dates_filter==False].daily_uplift.cumsum(axis = 0)
res_agg_plot['cum_uplift_post']=res_agg_plot[campaign_dates_filter].daily_uplift.cumsum(axis = 0)
res_agg_plot['daily_error']=res_agg_plot['error_mape']*res_agg_plot['forecast']
res_agg_plot['cum_error']=res_agg_plot[campaign_dates_filter].daily_error.cumsum(axis = 0)

res_agg_plot['cum_error_negative']=-res_agg_plot[campaign_dates_filter].daily_error.cumsum(axis = 0)

res_agg_plot=res_agg_plot[pre_post_dates_filter]

fig = make_subplots(rows=2, cols=1,shared_xaxes=True,subplot_titles=("Fact vs Forecast", "Uplift"))

fig.add_scatter(x=res_agg_plot['ds'], y=res_agg_plot['fact'], name='fact', row=1, col=1)
fig.add_scatter(x=res_agg_plot['ds'], y=res_agg_plot['forecast'], name='forecast', row=1, col=1)
# fig.add_scatter(x=res_agg_plot['ds'], y=res_agg_plot['fact_ly'], name='fact_ly', row=1, col=1)
fig.add_scatter(x=res_agg_plot['ds'], y=res_agg_plot['cum_uplift_pre'], name='total_uplift_pre', row=2, col=1)
fig.add_scatter(x=res_agg_plot['ds'], y=res_agg_plot['cum_uplift_post'], name='total_uplift_post', row=2, col=1)
fig.add_scatter(x=res_agg_plot['ds'], y=res_agg_plot['cum_error'],fill="tozeroy", name='total_error', row=2, col=1)
fig.add_scatter(x=res_agg_plot['ds'], y=res_agg_plot['cum_error_negative'],fill="tozeroy", name='total_error', row=2, col=1)


fig.add_vline(x=list(set(res_agg_plot.start_date))[0],  line_width=1, line_dash="solid", line_color="red",row=1, col=1)
fig.add_vline(x=list(set(res_agg_plot.end_date))[0],  line_width=1, line_dash="solid", line_color="red",row=1, col=1)
fig.add_vline(x=list(set(res_agg_plot.start_date))[0],  line_width=1, line_dash="solid", line_color="red",row=2, col=1)
fig.add_vline(x=list(set(res_agg_plot.end_date))[0],  line_width=1, line_dash="solid", line_color="red",row=2, col=1)

fig.update_layout(margin=dict(l=20, r=20, t=20, b=20))
fig.show()


