# Predicting CAR Commute Time 

## Import All Packages Used

In [1]:
#Import Packages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rc
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

from pyspark.sql import functions as fn, Row
from pyspark.sql.functions import when, lit
from pyspark import sql
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.ml import feature, regression, evaluation, Pipeline
from pyspark.ml.feature import VectorAssembler,StringIndexer,OneHotEncoder, StandardScaler,QuantileDiscretizer
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder

# Data Processing

In [2]:

#Load data
trip_df = pd.read_csv("/dbfs/FileStore/tables/trippub.csv")
hhpub_df = pd.read_csv("/dbfs/FileStore/tables/hhpub.csv")
per_df = pd.read_csv("/dbfs/FileStore/tables/perpub.csv")
veh_df= pd.read_csv("/dbfs/FileStore/tables/vehpub.csv")

In [3]:
trip_df.loc[trip_df.WTTRDFIN > 0]
hhpub_df.loc[hhpub_df.WTHHFIN > 0]
per_df.loc[per_df.WTPERFIN > 0]
veh_df.loc[veh_df.WTHHFIN >0]

Unnamed: 0,HOUSEID,VEHID,VEHYEAR,VEHAGE,MAKE,MODEL,FUELTYPE,VEHTYPE,WHOMAIN,OD_READ,HFUEL,VEHOWNED,VEHOWNMO,ANNMILES,HYBRID,PERSONID,TRAVDAY,HOMEOWN,HHSIZE,HHVEHCNT,HHFAMINC,DRVRCNT,HHSTATE,HHSTFIPS,NUMADLT,WRKCOUNT,TDAYDATE,LIF_CYC,MSACAT,MSASIZE,RAIL,URBAN,URBANSIZE,URBRUR,CENSUS_D,CENSUS_R,CDIVMSAR,HH_RACE,HH_HISP,HH_CBSA,SMPLSRCE,WTHHFIN,BESTMILE,BEST_FLG,BEST_EDT,BEST_OUT,HBHUR,HTHTNRNT,HTPPOPDN,HTRESDN,HTEEMPDN,HBHTNRNT,HBPPOPDN,HBRESDN
0,30000007,1,2007,10,49,49032,1,1,3,69000,-1,1,-1,1000,2,3,2,1,3,5,7,3,NC,37,3,1,201608,10,3,1,2,1,1,1,5,3,53,2,2,XXXXX,2,187.314320,14611.926637,1,-1,4,T,50,1500,750,750,20,750,300
1,30000007,2,2004,13,49,49442,1,2,-8,164000,-1,1,-1,2500,2,-8,2,1,3,5,7,3,NC,37,3,1,201608,10,3,1,2,1,1,1,5,3,53,2,2,XXXXX,2,187.314320,4767.090946,3,-1,-1,T,50,1500,750,750,20,750,300
2,30000007,3,1998,19,19,19014,1,1,1,120000,-1,1,-1,900,2,1,2,1,3,5,7,3,NC,37,3,1,201608,10,3,1,2,1,1,1,5,3,53,2,2,XXXXX,2,187.314320,8000.324552,1,-1,-1,T,50,1500,750,750,20,750,300
3,30000007,4,1997,20,19,19021,1,1,2,-88,-1,1,-1,500,2,2,2,1,3,5,7,3,NC,37,3,1,201608,10,3,1,2,1,1,1,5,3,53,2,2,XXXXX,2,187.314320,725.932347,2,-1,-1,T,50,1500,750,750,20,750,300
4,30000007,5,1993,24,20,20481,1,4,2,300000,-1,1,-1,10000,2,2,2,1,3,5,7,3,NC,37,3,1,201608,10,3,1,2,1,1,1,5,3,53,2,2,XXXXX,2,187.314320,12437.658757,1,-1,-1,T,50,1500,750,750,20,750,300
5,30000008,1,2014,3,20,20028,1,1,97,-88,-1,1,-1,8000,2,97,5,1,2,4,8,2,WI,55,2,2,201608,2,2,5,2,4,6,2,3,2,32,1,2,33460,2,69.513032,6099.620939,3,-1,-1,R,5,300,300,150,5,300,300
6,30000008,2,2010,7,20,20024,1,3,2,-88,-1,1,-1,10000,2,2,5,1,2,4,8,2,WI,55,2,2,201608,2,2,5,2,4,6,2,3,2,32,1,2,33460,2,69.513032,12865.611703,2,-1,-1,R,5,300,300,150,5,300,300
7,30000008,3,2008,9,20,20039,1,1,97,-88,-1,1,-1,10000,2,97,5,1,2,4,8,2,WI,55,2,2,201608,2,2,5,2,4,6,2,3,2,32,1,2,33460,2,69.513032,6118.535423,3,-1,-1,R,5,300,300,150,5,300,300
8,30000008,4,2004,13,20,20481,1,4,1,128000,-1,1,-1,10000,2,1,5,1,2,4,8,2,WI,55,2,2,201608,2,2,5,2,4,6,2,3,2,32,1,2,33460,2,69.513032,9777.895446,1,-1,-1,R,5,300,300,150,5,300,300
9,30000012,1,2011,6,58,58037,1,1,1,70115,-1,1,-1,11000,2,1,5,1,1,2,10,1,NY,36,1,1,201607,1,3,3,2,1,3,1,2,1,23,1,2,XXXXX,2,79.419586,4616.917047,1,-1,-1,C,80,17000,17000,5000,60,17000,7000


