In [None]:
# %pip install pyspark   # uncomment if pyspark is not available in the environment
import gc
import os
import warnings
from datetime import datetime as Date
from multiprocessing import Pool, cpu_count

import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.special import inv_boxcox
from scipy.stats import boxcox

try:
    from fbprophet import Prophet
    from fbprophet.diagnostics import cross_validation
    from fbprophet.diagnostics import performance_metrics
    from fbprophet.plot import plot_cross_validation_metric
except ImportError:  # the package is published as `prophet` from v1.0 on
    from prophet import Prophet
    from prophet.diagnostics import cross_validation
    from prophet.diagnostics import performance_metrics
    from prophet.plot import plot_cross_validation_metric

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *  # StructType, StructField, StringType, FloatType, DateType

warnings.filterwarnings('ignore')

# Per-series parallelism comes from Spark (groupBy().apply); no multiprocessing.Pool
# is started here, since an unused pool keeps idle worker processes alive all session.
spark = SparkSession.builder.appName('play').getOrCreate()

In [None]:
def reduce_mem_usage(df, verbose=True):
    """Downcast numeric columns of ``df`` to the smallest dtype that holds their range.

    Integer columns are tried against int8 -> int16 -> int32 -> int64 and float
    columns against float16 -> float32, falling back to float64. Non-numeric
    columns (and int8 columns, which are already minimal) are left untouched.
    Columns are replaced on ``df`` itself and the same object is returned.

    Caveat: float16 keeps only ~3 significant decimal digits, so any float
    column whose range fits float16 loses precision.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to shrink; modified in place.
    verbose : bool, default True
        When True, print the memory usage after downcasting and the relative
        reduction.

    Returns
    -------
    pandas.DataFrame
        ``df`` after downcasting.
    """
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2
    for col in df.columns:
        col_type = df[col].dtypes
        if col_type not in numerics:
            continue
        c_min = df[col].min()
        c_max = df[col].max()
        if str(col_type)[:3] == 'int':
            # Strict bounds: values equal to a type's extreme go to the next wider type.
            if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                df[col] = df[col].astype(np.int8)
            elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                df[col] = df[col].astype(np.int16)
            elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                df[col] = df[col].astype(np.int32)
            elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                df[col] = df[col].astype(np.int64)
        else:
            if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                df[col] = df[col].astype(np.float16)
            elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                df[col] = df[col].astype(np.float32)
            else:
                df[col] = df[col].astype(np.float64)

    end_mem = df.memory_usage().sum() / 1024**2
    if verbose:
        # Guard the percentage so a zero starting size cannot produce nan/inf output.
        reduction = 100 * (start_mem - end_mem) / start_mem if start_mem else 0.0
        print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
        print('Decreased by {:.1f}%'.format(reduction))

    return df

In [None]:
# Location of the M5 competition CSVs (Kaggle input mount); change this to run elsewhere.
DATA_DIR = '/kaggle/input/m5-forecasting-accuracy'

sales_train_validation = reduce_mem_usage(pd.read_csv(os.path.join(DATA_DIR, 'sales_train_validation.csv')))
calendar = reduce_mem_usage(pd.read_csv(os.path.join(DATA_DIR, 'calendar.csv')))
sell_prices = reduce_mem_usage(pd.read_csv(os.path.join(DATA_DIR, 'sell_prices.csv')))

In [None]:
# Build the long-format training frame: one row per (series id, day) with the
# calendar date, the week id and that week's sell price.
sell_prices['id'] = sell_prices.item_id + '_' + sell_prices.store_id + '_validation'
ex_columns = ['item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']
sales_train_validation = reduce_mem_usage(sales_train_validation.drop(ex_columns, axis=1))
sales_train = reduce_mem_usage(sales_train_validation.melt(id_vars=["id"],
                                                           var_name="d",
                                                           value_name="sales_units"))

# .copy(): reduce_mem_usage assigns columns, which must not target a view of
# `calendar` / `sell_prices`.
day_d = reduce_mem_usage(calendar[['date', 'd', 'wm_yr_wk']].copy())
sales_date = reduce_mem_usage(sales_train.merge(day_d, on='d', how='left'))
sell_prices = reduce_mem_usage(sell_prices[['id', 'sell_price', 'wm_yr_wk']].copy())

df_final = reduce_mem_usage(sales_date.merge(sell_prices, on=['id', 'wm_yr_wk'], how='left'))

# Box-Cox requires strictly positive data, so zero-sales days are dropped.
df_final = df_final[df_final['sales_units'] >= 1].copy()
x_trans, lamb = boxcox(df_final['sales_units'])

# create holidays data frame
event_name = calendar[['event_name_1', 'date']].dropna(axis=0)
event_name.columns = ['holiday', 'ds']
event_name['lower_window'] = 0
event_name['upper_window'] = 1

# Reduce dataframe size BEFORE adding the target: reduce_mem_usage would squeeze
# the Box-Cox values into float16 (~3 significant digits); keep them in float32.
df_final = reduce_mem_usage(df_final)
df_final['y'] = x_trans.astype(np.float32)
event_name = reduce_mem_usage(event_name)

In [None]:
# Drop references to the large intermediate frames by rebinding each name to a
# fresh empty DataFrame (safe to re-run, unlike `del`), then force a collection.
(sales_date, calendar, sell_prices,
 sales_train_validation, sales_train, day_d) = (pd.DataFrame() for _ in range(6))
gc.collect()

