# Step 3: Modeling

## Start Spark Session

In [7]:
import findspark
findspark.init()

In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Bosch Project").getOrCreate()

In [9]:
spark

# Modify dataset for modeling

In [3]:
import pandas as pd
num_new_pd = pd.read_csv('s3://jiaruxu233/new_train_numerical.csv')

In [None]:
response = pd.read_csv('s3://jiaruxu233/new_train_numerical.csv',usecols=['Response']) 

In [None]:
num_new_pd = pd.concat([num_new_pd,response],axis=1).to_csv('s3://jiaruxu233/new_train_numerical.csv')

In [4]:
num_new = spark.read\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .format('csv')\
  .load('s3://jiaruxu233/new_train_numerical.csv')

In [5]:
cat_new = spark.read\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .format('csv')\
  .load('s3://jiaruxu233/train_categorical.csv')

In [6]:
date_new = spark.read\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .format('csv')\
  .load('s3://jiaruxu233/train_date.csv')

In [7]:
# drop unnecessary columns
new_train = num_new.join(cat_new, ["_c0"]).join(date_new, ["_c0"]).drop('_c0').drop('Unnamed: 0')

In [8]:
new_train.count()

1183747

In [9]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler,Binarizer
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml import Pipeline, Model
from pyspark.sql.functions import to_timestamp, year, month, dayofweek

In [10]:
# Somehow we have duplicate response columns. Therefore, we drop the unnecessary one.
new_train = new_train.drop('Response104')

In [11]:
# Rename response column for convenience
new_train = new_train.withColumnRenamed("Response103", "Response")

In [12]:
# Fill missing values in accordance with our previous engineering section
new_train = new_train.na.fill(-1)

In [13]:
# new_train.show()

In [16]:
for col in num_and_date_feature:
    new_train = new_train.withColumn(col,new_train[col].cast('float'))
# new_train.printSchema()

In [17]:
splitted_data = new_train.randomSplit([0.8, 0.18, 0.02], 666)
train_data = splitted_data[0]
test_data = splitted_data[1]
predict_data = splitted_data[2]

print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))
print("Number of prediction records : " + str(predict_data.count()))

Number of training records: 947155
Number of testing records : 213200
Number of prediction records : 23392


In [18]:
len(new_train.columns)

163

In [19]:
new_train.columns[101]

'Response'

In [15]:
# Select the needed numerical and date columns 
num_and_date_feature = new_train.columns[1:101]+new_train.columns[111:]
len(num_and_date_feature)

152

In [None]:
# num_and_date_feature

In [20]:
# stringIndexer
stringIndexer_L1_S24_F1114 = StringIndexer(inputCol="L1_S24_F1114", outputCol="L1_S24_F1114_IX",handleInvalid="keep")
stringIndexer_L1_S24_F1137 = StringIndexer(inputCol="L1_S24_F1137", outputCol="L1_S24_F1137_IX",handleInvalid="keep")
stringIndexer_L1_S24_F1140 = StringIndexer(inputCol="L1_S24_F1140", outputCol="L1_S24_F1140_IX",handleInvalid="keep")
stringIndexer_L1_S24_F1510 = StringIndexer(inputCol="L1_S24_F1510", outputCol="L1_S24_F1510_IX",handleInvalid="keep")
stringIndexer_L1_S24_F1525 = StringIndexer(inputCol="L1_S24_F1525", outputCol="L1_S24_F1525_IX",handleInvalid="keep")
stringIndexer_L1_S24_F1530 = StringIndexer(inputCol="L1_S24_F1530", outputCol="L1_S24_F1530_IX",handleInvalid="keep")
stringIndexer_L1_S25_F1852 = StringIndexer(inputCol="L1_S25_F1852", outputCol="L1_S25_F1852_IX",handleInvalid="keep")
stringIndexer_L1_S25_F2779 = StringIndexer(inputCol="L1_S25_F2779", outputCol="L1_S25_F2779_IX",handleInvalid="keep")
stringIndexer_L3_S32_F3854 = StringIndexer(inputCol="L3_S32_F3854", outputCol="L3_S32_F3854_IX",handleInvalid="keep")

In [21]:
# new_train.printSchema()

In [22]:
vectorAssembler_features = VectorAssembler(
    inputCols = [col for col in num_and_date_feature] + ["L1_S24_F1114_IX","L1_S24_F1137_IX","L1_S24_F1140_IX",
              "L1_S24_F1510_IX","L1_S24_F1525_IX","L1_S24_F1530_IX","L1_S25_F1852_IX",
               "L1_S25_F2779_IX","L3_S32_F3854_IX"],
    outputCol="features",handleInvalid="skip")

In [23]:
vectorAssembler_features

VectorAssembler_b87992bc41da

# Modeling with Method 1

## Logistic Regression