In [4]:
trip_df1 = trip_df[
    np.abs(trip_df.WTTRDFIN - trip_df.WTTRDFIN.mean()) <= (3*trip_df.WTTRDFIN.std())
]
hhpub_df1 = hhpub_df[
    np.abs(hhpub_df.WTHHFIN - hhpub_df.WTHHFIN.mean()) <= (3*hhpub_df.WTHHFIN.std())
]
per_df1 = per_df[
    np.abs(per_df.WTPERFIN - per_df.WTPERFIN.mean()) <= (3*per_df.WTPERFIN.std())
]
veh_df1 = veh_df[
    np.abs(veh_df.WTHHFIN - veh_df.WTHHFIN.mean()) <= (3*veh_df.WTHHFIN.std())
]

In [5]:
print("hhpub_df before cleaning: ",hhpub_df.shape)
print("hhpub_df1 after cleaning: ",hhpub_df1.shape)
print("per_df before cleaning: ",per_df.shape)
print("per_df1 after cleaning: ",per_df1.shape)
print("trip_df before cleaning: ",trip_df.shape)
print("trip_df1 after cleaning: ",trip_df1.shape)
print("veh_df before cleaning: ",veh_df.shape)
print("veh_df1 after cleaning: ",veh_df1.shape)

In [6]:
#Way to Import data into spark dataframe on databricks:
#trip_df  = spark.read.format("csv").option("header", "true").load("/FileStore/tables/trippub.csv")
#hhpub_df = spark.read.format("csv").option("header", "true").load("/FileStore/tables/hhpub.csv")
#per_df   = spark.read.format("csv").option("header", "true").load("/FileStore/tables/perpub.csv")
#veh_df   = spark.read.format("csv").option("header", "true").load("/FileStore/tables/vehpub.csv")

In [7]:
#Loading Data into Spark Data Frame
hhpub = spark.createDataFrame(hhpub_df1)
vehpub = spark.createDataFrame(veh_df1)
trippub = spark.createDataFrame(trip_df1)
perpub = spark.createDataFrame(per_df1)

In [8]:
#Function to remove columns containing invalid/NA values:
def rmv_nacols(df):
  dropcol = ['1']
  id = ['HOUSEID', 'VEHID', 'PERSONID']
  #dfcol = redmergeddf.columns
  total = df.count()
  for i in df.columns:
    if (i == id[0]) or (i == id[1]) or (i == id[2]):
      #print(i)
      continue
    else:
      #print(i)
      a = df.where(fn.col(i).isin({-1, -9,-7,-77,-8,-88})).count()
      naval = (a/total)*100
      print('Percentage of NA values is '+ str(naval) + 'in column ' + i)
      if naval > 50.00:
        dropcol.append(i)
  print(dropcol)
  return dropcol




In [9]:
hhnacols = rmv_nacols(hhpub)
print(type(hhnacols))
print(len(hhnacols))
print(hhnacols)

In [10]:
#Household DataFrame after removing Columns with invalid/NA values

hhpub1 = reduce(DataFrame.drop, hhnacols, hhpub)

print('Before removing columns with NA values: ', len(hhpub.columns))
print('After removing columns with NA values: ', len(hhpub1.columns))

In [11]:
vehnacols = rmv_nacols(vehpub)
print(type(vehnacols))
print(len(vehnacols))
print(vehnacols)

In [12]:
#Vehicle DataFrame after removing Columns with invalid/NA values

vehpub1 = reduce(DataFrame.drop, vehnacols, vehpub)

print('Before removing columns with NA values: ', len(vehpub.columns))
print('After removing columns with NA values: ', len(vehpub1.columns))

In [13]:
#This is not requried to run if we are running the next block as results of this block is hardcoded in next block
pernacols = rmv_nacols(perpub)
print(type(pernacols))
print(len(pernacols))
print(pernacols)

In [14]:
#No Need to Run this if previous block is executed

#pernacols = ['1', 'PAYPROF', 'GT1JBLWK', 'WRK_HOME', 'WKFTPT', 'WRKTRANS', 'LSTTRDAY17', 'OCCAT', 'SCHTYP', 'BIKE4EX', 'BIKESHARE', 'MCUSED', 'CARRODE', 'TIMETOWK', 'NOCONG', 'PUBTIME', 'WRKTIME', 'WKRMHM', 'FLEXTIME', 'WKFMHMXX', 'SCHTRN1', 'SCHTRN2', 'MEDCOND6', 'VPACT', 'YRTOUS', 'SAMEPLC', 'W_NONE', 'W_CANE', 'W_WLKR', 'W_WHCANE', 'W_DOG', 'W_CRUTCH', 'W_SCOOTR', 'W_CHAIR', 'W_MTRCHR', 'GCDWORK', 'WKSTFIPS', 'DISTTOWK17', 'DISTTOSC17', 'WALK_DEF', 'WALK_GKQ', 'BIKE_DFR', 'BIKE_GKP', 'CONDTRAV', 'CONDRIDE', 'CONDNIGH', 'CONDRIVE', 'CONDPUB', 'CONDSPEC', 'CONDTAX']


In [15]:
#Person DataFrame after removing Columns with invalid/NA values

perpub1 = reduce(DataFrame.drop, pernacols, perpub)

print('Before removing columns with NA values: ', len(perpub.columns))
print('After removing columns with NA values: ', len(perpub1.columns))

In [16]:
tripnacols = rmv_nacols(trippub)
#print(type(tripnacols))
print(len(tripnacols))
print(tripnacols)

In [17]:
#Results of previous block are hardcoded here to save time everytime we execute this project

tripnacols = ['TRWAITTM', 'NUMTRANS', 'TRACCTM', 'DROP_PRK', 'TREGRTM', 'ONTD_P3', 'ONTD_P4', 'ONTD_P5', 'ONTD_P6', 'ONTD_P7', 'ONTD_P8', 'ONTD_P9', 'ONTD_P10',
              'ONTD_P11','ONTD_P12', 'ONTD_P13', 'TRACC_WLK', 'TRACC_POV', 'TRACC_BUS', 'TRACC_CRL', 'TRACC_SUB', 'TRACC_OTH', 'TREGR_WLK','TREGR_POV','TREGR_BUS',
              'TREGR_CRL','TREGR_SUB', 'TREGR_OTH']
