In [1]:
# Download JAR from https://spark-packages.org/package/databricks/spark-csv (1.5.0)

# Command to start: pyspark --packages com.databricks:spark-csv_2.11:1.5.0

from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
import pandas as pd
from pyspark.ml import Pipeline

In [2]:
sqlContext = SQLContext(sc)

In [3]:
dataDF = sqlContext.read.load('file:///home/cloudera/coursera/courseraDataSimulation/course4-ML/daily_weather.csv', 
                          format='com.databricks.spark.csv', 
                          header='true',inferSchema='true')

In [4]:
dataDF.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [5]:
dataDF.describe().show()

+-------+------------------+------------------+------------------+----------------------+------------------+----------------------+------------------+---------------------+-----------------+---------------------+---------------------+
|summary|            number|  air_pressure_9am|      air_temp_9am|avg_wind_direction_9am|avg_wind_speed_9am|max_wind_direction_9am|max_wind_speed_9am|rain_accumulation_9am|rain_duration_9am|relative_humidity_9am|relative_humidity_3pm|
+-------+------------------+------------------+------------------+----------------------+------------------+----------------------+------------------+---------------------+-----------------+---------------------+---------------------+
|  count|              1095|              1092|              1090|                  1091|              1092|                  1092|              1091|                 1089|             1092|                 1095|                 1095|
|   mean|             547.0| 918.8825513138097| 64.933001412

In [7]:
dataDF = dataDF.drop('number') # row number
dataDF.columns

