In [1]:
import os
from datetime import datetime
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder,  VectorAssembler, VectorIndexer, IndexToString
from pyspark.sql.window import Window
from pyspark.storagelevel import StorageLevel
import pyspark.sql.functions as f
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.ml import Pipeline

import pyspark

In [3]:
# Extra packages for the JVM that PySpark launches: the MongoDB Spark connector
# (Scala 2.11 build, version 2.4.0). PYSPARK_SUBMIT_ARGS is only read when the
# JVM is started, i.e. before the first SparkSession/SparkContext in this kernel.
pyspark_submit_args = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

# Source collection (database `mydb`, collection `air`). Set the
# MONGO_INPUT_URI environment variable to point at a different server without
# editing the notebook; the default is the previously hard-coded host.
MONGO_INPUT_URI = os.environ.get("MONGO_INPUT_URI", "mongodb://34.221.229.103/mydb.air")

# Executor/driver memory and core counts are static Spark configs. They only
# take effect when supplied before the SparkContext starts, so they go on the
# builder. Calling spark.conf.set() for them after getOrCreate() does not resize
# the running context (newer Spark versions raise an error instead).
# NOTE: if a SparkSession already exists in this kernel, getOrCreate() returns
# it and these static settings are still not applied -- restart the kernel.
spark = (
    SparkSession.builder
    .appName("aqi")
    .config("spark.mongodb.input.uri", MONGO_INPUT_URI)
    .config("spark.executor.memory", '4g')
    .config('spark.executor.cores', '4')
    .config('spark.cores.max', '4')
    .config("spark.driver.memory", '4g')
    .getOrCreate()
)

# Load the collection through the Mongo connector (schema is inferred) and show it.
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()

