In [1]:
import pyspark
import pyspark.sql.functions as F

pyspark.__version__

'3.1.3'

In [2]:
from pyspark import StorageLevel
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
import pandas as pd 
import numpy as np 

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName("amex-app")\
    .master("local[*]")\
    .getOrCreate()

In [4]:
train_df = spark.read.option('header','true').csv('gs://amex-data/amex-csv-data/train_data.csv').limit(20)
label_df = spark.read.option('header','true').csv('gs://amex-data/amex-csv-data/train_labels.csv')

                                                                                

**Defining Schema:**

In [5]:
types_map = {
    "object": types.StringType(),
    "float64": types.FloatType(),
    "int64": types.IntegerType(),
}

# Known dtypes
string_dtypes = ["customer_ID", 'B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']
date_dtypes = ['S_2']
# integer_dtypes = ['target']

In [6]:
def create_spark_schema(series):
    fields = []
    
    for value in series:
        if value in string_dtypes:
            field = types.StructField(value, types.StringType(), True)
            
#         elif value in date_dtypes:
#             field = types.StructField(value, types.DateType(), True)
        
        else:
            field = types.StructField(value, types.FloatType(), True)
            
        fields.append(field)
    return types.StructType(fields)

In [7]:
train_schema = create_spark_schema(train_df.columns) 
label_schema = create_spark_schema(label_df.columns)  

**Read data back with the defined schema**

In [8]:
train_df = spark.read.option("header", "true").csv("gs://amex-data/amex-csv-data/train_data.csv", schema=train_schema)
label_df = spark.read.option("header", "true").csv("gs://amex-data/amex-csv-data/train_labels.csv", schema=label_schema)

In [9]:
train_df

DataFrame[customer_ID: string, S_2: float, P_2: float, D_39: float, B_1: float, B_2: float, R_1: float, S_3: float, D_41: float, B_3: float, D_42: float, D_43: float, D_44: float, B_4: float, D_45: float, B_5: float, R_2: float, D_46: float, D_47: float, D_48: float, D_49: float, B_6: float, B_7: float, B_8: float, D_50: float, D_51: float, B_9: float, R_3: float, D_52: float, P_3: float, B_10: float, D_53: float, S_5: float, B_11: float, S_6: float, D_54: float, R_4: float, S_7: float, B_12: float, S_8: float, D_55: float, D_56: float, B_13: float, R_5: float, D_58: float, S_9: float, B_14: float, D_59: float, D_60: float, D_61: float, B_15: float, S_11: float, D_62: float, D_63: string, D_64: string, D_65: float, B_16: float, B_17: float, B_18: float, B_19: float, D_66: string, B_20: float, D_68: string, S_12: float, R_6: float, S_13: float, B_21: float, D_69: float, B_22: float, D_70: float, D_71: float, D_72: float, S_15: float, B_23: float, D_73: float, P_4: float, D_74: float, D_

In [10]:
label_df

DataFrame[customer_ID: string, target: float]

**Data Preprocessing**

In [11]:
def add_suffix(names, suffix):
    return [name + suffix for name in names]

In [12]:
# # Known Columns
# info_cols = ['customer_ID', 'S_2']
# target_cols = ['target']
# cat_cols = [
#     'B_30', 'B_38', 
#     'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']


# # Define Numeric Columns
# excluded = info_cols + cat_cols
# num_cols = [col for col in train_df.columns if col not in excluded]

# # Define Feature Columns
# features_cols =  cat_cols + num_cols

# print(f"Number of categoric cols: {len(cat_cols)}")
# print(f"Number of numeric cols: {len(num_cols)}")

In [13]:
# Known Columns
info_cols = ['customer_ID', 'S_2']
target_cols = ['target']
cat_cols = [
    'B_30', 'B_38', 
    'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_68']


# Define Numeric Columns
excluded = info_cols + cat_cols
num_cols = [col for col in train_df.columns if col not in excluded]

# Define Feature Columns
features_cols =  cat_cols + num_cols

print(f"Number of categoric cols: {len(cat_cols)}")
# print(f"Number of numeric cols: {len(num_cols)}")

Number of categoric cols: 10


**Check null values**

In [14]:
# Dict_Null = {col:train_df.filter(train_df[col].isNull()).count() for col in train_df.columns}
# Dict_Null

In [15]:
from pyspark.sql.functions import col,isnan, when, count

train_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train_df.columns]).show(vertical=True)

