In [1]:
from __future__ import division, print_function
import os
import sys
import pandas as pd
from datetime import datetime, timedelta
import hashlib
import logging
import importlib
import numpy as np
import argparse

from pyspark.sql import functions as f, types as t
from pyspark.sql import SparkSession

NUM_PARTITIONS = 1024
spark = ( SparkSession
             .builder
             .config("spark.app.name","prophetPredictionConfig")
             .config("spark.yarn.queue","root.sp_analytics.spark")
             .config("spark.default.serializer","org.apache.spark.serializer.KryoSerializer")
             .config("spark.speculation","true")
             .config("spark.default.parallelism",str(NUM_PARTITIONS))
             .config("spark.sql.shuffle.partitions",str(NUM_PARTITIONS))
             .config("spark.yarn.executor.memoryOverhead", str(8192))
             .enableHiveSupport()
             .getOrCreate() )

sc = spark.sparkContext

In [11]:
account = "trivago"
run_date =str(datetime.today().date())
optimizer="random_search"
forecast_horizon=90
metric_class='GrossBookings'
eval_level = 'daily'
pos = 'All'
has_add_regressors = False
cv_type = "rolling_window"
start_date = "2015-01-01"
end_date = run_date

In [12]:
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(levelname)-10s %(message)s")

In [13]:
current_dir = os.getcwd()
current_dir = current_dir[:current_dir.find('scripts')]
logging.info("current_dir is {0}".format(current_dir))
mod_file = os.path.join(current_dir, "forecast_lib/utils/zip_module.py")
sc.addPyFile(mod_file)

2019-06-07 14:37:39,634 INFO       current_dir is /home/lijia/git_tree/lin_jia/forecasting/daily_metrics/


In [14]:
mod_zipping = importlib.import_module("zip_module")
mod_zipping.zip_and_ship_module(current_dir)
current_dir = os.getcwd()
zipped_file = os.path.join(current_dir,"forecast_lib.zip")
sc.addPyFile(zipped_file)

In [15]:
mod = importlib.import_module("forecast_lib.account.{0}".format(account))
Metrics = getattr(mod, metric_class)

In [16]:
from forecast_lib.utils.prepare_df_to_prophet import prepare_df_to_prophet
from forecast_lib.utils.days_setting import minusndays
from forecast_lib.utils.get_holidays_table import get_holidays_table
from forecast_lib.utils.gen_regressors import gen_regressors
from forecast_lib.utils.parameter_search import create_model_evaluation_udf
from forecast_lib.utils.gen_params_indices import gen_params_indices
from forecast_lib.utils.gen_weekly_aggregates_yoy import gen_weekly_aggregates_yoy
from forecast_lib.utils.ts_cross_validation import ts_train_test_split, ts_train_test_backtest_split
from forecast_lib.utils.evaluation_metrics import evaluation_smape
from forecast_lib.utils.suppress_stdout_stderr import suppress_stdout_stderr

In [17]:
Metrics = Metrics(start_date, end_date, pos)

In [20]:
metric_values = Metrics.values.cache()

In [21]:
metric_name = Metrics.metric_name

'gross_bookings'

In [22]:
grouped_metric = metric_values.groupBy("yyyy_mm_dd")\
                                    .agg(f.sum(metric_name).alias(metric_name)).cache()

In [23]:
perf = prepare_df_to_prophet(df = grouped_metric, metric_name = metric_name, tra = None, lamda = None)

In [25]:
cap, floor = np.max(perf.y) * 1.1, np.min(perf.y) * 0.9
perf["cap"] = cap
perf["floor"]  = floor

In [26]:
# 2. get holiday tables
holiday_tables = get_holidays_table(start_date = start_date)

In [28]:
rank_pos = metric_values\
        .where("yyyy_mm_dd >= '{cutoff_date}'".format(cutoff_date =
            (datetime.strptime(end_date, "%Y-%m-%d") - timedelta(365)).strftime("%Y-%m-%d")))\
        .groupBy("pos")\
        .agg(f.sum(metric_name).alias(metric_name))\
        .orderBy(metric_name, ascending=False).toPandas()
