In [1]:
from pyspark.sql import SQLContext

In [2]:
sqlContext = SQLContext(sc)

In [3]:
df = sqlContext.read.load('file:///home/cloudera/Downloads/big-data-4/daily_weather.csv',
                         format='com.databricks.spark.csv',
                         header='true', inferSchema='true')

In [4]:
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [5]:
df.printSchema()

root
 |-- number: integer (nullable = true)
 |-- air_pressure_9am: double (nullable = true)
 |-- air_temp_9am: double (nullable = true)
 |-- avg_wind_direction_9am: double (nullable = true)
 |-- avg_wind_speed_9am: double (nullable = true)
 |-- max_wind_direction_9am: double (nullable = true)
 |-- max_wind_speed_9am: double (nullable = true)
 |-- rain_accumulation_9am: double (nullable = true)
 |-- rain_duration_9am: double (nullable = true)
 |-- relative_humidity_9am: double (nullable = true)
 |-- relative_humidity_3pm: double (nullable = true)



In [4]:
# Check summary statistics
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
number,1095,547.0,316.24357700987383,0,1094
air_pressure_9am,1092,918.8825513138094,3.184161180386833,907.9900000000024,929.3200000000012
air_temp_9am,1090,64.93300141287072,11.175514003175877,36.752000000000685,98.90599999999992
avg_wind_direction_9am,1091,142.2355107005759,69.13785928889189,15.500000000000046,343.4
avg_wind_speed_9am,1092,5.50828424225493,4.5528134655317185,0.69345139999974,23.554978199999763
max_wind_direction_9am,1092,148.95351796516923,67.23801294602953,28.89999999999991,312.19999999999993
max_wind_speed_9am,1091,7.019513529175272,5.598209170780958,1.1855782000000479,29.84077959999996
rain_accumulation_9am,1089,0.20307895225211126,1.5939521253574893,0.0,24.01999999999907
rain_duration_9am,1092,294.1080522756142,1598.0787786601481,0.0,17704.0


In [10]:
df.describe('air_pressure_9am').show()

+-------+-----------------+
|summary| air_pressure_9am|
+-------+-----------------+
|  count|             1092|
|   mean|918.8825513138094|
| stddev|3.184161180386833|
|    min|907.9900000000024|
|    max|929.3200000000012|
+-------+-----------------+



In [11]:
len(df.columns)

11

In [12]:
df.count()

1095

In [13]:
 df2 = df.na.drop(subset=['air_pressure_9am'])

In [14]:
df2.count()

1092

In [15]:
df2.stat.corr("rain_accumulation_9am", "rain_duration_9am")

0.7298253479609021

In [16]:
df.na.drop(subset=['rain_accumulation_9am']).count()

1089

In [17]:
 df2.stat.corr("relative_humidity_9am", "relative_humidity_3pm")

0.882338853077243

In [6]:
#Start Data Preparation assignment
df.describe(['air_temp_9am']).show()

+-------+------------------+
|summary|      air_temp_9am|
+-------+------------------+
|  count|              1090|
|   mean| 64.93300141287072|
| stddev|11.175514003175877|
|    min|36.752000000000685|
|    max| 98.90599999999992|
+-------+------------------+



In [7]:
df.count()

1095

In [8]:
# Drop all the rows with missing a value
removeAllDF = df.na.drop()

In [9]:
removeAllDF.describe(['air_temp_9am']).show()

+-------+------------------+
|summary|      air_temp_9am|
+-------+------------------+
|  count|              1064|
|   mean| 65.02260949558733|
| stddev|11.168033449415704|
|    min|36.752000000000685|
|    max| 98.90599999999992|
+-------+------------------+



In [10]:
removeAllDF.count()

1064

In [11]:
#Impute missing values with mean value for that column
from pyspark.sql.functions import avg
imputeDF = df

In [13]:
for x in imputeDF.columns:
    meanValue = removeAllDF.agg(avg(x)).first()[0]
    print(x, meanValue)
    imputeDF = imputeDF.na.fill(meanValue, [x])

number 545.0018796992481
air_pressure_9am 918.9031798641051
air_temp_9am 65.02260949558733
avg_wind_direction_9am 142.30675564934037
avg_wind_speed_9am 5.48579305071369
max_wind_direction_9am 148.48042413321315
max_wind_speed_9am 6.999713658875691
rain_accumulation_9am 0.18202347650615522
rain_duration_9am 266.3936973996037
relative_humidity_9am 34.07743985327709
relative_humidity_3pm 35.14838093290533


In [14]:
df.describe(['air_temp_9am']).show()
imputeDF.describe(['air_temp_9am']).show()

+-------+------------------+
|summary|      air_temp_9am|
+-------+------------------+
|  count|              1090|
|   mean| 64.93300141287072|
| stddev|11.175514003175877|
|    min|36.752000000000685|
|    max| 98.90599999999992|
+-------+------------------+

+-------+------------------+
|summary|      air_temp_9am|
+-------+------------------+
|  count|              1095|
|   mean| 64.93341058219818|
| stddev| 11.14994819992023|
|    min|36.752000000000685|
|    max| 98.90599999999992|
+-------+------------------+



In [5]:
#Classification work starts here
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer


In [7]:
featureColumns = ['air_pressure_9am','air_temp_9am','avg_wind_direction_9am','avg_wind_speed_9am',
        'max_wind_direction_9am','max_wind_speed_9am','rain_accumulation_9am',
        'rain_duration_9am']

In [9]:
droppedDF = df.drop('number')

In [11]:
droppedDF.columns


['air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [12]:
imputeDropDF = droppedDF.na.drop()

In [13]:
df.count(), len(df.columns)

(1095, 11)

In [14]:
imputeDropDF.count(), len(imputeDropDF.columns)

(1064, 10)

In [15]:
binarizer = Binarizer(threshold=24.99999, inputCol="relative_humidity_3pm", outputCol="label")

In [19]:
binarizedDF = binarizer.transform(imputeDropDF)

In [20]:
binarizedDF.select("relative_humidity_3pm", "label").show(4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



In [24]:
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")

In [25]:
assembledDF = assembler.transform(binarizedDF)

In [45]:
(trainingDataDF, testDataDF) = assembledDF.randomSplit([0.7,0.3], seed=13234)

In [46]:
trainingDataDF.count(), testDataDF.count()

(730, 334)

In [29]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5, minInstancesPerNode = 20, impurity="gini")

In [30]:
pipeline = Pipeline(stages=[dt])

In [32]:
model = pipeline.fit(trainingDataDF)

In [36]:
predictions = model.transform(testDataDF)

In [43]:
predictions.select("prediction", "label").show(20)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 20 rows



In [40]:
#save prediction to CSV
predictions.select("prediction", "label").write.save(path='file:///home/cloudera/Downloads/predictions.csv',
                                                    format='com.databricks.spark.csv',
                                                    header='true')