In [0]:
# !pip install pyspark
# !pip3 install pystan
# !pip3 install fbprophet
# !sudo apt install openjdk-8-jdk

In [0]:
import os
# PySpark needs a JVM: point JAVA_HOME at a Java 8 install before Spark starts.
java8_location = '/usr/lib/jvm/java-8-openjdk-amd64'  # Set your own
os.environ.update(JAVA_HOME=java8_location)

In [0]:
import pandas as pd
import numpy as np

In [0]:
from fbprophet import Prophet

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable

In [0]:
from pyspark.sql.functions import (dayofmonth,hour,dayofyear,month,year,
                              weekofyear,format_number,date_format,to_date)

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F
# Create (or reuse) the notebook's SparkSession, requesting 20g of memory for
# both the driver and the executors.
spark = (
    SparkSession.builder
    .appName('Final')
    .config("spark.driver.memory", "20g")
    .config("spark.executor.memory", "20g")
    .getOrCreate()
)

In [0]:
# SparkContext behind the session; used below to build the SQLContext.
sc = spark.sparkContext

In [0]:
from pyspark.sql import SQLContext
# SQLContext wrapper around the SparkContext.
# NOTE(review): not referenced by any other visible cell - confirm it is still needed.
sqlContext = SQLContext(sc)

In [0]:
 #small5000 = spark.read.csv('/content/drive/My Drive/small_5000.csv',inferSchema=True,header=True)
# Load the wide training table: the first row is the header and column types
# are inferred by Spark.
small5000 = spark.read.csv(
    '/content/drive/My Drive/Project4/train_1.csv',
    header=True,
    inferSchema=True,
)
# small5000 = spark.read.csv('/content/drive/My Drive/top20p.csv',inferSchema=True,header=True)


In [0]:
# Every column except '_c0' and 'Page'; these are the columns melted into
# (variable, value) rows below - presumably one column per calendar date.
dates_cols = small5000.drop('_c0','Page').columns

In [0]:
# Spark equivalent of pandas' melt(), adapted (with thanks) from this Stack Overflow answer:
# https://stackoverflow.com/questions/41670103/how-to-melt-spark-dataframe

