In [1]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if one exists; otherwise start a new one.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [2]:
# Load the weather CSV, taking column names from its header row.
# No schema inference is requested, so every column is read as a string.
df = spark.read.csv("weatherAUS.csv", header=True)

In [3]:
# Preview the first five rows without truncating cell contents.
df.show(n=5, truncate=False)

+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|Date      |Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|2008-12-01|Albury  |13.4   |22.9   |0.6     |NA         |NA      |W          |44           |W         |WNW       |20          |24          |71         |22         |1007.7     |1007.1     |8       |NA      |16.9   |21.8   |No       |No          |
|2008-12-02|

In [4]:
# Column names come straight from the DataFrame schema; there is no need to
# collect the whole dataset to the driver with toPandas() just to read them.
col_list = df.columns

In [5]:
# Show the column names of the loaded DataFrame.
print(col_list)

['Date', 'Location', 'MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustDir', 'WindGustSpeed', 'WindDir9am', 'WindDir3pm', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm', 'RainToday', 'RainTomorrow']


In [6]:
# Inspect which distinct values the RainTomorrow target column contains.
df.select("RainTomorrow").distinct().collect()

[Row(RainTomorrow='NA'), Row(RainTomorrow='No'), Row(RainTomorrow='Yes')]

In [7]:
# Exclude rows whose RainTomorrow target is the literal string "NA",
# i.e. rows without a usable label.
df_clean = df.filter(df["RainTomorrow"] != "NA")

In [8]:
# Confirm that only the two real classes remain after filtering.
df_clean.select("RainTomorrow").distinct().collect()

[Row(RainTomorrow='No'), Row(RainTomorrow='Yes')]

In [9]:
from pyspark.ml.feature import RFormula

In [10]:
# Encode RainTomorrow as the numeric "label" column and every other column
# as the "features" vector.
# NOTE(review): the CSV was read without schema inference, so all predictors
# (including Date and the numeric measurements) appear to be one-hot encoded
# as strings, which would explain the 8018-wide feature vector -- confirm
# that this is intended.
formula = RFormula(formula="RainTomorrow ~ .", featuresCol="features", labelCol="label")

formula_model = formula.fit(df_clean)
output = formula_model.transform(df_clean)

# Preview the encoded feature vectors next to their labels.
output.select("features", "label").show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(8018,[3035,3449,...|  0.0|
|(8018,[3036,3449,...|  0.0|
|(8018,[3009,3449,...|  0.0|
|(8018,[3010,3449,...|  0.0|
|(8018,[3011,3449,...|  0.0|
|(8018,[3012,3449,...|  0.0|
|(8018,[3013,3449,...|  0.0|
|(8018,[3014,3449,...|  0.0|
|(8018,[3037,3449,...|  1.0|
|(8018,[3038,3449,...|  0.0|
|(8018,[3015,3449,...|  1.0|
|(8018,[3016,3449,...|  1.0|
|(8018,[3017,3449,...|  1.0|
|(8018,[3018,3449,...|  0.0|
|(8018,[3019,3449,...|  0.0|
|(8018,[3020,3449,...|  1.0|
|(8018,[3021,3449,...|  1.0|
|(8018,[3022,3449,...|  0.0|
|(8018,[3023,3449,...|  0.0|
|(8018,[3024,3449,...|  0.0|
+--------------------+-----+
only showing top 20 rows



1. Split the data, 2. clean and convert the training data, 3. build a parameter grid and use a pipeline to train the model

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [12]:
# Index the label column. The indexer is fitted on the full encoded dataset
# (before the train/test split) and used in the pipeline as a fitted model.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(output)

# Automatically identify categorical features and index them.
# With maxCategories=2, only features with at most 2 distinct values are
# treated as categorical; features with more values are treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=2,
).fit(output)

# 80/20 train/test split with a fixed seed.
trainingData, testData = output.randomSplit([0.8, 0.2], seed=12345)

# Decision tree that consumes the indexed label and indexed feature columns.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain the two fitted indexers and the tree estimator into one pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

In [13]:
# Score predictions against the indexed label using plain accuracy.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="indexedLabel",
    predictionCol="prediction",
)

In [14]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [15]:
# Hyper-parameter search space for the decision tree:
# 2 impurities x 3 maxBins x 3 minInfoGain x 3 maxDepth = 54 combinations.
grid = (
    ParamGridBuilder()
    .addGrid(dt.impurity, ["gini", "entropy"])
    .addGrid(dt.maxBins, [5, 10, 15])
    .addGrid(dt.minInfoGain, [0.0, 0.2, 0.4])
    .addGrid(dt.maxDepth, [3, 5, 7])
    .build()
)

4. Cross-validate with 4 folds

In [16]:
# 4-fold cross-validation over the parameter grid.
# A fixed seed pins the fold assignment so that the selected model and the
# reported metrics are reproducible across runs (same seed as the data split).
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=4,
    seed=12345,
)

5. Model training (this step takes a long time)

In [17]:
# Run the cross-validated grid search on the training split.
# Every parameter combination is fitted once per fold, so this cell is slow.
cvModel = cv.fit(dataset=trainingData)

In [18]:
# Score the held-out test split with the best model found by cross-validation,
# then evaluate those predictions with the configured evaluator.
test_predictions = cvModel.transform(testData)
test_metric = evaluator.evaluate(test_predictions)

In [19]:
# Display the evaluator's metric on the held-out test split.
test_metric

0.7985231329180373

5. Print the stages and parameters of the best model

In [21]:
# Winning pipeline selected by cross-validation.
best_Model = cvModel.bestModel

# Print every fitted stage of the winning pipeline, in order.
for stage in best_Model.stages:
    print(stage)

StringIndexerModel: uid=StringIndexer_a41e01018192, handleInvalid=error
VectorIndexerModel: uid=VectorIndexer_20a1f9359598, numFeatures=8018, handleInvalid=error
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_34defdf7a871, depth=7, numNodes=185, numClasses=2, numFeatures=8018


In [22]:
# Read the tuned values back from the JVM object behind the final pipeline
# stage (the decision tree). The names of the tuned parameters are taken
# from the first entry of the search grid.
java_model = best_Model.stages[-1]._java_obj
tuned_param_names = [param.name for param in grid[0]]
{
    name: java_model.getOrDefault(java_model.getParam(name))
    for name in tuned_param_names
}

{'impurity': 'gini', 'maxBins': 5, 'minInfoGain': 0.0, 'maxDepth': 7}

6. ROC Curve and Area under Precision-Recall Curve

In [23]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [24]:
# Run the best pipeline over the test split to obtain its predictions.
predictionAndLabels = best_Model.transform(dataset=testData)

In [None]:
# BinaryClassificationMetrics expects (score, label) pairs where the score is
# continuous. Feeding the hard 0/1 "prediction" column collapses the ROC and
# PR curves to a single threshold, so use the predicted probability of the
# positive class (index 1 of the "probability" vector) as the score instead.
scoreAndLabels = predictionAndLabels.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)

# Instantiate metrics object
metrics = BinaryClassificationMetrics(scoreAndLabels)

# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

