# Spark Cluster Setup

Mongo-Spark Connector Configuration to Connect to MongoDB

In [1]:
%%configure -f
{"conf": {"spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"}}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,,pyspark,starting,,,


In [28]:
from datetime import datetime
import numpy as np

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, when, count, col, lag, avg, lit
from pyspark.sql.types import *
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.storagelevel import StorageLevel

import time

<br>
<br>
<br>

# Fetch Data from MongoDB

## Bikeshare Data

In [3]:
mongodb_path = "mongodb://172.31.38.7/msds697.bikeshare"

spark_bike = SparkSession \
    .builder \
    .appName("group25") \
    .config("spark.mongodb.input.uri", mongodb_path)\
    .config("spark.driver.memory", "24g")\
    .config("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "python36")\
    .config("spark.executorEnv.PYSPARK_PYTHON", "python36")\
    .getOrCreate()

bikeshare = spark_bike.read.format("com.mongodb.spark.sql.DefaultSource").load().drop('_id')  # Load Data
bikeshare = bikeshare.select('starttime').filter("starttime != 'starttime'").cache()  # Keep only starttime column and filter out erroneous header line

bikeshare.printSchema()
# bikeshare.count()  # 47758652

root
 |-- starttime: string (nullable = true)

## AQI Data

In [4]:
mongodb_path = "mongodb://172.31.38.7/aqi.aqi"

spark_aqi = SparkSession \
    .builder \
    .appName("group25") \
    .config("spark.mongodb.input.uri", mongodb_path)\
    .config("spark.driver.memory", "24g")\
    .config("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "python36")\
    .config("spark.executorEnv.PYSPARK_PYTHON", "python36")\
    .getOrCreate()

aqi = spark_aqi.read.format("com.mongodb.spark.sql.DefaultSource").load().drop('_id').cache()
aqi = aqi.select(['aqi', 'siteID', 'yyyy/mm/dd'])  # Keep only starttime column

aqi.printSchema()
aqi_total_count = aqi.count()
print(aqi_total_count)

root
 |-- aqi: integer (nullable = true)
 |-- siteID: integer (nullable = true)
 |-- yyyy/mm/dd: string (nullable = true)

12859

<br>
<br>
<br>

# Calculations

## Daily Usage

In [5]:
def formatDateRide(mydate):
    """
    Re-formats date to match bike date 
    """
    mydate = mydate.split(' ')[0]
    try:
        objDate = datetime.strptime(mydate, '%m/%d/%Y')
        return datetime.strftime(objDate,'%Y-%m-%d')
    except ValueError:
        return mydate

In [6]:
# Extract only date from timestamp
date_udf = udf(lambda date: formatDateRide(date))
daily_ridership = bikeshare.select("starttime").withColumn("date", date_udf("starttime")).groupBy("date").count()
daily_ridership.printSchema()

root
 |-- date: string (nullable = true)
 |-- count: long (nullable = false)

## Daily AQI

In [7]:
def formatDate(mydate):
    """
    Re-formats date to match bike date 
    """
    objDate = datetime.strptime(mydate, '%Y/%m/%d')
    return datetime.strftime(objDate, '%Y-%m-%d')

In [8]:
date_udf = udf(lambda date: formatDate(date))

# Changing format of date column
aqi = aqi.withColumn("date", date_udf("yyyy/mm/dd")).drop("siteID", "yyyy/mm/dd")

# Maintaining only the two stations near NYC
NYC_sites = aqi.filter("SiteID == 360610135 or SiteID == 340171002")

# Group by Date
daily_aqi = NYC_sites.groupBy("date").max("AQI")

daily_aqi.printSchema()

root
 |-- date: string (nullable = true)
 |-- max(AQI): integer (nullable = true)

<br>
<br>
<br>

# Joining AQI & Bike Data 

In [9]:
bike_aqi_joined = daily_ridership.join(daily_aqi, 'date', 'inner')
bike_aqi_joined.printSchema()

root
 |-- date: string (nullable = true)
 |-- count: long (nullable = false)
 |-- max(AQI): integer (nullable = true)

<br>
<br>
<br>

# Feature Engineering

## Previous Day's AQI

In [10]:
# Adding AQI lagged at 1 day
bike_aqi_joined = bike_aqi_joined.select('date', 'count', 'max(AQI)',\
                       lag('max(AQI)', 1).over(Window.orderBy('date'))\
                      .alias('pre_AQI'))

bike_aqi_joined.printSchema()

root
 |-- date: string (nullable = true)
 |-- count: long (nullable = false)
 |-- max(AQI): integer (nullable = true)
 |-- pre_AQI: integer (nullable = true)

## Season Indicator Variable

In [11]:
def getMonth(inval):
    try:
        return int(datetime.strptime(inval, "%Y-%m-%d").month)
    except ValueError:
        return None

def getYear(inval):
    try:
        return int(datetime.strptime(inval, "%Y-%m-%d").year)
    except ValueError:
        return None
def getSeason(month):
    season_dict = {'winter' : [1, 2, 12],
                   'summer' : [6, 7, 8],
                   'spring' : [3, 4, 5],
                   'autumn' : [9, 10, 11]}
    for season in season_dict:
        if month in season_dict.get(season):
            return season 

In [12]:
year_udf = udf(lambda date: getYear(date))
month_udf = udf(lambda date: getMonth(date))
season_udf = udf(lambda month: getSeason(int(month)))

bike_aqi_joined = bike_aqi_joined.withColumn("month", month_udf(bike_aqi_joined.date))\
                .withColumn("year", year_udf(bike_aqi_joined.date))
bike_aqi_joined = bike_aqi_joined.withColumn("season", season_udf(bike_aqi_joined.month))

<br>
<br>
<br>

# Prepare Data for Machine Learning Pipeline

## Filter Out All Nulls

In [13]:
bike_ml = bike_aqi_joined.filter(bike_aqi_joined.pre_AQI.isNotNull()).drop('date')
bike_ml.printSchema()

root
 |-- count: long (nullable = false)
 |-- max(AQI): integer (nullable = true)
 |-- pre_AQI: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- season: string (nullable = true)

## Convert Categorical Values to One-Hot Encoding

In [14]:
# Convert to Numeric type first
for _ in range(2):
    si = StringIndexer(inputCol="season", outputCol="season-num")
    sm = si.fit(bike_ml)
    bike_num = sm.transform(bike_ml).drop("season")
    bike_num = bike_num.withColumnRenamed("season-num", "season")

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 45798)
Traceback (most recent call last):
  File "/usr/lib64/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.6/socketserver.py", line 696, in __init__
    self.handle()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 266, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 254, in authenticate_and_accum_updates
    received_to

In [15]:
# Then One-Hot encode numerics
onehotenc = OneHotEncoder(inputCol="season", outputCol="season-onehot", dropLast=True)
bike_ohe = onehotenc.transform(bike_num).drop("season")
bike_ohe = bike_ohe.withColumnRenamed("season-onehot", "season")

In [16]:
bike_ohe.printSchema()

root
 |-- count: long (nullable = false)
 |-- max(AQI): integer (nullable = true)
 |-- pre_AQI: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- season: vector (nullable = true)

### Split Training (Pre-2018) and Test Sets (2018)

In [17]:
test = bike_ohe.filter("year == 2018 and month >= 1")
training = bike_ohe.subtract(test)

test = test.drop('month', 'year')
training = training.drop('month', 'year')

### Create Feature Vector and Label Column

In [18]:
input_cols = ["count", "pre_AQI", "season"]
va = VectorAssembler(outputCol="features", inputCols=input_cols)

bike_labeled_train = va.transform(training).select("features", "max(AQI)").withColumnRenamed("max(AQI)", "label")
bike_labeled_test = va.transform(test).select("features", "max(AQI)").withColumnRenamed("max(AQI)", "label")

bike_labeled_train.show(5)
bike_labeled_test.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[23849.0,42.0,0.0...|   20|
|[37802.0,25.0,0.0...|   22|
|[14588.0,32.0,0.0...|   42|
|[40637.0,25.0,1.0...|   19|
|[41490.0,40.0,0.0...|   10|
+--------------------+-----+
only showing top 5 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(5,[0,1],[5500.0,...|   28|
|(5,[0,1],[18818.0...|   30|
|(5,[0,1],[24299.0...|   51|
|(5,[0,1],[1922.0,...|   41|
|(5,[0,1],[4972.0,...|   22|
+--------------------+-----+
only showing top 5 rows

In [19]:
# Persist on Memory for Faster Access
bike_labeled_train.persist(StorageLevel.MEMORY_AND_DISK)
bike_labeled_test.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[features: vector, label: int]

<br>
<br>
<br>

# Train-Validation Split 

80/20 split from training set

In [20]:
train, valid = bike_labeled_train.randomSplit([0.8, 0.2])

<br>
<br>
<br>

# Modeling
## Baseline

Baseline modeling would be to predict the AQI for the day to be to be the mean AQI

In [21]:
train_avg_aqi = training.select('max(AQI)').agg({'max(AQI)': 'avg'}).collect()[0][0]  # best constant prediction
print(f"Average AQI Over Train-Split Dataset: {train_avg_aqi}")

baseline_val = valid
baseline_val = baseline_val.withColumn('prediction', lit(train_avg_aqi))

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
print(f"Baseline Validation RMSE = {round(evaluator.evaluate(baseline_val), 4)}")

baseline_test = bike_labeled_test
baseline_test = baseline_test.withColumn('prediction', lit(train_avg_aqi))

print(f"Baseline Test RMSE = {round(evaluator.evaluate(baseline_test), 4)}")

Average AQI Over Train-Split Dataset: 27.650969529085874
Baseline Validation RMSE = 9.3701
Baseline Test RMSE = 16.0113

## Random Forest 

In [22]:
rf_exec_time = []

In [29]:
for depth in range(2, 30, 7):
    rf_depth_time = []
    for trees in range(2, 50, 8):
        start = time.time()
    
        # Regressor
        rfr = RandomForestRegressor()

        # Parameter Grid
        rf_paramGrid = ParamGridBuilder()\
          .addGrid(rfr.maxDepth, [depth])\
          .addGrid(rfr.numTrees, [trees])\
          .build()

        # Cross-Validator
        rf_crossval = CrossValidator(estimator=rfr,
                                  estimatorParamMaps=rf_paramGrid,
                                  evaluator=evaluator,
                                  numFolds=10)

        # Fit
        rf_crossval.fit(train)
    
        rf_depth_time.append(time.time() - start)
    rf_exec_time.append(rf_depth_time)

CrossValidatorModel_ea1bcdeed52b
CrossValidatorModel_346df6e21955
CrossValidatorModel_0e96f4e31ded
CrossValidatorModel_613c378d0361
CrossValidatorModel_9a602b674da6
CrossValidatorModel_d6341372ab15
CrossValidatorModel_881a298d8322
CrossValidatorModel_3b2e7d8bf45c
CrossValidatorModel_45b65254eb4a
CrossValidatorModel_e5c57f8f8de5
CrossValidatorModel_7182e1c0c93f
CrossValidatorModel_b864f40424e5
CrossValidatorModel_64d460906241
CrossValidatorModel_87a8c29e9c61
CrossValidatorModel_1ac20747bffd
CrossValidatorModel_5271fa4b44ee
CrossValidatorModel_5f2989324a27
CrossValidatorModel_dc5c5a8b2592
CrossValidatorModel_0118c83b83aa
CrossValidatorModel_f9c1d2a1064f
CrossValidatorModel_641e321a504d
CrossValidatorModel_34cbd5c66735
CrossValidatorModel_4e0e22026eb1
CrossValidatorModel_3600b24605f7

In [30]:
print(rf_exec_time)

[[22.050609827041626, 20.224673748016357, 22.78614091873169, 24.931244611740112, 28.143370389938354, 29.912338256835938], [52.8497257232666, 97.263028383255, 115.99773979187012, 132.09366583824158, 143.721209526062, 154.19694328308105], [76.41726112365723, 146.3692603111267, 179.22828483581543, 206.7002022266388, 229.7097511291504, 253.48752355575562], [74.20605802536011, 145.59734225273132, 179.36284589767456, 209.45955324172974, 234.04102563858032, 257.59033036231995]]

## Gradient Boosted Trees

In [31]:
gbt_exec_time = []

In [32]:
for depth in range(2, 11, 4):
    gbt_depth_time = []
    for iters in range(2, 11, 4):
        start = time.time()
    
        # Regressor
        gbt = GBTRegressor()

        # Parameter Grid
        gbt_paramGrid = ParamGridBuilder()\
          .addGrid(gbt.maxDepth, [depth])\
          .addGrid(gbt.maxIter, [iters])\
          .build()

        # Cross-Validator
        gbt_crossval = CrossValidator(estimator=gbt,
                                  estimatorParamMaps=gbt_paramGrid,
                                  evaluator=evaluator,
                                  numFolds=10)

        # Fit
        gbt_model = gbt_crossval.fit(train)
    
        gbt_depth_time.append(time.time() - start)
    gbt_exec_time.append(gbt_depth_time)

In [33]:
print(gbt_exec_time)

[[28.97140598297119, 75.42643928527832, 123.2839126586914], [59.12826347351074, 169.164710521698, 290.4194505214691], [123.34894919395447, 383.85175943374634, 683.8586118221283]]