In [169]:
logit = LogisticRegression(labelCol="Response", featuresCol="features")

In [170]:
pipeline_logit = Pipeline(stages=[stringIndexer_L1_S24_F1114, stringIndexer_L1_S24_F1137, 
                               stringIndexer_L1_S24_F1140, stringIndexer_L1_S24_F1510,
                               stringIndexer_L1_S24_F1525, stringIndexer_L1_S24_F1530,
                               stringIndexer_L1_S25_F1852, stringIndexer_L1_S25_F2779,
                               stringIndexer_L3_S32_F3854,
                               vectorAssembler_features, 
                               logit])

In [171]:
model_logit = pipeline_logit.fit(train_data)

In [172]:
predictions_logit = model_logit.transform(test_data)

In [173]:
predictions_logit.groupBy('prediction').count().show()

+----------+------+
|prediction| count|
+----------+------+
|       0.0|213198|
|       1.0|     2|
+----------+------+



In [187]:
logit_pred = predictions_logit.select('prediction','Response')

In [160]:
# The real structure for response in our test dataset
test_data.groupBy('Response').count().show()

+--------+------+
|Response| count|
+--------+------+
|     1.0|  1248|
|     0.0|211952|
+--------+------+



In [200]:
logit_pred.filter("Response == 1 ").filter("prediction == 1").count() # True Positive

1

In [201]:
logit_pred.filter("Response == 1 ").filter("prediction == 0").count() # False Negative

1247

In [202]:
logit_pred.filter("Response == 0 ").filter("prediction == 1").count() # False Positive

1

In [203]:
logit_pred.filter("Response == 0 ").filter("prediction == 0").count() # True Negative

211951

In [205]:
import math

tp = 1
fp = 1
fn = 1247
tn = 211952
mcc = (tp*tn - fp*fn) /( math.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn)))
mcc

0.01983993227463039

In [174]:
predictionAndLabels = predictions.select('prediction','Response').rdd

In [175]:
# predictionAndLabels.show()

In [176]:
# from pyspark.mllib.evaluation import BinaryClassificationMetrics
# evaluatorLogit = BinaryClassificationMetrics(predictionAndLabels)

In [207]:
# evaluatorLogit.areaUnderROC

## Random Forest

In [178]:
# from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
# rf = RandomForestClassifier(labelCol="Response", featuresCol="features")


In [179]:
# pipeline_rf = Pipeline(stages=[stringIndexer_L1_S24_F1114, stringIndexer_L1_S24_F1137, 
#                                stringIndexer_L1_S24_F1140, stringIndexer_L1_S24_F1510,
#                                stringIndexer_L1_S24_F1525, stringIndexer_L1_S24_F1530,
#                                stringIndexer_L1_S25_F1852, stringIndexer_L1_S25_F2779,
#                                stringIndexer_L3_S32_F3854,
#                                vectorAssembler_features, 
#                                rf])

In [180]:
# model_rf = pipeline_rf.fit(train_data)

In [181]:
# predictions = model_rf.transform(test_data)

In [182]:
# predictionAndLabels_rf = predictions.select('prediction','Response').rdd

In [183]:
# evaluatorRF = BinaryClassificationMetrics(predictionAndLabels_rf)

In [208]:
# evaluatorRF.areaUnderROC -0.5

In [209]:
# predictions.groupBy('prediction').count().show()  - prediction has no 1

## Gradient-Boosted Tree 

In [24]:
from pyspark.ml.classification import GBTClassifier

# Train a GBT model.
gbt = GBTClassifier(labelCol="Response", featuresCol="features")

# Chain indexers and GBT in a Pipeline
pipeline_gbt = Pipeline(stages=[stringIndexer_L1_S24_F1114, stringIndexer_L1_S24_F1137, 
                               stringIndexer_L1_S24_F1140, stringIndexer_L1_S24_F1510,
                               stringIndexer_L1_S24_F1525, stringIndexer_L1_S24_F1530,
                               stringIndexer_L1_S25_F1852, stringIndexer_L1_S25_F2779,
                               stringIndexer_L3_S32_F3854,
                               vectorAssembler_features, 
                               gbt])

# Train model.  This also runs the indexers.
gbt_model = pipeline_gbt.fit(train_data)

# Make predictions.
predictions_gbt = gbt_model.transform(test_data)
predictions_gbt.groupBy('prediction').count().show()


+----------+------+
|prediction| count|
+----------+------+
|       0.0|213148|
|       1.0|    52|
+----------+------+



From the count table above, we notice that we are doing a far more better job than the previous two methods.

In [25]:
gbt_pred = predictions_gbt.select('prediction','Response')

In [26]:
gbt_pred.filter("Response == 1 ").filter("prediction == 1").count() # True Positive

25

In [27]:
gbt_pred.filter("Response == 1 ").filter("prediction == 0").count() # False Negative