def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format.

    Each input row is expanded into one output row per column in
    ``value_vars``. The output keeps the ``id_vars`` columns and adds two
    columns: ``var_name`` (the name of the source column) and ``value_name``
    (that column's value for the row).

    :param df: wide DataFrame to unpivot.
    :param id_vars: names of the identifier columns carried over unchanged.
        Any iterable is accepted (list, tuple, generator, ...).
    :param value_vars: names of the columns to unpivot. Must not be empty.
    :param var_name: name of the output column holding the source column name.
    :param value_name: name of the output column holding the value.
    :return: long-format DataFrame with columns ``id_vars + [var_name, value_name]``.
    :raises ValueError: if ``value_vars`` is empty.
    """
    # Materialise both arguments once: the signature promises Iterable, but
    # list concatenation below needs a real list, and a generator must not be
    # consumed before it is used.
    id_vars = list(id_vars)
    value_vars = list(value_vars)
    if not value_vars:
        # Without this check Spark fails later with an obscure analysis error
        # when the struct fields are extracted from an empty array.
        raise ValueError("melt() requires at least one column in value_vars")

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))

    # Add to the DataFrame and explode: one row per (input row, value column)
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    # Pull the two struct fields back out as top-level columns
    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

In [0]:
# Unpivot the wide table: one row per (Page, melted column).
train2 = melt(
    df=small5000,
    id_vars=['Page'],
    value_vars=dates_cols,
)

In [0]:
# Inspect the long-format schema produced by melt().
train2.printSchema()

root
 |-- Page: string (nullable = true)
 |-- variable: string (nullable = false)
 |-- value: string (nullable = true)



In [0]:
#train3 = train2.withColumn('ds',to_date('variable')).drop('variable')
# Replace the melted `value` column with its integer cast.
train3 = train2.withColumn('value', col('value').cast('Integer'))

In [0]:
# Peek at the first rows to confirm the cast of `value`.
train3.head(5)

[Row(Page='2NE1_zh.wikipedia.org_all-access_spider', variable='2015-07-01', value=18),
 Row(Page='2NE1_zh.wikipedia.org_all-access_spider', variable='2015-07-02', value=11),
 Row(Page='2NE1_zh.wikipedia.org_all-access_spider', variable='2015-07-03', value=5),
 Row(Page='2NE1_zh.wikipedia.org_all-access_spider', variable='2015-07-04', value=13),
 Row(Page='2NE1_zh.wikipedia.org_all-access_spider', variable='2015-07-05', value=14)]

In [0]:
# Sanity check: number of melted rows per Page.
train3.groupBy('Page').count().show()

+------------------------------+-----+
|                          Page|count|
+------------------------------+-----+
|          EXID_zh.wikipedia...|  550|
|       水菜麗_zh.wikipedia....|  550|
|       朴炯植_zh.wikipedia....|  550|
| 進擊的巨人角色列表_zh.wiki...|  550|
|       陳庭妮_zh.wikipedia....|  550|
|     太阳能电池_zh.wikipedi...|  550|
|     成均馆大学_zh.wikipedi...|  550|
|      賽菲羅斯_zh.wikipedia...|  550|
|      中島美嘉_zh.wikipedia...|  550|
|     湄公河大案_zh.wikipedi...|  550|
|雲之彼端，約定的地方_zh.wik...|  550|
|   罗斯柴尔德家族_zh.wikipe...|  550|
|      長榮航空_zh.wikipedia...|  550|
|      班淑傳奇_zh.wikipedia...|  550|
|       黃義雄_zh.wikipedia....|  550|
|    歡樂頌_(電視劇)_zh.wiki...|  550|
|      星際過客_zh.wikipedia...|  550|
|    2017無綫節目巡禮_zh.wik...|  550|
|          Amazon.com_fr.wik...|  550|
|          Championnat_d'Eur...|  550|
+------------------------------+-----+
only showing top 20 rows



In [0]:
from pyspark.sql.types import *

# Output schema of the per-page forecast UDF: page id, observed value, date,
# point forecast and the upper/lower bounds of its uncertainty interval.
result_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ('Page', StringType()),
        ('y', FloatType()),
        ('ds', DateType()),
        ('yhat', FloatType()),
        ('yhat_upper', FloatType()),
        ('yhat_lower', FloatType()),
    )
])

In [0]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(result_schema,PandasUDFType.GROUPED_MAP)
def forecast_page_views(sdf):
  """Fit a logistic-growth Prophet model to one page's history and forecast 90 days.

  Grouped-map pandas UDF: it receives the rows of a single group as a pandas
  DataFrame and returns a pandas DataFrame matching ``result_schema``.

  Args:
    sdf: pandas DataFrame with columns ``Page``, ``variable`` (date string)
      and ``value`` (view count; non-numeric entries are coerced to NaN).

  Returns:
    pandas DataFrame with columns ``Page, y, ds, yhat, yhat_upper, yhat_lower``.
    On success it has one row per historical date plus 90 future days, where
    ``y`` is the (gap-filled) observation or NaN for future dates and ``yhat``
    is clipped at 0. If Prophet fails for this page, only the historical rows
    are returned with the three forecast columns set to 0.
  """
  out_cols = ['Page', 'y', 'ds', 'yhat', 'yhat_upper', 'yhat_lower']
  forecast_cols = ['yhat', 'yhat_upper', 'yhat_lower']

  if sdf.empty:
    return pd.DataFrame(columns=out_cols)

  # Work on a copy so the frame handed over by Spark is never mutated.
  history = sdf.copy()
  history['value'] = pd.to_numeric(history['value'], errors='coerce')
  history['ds'] = pd.to_datetime(history['variable'], errors='coerce')

  # Row order inside a group is not guaranteed, so sort chronologically and
  # rebuild a clean 0-based index before any positional logic.
  history = history.sort_values('ds').reset_index(drop=True)

  # Drop the leading stretch with no observed value; the first valid entry of
  # the `value` column marks the start of the usable history.
  first = history['value'].first_valid_index()
  if first is not None:
    history = history.loc[first:].reset_index(drop=True)

  # Fill the remaining gaps with the page's mean (the result must be assigned;
  # fillna does not modify the column in place).
  history['value'] = history['value'].fillna(history['value'].mean())

  # Logistic growth needs a saturating range: floor 0, cap at twice the peak.
  cap = history['value'].max() * 2
  history['cap'] = cap
  history['floor'] = 0
  history['y'] = history['value']
  page = history['Page'].iloc[0]

  model = Prophet(
      interval_width=0.90,
      growth='logistic',
      daily_seasonality=False,
      weekly_seasonality=False,
      yearly_seasonality=True,
      seasonality_mode='additive')
  try:
    # train the model
    model.fit(history)
    # make predictions over the history plus the next 90 days
    future_pd = model.make_future_dataframe(periods=90, freq='D', include_history=True)
    future_pd['cap'] = cap
    future_pd['floor'] = 0
    forecast_pd = model.predict(future_pd)

    # Align observations with forecasts by date rather than by row position,
    # so y always sits next to the prediction for the same day.
    observed = history[['ds', 'y']].drop_duplicates('ds')
    f_pd = forecast_pd[['ds'] + forecast_cols].merge(observed, on='ds', how='left')
    f_pd['yhat'] = f_pd['yhat'].clip(lower=0)
  except Exception:
    # Best-effort fallback (e.g. unparsable dates, or cap == floor for an
    # all-zero page): keep the history and report a zero forecast.
    f_pd = history[['ds', 'y']].copy()
    for name in forecast_cols:
      f_pd[name] = 0.0

  f_pd['Page'] = page
  return f_pd[out_cols]


In [0]:
#results = page_history.groupBy('Page','ds').apply(forecast_page_views)
# Apply the grouped-map forecast UDF to each Page group.
# NOTE(review): this feeds train2 (string-typed `value`) rather than the
# integer-cast train3; the UDF re-parses `value` with pd.to_numeric itself.
results = train2.groupBy('Page').apply(forecast_page_views)

In [0]:
# Display the first rows of the forecast output.
results.show()

In [0]:
#results.write.csv('/content/drive/My Drive/Project4/train_1_results.csv')

In [0]:
#rc = results.collect()

In [0]:
# results.write.format('com.databricks.spark.csv').save('/content/drive/My Drive/top20p_Results.csv')


In [0]:
# results.show()