print(len(tripnacols))

In [18]:
#Trip DataFrame after removing Columns with invalid/NA values

trippub1 = reduce(DataFrame.drop, tripnacols, trippub)
print('Before removing columns with NA values: ', len(trippub.columns))
print('After removing columns with NA values: ', len(trippub1.columns))

In [19]:
# Function to Drop Common Columns in DF2

def drop_comcols(df1,df2):
    a = df1.columns
    b = df2.columns
    res = df2
    #temp = ['1']
    for j in b:
        if (j == 'HOUSEID') or (j == 'PERSONID') or (j == 'VEHID'):
            #print('pass: '+ j)
            continue
        else:
            #print('ref: '+ j)
            for i in a:
                if (i == j):
                    if (i != 'HOUSEID') or (i != 'PERSONID') or (i != 'VEHID'):
                        #print(j)
                        res = res.drop(j)
                        #temp.append(j)
    #print(len(temp))
    #return temp
    #print(len(res.columns))
    #print(len(df2.columns))
    return res

In [20]:
#Drop common Columns in Person DF in Ref: Household

#Deleting the common columns (except IDs) between household dataset and person dataset
perpub_df1 = drop_comcols(hhpub1, perpub1)

#print(len(perpub_df1.columns))
#print(perpub_df1.columns)

print("Person DF before dropping common columns: ",len(perpub1.columns))
print("Person DF after dropping common columns: ",len(perpub_df1.columns))

#testing the function drop_comcols
#temp = drop_comcols(hhpub, perpub)
#print(temp)
#print(len(temp))

In [21]:
# Merging household and Person Dataset 
hhper_df = perpub_df1.join(hhpub1, on = ['HOUSEID'])

In [22]:
print(hhpub1.count(),len(hhpub1.columns))
print(perpub_df1.count(),len(perpub_df1.columns))
print(hhper_df.count(),len(hhper_df.columns))

In [23]:
## Verification of Joins ##


# logic required to check why the resultant dataframe hhper_df has 1036 rows less than person dataframe
#Find HouseIDs that are not in merged dataset: hhper_df
print(hhpub1.select(fn.countDistinct('HOUSEID')).collect())
print(perpub_df1.select(fn.countDistinct('HOUSEID')).collect())
print(hhper_df.select(fn.countDistinct('HOUSEID')).collect())

#Difference between 2 lists
def Diff(li1, li2): 
    res = list(set(li1) - set(li2))
    return res

a = hhpub1.select("HOUSEID").collect()
b = perpub_df1.select("HOUSEID").collect()
c = hhper_df.select("HOUSEID").collect()

res = Diff(a, b)
print('Number of HouseIDs NOT common between household ds and person ds is: ' + str(len(res)))

res1 = Diff(a,c)
print('Number of HouseIDs NOT common between household ds and mergerd ds is: ' + str(len(res1)))

res2 = Diff(b,c)
print('Number of HouseIDs NOT common between person ds and merged ds is: ' + str(len(res2)))

In [24]:
#Deleting the common columns (except IDs) between vehicle dataset and merged dataset
vehpub_df1 = drop_comcols(hhper_df, vehpub1)

print("Vehicle DF before dropping columns: ",len(vehpub1.columns))
print("Vehicle DF after dropping columns: ",len(vehpub_df1.columns))

In [25]:
#Merging household and Vehicle Dataset
hhperveh_df = hhper_df.join(vehpub_df1, on = ['HOUSEID','PERSONID'])

In [26]:
print(vehpub_df1.count(),len(vehpub_df1.columns))
print(hhper_df.count(),len(hhper_df.columns))
print(hhperveh_df.count(),len(hhperveh_df.columns))

In [27]:
trippub_df1 = drop_comcols(hhperveh_df, trippub1)

print("Trip DF before dropping columns: ",len(trippub1.columns))
print("Trip DF after dropping columns: ",len(trippub_df1.columns))

In [28]:
#Merging household and Vehicle Dataset
hhpervehtrip_df = hhperveh_df.join(trippub_df1, on = ['HOUSEID','PERSONID', 'VEHID'])

In [29]:
print(trippub_df1.count(),len(trippub_df1.columns))
print(hhperveh_df.count(),len(hhperveh_df.columns))
print(hhpervehtrip_df.count(),len(hhpervehtrip_df.columns))

In [32]:
drop_columns = ['HOUSEID', 'PERSONID', 'VEHID', 'TDCASEID', 'WTHHFIN', 'WTTRDFIN', 'WTPERFIN']
#hhpervehtrip_df.drop(i for i in c)

#from functools import reduce
#from pyspark.sql import DataFrame
redmergeddf = reduce(DataFrame.drop, drop_columns, hhpervehtrip_df)

In [33]:
print(len(redmergeddf.columns))
print(redmergeddf.columns)