root
 |-- Unnamed: 0.1.1: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- aqi_co: double (nullable = true)
 |-- aqi_no2: double (nullable = true)
 |-- aqi_o3: double (nullable = true)
 |-- aqi_pm10: double (nullable = true)
 |-- aqi_pm25_frm: double (nullable = true)
 |-- aqi_pm25_nonfrm: double (nullable = true)
 |-- aqi_so2: double (nullable = true)
 |-- arithmetic_mean_co: double (nullable = true)
 |-- arithmetic_mean_no2: double (nullable = true)
 |-- arithmetic_mean_o3: double (nullable = true)
 |-- arithmetic_mean_pm10: double (nullable = true)
 |-- arithmetic_mean_pm25_frm: double (nullable = true)
 |-- arithmetic_mean_pm25_nonfrm: double (nullable = true)
 |-- arithmetic_mean_pm25_speciation: double (nullable = true)
 |-- arithmetic_mean_pressure: double (nullable = true)
 |-- arithmetic_mean_so2: double (nullable = true)
 |-- arithmetic_mean_temp: double (nullable = true)
 |-- arithmetic_mean_wind: double (nullable = tr

In [4]:
# Columns concatenated into the model's "features" vector. The order fixes each
# feature's index inside the assembled vector, so keep it stable (the list is
# alphabetical).
# NOTE(review): 'max_aqi' is used as an input feature. If the 'label' column is
# derived from the same row's max_aqi, this leaks the target -- confirm how
# 'label' is built.
FEATURE_COLS = [
    # aqi_*
    'aqi_co', 'aqi_no2', 'aqi_o3', 'aqi_pm10',
    'aqi_pm25_frm', 'aqi_pm25_nonfrm', 'aqi_so2',
    # arithmetic_mean_*
    'arithmetic_mean_co', 'arithmetic_mean_no2', 'arithmetic_mean_o3',
    'arithmetic_mean_pm10', 'arithmetic_mean_pm25_frm', 'arithmetic_mean_pm25_nonfrm',
    'arithmetic_mean_pm25_speciation', 'arithmetic_mean_pressure',
    'arithmetic_mean_so2', 'arithmetic_mean_temp', 'arithmetic_mean_wind',
    'county_code', 'day', 'dow',
    # first_max_value_*
    'first_max_value_co', 'first_max_value_no2', 'first_max_value_o3',
    'first_max_value_pm10', 'first_max_value_pm25_frm', 'first_max_value_pm25_nonfrm',
    'first_max_value_pm25_speciation', 'first_max_value_pressure',
    'first_max_value_so2', 'first_max_value_temp', 'first_max_value_wind',
    'latitude', 'longitude',
    'max_aqi', 'max_aqi_before_yesterday', 'max_aqi_yesterday',
    'month',
    # observation_count_*
    'observation_count_co', 'observation_count_no2', 'observation_count_o3',
    'observation_count_pm10', 'observation_count_pm25_frm', 'observation_count_pm25_nonfrm',
    'observation_count_pm25_speciation', 'observation_count_pressure',
    'observation_count_so2', 'observation_count_temp', 'observation_count_wind',
    'site_num',
]

# Assembler that packs FEATURE_COLS into a single vector column named "features".
va = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")

In [6]:
# Build the feature vector, keep only the model inputs (features + label), and
# persist to disk so the later passes over this data (label indexing, split,
# model fits) reuse it rather than recomputing it from the source.
# NOTE: this rebinds `df`, so re-running this cell alone fails (the raw
# columns are gone); re-run the load cell first.
assembled = va.transform(df)
model_input = assembled.select('features', 'label')
df = model_input.persist(StorageLevel.DISK_ONLY)

In [7]:
# Model input: the persisted (features, label) DataFrame from the previous cell.
data = df

# Index labels, adding metadata to the label column.
# Fit on the whole dataset so every label value gets an index, including any
# that end up only in the test split.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Fixed seed so the 70/30 train/test split -- and therefore every test error
# reported below -- is reproducible across kernel restarts.
SPLIT_SEED = 42

# Split the data into training and test sets (30% held out for testing).
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Random forest with 100 trees

In [8]:
%%time
# Train a RandomForest model.
rf100 = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", numTrees=100)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline100 = Pipeline(stages=[labelIndexer, rf100, labelConverter])

# Train model.  This also runs the indexers.
model100 = pipeline100.fit(trainingData)

# Make predictions.
predictions100 = model100.transform(testData)

# Select example rows to display.
predictions100.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator100 = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy100 = evaluator100.evaluate(predictions100)
print("Test Error = %g" % (1.0 - accuracy100))

rf100Model = model100.stages[1]
print(rf100Model)  # summary only

+--------------+--------+--------------------+
|predictedLabel|   label|            features|
+--------------+--------+--------------------+
|          good|    good|[0.0,8.0,14.0,19....|
|      moderate|moderate|[1.0,8.0,25.0,19....|
|          good|moderate|[1.0,9.0,29.0,19....|
|      moderate|    good|[1.0,10.0,34.0,19...|
|          good|    good|[1.0,11.0,29.0,19...|
+--------------+--------+--------------------+
only showing top 5 rows

Test Error = 0.192764
RandomForestClassificationModel (uid=RandomForestClassifier_9d25d12365d2) with 100 trees
CPU times: user 65.1 ms, sys: 6.57 ms, total: 71.7 ms
Wall time: 1min 51s


# Random forest with 10 trees

In [9]:
%%time
# Train a RandomForest model.
rf10 = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline10 = Pipeline(stages=[labelIndexer, rf10, labelConverter])

# Train model.  This also runs the indexers.
model10 = pipeline10.fit(trainingData)

# Make predictions.
predictions10 = model10.transform(testData)

# Select example rows to display.
predictions10.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator10 = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy10 = evaluator100.evaluate(predictions10)
print("Test Error = %g" % (1.0 - accuracy10))

rf10Model = model10.stages[1]
print(rf10Model)  # summary only

+--------------+--------+--------------------+
|predictedLabel|   label|            features|
+--------------+--------+--------------------+
|          good|    good|[0.0,8.0,14.0,19....|
|      moderate|moderate|[1.0,8.0,25.0,19....|
|          good|moderate|[1.0,9.0,29.0,19....|
|          good|    good|[1.0,10.0,34.0,19...|
|          good|    good|[1.0,11.0,29.0,19...|
+--------------+--------+--------------------+
only showing top 5 rows

Test Error = 0.193538
RandomForestClassificationModel (uid=RandomForestClassifier_a86aff82384e) with 10 trees
CPU times: user 60.4 ms, sys: 2.35 ms, total: 62.7 ms
Wall time: 27.7 s


# Random forest with 500 trees

In [None]:
%%time
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", numTrees=500)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[1]
print(rfModel)  # summary only