rank_pos = rank_pos.assign(pos_rank = lambda x: x[metric_name].rank(ascending = False))

  return a[slice1]-a[slice2]


In [29]:
holiday_tables = holiday_tables.merge(rank_pos[['pos', 'pos_rank']])
top5 = holiday_tables[holiday_tables['pos_rank']<=5][[ "ds", "holiday"]].drop_duplicates()
other_pos_counts = holiday_tables[holiday_tables['pos_rank']>5].groupby(['ds', 'holiday']).count().reset_index()
other_pos = other_pos_counts[other_pos_counts.pos >= 10][['ds', 'holiday']]
filtered_holiday_table = top5.append(other_pos)\
                .drop_duplicates()\
                .assign(lower_window = 0, upper_window = 0)
# 2.4 manually adjust for some holidays where the upper and lower window is bigger than 0
# TODO: this has quite some subjectivity and require data inspection
special_holidays = ['ChristmasEve', 'ThanksgivingDay','NewYearsDay']
special_holidays_table = filtered_holiday_table[filtered_holiday_table['holiday']\
                                                .isin(special_holidays)]\
                                                .assign(lower_window = -1, upper_window = 1)
special_holidays_table['upper_window'] = np.where(special_holidays_table['holiday'] ==  'ChristmasEve', 0,
                                                   special_holidays_table['upper_window'])

final_holiday_table = filtered_holiday_table[~filtered_holiday_table['holiday']\
                                     .isin(special_holidays)]\
                                     .append(special_holidays_table)\
                                     .sort_values(by = ['ds'])

In [30]:
train_test_split = 14
train = perf.iloc[:-train_test_split, ]
test  = perf.iloc[-train_test_split:, ]

In [32]:
import logging
import numpy as np
from fbprophet import Prophet

In [33]:
with suppress_stdout_stderr():
    default_mod = Prophet(holidays = final_holiday_table, changepoint_range = 0.9)
    forecast = default_mod.fit(train).predict(test)

default_mape = evaluation_smape(eval_level = eval_level,
                                forecast_horizon = forecast_horizon,
                                true_df = test, forecast_df = forecast)
logging.info("The {eval_level} smape for the default model is {default_mape}".format(eval_level = eval_level,
                                                                            default_mape = default_mape))

2019-06-07 14:44:54,149 INFO       Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
2019-06-07 14:44:58,594 INFO       The daily smape for the default model is 2.15504998184


In [54]:
df_res = forecast[['ds', 'yhat']].merge(test[['ds', 'y']])

In [55]:
df_res ['residual'] = df_res ['yhat'] - df_res['y']

In [46]:
from scipy.stats.mstats import winsorize

In [None]:
fd.store_residual(df_input_prophet,df_forecast_tot,skill_name,model_used)
df_forecast_tot2=fd.l1_module_residual(df_input_prophet,df_forecast_tot,pdf_residuals,skill_name)
df_forecast_tot_adjsted2=fd.damp_trend_new(df_forecast_tot2,pdf_trend_dampening,skill_name,check=True)
df_forecast_output=fd.extract_forecast_from_prophet(df_forecast_tot_adjsted2,skill_name,tra,lamda)

In [56]:
df_res['residual2']=winsorize(df_res['residual'], limits=0.05, axis=0)

In [57]:
df_res

