### Importing Libraries

In [1]:
# If you are running Databricks Runtime version 7.1 or above, uncomment this line and run this cell:
#%pip install mlflow

In [2]:
import pandas as pd
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SparkSession
import time

### Creating a Spark Session

In [3]:
spark = SparkSession.builder.appName('college-scorecard').getOrCreate()

### Reading data into a Spark Dataframe:

In [4]:
start = time.time()
# Read data locally 
df = spark.read.csv("data/Scorecard_small.csv", header="true", inferSchema="true")
#df = spark.read.csv("data/Scorecard.csv", header="true", inferSchema="true")

# Read data from S3 bucket
#df = spark.read.csv("s3://sgupta4/Scorecard_small.csv", header="true", inferSchema="true")
#df = spark.read.csv("s3://sgupta4/Scorecard.csv", header="true", inferSchema="true")
stop = time.time()

print("Time taken to read a spark dataframe:", round((stop-start), 2), "seconds")

Time taken to read a spark dataframe: 3.73 seconds


In [5]:
print(type(df))
print(df.printSchema())
#df.head()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- _c0: integer (nullable = true)
 |-- log_mn_earn_wne_p10: double (nullable = true)
 |-- ADM_RATE_ALL: double (nullable = true)
 |-- COSTT4_A: double (nullable = true)
 |-- UGDS_BLACK: double (nullable = true)
 |-- PPTUG_EF: double (nullable = true)
 |-- UG25abv: double (nullable = true)
 |-- PAR_ED_PCT_1STGEN: double (nullable = true)
 |-- PCTFLOAN8: double (nullable = true)
 |-- C150_4: double (nullable = true)
 |-- TUITIONFEE_OUT: double (nullable = true)
 |-- TUITIONFEE_IN: double (nullable = true)
 |-- UGDS_WHITE: double (nullable = true)
 |-- TUITFTE: double (nullable = true)
 |-- AVGFACSAL: double (nullable = true)
 |-- INC_PCT_LO: double (nullable = true)
 |-- UGDS: double (nullable = true)
 |-- PCTPELL: double (nullable = true)
 |-- DEBT_MDN: double (nullable = true)
 |-- PCTFLOAN19: double (nullable = true)
 |-- INC_PCT_H2: double (nullable = true)

None


### Coverting to a Pandas dataframe to take a look at the data:

In [6]:
pd.DataFrame(df.take(5), columns=df.columns)

Unnamed: 0,_c0,log_mn_earn_wne_p10,ADM_RATE_ALL,COSTT4_A,UGDS_BLACK,PPTUG_EF,UG25abv,PAR_ED_PCT_1STGEN,PCTFLOAN8,C150_4,...,TUITIONFEE_IN,UGDS_WHITE,TUITFTE,AVGFACSAL,INC_PCT_LO,UGDS,PCTPELL,DEBT_MDN,PCTFLOAN19,INC_PCT_H2
0,0,,,,,0.9752,,,,,...,,,,,,44141.0,,,,
1,1,,,,,0.0852,,,,,...,,,,4072.0,,3852.0,,,,
2,2,,,,,0.3455,,,,,...,,,,5708.0,,9889.0,,,,
3,3,,,,,0.1458,,,,,...,,,,4174.0,,295.0,,,,
4,4,,,,,0.8667,,,,,...,,,,3092.0,,60.0,,,,


In [7]:
df.toPandas()

