In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [22]:
# Load the daily weather data.
# NOTE: `sc` (the SparkContext) is not created anywhere in this notebook; it is
# presumably injected by the pyspark kernel/shell — confirm before running elsewhere.
sqlContext = SQLContext(sc)
df = sqlContext.read.load('file:///home/cloudera/Downloads/big-data-4/daily_weather.csv',
                          format='com.databricks.spark.csv',
                          header='true', inferSchema='true')
# Keep a handle on the raw frame (still containing the `number` column);
# `df` itself is reassigned by the cleaning cells that follow.
numberDF = df
# Last expression of the cell, so Jupyter actually displays the column names.
df.columns

In [3]:
# 9am measurements that are assembled into the classifier's feature vector.
featureColumns = [
    'air_pressure_9am',
    'air_temp_9am',
    'avg_wind_direction_9am',
    'avg_wind_speed_9am',
    'max_wind_direction_9am',
    'max_wind_speed_9am',
    'rain_accumulation_9am',
    'rain_duration_9am',
]


In [4]:
# `number` looks like a 0-based row index (see the describe/select output at the
# end of the notebook), not a weather measurement, so it is removed.
df = df.drop("number")

In [5]:
# Discard every row containing a null in any column
# (DataFrame.dropna is the alias of df.na.drop, same defaults).
df = df.dropna()

In [6]:
# (rows, columns) remaining after cleaning.
(df.count(), len(df.columns))

(1064, 10)

In [8]:
# Turn 3pm relative humidity into a binary target column `label`.
# Binarizer emits 1.0 only for values strictly greater than `threshold`, so a
# value just below 25 presumably makes "humidity >= 25" map to 1.0 — confirm intent.
HUMIDITY_THRESHOLD = 24.99999
binarizer = Binarizer(threshold=HUMIDITY_THRESHOLD,
                      inputCol="relative_humidity_3pm",
                      outputCol="label")
binarizeDF = binarizer.transform(df)

In [9]:
# Spot-check the derived label against the raw humidity values.
binarizeDF.select(["relative_humidity_3pm", "label"]).show(n=5)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
|    76.74000000000046|  1.0|
+---------------------+-----+
only showing top 5 rows



In [10]:
# Pack the selected feature columns into a single vector column "features".
assembler = VectorAssembler(
    inputCols=featureColumns,
    outputCol="features",
)
assembled = assembler.transform(binarizeDF)

In [26]:
# 70/30 train/test split; the fixed seed keeps the split stable across re-runs
# (given the same input partitioning).
trainingData, testData = assembled.randomSplit(weights=[0.7, 0.3], seed=13234)

In [27]:
# Size of each split.
(trainingData.count(), testData.count())

(730, 334)

In [13]:
# Gini decision tree, at most 5 levels deep; every child node produced by a
# split must contain at least 20 training rows.
dt = DecisionTreeClassifier(
    labelCol="label",
    featuresCol="features",
    maxDepth=5,
    minInstancesPerNode=20,
    impurity="gini",
)

In [14]:
# Single-stage pipeline around the tree, fitted on the training split.
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(dataset=trainingData)

In [15]:
# Score the held-out rows; adds a "prediction" column alongside "label".
predictions = model.transform(dataset=testData)

In [25]:
# Compare predicted vs. true labels for the first rows of the test split.
predictions.select(["prediction", "label"]).show(n=20)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 20 rows



In [19]:
# Persist prediction/label pairs as CSV. Spark writes a *directory* at this path
# (one part-file per partition), not a single .csv file.
# mode='overwrite' keeps the cell re-runnable: the default save mode raises an
# error when the target path already exists.
predictions.select("prediction", "label").write.save(
    'file:///home/cloudera/Downloads/big-data-4/predictions.csv',
    format='com.databricks.spark.csv',
    mode='overwrite',
    header='true')

In [23]:
# Summary statistics of the raw `number` column.
numberDF.describe("number").show(n=10)

+-------+------------------+
|summary|            number|
+-------+------------------+
|  count|              1095|
|   mean|             547.0|
| stddev|316.24357700987383|
|    min|                 0|
|    max|              1094|
+-------+------------------+



In [24]:
# First values of the raw `number` column.
numberDF.select(numberDF["number"]).show(n=10)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
+------+
only showing top 10 rows