In [34]:
intuitrmved = ["EDUC"
,"R_RELAT"
,"PAYPROF"
,"GT1JBLWK"
,"WRK_HOME"
,"WKFTPT"
,"WALK4EX"
,"CARRODE"
,"WRKTIME"
,"WKRMHM"
,"FLEXTIME"
,"DELIVER"
,"HEALTH"
,"VPACT"
,"LPACT"
,"BORNINUS"
,"YEARMILE"
,"PROXY"
,"WHOPROXY"
,"USEPUBTR"
,"WORKER"
,"DIARY"
,"OUTCNTRY"
,"FRSTHM17"
,"CNTTDTR"
,"GCDWORK"
,"WKSTFIPS"
,"DRIVER"
,"OUTOFTWN"
,"R_AGE_IMP"
,"R_SEX_IMP"
,"SAMPSTRAT"
,"HOMEOWN"
,"HHSIZE"
,"HHVEHCNT"
,"HHFAMINC"
,"PC"
,"SPHONE"
,"TAB"
,"PARA"
,"PRICE"
,"HHRELATD"
,"DRVRCNT"
,"CNTTDHH"
,"HHSTATE"
,"HHSTFIPS"
,"NUMADLT"
,"YOUNGCHILD"
,"WRKCOUNT"
,"HHRESP"
,"LIF_CYC"
,"MSACAT"
,"MSASIZE"
,"RAIL"
,"URBAN"
,"URBANSIZE"
,"SCRESP"
,"CENSUS_D"
,"CENSUS_R"
,"HH_RACE"
,"HH_HISP"
,"RESP_CNT"
,"WEBUSE17"
,"SMPLSRCE"
,"HBHUR"
,"HTHTNRNT"
,"HTPPOPDN"
,"HTRESDN"
,"HTEEMPDN"
,"HBHTNRNT"
,"HBPPOPDN"
,"HBRESDN"
,"VEHYEAR"
,"MAKE"
,"MODEL"
,"FUELTYPE"
,"VEHTYPE"
,"WHOMAIN"
,"OD_READ"
,"VEHOWNED"
,"ANNMILES"
,"HYBRID"
,"BEST_FLG"
,"TDTRPNUM"
,"TRPACCMP"
,"TRPHHACC"
,"WHODROVE"
,"WHYFROM"
,"LOOP_TRIP"
,"TRPHHVEH"
,"HHMEMDRV"
,"HH_ONTD"
,"NONHHCNT"
,"PSGR_FLG"
,"PUBTRANS"
,"TRIPPURP"
,"DWELTIME"
,"TDWKND"
,"VMT_MILE"
,"ONTD_P1"
,"ONTD_P2"
,"ONTD_P3"
,"WHYTO"
,"WHYTRP90"
,"HH_CBSA"
,"ALT_16"
,"ALT_23"
,"ALT_45"
,"NWALKTRP"
,"NBIKETRP"
,"WALK2SAVE"
,"BIKE2SAVE"
,"PRMACT"
,"NUMONTRP"
,"ENDTIME"]


In [35]:
print(len(intuitrmved))
print(len(redmergeddf.columns ))


In [36]:
final_df = reduce(DataFrame.drop, intuitrmved, redmergeddf)

In [37]:
print(len(final_df.columns))
final_df.printSchema()

In [38]:
#Select only rows with trips of travel where CAR, SUV, Van, Pickup Truck
#Select only rows where travel mins is known
#Consider rows where respondent is the one who was on trip
cardf = final_df.where((fn.col('TRPTRANS').isin({3,4,5,6})) & (fn.col('TRVLCMIN') > 0) & (fn.col('DRVR_FLG') == 1) & (fn.col('TRPMILES') < 30) & (fn.col('TRPMILES') >= 0) & (fn.col('TRVLCMIN') < 50))
print(cardf.count(), len(cardf.columns))


In [39]:
b = ['TRPTRANS','DRVR_FLG']
cardf = reduce(DataFrame.drop, b, cardf)
print(cardf.count(), len(cardf.columns))
cardf.printSchema()

In [40]:
#Code to replace negative values in categorical variables

a = cardf.columns
b = ['TDAYDATE', 'URBRUR', 'CDIVMSAR', 'STRTTIME', 'TRPTRANS', 'WHYTRP1S', 'GASPRICE']
#print(len(a))
c = list(set(a) - set(b))
#print(len(c))

for i in c:
  cardf = cardf.withColumn(i, when(fn.col(i) < 0, lit(-1)).otherwise(fn.col(i)))

In [41]:
#display(cardf)
cardf.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").mode('overwrite').save("dbfs:/FileStore/tables/finalproject.csv")

# Feature Selection

In [43]:
# File location and type
file_location = "/FileStore/tables/finalproject.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
cardf = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

In [44]:
training_df, validation_df, testing_df = cardf.randomSplit([0.6, 0.3, 0.1], )

[training_df.count(), validation_df.count(), testing_df.count()]

