## <b>MSDS697 Spring I 2023 - Final Project</b><br>
<b>Group 15</b> - Project Bears<br>
<b>Members</b> - Sharon Dodda, Ensun Pak

This notebook will pull the training data from MongoDB. There will be some light pre-processing on the data prior to using it for model training. We setup a simple pipeline to convert the county feature into a numerical column, assemble the features into a vector via VectorAssembler. The model is cross-validated on 5 folds to tune the model hyperparameters. The model performance will be measured using the area under ROC.

<b>Analytic Objective</b><br>
To predict if a bear will appear again in the areas where there was bear sighting in the past.

<b>Problem Type</b><br>
This will be a binary classification problem where a logistic regression model will be trained to predict future bear sightings.

<b>Conclusion</b><br>
After cross-validating the training set, the logistic regression model achieved an area under ROC score of 0.6471. Evaluating this model on the test set, it achieved an area under ROC score of 0.6503. While the model performs only moderately better than random, the higher area under ROC score in the test evaluation suggests that the model is able to generalize new data well.<br>

<b>The best model has the following tuned hyperparameters:</b><br>
regParam: 500<br> 
maxIters: 0.5<br>
elasticNet: 0.25

<b>Model training performance summary:</b><br>
Training dataset size: 1,982 observations; 10 features<br>
Pipeline execution time: 52 minutes<br>
Train evaluation execution time: 6 seconds<br>
Test evaluation execution time: 2 seconds<br>

In [None]:
# Import packages
from pyspark.sql.functions import *
import numpy as np
import os

Initialize MongoDB link

In [None]:
# Initialize parameters to connect to MongoDB Atlas
mongo_username = os.environ.get("MONGO_USERNAME")
mongo_password =  os.environ.get("MONGO_PASSWORD")
mongo_ip_address = os.environ.get("MONGO_IP")
database_name = os.environ.get("MONGO_DB_NAME")

connectionString=f"mongodb+srv://{mongo_username}:{mongo_password}@{mongo_ip_address}"

Get training data from MongoDB for pre-processing

In [None]:
# Grab historical weather data from MongoDB
database="msds697_bears"
collection="noaa_weather_historical"

noaa_hist = spark.read.format("mongo")\
                    .option("database", database)\
                    .option("spark.mongodb.input.uri", connectionString)\
                    .option("collection", collection).load()

# Oldest date in training set is June 26, 1988
# Get tmax data
noaa_hist_tmax = noaa_hist.filter("datatype == 'TMAX'")\
                          .select('county', col('date').cast('date'), 'value')\
                          .withColumnRenamed('value', 'tmax')

# Get tmin data
noaa_hist_tmin = noaa_hist.filter("datatype == 'TMIN'")\
                          .select('county', col('date').cast('date'), 'value')\
                          .withColumnRenamed('value', 'tmin')

# Get prcp data
noaa_hist_prcp = noaa_hist.filter("datatype == 'PRCP'")\
                          .select('county', col('date').cast('date'), 'value')\
                          .withColumnRenamed('value', 'prcp')

# Join the data into one df
noaa = noaa_hist_tmax.join(noaa_hist_tmin, ['county', 'date'], 'left').join(noaa_hist_prcp, ['county', 'date'], 'left')

# Process df
noaa = noaa.withColumn('day', dayofmonth('date'))\
           .withColumn('month', month('date'))\
           .withColumn('year', year('date'))

In [None]:
# Grab training set data from MongoDB
database="msds697_bears"
collection="training_data"
bears = spark.read.format("mongo")\
                    .option("database", database)\
                    .option("spark.mongodb.input.uri", connectionString)\
                    .option("collection", collection).load()

# Process df
bears = bears.select('day', 'month', 'year', 'county', 'lat', 'lon', 'code', 'label')
bears = bears.withColumn('county', regexp_replace('county', ' County', ''))
bears = bears.withColumn('date', (concat_ws('-', 'year', 'month', 'day')).cast('date'))

# Exclude records before year 2018
bears = bears.filter("date >= '2018-01-01'")
bears = bears.filter("county != 'missing'")
bears = bears.drop('date')

In [None]:
# Join spark df together
bears = bears.join(noaa, ['county', 'year', 'month', 'day'], 'left')

# Exclude joined records with no weather data
bears = bears.filter(col('date').isNotNull())

# Exclude joined records with no prcp data
bears = bears.filter(col('prcp').isNotNull())

# Exclude joined records with no tmin data
bears = bears.filter(col('tmin').isNotNull())

# Prepare final df for model fitting
bears_data = bears.select('county', 'year', 'month', 'day', 'lat', 'lon', 'code', 'tmax', 'tmin', 'prcp', 'label')

