<a href="https://colab.research.google.com/github/baschram/bda-718-group-1/blob/master/Fleet%20Logistic%20Regression%20Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%bash
# Need to install pyspark
# if pyspark is already installed, will print a message indicating pyspark already isntalled
pip install pyspark
pip install seaborn

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=6c01832a1d0ca7025825d4ae7dfbda9f74f013b443e449c05339e394a43c6fb4
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [2]:
%matplotlib inline
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml import feature
import re
from pyspark.sql import functions as fn
from pyspark.sql.functions import col, regexp_replace, split
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row
from pyspark.sql.types import IntegerType, DateType, FloatType, BooleanType
import os
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

spark = SparkSession \
  .builder \
  .master("local[*]")\
  .config("spark.memory.fraction", 0.8) \
  .config("spark.executor.memory", "12g") \
  .config("spark.driver.memory", "12g")\
  .config("spark.memory.offHeap.enabled",'true')\
  .config("spark.memory.offHeap.size","12g")\
  .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
sampled_df = spark.read.format("csv").option("header", "true").option('inferSchema','true').load("drive/Shared drives/IST 718 Group1/sampled_used_cars_data_nodesc.csv")

# EDA: Duplicate Data Removal
sampled_df = sampled_df.distinct()

# Drop the index column
# Drop exterior color ,power, and wheel system displacement (redundant columns)
# Drop Bed height (no values)
sampled_df = sampled_df.drop(*['power','main_picture_url',
                               'combine_fuel_economy', 'listing_id',
                               'vehicle_damage_category','trimId',
                               'trim_name','exterior_color','wheel_system_display','bed_height','power','is_certified'])

# Generate of list of column names for future use
colList = sampled_df.columns
numColList = []
catColList = []

#Needs to occur prior to data type casting as the loop will convert columns back to strings
for x in sampled_df.columns:
  sampled_df = sampled_df.withColumn(x, regexp_replace(col(x),r'--',''))
  sampled_df = sampled_df.withColumn(x, regexp_replace(col(x),r'None',''))

#Column cleaning and type casting

#Function to check if column exists or not
def has_column(df, col):
    try:
        df[col]
        return True
    except AnalysisException:
        return False

sampled_df = sampled_df.withColumn('back_legroom',
                                   regexp_replace(col('back_legroom'),r'\sin',''))
sampled_df = sampled_df.withColumn('front_legroom',
                                   regexp_replace(col('front_legroom'),r'\sin',''))
sampled_df = sampled_df.withColumn('front_legroom',fn.translate('front_legroom','0',''))
sampled_df = sampled_df.withColumn('height',
                                   regexp_replace(col('height'),r'\sin',''))
sampled_df = sampled_df.withColumn('length',
                                   regexp_replace(col('length'),r'\sin',''))
sampled_df = sampled_df.withColumn('wheelbase',
                                   regexp_replace(col('wheelbase'),r'\sin',''))
sampled_df = sampled_df.withColumn('width',
                                   regexp_replace(col('width'),r'\sin',''))
sampled_df = sampled_df.withColumn('bed_length',
                                   regexp_replace(col('bed_length'),r'\sin',''))
sampled_df = sampled_df.withColumn('engine_cylinders',
                                   regexp_replace(col('engine_cylinders'),'[a-zA-Z]',''))
#Remove all digits from engine type
sampled_df = sampled_df.withColumn('engine_type',
                                   regexp_replace(col('engine_type'),r'\d',''))
#Remove all alphabet characters following space
sampled_df = sampled_df.withColumn('engine_type',
                                   regexp_replace(col('engine_type'),r'\s[a-zA-Z]*',''))
sampled_df = sampled_df.withColumn('maximum_seating',
                                   regexp_replace(col('maximum_seating'),r'\sseats',''))
sampled_df = sampled_df.withColumn('fuel_tank_volume',
                                   regexp_replace(col('fuel_tank_volume'),r'\sgal',''))
sampled_df = sampled_df.withColumn('major_options',
                                   regexp_replace(col('major_options'),r"[\[]'",''))
sampled_df = sampled_df.withColumn('major_options',
                                   regexp_replace(col('major_options'),r"'[\]]",''))
sampled_df = sampled_df.withColumn('major_options',
                                   regexp_replace(col('major_options'),r"'",''))
sampled_df = sampled_df.withColumn('transmission_display',
                                   regexp_replace(col('transmission_display'),r"\D*",''))

sampled_df = sampled_df.withColumn('city',fn.lower(fn.col('city')))
sampled_df = sampled_df.withColumn('interior_color',fn.lower(fn.col('interior_color')))
sampled_df = sampled_df.withColumn('listing_color',fn.lower(fn.col('listing_color')))
sampled_df = sampled_df.withColumn('make_name',fn.lower(fn.col('make_name')))
sampled_df = sampled_df.withColumn('model_name',fn.lower(fn.col('model_name')))
sampled_df = sampled_df.withColumn('sp_name',fn.lower(fn.col('sp_name')))