In [45]:
pipe_lr = Pipeline(stages=[StringIndexer(inputCol='R_HISP',outputCol='R_HISP_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='R_SEX',outputCol='R_SEX_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='R_RACE',outputCol='R_RACE_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='MEDCOND',outputCol='MEDCOND_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='PHYACT',outputCol='PHYACT_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='TRAVDAY',outputCol='TRAVDAY_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='WALK',outputCol='WALK_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='BIKE',outputCol='BIKE_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='CAR',outputCol='CAR_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='TAXI',outputCol='TAXI_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='BUS',outputCol='BUS_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='TRAIN',outputCol='TRAIN_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='PLACE',outputCol='PLACE_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='PTRANS',outputCol='PTRANS_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='URBRUR',outputCol='URBRUR_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='CDIVMSAR',outputCol='CDIVMSAR_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='WHYTRP1S',outputCol='WHYTRP1S_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           OneHotEncoder(inputCol='R_HISP_str',outputCol='R_HISP_ohe',dropLast=False),
                           OneHotEncoder(inputCol='R_SEX_str',outputCol='R_SEX_ohe',dropLast=False),
                           OneHotEncoder(inputCol='R_RACE_str',outputCol='R_RACE_ohe',dropLast=False),
                           OneHotEncoder(inputCol='MEDCOND_str',outputCol='MEDCOND_ohe',dropLast=False),
                           OneHotEncoder(inputCol='PHYACT_str',outputCol='PHYACT_ohe',dropLast=False),
                           OneHotEncoder(inputCol='TRAVDAY_str',outputCol='TRAVDAY_ohe',dropLast=False),
                           OneHotEncoder(inputCol='WALK_str',outputCol='WALK_ohe',dropLast=False),
                           OneHotEncoder(inputCol='BIKE_str',outputCol='BIKE_ohe',dropLast=False),
                           OneHotEncoder(inputCol='CAR_str',outputCol='CAR_ohe',dropLast=False),
                           OneHotEncoder(inputCol='TAXI_str',outputCol='TAXI_ohe',dropLast=False),
                           OneHotEncoder(inputCol='BUS_str',outputCol='BUS_ohe',dropLast=False),
                           OneHotEncoder(inputCol='TRAIN_str',outputCol='TRAIN_ohe',dropLast=False),
                           OneHotEncoder(inputCol='PLACE_str',outputCol='PLACE_ohe',dropLast=False),
                           OneHotEncoder(inputCol='PTRANS_str',outputCol='PTRANS_ohe',dropLast=False),
                           OneHotEncoder(inputCol='URBRUR_str',outputCol='URBRUR_ohe',dropLast=False),
                           OneHotEncoder(inputCol='CDIVMSAR_str',outputCol='CDIVMSAR_ohe',dropLast=False),
                           OneHotEncoder(inputCol='WHYTRP1S_str',outputCol='WHYTRP1S_ohe',dropLast=False),
                           feature.VectorAssembler(inputCols=['R_AGE', 'PTUSED', 'CARSHARE','RIDESHARE', 'TDAYDATE','VEHAGE','BESTMILE','STRTTIME',
                                                              'TRPMILES', 'GASPRICE']
                                                   ,outputCol='num_features'),
                           feature.StandardScaler(withMean=True,inputCol='num_features', outputCol='zfeatures'),
                           feature.VectorAssembler(inputCols=['R_HISP_ohe','R_SEX_ohe', 'R_RACE_ohe', 'MEDCOND_ohe','PHYACT_ohe', 'TRAVDAY_ohe', 'WALK_ohe', 
                                                              'BIKE_ohe',  'CAR_ohe', 'TAXI_ohe', 'BUS_ohe', 'TRAIN_ohe', 'PLACE_ohe','PTRANS_ohe', 'URBRUR_ohe',
                                                              'CDIVMSAR_ohe', 'WHYTRP1S_ohe', 'zfeatures'
                                                             ],
                                                   outputCol='features')
                           ])

In [46]:
lr = regression.LinearRegression(featuresCol='features', labelCol='TRVLCMIN')
pipe1_model = Pipeline(stages=[pipe_lr, lr]).fit(training_df)

In [47]:
rmse = fn.sqrt(fn.mean((fn.col('TRVLCMIN') - fn.col('prediction'))**2)).alias('rmse')

In [48]:
pipe1_model.transform(training_df).select(rmse).show()
pipe1_model.transform(validation_df).select(rmse).show()
pipe1_model.transform(testing_df).select(rmse).show()

In [49]:
pipe1_model.stages[-1].coefficients

In [50]:
#Elastic Net Regression

lambda_par = 0.02
alpha_par = 0.3
en_lr = LinearRegression().\
        setLabelCol('TRVLCMIN').\
        setFeaturesCol('features').\
        setRegParam(lambda_par).\
        setMaxIter(100).\
        setElasticNetParam(alpha_par)

In [51]:
en_lr_estimator = Pipeline(stages=[pipe_lr, en_lr])

In [52]:
grid = ParamGridBuilder().\
    addGrid(en_lr.regParam, [0., 0.01, 0.02]).\
    addGrid(en_lr.elasticNetParam, [0., 0.2, 0.4]).\
    build()

In [53]:
grid

In [54]:
all_models = []
for j in range(len(grid)):
    print("Fitting model {}".format(j+1))
    model = en_lr_estimator.fit(training_df, grid[j])
    all_models.append(model)

In [55]:
t1 = all_models[0].transform(validation_df).select(rmse).show()
t2 = all_models[1].transform(validation_df).select(rmse).show()
t3 = all_models[2].transform(validation_df).select(rmse).show()
t4 = all_models[3].transform(validation_df).select(rmse).show()
t5 = all_models[4].transform(validation_df).select(rmse).show()
t6= all_models[5].transform(validation_df).select(rmse).show()
t7 = all_models[6].transform(validation_df).select(rmse).show()
t8 = all_models[7].transform(validation_df).select(rmse).show()
t9 = all_models[8].transform(validation_df).select(rmse).show()

In [56]:
best_model = all_models[8]

In [57]:
best_model.stages[-1].coefficients

In [58]:
features = best_model.stages[0].stages[-3].getInputCols()

for i in range(17):
  for j in range(len(best_model.stages[0].stages[i].labels)):
    a = best_model.stages[0].stages[i].getInputCol()+best_model.stages[0].stages[i].labels[j]
    features.append(a)

In [59]:
featureimp = pd.DataFrame(list(zip(features,best_model.stages[-1].coefficients.toArray())),columns = ['features', 'weights']).sort_values('weights', ascending = False)

In [60]:
featureimp

Unnamed: 0,features,weights
124,WHYTRP1S10,1.368506
109,CDIVMSAR63,1.287555
53,CAR3,1.146040
65,BUS3,0.750252
97,CDIVMSAR32,0.701699
10,R_HISP-1,0.631274
117,CDIVMSAR83,0.599482
11,R_HISP1,0.587347
57,TAXI1,0.565048
101,CDIVMSAR42,0.543750


In [61]:
#Visualizations
plt.style.use('ggplot')
rc('figure', figsize=(20,10))
fig, ax = plt.subplots()

featureimp.plot.bar(x = 'features', y = 'weights', ax = ax)
ax.grid(False)
ax.set_facecolor((1, 1, 1))
ax.spines['bottom'].set_color('black')
ax.spines['left'].set_color('black')
fig.suptitle('Elastic Net Regularization', fontsize=15)
plt.ylabel("Feature importance")
ax.legend_.remove()
display(plt.show())

In [62]:
pipe_final = Pipeline(stages=[
                           StringIndexer(inputCol='TRAVDAY',outputCol='TRAVDAY_str',stringOrderType='alphabetAsc', handleInvalid="keep"), 
                           StringIndexer(inputCol='URBRUR',outputCol='URBRUR_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='CDIVMSAR',outputCol='CDIVMSAR_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='WHYTRP1S',outputCol='WHYTRP1S_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           OneHotEncoder(inputCol='TRAVDAY_str',outputCol='TRAVDAY_ohe',dropLast=False),
                           OneHotEncoder(inputCol='URBRUR_str',outputCol='URBRUR_ohe',dropLast=False),
                           OneHotEncoder(inputCol='CDIVMSAR_str',outputCol='CDIVMSAR_ohe',dropLast=False),
                           OneHotEncoder(inputCol='WHYTRP1S_str',outputCol='WHYTRP1S_ohe',dropLast=False),
                           feature.VectorAssembler(inputCols=['R_AGE', 'STRTTIME', 'TRPMILES' ],
                                                   outputCol='num_features'),
                           feature.StandardScaler(withMean=True,inputCol='num_features', 
                                                  outputCol='zfeatures'),
                           feature.VectorAssembler(inputCols=['TRAVDAY_ohe', 'URBRUR_ohe','CDIVMSAR_ohe', 'WHYTRP1S_ohe', 'zfeatures'],
                                                   outputCol='features')
                           ])

In [63]:
lr = regression.LinearRegression(featuresCol='features', labelCol='TRVLCMIN')
pipelr_model = Pipeline(stages=[pipe_final, lr]).fit(training_df)

In [64]:
pipelr_model.transform(training_df).select(rmse).show()
pipelr_model.transform(validation_df).select(rmse).show()
pipelr_model.transform(testing_df).select(rmse).show()

In [65]:
pipe_all = Pipeline(stages=[StringIndexer(inputCol='R_HISP',outputCol='R_HISP_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='R_SEX',outputCol='R_SEX_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='R_RACE',outputCol='R_RACE_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='MEDCOND',outputCol='MEDCOND_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='PHYACT',outputCol='PHYACT_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='TRAVDAY',outputCol='TRAVDAY_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='WALK',outputCol='WALK_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='BIKE',outputCol='BIKE_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='CAR',outputCol='CAR_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='TAXI',outputCol='TAXI_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='BUS',outputCol='BUS_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='TRAIN',outputCol='TRAIN_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='PLACE',outputCol='PLACE_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='PTRANS',outputCol='PTRANS_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='URBRUR',outputCol='URBRUR_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='CDIVMSAR',outputCol='CDIVMSAR_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='WHYTRP1S',outputCol='WHYTRP1S_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           feature.VectorAssembler(inputCols=['R_HISP_str','R_SEX_str', 'R_RACE_str', 'MEDCOND_str','PHYACT_str', 'TRAVDAY_str', 'WALK_str', 
                                                              'BIKE_str',  'CAR_str', 'TAXI_str', 'BUS_str', 'TRAIN_str', 'PLACE_str', 'PTRANS_str', 'URBRUR_str',
                                                              'CDIVMSAR_str', 'WHYTRP1S_str','R_AGE', 'PTUSED', 'CARSHARE','RIDESHARE',
                                                              'TDAYDATE','VEHAGE','BESTMILE','STRTTIME','TRPMILES', 'GASPRICE'], outputCol='features')])

In [66]:
rf_all = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 7,numTrees=500,maxBins=36)
pipe_rf_all = Pipeline(stages=[pipe_all,rf_all]).fit(training_df)

