## COVID_PROJECT starts from here

In [0]:
# Import required libraries. Plotly did not work in this environment, so most of the visualization was done in Colab. Three regression algorithms are implemented in this notebook.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField,StructType,StringType
import pyspark.sql.functions as f
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import seaborn as sns
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier, RandomForestClassifier
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Create (or reuse) the Spark session for this notebook, registered under the app name "COVID".
spark = (
    SparkSession.builder
    .appName("COVID")
    .getOrCreate()
)

In [0]:
# Load the three COVID-19 datasets from DBFS through the `spark` session created above
# (instead of the legacy `sqlContext`). header=True takes column names from the first
# row; inferSchema=True lets Spark detect numeric column types.
covid_df = spark.read.csv('/FileStore/tables/country_wise_latest-2.csv', header=True, inferSchema=True)
who_df = spark.read.csv('/FileStore/tables/WHO_COVID_19_global_data-10.csv', header=True, inferSchema=True)
owid_df = spark.read.csv('/FileStore/tables/owid_covid_data-9.csv', header=True, inferSchema=True)

In [0]:
covid_df.show(n=20)  # preview the first 20 rows of the country_wise_latest data

In [0]:
who_df.show(n=20)  # preview the first 20 rows of the WHO data

In [0]:
owid_df.show(n=20)  # preview the first 20 rows of the OWID data

In [0]:
# Print column names and inferred types of the country-level dataset.
covid_df.printSchema()

In [0]:
# Print column names and inferred types of the WHO dataset.
who_df.printSchema()

In [0]:
# Print column names and inferred types of the OWID dataset.
owid_df.printSchema()

In [0]:
# Cast the ratio / percentage columns to integers for use in the ML algorithms.
# Note: casting to IntegerType drops any fractional part of these values.

from pyspark.sql.types import IntegerType
for ratio_col in ["Recovered / 100 Cases", "Deaths / 100 Recovered", "Deaths / 100 Cases", "1 week % increase"]:
    covid_df = covid_df.withColumn(ratio_col, covid_df[ratio_col].cast(IntegerType()))


In [0]:
# Re-check the schema: the four ratio/percentage columns should now be integer typed.
covid_df.printSchema()

In [0]:
covid_df.describe().show(n=20)  # count / mean / stddev / min / max per column

In [0]:
display(covid_df.select(['Confirmed', 'Deaths']))

Confirmed,Deaths
36263,1269
4880,144
27973,1163
907,52
950,41
86,3
167416,3059
37390,711
15303,167
20558,713


In [0]:
display(who_df.select(['Cumulative_cases', 'Cumulative_deaths']))

Cumulative_cases,Cumulative_deaths
0,0
0,0
0,0
0,0
0,0
0,0
0,0
0,0
0,0
0,0


In [0]:
display(owid_df.select(['continent', 'total_cases']))

continent,total_cases
Asia,1.0
Asia,1.0
Asia,1.0
Asia,1.0
Asia,1.0
Asia,1.0
Asia,1.0
Asia,1.0
Asia,2.0
Asia,4.0


In [0]:
# Inspect the weekly change in confirmed cases, used below as the growth measure.
covid_df.select(["1 week change"]).show()

In [0]:
covid_df = covid_df.withColumnRenamed(existing="1 week change", new="Change_Growth")

In [0]:
covid_df.show(n=20)  # confirm the rename took effect

In [0]:
# Replace nulls with 0 in the count and ratio columns. The fill value must be numeric:
# na.fill ignores subset columns whose type does not match the value, so a string "0"
# would leave nulls in these numeric columns untouched (and VectorAssembler later
# rejects null inputs by default).
covid_df = covid_df.na.fill(0, subset=['New cases', 'New deaths', 'New recovered', 'Deaths / 100 Cases', 'Recovered / 100 Cases', 'Deaths / 100 Recovered'])

In [0]:
covid_df.select(["Change_Growth"]).show()

In [0]:
# List the countries whose weekly change (Change_Growth) is exactly zero.
zero_growth = covid_df.filter(covid_df["Change_Growth"] == 0)
zero_growth.select("Country/Region", "New cases", "Change_Growth").show()

In [0]:
# Keep only rows with positive weekly growth, ordered from smallest to largest growth.
covid_new = covid_df.where(covid_df.Change_Growth > 0).orderBy("Change_Growth")

In [0]:
covid_new.select(["Change_Growth"]).show()

In [0]:
covid_new.describe().show(n=20)  # summary statistics of the positive-growth subset

In [0]:
# Print the schema of the filtered covid_new DataFrame.
covid_new.printSchema()

In [0]:
covid_new.describe().toPandas().T  # one row per column, one column per statistic

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Country/Region,174,,,Afghanistan,Zimbabwe
Confirmed,174,94682.02298850575,396678.0303352999,14,4290259
Deaths,174,3758.080459770115,14586.544569931519,0,148011
Recovered,174,54396.84482758621,196683.77697176125,0,1846641
Active,174,36527.097701149425,220987.4633903633,1,2816444
New cases,174,1314.2816091954023,5910.842358130151,0,56336
New deaths,174,31.120689655172413,124.19322223083086,0,1076
New recovered,174,1003.580459770115,4344.483640918166,0,33728
Deaths / 100 Cases,174,2.6149425287356323,3.465103614650184,0,28


In [0]:
# Scatter matrix of all int/double columns, drawn from an 80% sample. The sample is
# converted into a separate pandas frame (covid_new_pd) so that covid_new stays a
# Spark DataFrame for the cells that follow (.select, .groupBy, VectorAssembler).
numeric_features = [name for name, dtype in covid_new.dtypes if dtype in ('int', 'double')]
# Fixed seed so the sampled points (and the figure) are reproducible.
covid_new_pd = covid_new.select(numeric_features).sample(False, 0.8, seed=42).toPandas()
axis = pd.plotting.scatter_matrix(covid_new_pd, figsize=(20, 20))
n = len(covid_new_pd.columns)
for i in range(n):
    # Left column: horizontal, right-aligned y-labels and no y tick marks.
    v = axis[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    # Bottom row: vertical x-labels and no x tick marks.
    h = axis[n - 1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

In [0]:
# Pearson correlation of every numeric column with 'New cases'. Numeric columns are
# identified from the schema rather than by inspecting the first row's value, which
# would misclassify a string column whose first value is null (stat.corr only
# accepts numeric columns).
numeric_dtypes = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')
for col_name, dtype in covid_df.dtypes:
    if dtype in numeric_dtypes:
        print("Correlation to New cases for ", col_name, covid_df.stat.corr('New cases', col_name))

In [0]:
# Prepare data for machine learning: pack the new case/death/recovery counts and the
# weekly change into a single 'features' vector column.
vectorAssembler = VectorAssembler(inputCols=['New cases', 'New deaths', 'New recovered', 'Change_Growth'], outputCol='features')
vector_df = vectorAssembler.transform(covid_new)
# Show the assembled vector next to two of its input columns.
vector_df.select(['New cases', 'New recovered', 'features']).show(50)

In [0]:
# Dataset-wide totals and the average weekly growth. Each result is a one-row
# DataFrame; agg() over the whole frame is used because groupBy(col) would instead
# produce one row per distinct value of that column.
confirm_total = covid_new.agg(f.sum("Confirmed").alias("total_Confirmed"))
death_total = covid_new.agg(f.sum("Deaths").alias("total_Deaths"))
recovered_total = covid_new.agg(f.sum("Recovered").alias("total_Recovered"))

change_growth = covid_new.agg(f.avg("Change_Growth").alias("avg_Change_Growth"))

In [0]:
change_growth.show(n=20)

In [0]:
# Model inputs for the regressions below. 'Change_Growth' is the regression label
# (see vec_f), so it must not also appear among the features.
# NOTE(review): '1 week change' (renamed Change_Growth) is presumably
# Confirmed - Confirmed last week; if so, these two features still determine the label
# exactly. TODO: confirm, and pick features that do not encode the label.
vec_assm = VectorAssembler(inputCols=['Confirmed', 'Confirmed last week'], outputCol='features')

In [0]:
# Feature assembler for the WHO dataset: new and cumulative case/death counts.
# (outputCol is a keyword argument of VectorAssembler, not an element of inputCols.)
who_vec_assm = VectorAssembler(inputCols=['New_cases', 'New_deaths', 'Cumulative_cases', 'Cumulative_deaths'], outputCol='features')

In [0]:
new_vec = vec_assm.transform(dataset=covid_new)  # adds the 'features' column

In [0]:
# Print the schema of new_vec, which now includes the assembled 'features' column.
new_vec.printSchema()

In [0]:
vec_f = new_vec.select(["features", "Change_Growth"])  # feature vector + label

In [0]:
vec_f.show(n=20)

In [0]:
# Split into roughly 80% training / 20% test rows. A fixed seed makes the split, and
# therefore every metric below, reproducible across runs.
train_X, test_X = vec_f.randomSplit([0.8, 0.2], seed=42)

In [0]:
train_X.show(n=20)

In [0]:
test_X.show(n=20)

In [0]:
# Linear regression predicting Change_Growth from the assembled 'features' vector,
# applied to the smallest (country-level) dataset.
lin_reg = LinearRegression(featuresCol="features", labelCol="Change_Growth")

In [0]:
lin_model = lin_reg.fit(dataset=train_X)

In [0]:
res = lin_model.evaluate(dataset=test_X)  # summary of the model on the held-out rows

In [0]:
res.residuals.show(n=20)

In [0]:
# Root-mean-squared error of the linear model on the test split.
lin_rmse = res.rootMeanSquaredError
lin_rmse

In [0]:
# Coefficient of determination (R^2) of the linear model on the test split.
lin_r2 = res.r2
lin_r2

In [0]:
# Summary statistics of the true label vs. the model's predictions on the test split.
# LinearRegressionSummary has no describe(); its per-row results live in res.predictions.
res.predictions.select("Change_Growth", "prediction").describe().show()

In [0]:
# Keep only the feature vector from the test split, for prediction without labels.
feat_col = test_X.select(["features"])

In [0]:
feat_col.show(n=20)

In [0]:
lin_pred = lin_model.transform(dataset=feat_col)  # appends a 'prediction' column

In [0]:
# Predicted Change_Growth for each test row.
lin_pred.show(n=20)

In [0]:
# Decision tree regressor predicting Change_Growth from the assembled 'features' vector.

from pyspark.ml.regression import DecisionTreeRegressor
decision_tree = DecisionTreeRegressor(featuresCol="features", labelCol="Change_Growth")

In [0]:
decision_tree_model = decision_tree.fit(dataset=train_X)

In [0]:
decision_tree_prediction = decision_tree_model.transform(dataset=test_X)

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
# Evaluator comparing 'prediction' against the Change_Growth label; the explicit
# predictionCol and metricName values are RegressionEvaluator's defaults.
evaluate_dt = RegressionEvaluator(labelCol="Change_Growth", predictionCol="prediction", metricName="rmse")

In [0]:
# Decision-tree error on the test split, using the metric configured in evaluate_dt.
pred_dt = evaluate_dt.evaluate(dataset=decision_tree_prediction)
print(pred_dt)

In [0]:
# Random forest regressor (100 trees) predicting Change_Growth from the assembled
# 'features' vector. A fixed seed makes the random sampling used while building the
# trees reproducible across runs.
random_forest = RandomForestRegressor(numTrees=100, labelCol="Change_Growth", featuresCol="features", seed=42)
random_forest_model = random_forest.fit(train_X)
random_forest_prediction = random_forest_model.transform(test_X)

In [0]:
# Random-forest error on the test split, using an evaluator configured like the
# decision tree's (same label column, default metric) so the two numbers are comparable.
evaluate_rf = RegressionEvaluator(labelCol="Change_Growth")
pred_rf = evaluate_rf.evaluate(random_forest_prediction)
print(pred_rf)

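## The Linear Regression model performs best of the three models.

Caveat: this comparison is only meaningful if no feature column encodes the label (`Change_Growth`); verify the feature set before relying on it.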

## COVID_PROJECT ends here.