#Decompose Torque column 
if has_column(sampled_df, 'torque'):
  split_torque = split(sampled_df['torque'],r'\slb-ft\s@')
  sampled_df = sampled_df.withColumn('torque_ftlb',split_torque.getItem(0))
  sampled_df = sampled_df.withColumn('torque_rpm',split_torque.getItem(1))
  #Drop now redundant torque column
  sampled_df = sampled_df.drop('torque')

#Remove all non digit charaters
sampled_df = sampled_df.withColumn('torque_rpm',
                                   regexp_replace(col('torque_rpm'),'\D*',''))
#Calculate new torque grade column
sampled_df = sampled_df.withColumn('torque_grade',
                                   sampled_df.torque_ftlb/sampled_df.torque_rpm)


#Recast column data types


#Integer type loop
integer_cols = ['engine_displacement','engine_cylinders','daysonmarket','horsepower','maximum_seating','mileage','owner_count','savings_amount','transmission_display','year','torque_ftlb','torque_rpm']
for i in integer_cols:
  sampled_df = sampled_df.withColumn(i, sampled_df[i].cast(IntegerType()))

#FloatType loop
float_cols = ['back_legroom','front_legroom','city_fuel_economy','bed_length','fuel_tank_volume','latitude','height','highway_fuel_economy','length','longitude','price','seller_rating','wheelbase','width','torque_grade']
for i in float_cols:
  sampled_df = sampled_df.withColumn(i, sampled_df[i].cast(FloatType()))

#DateType Loop
date_cols = ['listed_date']
for i in date_cols:
  sampled_df = sampled_df.withColumn(i, sampled_df[i].cast(DateType()))

#Boolean Loop
bool_cols = ['fleet','frame_damaged','franchise_dealer','has_accidents','isCab','is_cpo','is_new','is_oemcpo','salvage','theft_title']
for i in bool_cols:
  sampled_df = sampled_df.withColumn(i, sampled_df[i].cast(BooleanType()))

#NA Handeling

#Set empty booleans values to false (assume no report = negative condition)
sampled_df = sampled_df.na.fill(False, bool_cols)

sampled_df = sampled_df.na.fill(0, "back_legroom")
sampled_df = sampled_df.na.fill('Gasoline', "fuel_type")
sampled_df = sampled_df.na.fill('SUV / Crossover', "body_type")
sampled_df = sampled_df.na.fill(' ', "major_options")


sampled_df = sampled_df.na.fill('A', "transmission")
#Consolidate Dual clutch transmissions into automatic category
sampled_df = sampled_df.withColumn('transmission',fn.when(col('transmission') == 'Dual Clutch','A').otherwise(col('transmission')))
    #fn.translate('transmission',"Dual Clutch",'A'))

sampled_df = sampled_df.na.fill('FWD', "wheel_system")
#Consolidate Wheel system type 4x2 in RWD drive category
sampled_df = sampled_df.withColumn('wheel_system',fn.translate('wheel_system',"4X2","RWD"))

#Use Spark imputer transform to replace NAs with median values of specified columns
median_replace_col = ['owner_count','engine_cylinders','engine_displacement','horsepower','maximum_seating','mileage','transmission_display','torque_ftlb','torque_rpm','torque_grade']
na_replace_with_median = feature.Imputer(strategy='median', inputCols=median_replace_col, outputCols=median_replace_col).fit(sampled_df)
sampled_df = na_replace_with_median.transform(sampled_df)

#Use Spark imputer transform to replace NAs with mean values of specified columns
mean_replace_col = ['city_fuel_economy','front_legroom','fuel_tank_volume','highway_fuel_economy','seller_rating','wheelbase','height','length','width']
na_replace_with_mean = feature.Imputer(strategy='mean', inputCols=mean_replace_col, outputCols=mean_replace_col).fit(sampled_df)
sampled_df = na_replace_with_mean.transform(sampled_df)

#Conditional engine type fill if cylinders =6-12 then v type, if < 6 I type
sampled_df = sampled_df.withColumn('engine_type',fn.when(col('engine_cylinders').isin([6,8,10,12]) & fn.isnull(col('engine_type')),'V').otherwise(col('engine_type')))
sampled_df = sampled_df.withColumn('engine_type',fn.when((col('engine_cylinders') < 6) & fn.isnull(col('engine_type')),'I').otherwise(col('engine_type')))
#Any remaining engine_type NAs get filled in with I
sampled_df = sampled_df.na.fill('I', "engine_type")



In [27]:
df_agg = logreg_sampled_df.agg(*[fn.count(fn.when(fn.isnull(c), c)).alias(c) for c in logreg_sampled_df.columns])
df_agg.show()