In [67]:
#rmse on all features 
evaluator = evaluation.RegressionEvaluator(labelCol='TRVLCMIN', metricName='rmse')
rf_rmse_val = evaluator.evaluate(pipe_rf_all.transform(validation_df))
print(rf_rmse_val) 

In [68]:
model_rf_all = pipe_rf_all.stages[-1]
feature_importance= pd.DataFrame(list(zip(['R_HISP_str','R_SEX_str', 'R_RACE_str', 'MEDCOND_str','PHYACT_str', 'TRAVDAY_str', 'WALK_str','BIKE_str', 'CAR_str',
                                           'TAXI_str', 'BUS_str', 'TRAIN_str', 'PLACE_str', 'PTRANS_str', 'URBRUR_str','CDIVMSAR_str', 'WHYTRP1S_str','R_AGE',
                                           'PTUSED', 'CARSHARE','RIDESHARE','TDAYDATE','VEHAGE','BESTMILE','STRTTIME','TRPMILES', 'GASPRICE'],
                                          model_rf_all.featureImportances.toArray())),
                                 columns = ['feature', 'importance']).sort_values('importance', ascending = False)

In [69]:
print(feature_importance)

In [70]:
plt.style.use('ggplot')
rc('figure', figsize=(20,10))
fig, ax = plt.subplots()

pd.DataFrame(list(zip(['R_HISP_str','R_SEX_str', 'R_RACE_str', 'MEDCOND_str','PHYACT_str', 'TRAVDAY_str', 'WALK_str','BIKE_str',  'CAR_str', 'TAXI_str', 'BUS_str',
                       'TRAIN_str', 'PLACE_str', 'PTRANS_str', 'URBRUR_str','CDIVMSAR_str', 'WHYTRP1S_str','R_AGE', 'PTUSED','CARSHARE','RIDESHARE',
                       'TDAYDATE','VEHAGE','BESTMILE','STRTTIME','TRPMILES', 'GASPRICE'], 
                      model_rf_all.featureImportances.toArray())),
             columns = ['feature', 'importance']).sort_values('importance', ascending = False).set_index('feature')[0:29].plot(kind = 'bar', ax = ax)

