In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer
from pyspark.sql import DataFrameNaFunctions
from pyspark.sql import SQLContext
from pyspark.sql.functions import col

In [2]:
# Location of the daily weather CSV. Kept as a constant so the notebook can be
# pointed at another copy of the data without touching the reader call below.
DATA_PATH = '/FileStore/tables/kjegy0un1492802013999/daily_weather.csv'

# Assumes `sc` is the SparkContext pre-created by the notebook environment
# (it is not defined anywhere in this notebook).
sqlContext = SQLContext(sc)
# header='true': take column names from the first row.
# inferSchema='true': let Spark assign numeric types instead of reading every column as a string.
df = sqlContext.read.load(DATA_PATH, format='com.databricks.spark.csv', header='true', inferSchema='true')


In [3]:
# Remove the 'number' column (presumably a row counter rather than a weather
# measurement -- TODO confirm), then display the remaining column names.
unusedColumn = 'number'
df = df.drop(unusedColumn)
df.columns

In [4]:
# Model inputs: the 9am measurements only. The target is derived from the 3pm
# humidity further down, so no 3pm column belongs in this list.
featureColumns = [baseName + '_9am' for baseName in (
    'air_pressure',
    'air_temp',
    'avg_wind_direction',
    'avg_wind_speed',
    'max_wind_direction',
    'max_wind_speed',
    'rain_accumulation',
    'rain_duration',
)]

In [5]:
df = df.dropna()  # drops every row with a null/NaN in ANY column, not only the feature columns

In [6]:
# Shape of the cleaned DataFrame, displayed as (rows, columns).
(df.count(), len(df.columns))

In [7]:
# Target label: 1.0 when the 3pm relative humidity is at least HUMIDITY_LOW_CUTOFF,
# 0.0 otherwise. Binarizer can only express `value > threshold`, which forced an
# epsilon threshold (24.999999) that mislabels readings in (24.999999, 25).
# An explicit `>=` comparison is exact.
HUMIDITY_LOW_CUTOFF = 25.0
binarizerDF = df.withColumn('label', (col('relative_humidity_3pm') >= HUMIDITY_LOW_CUTOFF).cast('double'))
binarizerDF.select("relative_humidity_3pm", "label").show(5)

In [8]:
# Pack the feature columns into the single vector column ("features") the classifier reads.
assembler = VectorAssembler().setInputCols(featureColumns).setOutputCol("features")
assembled = assembler.transform(binarizerDF)

In [9]:
# Roughly 80/20 train/test split. The fixed seed keeps the split reproducible across runs.
splitWeights = [0.8, 0.2]
trainingData, testData = assembled.randomSplit(splitWeights, seed=13234)
(trainingData.count(), testData.count())

In [10]:
# Single-stage pipeline: a decision tree capped at depth 5, requiring at least
# 20 training rows per node and splitting on Gini impurity.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    impurity="gini",
    maxDepth=5,
    minInstancesPerNode=20,
)
pipeline = Pipeline().setStages([dt])
model = pipeline.fit(trainingData)

In [11]:
# Score the held-out rows with the fitted pipeline; the output gains a "prediction" column.
predictions = model.transform(dataset=testData)

In [12]:
# Predicted vs. actual label for the first few test rows.
predictions.select(["prediction", "label"]).show(n=5)