1223

In [28]:
gbt_pred.filter("Response == 0 ").filter("prediction == 1").count() # False Positive

27

In [29]:
gbt_pred.filter("Response == 0 ").filter("prediction == 0").count() # True Negative

211925

In [30]:
import math

tp = 25
fp = 27
fn = 1223
tn = 211925
mcc = (tp*tn - fp*fn) /( math.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn)))
mcc

0.09723857799358827

In [31]:
spark.stop()

# Modeling with Method 2

## Build RandomForest model

In [10]:
train_num =spark.read.csv('s3://jiaruxu233/train_numeric.csv', header = True, inferSchema = True)

In [11]:
train_cat = spark.read.csv('s3://jiaruxu233/train_categorical.csv', header = True, inferSchema = True)

In [12]:
train_date = spark.read.csv('s3://jiaruxu233/train_date.csv', header = True, inferSchema = True)

In [13]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

In [23]:
c_feature = ['L1_S24_F1078',
 'L1_S24_F833',
 'L0_S2_F63',
 'L0_S2_F67',
 'L1_S24_F866',
 'L1_S24_F838',
 'L1_S24_F849',
 'L1_S24_F824',
 'L1_S24_F873',
 'L1_S24_F817',
 'L1_S24_F825',
 'L1_S24_F871',
 'L1_S24_F1095',
 'L1_S24_F845',
 'L1_S24_F856',
 'L0_S2_F35',
 'L1_S24_F828',
 'L0_S2_F43',
 'L0_S2_F47',
 'L0_S2_F39',
 'L1_S24_F868',
 'L0_S2_F51',
 'L1_S24_F1061',
 'L1_S24_F861',
 'L1_S24_F1105',
 'L1_S24_F1069',
 'L1_S24_F830',
 'L1_S24_F863',
 'L1_S24_F898',
 'L1_S24_F1082',
 'L1_S24_F820',
 'L1_S24_F1097',
 'L1_S24_F835',
 'L1_S24_F896',
 'L1_S24_F821',
 'L1_S24_F1608',
 'L0_S2_F55',
 'L1_S24_F852',
 'L1_S24_F858',
 'L0_S10_F270']

d_feature = ['L3_S37_D3949','L3_S30_D3541','L3_S34_D3877','L3_S33_D3870']
n_feature = ['L3_S33_F3865',
 'L3_S33_F3857',
 'L3_S29_F3342',
 'L3_S29_F3412',
 'L3_S29_F3351',
 'L3_S29_F3470',
 'L3_S29_F3339',
 'L3_S29_F3479',
 'L3_S29_F3458',
 'L0_S0_F0',
 'L3_S29_F3455',
 'L3_S29_F3449',
 'L3_S29_F3327',
 'L3_S29_F3407',
 'L3_S30_F3754',
 'L3_S30_F3759']

In [16]:
### Subset train_date set(pick up only the unique date columns)
date_columns = ['Id','L3_S37_D3949', 'L3_S30_D3541', 'L3_S29_D3428', 'L3_S34_D3877', 'L3_S33_D3870', 'L0_S0_D3', 'L0_S1_D30', 'L0_S8_D145', 'L3_S36_D3919', 'L3_S35_D3910', 'L0_S5_D115', 'L0_S3_D70', 'L0_S6_D120', 'L0_S7_D143', 'L0_S2_D54', 'L0_S4_D106', 'L0_S12_D333', 'L0_S20_D465', 'L0_S13_D355', 'L0_S10_D266', 'L0_S11_D284', 'L0_S9_D192', 'L2_S26_D3084', 'L1_S24_D1828', 'L0_S19_D457', 'L0_S17_D432', 'L0_S14_D380', 'L0_S15_D401', 'L0_S16_D428', 'L2_S27_D3156', 'L0_S18_D447', 'L0_S21_D469', 'L0_S23_D629', 'L0_S22_D608', 'L3_S41_D4021', 'L3_S40_D3981', 'L3_S45_D4129', 'L3_S48_D4203', 'L3_S47_D4155', 'L3_S39_D3974', 'L3_S51_D4255', 'L1_S25_D1887', 'L3_S31_D3848', 'L3_S43_D4097', 'L3_S49_D4208', 'L3_S50_D4254', 'L3_S44_D4122', 'L3_S38_D3953', 'L3_S32_D3852', 'L2_S28_D3234', 'L3_S46_D4135', 'L3_S42_D4057']
new_date = train_date.select(date_columns)

In [17]:
new_df = train_num.select(n_feature+['Id','Response']).join(new_date.select(['Id']+d_feature), 'Id').join(train_cat.select(['Id']+c_feature), 'Id')

In [24]:
new_df.cache()

