In [1]:
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Bosch Project")
    .getOrCreate()
)

In [2]:
# Display the active SparkSession to confirm it started.
spark

# Modify dataset for modeling

In [3]:
import pandas as pd
# Load the engineered numerical training features into pandas.
num_new_pd = pd.read_csv(filepath_or_buffer='new_train_numerical.csv')

In [None]:
# Only the label column is needed from the raw numeric file, so read just that column.
response = pd.read_csv(filepath_or_buffer='train_numeric.csv', usecols=['Response'])

In [None]:
# Attach the label column to the numerical features and persist the result.
# This cell reads from and overwrites the same file that In[3] loads. Without a guard,
# every re-run appends another 'Response' column. That is presumably the origin of the
# duplicate Response103/Response104 columns dropped later (TODO confirm).
# Guard on the column so a re-run is a no-op. Also keep the DataFrame: the old code bound
# `num_new_pd` to to_csv()'s return value, which is None.
if 'Response' in num_new_pd.columns:
    print("'Response' already present in new_train_numerical.csv; skipping rewrite")
else:
    # pd.concat(axis=1) aligns on the index; a length mismatch would silently add NaNs.
    assert len(num_new_pd) == len(response), "feature and label row counts differ"
    num_new_pd = pd.concat([num_new_pd, response], axis=1)
    # The default index=True is kept on purpose: the written index column is presumably
    # what Spark reads back as `_c0`, the key used to join the three tables below.
    num_new_pd.to_csv('new_train_numerical.csv')

In [4]:
# Numerical features (plus Response): header row present, column types inferred.
num_new = spark.read.csv('new_train_numerical.csv', header=True, inferSchema=True)

In [5]:
# Categorical features: header row present, column types inferred.
cat_new = spark.read.csv('new_train_cat.csv', header=True, inferSchema=True)

In [6]:
# Date features: header row present, column types inferred.
date_new = spark.read.csv('new_train_date.csv', header=True, inferSchema=True)

In [7]:
# Inner-join the three tables on the shared `_c0` row key, then drop the key and the
# 'Unnamed: 0' column (presumably a re-read pandas index); neither is a feature.
new_train = (
    num_new
    .join(cat_new, ["_c0"])
    .join(date_new, ["_c0"])
    .drop('_c0')
    .drop('Unnamed: 0')
)

In [8]:
# Row count of the joined training table (a Spark action, so this runs the joins).
new_train.count()

1183747

In [9]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler,Binarizer
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml import Pipeline, Model
from pyspark.sql.functions import to_timestamp, year, month, dayofweek

In [10]:
# The joined table carries two label columns, Response103 and Response104. Spark appears
# to de-duplicate repeated CSV header names by appending the column position. The
# duplicate most likely comes from appending Response to new_train_numerical.csv more
# than once (TODO confirm). Keep a single copy.
new_train = new_train.drop('Response104')

In [11]:
# Give the surviving label column its canonical name.
new_train = new_train.withColumnRenamed(existing="Response103", new="Response")

In [12]:
# Replace nulls with -1, as in the earlier feature-engineering section.
# An integer fill value only applies to numeric columns; string columns keep their nulls.
new_train = new_train.fillna(-1)

In [13]:
# new_train.show()

In [16]:
# Cast every numerical and date feature column to float.
# The feature list is built here because this cell sits above the cell that originally
# defines `num_and_date_feature` (In[15]). It only worked because that cell happened to
# be executed first; Restart & Run All raised NameError. The slice is identical to In[15].
# Positions: 1-100 numerical, 101 'Response', 111+ date. Positions 102-110 are presumably
# the categorical columns string-indexed later (TODO confirm). Position 0 is excluded.
num_and_date_feature = new_train.columns[1:101] + new_train.columns[111:]
feature_cols = set(num_and_date_feature)

# Use one select() instead of ~150 chained withColumn() calls. Each withColumn adds a
# projection, and long chains bloat the query plan. Column order is preserved, so the
# positional slicing used elsewhere still lines up.
new_train = new_train.select([
    new_train[c].cast('float').alias(c) if c in feature_cols else new_train[c]
    for c in new_train.columns
])

In [17]:
# 80% train / 18% test / 2% held back for prediction, with a fixed seed (666) so the
# split is repeatable for the same input.
splitted_data = new_train.randomSplit([0.8, 0.18, 0.02], seed=666)
train_data, test_data, predict_data = splitted_data

for caption, part in (("Number of training records: ", train_data),
                      ("Number of testing records : ", test_data),
                      ("Number of prediction records : ", predict_data)):
    print(caption + str(part.count()))

Number of training records: 947155
Number of testing records : 213200
Number of prediction records : 23392


In [18]:
# Total number of columns after the join and drops.
len(new_train.columns)

163

In [19]:
# Sanity check: position 101 should be 'Response', because the positional feature slice
# skips it.
new_train.columns[101]

'Response'

In [15]:
# Numerical features occupy positions 1-100 and date features positions 111 onward.
# Position 101 is 'Response' (see the check above). Positions 102-110 are presumably the
# nine categorical columns indexed below (TODO confirm). Position 0 is excluded.
all_cols = new_train.columns
num_and_date_feature = all_cols[1:101] + all_cols[111:]
len(num_and_date_feature)

152

In [None]:
# num_and_date_feature

In [20]:
# One StringIndexer per categorical column, writing to "<column>_IX".
# handleInvalid="keep" assigns labels unseen during fit to an extra index instead of failing.
categorical_cols = [
    "L1_S24_F1114", "L1_S24_F1137", "L1_S24_F1140",
    "L1_S24_F1510", "L1_S24_F1525", "L1_S24_F1530",
    "L1_S25_F1852", "L1_S25_F2779", "L3_S32_F3854",
]
(
    stringIndexer_L1_S24_F1114, stringIndexer_L1_S24_F1137, stringIndexer_L1_S24_F1140,
    stringIndexer_L1_S24_F1510, stringIndexer_L1_S24_F1525, stringIndexer_L1_S24_F1530,
    stringIndexer_L1_S25_F1852, stringIndexer_L1_S25_F2779, stringIndexer_L3_S32_F3854,
) = [
    StringIndexer(inputCol=name, outputCol=name + "_IX", handleInvalid="keep")
    for name in categorical_cols
]

In [21]:
# new_train.printSchema()

In [22]:
# Combine the numerical/date features and the indexed categorical columns into one
# "features" vector. handleInvalid="skip" filters out rows with null/NaN inputs.
indexed_cat_cols = [
    "L1_S24_F1114_IX", "L1_S24_F1137_IX", "L1_S24_F1140_IX",
    "L1_S24_F1510_IX", "L1_S24_F1525_IX", "L1_S24_F1530_IX",
    "L1_S25_F1852_IX", "L1_S25_F2779_IX", "L3_S32_F3854_IX",
]
vectorAssembler_features = VectorAssembler(
    inputCols=list(num_and_date_feature) + indexed_cat_cols,
    outputCol="features",
    handleInvalid="skip",
)

In [23]:
# Display the assembler's identifier.
vectorAssembler_features

VectorAssembler_b87992bc41da

# Models

## Logistic Regression

In [169]:
# Logistic regression on the assembled vector; all other hyperparameters use defaults.
logit = LogisticRegression(featuresCol="features", labelCol="Response")

In [170]:
# Index the categoricals, assemble the feature vector, then fit the logistic regression.
logit_stages = [
    stringIndexer_L1_S24_F1114, stringIndexer_L1_S24_F1137,
    stringIndexer_L1_S24_F1140, stringIndexer_L1_S24_F1510,
    stringIndexer_L1_S24_F1525, stringIndexer_L1_S24_F1530,
    stringIndexer_L1_S25_F1852, stringIndexer_L1_S25_F2779,
    stringIndexer_L3_S32_F3854,
    vectorAssembler_features,
    logit,
]
pipeline_logit = Pipeline(stages=logit_stages)

In [171]:
# Fit every pipeline stage (indexers and model) on the training split.
model_logit = pipeline_logit.fit(dataset=train_data)

In [172]:
# Score the held-out test split with the fitted logistic pipeline.
predictions_logit = model_logit.transform(dataset=test_data)

In [173]:
# Number of test rows predicted as each class (0.0 / 1.0).
predictions_logit.groupBy('prediction').count().show()

+----------+------+
|prediction| count|
+----------+------+
|       0.0|213198|
|       1.0|     2|
+----------+------+



In [187]:
# Keep only the predicted and true labels for the confusion-matrix counts.
logit_pred = predictions_logit.select(['prediction', 'Response'])

In [160]:
# True label distribution in the test split. Positives are rare, so accuracy alone would
# be misleading.
test_data.groupBy('Response').count().show()

+--------+------+
|Response| count|
+--------+------+
|     1.0|  1248|
|     0.0|211952|
+--------+------+



In [200]:
logit_pred.where((logit_pred.Response == 1) & (logit_pred.prediction == 1)).count()  # True Positive

1

In [201]:
logit_pred.where((logit_pred.Response == 1) & (logit_pred.prediction == 0)).count()  # False Negative

1247

In [202]:
logit_pred.where((logit_pred.Response == 0) & (logit_pred.prediction == 1)).count()  # False Positive

1

In [203]:
logit_pred.where((logit_pred.Response == 0) & (logit_pred.prediction == 0)).count()  # True Negative

211951

In [205]:
import math

# Matthews correlation coefficient (MCC) for the logistic-regression model, computed
# from the confusion-matrix counts printed by the four cells above.
N_TEST = 213200  # size of test_data, from the split cell's output
tp = 1       # Response == 1, prediction == 1
fp = 1       # Response == 0, prediction == 1
fn = 1247    # Response == 1, prediction == 0
tn = 211951  # Response == 0, prediction == 0 (matches the True Negative cell's output)

# Labels and predictions are both 0/1 and every test row was scored, so the four counts
# partition test_data. This catches transcription slips in the hard-coded values.
assert tp + fp + fn + tn == N_TEST, "confusion-matrix counts do not sum to the test-set size"

denominator = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
# By convention MCC is 0 when any row or column of the confusion matrix is empty.
mcc = (tp * tn - fp * fn) / denominator if denominator else 0.0
mcc

0.01983993227463039

In [174]:
# (score, label) pairs for BinaryClassificationMetrics, taken from the logistic-regression
# test predictions. This previously read from `predictions`, which is never assigned
# before this point (only in the commented-out random-forest section). A fresh run
# therefore raised NameError, or silently evaluated whatever stale frame that name held.
# NOTE(review): hard 0/1 predictions give a single-threshold ROC, so areaUnderROC will
# understate the model. The positive-class probability is the usual score to pass here.
predictionAndLabels = predictions_logit.select('prediction', 'Response').rdd

In [175]:
# predictionAndLabels.show()

In [176]:
# from pyspark.mllib.evaluation import BinaryClassificationMetrics
# evaluatorLogit = BinaryClassificationMetrics(predictionAndLabels)

In [207]:
# evaluatorLogit.areaUnderROC

## Random Forest

In [178]:
# from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
# rf = RandomForestClassifier(labelCol="Response", featuresCol="features")


In [179]:
# pipeline_rf = Pipeline(stages=[stringIndexer_L1_S24_F1114, stringIndexer_L1_S24_F1137, 
#                                stringIndexer_L1_S24_F1140, stringIndexer_L1_S24_F1510,
#                                stringIndexer_L1_S24_F1525, stringIndexer_L1_S24_F1530,
#                                stringIndexer_L1_S25_F1852, stringIndexer_L1_S25_F2779,
#                                stringIndexer_L3_S32_F3854,
#                                vectorAssembler_features, 
#                                rf])

In [180]:
# model_rf = pipeline_rf.fit(train_data)

In [181]:
# predictions = model_rf.transform(test_data)

In [182]:
# predictionAndLabels_rf = predictions.select('prediction','Response').rdd

In [183]:
# evaluatorRF = BinaryClassificationMetrics(predictionAndLabels_rf)

In [208]:
# evaluatorRF.areaUnderROC -0.5

In [209]:
# predictions.groupBy('prediction').count().show()  - prediction has no 1

## Gradient-Boosted Tree 

In [24]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees on the same assembled features as the logistic regression.
gbt = GBTClassifier(featuresCol="features", labelCol="Response")

# Same preprocessing stages as the logistic pipeline, with the GBT as the final stage.
# Fitting the pipeline also fits the StringIndexers.
gbt_stages = [
    stringIndexer_L1_S24_F1114, stringIndexer_L1_S24_F1137,
    stringIndexer_L1_S24_F1140, stringIndexer_L1_S24_F1510,
    stringIndexer_L1_S24_F1525, stringIndexer_L1_S24_F1530,
    stringIndexer_L1_S25_F1852, stringIndexer_L1_S25_F2779,
    stringIndexer_L3_S32_F3854,
    vectorAssembler_features,
    gbt,
]
pipeline_gbt = Pipeline(stages=gbt_stages)
gbt_model = pipeline_gbt.fit(train_data)

# Score the test split and show how many rows land in each predicted class.
predictions_gbt = gbt_model.transform(test_data)
predictions_gbt.groupBy('prediction').count().show()


+----------+------+
|prediction| count|
+----------+------+
|       0.0|213148|
|       1.0|    52|
+----------+------+



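From the count table above, the GBT model flags 52 test rows as positive. That is far more than the previous two methods: logistic regression flagged only 2, and the random forest flagged none. The confusion-matrix counts below show how many of those positives are correct.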

In [25]:
# Keep only the predicted and true labels for the confusion-matrix counts.
gbt_pred = predictions_gbt.select(['prediction', 'Response'])

In [26]:
gbt_pred.where((gbt_pred.Response == 1) & (gbt_pred.prediction == 1)).count()  # True Positive

25

In [27]:
gbt_pred.where((gbt_pred.Response == 1) & (gbt_pred.prediction == 0)).count()  # False Negative

1223

In [28]:
gbt_pred.where((gbt_pred.Response == 0) & (gbt_pred.prediction == 1)).count()  # False Positive

27

In [29]:
gbt_pred.where((gbt_pred.Response == 0) & (gbt_pred.prediction == 0)).count()  # True Negative

211925

In [30]:
import math

# Matthews correlation coefficient for the GBT model, using the confusion-matrix counts
# printed by the four cells above (TP, FP, FN, TN).
tp, fp, fn, tn = 25, 27, 1223, 211925
numerator = tp * tn - fp * fn
denominator = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
mcc = numerator / denominator
mcc

0.09723857799358827

In [31]:
# Shut down the Spark session; any later cell that needs Spark will fail after this.
spark.stop()

# Conclusion

In conclusion, in the modeling part we tried several classification methods: **Logistic Regression, Random Forest and Gradient-boosted tree classifier**. Based on the results, the **Gradient-boosted tree** has the best predictive performance (MCC ≈ 0.097 on the test split, versus ≈ 0.020 for logistic regression). For evaluation we chose the **Matthews correlation coefficient** because it stays informative under heavy class imbalance: only about 0.6% of test rows have Response = 1, so plain accuracy would be misleading. We also inspected the confusion-matrix counts to see how often the model's predictions are right and where it makes mistakes.