# San Diego Rainforest Fire Prediction 

## Objectives
* Read CSV files into Spark DataFrames.
* Generate summary statistics.
* Compute the correlation coefficient between two columns.
* Generate a categorical variable from a numeric variable.
* Aggregate the features into a single vector column.
* Randomly split the data into training and test sets.
* Create a decision tree classifier to predict days with low humidity.
* Determine the accuracy of the classifier model.
* Display the confusion matrix for the classifier model.


### Import libraries

In [2]:
import findspark
findspark.init()
findspark.add_packages('mysql:mysql-connector-java:8.0.11')

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession.builder.getOrCreate()

In [3]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

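# Reuse the SparkContext behind the session; SQLContext is deprecated since Spark 3.0
# (the SparkSession `spark` covers its API) and is not used below
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)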

### Upload data

The file daily_weather.csv is a comma-separated file that contains weather data. This data comes
from a weather station located in San Diego, California. The weather station is equipped with
sensors that capture weather-related measurements such as air temperature, air pressure, and
relative humidity. Data was collected for a period of three years, from September 2011 to September
2014, to ensure that sufficient data is captured for different seasons and weather conditions. The dataset can be downloaded from __[here](https://www.kaggle.com/youssef19/san-diago-weather-data)__.

Sensor measurements from the weather station were captured at one-minute intervals. These measurements were then processed to generate values that describe the daily weather. Since this dataset was created to classify low-humidity days vs. non-low-humidity days (that is, days with normal or high humidity), the variables are weather measurements taken in the morning, plus one measurement, relative humidity, taken in the afternoon. __The idea is to use the morning weather values to predict whether the day will be low-humidity or not, as defined by the afternoon measurement of relative humidity__. This can help predict the occurrence of rainforest fires.


In [6]:
# header=True reads column names from the first line; inferSchema=True infers column types
df = spark.read.csv('daily_weather.csv', header=True, inferSchema=True)

### Data Exploration

In [7]:
# Look at data columns and types
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [8]:
df.printSchema()

root
 |-- number: integer (nullable = true)
 |-- air_pressure_9am: double (nullable = true)
 |-- air_temp_9am: double (nullable = true)
 |-- avg_wind_direction_9am: double (nullable = true)
 |-- avg_wind_speed_9am: double (nullable = true)
 |-- max_wind_direction_9am: double (nullable = true)
 |-- max_wind_speed_9am: double (nullable = true)
 |-- rain_accumulation_9am: double (nullable = true)
 |-- rain_duration_9am: double (nullable = true)
 |-- relative_humidity_9am: double (nullable = true)
 |-- relative_humidity_3pm: double (nullable = true)



In [9]:
# Print summary statistics
df.describe().toPandas().transpose()

| column | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| number | 1095 | 547.0 | 316.24357700987383 | 0 | 1094 |
| air_pressure_9am | 1092 | 918.8825513138094 | 3.184161180386833 | 907.9900000000024 | 929.3200000000012 |
| air_temp_9am | 1090 | 64.93300141287072 | 11.175514003175877 | 36.752000000000685 | 98.90599999999992 |
| avg_wind_direction_9am | 1091 | 142.2355107005759 | 69.13785928889189 | 15.500000000000046 | 343.4 |
| avg_wind_speed_9am | 1092 | 5.50828424225493 | 4.5528134655317185 | 0.69345139999974 | 23.554978199999763 |
| max_wind_direction_9am | 1092 | 148.95351796516923 | 67.23801294602953 | 28.89999999999991 | 312.19999999999993 |
| max_wind_speed_9am | 1091 | 7.019513529175272 | 5.598209170780958 | 1.1855782000000479 | 29.84077959999996 |
| rain_accumulation_9am | 1089 | 0.20307895225211126 | 1.5939521253574893 | 0.0 | 24.01999999999907 |
| rain_duration_9am | 1092 | 294.1080522756142 | 1598.0787786601481 | 0.0 | 17704.0 |
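The `count` row differs across columns (1095 for `number` but 1089 for `rain_accumulation_9am`) because `describe()` ignores nulls, and its `stddev` is the sample standard deviation. As a plain-Python sketch of what the five statistics mean, assuming a column is modeled as a list with `None` for missing readings (`describe_column` is a hypothetical helper, not a Spark API, and the values are made up):

```python
import math
from typing import Dict, Optional, Sequence


def describe_column(values: Sequence[Optional[float]]) -> Dict[str, float]:
    """Mimic the statistics Spark's describe() reports for one numeric column.

    None entries are skipped, and stddev divides by n - 1 (sample stddev).
    """
    present = [v for v in values if v is not None]
    n = len(present)
    mean = sum(present) / n
    stddev = math.sqrt(sum((v - mean) ** 2 for v in present) / (n - 1))
    return {"count": n, "mean": mean, "stddev": stddev,
            "min": min(present), "max": max(present)}


# Toy column with one missing reading (illustrative values, not from daily_weather.csv)
stats = describe_column([918.0, 920.0, None, 922.0])
print(stats)
```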


In [10]:
df.describe('air_pressure_9am').show()

+-------+-----------------+
|summary| air_pressure_9am|
+-------+-----------------+
|  count|             1092|
|   mean|918.8825513138094|
| stddev|3.184161180386833|
|    min|907.9900000000024|
|    max|929.3200000000012|
+-------+-----------------+



In [15]:
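# Compute correlation between two columns
# (df2 drops rows with a missing air_pressure_9am; defined here so this cell runs in order)
df2 = df.na.drop(subset=['air_pressure_9am'])
df2.stat.corr("rain_accumulation_9am", "rain_duration_9am")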

0.7298253479609021
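`df.stat.corr` computes the Pearson correlation coefficient, the only method it currently supports. The value of about 0.73 says that mornings with more accumulated rain also tend to have a longer rain duration. A minimal plain-Python version of the formula, using a hypothetical `pearson` helper and made-up sample values:

```python
import math
from typing import Sequence


def pearson(xs: Sequence[float], ys: Sequence[float]) -> float:
    """Pearson correlation: covariance divided by the product of the spreads."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (spread_x * spread_y)


# Illustrative pairs: more rain accumulation goes with longer rain duration
accumulation = [0.0, 0.1, 0.5, 1.2, 3.0]
duration = [0.0, 60.0, 300.0, 900.0, 1800.0]
print(round(pearson(accumulation, duration), 4))
```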

### Data Preprocessing 

In [13]:
# Drop rows where air_pressure_9am is missing
df2 = df.na.drop(subset=['air_pressure_9am'])

In [14]:
df2.count()

1092

In [16]:
# Select features used for classification
featureColumns = ['air_pressure_9am','air_temp_9am','avg_wind_direction_9am','avg_wind_speed_9am',
        'max_wind_direction_9am','max_wind_speed_9am','rain_accumulation_9am',
        'rain_duration_9am']

In [17]:
# Drop the unused 'number' column, then every row that still has a missing value
df = df.drop('number')
df = df.na.drop()

In [19]:
df.count(), len(df.columns)

(1064, 10)
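The two drops behave differently: `na.drop(subset=[...])` only checks the listed columns, which is why `df2.count()` matches the 1092 non-null `air_pressure_9am` readings, while `na.drop()` with no subset removes a row if any column is null, leaving 1064 rows. A small sketch of that rule, assuming rows are modeled as Python dicts (`drop_missing` is a hypothetical helper and the rows are made up):

```python
from typing import Dict, List, Optional, Sequence

Row = Dict[str, Optional[float]]  # one record; None stands for a missing value


def drop_missing(rows: Sequence[Row], subset: Optional[List[str]] = None) -> List[Row]:
    """Keep rows with no None in the checked columns (all columns if subset is None)."""
    kept = []
    for row in rows:
        columns = subset if subset is not None else list(row)
        if all(row[c] is not None for c in columns):
            kept.append(row)
    return kept


rows = [
    {"air_pressure_9am": 918.1, "air_temp_9am": 70.2},
    {"air_pressure_9am": None, "air_temp_9am": 65.0},
    {"air_pressure_9am": 917.4, "air_temp_9am": None},
]
print(len(drop_missing(rows, subset=["air_pressure_9am"])))  # 2
print(len(drop_missing(rows)))  # 1
```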

In [21]:
# Create categorical variable for the relative humidity:
# label = 1.0 if relative_humidity_3pm > 24.99999, otherwise 0.0 (a low-humidity day)
binarizer = Binarizer(threshold=24.99999, inputCol="relative_humidity_3pm", outputCol="label")
binarizedDF = binarizer.transform(df)

In [22]:
binarizedDF.select("relative_humidity_3pm","label").show(4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows
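`Binarizer` maps values strictly greater than `threshold` to 1.0 and everything else to 0.0, so label 0.0 marks a low-humidity afternoon (relative humidity at or below 24.99999%) and 1.0 marks a normal or humid one. The threshold sits just below 25, presumably so that a reading of exactly 25 counts as not low. The same rule in plain Python, applied to the four values shown above (`binarize` is a hypothetical helper):

```python
from typing import List, Sequence

THRESHOLD = 24.99999  # same threshold passed to Binarizer above


def binarize(values: Sequence[float], threshold: float = THRESHOLD) -> List[float]:
    """Values strictly greater than threshold become 1.0; all others become 0.0."""
    return [1.0 if v > threshold else 0.0 for v in values]


# First four relative_humidity_3pm values from the output above
humidity = [36.160000000000494, 19.4265967985621, 14.460000000000045, 12.742547353761848]
print(binarize(humidity))  # [1.0, 0.0, 0.0, 0.0]
```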



In [23]:
# aggregate the features we will use to make predictions into a single column
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
assembled = assembler.transform(binarizedDF)
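`VectorAssembler` concatenates the listed input columns, in order, into one vector column named `features`, which is the input format Spark ML estimators expect. Conceptually each row becomes a list like the one below; this sketch uses a hypothetical `assemble` helper, a shortened column list, and a made-up row. By default `VectorAssembler` raises an error on null inputs (`handleInvalid="error"`), which is one reason the missing values were dropped first.

```python
from typing import Dict, List, Sequence


def assemble(row: Dict[str, float], input_cols: Sequence[str]) -> List[float]:
    """Concatenate the chosen columns, in input_cols order, into one feature list."""
    return [float(row[c]) for c in input_cols]


# Shortened feature list and illustrative row (not taken from the dataset)
feature_columns = ["air_pressure_9am", "air_temp_9am", "rain_duration_9am"]
row = {"air_pressure_9am": 918.06, "air_temp_9am": 74.82,
       "rain_duration_9am": 0.0, "relative_humidity_3pm": 36.16}
print(assemble(row, feature_columns))  # [918.06, 74.82, 0.0]
```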

In [24]:
# Split the data into training (~80%) and test (~20%) sets
(trainingData, testData) = assembled.randomSplit([0.8, 0.2], seed=13234)

In [25]:
trainingData.count(), testData.count()

(846, 218)
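`randomSplit` normalizes the weights and samples rows randomly rather than cutting at an exact index, so the result is close to, but not exactly, 80/20: 846 of 1064 rows (about 79.5%) went to training. Fixing `seed` makes the split repeatable. The idea in plain Python, assuming each row is assigned independently (`random_split` is hypothetical and will not reproduce Spark's exact split):

```python
import random
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def random_split(items: Sequence[T], weights: Sequence[float],
                 seed: int) -> Tuple[List[T], List[T]]:
    """Send each item to the first split with probability weights[0] / sum(weights)."""
    rng = random.Random(seed)  # seeded generator, so the split is repeatable
    p_first = weights[0] / sum(weights)
    first: List[T] = []
    second: List[T] = []
    for item in items:
        (first if rng.random() < p_first else second).append(item)
    return first, second


train, test = random_split(range(1064), [0.8, 0.2], seed=13234)
print(len(train), len(test))  # roughly 80/20, summing to 1064
```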

### Decision tree classification 

In [26]:
# create decision tree
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5,
                            minInstancesPerNode=20, impurity="gini")
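With `impurity="gini"`, each candidate split is scored by the Gini impurity of its child nodes (1 minus the sum of squared class proportions), and the tree keeps the split with the largest impurity decrease. `maxDepth=5` caps the tree at five levels, and `minInstancesPerNode=20` discards any split that would leave either child with fewer than 20 training rows. A plain-Python sketch of these checks (the helper names and toy labels are hypothetical):

```python
from collections import Counter
from typing import Sequence

MIN_INSTANCES_PER_NODE = 20  # mirrors minInstancesPerNode=20 above


def gini(labels: Sequence[float]) -> float:
    """Gini impurity: 1 minus the sum of squared class proportions."""
    n = len(labels)
    return 1.0 - sum((count / n) ** 2 for count in Counter(labels).values())


def split_gini(left: Sequence[float], right: Sequence[float]) -> float:
    """Size-weighted Gini impurity of the two children of a candidate split."""
    n = len(left) + len(right)
    return len(left) / n * gini(left) + len(right) / n * gini(right)


def split_allowed(left: Sequence[float], right: Sequence[float],
                  min_instances: int = MIN_INSTANCES_PER_NODE) -> bool:
    """A split is valid only if both children keep at least min_instances rows."""
    return len(left) >= min_instances and len(right) >= min_instances


parent = [0.0] * 10 + [1.0] * 10   # perfectly mixed node
left = [0.0] * 9 + [1.0]           # mostly low-humidity days
right = [0.0] + [1.0] * 9          # mostly non-low-humidity days

print(gini(parent))                            # 0.5
print(gini(parent) - split_gini(left, right))  # impurity decrease of this split
print(split_allowed(left, right))              # False: each child has only 10 rows
```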

In [28]:
# train decision tree 
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

In [52]:
# make predictions on the test data set
predictions = model.transform(testData)

In [53]:
predictions = predictions.select("prediction", "label")

### Evaluation 

In [54]:
# Compute accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

In [55]:
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % (accuracy))

Accuracy = 0.784404 
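With `metricName="accuracy"`, `MulticlassClassificationEvaluator` reports the fraction of test rows whose `prediction` equals `label`. A minimal equivalent over `(prediction, label)` pairs, using a hypothetical `accuracy` helper and toy pairs:

```python
from typing import Iterable, Tuple


def accuracy(pairs: Iterable[Tuple[float, float]]) -> float:
    """Fraction of (prediction, label) pairs where the prediction equals the label."""
    pairs = list(pairs)
    correct = sum(1 for prediction, label in pairs if prediction == label)
    return correct / len(pairs)


print(accuracy([(1.0, 1.0), (1.0, 1.0), (0.0, 1.0), (0.0, 0.0)]))  # 0.75
```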


In [56]:
predictions.rdd.take(2)

[Row(prediction=1.0, label=1.0), Row(prediction=1.0, label=1.0)]

In [57]:
predictions.rdd.map(tuple).take(2)

[(1.0, 1.0), (1.0, 1.0)]

In [58]:
metrics = MulticlassMetrics(predictions.rdd.map(tuple))

In [59]:
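# Display confusion matrix; confusionMatrix() has actual labels in rows,
# so transpose() puts predicted labels in rows and actual labels in columns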
metrics.confusionMatrix().toArray().transpose()

array([[87., 28.],
       [19., 84.]])
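`MulticlassMetrics.confusionMatrix()` puts actual labels in rows and predicted labels in columns, both ordered by label, so after `transpose()` the rows are predictions and the columns are actual labels. Read that way, the diagonal holds the correct predictions, and (87 + 84) / 218 reproduces the accuracy of 0.784404 reported by the evaluator. A worked version in plain Python, which also derives precision and recall for the low-humidity class (label 0.0):

```python
from typing import List

# Transposed matrix from the output above: rows = predicted label, columns = actual label
cm_t: List[List[float]] = [[87.0, 28.0],
                           [19.0, 84.0]]

total = sum(sum(row) for row in cm_t)   # all test rows
correct = cm_t[0][0] + cm_t[1][1]       # diagonal = correct predictions
accuracy = correct / total

# Label 0.0 is the low-humidity class
precision_low = cm_t[0][0] / sum(cm_t[0])               # of days predicted low, share actually low
recall_low = cm_t[0][0] / (cm_t[0][0] + cm_t[1][0])     # of actually low days, share predicted low

print(total, round(accuracy, 6))  # 218.0 0.784404
print(round(precision_low, 3), round(recall_low, 3))
```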