In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer, VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession

In [2]:
# Obtain the Spark session used by every later cell (an existing one is reused).
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [3]:
# Explicit DDL schema for weatherAUS.csv so Spark does not have to infer types.
# Written as adjacent string literals instead of a backslash continuation inside
# one literal, which embedded the next line's indentation in the schema text.
schema = (
    "Date DATE, Location STRING, MinTemp FLOAT, MaxTemp FLOAT, Rainfall FLOAT, "
    "Evaporation INT, Sunshine INT, WindGustDir STRING, WindGustSpeed INT, "
    "WindDir9am STRING, WindDir3pm STRING, WindSpeed9am INT, WindSpeed3pm INT, "
    "Humidity9am INT, Humidity3pm INT, Pressure9am FLOAT, Pressure3pm FLOAT, "
    "Cloud9am INT, Cloud3pm INT, Temp9am FLOAT, Temp3pm FLOAT, "
    "RainToday STRING, RainTomorrow STRING"
)

# Load the CSV, discard the columns that are not used for modelling, and remove
# rows where any of the listed numeric columns is null.
# Column names in `subset` are spelled exactly as in the schema ("WindSpeed3pm",
# not "Windspeed3pm") so the lookup does not depend on case-insensitive resolution.
data = (
    spark.read.csv("weatherAUS.csv", header=True, schema=schema)
    .drop("Date", "Evaporation", "Sunshine", "Cloud9am", "Cloud3pm", "WindGustDir", "WindGustSpeed")
    .dropna(subset=[
        "Humidity3pm", "Temp3pm", "Rainfall", "WindSpeed3pm", "Humidity9am",
        "WindSpeed9am", "Temp9am", "MinTemp", "MaxTemp",
    ])
)

In [4]:
# Split the data into training and test sets (80/20).
# A fixed seed keeps the split, and therefore the reported metrics, repeatable
# across kernel restarts.
(train_df, test_df) = data.randomSplit([0.8, 0.2], seed=42)

In [5]:
# String-valued feature columns that are indexed and then one-hot encoded.
categoricalColumns = ["Location", "WindDir9am", "WindDir3pm", "RainToday"]

# Ordered list of pipeline stages. Stages are only declared in these cells;
# they are executed later when the pipeline is fitted.
stages = []
for categoricalCol in categoricalColumns:
    indexedCol = categoricalCol + "Index"
    # Map each category string to a numeric index ...
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
    # ... and turn that index into a one-hot vector column named "<col>classVec".
    encoder = OneHotEncoder(inputCols=[indexedCol], outputCols=[categoricalCol + "classVec"])
    stages.extend([stringIndexer, encoder])

In [6]:
# Fill missing pressure readings; the filled values go to new "*_imp" columns.
# No strategy is passed, so the Imputer's default strategy is used.
mean_imputer = Imputer(
    inputCols=["Pressure9am", "Pressure3pm"],
    outputCols=["Pressure9am_imp", "Pressure3pm_imp"],
)
stages.append(mean_imputer)

In [7]:
# Index the RainTomorrow strings into a numeric "target" column.
label_stringIdx = StringIndexer(
    inputCol="RainTomorrow",
    outputCol="target",
)
stages.append(label_stringIdx)

In [8]:
# Replace target index 2.0 with the most frequent target index and write the
# result to the "label" column consumed by the classifier.
# NOTE(review): presumably 2.0 is the index StringIndexer assigns to a
# missing-value marker in RainTomorrow (e.g. a literal "NA") - confirm, because
# index assignment depends on the value frequencies seen at fit time.
mode_imputer = Imputer(
    strategy='mode',
    missingValue=2.0,
    inputCol=label_stringIdx.getOutputCol(),
    outputCol="label",
)
stages.append(mode_imputer)

In [9]:
# Numeric inputs: the raw measurement columns plus the imputed pressure columns.
# NOTE(review): "Rainfall" is not part of this list although it is kept in the
# data - confirm that leaving it out of the features is intentional.
numericCols = [
    "MinTemp", "MaxTemp", "WindSpeed9am", "WindSpeed3pm",
    "Humidity9am", "Humidity3pm", "Temp9am", "Temp3pm",
    *mean_imputer.getOutputCols(),
]

# One-hot vectors first, numeric columns after, combined into a single
# "features" vector column.
assemblerInputs = [*(name + "classVec" for name in categoricalColumns), *numericCols]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [10]:
# Decision tree trained on the assembled feature vector and the imputed label.
# The estimator is created exactly once: the Param objects placed in the grid
# below are bound to this instance, so re-creating `dt` after building the grid
# would hand CrossValidator an estimator that silently ignores every grid entry.
dt = DecisionTreeClassifier(labelCol=mode_imputer.getOutputCol(), featuresCol=assembler.getOutputCol())

# Hyper-parameter combinations to search (3 * 3 * 2 * 3 = 54 candidates).
paramGrid = (ParamGridBuilder()
    .addGrid(dt.maxDepth, [3, 5, 7])
    .addGrid(dt.maxBins, [5, 10, 15])
    .addGrid(dt.impurity, ['gini', 'entropy'])
    .addGrid(dt.minInfoGain, [0.0, 0.2, 0.4])
    .build())

# Metric used to rank the candidates during cross-validation (library defaults).
evaluator = BinaryClassificationEvaluator()

# 4-fold cross-validation over the grid; the seed makes the fold assignment
# repeatable between runs.
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=4, seed=42)
stages += [cv]

In [None]:
# Chain all declared stages and fit them, in order, on the training split.
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(train_df)

In [None]:
# Run the held-out split through the fitted pipeline to obtain predictions.
prediction = pipeline_model.transform(dataset=test_df)

In [None]:
# The last pipeline stage is the fitted cross-validator; its `bestModel` is the
# decision tree selected by the search. The parameter getters are read from the
# model's public Python API rather than from the private `_java_obj` py4j handle.
best_model = pipeline_model.stages[-1].bestModel

# Parameters of the best model
best_depth = best_model.getMaxDepth()
best_bins = best_model.getMaxBins()
best_impurity = best_model.getImpurity()
best_gain = best_model.getMinInfoGain()

print('Best Max Depth Value: ' + str(best_depth))
print('Best Max Bins Value: ' + str(best_bins))
print('Best Impurity Value: ' + str(best_impurity))
print('Best Min Info Gain Value: ' + str(best_gain))

In [None]:
# Test-set metrics computed from the pipeline's predictions.
# Accuracy needs its own evaluator: the BinaryClassificationEvaluator used for
# cross-validation reports an area-under-curve metric, so printing its value
# under the heading "Accuracy" was mislabelled.
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
auc_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC")
aupr_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderPR")
print('Accuracy:', accuracy_evaluator.evaluate(prediction))
print('AUC:', auc_evaluator.evaluate(prediction))
print('AUPR:', aupr_evaluator.evaluate(prediction))