22/12/14 01:37:42 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

-RECORD 0--------------
 customer_ID | 0       
 S_2         | 5531451 
 P_2         | 45985   
 D_39        | 0       
 B_1         | 0       
 B_2         | 2016    
 R_1         | 0       
 S_3         | 1020544 
 D_41        | 2016    
 B_3         | 2016    
 D_42        | 4740137 
 D_43        | 1658396 
 D_44        | 274319  
 B_4         | 0       
 D_45        | 2017    
 B_5         | 0       
 R_2         | 0       
 D_46        | 1211699 
 D_47        | 0       
 D_48        | 718725  
 D_49        | 4985917 
 B_6         | 233     
 B_7         | 0       
 B_8         | 22268   
 D_50        | 3142402 
 D_51        | 0       
 B_9         | 0       
 R_3         | 0       
 D_52        | 29563   
 P_3         | 301492  
 B_10        | 0       
 D_53        | 4084585 
 S_5         | 0       
 B_11        | 0       
 S_6         | 0       
 D_54        | 2016    
 R_4         | 0       
 S_7         | 1020544 
 B_12        | 0       
 S_8         | 0       
 D_55        | 1

In [16]:
#dropping columns with high percentage of null values.
columns_to_drop = ['S_2','D_66','D_42','D_49','D_73','D_76','R_9','B_29','D_87','D_88','D_106','R_26','D_108','D_110','D_111','B_39','B_42','D_132','D_134','D_135','D_136','D_137','D_138','D_142']
train_df = train_df.drop(*columns_to_drop)

Imputing null values in the column of our data

In [17]:
selected_col = np.array(['P_2','S_3','B_2','D_41','D_43','B_3','D_44','D_45','D_46','D_48','D_50','D_53','S_7','D_56','S_9','B_6','B_8','D_52','P_3','D_54','D_55','B_13','D_59','D_61','B_15','D_62','B_16','B_17','D_77','B_19','B_20','D_69','B_22','D_70','D_72','D_74','R_7','B_25','B_26','D_78','D_79','D_80','B_27','D_81','R_12','D_82','D_105','S_27','D_83','R_14','D_84','D_86','R_20','B_33','D_89','D_91','S_22','S_23','S_24','S_25','S_26','D_102','D_103','D_104','D_107','B_37','R_27','D_109','D_112','B_40','D_113','D_115','D_118','D_119','D_121','D_122','D_123','D_124','D_125','D_128','D_129','B_41','D_130','D_131','D_133','D_139','D_140','D_141','D_143','D_144','D_145'])

In [18]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols= selected_col, 
    outputCols= selected_col
    ).setStrategy("median")

# Add imputation cols to df
train_df = imputer.fit(train_df).transform(train_df)

                                                                                

In [19]:
train_df = (train_df.fillna("null", subset=cat_cols))

**Recheck the null value count to verify. Drop columns if any new null columns found.**

In [20]:
# Dict_Null_recheck = {col:train_df.filter(train_df[col].isNull()).count() for col in train_df.columns}
# Dict_Null_recheck

In [21]:
# from pyspark.sql.functions import col,isnan, when, count

# train_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train_df.columns]).show(vertical=True)

In [22]:
# columns_to_dropagain = ['D_64','D_68','B_30','B_38','D_114','D_116','D_117','D_120', 'D_126',]
# train_df = train_df.drop(*columns_to_dropagain)