DataFrame[Id: int, L3_S33_F3865: double, L3_S33_F3857: double, L3_S29_F3342: double, L3_S29_F3412: double, L3_S29_F3351: double, L3_S29_F3470: double, L3_S29_F3339: double, L3_S29_F3479: double, L3_S29_F3458: double, L0_S0_F0: double, L3_S29_F3455: double, L3_S29_F3449: double, L3_S29_F3327: double, L3_S29_F3407: double, L3_S30_F3754: double, L3_S30_F3759: double, Response: int, L3_S37_D3949: double, L3_S30_D3541: double, L3_S34_D3877: double, L3_S33_D3870: double, L1_S24_F1078: string, L1_S24_F833: string, L0_S2_F63: string, L0_S2_F67: string, L1_S24_F866: string, L1_S24_F838: string, L1_S24_F849: string, L1_S24_F824: string, L1_S24_F873: string, L1_S24_F817: string, L1_S24_F825: string, L1_S24_F871: string, L1_S24_F1095: string, L1_S24_F845: string, L1_S24_F856: string, L0_S2_F35: string, L1_S24_F828: string, L0_S2_F43: string, L0_S2_F47: string, L0_S2_F39: string, L1_S24_F868: string, L0_S2_F51: string, L1_S24_F1061: string, L1_S24_F861: string, L1_S24_F1105: string, L1_S24_F1069: s

In [18]:
split = new_df.randomSplit([0.8, 0.2], 1234)
train_data = split[0]
test_data = split[1]

In [19]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_IX",\
                          handleInvalid="keep") for column in c_feature]

vectorAssembler_features = VectorAssembler(inputCols=[x+"_IX" for x in c_feature]+d_feature+n_feature, outputCol="features", handleInvalid="keep")

In [20]:
rf = RandomForestClassifier(labelCol="Response", featuresCol="features")
pipeline_rf = Pipeline(stages= indexers + [vectorAssembler_features, rf])

In [25]:
model1 = pipeline_rf.fit(train_data)

In [26]:
predictions = model1.transform(test_data)

In [27]:
# True Positives
tp= predictions[(predictions.Response == 1) & (predictions.prediction == 1)].count()
# True Negatives
tn= predictions[(predictions.Response == 0) & (predictions.prediction == 0)].count()
# False Positives
fp= predictions[(predictions.Response == 0) & (predictions.prediction == 1)].count()
# False Negatives
fn= predictions[(predictions.Response == 1) & (predictions.prediction == 0)].count()
   
print ("True Positives= %s; True Negatives= %s; False Positives= %s; False Negatives= %s" %(tp, tn, fp, fn))

True Positives= 0; True Negatives= 235286; False Positives= 0; False Negatives= 1331


In [29]:
import math
mcc = (tp*tn - fp*fn) /( math.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn)))
# mcc
# ZeroDivisionError: float division by zero

ZeroDivisionError: float division by zero

## Build Gradient Boosted Trees model

In [30]:
from pyspark.ml.classification import GBTClassifier

# Train a GBT model.
gbt = GBTClassifier(labelCol="Response", featuresCol="features")

# Train model.  This also runs the indexers.
pipeline_gbt = Pipeline(stages= indexers + [vectorAssembler_features, gbt])
gbt_model = pipeline_gbt.fit(train_data)

# Make predictions.
predictions_gbt = gbt_model.transform(test_data)
predictions_gbt.groupBy('prediction').count().show()


+----------+------+
|prediction| count|
+----------+------+
|       0.0|236611|
|       1.0|     6|
+----------+------+



In [31]:
tp= predictions_gbt[(predictions_gbt.Response == 1) & (predictions_gbt.prediction == 1)].count()
# True Negatives
tn= predictions_gbt[(predictions_gbt.Response == 0) & (predictions_gbt.prediction == 0)].count()
# False Positives
fp= predictions_gbt[(predictions_gbt.Response == 0) & (predictions_gbt.prediction == 1)].count()
# False Negatives
fn= predictions_gbt[(predictions_gbt.Response == 1) & (predictions_gbt.prediction == 0)].count()

print ("True Positives= %s; True Negatives= %s; False Positives= %s; False Negatives= %s" %(tp, tn, fp, fn))

True Positives= 2; True Negatives= 235282; False Positives= 4; False Negatives= 1329


In [32]:
import math
mcc = (tp*tn - fp*fn) /( math.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn)))
mcc

0.02206502140871241

In [33]:
spark.stop()

# Conclusion

In conclusion, in modeling part, we have gone through different classfication methods, such as **Logistic Regression, Random Forest and Gradient-boosted tree classifier**. According to each result, we find out that **Gradient-boosted tree** has the best predictability. As for evaluation method, we choose **Matthews correlation coefficient** to prevent the effect of data imbalance. We adopt classfication metric to look into in what proportion we are doing things right and making mistakes.