In [1]:
import time

import pandas as pd
import os
from prophet import Prophet
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark import SparkContext

jars = "" \
       "./jars/mongo-spark-connector_2.12-3.0.1.jar," \
       "./jars/mongo-java-driver-3.12.5.jar," \
       "./jars/bson-4.0.5.jar," \
       "./jars/spark-core_2.12-3.0.1.jar," \
       "./jars/spark-sql_2.12-3.0.1.jar"

SPARK_MASTER = os.environ["SPARK_MASTER"]
DB_NAME = os.environ["DB_NAME"]
DB_HOST = os.environ["DB_HOST"]
DB_PORT = os.environ["DB_PORT"]
# SPARK_THREAD_COUNT = os.environ["SPARK_THREAD_COUNT"]
SPARK_EXECUTOR_CORES = os.environ["SPARK_EXECUTOR_CORES"]
SPARK_EXECUTOR_MEMORY = os.environ["SPARK_EXECUTOR_MEMORY"]
SPARK_INITIAL_EXECUTORS = os.environ["SPARK_INITIAL_EXECUTORS"]
SPARK_MIN_EXECUTORS = os.environ["SPARK_MIN_EXECUTORS"]
SPARK_MAX_EXECUTORS = os.environ["SPARK_MAX_EXECUTORS"]
SPARK_BACKLOG_TIMEOUT = os.environ["SPARK_BACKLOG_TIMEOUT"]
SPARK_IDLE_TIMEOUT = os.environ["SPARK_IDLE_TIMEOUT"]

GISJOIN = "GISJOIN"

spark = SparkSession \
    .builder \
    .master(SPARK_MASTER) \
    .appName("COVID-19 Time-series - PySpark") \
    .config("spark.jars", jars) \
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .config("park.executor.cores", SPARK_EXECUTOR_CORES) \
    .config("spark.executor.memory", SPARK_EXECUTOR_MEMORY) \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
    .config("spark.dynamicAllocation.initialExecutors", SPARK_INITIAL_EXECUTORS) \
    .config("spark.dynamicAllocation.minExecutors", SPARK_MIN_EXECUTORS) \
    .config("spark.dynamicAllocation.maxExecutors", SPARK_MAX_EXECUTORS) \
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", SPARK_BACKLOG_TIMEOUT) \
    .config("spark.dynamicAllocation.executorIdleTimeout", SPARK_IDLE_TIMEOUT) \
    .getOrCreate()

In [3]:
result_schema = StructType([
    StructField("ds", DateType(), True),
    StructField("yhat", DoubleType(), True),
    StructField("yhat_lower", DoubleType(), True),
    StructField("yhat_upper", DoubleType(), True)
])

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def predict(df0):
    # instantiate the model, configure the parameters
    print('>>> predict(): call')
    m = Prophet()
    m.fit(df0)
    df0_future = m.make_future_dataframe(periods=365)
    df0_forecast = m.predict(df0_future)

    return df0_forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]

In [18]:
sc = spark.sparkContext
temp = sc._jsc.sc()

mongo_connection_uri = f'mongodb://{DB_HOST}:{DB_PORT}/{DB_NAME}.covid_county_formatted'

df = spark.read.format("mongo").option("uri", mongo_connection_uri).load()

df = df.select(GISJOIN, 'cases', 'deaths', 'date', 'formatted_date')

df.show()


df_cases = df.select(GISJOIN, 'date', 'cases').withColumnRenamed('date', 'ds').withColumnRenamed('cases', 'y')

df_cases.show()

+--------+-----+------+----------+-------------------+
| GISJOIN|cases|deaths|      date|     formatted_date|
+--------+-----+------+----------+-------------------+
|G0100010|    0|     0|2020-03-27|2020-03-26 18:00:00|
|G0100010|    3|     0|2020-03-25|2020-03-24 18:00:00|
|G0100010|    0|     0|2020-03-28|2020-03-27 18:00:00|
|G0100010|    0|     0|2020-03-29|2020-03-28 18:00:00|
|G0100010|    1|     0|2020-03-30|2020-03-29 18:00:00|
|G0100010|    0|     0|2020-03-31|2020-03-30 18:00:00|
|G0100010|    3|     0|2020-04-01|2020-03-31 18:00:00|
|G0100010|    2|     0|2020-03-26|2020-03-25 18:00:00|
|G0100010|    0|     0|2020-04-04|2020-04-03 18:00:00|
|G0100010|    0|     0|2020-04-05|2020-04-04 18:00:00|
|G0100010|    0|     1|2020-04-06|2020-04-05 18:00:00|
|G0100010|    0|     0|2020-04-02|2020-04-01 18:00:00|
|G0100010|    0|     0|2020-04-07|2020-04-06 18:00:00|
|G0100010|    0|     0|2020-04-08|2020-04-07 18:00:00|
|G0100010|    5|     0|2020-04-09|2020-04-08 18:00:00|
|G0100010|