In [23]:
# #final check for null values.
# from pyspark.sql.functions import col,isnan, when, count

# train_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train_df.columns]).show(vertical=True)

In [24]:
# from pyspark.sql.functions import *
# amount_missing_df_again = train_df.select([(count(when(isnan(c) | col(c).isNull(), c))/count(lit(1))).alias(c) for c in train_df.columns])
# amount_missing_df_again.show(vertical =True)

**String indexer**

In [25]:
# Create columns aliases
cat_index_cols = add_suffix(cat_cols, "_index")

# Fit StringIndexer
indexers = StringIndexer(inputCols=cat_cols, outputCols=cat_index_cols)
indexers_model = indexers.fit(train_df)

# Transform to data
train_df_indexed = indexers_model.transform(train_df)

                                                                                

In [26]:
# See what columns the indexer handle
indexers.getInputCols()

['B_30',
 'B_38',
 'D_114',
 'D_116',
 'D_117',
 'D_120',
 'D_126',
 'D_63',
 'D_64',
 'D_68']

**Onehot encoding**

In [27]:
# Create columns aliases
cat_ohe_cols = add_suffix(cat_cols, "_ohe")

# Fit OneHotEncoder
ohe = OneHotEncoder(inputCols=cat_index_cols, outputCols=cat_ohe_cols)
ohe_model = ohe.fit(train_df_indexed)

# Transform to data
train_df_ohed = ohe_model.transform(train_df_indexed)

**Data Aggregation**

In [28]:
# Functions for each type
# each tuple consist of: (function, column's suffix)
num_funcs = [
    (F.mean, "_mean"),
#     (F.stddev, "_std"),
    (F.min, "_min"),
    (F.max, "_max"),
]

cat_funcs = [
    (F.count, "_count"),
    (F.last, "_last"),
    (F.countDistinct, "_nunique"),
]

In [29]:
# Arguments for .agg method
# each arg consist of: func(colname).alias(colname + suffix)
import itertools 
agg_num_args = [
    func(col).alias(col + suffix) 
    for col, (func, suffix) in itertools.product(selected_col, num_funcs)]

agg_cols_args = [
    func(col).alias(col + suffix) 
    for col, (func, suffix) in itertools.product(cat_ohe_cols, cat_funcs)]

# Combine numeric and categoric agg arguments
agg_args = agg_num_args + agg_cols_args
agg_args[0]

Column<'avg(P_2) AS `P_2_mean`'>

In [30]:
len(agg_num_args)

273

In [31]:
# Columns that we won't use
unused_cols = cat_cols + num_cols + cat_index_cols + cat_ohe_cols
print(f"Unused columns {len(unused_cols)}")

Unused columns 208


In [32]:
# Apply the agg while also dropping unused columns
train_df_grouped = (train_df_ohed.groupBy("customer_ID")
                                 .agg(*agg_args)
                                 .drop(*unused_cols))

In [33]:
# #final check for null values.
# from pyspark.sql.functions import col,isnan, when, count

# train_df_grouped.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train_df_grouped.columns]).show(vertical=True)

In [34]:
# train_df_grouped

In [35]:
#joining they labels with training data

train_joined_df = train_df_grouped.join(F.broadcast(label_df), on="customer_ID")

In [36]:
#Total number of features
dim = len(train_joined_df.columns)
print(f"Total features: {dim}")

Total features: 305


In [37]:
va = VectorAssembler(
    inputCols=train_joined_df.drop("customer_ID", "target").columns,
    outputCol="features",
    handleInvalid="error",
)

In [38]:
train_ready_df = (va.transform(train_joined_df)
                    .select(["customer_ID", "features", "target"])
                    .persist(StorageLevel.DISK_ONLY))

                                                                                

In [39]:
train_ready_df.dtypes

[('customer_ID', 'string'), ('features', 'vector'), ('target', 'float')]