Unnamed: 0,_c0,log_mn_earn_wne_p10,ADM_RATE_ALL,COSTT4_A,UGDS_BLACK,PPTUG_EF,UG25abv,PAR_ED_PCT_1STGEN,PCTFLOAN8,C150_4,...,TUITIONFEE_IN,UGDS_WHITE,TUITFTE,AVGFACSAL,INC_PCT_LO,UGDS,PCTPELL,DEBT_MDN,PCTFLOAN19,INC_PCT_H2
0,0,,,,,0.9752,,,,,...,,,,,,44141.0,,,,
1,1,,,,,0.0852,,,,,...,,,,4072.0,,3852.0,,,,
2,2,,,,,0.3455,,,,,...,,,,5708.0,,9889.0,,,,
3,3,,,,,0.1458,,,,,...,,,,4174.0,,295.0,,,,
4,4,,,,,0.8667,,,,,...,,,,3092.0,,60.0,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
124694,124694,,,,,,,0.446187,,,...,6276.0,,,,,,,6076.0,,
124695,124695,,,,,,,0.446187,,,...,5271.0,,,,,,,6076.0,,
124696,124696,,,,,,,0.446187,,,...,5148.0,,,,,,,6076.0,,
124697,124697,,,,,,,0.446187,,,...,6900.0,,,,,,,6076.0,,


In [8]:
df.dtypes

[('_c0', 'int'),
 ('log_mn_earn_wne_p10', 'double'),
 ('ADM_RATE_ALL', 'double'),
 ('COSTT4_A', 'double'),
 ('UGDS_BLACK', 'double'),
 ('PPTUG_EF', 'double'),
 ('UG25abv', 'double'),
 ('PAR_ED_PCT_1STGEN', 'double'),
 ('PCTFLOAN8', 'double'),
 ('C150_4', 'double'),
 ('TUITIONFEE_OUT', 'double'),
 ('TUITIONFEE_IN', 'double'),
 ('UGDS_WHITE', 'double'),
 ('TUITFTE', 'double'),
 ('AVGFACSAL', 'double'),
 ('INC_PCT_LO', 'double'),
 ('UGDS', 'double'),
 ('PCTPELL', 'double'),
 ('DEBT_MDN', 'double'),
 ('PCTFLOAN19', 'double'),
 ('INC_PCT_H2', 'double')]

#### Dropping c0 column (index column)

In [9]:
df = df.drop("_c0")

In [10]:
df

DataFrame[log_mn_earn_wne_p10: double, ADM_RATE_ALL: double, COSTT4_A: double, UGDS_BLACK: double, PPTUG_EF: double, UG25abv: double, PAR_ED_PCT_1STGEN: double, PCTFLOAN8: double, C150_4: double, TUITIONFEE_OUT: double, TUITIONFEE_IN: double, UGDS_WHITE: double, TUITFTE: double, AVGFACSAL: double, INC_PCT_LO: double, UGDS: double, PCTPELL: double, DEBT_MDN: double, PCTFLOAN19: double, INC_PCT_H2: double]

### Handling missing data:

In [11]:
# Missing Data
from pyspark.sql.functions import isnull, when, count, col

df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+-------------------+------------+--------+----------+--------+-------+-----------------+---------+------+--------------+-------------+----------+-------+---------+----------+-----+-------+--------+----------+----------+
|log_mn_earn_wne_p10|ADM_RATE_ALL|COSTT4_A|UGDS_BLACK|PPTUG_EF|UG25abv|PAR_ED_PCT_1STGEN|PCTFLOAN8|C150_4|TUITIONFEE_OUT|TUITIONFEE_IN|UGDS_WHITE|TUITFTE|AVGFACSAL|INC_PCT_LO| UGDS|PCTPELL|DEBT_MDN|PCTFLOAN19|INC_PCT_H2|
+-------------------+------------+--------+----------+--------+-------+-----------------+---------+------+--------------+-------------+----------+-------+---------+----------+-----+-------+--------+----------+----------+
|             108106|       87314|  104195|     83535|   15318|  72108|            21888|    90255| 89819|         67855|        66675|     83535|  18644|    55047|     61214|14356|  83834|   29830|     90255|     76369|
+-------------------+------------+--------+----------+--------+-------+-----------------+---------+------+----------

In [12]:
from collections import defaultdict