bears_data.schema['prcp'].nullable = False

Implement Machine Learning (Model training)

In [None]:
# Load libraries for machine learning pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer, StringIndexerModel
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Create train / test datasets
bear_data_sets = bears_data.randomSplit([0.8, 0.2])
train = bear_data_sets[0].cache()
test = bear_data_sets[1].cache()

# Prepare stages for pipeline
# Stage 1: Convert county from string to numerical with StringIndexer
stage_1 = StringIndexer(inputCol='county', outputCol='county_index').setHandleInvalid("skip")

# Stage 2: Assemble the features into a feature vector with VectorAssembler
stage_2 = VectorAssembler(inputCols=['county_index', 'year', 'month', 'day', 'lat', 'lon', 'code', 'tmax', 'tmin', 'prcp'],
                         outputCol='features')

# Stage 3: Fit the logistic regression model 
stage_3 = LogisticRegression(featuresCol='features', labelCol='label')

# Construct the pipeline
regression_pl = Pipeline(stages=[stage_1, stage_2, stage_3])

# Define the evaluator
evaluator = BinaryClassificationEvaluator()

# Setup hyperparameters to be tuned
# paramGrid = ParamGridBuilder().addGrid(stage_3.maxIter, [100, 500, 1000])\
#                               .addGrid(stage_3.regParam, [0.0001, 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1])\
#                               .addGrid(stage_3.elasticNetParam, [0, 0.25, 0.5, 0.75, 1] ).build()

# Setup hyperparameters to be tuned
paramGrid = ParamGridBuilder().addGrid(stage_3.maxIter, [500, 1000])\
                              .addGrid(stage_3.regParam, [0.0001, 0.001, 0.01, 0.05, 0.1, 1])\
                              .addGrid(stage_3.elasticNetParam, [0.1, 0.25, 0.5, 0.75, 1]).build()

# Setup cross validator object
cv = CrossValidator(estimator=regression_pl,
                    evaluator=evaluator,
                    numFolds=5,
                    estimatorParamMaps=paramGrid)

# Fit the model
cv_model = cv.fit(train)

Model Evaluation

In [None]:
# Best hyperparameters
cv_model.getEstimatorParamMaps()[np.argmax(cv_model.avgMetrics)]

Out[11]: {Param(parent='LogisticRegression_cb81447afa24', name='maxIter', doc='max number of iterations (>= 0).'): 500,
 Param(parent='LogisticRegression_cb81447afa24', name='regParam', doc='regularization parameter (>= 0).'): 0.05,
 Param(parent='LogisticRegression_cb81447afa24', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.25}

In [None]:
# Get prediction from best model
y_pred = cv_model.bestModel.transform(train)

# Evaluate performance - area under ROC
print(f"Training evaluation: {evaluator.getMetricName()} : {evaluator.evaluate(y_pred):.4f}")

Training evaluation: areaUnderROC : 0.6546


In [None]:
# Evaluate model performance on test set
y_hat = cv_model.bestModel.transform(test)

# Evaluate performance - area under ROC
print(f"Test evaluation: {evaluator.getMetricName()} : {evaluator.evaluate(y_hat):.4f}")

Test evaluation: areaUnderROC : 0.6427


In [None]:
y_pred.select('features', 'rawPrediction', 'probability', 'prediction').show(5)

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[15.0,2018.0,6.0,...|[-1.1498621511422...|[0.24051426260845...|       1.0|
|[15.0,2018.0,6.0,...|[-1.0476403715788...|[0.25967847160900...|       1.0|
|[15.0,2018.0,8.0,...|[-1.0436311803425...|[0.26044996313019...|       1.0|
|[15.0,2018.0,10.0...|[-0.8686038022162...|[0.29554490474468...|       1.0|
|[15.0,2019.0,5.0,...|[-1.0874722129504...|[0.25209457596919...|       1.0|
+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [None]:
y_hat.select('features', 'rawPrediction', 'probability', 'prediction').show(5)

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[2.0,2020.0,11.0,...|[-1.4319759290077...|[0.19279099829339...|       1.0|
|[0.0,2019.0,5.0,4...|[-1.7772936106343...|[0.14463764015408...|       1.0|
|[1.0,2020.0,10.0,...|[-1.4417260379776...|[0.19127820398915...|       1.0|
|[12.0,2021.0,10.0...|[-1.0190047043220...|[0.26522131740086...|       1.0|
|[15.0,2019.0,8.0,...|[-0.9692136684416...|[0.27503726251059...|       1.0|
+--------------------+--------------------+--------------------+----------+
only showing top 5 rows