+------------+------+----------+---------+------+----+-----------------+------------+----------+----------------+-------------------+-----------+------+-------------+----------------+--------------+-------------+----------------+---------+-------------+------+--------------------+----------+--------------+------+------+------+---------+--------+------+-----------+-------------+---------+-------------+---------+---------------+-------+----------+-----------+-----+-------+--------------+-------------+-----+-------+-----------+------------+--------------------+------------+---------+-----+----+-----------+----------+------------+---------+----------------+---------------+--------------+-----------+-----------+------+-------------+--------------+-----------+------------+
|back_legroom|   bed|bed_length|body_type| cabin|city|city_fuel_economy|daysonmarket|dealer_zip|engine_cylinders|engine_displacement|engine_type| fleet|frame_damaged|franchise_dealer|franchise_make|front_legroom|fuel_tank_

In [4]:
from pyspark.ml import feature
from pyspark.ml import Pipeline

#Convert categorical columns to numeric values with string indexer.  Use freqency as sorting value as no ordinal strucutre
#Create list of all categorical columns used in model
categorical_col_input = ['body_type','city','engine_type','fuel_type','interior_color','listing_color','make_name','model_name','sp_name','transmission','wheel_system']
#Create transformed column names by adding _x
categorical_col_output = [i + '_x' for i in categorical_col_input]
#Convert strings to category numbers
categorical_feature = feature.StringIndexer(inputCols=categorical_col_input, outputCols=categorical_col_output)

#Convert boolean columns to integer to change from booleans to 0 and 1
#create list of boolena columns
bool_cols_input = ['fleet','frame_damaged','franchise_dealer','has_accidents','isCab','is_cpo','is_new','is_oemcpo','salvage','theft_title']
#Iterate through list and cast to integer type
for i in bool_cols_input:
  sampled_df = sampled_df.withColumn(i, sampled_df[i].cast(IntegerType()))

#Create preprocessing pipline with 
logreg_preprocessing = Pipeline(stages=[categorical_feature]).fit(sampled_df)
logreg_sampled_df= logreg_preprocessing.transform(sampled_df)

#Create list of feature columns for use in vector assemeble
#Pull all column names from transformed DF
colList = logreg_sampled_df.columns
#Remove original categorical columns
feature_cols = [i for i in colList if i not in categorical_col_input]
#Remove unneeded columns (bed and cabin columns only apply to trucks, dealer_zip format is inconsistent (5 or 9 digits) fleet is target, isCab is derivative of fleet, sp_id incomplete)
for i in ['bed','bed_length','cabin','dealer_zip','fleet','franchise_make','isCab','major_options','sp_id','listed_date']:
  feature_cols.remove(i)


#Second preprocessing step to create feature vector and scale so feature coeffcients can be compared for importance
va = feature.VectorAssembler(inputCols=feature_cols, outputCol='features')

scaler = feature.StandardScaler(withMean=True, inputCol='features')

logreg_preprocessing_2 = Pipeline(stages=[va, scaler]).fit(logreg_sampled_df)
logreg_sampled_df = logreg_preprocessing_2.transform(logreg_sampled_df)

In [5]:
#Classifier imports and dataset split
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder

training_df, validation_df, testing_df = logreg_sampled_df.randomSplit([0.6, 0.3, 0.1], seed=0)

In [6]:
#Random Forest

rf = RandomForestClassifier(featuresCol='features', labelCol='fleet',maxBins=24607)

rf_pipe = Pipeline(stages = [rf]).fit(training_df)

score_rf = BinaryClassificationEvaluator(labelCol='fleet',metricName = 'areaUnderROC').evaluate(rf_pipe.transform(validation_df))

score_rf


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1212, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from J

Py4JError: ignored

In [None]:
pd.DataFrame(list(zip(feature_cols, rf_pipe.stages[-1].featureImportances.toArray())),
            columns = ['column', 'weight']).sort_values('weight')

In [None]:
#logistical Regression with Grid Search
enable_grid = False

if enable_grid:
  lr = LogisticRegression(labelCol='fleet',featuresCol='features')

  grid = ParamGridBuilder().\
  addGrid(lr.regParam, [0., 0.01, 0.02, 0.03]).\
  addGrid(lr.elasticNetParam, [0., 0.1, 0.3]).\
  build()

  lr_pipe = Pipeline(stages = [lr])

  all_models = []
  for j in range(len(grid)):
    print("Fitting model {}".format(j+1))
    model = lr_pipe.fit(training_df, grid[j])
    all_models.append(model)
      
  scores =[]
  for m in all_models:
    score_lr_pipe_1 = BinaryClassificationEvaluator(labelCol='fleet',metricName = 'areaUnderROC').evaluate(m.transform(validation_df))
    scores.append(score_lr_pipe_1)
        
  print(max(scores), '\n', grid[scores.index(max(scores))])
  pass

