In [1]:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

In [2]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()

sc = spark.sparkContext
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

In [4]:
data = spark.read.csv('weatherAUS.csv')

In [5]:
data.head(3)

[Row(_c0='Date', _c1='Location', _c2='MinTemp', _c3='MaxTemp', _c4='Rainfall', _c5='Evaporation', _c6='Sunshine', _c7='WindGustDir', _c8='WindGustSpeed', _c9='WindDir9am', _c10='WindDir3pm', _c11='WindSpeed9am', _c12='WindSpeed3pm', _c13='Humidity9am', _c14='Humidity3pm', _c15='Pressure9am', _c16='Pressure3pm', _c17='Cloud9am', _c18='Cloud3pm', _c19='Temp9am', _c20='Temp3pm', _c21='RainToday', _c22='RainTomorrow'),
 Row(_c0='2008-12-01', _c1='Albury', _c2='13.4', _c3='22.9', _c4='0.6', _c5='NA', _c6='NA', _c7='W', _c8='44', _c9='W', _c10='WNW', _c11='20', _c12='24', _c13='71', _c14='22', _c15='1007.7', _c16='1007.1', _c17='8', _c18='NA', _c19='16.9', _c20='21.8', _c21='No', _c22='No'),
 Row(_c0='2008-12-02', _c1='Albury', _c2='7.4', _c3='25.1', _c4='0', _c5='NA', _c6='NA', _c7='WNW', _c8='44', _c9='NNW', _c10='WSW', _c11='4', _c12='22', _c13='44', _c14='25', _c15='1010.6', _c16='1007.8', _c17='NA', _c18='NA', _c19='17.2', _c20='24.3', _c21='No', _c22='No')]

In [6]:
schema = StructType([
  StructField("Date", StringType(), False),
  StructField("Location", StringType(), False),
  StructField("MinTemp", DoubleType(), False),
  StructField("MaxTemp", DoubleType(), False),
  StructField("Rainfall", DoubleType(), False),
  StructField("Evaporation", DoubleType(), False),
  StructField("Sunshine", DoubleType(), False),
  StructField("WindGustDir", StringType(), False),
  StructField("WindGustSpeed", DoubleType(), False),
  StructField("WindDir9am", StringType(), False),
  StructField("WindDir3pm", StringType(), False),
  StructField("WindSpeed9am", DoubleType(), False),
  StructField("WindSpeed3pm", DoubleType(), False),
  StructField("Humidity9am", DoubleType(), False),
  StructField("Humidity3pm", DoubleType(), False),
  StructField("Pressure9am", DoubleType(), False),
  StructField("Pressure3pm", DoubleType(), False),
  StructField("Cloud9am", DoubleType(), False), 
  StructField("Cloud3pm", DoubleType(), False),
  StructField("Temp9am", DoubleType(), False),
  StructField("Temp3pm", DoubleType(), False),
  StructField("RainToday", StringType(), False),
  StructField("RainTomorrow", StringType(), False)
 
])

In [7]:


dataset = spark.read.format("csv").schema(schema).load("weatherAUS.csv")
cols = dataset.columns

In [8]:
display(dataset)

DataFrame[Date: string, Location: string, MinTemp: double, MaxTemp: double, Rainfall: double, Evaporation: double, Sunshine: double, WindGustDir: string, WindGustSpeed: double, WindDir9am: string, WindDir3pm: string, WindSpeed9am: double, WindSpeed3pm: double, Humidity9am: double, Humidity3pm: double, Pressure9am: double, Pressure3pm: double, Cloud9am: double, Cloud3pm: double, Temp9am: double, Temp3pm: double, RainToday: string, RainTomorrow: string]

In [9]:
# Get count of nan or missing values in pyspark
from pyspark.sql.functions import isnan, when, count, col
null_val=dataset.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dataset.columns])

null_val.toPandas().transpose()

Unnamed: 0,0
Date,0
Location,0
MinTemp,1486
MaxTemp,1262
Rainfall,3262
Evaporation,62791
Sunshine,69836
WindGustDir,0
WindGustSpeed,10264
WindDir9am,0


In [10]:
process_df=dataset.drop("Location","Date", "Evaporation","Sunshine","WindGustSpeed","Pressure9am","Pressure3am",
                      "Cloud9am","Cloud3am" )


In [11]:
process_df=process_df.na.drop()

In [12]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
 
from distutils.version import LooseVersion
 
categoricalColumns = [ "WindGustDir", "WindDir9am","WindDir3pm","RainToday"]
stages = [] # stages in Pipeline
for categoricalCol in categoricalColumns:
    
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    
    if LooseVersion(pyspark.__version__) < LooseVersion("3.0"):
        from pyspark.ml.feature import OneHotEncoderEstimator
        encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    else:
        from pyspark.ml.feature import OneHotEncoder
        encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])

    stages += [stringIndexer, encoder]

In [13]:
label_stringIdx = StringIndexer(inputCol="RainTomorrow", outputCol="label")
stages += [label_stringIdx]

In [14]:
numericCols = [ 'MinTemp', 'MaxTemp','Rainfall', 'WindSpeed9am', 
               'WindSpeed3pm', 'Humidity9am', 'Humidity3pm','Temp9am','Temp3pm']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [15]:
from pyspark.ml.classification import LogisticRegression
  
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(process_df)


In [16]:
preppedDataDF = pipelineModel.transform(process_df)

In [17]:
(trainingData, testData) = preppedDataDF.randomSplit([0.8, 0.2], seed=12345)
print(trainingData.count())
print(testData.count())

65089
16486


In [18]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

In [19]:
dtModel = dt.fit(trainingData)

In [20]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

numNodes =  35
depth =  5


In [21]:
display(dtModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_56f3eea50af3, depth=5, numNodes=35, numClasses=3, numFeatures=58

In [22]:
predictions = dtModel.transform(testData)

In [23]:
selected = predictions.select("label","probability", "prediction")
display(selected)

DataFrame[label: double, probability: vector, prediction: double]

In [24]:
from pyspark.ml.evaluation import RegressionEvaluator
# Evaluate model
evaluator= RegressionEvaluator()


In [25]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [3, 5, 7])
             .addGrid(dt.maxBins, [5, 10, 15])
             .addGrid(dt.minInfoGain, [0.0, 0.2, 0.4])
             .addGrid(dt.impurity, ['gini', 'entropy'])
             .build())

In [26]:
dtcv = CrossValidator(estimator = dt ,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = evaluator,
                      numFolds = 4)




In [27]:
dtcvModel = dtcv.fit(trainingData)


In [28]:

bestModel = dtcvModel.bestModel
print( 'Best Param (maxDepth): ', bestModel._java_obj.depth())
print( 'Best Param (impurity): ', bestModel._java_obj.getImpurity())
print( 'Best Param (maxBins): ', bestModel._java_obj.getMaxBins())
print( 'Best Param (minInfoGain): ', bestModel._java_obj.getMinInfoGain())

Best Param (maxDepth):  5
Best Param (impurity):  gini
Best Param (maxBins):  15
Best Param (minInfoGain):  0.0


In [29]:
predictions = dtcvModel.transform(testData)

In [30]:
evaluator.evaluate(predictions)

0.44726784595148283

In [31]:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
predictionCol="prediction",
labelCol="label",
metricName="rmse")
rmse = regressionEvaluator.evaluate(predictions)
print(f"RMSE is {rmse:.1f}")

RMSE is 0.4
