In [6]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [31]:
# Build the SQL entry point from the SparkContext (`sc`, supplied by the
# PySpark kernel) and read the CSV through that same handle. The cell used to
# create `sqlc` and then read via `sqlContext`, a name this notebook never
# defines, so it only worked where the shell happened to pre-inject it.
sqlc = SQLContext(sc)

# header='true'      -> the first CSV line supplies the column names
# inferSchema='true' -> column types are inferred instead of read as strings
df = sqlc.read.load('file:///home/cloudera/Downloads/big-data-4/daily_weather.csv',
                    format='com.databricks.spark.csv',
                    header='true', inferSchema='true')

In [8]:
# List the column names of the loaded DataFrame.
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [34]:
# Number of rows in the DataFrame.
df.count()

1095

In [37]:
# Peek at the first 10 values of the 'number' column.
df.select("number").show(n=10)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
+------+
only showing top 10 rows



In [10]:
# Columns fed to the classifier: every 9am measurement except
# relative_humidity_9am. relative_humidity_3pm is excluded as well; it is the
# column the label is derived from further down.
featureColumns = [
    "air_pressure_9am",
    "air_temp_9am",
    "avg_wind_direction_9am",
    "avg_wind_speed_9am",
    "max_wind_direction_9am",
    "max_wind_speed_9am",
    "rain_accumulation_9am",
    "rain_duration_9am",
]

In [11]:
# Remove the 'number' column; the preview above shows it counting 0, 1, 2, ...
# so it looks like a row index rather than a measurement.
df = df.drop("number")

In [12]:
# Discard every row that has a null in any column.
df = df.dropna()

In [33]:
# Keep only the rows where air_temp_9am is present.
# NOTE(review): the execution counts suggest this cell ran against a freshly
# reloaded `df`; on a top-to-bottom run `df` has already lost every row with a
# null, so the count below would not match the recorded output -- confirm.
df1 = df.dropna(subset=["air_temp_9am"])

In [35]:
# Rows remaining after dropping nulls in air_temp_9am only.
df1.count()

1090

In [14]:
# (row count, column count) of the cleaned DataFrame.
(df.count(), len(df.columns))

(1064, 10)

In [15]:
# Derive the binary target from the 3pm humidity: readings above the
# threshold become label 1.0, everything else 0.0.
binarizer = Binarizer(
    inputCol="relative_humidity_3pm",
    outputCol="label",
    threshold=24.99999,
)
binarizerDF = binarizer.transform(df)

In [16]:
# Spot-check the label against the humidity value it was derived from.
binarizerDF.select("relative_humidity_3pm", "label").show(n=4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



In [17]:
# Pack the selected feature columns into the single vector column
# ("features") that the classifier reads.
assembler = VectorAssembler(
    inputCols=featureColumns,
    outputCol="features",
)
assembled = assembler.transform(binarizerDF)

In [39]:
# 70/30 train/test split; the fixed seed keeps the split repeatable.
traingData, testData = assembled.randomSplit([0.7, 0.3], seed=13234)

In [40]:
# (training rows, test rows)
(traingData.count(), testData.count())

(730, 334)

In [22]:
# Decision-tree classifier configuration: gini impurity, depth capped at 5,
# and at least 20 training rows required in every node.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    impurity="gini",
    maxDepth=5,
    minInstancesPerNode=20,
)

In [24]:
# Single-stage pipeline wrapping the decision tree, fitted on the training split.
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(traingData)

In [25]:
# Apply the fitted pipeline to the held-out test split.
# The misspelled name `preditions` is kept because later cells refer to it.
preditions = model.transform(testData)

In [28]:
# Inspect which columns the fitted model appended to the test data.
preditions.columns

['air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm',
 'label',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [38]:
# Compare predicted and actual labels for the first 20 test rows.
preditions.select("prediction", "label").show(n=20)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 20 rows



In [30]:
# Persist the (prediction, label) pairs as CSV with a header row.
# mode="overwrite" replaces output left behind by an earlier run. With the
# writer's default mode the save raises when the target path already exists,
# which made this cell fail every time the notebook was re-run.
preditions.select("prediction", "label").write.save(
    path="file:///home/cloudera/Downloads/big-data-4/daily_weather_pred.csv",
    format="com.databricks.spark.csv",
    header='true',
    mode="overwrite")