## Packages

In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
from pyspark import SparkConf, SparkContext

In [2]:
# Configure a local Spark application and obtain its entry points.
# getOrCreate reuses the SparkContext that is already running when this cell
# is executed again; constructing SparkContext(conf=conf) a second time in the
# same kernel raises an error because only one context may be active.
# Note: when a context already exists it is returned as-is and the settings
# in `conf` are not re-applied.
conf = SparkConf().setMaster("local").setAppName("daily_water_classification")
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

In [3]:
# Read the daily weather CSV: the first row holds the column names and the
# column types are inferred from the data.
df = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .options(header=True, inferSchema="true")
    .load("../datasets/daily_weather.csv")
)

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [13]:
# Show the column names currently in the DataFrame and how many there are.
column_names = df.columns
print(column_names)
print(len(column_names))

['air_pressure_9am', 'air_temp_9am', 'avg_wind_direction_9am', 'avg_wind_speed_9am', 'max_wind_direction_9am', 'max_wind_speed_9am', 'rain_accumulation_9am', 'rain_duration_9am', 'relative_humidity_9am', 'relative_humidity_3pm']
10


The following list defines the columns of the weather data that
we are going to use as features.

In [14]:
# Names of the columns used as predictors (all 9am measurements listed here;
# the humidity columns are intentionally not part of this list).
featureColumns = [
    'air_pressure_9am', 'air_temp_9am',
    'avg_wind_direction_9am', 'avg_wind_speed_9am',
    'max_wind_direction_9am', 'max_wind_speed_9am',
    'rain_accumulation_9am', 'rain_duration_9am',
]
print(len(featureColumns))

8


In [5]:
# Drop the "number" column; it is not going to be used.
df = df.drop("number")

In [6]:
# Remove all rows that contain missing values.
df = df.na.drop()

In [10]:
# Report the size of the cleaned DataFrame as (rows, columns).
row_count = df.count()
column_count = len(df.columns)
(row_count, column_count)

(1064, 10)

Let's create a new categorical variable that indicates whether the humidity is not low. If the value is less than 25%, the categorical value should be 0; otherwise it should be 1.

In [7]:
# Derive the binary "label" column from the 3pm relative humidity.
# Per the Spark docs, Binarizer emits 1.0 for values strictly greater than
# the threshold and 0.0 otherwise; the threshold is set just below 25,
# presumably so that a reading of exactly 25 is labelled 1.0.
binarizer = Binarizer(
    inputCol="relative_humidity_3pm",
    outputCol="label",
    threshold=24.99999,
)
binarizerDF = binarizer.transform(df)
binarizerDF.columns

['air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm',
 'label']

In [8]:
# Show the first 4 rows of the source humidity next to the derived label.
binarizerDF.select("relative_humidity_3pm", "label").show(4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



Using VectorAssembler, we can combine the feature columns that we will use to make predictions into a single vector column.

In [18]:
# Pack the columns listed in featureColumns into one vector column named
# "features"; the other columns (including "label") are kept.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=featureColumns,
)
assembled = assembler.transform(binarizerDF)
assembled

DataFrame[air_pressure_9am: double, air_temp_9am: double, avg_wind_direction_9am: double, avg_wind_speed_9am: double, max_wind_direction_9am: double, max_wind_speed_9am: double, rain_accumulation_9am: double, rain_duration_9am: double, relative_humidity_9am: double, relative_humidity_3pm: double, label: double, features: vector]

In [15]:
# Confirm that the assembler output is still a Spark DataFrame.
type(assembled)

pyspark.sql.dataframe.DataFrame

In [21]:
# Randomly split the rows with weights 0.8 (training) and 0.2 (test);
# the seed is fixed so the split is repeatable.
trainingData, testData = assembled.randomSplit(
    [0.8, 0.2],
    seed=12334,
)

In [22]:
# Report how many rows ended up in each split as (training, test).
training_rows = trainingData.count()
test_rows = testData.count()
(training_rows, test_rows)

(845, 219)

In [25]:
# Decision tree that predicts "label" from the "features" vector, using the
# Gini impurity, a maximum depth of 5 and at least 20 instances per node.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    impurity="gini",
    maxDepth=5,
    minInstancesPerNode=20,
)

We train our model using a Pipeline.

In [26]:
# Wrap the decision tree in a single-stage Pipeline and fit it on the
# training split; `model` is the fitted pipeline used for predictions.
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "D:\Usuarios\rhaps\Anaconda3\lib\site-packages\pyspark\ml\wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'DecisionTreeClassifier' object has no attribute '_java_obj'


In [28]:
# Apply the fitted pipeline to the held-out test split.
predictions = model.transform(testData)

In [32]:
# Compare the predicted class with the true label for the first 10 test rows.
predictions.select("prediction", "label").show(10)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 10 rows



In [33]:
# Persist the predicted and true labels as CSV with a header row.
# mode="overwrite" replaces the output of a previous run; with the default
# save mode the write raises an error when "predictions.csv" already exists,
# so the notebook could not be re-run from top to bottom.
predictions.select("prediction", "label").write.save(path="predictions.csv",
                                                    format="com.databricks.spark.csv",
                                                    mode="overwrite",
                                                    header=True)