Unnamed: 0,ds,yhat,y,residual,residual2
0,2019-05-24,34445.131205,40152,-5706.868795,-5706.868795
1,2019-05-25,31402.297043,36464,-5061.702957,-5061.702957
2,2019-05-26,38006.798679,40688,-2681.201321,-2681.201321
3,2019-05-27,35956.946643,43905,-7948.053357,-7948.053357
4,2019-05-28,41170.784697,44632,-3461.215303,-3461.215303
5,2019-05-29,40441.664599,43181,-2739.335401,-2739.335401
6,2019-05-30,42948.371879,44961,-2012.628121,-2012.628121
7,2019-05-31,36398.257321,38543,-2144.742679,-2144.742679
8,2019-06-01,33287.775948,34713,-1425.224052,-1425.224052
9,2019-06-02,39816.325177,39683,133.325177,133.325177


In [None]:
with suppress_stdout_stderr():
    final_mod = Prophet(holidays = final_holiday_table)
    future = final_mod.fit(perf).make_future_dataframe(periods=forecast_horizon)
    forecast = final_mod.predict(future).tail(forecast_horizon)

In [None]:


# use the default prophet model with holidays as baseline
# supress_stdout_stderr is to suppress the optimization output from Stan used by prophet package
with suppress_stdout_stderr():
    default_mod = Prophet(holidays = final_holiday_table, changepoint_range = 0.9)
    forecast = default_mod.fit(train).predict(test)

default_mape = evaluation_smape(eval_level = self.eval_level,
                                forecast_horizon = self.forecast_horizon,
                                true_df = test, forecast_df = forecast)
logging.info("The {eval_level} smape for the default model is {default_mape}".format(eval_level = self.eval_level,
                                                                            default_mape = default_mape))
# 5.1 run the model with Random search
if self.optimizer == 'random_search':
    logging.info("Cross Validation type is {cv_type}, start with random grid search for hyperparameters".format(
            cv_type = self.cv_type))
    params_sp = gen_params_indices(df = train, pred_size = self.forecast_horizon, cv_type = self.cv_type)
    # broadcast the in sample training set
    bc_train = sc.broadcast(train)
    # specify the udf
    mod_eval_udf = create_model_evaluation_udf(eval_level = self.eval_level, perf_df = perf, bc_df = bc_train,
                                               pred_size = self.forecast_horizon, holiday_table = final_holiday_table, 
                                               cap= cap, floor = floor, has_add_regressors = self.has_add_regressors)

    results_sp = params_sp.select('params', f.expr('explode(train_indices)').alias('train_indices') )\
                          .withColumn('smape', mod_eval_udf(f.col("params"), f.col("train_indices")))\
                          .cache()

    results_pd = results_sp.toPandas()
    # get the meidan smape for each different combinations of hyperparameters and select the best median
    avg_scores = results_pd.groupby('params', as_index = False)[['smape']].median()
    best_params = avg_scores.sort_values('smape').iloc[0].params.asDict()
    logging.info("Best hyperparameters found is {best_params}".format(best_params = best_params))
    # fit the model again using the best parameters with entire training set and evaluate the error
    # based on out of sample test set.
    with suppress_stdout_stderr():
        best_mod = Prophet(daily_seasonality = False, interval_width = 0.95, holidays = final_holiday_table, **best_params)
        forecast = best_mod.fit(train).predict(test)
    rs_mape = evaluation_smape(eval_level = self.eval_level,
                               forecast_horizon = self.forecast_horizon,
                               true_df = test, forecast_df = forecast)
    logging.info("The {eval_level} mape for the random grid search model is {rs_mape}".format(eval_level = self.eval_level,
                                                                                              rs_mape = rs_mape))
# 5.2 compare the default baseline with random grid search and choose which model to use
if self.optimizer == "default" or default_mape <= rs_mape:
    logging.info("""Default prophet configuration is the default or it is better than random search
                therefore re-forecast using the entire dataset with default prophet""")
    with suppress_stdout_stderr():
        final_mod = Prophet(holidays = final_holiday_table)
        future = final_mod.fit(perf).make_future_dataframe(periods=self.forecast_horizon)
        forecast = final_mod.predict(future).tail(self.forecast_horizon)
