This FINAL homework recaps the Spark ML library:
1. Download the "Rain in Australia" dataset from Kaggle (it is also attached to this assignment): https://www kaggle.com/sphyg/weather-
dataset-rattle-package
2. In a Jupyter Notebook, write a SparkML script that uses a Decision Tree Classifier to predict the Rain Tomorrow target varible
1. Split the data 80/20 train/test, using a seed of 12345
2. Use transformers to remove unnecessary columns (use your best judgement) and convert categorical variables into one-hot
encoded vanables
3. Use a parameter grid to determine the best parameters for:
1. impurity - gini, entropy
2. maxBins - 5, 10, 15
3 minInfoGain - 0.0. 0.2. 0 4
4 maxDepth - 3. 5. 7
4. Cross-validate with 4 folds
5. Use a pipeline to encapsulate all steps
6. Print the parameters from the best model selected
7. Calculate and print the Area under ROC Curve and Area under Precision-Recall Curve scores for your training and test data sets
(these are built-in metrics, you do not need to calculate anything by hand)
8. Your script should be clean of all the testing and exploration and should only contain the necessary code to satisfy the above
conditions
3. Submit only your clean Jupyter notebook, run from top to bottom with the output showing, as homework/hw07.ipynb in your github
repository in a new branch
4. Create a pull request and tag me as the reviewer. Submit your pull request link here

In [126]:
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
import pandas as pd 

In [15]:
# conf = SparkConf().setMaster('local')
# sc = SparkContext(conf = conf)

In [16]:
# sc

In [127]:
spark = SparkSession \
    .builder \
    .appName("AUS weather data") \
    .getOrCreate()

In [165]:
df = spark.read.csv("weatherAUS.csv",header = True)

In [129]:
df.show(5)

+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|2008-12-01|  Albury|   13.4|   22.9|     0.6|         NA|      NA|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       8|      NA|   16.9|   21.8|       No|          No|
|2008-12-02|

In [88]:
df.describe().show()

[Stage 343:>                                                        (0 + 1) / 1]

+-------+----------+--------+-----------------+------------------+-----------------+------------------+------------------+-----------+------------------+----------+----------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+---------+------------+
|summary|      Date|Location|          MinTemp|           MaxTemp|         Rainfall|       Evaporation|          Sunshine|WindGustDir|     WindGustSpeed|WindDir9am|WindDir3pm|      WindSpeed9am|      WindSpeed3pm|       Humidity9am|      Humidity3pm|       Pressure9am|       Pressure3pm|          Cloud9am|          Cloud3pm|           Temp9am|           Temp3pm|RainToday|RainTomorrow|
+-------+----------+--------+-----------------+------------------+-----------------+------------------+------------------+-----------+------------------+----------+----------+------------------+------------------+-----------

                                                                                

