In [18]:
#Importing all the required libraries
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
from pyspark import SparkContext
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

#### Read the CSV file into a dataframe and view the features

In [3]:
# Location of the daily weather CSV; change this one constant to point at your copy.
DATA_PATH = 'file:///home/harish/Downloads/big-data-4/daily_weather.csv'

# Reuse the SparkContext provided by the pyspark shell if one is active, otherwise
# create one, so this cell also runs on a freshly started plain Jupyter kernel.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
# NOTE(review): 'com.databricks.spark.csv' is the spark-csv package name; on Spark 2+
# it maps to the built-in CSV source, on Spark 1.x the package must be on the classpath.
df = sqlContext.read.load(DATA_PATH,
                          format='com.databricks.spark.csv',
                          header='true', inferSchema='true')
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

#### Out of the available columns, we remove 'number', which is only a row ID.
#### The 9am measurements (all except relative_humidity_9am) will be used to build the decision tree, while relative humidity at 3pm will be the basis of the target variable.

In [4]:
# Predictor columns for the decision tree: the 9am measurements,
# deliberately excluding relative_humidity_9am.
featureColumns = [
    'air_pressure_9am',
    'air_temp_9am',
    'avg_wind_direction_9am',
    'avg_wind_speed_9am',
    'max_wind_direction_9am',
    'max_wind_speed_9am',
    'rain_accumulation_9am',
    'rain_duration_9am',
]

In [5]:
# 'number' is only a row identifier, so it is dropped before modelling
df = df.drop("number")

In [6]:
# Discard every row that has a null in any column (dropna is the DataFrame alias of na.drop)
df = df.dropna()

In [7]:
# Shape of the cleaned DataFrame as a (rows, columns) tuple
nRows = df.count()
nCols = len(df.columns)
nRows, nCols

(1064, 10)

#### Build the target variable
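###### It is based on the relative humidity measured at 3pm. If it is 25 or above, the day is considered humid and marked 1 in the label column; otherwise it is marked 0. (Binarizer sets 1 only for values strictly greater than its threshold, hence the threshold of 24.99999.)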

In [8]:
# Binarizer emits 1.0 only for values strictly greater than the threshold, so a
# threshold just below 25 labels days with 3pm relative humidity of about 25 or more as humid.
HUMIDITY_THRESHOLD = 24.99999
binarizer = Binarizer(inputCol="relative_humidity_3pm", outputCol="label",
                      threshold=HUMIDITY_THRESHOLD)
binarizedDF = binarizer.transform(df)
# Show the source humidity next to the derived label
binarizedDF.select("relative_humidity_3pm", "label").show(4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



In [9]:
# Combine the predictor columns into a single vector column called "features"
assembler = VectorAssembler(outputCol="features", inputCols=featureColumns)
assembled = assembler.transform(binarizedDF)

In [11]:
# 80/20 train/test split; a fixed seed makes the split repeatable
trainingData, testData = assembled.randomSplit(weights=[0.8, 0.2], seed=1804)
(trainingData.count(), testData.count())

(829, 235)

#### Creating and training the decision tree

In [13]:
# Gini-impurity tree, capped at depth 5 with at least 20 training rows per node
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label",
                            impurity="gini", maxDepth=5, minInstancesPerNode=20)
# Single-stage pipeline: the feature vector was already assembled in an earlier cell
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

#### Applying the model on the Test Data and viewing the predictions

In [23]:
# Score the held-out rows and keep only the columns needed for evaluation
predictions = model.transform(testData).select("prediction", "label")
predictions.show(10)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 10 rows



#### Evaluating the model

In [26]:
# Accuracy = share of test rows whose predicted label equals the true label
evaluator = MulticlassClassificationEvaluator(metricName="accuracy",
                                              predictionCol="prediction",
                                              labelCol="label")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % accuracy)

Accuracy = 0.834043 


#### Getting the confusion matrix

In [27]:
# MulticlassMetrics (pyspark.mllib) works on an RDD of (prediction, label) tuples,
# so select the columns explicitly in that order and convert each Row to a tuple.
predictionAndLabels = predictions.select("prediction", "label").rdd.map(tuple)
metrics = MulticlassMetrics(predictionAndLabels)
# confusionMatrix() has actual labels on the rows and predicted labels on the columns;
# after transpose() the rows are predicted labels and the columns are actual labels.
# The Spark Matrix is converted to a NumPy array for display.
metrics.confusionMatrix().toArray().transpose()

array([[  94.,   22.],
       [  17.,  102.]])

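#### As seen above, the overall accuracy is (94+102)/(94+102+17+22) = 83.4%
##### In the transposed matrix the rows are predicted labels and the columns are actual labels, so with humid (1) as the positive class: precision = 102/(102+17) = 85.7% and recall = 102/(102+22) = 82.3%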