In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.classification import LogisticRegression


In [0]:
# Launch-mode switch read by the cells below:
#   True  -> running under `spark-submit`; the Spark entry points are created here.
#   False -> nothing is created here and later cells use an existing `spark`
#            (presumably injected by the hosting notebook runtime — TODO confirm).
IS_SPARK_SUBMIT_CLI = False
if IS_SPARK_SUBMIT_CLI:
    # Go through the builder (the documented entry point) instead of calling the
    # SparkSession constructor directly; it returns the active session if one
    # already exists and otherwise creates one.
    spark = SparkSession.builder.getOrCreate()
    # Keep `sc` available for any code that still expects the raw SparkContext.
    sc = spark.sparkContext

In [0]:
# Load the raw dataset into `coviddeath`; the source depends on the launch mode.
if not IS_SPARK_SUBMIT_CLI:
    # Notebook mode: read everything from the table/view `uscasestemp1_csv`.
    coviddeath = spark.sql("SELECT * FROM uscasestemp1_csv")
else:
    # spark-submit mode: read the CSV file directly.
    coviddeath = spark.read.csv(
        'UScasestemp1.csv',
        header=True,       # first row supplies the column names
        inferSchema=True,  # let Spark infer the column types
    )

In [0]:
# Keep only the columns used downstream and expose "Case" under the name "label".
data = coviddeath.select(
    "Year", "Date", "Day", "Temp",
    "Admin2", "Lat", "Long", "Province",
    col("Case").alias("label"),
)

# Add a "<column>_index" column for each string-indexed column.
# Order matters only in that each indexer is fitted on the frame produced so far:
# Admin2 first, then Province.
for _categorical in ("Admin2", "Province"):
    _indexer = StringIndexer(inputCol=_categorical, outputCol=_categorical + "_index")
    data = _indexer.fit(data).transform(data)

data.show(5)


In [0]:
# Fixed seed so the 70/30 split (and every metric computed from it) is the same
# on each Restart & Run All; without a seed randomSplit picks a new one per run.
SPLIT_SEED = 42
splits = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Train / test sets for the decision tree regression fitted later in the notebook.
dt_train = splits[0]
# In the test set the "label" column is renamed to "trueLabel"; the evaluation
# cell reads the ground truth from that column.
dt_test = splits[1].withColumnRenamed("label", "trueLabel")

print("Training Rows:", dt_train.count(), " Testing Rows:", dt_test.count())

dt_train.show(5)


In [0]:

# Columns packed into the single "features" vector consumed by the model.
# Other combinations that were tried and left disabled:
#   ["Day","Temp","Lat","Long","Admin_index","Province_index"]
#   ["Date","Year","Day","Temp"]
assembler = VectorAssembler(
    inputCols=["Date", "Year", "Day", "Temp", "Lat", "Admin2_index", "Province_index"],
    outputCol="features",
)

# Decision tree regressor trained on "features" to predict "label".
# NOTE(review): maxBins=77582 is unexplained — presumably sized to cover the
# largest indexed categorical column; confirm before changing it.
dt = DecisionTreeRegressor(
    featuresCol='features',
    labelCol='label',
    maxBins=77582,
)

# Two-stage pipeline: assemble the feature vector, then fit the tree.
stages = [assembler, dt]
pipeline = Pipeline(stages=stages)
#dt_pipeline = pipeline(stages=[assembler, dt])
pipelineModel = pipeline.fit(dt_train)

# The fitted tree is the last stage of the fitted pipeline; print its structure.
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString)

#paramGrid = ParamGridBuilder().build()
#cv = CrossValidator(estimator=dt_pipeline, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid, numFolds=5)
#dt_model = cv.fit(dt_train)
#dt_prediction = dt_model.transform(dt_test)
#dt_predicted = dt_prediction.select("features", "prediction", "trueLabel")
#dt_predicted.show(10)



In [0]:
import pandas as pd

# Pair each assembler input column with the fitted tree's importance score.
# Pairing is positional, and zip stops at the shorter of the two sequences.
_importance_rows = list(zip(assembler.getInputCols(), dtModel.featureImportances))
featureImp = pd.DataFrame(_importance_rows, columns=["feature", "importance"])

# Most important features first; this expression is the cell's displayed output.
featureImp.sort_values(by="importance", ascending=False)

Unnamed: 0,feature,importance
2,Day,0.609987
5,Admin2_index,0.320423
0,Date,0.064503
4,Lat,0.005087
1,Year,0.0
3,Temp,0.0
6,Province_index,0.0


In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out rows with the fitted pipeline. `dt_prediction` was previously
# assigned only inside commented-out code, so on a fresh kernel the evaluators
# below failed with NameError.
dt_prediction = pipelineModel.transform(dt_test)

# RMSE between the model's "prediction" and the ground truth in "trueLabel".
dt_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
dt_rmse = dt_evaluator.evaluate(dt_prediction)
print("Root Mean Square Error (RMSE):", dt_rmse)

# NOTE(review): this is a classification metric applied to a regressor's output,
# so it only counts rows where the prediction equals the label exactly. Kept so
# the notebook's output is unchanged, but RMSE above is the meaningful metric —
# consider replacing this with r2 / mae.
evaluation = MulticlassClassificationEvaluator(
    labelCol="trueLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluation.evaluate(dt_prediction)
print(accuracy)