Fitting model 1
Fitting model 2
Fitting model 3
Fitting model 4
Fitting model 5
Fitting model 6
Fitting model 7
Fitting model 8
Fitting model 9
Fitting model 10
Fitting model 11
Fitting model 12
0.8734250189723907 
 {Param(parent='LogisticRegression_37d0ca2128d6', name='regParam', doc='regularization parameter (>= 0).'): 0.0, Param(parent='LogisticRegression_37d0ca2128d6', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0}


In [None]:
#Feature Importance Inference from grid search
if enable_grid:
  abs_feature_scores = [abs(i) for i in all_models[scores.index(max(scores))].stages[-1].coefficients.toArray().tolist()]
  df_col = ['feature','weight']
  abs_coeffs_df = spark.createDataFrame(data=zip(feature_cols,abs_feature_scores),schema = df_col)

  lr_important_features = abs_coeffs_df.sort(fn.desc('weight')).limit(10).toPandas()

  display(lr_important_features)

  feature_scores = all_models[scores.index(max(scores))].stages[-1].coefficients.toArray().tolist()

  lr_features_pos = coeffs_df.sort(fn.desc('weight')).limit(10).toPandas()
  lr_features_neg = coeffs_df.sort('weight').limit(10).toPandas())

  display(lr_features_pos)
  display(lr_features_neg)
  pass

With scaled features, grid search gets best result with alpha = 0  and lambda =0.  Score is 0.8734250189723907, worst score is 0.84

In [None]:
scores
#coeffs_df.show()

[0.8734250189723907,
 0.8731998973761991,
 0.8715029385810065,
 0.8704625808525013,
 0.8662412558560946,
 0.8653974858404606,
 0.8656830841648188,
 0.8619957365483409,
 0.8549239827919557,
 0.8621483614994883,
 0.8553235819888318,
 0.8404882635975588]

In [7]:
#Optimized Logistical regression model
alpha_par = 0.0
lambda_par = 0.0

en_lr = LogisticRegression(labelCol='fleet',featuresCol='features',regParam=lambda_par, elasticNetParam=alpha_par)

lr_pipe_2 = Pipeline(stages = [en_lr]).fit(training_df)

evaluator = BinaryClassificationEvaluator(labelCol='fleet',metricName = 'areaUnderROC')

score_pipe_2 = evaluator.evaluate(lr_pipe_2.transform(validation_df))



ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36561)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: ignored

In [None]:
#Feature importance inference from optimized model
abs_feature_scores = [abs(i) for i in lr_pipe_2.stages[-1].coefficients.toArray().tolist()]
df_col = ['feature','weight']
abs_coeffs_df = spark.createDataFrame(data=zip(feature_cols,abs_feature_scores),schema = df_col)

lr_important_features = abs_coeffs_df.sort(fn.desc('weight')).toPandas()

display(lr_important_features)

feature_scores = all_models[scores.index(max(scores))].stages[-1].coefficients.toArray().tolist()

lr_features_pos = coeffs_df.sort(fn.desc('weight')).limit(10).toPandas()
lr_features_neg = coeffs_df.sort('weight').limit(10).toPandas())

display(lr_features_pos)
display(lr_features_neg)


SyntaxError: ignored

In [None]:
tokenizer = feature.Tokenizer().setInputCol('major_options').setOutputCol('words')

cv = feature.CountVectorizer(minTF=1., minDF=5)\
  .setInputCol("words")\
  .setOutputCol("tf")

idf = feature.IDF().\
    setInputCol('tf').\
    setOutputCol('tfidf')

if enable_grid:
  lr_options = LogisticRegression(labelCol='fleet',featuresCol='tfidf')

  grid_options = ParamGridBuilder().\
  addGrid(lr.regParam, [0., 0.01, 0.02, 0.03]).\
  addGrid(lr.elasticNetParam, [0., 0.1, 0.3]).\
  build()

  lr_pipe_options = Pipeline(stages = [tokenizer,cv,idf,lr_options]).fit(training_df)

  all_models = []
  for j in range(len(grid)):
    print("Fitting model {}".format(j+1))
    model = lr_pipe_options.fit(training_df, grid_options[j])
    all_models.append(model)
      
  scores =[]
  for m in all_models:
    score_lr_pipe_options = BinaryClassificationEvaluator(labelCol='fleet',metricName = 'areaUnderROC').evaluate(m.transform(validation_df))
    scores.append(score_lr_pipe_options)
        
  print(max(scores), '\n', grid_options[scores.index(max(scores))])
  pass



In [None]:
score_lr_pipe

0.8731230343206494

In [None]:
score_lr_pipe_options

0.7479094502499525