else:
    logging.info("""Random search is better than default prophet configuration
                therefore re-forecast using the entire dataset with the best hyperparameters""")

    # run the model with Random search or default, compare the default baseline with random grid search
    with suppress_stdout_stderr():
        final_mod = Prophet(daily_seasonality = False, interval_width = 0.95, holidays = final_holiday_table, **best_params)
        future = final_mod.fit(perf).make_future_dataframe(periods=self.forecast_horizon)
    if getattr(best_mod, 'growth') == 'logistic':
        future['cap'] = cap
        future['floor'] = floor
    forecast = final_mod.predict(future)

# 6. adjust for the recent performance AR

# 7. produce the components plots and output data needed. 
components_fig = final_mod.plot_components(forecast)
# get the predicted data
prediction = forecast.tail(self.forecast_horizon)[['ds','yhat']].round({'yhat': 0})
# generate the weekly data or weekly year-over-year data
yoy = gen_weekly_aggregates_yoy(prediction = prediction, perf_last_year = perf,
                                metric_name = self.metric_name)


In [23]:
perf = prepare_df_to_prophet(df = grouped_metric, metric_name = metric_name, tra = None, lamda = None)

In [24]:
def get_holidays_table(start_date, end_date):
    """function to get the most important holidays in the training and predicted dataset 
    it takes in for every account the booker countries and filter for the holidays of those countries
    if a holiday is celebrated in more than 10 countries, then it be in the final holiday table  
    
    Args: 
        start_date: start date of the training dataset
        end_date  : end date of the test dataset
        christmas_new_year_dummy: True, when treating every day between Christmas and New year as a separate dummy variable
    Returns:
        spark dataframe with holidays and upper and lower windows 
    """
    
    query = """
    SELECT 
        yyyy_mm_dd AS ds, 
        name AS holiday,
        upper(cc1) AS pos
    FROM 
        csa.holidays_2000_2030_cc1 
    WHERE 
        yyyy_mm_dd >= '{start_date}'
    AND 
        yyyy_mm_dd <= '{end_date}'
    AND 
        level = 1 

    """.format(start_date = start_date, end_date = end_date)
    holiday_table = spark.sql(query).toPandas()
    
    return holiday_table 


In [26]:
# set the cap and floor of the data in case there is
cap, floor = np.max(perf.y) * 1.1, np.min(perf.y) * 0.9
perf["cap"] = cap
perf["floor"]  = floor

# 2. get holiday tables

logging.info("getting holidays")
holiday_tables = get_holidays_table(start_date = start_date, end_date = end_date)

2019-06-07 10:34:22,759 INFO       getting holidays


In [33]:
rank_pos = metric_values\
                .where("yyyy_mm_dd >= '{cutoff_date}'".format(cutoff_date =
                    (datetime.strptime(end_date, "%Y-%m-%d") - timedelta(365)).strftime("%Y-%m-%d")))\
                .groupBy("pos")\
                .agg(f.sum(metric_name).alias(metric_name))\
                .orderBy(metric_name, ascending=False).toPandas()

In [34]:
logging.info("test the rank_pos")
rank_pos = rank_pos.assign(pos_rank = lambda x: x[metric_name].rank(ascending = False))

2019-06-07 10:40:10,778 INFO       test the rank_pos


In [None]:
prediction = pd.read_csv("tripadvisor_GrossBookings_daily_prediction.csv")

In [60]:
pred_this_year = prediction.round({'yhat': 0})


In [61]:
pred_this_year = pred_this_year.assign(ds = lambda x: pd.to_datetime(x['ds']))

In [62]:
pred_this_year['week'] = pred_this_year.ds.dt.week
pred_this_year['year'] = pred_this_year.ds.dt.year

In [63]:
# only keep those days where it is a full week for weekly aggregates
pred_this_year = pred_this_year.groupby(['year', 'week']).filter(lambda x: x['yhat'].count() == 7)
pred_this_year = pred_this_year.groupby(['year', 'week']).agg({"yhat": "sum"}).reset_index()