**ML models: Logistic regression, Decision tree, Random forest**

Split training and test data using the training data only.

In [40]:
train_split, test_split = train_ready_df.randomSplit(weights = [0.80, 0.20], seed = 100)

**Logistic Regression**

In [41]:
logres = LogisticRegression(featuresCol="features", labelCol="target")
logres_model = logres.fit(train_split)

22/12/14 01:55:24 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/14 01:55:24 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [42]:
test_pred_lr = logres_model.transform(test_split)
test_pred_lr

DataFrame[customer_ID: string, features: vector, target: float, rawPrediction: vector, probability: vector, prediction: double]

**Decision Tree**

In [46]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol="features", labelCol="target")
dt_model = dt.fit(train_split)

                                                                                

In [47]:
test_pred_dt = dt_model.transform(test_split)
test_pred_dt

DataFrame[customer_ID: string, features: vector, target: float, rawPrediction: vector, probability: vector, prediction: double]

**Random Forest**

In [48]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Training Model
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'target', numTrees = 500, maxDepth = 5)
rfModel = rf.fit(train_split)

22/12/13 07:00:38 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1317.1 KiB
22/12/13 07:01:12 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 2009.3 KiB
22/12/13 07:01:58 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.3 MiB
                                                                                

In [49]:
#Prediction
test_pred_rf = rfModel.transform(test_split)

In [50]:
ml = {}

In [51]:
ml['test_pred_lr'] = test_pred_lr
ml['test_pred_dt'] = test_pred_dt
ml['test_pred_rf'] = test_pred_rf

In [52]:
ml

{'test_pred_lr': DataFrame[customer_ID: string, features: vector, target: float, rawPrediction: vector, probability: vector, prediction: double],
 'test_pred_dt': DataFrame[customer_ID: string, features: vector, target: float, rawPrediction: vector, probability: vector, prediction: double],
 'test_pred_rf': DataFrame[customer_ID: string, features: vector, target: float, rawPrediction: vector, probability: vector, prediction: double]}

**Evaluations**

**Evaluating using Pyspark metric**

In [53]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Create both evaluators
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction")
evaluator = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction", metricName='areaUnderROC')

# Make predicitons
# predictionAndTarget = model.transform(df).select("target", "prediction")
for key, value in ml.items(): 
# Get metrics
#      acc = "acc_{}".format(i)
#      f1 = "f1_{}".format(i)
#      weightedPrecision = "weightedPrecision_{}".format(i)
#      weightedRecall = "weightedRecall_{}".format(i)
#      auc = "auc_{}".format(i)   
#      test_pred = "test_pred_{}".format(i)
     acc = evaluatorMulti.evaluate(value, {evaluatorMulti.metricName: "accuracy"})
     f1 = evaluatorMulti.evaluate(value, {evaluatorMulti.metricName: "f1"})
     weightedPrecision = evaluatorMulti.evaluate(value, {evaluatorMulti.metricName: "weightedPrecision"})
     weightedRecall = evaluatorMulti.evaluate(value, {evaluatorMulti.metricName: "weightedRecall"})
     auc = evaluator.evaluate(value)
#      prnt(test_pred)
     print("accuracy_{}".format(key), acc)
     print("f1_{}".format(key), f1)
     print("weightedPrecision_{}".format(key), weightedPrecision)
     print("weightedRecall_{}".format(key),weightedRecall)
     print("auc_{}".format(key),auc)

                                                                                

accuracy_test_pred_lr 0.8876329787234043
f1_test_pred_lr 0.8867674993644983
weightedPrecision_test_pred_lr 0.8861970497152565
weightedRecall_test_pred_lr 0.8876329787234043
auc_test_pred_lr 0.8451827179103529


                                                                                

accuracy_test_pred_dt 0.8595221485873735
f1_test_pred_dt 0.8588990929851348
weightedPrecision_test_pred_dt 0.858370318265501
weightedRecall_test_pred_dt 0.8595221485873735
auc_test_pred_dt 0.8113804314901693