ax.grid(False)
ax.set_facecolor((0.91,0.41,0.17))
ax.spines['bottom'].set_color('black')
ax.spines['left'].set_color('black')
fig.suptitle('Top 30 Feature Importances', fontsize=15)
plt.ylabel("Feature Importance")
ax.legend_.remove()
display(plt.show())

# Regression Models to Predict Commute Time

In [71]:
pipe = Pipeline(stages=[StringIndexer(inputCol='URBRUR',outputCol='URBRUR_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='TRAVDAY',outputCol='TRAVDAY_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='CDIVMSAR',outputCol='CDIVMSAR_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           StringIndexer(inputCol='WHYTRP1S',outputCol='WHYTRP1S_str',stringOrderType='alphabetAsc', handleInvalid="keep"),
                           feature.VectorAssembler(inputCols=['URBRUR_str','R_AGE','TRAVDAY_str','CDIVMSAR_str', 'WHYTRP1S_str','STRTTIME','TRPMILES'], outputCol='features')]) 

In [72]:
rf_sel = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 7,numTrees=500,maxBins=36)
pipe_rf = Pipeline(stages=[pipe,rf_sel]).fit(training_df)

In [73]:
evaluator = evaluation.RegressionEvaluator(labelCol='TRVLCMIN', metricName='rmse')

In [74]:
#Running the with cross validation
pipe_rf = Pipeline(stages=[pipe,rf_sel])

# Number of trees in random forest
n_estimators = [300,400,500,600]

# Maximum number of levels in tree
max_depth = [7,8,9]

tfparamgrid = {'n_estimators': n_estimators,
               'max_depth': max_depth}

rfcv = CrossValidator(estimator = pipe_rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator =evaluator,
                      numFolds = 10)
rfcvModel = rfcv.fit(training_df)
print(rfcvModel)
# Use test set here so we can measu the accuracy of our model on new data
rfpredictions = rfcvModel.transform(testing_df)
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
print('RMSE:', rfevaluator.evaluate(rfpredictions))

In [75]:
#rrmse on all features 

rf_rmse = evaluator.evaluate(pipe_rf.transform(validation_df))
print(rf_rmse)

In [76]:
#Random Forest for 300 trees & Depth 7
rf_300_7 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 7,numTrees=300,maxBins=36)
pipe_rf_300 = Pipeline(stages=[pipe,rf_300_7]).fit(training_df)

rf_300_7_rmse = evaluator.evaluate(pipe_rf_300.transform(validation_df)) 
print(rf_300_7_rmse)

In [77]:
#Random Forest for 300 trees & Depth 8
rf_300 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 8,numTrees=300,maxBins=36)
pipe_rf_300 = Pipeline(stages=[pipe,rf_300]).fit(training_df)

rf_300_rmse = evaluator.evaluate(pipe_rf_300.transform(validation_df)) 
print(rf_300_rmse)

In [78]:
#Random Forest for 300 trees & Depth 9
rf_300_9 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 9,numTrees=300,maxBins=36)
pipe_rf_300 = Pipeline(stages=[pipe,rf_300_9]).fit(training_df)

rf_300_9_rmse = evaluator.evaluate(pipe_rf_300.transform(validation_df)) 
print(rf_300_9_rmse)

In [79]:
#Random Forest for 400 trees & Depth 7
rf_400 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 7,numTrees=400,maxBins=36)
pipe_rf_400 = Pipeline(stages=[pipe,rf_400]).fit(training_df)

rf_400_rmse = evaluator.evaluate(pipe_rf_400.transform(validation_df)) 
print(rf_400_rmse)

In [80]:
#Random Forest for 400 trees & Depth 8
rf_400_8 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 8,numTrees=400,maxBins=36)
pipe_rf_400 = Pipeline(stages=[pipe,rf_400_8]).fit(training_df)

rf_400_8_rmse = evaluator.evaluate(pipe_rf_400.transform(validation_df)) 
print(rf_400_8_rmse)

In [81]:
#Random Forest for 400 trees & Depth 9
rf_400_9 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 9,numTrees=400,maxBins=36)
pipe_rf_400 = Pipeline(stages=[pipe,rf_400_9]).fit(training_df)

rf_400_9_rmse = evaluator.evaluate(pipe_rf_400.transform(validation_df)) 
print(rf_400_9_rmse)

In [82]:
#Random Forest for 500 trees & Depth 7
rf_500_7 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 7,numTrees=500,maxBins=36)
pipe_rf_500 = Pipeline(stages=[pipe,rf_500_7]).fit(training_df)

rf_500_7_rmse = evaluator.evaluate(pipe_rf_500.transform(validation_df)) 
print(rf_500_7_rmse)

In [83]:
#Random Forest for 500 trees & Depth 8
rf_500_8 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 8,numTrees=500,maxBins=36)
pipe_rf_500 = Pipeline(stages=[pipe,rf_500_8]).fit(training_df)

rf_500_8_rmse = evaluator.evaluate(pipe_rf_500.transform(validation_df)) 
print(rf_500_8_rmse)

In [84]:
#Random Forest for 500 trees & Depth 9 
rf_500_9 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 9,numTrees=500,maxBins=36)
pipe_rf_500 = Pipeline(stages=[pipe,rf_500_9]).fit(training_df)

rf_500_9_rmse = evaluator.evaluate(pipe_rf_500.transform(validation_df)) 
print(rf_500_9_rmse)

