# Spark Cluster Setup

Mongo-Spark Connector Configuration to Connect to MongoDB

In [1]:
%%configure -f
{"conf": {"spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"}}

In [2]:
from datetime import datetime
import numpy as np

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, when, count, col, lag, avg, lit
from pyspark.sql.types import *
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.storagelevel import StorageLevel

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9,application_1550374948226_0700,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


<br>
<br>
<br>

Start timing for notebook execution

In [3]:
%%local
import time
start = time.time()

<br>
<br>
<br>

# Fetch Data from MongoDB

## Bikeshare Data

In [4]:
mongodb_path = "mongodb://172.31.38.7/msds697.bikeshare"

spark_bike = SparkSession \
    .builder \
    .appName("group25") \
    .config("spark.mongodb.input.uri", mongodb_path)\
    .config("spark.driver.memory", "24g")\
    .config("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "python36")\
    .config("spark.executorEnv.PYSPARK_PYTHON", "python36")\
    .getOrCreate()

bikeshare = spark_bike.read.format("com.mongodb.spark.sql.DefaultSource").load().drop('_id')  # Load Data
bikeshare = bikeshare.select('starttime').filter("starttime != 'starttime'").cache()  # Keep only starttime column and filter out erroneous header line

bikeshare.printSchema()
# bikeshare.count()  # 47758652

root
 |-- starttime: string (nullable = true)

## AQI Data

In [5]:
mongodb_path = "mongodb://172.31.38.7/aqi.aqi"

spark_aqi = SparkSession \
    .builder \
    .appName("group25") \
    .config("spark.mongodb.input.uri", mongodb_path)\
    .config("spark.driver.memory", "24g")\
    .config("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "python36")\
    .config("spark.executorEnv.PYSPARK_PYTHON", "python36")\
    .getOrCreate()

aqi = spark_aqi.read.format("com.mongodb.spark.sql.DefaultSource").load().drop('_id').cache()
aqi = aqi.select(['aqi', 'siteID', 'yyyy/mm/dd'])  # Keep only starttime column

aqi.printSchema()
aqi_total_count = aqi.count()
print(aqi_total_count)

root
 |-- aqi: integer (nullable = true)
 |-- siteID: integer (nullable = true)
 |-- yyyy/mm/dd: string (nullable = true)

12859

<br>
<br>
<br>

# Calculations

## Daily Usage

In [6]:
def formatDateRide(mydate):
    """
    Re-formats date to match bike date 
    """
    mydate = mydate.split(' ')[0]
    try:
        objDate = datetime.strptime(mydate, '%Y-%m-%d')
        return datetime.strftime(objDate,'%-m/%-d/%Y')
    except ValueError:
        return mydate

In [7]:
# Extract only date from timestamp
date_udf = udf(lambda date: formatDateRide(date))
daily_ridership = bikeshare.select("starttime").withColumn("date", date_udf("starttime")).groupBy("date").count()
daily_ridership.printSchema()

root
 |-- date: string (nullable = true)
 |-- count: long (nullable = false)

## Daily AQI

In [8]:
def formatDate(mydate):
    """
    Re-formats date to match bike date 
    """
    objDate = datetime.strptime(mydate, '%Y/%m/%d')
    return datetime.strftime(objDate,'%-m/%-d/%Y')

In [9]:
date_udf = udf(lambda date: formatDate(date))

# Changing format of date column
aqi = aqi.withColumn("date", date_udf("yyyy/mm/dd")).drop("siteID", "yyyy/mm/dd")

# Maintaining only the two stations near NYC
NYC_sites = aqi.filter("SiteID == 360610135 or SiteID == 340171002")

# Group by Date
daily_aqi = NYC_sites.groupBy("date").max("AQI")

daily_aqi.printSchema()

root
 |-- date: string (nullable = true)
 |-- max(AQI): integer (nullable = true)

<br>
<br>
<br>

# Joining AQI & Bike Data 

In [10]:
bike_aqi_joined = daily_ridership.join(daily_aqi, 'date','inner')
bike_aqi_joined.printSchema()

root
 |-- date: string (nullable = true)
 |-- count: long (nullable = false)
 |-- max(AQI): integer (nullable = true)

<br>
<br>
<br>

# Feature Engineering

## Previous Day's AQI

In [11]:
# Adding AQI lagged at 1 day
bike_aqi_joined = bike_aqi_joined.select('date', 'count', 'max(AQI)',\
                       lag('max(AQI)', 1).over(Window.orderBy('date'))\
                      .alias('pre_AQI'))

bike_aqi_joined.printSchema()

root
 |-- date: string (nullable = true)
 |-- count: long (nullable = false)
 |-- max(AQI): integer (nullable = true)
 |-- pre_AQI: integer (nullable = true)

## Season Indicator Variable

In [12]:
def getMonth(inval):
    try:
        return int(datetime.strptime(inval, "%m/%d/%Y").month)
    except ValueError:
        return None

def getYear(inval):
    try:
        return int(datetime.strptime(inval, "%m/%d/%Y").year)
    except ValueError:
        return None
def getSeason(month):
    season_dict = {'winter' : [1, 2, 12],
                   'summer' : [6, 7, 8],
                   'spring' : [3, 4, 5],
                   'autumn' : [9, 10, 11]}
    for season in season_dict:
        if month in season_dict.get(season):
            return season 

In [13]:
year_udf = udf(lambda date: getYear(date))
month_udf = udf(lambda date: getMonth(date))
season_udf = udf(lambda month: getSeason(int(month)))

bike_aqi_joined = bike_aqi_joined.withColumn("month", month_udf(bike_aqi_joined.date))\
                .withColumn("year", year_udf(bike_aqi_joined.date))
bike_aqi_joined = bike_aqi_joined.withColumn("season", season_udf(bike_aqi_joined.month))

<br>
<br>
<br>

# Prepare Data for Machine Learning Pipeline

## Filter Out All Nulls

In [14]:
bike_ml = bike_aqi_joined.filter(bike_aqi_joined.pre_AQI.isNotNull()).drop('date', 'month', 'year')
bike_ml.printSchema()

root
 |-- count: long (nullable = false)
 |-- max(AQI): integer (nullable = true)
 |-- pre_AQI: integer (nullable = true)
 |-- season: string (nullable = true)

## Convert Categorical Values to One-Hot Encoding

In [15]:
# Convert to Numeric type first
# First time usually doesn't work while executing the fetching of the data from MongoDB
# Second time's a charm. Execution is fast since the data is now actually loaded
for i in range(2):
    si = StringIndexer(inputCol="season", outputCol="season-num")
    sm = si.fit(bike_ml)
    bike_num = sm.transform(bike_ml).drop("season")
    bike_num = bike_num.withColumnRenamed("season-num", "season")

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 33672)
Traceback (most recent call last):
  File "/usr/lib64/python3.6/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.6/socketserver.py", line 696, in __init__
    self.handle()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 266, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 254, in authenticate_and_accum_updates
    received_to

In [16]:
# Then One-Hot encode numerics
onehotenc = OneHotEncoder(inputCol="season", outputCol="season-onehot", dropLast=True)
bike_ohe = onehotenc.transform(bike_num).drop("season")
bike_ohe = bike_ohe.withColumnRenamed("season-onehot", "season")

In [17]:
bike_ohe.printSchema()

root
 |-- count: long (nullable = false)
 |-- max(AQI): integer (nullable = true)
 |-- pre_AQI: integer (nullable = true)
 |-- season: vector (nullable = true)

### Create Feature Vector and Label Column

In [18]:
input_cols = ["count", "pre_AQI", "season"]
va = VectorAssembler(outputCol="features", inputCols=input_cols)
bike_labeled = va.transform(bike_ohe).select("features", "max(AQI)").withColumnRenamed("max(AQI)", "label")

bike_labeled.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(5,[0,1],[16009.0...|   40|
|(5,[0,1],[5500.0,...|   28|
|(5,[0,1],[14275.0...|   26|
|(5,[0,1],[23232.0...|   46|
|(5,[0,1],[28252.0...|   84|
+--------------------+-----+
only showing top 5 rows

In [19]:
# Persist on Memory for Faster Access
bike_labeled.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[features: vector, label: int]

<br>
<br>
<br>

# Train-Test Split 

80/20 split

In [20]:
train, test = bike_labeled.randomSplit([0.8, 0.2])

<br>
<br>
<br>

# Modeling
## Baseline

Baseline modeling would be to predict the AQI for the day to be to be the mean AQI

In [21]:
avg_aqi = bike_ml.select('max(AQI)').agg({"max(AQI)": "avg"}).collect()[0][0]  # best constant prediction
print(f"Average AQI Over Entire Dataset: {avg_aqi}")  # best constant

Average AQI Over Entire Dataset: 27.19016083254494

In [22]:
baseline = test
baseline = baseline.withColumn('prediction', lit(avg_aqi))

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
print(f'Baseline RMSE = {round(evaluator.evaluate(baseline),4)}')

Baseline RMSE = 12.8273

## Linear Regression

In [23]:
%%notify -m "Linear Regression Complete"
# Build Pipeline
scaler = StandardScaler(inputCol='features', outputCol='scaled_feats', withStd=True, withMean=True)  # Feature Scaling
lr = LinearRegression(featuresCol='scaled_feats')  # Linear Regressor
pipeline = Pipeline(stages=[scaler, lr])  # Pipeline

# Parameter Grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, np.arange(0, 1, 0.1)) \
    .build()

# Cross-Validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=10)

# Fit
lr_model = crossval.fit(train)

In [24]:
# Predict
pred = lr_model.transform(test)

# Performance Evaluation
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
print(f'RMSE = {round(evaluator.evaluate(pred),4)}')

# Coefficients
print("Feature Coefficients:")
features = ['count', 'pre_AQI', 'season1', 'season2', 'season3']
feat_coef = sorted(zip(features, lr_model.bestModel.stages[-1].coefficients), key=lambda x: x[1], reverse=True)

for feat in feat_coef:
    print(f"\t{feat[0]}: {feat[1]}")

RMSE = 10.835
Feature Coefficients:
	count: 3.007209355426316
	pre_AQI: -1.365996915619375
	season2: -2.13451769466443
	season1: -6.778874412317296
	season3: -8.082021945576125

## Random Forest 

In [25]:
%%notify -m "Random Forest Complete"
# Regressor
rfr = RandomForestRegressor()

# Parameter Grid
rf_paramGrid = ParamGridBuilder()\
  .addGrid(rfr.maxDepth, list(range(2, 30, 7)))\
  .addGrid(rfr.numTrees, list(range(2, 50, 8)))\
  .build()

# Cross-Validator
rf_crossval = CrossValidator(estimator=rfr,
                          estimatorParamMaps=rf_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=10)

# Fit
rfr_model = rf_crossval.fit(train)

In [26]:
# Predict
pred = rfr_model.transform(test)

# Performance Evaluation
print(f'RMSE = {round(evaluator.evaluate(pred),4)}')

# Feature Importance
print("Feature Importance:")
feat_impor = sorted([(feat, rfr_model.bestModel.featureImportances[i]) for i,feat in enumerate(features)],
                    key=lambda x: x[1], reverse=True)
for feat in feat_impor:
    print(f"\t{feat[0]}: {feat[1]}")

# Decision Tree if desired but too big to print
# print(rfr_model._call_java('toDebugString'))

RMSE = 10.8328
Feature Importance:
	count: 0.3135135884009873
	pre_AQI: 0.2503746425023543
	season3: 0.22891882027138757
	season1: 0.11718529612757503
	season2: 0.09000765269769577

## Gradient Boosted Trees

In [27]:
%%notify -m "Gradient Boosted Trees Complete"
# Regressor
gbt = GBTRegressor()

# Parameter Grid
gbt_paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, list(range(2, 30, 7)))\
  .addGrid(gbt.maxIter, list(range(2, 50, 8)))\
  .build()

# Cross-Validator
gbt_crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=rf_paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=10)

# Fit
gbt_model = gbt_crossval.fit(train)

In [31]:
# Predict
pred = gbt_model.transform(test)

# Performance Evalution
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
print(f'RMSE = {round(evaluator.evaluate(pred),4)}')

# Feature Importance
print("Feature Importance:")
feat_impor = sorted([(feat, gbt_model.bestModel.featureImportances[i]) for i,feat in enumerate(features)],
                    key=lambda x: x[1], reverse=True)
for feat in feat_impor:
    print(f"\t{feat[0]}: {feat[1]}")

# Decision Tree if desired but too big to print
# print(gb_model._call_java('toDebugString'))

RMSE = 11.3094
Feature Importance:
	pre_AQI: 0.4562231143941505
	count: 0.4144718236293796
	season3: 0.057088220860547954
	season2: 0.03900636650278398
	season1: 0.03321047461313793

<br>
<br>
<br>

Total Execution Time

In [37]:
%%local
print(f"Total Execution Time: {(time.time() - start) / 60} minutes")

Total Execution Time: 50.25802971124649 minutes


<br>
<br>
<br>

Save Final Dataframe to S3

In [None]:
# bike_aqi_joined.write.csv('s3://msds697-phil/BikeshareDF', 'overwrite')