data_types = defaultdict(list)
for entry in df.schema.fields:
    data_types[str(entry.dataType)].append(entry.name)

In [13]:
data_types

defaultdict(list,
            {'DoubleType': ['log_mn_earn_wne_p10',
              'ADM_RATE_ALL',
              'COSTT4_A',
              'UGDS_BLACK',
              'PPTUG_EF',
              'UG25abv',
              'PAR_ED_PCT_1STGEN',
              'PCTFLOAN8',
              'C150_4',
              'TUITIONFEE_OUT',
              'TUITIONFEE_IN',
              'UGDS_WHITE',
              'TUITFTE',
              'AVGFACSAL',
              'INC_PCT_LO',
              'UGDS',
              'PCTPELL',
              'DEBT_MDN',
              'PCTFLOAN19',
              'INC_PCT_H2']})

### Using Imputer library in Spark

In [14]:
start = time.time()

numericals = data_types["DoubleType"]
numericals = [var for var in numericals]
#and numericals_imputed = [var + "_imputed" for var in numericals]

from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols = numericals, outputCols = numericals)
df = imputer.fit(df).transform(df)

stop = time.time()

print('Time taken to Impute Data:', round((stop-start), 2), 'seconds')

Time taken to Impute Data: 1.41 seconds


In [15]:
# Checking for missing data again
from pyspark.sql.functions import isnull, when, count, col

df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+-------------------+------------+--------+----------+--------+-------+-----------------+---------+------+--------------+-------------+----------+-------+---------+----------+----+-------+--------+----------+----------+
|log_mn_earn_wne_p10|ADM_RATE_ALL|COSTT4_A|UGDS_BLACK|PPTUG_EF|UG25abv|PAR_ED_PCT_1STGEN|PCTFLOAN8|C150_4|TUITIONFEE_OUT|TUITIONFEE_IN|UGDS_WHITE|TUITFTE|AVGFACSAL|INC_PCT_LO|UGDS|PCTPELL|DEBT_MDN|PCTFLOAN19|INC_PCT_H2|
+-------------------+------------+--------+----------+--------+-------+-----------------+---------+------+--------------+-------------+----------+-------+---------+----------+----+-------+--------+----------+----------+
|                  0|           0|       0|         0|       0|      0|                0|        0|     0|             0|            0|         0|      0|        0|         0|   0|      0|       0|         0|         0|
+-------------------+------------+--------+----------+--------+-------+-----------------+---------+------+--------------

In [16]:
df.dtypes

[('log_mn_earn_wne_p10', 'double'),
 ('ADM_RATE_ALL', 'double'),
 ('COSTT4_A', 'double'),
 ('UGDS_BLACK', 'double'),
 ('PPTUG_EF', 'double'),
 ('UG25abv', 'double'),
 ('PAR_ED_PCT_1STGEN', 'double'),
 ('PCTFLOAN8', 'double'),
 ('C150_4', 'double'),
 ('TUITIONFEE_OUT', 'double'),
 ('TUITIONFEE_IN', 'double'),
 ('UGDS_WHITE', 'double'),
 ('TUITFTE', 'double'),
 ('AVGFACSAL', 'double'),
 ('INC_PCT_LO', 'double'),
 ('UGDS', 'double'),
 ('PCTPELL', 'double'),
 ('DEBT_MDN', 'double'),
 ('PCTFLOAN19', 'double'),
 ('INC_PCT_H2', 'double')]

In [17]:
features_cols = list(df.columns)
#print(features_cols)

# Remove response variable from feature list
features_cols.remove("log_mn_earn_wne_p10")
print(features_cols)

['ADM_RATE_ALL', 'COSTT4_A', 'UGDS_BLACK', 'PPTUG_EF', 'UG25abv', 'PAR_ED_PCT_1STGEN', 'PCTFLOAN8', 'C150_4', 'TUITIONFEE_OUT', 'TUITIONFEE_IN', 'UGDS_WHITE', 'TUITFTE', 'AVGFACSAL', 'INC_PCT_LO', 'UGDS', 'PCTPELL', 'DEBT_MDN', 'PCTFLOAN19', 'INC_PCT_H2']