## `groupBy()` experiements

In [12]:
grouped = df.groupBy('GISJOIN')

In [15]:
grouped.sum().take(2)

[Row(GISJOIN='G1201050', sum(cases)=58116, sum(deaths)=1175),
 Row(GISJOIN='G1700030', sum(cases)=453, sum(deaths)=9)]

In [28]:
# https://stackoverflow.com/a/43878152/5307917
distinct_gis_joins = df.select("GISJOIN").distinct().collect()
all_gis_joins = [x.__getattr__('GISJOIN') for x in distinct_gis_joins]
all_gis_joins

['G1201050',
 'G1700030',
 'G1701230',
 'G1701390',
 'G1801810',
 'G1901890',
 'G2001030',
 'G2700790',
 'G3100050',
 'G3600770',
 'G3800590',
 'G4200570',
 'G4500890',
 'G4600710',
 'G4701690',
 'G4804550',
 'G5300710',
 'G1200450',
 'G1701470',
 'G2001950',
 'G2002090',
 'G2100510',
 'G2200750',
 'G3000150',
 'G3500510',
 'G3600590',
 'G3700670',
 'G4001210',
 'G4001250',
 'G4500690',
 'G4500790',
 'G4801810',
 'G4803210',
 'G5500110',
 'G0900090',
 'G1700710',
 'G1900890',
 'G1901010',
 'G2001870',
 'G2801330',
 'G2901890',
 'G3101210',
 'G4700850',
 'G4701050',
 'G4801190',
 'G4803010',
 'G4804210',
 'G4804530',
 'G1200910',
 'G1300910',
 'G1900530',
 'G2000810',
 'G2101230',
 'G2200570',
 'G2300050',
 'G2500130',
 'G2700230',
 'G3600710',
 'G3800090',
 'G4000170',
 'G4200990',
 'G4600730',
 'G4900190',
 'G5107000',
 'G5300190',
 'G0100110',
 'G1301210',
 'G1301250',
 'G1302190',
 'G1600810',
 'G1600850',
 'G1701170',
 'G1801250',
 'G2000130',
 'G2102250',
 'G2200810',
 'G2400130',

In [29]:
all_dfs = [df.where(df.GISJOIN == x) for x in all_gis_joins]

In [30]:
len(all_dfs)

3116

In [33]:
dfs_cases = [x.select('GISJOIN', 'date', 'cases') \
             .withColumnRenamed('date', 'ds') \
             .withColumnRenamed('cases', 'y') for x in all_dfs]

print(f'#dfs_cases: {len(dfs_cases)}')

dfs_deaths = [x.select('GISJOIN', 'date', 'deaths') \
             .withColumnRenamed('date', 'ds') \
             .withColumnRenamed('deaths', 'y') for x in all_dfs]

print(f'#dfs_deaths: {len(dfs_deaths)}')

#dfs_cases: 3116
#dfs_deaths: 3116


In [39]:
dfs_cases[0]

DataFrame[GISJOIN: string, ds: string, y: int]

In [51]:
from prophet.diagnostics import performance_metrics

@pandas_udf(DoubleType(), PandasUDFType.GROUPED_MAP)
def get_rmse_for_county(df0):
    # instantiate the model, configure the parameters
    print('>>> predict(): call')
    m = Prophet()
    m.fit(df0)
    df0_future = m.make_future_dataframe(periods=365)
    df0_forecast = m.predict(df0_future)

    df_p = performance_metrics(df0_forecast)
    return df_p['RMSE']
#     return df0_forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]

In [53]:
from pyspark.sql.functions import udf

func_predict = udf(get_rmse_for_county, DoubleType())

# result0 = func_predict(dfs_cases[0])

result = func_predict(dfs_cases[0])

TypeError: Invalid argument, not a string or column: DataFrame[GISJOIN: string, ds: string, y: int] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.