22/12/13 07:03:21 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 2.8 MiB
22/12/13 07:03:27 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 2.8 MiB
22/12/13 07:03:33 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 2.8 MiB
22/12/13 07:03:38 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 2.8 MiB
22/12/13 07:03:44 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 2.8 MiB

accuracy_test_pred_rf 0.8666070805720265
f1_test_pred_rf 0.8648602210811689
weightedPrecision_test_pred_rf 0.8639013640002504
weightedRecall_test_pred_rf 0.8666070805720265
auc_test_pred_rf 0.8127272435136679


                                                                                

**Evaluation using AMEX metric**

In [54]:
import numpy as np
import pandas as pd
def amex_metric(y_true: pd.DataFrame, y_pred: pd.DataFrame) -> float:

    def top_four_percent_captured(y_true: pd.DataFrame, y_pred: pd.DataFrame) -> float:
        df = (pd.concat([y_true, y_pred], axis='columns')
              .sort_values('prediction', ascending=False))
        df['weight'] = df['target'].apply(lambda x: 20 if x==0 else 1)
        four_pct_cutoff = int(0.04 * df['weight'].sum())
        df['weight_cumsum'] = df['weight'].cumsum()
        df_cutoff = df.loc[df['weight_cumsum'] <= four_pct_cutoff]
        return (df_cutoff['target'] == 1).sum() / (df['target'] == 1).sum()
        
    def weighted_gini(y_true: pd.DataFrame, y_pred: pd.DataFrame) -> float:
        df = (pd.concat([y_true, y_pred], axis='columns')
              .sort_values('prediction', ascending=False))
        df['weight'] = df['target'].apply(lambda x: 20 if x==0 else 1)
        df['random'] = (df['weight'] / df['weight'].sum()).cumsum()
        total_pos = (df['target'] * df['weight']).sum()
        df['cum_pos_found'] = (df['target'] * df['weight']).cumsum()
        df['lorentz'] = df['cum_pos_found'] / total_pos
        df['gini'] = (df['lorentz'] - df['random']) * df['weight']
        return df['gini'].sum()

    def normalized_weighted_gini(y_true: pd.DataFrame, y_pred: pd.DataFrame) -> float:
        y_true_pred = y_true.rename(columns={'target': 'prediction'})
        return weighted_gini(y_true, y_pred) / weighted_gini(y_true, y_true_pred)

    g = normalized_weighted_gini(y_true, y_pred)
    d = top_four_percent_captured(y_true, y_pred)

    return 0.5 * (g + d)

In [55]:
for key, value in ml.items():
    y_pred = pd.DataFrame((value.select(["customer_ID", "prediction"])
                                  .toPandas()))

    y_true = pd.DataFrame((value.select(["customer_ID", "target"])
                                  .toPandas()))

    amex_accuracy = amex_metric(y_true, y_pred)
    print("amex_accuracy_{}".format(key),amex_accuracy)
                          

                                                                                

amex_accuracy_test_pred_lr 0.5395285451031633


                                                                                

amex_accuracy_test_pred_dt 0.45006891837885576


22/12/13 07:03:58 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 2.8 MiB
                                                                                

amex_accuracy_test_pred_rf 0.47386258130733566


**Save model**

In [56]:
logres_model.save("gs://amex-data/saved_models/lr_model")

                                                                                

**Explainable model using game theory like Shapley**

In [46]:
!pip install shap

[0m

In [None]:
ct.fit(X)
X_shap = ct.fit_transform(X)
test_shap  = ct.transform(test)
explainer = shap.LinearExplainer(logr_pipe.named_steps['LR'], X_shap, feature_perturbation="interventional")
shap_values = explainer.shap_values(test_shap)

In [None]:
# Evaluate SHAP values
shap_values = explainer.shap_values(X)