In [1]:
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

### Loading Data

In [2]:
sc = SparkContext.getOrCreate()##Creación de Spark Context
sqlContext = SQLContext(sc)

df = sqlContext.read.load('daily_weather.csv', 
                          format='com.databricks.spark.csv', 
                          header='true',inferSchema='true')
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

### Preprocessing Data

In [3]:
featureColumns = ['air_pressure_9am','air_temp_9am','avg_wind_direction_9am','avg_wind_speed_9am',
        'max_wind_direction_9am','max_wind_speed_9am','rain_accumulation_9am',
        'rain_duration_9am']

In [4]:
df = df.drop('number')

In [5]:
df = df.na.drop() 

In [6]:
df.count(), len(df.columns)

(1064, 10)

Create categorical variable. Let's create a categorical variable to denote if the humidity is not low. If the value is less than 25%, then we want the categorical value to be 0, otherwise the categorical value should be 1. We can create this categorical variable as a column in a DataFrame using Binarizer:

In [7]:
binarizer = Binarizer(threshold=24.99999, inputCol="relative_humidity_3pm", outputCol="label")
binarizedDF = binarizer.transform(df)

The threshold argument specifies the threshold value for the variable, inputCol is the input column to read, and outputCol is the name of the new categorical column. The second line applies the Binarizer and creates a new DataFrame with the categorical column. We can look at the first four values in the new DataFrame:

In [8]:
binarizedDF.select("relative_humidity_3pm","label").show(4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



The inputCols argument specifies our list of column names we defined earlier, and outputCol is the name of the new column. The second line creates a new DataFrame with the aggregated features in a column.

In [9]:
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
assembled = assembler.transform(binarizedDF)

In [19]:
(trainingData, testData) = assembled.randomSplit([0.7,0.3], seed = 13234 )

In [20]:
trainingData.count(), testData.count()

(742, 322)

### Modeling

The labelCol argument is the column we are trying to predict, featuresCol specifies the aggregated features column, maxDepth is stopping criterion for tree induction based on maximum depth of tree, minInstancesPerNode is stopping criterion for tree induction based on minimum number of samples in a node, and impurity is the impurity measure used to split nodes.

In [12]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5,
                            minInstancesPerNode=20, impurity="gini")

We can create a model by training the decision tree. This is done by executing it in a Pipeline:

In [13]:
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

In [14]:
predictions = model.transform(testData)

In [15]:
predictions.select("prediction", "label").show(10)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 10 rows



In [16]:
#predictions.select("prediction", "label").write.save(path="Results/predictions.csv",format="com.databricks.spark.csv",header='true')

### Model evaluation

In [17]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="precision")

In [21]:
#accuracy = evaluator.evaluate(predictions)
#print("Accuracy = %g " % (accuracy))