In [None]:
# Number of series to forecast (first ids in sorted order); set to None for all series.
MAX_SERIES = 100

# Bucket each series by how many training rows it has. The bucket label is stored
# in 'change_point' (presumably intended as Prophet's changepoint_prior_scale).
filter_df = df_final['id'].value_counts().reset_index()
filter_df.columns = ['id', 'id_count']
filter_df['0.001'] = filter_df['id_count'].between(0, 19)
filter_df['0.005'] = filter_df['id_count'].between(20, 29)
filter_df['0.01'] = filter_df['id_count'].between(30, 59)
filter_df['0.2'] = filter_df['id_count'].between(60, 89)
filter_df['0.25'] = filter_df['id_count'].between(90, 119)
filter_df['0.3'] = filter_df['id_count'] >= 120
bucket_cols = ['0.001', '0.005', '0.01', '0.2', '0.25', '0.3']
filter_df[bucket_cols] = filter_df[bucket_cols] * 1
# Keep only the one bucket each id falls into. drop(columns=...) replaces the
# positional-axis form drop('value', 1), which pandas 2.0 no longer accepts.
filter_df = reduce_mem_usage(
    filter_df.melt(['id', 'id_count'], var_name='change_point')
    .query('value == 1')
    .sort_values(['id', 'change_point'])
    .drop(columns='value')
)
filter_df['weekly'] = filter_df['id_count'] >= 10
filter_df['yearly'] = filter_df['id_count'] >= 365
if MAX_SERIES is not None:
    filter_df = filter_df.head(MAX_SERIES)

# create F1 to F28 day labels for the 28-day submission horizon
result_day = ['F' + str(day) for day in range(1, 29)]
df_day = pd.DataFrame({'day': result_day})

# Merge filter_df and final df
historic_data = df_final.merge(filter_df, on='id', how='inner')
# clear dataframes from memory
filter_df = pd.DataFrame()
df_final = pd.DataFrame()
gc.collect()

In [None]:
# Prophet expects the timestamp column to be called `ds`; append it (as the last
# column) and drop the original `date` column.
historic_data = (
    historic_data
    .assign(ds=historic_data['date'])
    .drop(columns=['date'])
)

In [None]:
# Wide submission-style schema: one row per id with forecast columns F1..F28.
result_schema = StructType(
    [StructField('id', StringType())]
    + [StructField('F' + str(day_ahead), FloatType()) for day_ahead in range(1, 29)]
)

# Output schema of the grouped-map UDF: one row per (id, forecast date).
test_schema = StructType([
    StructField('id', StringType()),
    StructField('ds', DateType()),
    StructField('yhat', FloatType()),
])

history_spark = spark.createDataFrame(historic_data)

In [None]:
# Preview the first rows of the Spark training DataFrame (truncated cells).
history_spark.show(n=20, truncate=True)

In [None]:
@pandas_udf(test_schema, PandasUDFType.GROUPED_MAP)
def profesy(history_spark):
    """Fit a Prophet model to one series and forecast the next 28 days.

    Spark calls this once per ``id`` group (``groupBy('id').apply(profesy)``).

    Parameters
    ----------
    history_spark : pandas.DataFrame
        All rows of a single series; Prophet reads its ``ds`` and ``y`` columns.

    Returns
    -------
    pandas.DataFrame
        Columns ``id``, ``ds``, ``yhat`` in ``test_schema`` order, one row per
        forecast day, with ``yhat`` still on the Box-Cox scale. Empty when the
        series has fewer than two non-null ``y`` rows, which Prophet cannot fit.
    """
    # Prophet raises on fewer than 2 non-NaN rows; return an empty frame instead so
    # a single sparse series does not fail the whole Spark job.
    if history_spark['y'].notna().sum() < 2:
        return pd.DataFrame({'id': pd.Series(dtype=object),
                             'ds': pd.Series(dtype='datetime64[ns]'),
                             'yhat': pd.Series(dtype=float)})

    # TODO: the per-series 'change_point' / 'weekly' / 'yearly' columns and the
    # `event_name` holidays are prepared upstream but not passed to Prophet yet.
    model = Prophet()
    model.fit(history_spark)
    future = model.make_future_dataframe(periods=28, freq='D', include_history=False)
    forecast = model.predict(future)

    # .copy(): adding 'id' must not write into a column slice (view) of `forecast`.
    df_forecast = forecast[['ds', 'yhat']].copy()
    # Every row in the group shares one id.
    df_forecast['id'] = history_spark['id'].iloc[0]
    # TODO: for the wide F1..F28 layout (result_schema), pivot yhat on result_day labels.

    # Match test_schema's column order so Spark maps columns correctly by name or position.
    return df_forecast[['id', 'ds', 'yhat']]

In [None]:
# One Prophet fit per series id; Spark invokes `profesy` on each group lazily.
results = (
    history_spark
    .groupBy('id')
    .apply(profesy)
)

In [None]:

# Run the Spark job and collect every per-id forecast to the driver as pandas.
results_df = (
    results
    .toPandas()
)

In [None]:
# Show a sample of the collected forecasts. Displaying the Spark DataFrame itself
# outside Databricks only renders its schema repr, not the rows.
display(results_df.head())

In [None]:

# Undo the Box-Cox transform applied to `y` before training (same fitted `lamb`).
# Only `yhat` is available: the UDF's output schema is (id, ds, yhat), so the
# yhat_lower / yhat_upper columns do not exist here.
results_df['yhat'] = inv_boxcox(results_df['yhat'], lamb)