In [85]:
#Random Forest for 600 trees & Depth 7 
rf_600_7 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 7,numTrees=600,maxBins=36)
pipe_rf_600 = Pipeline(stages=[pipe,rf_600_7]).fit(training_df)

rf_600_7_rmse = evaluator.evaluate(pipe_rf_600.transform(validation_df)) 
print(rf_600_7_rmse)

In [86]:
#Random Forest for 600 trees & Depth 8 
rf_600_8 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 8,numTrees=600,maxBins=36)
pipe_rf_600 = Pipeline(stages=[pipe,rf_600_8]).fit(training_df)

rf_600_8_rmse = evaluator.evaluate(pipe_rf_600.transform(validation_df)) 
print(rf_600_8_rmse)

In [87]:
#Random Forest for 600 Trees & Depth 9 
rf_600_9 = RandomForestRegressor(featuresCol="features", labelCol="TRVLCMIN",maxDepth = 9,numTrees=600,maxBins=36)
pipe_rf_600 = Pipeline(stages=[pipe,rf_600_9]).fit(training_df)

rf_600_9_rmse = evaluator.evaluate(pipe_rf_600.transform(validation_df)) 
print(rf_600_9_rmse)

In [88]:
#GBT with 50 Trees and Depth 1
gbt_50_1 =GBTRegressor(featuresCol='features', labelCol='TRVLCMIN',maxBins=36,maxIter=50,maxDepth=1)
pipe_gbt_50_1 = Pipeline(stages=[pipe,gbt_50_1]).fit(training_df)

gbt_50_1_rmse = evaluator.evaluate(pipe_gbt_50_1.transform(validation_df)) 
print(gbt_50_1_rmse)

In [89]:
#GBT with 50 Trees and Depth 2
gbt_50_2 =GBTRegressor(featuresCol='features', labelCol='TRVLCMIN',maxBins=36,maxIter=50,maxDepth=2)
pipe_gbt_50_2 = Pipeline(stages=[pipe,gbt_50_2]).fit(training_df)

gbt_50_2_rmse = evaluator.evaluate(pipe_gbt_50_2.transform(validation_df)) 
print(gbt_50_2_rmse)

In [90]:
#GBT with 50 Trees and Depth 5
gbt_50_5 =GBTRegressor(featuresCol='features', labelCol='TRVLCMIN',maxBins=36,maxIter=50,maxDepth=5)
pipe_gbt_50_5 = Pipeline(stages=[pipe,gbt_50_5]).fit(training_df)

gbt_50_5_rmse = evaluator.evaluate(pipe_gbt_50_5.transform(validation_df)) 
print(gbt_50_5_rmse)

In [91]:
#GBT with 100 Trees and Depth 1
gbt_100_1 =GBTRegressor(featuresCol='features', labelCol='TRVLCMIN',maxBins=36,maxIter=100,maxDepth=1)
pipe_gbt_100_1 = Pipeline(stages=[pipe,gbt_100_1]).fit(training_df)

gbt_100_1_rmse = evaluator.evaluate(pipe_gbt_100_1.transform(validation_df)) 
print(gbt_100_1_rmse)

In [92]:
#GBT with 100 Trees and Depth 2
gbt_100_2 =GBTRegressor(featuresCol='features', labelCol='TRVLCMIN',maxBins=36,maxIter=100,maxDepth=2)
pipe_gbt_100_2 = Pipeline(stages=[pipe,gbt_100_2]).fit(training_df)

gbt_100_2_rmse = evaluator.evaluate(pipe_gbt_100_2.transform(validation_df)) 
print(gbt_100_2_rmse)

In [93]:
#GBT with 100 Trees and Depth 5
gbt_100_5 =GBTRegressor(featuresCol='features', labelCol='TRVLCMIN',maxBins=36,maxIter=100,maxDepth=5)
pipe_gbt_100_5 = Pipeline(stages=[pipe,gbt_100_5]).fit(training_df)

gbt_100_5_rmse = evaluator.evaluate(pipe_gbt_100_5.transform(validation_df)) 
print(gbt_100_5_rmse)

In [94]:
#GBT with 200 Trees and Depth 1
gbt_200_1 =GBTRegressor(featuresCol='features', labelCol='TRVLCMIN',maxBins=36,maxIter=200,maxDepth=1)
pipe_gbt_200_1 = Pipeline(stages=[pipe,gbt_200_1]).fit(training_df)

gbt_200_1_rmse = evaluator.evaluate(pipe_gbt_200_1.transform(validation_df)) 
print(gbt_200_1_rmse)

In [95]:
#GBT with 200 Trees and Depth 2
gbt_200_2 =GBTRegressor(featuresCol='features', labelCol='TRVLCMIN',maxBins=36,maxIter=200,maxDepth=2)
pipe_gbt_200_2 = Pipeline(stages=[pipe,gbt_200_2]).fit(training_df)

gbt_200_2_rmse = evaluator.evaluate(pipe_gbt_200_2.transform(validation_df)) 
print(gbt_200_2_rmse)

In [96]:
#GBT with 200 Trees and Depth 5
gbt_200_5 =GBTRegressor(featuresCol='features', labelCol='TRVLCMIN',maxBins=36,maxIter=200,maxDepth=5)
pipe_gbt_200_5 = Pipeline(stages=[pipe,gbt_200_5]).fit(training_df)

gbt_200_5_rmse = evaluator.evaluate(pipe_gbt_200_5.transform(validation_df)) 
print(gbt_200_5_rmse)

In [97]:
#Best Model is GBT with Number of Trees = 100 and Depth = 5

best_model = pipe_gbt_100_5
best_model_rmse = evaluator.evaluate(best_model.transform(testing_df)) 
print(best_model_rmse)