### Preprocessing

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Ops').getOrCreate()
import pandas as pd
import numpy as np

# Confirmed
c_df = pd.read_csv("time_series_covid19_confirmed_global.csv")
global_c_total = c_df.select_dtypes(pd.np.number).sum().to_frame()
canada_c_total = c_df[c_df['Country/Region'] == 'Canada'].select_dtypes(pd.np.number).sum().to_frame()
ontario_c_total = c_df[c_df['Province/State'] == 'Ontario'].select_dtypes(pd.np.number).sum().to_frame()

# Recovered
r_df = pd.read_csv("time_series_covid19_recovered_global.csv")
global_r_total = r_df.select_dtypes(pd.np.number).sum().to_frame()
canada_r_total = r_df[r_df['Country/Region'] == 'Canada'].select_dtypes(pd.np.number).sum().to_frame()
ontario_r_total = r_df[r_df['Province/State'] == 'Ontario'].select_dtypes(pd.np.number).sum().to_frame()

# Deaths
d_df = pd.read_csv("time_series_covid19_deaths_global.csv")
global_d_total = d_df.select_dtypes(pd.np.number).sum().to_frame()
canada_d_total = d_df[d_df['Country/Region'] == 'Canada'].select_dtypes(pd.np.number).sum().to_frame()
ontario_d_total = d_df[d_df['Province/State'] == 'Ontario'].select_dtypes(pd.np.number).sum().to_frame()

# Merging Global Dataset
global_df = pd.concat([global_c_total, global_r_total, global_d_total], axis=1)
global_df.columns = ['g_confirmed','g_recovered','g_deaths']
global_df = global_df.drop(['Lat','Long'], axis=0)

# Merging Canada Dataset
canada_df = pd.concat([canada_c_total, canada_r_total, canada_d_total], axis=1)
canada_df.columns = ['c_confirmed','c_recovered','c_deaths']
canada_df = canada_df.drop(['Lat','Long'], axis=0)

# Merging Ontario Dataset
ontario_df = pd.concat([ontario_c_total, ontario_r_total, ontario_d_total], axis=1)
ontario_df.columns = ['o_confirmed','o_recovered','o_deaths']
ontario_df = ontario_df.drop(['Lat','Long'], axis=0)

# Merging all dataset
df = pd.concat([global_df,canada_df,ontario_df], axis=1)
df = spark.createDataFrame(df)
df.toPandas().to_csv('Spark_df.csv')
df.show()

#c_df = spark.read.csv("time_series_covid19_confirmed_global.csv", header=True)
#d_df = spark.read.csv("time_series_covid19_deaths_global.csv", header=True)
#r_df = spark.read.csv("time_series_covid19_recovered_global.csv", header=True)

+-----------+-----------+--------+-----------+-----------+--------+-----------+-----------+--------+
|g_confirmed|g_recovered|g_deaths|c_confirmed|c_recovered|c_deaths|o_confirmed|o_recovered|o_deaths|
+-----------+-----------+--------+-----------+-----------+--------+-----------+-----------+--------+
|      555.0|       28.0|    17.0|        0.0|        0.0|     0.0|        0.0|        0.0|     0.0|
|      654.0|       30.0|    18.0|        0.0|        0.0|     0.0|        0.0|        0.0|     0.0|
|      941.0|       36.0|    26.0|        0.0|        0.0|     0.0|        0.0|        0.0|     0.0|
|     1434.0|       39.0|    42.0|        0.0|        0.0|     0.0|        0.0|        0.0|     0.0|
|     2118.0|       52.0|    56.0|        1.0|        0.0|     0.0|        1.0|        0.0|     0.0|
|     2927.0|       61.0|    82.0|        1.0|        0.0|     0.0|        1.0|        0.0|     0.0|
|     5578.0|      107.0|   131.0|        2.0|        0.0|     0.0|        1.0|        0.0|

### Linear Regression

In [2]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler


vectorAssembler = VectorAssembler(
    inputCols=["g_confirmed","g_recovered","g_deaths",
               "c_confirmed","c_recovered","c_deaths",
               "o_confirmed","o_recovered"], outputCol="features")
"""
vectorAssembler = VectorAssembler(
    inputCols=["g_confirmed","g_deaths",
               "c_confirmed","c_deaths",
               "o_confirmed"], outputCol="features")
"""

df = vectorAssembler.transform(df)
lr = LinearRegression(featuresCol="features", labelCol="o_deaths")
lr_model = lr.fit(df)
lr_model.coefficients

DenseVector([0.0001, 0.0001, -0.0026, 0.0016, -0.0067, 0.7166, 0.0058, 0.0])

In [3]:
lr_model.intercept

0.16802774778269086

In [4]:
lr_model.summary.rootMeanSquaredError

0.9810006656488205

### Decision Tree Regression

In [5]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

splits = df.randomSplit([0.7,0.3])
train_df = splits[0]
test_df = splits[1]
dt = DecisionTreeRegressor(featuresCol="features", labelCol="o_deaths")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(labelCol="o_deaths", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
rmse

2.4591907302238774

### Gradient-boosted tree regression

In [6]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol="features", labelCol="o_deaths")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_evaluator = RegressionEvaluator(labelCol="o_deaths", predictionCol="prediction", metricName="rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
gbt_rmse

2.4591907302238774