In [18]:
from pyspark.ml.feature import VectorAssembler

# Create the vector assembler transformer
vec = VectorAssembler(inputCols=features_cols, outputCol='features')

# Apply the vector transformer to data
df = vec.transform(df)

# Select only the dependent variable and features
ml_ready_df = df.select(['log_mn_earn_wne_p10','features'])
# Inspect Results
ml_ready_df.show(5)

+-------------------+--------------------+
|log_mn_earn_wne_p10|            features|
+-------------------+--------------------+
| 10.486485368440269|[0.69834354955196...|
| 10.486485368440269|[0.69834354955196...|
| 10.486485368440269|[0.69834354955196...|
| 10.486485368440269|[0.69834354955196...|
| 10.486485368440269|[0.69834354955196...|
+-------------------+--------------------+
only showing top 5 rows



In [19]:
ml_ready_df.printSchema

<bound method DataFrame.printSchema of DataFrame[log_mn_earn_wne_p10: double, features: vector]>

In [20]:
ml_ready_df

DataFrame[log_mn_earn_wne_p10: double, features: vector]

In [21]:
ml_ready_df = ml_ready_df.withColumnRenamed("log_mn_earn_wne_p10", "label")

In [22]:
ml_ready_df

DataFrame[label: double, features: vector]

#### Splitting into training and test data

In [23]:
# Split the data into training and test sets (30% held out for testing)

(trainingData, testData) = ml_ready_df.randomSplit([0.7, 0.3])

In [24]:
trainingData.show(2)

+------------------+--------------------+
|             label|            features|
+------------------+--------------------+
|10.486485368440269|[0.0,23002.166699...|
|10.486485368440269|[0.0,23002.166699...|
+------------------+--------------------+
only showing top 2 rows



In [25]:
trainingData.count()

87469

In [26]:
testData.count()

37230

### Random Forest

In [27]:
from pyspark.ml.regression import RandomForestRegressor

# Initialize model with columns to utilize
rf = RandomForestRegressor(featuresCol="features",labelCol="label", seed=42)

#### Training Random Forest Model

In [28]:
# Train model
start = time.time()
rf_model = rf.fit(trainingData)
stop = time.time()
print("Time taken to train using Random Forest:", round((stop-start), 2), 'seconds')

Time taken to train using Random Forest: 4.07 seconds


In [29]:
rf_model

RandomForestRegressionModel: uid=RandomForestRegressor_dcc2ced67a96, numTrees=20, numFeatures=19

#### Making Predictions:

In [30]:
# Make predictions
predictions = rf_model.transform(testData)

In [31]:
predictions

DataFrame[label: double, features: vector, prediction: double]

In [32]:
# Inspect results
predictions.select("Prediction","Label").show(5)

+------------------+------------------+
|        Prediction|             Label|
+------------------+------------------+
|10.479202551321368|10.486485368440269|
|10.478525519098973|10.486485368440269|
| 10.51202413145482|10.486485368440269|
| 10.48658704770523|10.486485368440269|
|10.480287364830394|10.486485368440269|
+------------------+------------------+
only showing top 5 rows



#### Hyperparameter Tuning

In [33]:
# Initialize model with columns to utilize
rf = RandomForestRegressor(featuresCol="features",labelCol="label", seed=42)

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

pipeline = Pipeline(stages=[rf])

In [35]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

# paramGrid = ParamGridBuilder() \
#     .addGrid(rf.numTrees, [int(x) for x in np.linspace(start = 10, stop = 50, num = 3)]) \
#     .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 25, num = 3)]) \
#     .build()

paramGrid = ParamGridBuilder() \
            .addGrid(rf.maxDepth, [2, 5, 10]) \
            .addGrid(rf.maxBins, [5, 10, 20]) \
            .addGrid(rf.numTrees, [5, 20, 50]) \
            .build()

#### Cross Validation:

In [36]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=pipeline, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=RegressionEvaluator(), 
                    numFolds=5)

In [37]:
start = time.time()
cvModel = cv.fit(trainingData)
stop = time.time()

print("Time taken for Cross Validation Search:", round((stop-start), 2), 'seconds')

Time taken for Cross Validation Search: 304.14 seconds


In [38]:
start = time.time()
cvModel = cv.fit(testData)
stop = time.time()

print("Time taken for Cross Validation Search:", round((stop-start), 2), 'seconds')

Time taken for Cross Validation Search: 201.1 seconds


#### Train Validation:

In [39]:
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

In [40]:
start = time.time()
tvs_model = tvs.fit(trainingData)
stop = time.time()

print("Time taken for Train Validation Split:", round((stop-start), 2), 'seconds')

Time taken for Train Validation Split: 66.05 seconds


In [41]:
start = time.time()
tvs_model = tvs.fit(testData)
stop = time.time()

print("Time taken for Train Validation Split:", round((stop-start), 2), 'seconds')

Time taken for Train Validation Split: 45.04 seconds


### Gradient Boost

#### Defining the GB Model:

In [42]:
# GBM Model
from pyspark.ml.regression import GBTRegressor

# Initialize model with columns to utilize
gbt = GBTRegressor(featuresCol="features",labelCol="label",seed=42)

#### Training the GBT Model:

In [43]:
# Train model
start = time.time()
gb_model = gbt.fit(trainingData)
stop = time.time()
print("Time taken to train using a Gradient Boosted Model:", round((stop-start), 2), "seconds")

Time taken to train using a Gradient Boosted Model: 6.97 seconds


#### Making Predictions:

In [44]:
# Make predictions
predictions = gb_model.transform(testData)

In [45]:
# Inspect results
predictions.select("Prediction","Label").show(5)

+------------------+------------------+
|        Prediction|             Label|
+------------------+------------------+
|10.483321677179367|10.486485368440269|
|10.485339774715243|10.486485368440269|
|10.495968124008572|10.486485368440269|
|10.483753714475059|10.486485368440269|
|10.489408559529725|10.486485368440269|
+------------------+------------------+
only showing top 5 rows



#### Hyperparameter Tuning:

In [46]:
pipeline = Pipeline(stages=[gbt])

In [47]:
paramGrid = ParamGridBuilder()\
    .addGrid(gbt.maxDepth, [2, 5])\
    .addGrid(gbt.maxIter, [10, 100])\
    .addGrid(rf.numTrees, [5, 20, 50]) \
    .build()

#### Cross Validation:

In [48]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=pipeline, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=RegressionEvaluator(), 
                    numFolds=5)

In [49]:
start = time.time()
cvModel = cv.fit(trainingData)
stop = time.time()

print("Time taken for Cross Validation Search:", round((stop-start), 2), 'seconds')

Time taken for Cross Validation Search: 753.39 seconds


In [50]:
start = time.time()
cvModel = cv.fit(testData)
stop = time.time()

print("Time taken for Cross Validation Search:", round((stop-start), 2), 'seconds')

Time taken for Cross Validation Search: 713.97 seconds


#### Train Validation:

In [51]:
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

In [52]:
start = time.time()
tvs_model = tvs.fit(trainingData)
stop = time.time()

print("Time taken for Train Validation Split:", round((stop-start), 2), 'seconds')

Time taken for Train Validation Split: 211.63 seconds


In [53]:
start = time.time()
tvs_model = tvs.fit(testData)
stop = time.time()

print("Time taken for Train Validation Split:", round((stop-start), 2), 'seconds')

Time taken for Train Validation Split: 165.95 seconds
