In [131]:
!pip install pyspark



In [132]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [133]:
from pyspark.sql.types import *
file_data = "/content/weatherAUS.csv"
file_Schema = StructType([
      StructField("Date",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("MinTemp",FloatType(),True),
      StructField("MaxTemp",FloatType(),True),
      StructField("Rainfall",FloatType(),True),
      StructField("Evaporation",StringType(),True),
      StructField("Sunshine",StringType(),True),
      StructField("WindGustDir",StringType(),True),
      StructField("WindGustSpeed",FloatType(),True),
      StructField("WindDir9am",StringType(),True),
      StructField("WindDir3pm",StringType(),True),
      StructField("WindSpeed9am",FloatType(),True),
      StructField("WindSpeed3pm",FloatType(),True),
      StructField("Humidity9am",FloatType(),True),
      StructField("Humidity3pm",FloatType(),True),
      StructField("Pressure9am",FloatType(),True),
      StructField("Pressure3pm",FloatType(),True),
      StructField("Cloud9am",FloatType(),True),
      StructField("Cloud3pm",FloatType(),True),
      StructField("Temp9am",FloatType(),True),
      StructField("Temp3pm",FloatType(),True),
      StructField("RainToday",StringType(),True),
      StructField("RainTomorrow",StringType(),True)
])

In [135]:
df = spark.read.csv(file_data, header="true", schema=file_Schema,nullValue= 'NA')
df = df.drop("Date", "Evaporation","Sunshine","Cloud9am", "Cloud3pm", 'WindGustDir', 'WindGustSpeed')

In [136]:
df = df.dropna()

In [137]:
(traindata, testdata) = df.randomSplit([0.8, 0.2],12345)

In [138]:
categoricalColumns = ["Location", "WindDir9am", "WindDir3pm", "RainToday"]

In [139]:
stages = []

In [140]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
    

In [141]:
stages

[StringIndexer_98908de32bd5,
 OneHotEncoder_16e80a30d418,
 StringIndexer_7c9f00d59cfb,
 OneHotEncoder_7afacbe62bfa,
 StringIndexer_4fc70721b0f3,
 OneHotEncoder_4c9c051953c9,
 StringIndexer_2d5bca9f1676,
 OneHotEncoder_6dd14920bf6b]

In [142]:
label_stringIdx = StringIndexer(inputCol="RainTomorrow", outputCol="label")

stages += [label_stringIdx]

In [143]:
numericCols = ["MinTemp", "MaxTemp", "WindSpeed9am", "WindSpeed3pm", "Humidity9am", "Humidity3pm", "Temp9am", "Temp3pm", "Pressure9am", "Pressure3pm"]

In [144]:
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
dtree = DecisionTreeClassifier(labelCol="label", featuresCol=assembler.getOutputCol())

In [122]:
label_stringIdx = StringIndexer(inputCol="RainTomorrow", outputCol="label")
stages += [label_stringIdx]

In [123]:
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numeric_Cols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
dt = DecisionTreeClassifier(labelCol="RainTomorrowIndex", featuresCol="features")

In [145]:
paramGrid = (ParamGridBuilder()
    .addGrid(dtree.impurity, ['gini', 'entropy'])
    .addGrid(dtree.maxBins, [5, 10, 15])
    .addGrid(dtree.minInfoGain, [0.0, 0.2, 0.4])
    .addGrid(dtree.maxDepth, [3, 5, 7])
    .build())

In [146]:
evaluator = BinaryClassificationEvaluator()

In [147]:
cv = CrossValidator(estimator=dtree, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=4)
stages += [cv]

In [148]:
from pyspark.ml import Pipeline

In [149]:
pipeline = Pipeline().setStages(stages)


In [150]:
pipeline_model = pipeline.fit(traindata)
prediction = pipeline_model.transform(testdata)

In [156]:
best_model = pipeline_model.stages[-1].bestModel

In [157]:
best_modelobj = best_model._java_obj.parent()

best_modeldepth = best_modelobj.getMaxDepth()
best_modelbins = best_modelobj.getMaxBins()
best_modelimpurity = best_modelobj.getImpurity()
best_modelgain = best_modelobj.getMinInfoGain()


In [155]:
print("Best model grid params are ")
print(best_modeldepth)
print(best_modelbins)
print(best_modelimpurity)
print(best_modelgain)

Best model grid params are 
7
15
entropy
0.0


In [158]:
evaluator.evaluate(prediction)

0.38596277301136334

In [164]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(prediction, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.38596277301136334


In [165]:
print("Test Area Under PR: " + str(evaluator.evaluate(prediction, {evaluator.metricName: "areaUnderPR"})))

Test Area Under PR: 0.3416236420408684