In [130]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: string (nullable = true)
 |-- MaxTemp: string (nullable = true)
 |-- Rainfall: string (nullable = true)
 |-- Evaporation: string (nullable = true)
 |-- Sunshine: string (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: string (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: string (nullable = true)
 |-- WindSpeed3pm: string (nullable = true)
 |-- Humidity9am: string (nullable = true)
 |-- Humidity3pm: string (nullable = true)
 |-- Pressure9am: string (nullable = true)
 |-- Pressure3pm: string (nullable = true)
 |-- Cloud9am: string (nullable = true)
 |-- Cloud3pm: string (nullable = true)
 |-- Temp9am: string (nullable = true)
 |-- Temp3pm: string (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RainTomorrow: string (nullable = true)



In [90]:
import pandas as pd 

In [166]:
df = df.drop(*("Date","Location"))
cols = df.columns

In [167]:
cols1 = ['WindGustDir','WindDir9am','WindDir3pm','RainToday','RainTomorrow']
for i in cols1:
    cols.remove(i)

In [133]:
cols

['MinTemp',
 'MaxTemp',
 'Rainfall',
 'Evaporation',
 'Sunshine',
 'WindGustSpeed',
 'WindSpeed9am',
 'WindSpeed3pm',
 'Humidity9am',
 'Humidity3pm',
 'Pressure9am',
 'Pressure3pm',
 'Cloud9am',
 'Cloud3pm',
 'Temp9am',
 'Temp3pm']

In [168]:

from pyspark.sql.functions import col
for col_name in cols:
    df = df.withColumn(col_name, col(col_name).cast('float'))

In [135]:
df.printSchema()

root
 |-- MinTemp: float (nullable = true)
 |-- MaxTemp: float (nullable = true)
 |-- Rainfall: float (nullable = true)
 |-- Evaporation: float (nullable = true)
 |-- Sunshine: float (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: float (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: float (nullable = true)
 |-- WindSpeed3pm: float (nullable = true)
 |-- Humidity9am: float (nullable = true)
 |-- Humidity3pm: float (nullable = true)
 |-- Pressure9am: float (nullable = true)
 |-- Pressure3pm: float (nullable = true)
 |-- Cloud9am: float (nullable = true)
 |-- Cloud3pm: float (nullable = true)
 |-- Temp9am: float (nullable = true)
 |-- Temp3pm: float (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RainTomorrow: string (nullable = true)



In [169]:
from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

[Stage 873:>                                                        (0 + 1) / 1]

+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|   1485|   1261|    3261|      62790|   69835|          0|        10263|         0|         0|        1767|        3062|       2654|       4507|      15065|      15028|   55888|   59358|   1767|   3609|        0|           0|
+-------+-------+--------+-----------+--------+-----------+-------------+----------+--------

                                                                                

In [97]:
df.count()

145460

In [136]:
Dict_Null = {col:df.filter(df[col].isNull()).count() for col in df.columns}
Dict_Null

{'MinTemp': 1485,
 'MaxTemp': 1261,
 'Rainfall': 3261,
 'Evaporation': 62790,
 'Sunshine': 69835,
 'WindGustDir': 0,
 'WindGustSpeed': 10263,
 'WindDir9am': 0,
 'WindDir3pm': 0,
 'WindSpeed9am': 1767,
 'WindSpeed3pm': 3062,
 'Humidity9am': 2654,
 'Humidity3pm': 4507,
 'Pressure9am': 15065,
 'Pressure3pm': 15028,
 'Cloud9am': 55888,
 'Cloud3pm': 59358,
 'Temp9am': 1767,
 'Temp3pm': 3609,
 'RainToday': 0,
 'RainTomorrow': 0}

In [170]:
df= df.drop(*('Sunshine','Evaporation',))

In [171]:
df = df.fillna( { 'Cloud9am':0, 'Cloud3pm':0 } )

In [172]:
Dict_Null = {col:df.filter(df[col].isNull()).count() for col in df.columns}
Dict_Null

{'MinTemp': 1485,
 'MaxTemp': 1261,
 'Rainfall': 3261,
 'WindGustDir': 0,
 'WindGustSpeed': 10263,
 'WindDir9am': 0,
 'WindDir3pm': 0,
 'WindSpeed9am': 1767,
 'WindSpeed3pm': 3062,
 'Humidity9am': 2654,
 'Humidity3pm': 4507,
 'Pressure9am': 15065,
 'Pressure3pm': 15028,
 'Cloud9am': 0,
 'Cloud3pm': 0,
 'Temp9am': 1767,
 'Temp3pm': 3609,
 'RainToday': 0,
 'RainTomorrow': 0}

In [176]:
df.select('RainToday').distinct().collect()

[Row(RainToday='No'), Row(RainToday='Yes')]

In [174]:
df=df.where("RainToday!='NA'")

In [177]:
df.select('RainTomorrow').distinct().collect()

[Row(RainTomorrow='NA'), Row(RainTomorrow='No'), Row(RainTomorrow='Yes')]

In [178]:
df=df.where("RainTomorrow!='NA'")

In [179]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
Rain_Today = StringIndexer(inputCol="RainToday", outputCol="Rain_Today")
Rain_Tomorrow = StringIndexer(inputCol="RainTomorrow", outputCol="Rain_Tomorrow")
WindGustDir_indexer = StringIndexer(inputCol="WindGustDir", outputCol="WindGustDirIndex")
WindDir9am_indexer = StringIndexer(inputCol="WindDir9am", outputCol="WindDir9amIndex")
WindDir3pm_indexer = StringIndexer(inputCol="WindDir3pm", outputCol="WindDir3pmIndex")
onehotencoder_WindGustDir_vector = OneHotEncoder(inputCol="WindGustDirIndex", outputCol="WindGustDir_vec")
onehotencoder_WindDir9am_vector = OneHotEncoder(inputCol="WindDir9amIndex", outputCol="WindDir9am_vec")
onehotencoder_WindDir3pm_vector = OneHotEncoder(inputCol="WindDir3pmIndex", outputCol="WindDir3pm_vec")
#Create pipeline and pass all stages
pipeline = Pipeline(stages=[WindGustDir_indexer,
                            WindDir9am_indexer,
                            WindDir3pm_indexer,
                            Rain_Today,
                            Rain_Tomorrow,
                            onehotencoder_WindGustDir_vector,
                            onehotencoder_WindDir9am_vector,
                            onehotencoder_WindDir3pm_vector
                    ])


In [180]:
df_transformed = pipeline.fit(df).transform(df)
df_transformed.show(5)

+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+----------------+---------------+---------------+----------+-------------+---------------+---------------+---------------+
|MinTemp|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|WindGustDirIndex|WindDir9amIndex|WindDir3pmIndex|Rain_Today|Rain_Tomorrow|WindGustDir_vec| WindDir9am_vec| WindDir3pm_vec|
+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+----------------+---------------+---------------+----------+-------------+---------------+---------------+---------------+
|   13.4|   22.9

In [181]:
df_transformed.toPandas()

                                                                                

Unnamed: 0,MinTemp,MaxTemp,Rainfall,WindGustDir,WindGustSpeed,WindDir9am,WindDir3pm,WindSpeed9am,WindSpeed3pm,Humidity9am,...,RainToday,RainTomorrow,WindGustDirIndex,WindDir9amIndex,WindDir3pmIndex,Rain_Today,Rain_Tomorrow,WindGustDir_vec,WindDir9am_vec,WindDir3pm_vec
0,13.4,22.900000,0.6,W,44.0,W,WNW,20.0,24.0,71.0,...,No,No,0.0,7.0,7.0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ..."
1,7.4,25.100000,0.0,WNW,44.0,NNW,WSW,4.0,22.0,44.0,...,No,No,10.0,10.0,3.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,12.9,25.700001,0.0,WSW,46.0,W,WSW,19.0,26.0,38.0,...,No,No,7.0,7.0,3.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,9.2,28.000000,0.0,NE,24.0,SE,E,11.0,9.0,45.0,...,No,No,14.0,2.0,10.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,17.5,32.299999,1.0,W,41.0,ENE,NW,7.0,20.0,82.0,...,No,No,0.0,11.0,8.0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
140782,3.5,21.799999,0.0,E,31.0,ESE,E,15.0,13.0,59.0,...,No,No,3.0,12.0,10.0,0.0,0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
140783,2.8,23.400000,0.0,E,31.0,SE,ENE,13.0,11.0,51.0,...,No,No,3.0,2.0,14.0,0.0,0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
140784,3.6,25.299999,0.0,NNW,22.0,SE,N,13.0,9.0,56.0,...,No,No,15.0,2.0,6.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."
140785,5.4,26.900000,0.0,N,37.0,SE,WNW,9.0,9.0,53.0,...,No,No,4.0,2.0,7.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ..."


In [182]:
df_transformed = df_transformed.drop(*("WindGustDir","WindDir9am","WindDir3pm","RainToday","RainTomorrow"))

In [183]:
df_transformed.printSchema()

root
 |-- MinTemp: float (nullable = true)
 |-- MaxTemp: float (nullable = true)
 |-- Rainfall: float (nullable = true)
 |-- WindGustSpeed: float (nullable = true)
 |-- WindSpeed9am: float (nullable = true)
 |-- WindSpeed3pm: float (nullable = true)
 |-- Humidity9am: float (nullable = true)
 |-- Humidity3pm: float (nullable = true)
 |-- Pressure9am: float (nullable = true)
 |-- Pressure3pm: float (nullable = true)
 |-- Cloud9am: float (nullable = false)
 |-- Cloud3pm: float (nullable = false)
 |-- Temp9am: float (nullable = true)
 |-- Temp3pm: float (nullable = true)
 |-- WindGustDirIndex: double (nullable = false)
 |-- WindDir9amIndex: double (nullable = false)
 |-- WindDir3pmIndex: double (nullable = false)
 |-- Rain_Today: double (nullable = false)
 |-- Rain_Tomorrow: double (nullable = false)
 |-- WindGustDir_vec: vector (nullable = true)
 |-- WindDir9am_vec: vector (nullable = true)
 |-- WindDir3pm_vec: vector (nullable = true)



In [195]:

imputer = Imputer(
    inputCols=df2.columns, 
    outputCols=["{}_imputed".format(c) for c in df2.columns]
)
imputer.fit(df2).transform(df2)

                                                                                

DataFrame[MinTemp: float, MaxTemp: float, Rainfall: float, WindGustSpeed: float, WindSpeed9am: float, WindSpeed3pm: float, Humidity9am: float, Humidity3pm: float, Pressure9am: float, Pressure3pm: float, Cloud9am: float, Cloud3pm: float, Temp9am: float, Temp3pm: float, Rain_Today: double, Rain_Tomorrow: double, MinTemp_imputed: float, MaxTemp_imputed: float, Rainfall_imputed: float, WindGustSpeed_imputed: float, WindSpeed9am_imputed: float, WindSpeed3pm_imputed: float, Humidity9am_imputed: float, Humidity3pm_imputed: float, Pressure9am_imputed: float, Pressure3pm_imputed: float, Cloud9am_imputed: float, Cloud3pm_imputed: float, Temp9am_imputed: float, Temp3pm_imputed: float, Rain_Today_imputed: double, Rain_Tomorrow_imputed: double]

In [205]:
num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"float","double"}, df_transformed.dtypes)]
median_dict = dict()
for c in num_cols:
    median_dict[c] = df_transformed.stat.approxQuantile(c, [0.5], 0.001)[0]

In [206]:
df_imputed = df_transformed.na.fill(median_dict)


In [207]:
Dict_Null = {col:df_imputed.filter(df_imputed[col].isNull()).count() for col in df_imputed.columns}
Dict_Null

{'MinTemp': 0,
 'MaxTemp': 0,
 'Rainfall': 0,
 'WindGustSpeed': 0,
 'WindSpeed9am': 0,
 'WindSpeed3pm': 0,
 'Humidity9am': 0,
 'Humidity3pm': 0,
 'Pressure9am': 0,
 'Pressure3pm': 0,
 'Cloud9am': 0,
 'Cloud3pm': 0,
 'Temp9am': 0,
 'Temp3pm': 0,
 'WindGustDirIndex': 0,
 'WindDir9amIndex': 0,
 'WindDir3pmIndex': 0,
 'Rain_Today': 0,
 'Rain_Tomorrow': 0,
 'WindGustDir_vec': 0,
 'WindDir9am_vec': 0,
 'WindDir3pm_vec': 0}

In [208]:
df_imputed = df_imputed.drop(*("WindGustDirIndex","WindDir9amIndex","WindDir3pmIndex"))

In [209]:
df_imputed.printSchema()

root
 |-- MinTemp: float (nullable = false)
 |-- MaxTemp: float (nullable = false)
 |-- Rainfall: float (nullable = false)
 |-- WindGustSpeed: float (nullable = false)
 |-- WindSpeed9am: float (nullable = false)
 |-- WindSpeed3pm: float (nullable = false)
 |-- Humidity9am: float (nullable = false)
 |-- Humidity3pm: float (nullable = false)
 |-- Pressure9am: float (nullable = false)
 |-- Pressure3pm: float (nullable = false)
 |-- Cloud9am: float (nullable = false)
 |-- Cloud3pm: float (nullable = false)
 |-- Temp9am: float (nullable = false)
 |-- Temp3pm: float (nullable = false)
 |-- Rain_Today: double (nullable = false)
 |-- Rain_Tomorrow: double (nullable = false)
 |-- WindGustDir_vec: vector (nullable = true)
 |-- WindDir9am_vec: vector (nullable = true)
 |-- WindDir3pm_vec: vector (nullable = true)



In [239]:
cols3 = df_imputed.columns
# Rearrange the list any way you want
cols3.remove("Rain_Tomorrow")

In [245]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols3,outputCol="features")

In [262]:
new_df = assembler.transform(df_imputed)

In [258]:
output.toPandas()

Exception ignored in: <function JavaWrapper.__del__ at 0x7f8cb0a06040>          
Traceback (most recent call last):
  File "/Users/gouthamkakani/opt/anaconda3/lib/python3.9/site-packages/pyspark/ml/wrapper.py", line 39, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'


Unnamed: 0,MinTemp,MaxTemp,Rainfall,WindGustSpeed,WindSpeed9am,WindSpeed3pm,Humidity9am,Humidity3pm,Pressure9am,Pressure3pm,Cloud9am,Cloud3pm,Temp9am,Temp3pm,Rain_Today,Rain_Tomorrow,WindGustDir_vec,WindDir9am_vec,WindDir3pm_vec,features
0,13.4,22.900000,0.6,44.0,20.0,24.0,71.0,22.0,1007.700012,1007.099976,8.0,0.0,16.900000,21.799999,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...","(13.399999618530273, 22.899999618530273, 0.600..."
1,7.4,25.100000,0.0,44.0,4.0,22.0,44.0,25.0,1010.599976,1007.799988,0.0,0.0,17.200001,24.299999,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(7.400000095367432, 25.100000381469727, 0.0, 4..."
2,12.9,25.700001,0.0,46.0,19.0,26.0,38.0,30.0,1007.599976,1008.700012,0.0,2.0,21.000000,23.200001,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(12.899999618530273, 25.700000762939453, 0.0, ..."
3,9.2,28.000000,0.0,24.0,11.0,9.0,45.0,16.0,1017.599976,1012.799988,0.0,0.0,18.100000,26.500000,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(9.199999809265137, 28.0, 0.0, 24.0, 11.0, 9.0..."
4,17.5,32.299999,1.0,41.0,7.0,20.0,82.0,33.0,1010.799988,1006.000000,7.0,8.0,17.799999,29.700001,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...","(17.5, 32.29999923706055, 1.0, 41.0, 7.0, 20.0..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
140782,3.5,21.799999,0.0,31.0,15.0,13.0,59.0,27.0,1024.699951,1021.200012,0.0,0.0,9.400000,20.900000,0.0,0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(3.5, 21.799999237060547, 0.0, 31.0, 15.0, 13...."
140783,2.8,23.400000,0.0,31.0,13.0,11.0,51.0,24.0,1024.599976,1020.299988,0.0,0.0,10.100000,22.400000,0.0,0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(2.799999952316284, 23.399999618530273, 0.0, 3..."
140784,3.6,25.299999,0.0,22.0,13.0,9.0,56.0,21.0,1023.500000,1019.099976,0.0,0.0,10.900000,24.500000,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...","(3.5999999046325684, 25.299999237060547, 0.0, ..."
140785,5.4,26.900000,0.0,37.0,9.0,9.0,53.0,24.0,1021.000000,1016.799988,0.0,0.0,12.500000,26.100000,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...","(5.400000095367432, 26.899999618530273, 0.0, 3..."


In [263]:
new_df.select("features","Rain_Tomorrow").toPandas()

                                                                                

Unnamed: 0,features,Rain_Tomorrow
0,"(13.399999618530273, 22.899999618530273, 0.600...",0.0
1,"(7.400000095367432, 25.100000381469727, 0.0, 4...",0.0
2,"(12.899999618530273, 25.700000762939453, 0.0, ...",0.0
3,"(9.199999809265137, 28.0, 0.0, 24.0, 11.0, 9.0...",0.0
4,"(17.5, 32.29999923706055, 1.0, 41.0, 7.0, 20.0...",0.0
...,...,...
140782,"(3.5, 21.799999237060547, 0.0, 31.0, 15.0, 13....",0.0
140783,"(2.799999952316284, 23.399999618530273, 0.0, 3...",0.0
140784,"(3.5999999046325684, 25.299999237060547, 0.0, ...",0.0
140785,"(5.400000095367432, 26.899999618530273, 0.0, 3...",0.0


In [269]:
final_df = new_df.select("features","Rain_Tomorrow")

In [271]:
train, test = final_df.randomSplit([0.7, 0.3],seed=12345)

In [281]:
from pyspark.ml.classification import DecisionTreeClassifier 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [275]:
dtree = DecisionTreeClassifier(labelCol="Rain_Tomorrow").fit(train)

                                                                                

In [276]:
df_pred = dtree.transform(test)

In [284]:
accuracy = MulticlassClassificationEvaluator(labelCol="Rain_Tomorrow",metricName='accuracy').evaluate(df_pred)

                                                                                

In [286]:
print(accuracy)

0.8399443448731252


In [293]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics


dtree = DecisionTreeClassifier(labelCol="Rain_Tomorrow", featuresCol="features", maxDepth=2,impurity ="gini" )


dtparamGrid = (ParamGridBuilder().addGrid(dtree.impurity,["gini",'entropy']).addGrid(dtree.maxDepth, [3,5,7]).addGrid(dtree.maxBins, [5,10,15,3]).build())


dtree_evl = BinaryClassificationEvaluator(labelCol="Rain_Tomorrow")


dtcv = CrossValidator(estimator = dtree,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtree_evl,
                      numFolds = 4)


dtcvModel = dtcv.fit(train)
print(dtcvModel)


dtreefinal_pred = dtcvModel.transform(test)

print('Accuracy:', dtree_evl.evaluate(dtreefinal_pred))
print('AUC:', BinaryClassificationMetrics(dtreefinal_pred['Rain_Tomorrow','prediction'].rdd).areaUnderROC)

                                                                                

CrossValidatorModel_8194ad08a5bd


                                                                                

Accuracy: 0.5676081757050775


[Stage 9155:>                                                       (0 + 1) / 1]

AUC: 0.7152006353725182


                                                                                