['air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [12]:
# TODO: create a new column using Spark DataFrame (withColumn)
sqlContext = SQLContext(sc)

# Categorical Variable 'label'

def functionG(row):
    if row < 25:
        val = 1
    else:
        val = 0
    return val

In [14]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

funct = udf(lambda data_time: date_time.day, IntegerType())

Traceback (most recent call last):


RuntimeError: maximum recursion depth exceeded in cmp

In [49]:
dataDF = dataDF.na.drop()

W = dataDF.withColumn('label', functionG("relative_humidity_3pm"))

Traceback (most recent call last):


RuntimeError: maximum recursion depth exceeded in cmp

# Hands On 1: Data Exploration

In [9]:
# Displat first few lines
dataDF.take(2)

[Row(air_pressure_9am=918.0600000000087, air_temp_9am=74.82200000000041, avg_wind_direction_9am=271.1, avg_wind_speed_9am=2.080354199999768, max_wind_direction_9am=295.39999999999986, max_wind_speed_9am=2.863283199999908, rain_accumulation_9am=0.0, rain_duration_9am=0.0, relative_humidity_9am=42.42000000000046, relative_humidity_3pm=36.160000000000494),
 Row(air_pressure_9am=917.3476881177097, air_temp_9am=71.40384263106537, avg_wind_direction_9am=101.93517935618371, avg_wind_speed_9am=2.4430092157340217, max_wind_direction_9am=140.47154847112498, max_wind_speed_9am=3.5333236016106238, rain_accumulation_9am=0.0, rain_duration_9am=0.0, relative_humidity_9am=24.328697291802207, relative_humidity_3pm=19.4265967985621)]

### Summary Statistics

In [189]:
#Columns in DataFrame
len(dataDF.columns)

10

In [190]:
#Rows in DataFrame
dataDF.count()

1095

In [191]:
# Show summary of one column
dataDF.describe("air_pressure_9am").show()

#Notice the Nan

+-------+-----------------+
|summary| air_pressure_9am|
+-------+-----------------+
|  count|             1095|
|   mean|              NaN|
| stddev|              NaN|
|    min|907.9900000000024|
|    max|              NaN|
+-------+-----------------+



In [192]:
# Drop the rows with missing values on specific column and show statistics
dataDF = dataDF.na.drop(subset=['air_pressure_9am'])

dataDF.describe().show()

+-------+-----------------+------------------+----------------------+------------------+----------------------+------------------+---------------------+-----------------+---------------------+------------------+
|summary| air_pressure_9am|      air_temp_9am|avg_wind_direction_9am|avg_wind_speed_9am|max_wind_direction_9am|max_wind_speed_9am|rain_accumulation_9am|rain_duration_9am|relative_humidity_3pm|             label|
+-------+-----------------+------------------+----------------------+------------------+----------------------+------------------+---------------------+-----------------+---------------------+------------------+
|  count|             1092|              1092|                  1092|              1092|                  1092|              1092|                 1092|             1092|                 1092|              1092|
|   mean|918.8825513138097|               NaN|                   NaN|               NaN|                   NaN|               NaN|                  NaN|

In [193]:
# Drop the rows with missing values and check statistics
dataDF.describe("air_pressure_9am").show()

+-------+-----------------+
|summary| air_pressure_9am|
+-------+-----------------+
|  count|             1092|
|   mean|918.8825513138097|
| stddev|3.184161180386832|
|    min|907.9900000000024|
|    max|929.3200000000012|
+-------+-----------------+



# Pairwise Correlation

In [194]:
# Correlation between two columns
dataDF.stat.corr("rain_accumulation_9am","rain_duration_9am")

nan

In [195]:
cols = ['air_pressure_9am','air_temp_9am','avg_wind_direction_9am','avg_wind_speed_9am','max_wind_direction_9am','max_wind_speed_9am','rain_accumulation_9am','rain_duration_9am']

# Hands on 2: Data Preparation

## Handling Missing Values

## Missing Values: Remove them

In [197]:
dataDF.describe().show()

+-------+-----------------+------------------+----------------------+------------------+----------------------+------------------+---------------------+-----------------+---------------------+------------------+
|summary| air_pressure_9am|      air_temp_9am|avg_wind_direction_9am|avg_wind_speed_9am|max_wind_direction_9am|max_wind_speed_9am|rain_accumulation_9am|rain_duration_9am|relative_humidity_3pm|             label|
+-------+-----------------+------------------+----------------------+------------------+----------------------+------------------+---------------------+-----------------+---------------------+------------------+
|  count|             1092|              1092|                  1092|              1092|                  1092|              1092|                 1092|             1092|                 1092|              1092|
|   mean|918.8825513138097|               NaN|                   NaN|               NaN|                   NaN|               NaN|                  NaN|

In [198]:
#Remove 
dataDF_remove = dataDF.na.drop()

In [199]:
dataDF_remove.describe().show()

+-------+------------------+------------------+----------------------+------------------+----------------------+------------------+---------------------+------------------+---------------------+------------------+
|summary|  air_pressure_9am|      air_temp_9am|avg_wind_direction_9am|avg_wind_speed_9am|max_wind_direction_9am|max_wind_speed_9am|rain_accumulation_9am| rain_duration_9am|relative_humidity_3pm|             label|
+-------+------------------+------------------+----------------------+------------------+----------------------+------------------+---------------------+------------------+---------------------+------------------+
|  count|              1064|              1064|                  1064|              1064|                  1064|              1064|                 1064|              1064|                 1064|              1064|
|   mean| 918.9031798641056| 65.02260949558739|    142.30675564934035| 5.485793050713691|    148.48042413321312|6.9997136588756925|  0.182023476

In [200]:
dataDF_replace = dataDF.na.fill(0)
dataDF_replace.describe().show()

+-------+-----------------+------------------+----------------------+------------------+----------------------+------------------+---------------------+------------------+---------------------+------------------+
|summary| air_pressure_9am|      air_temp_9am|avg_wind_direction_9am|avg_wind_speed_9am|max_wind_direction_9am|max_wind_speed_9am|rain_accumulation_9am| rain_duration_9am|relative_humidity_3pm|             label|
+-------+-----------------+------------------+----------------------+------------------+----------------------+------------------+---------------------+------------------+---------------------+------------------+
|  count|             1092|              1092|                  1092|              1092|                  1092|              1092|                 1092|              1092|                 1092|              1092|
|   mean|918.8825513138097| 64.67149057390388|    141.57780362528246| 5.489489334525926|    148.37845301711084| 6.988499373909827|  0.18912635439793

## Missing values : Replace with Mean

In [202]:
s = f.fillna(f.mean())

# TODO: spark

# OR replace with Median
#s = f.fillna(f.median()) 

In [18]:
#Compare the Summary Statistics
s.describe().head(3)

Unnamed: 0,air_pressure_9am,air_temp_9am,avg_wind_direction_9am,avg_wind_speed_9am,max_wind_direction_9am,max_wind_speed_9am,rain_accumulation_9am,rain_duration_9am,label
count,1095.0,1095.0,1095.0,1095.0,1095.0,1095.0,1095.0,1095.0,1095.0
mean,918.882551,64.933001,142.235511,5.508284,148.953518,7.019514,0.203079,294.108052,0.500457
std,3.179792,11.149947,69.011349,4.546567,67.145759,5.587965,1.589575,1595.886124,0.500228


# Hands On 3: Classification

In [196]:
from pyspark.ml.classification import DecisionTreeClassifier

In [20]:
# delete before classification
dataDF = dataDF.drop('relative_humidity_9am')
dataDF = dataDF.drop('relative_humidity_3pm')

## Features Column

In [204]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
assembler = VectorAssembler(inputCols=cols, outputCol="features")
assembled = assembler.transform(dataDF)
assembled

DataFrame[air_pressure_9am: double, air_temp_9am: double, avg_wind_direction_9am: double, avg_wind_speed_9am: double, max_wind_direction_9am: double, max_wind_speed_9am: double, rain_accumulation_9am: double, rain_duration_9am: double, relative_humidity_3pm: double, label: bigint, features: vector]

## Test and Training Data

In [205]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = assembled.randomSplit([0.7, 0.3], seed = 1234)

In [206]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(assembled)

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer= VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(assembled)

## Decision Tree in Spark

In [212]:
# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                  maxDepth=5,  minInstancesPerNode=20, 
                  impurity="gini")
    
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)
predictions.select("prediction", "indexedLabel", "features").show(5)


+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|[907.990000000002...|
|       1.0|         1.0|[908.420000000007...|
|       1.0|         1.0|[908.970000000004...|
|       1.0|         1.0|[913.060000000003...|
|       1.0|         0.0|[913.633267677041...|
+----------+------------+--------------------+
only showing top 5 rows



# Hands On 4: Evaluation of Machine Learning Models

## Accuracy - Decision Tree

In [213]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="precision")

accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % (accuracy))

Accuracy = 0.801917 


## Confusion Matrix - Decision Tree

In [215]:
from sklearn.metrics import confusion_matrix

confusion_matrix(predictions.select('label').toPandas(), predictions.select('prediction').toPandas())

array([[ 30, 124],
       [127,  32]])