In [1]:
import os
import sys
import pandas as pd
import seaborn as sns

  import pandas.util.testing as tm


In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Mllib_CCA_Model').getOrCreate()

Load data from Local

In [4]:
credit_record_df = spark.read.csv('credit_record.csv', header=True, inferSchema=True)

In [5]:
application_record_df = spark.read.csv('application_record.csv', header=True, inferSchema=True)

What is the proportion of females in the applicant customer base?

In [6]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
q1 = application_record_df.groupBy('CODE_GENDER').count() \
.withColumn('Percent', F.col('count')*100/F.sum('count').over(Window.partitionBy()))
q1.show()

+-----------+------+-----------------+
|CODE_GENDER| count|          Percent|
+-----------+------+-----------------+
|          F|294440|67.13836513839706|
|          M|144117|32.86163486160294|
+-----------+------+-----------------+



Is house ownership higher among male applicants or female applicants?

In [7]:
q2 = application_record_df.filter(F.col("FLAG_OWN_REALTY")=='Y').groupBy('CODE_GENDER').count()
q2.show()

+-----------+------+
|CODE_GENDER| count|
+-----------+------+
|          F|208138|
|          M| 95936|
+-----------+------+



Is there any correlation between income levels and education level?

In [8]:
df1 = application_record_df.select('AMT_INCOME_TOTAL', 'NAME_EDUCATION_TYPE')
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="NAME_EDUCATION_TYPE", outputCol="education_level")
df1 = indexer.fit(df1).transform(df1)
corr = df1.corr('AMT_INCOME_TOTAL', 'education_level')
corr

0.1599998041001609

What is the average and median salary of the applicant base?

In [126]:
import numpy as np
import math
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType

In [None]:
def f_median(x):
  return float(np.median(x))

get_median = F.udf(f_median, FloatType())

application_record_df.select(
    F.mean(F.col('AMT_INCOME_TOTAL')).alias('mean'),
    get_median(F.collect_list('AMT_INCOME_TOTAL')).alias('median')    
).show()

All Customers which have defaulted in payment (NonPayment / delay in more than 60 days once are defaultes and Target value will be set to 1)

In [11]:
credit_df = credit_record_df.withColumn("Target", F.when(F.col("STATUS").isin([2,3,4,5]), 1).otherwise(0) )

In [12]:
credit_df = credit_df.groupBy('ID').agg(F.max('Target').alias('Target'))

In [110]:
analysis_data = credit_df.join(application_record_df, ['ID'])

Is the proportion of bad customers higher for people who own cars?

In [None]:
analysis_data.groupby('FLAG_OWN_CAR','target').count().show()

Do people living on rent have a higher proportion of bad customers compared to the rest of the population?

In [None]:
analysis_data.groupby('NAME_HOUSING_TYPE','target').count().show()

Do single customers have a high proportion of bad customers compared to married customers?

In [None]:
analysis_data.groupby('NAME_FAMILY_STATUS','target').count().show()

Check to see if we have missing value in the dataset

In [None]:
def count_missing(spark_df):
    df = spark_df.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for (c,c_type) in spark_df.dtypes]).toPandas()
    if len(df) == 0:
        return None
    return df
count_missing(analysis_data)

Check datatype of all columns in the dataset

In [None]:
#See data types
for (c,c_type) in analysis_data.dtypes:
  print("{} : {}".format(c, c_type))

In [111]:
#drop nulls
analysis_data = analysis_data.dropna()

In [None]:
#get the idea of the data distribution
for feature in features:
  analysis_data.groupBy(feature).count().show()

In [128]:
# buckets for salary
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=[ 20000, 50000, 80000, 130000 , 160000, 190000 , 200000, 1600000,  float('Inf') ],inputCol="AMT_INCOME_TOTAL", outputCol="Income_Buckets")
analysis_data = bucketizer.setHandleInvalid("keep").transform(analysis_data)
analysis_data = analysis_data.withColumn("Income_Buckets", analysis_data["Income_Buckets"].cast(IntegerType()))

In [113]:
# divide CNT_children in 0,1,2, rest
analysis_data = analysis_data.withColumn('CNT_CHILDREN', F.when( (F.col("CNT_CHILDREN") <=2), F.col("CNT_CHILDREN")).otherwise(3))

In [114]:
# divide CNT_children in 1,2,3,4, rest
analysis_data = analysis_data.withColumn('CNT_FAM_MEMBERS', F.when( (F.col("CNT_FAM_MEMBERS") <=4), F.col("CNT_FAM_MEMBERS")).otherwise(5))

In [115]:
#Convert days to yrs
analysis_data = analysis_data.withColumn('DAYS_BIRTH', -F.floor(F.col('DAYS_BIRTH')/365))

In [116]:
#bucket for age
bucketizer = Bucketizer(splits=[ 0, 30, 40, 50, 60, float('Inf') ],inputCol="DAYS_BIRTH", outputCol="Age_Buckets")
analysis_data = bucketizer.setHandleInvalid("keep").transform(analysis_data)
analysis_data = analysis_data.withColumn("Age_Buckets", analysis_data["Age_Buckets"].cast(IntegerType()))

In [117]:
#Convert days to yrs
analysis_data = analysis_data.withColumn('DAYS_EMPLOYED', -F.floor(F.col('DAYS_EMPLOYED')/365))

In [118]:
#bucket for Work exp
bucketizer = Bucketizer(splits=[ 0, 5, 10, 20, 30, 40, float('Inf') ],inputCol="DAYS_EMPLOYED", outputCol="Exp_Buckets")
analysis_data = bucketizer.setHandleInvalid("keep").transform(analysis_data)
analysis_data = analysis_data.withColumn("Exp_Buckets", analysis_data["Exp_Buckets"].cast(IntegerType()))

In [119]:
# Function for WOE
def woe(event, non_event):
  all_events=event+non_event
  if non_event==0:
    return math.log(event/total_event)
  else:
    return math.log((event/total_event)/(non_event/total_non_event))
get_woe = F.udf(woe, FloatType())

# Function for IV
def iv(event, non_event, woe):
  all_events = event+non_event
  value = ((event/total_event)-(non_event/total_non_event))*woe
  return value

get_iv = F.udf(iv, FloatType())


In [120]:
#For all features calculate WOE and IV
features = ['CODE_GENDER', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'FLAG_EMAIL', 'FLAG_WORK_PHONE', 'FLAG_PHONE', 'NAME_HOUSING_TYPE', 'OCCUPATION_TYPE','NAME_FAMILY_STATUS','Income_Buckets', 'CNT_CHILDREN', 'CNT_FAM_MEMBERS','Age_Buckets','Exp_Buckets' ]

In [121]:
global total_event
global total_non_event
total_event = analysis_data.filter(F.col("Target")=='0').count()
total_non_event = analysis_data.filter(F.col("Target")=='1').count()

for feature in features:
  fn_input = analysis_data.select(feature,'Target')
  fn_input = fn_input.groupBy(feature, 'Target').count()
  pivotDF = fn_input.groupBy(feature,"Target").sum("count").groupBy(feature).pivot("Target").sum("sum(count)")
  newColumns = [feature,"Event", "Non-Event"]
  pivotDF = pivotDF.toDF(*newColumns)
  pivotDF = pivotDF.na.fill(0)
  res = pivotDF.withColumn('WOE', get_woe(F.col('Event'), F.col('Non-Event'))).withColumn('IV', get_iv(F.col('Event'), F.col('Non-Event'), F.col('WOE')))
  res.show()

+-----------+-----+---------+-----------+-----------+
|CODE_GENDER|Event|Non-Event|        WOE|         IV|
+-----------+-----+---------+-----------+-----------+
|          F|15400|      230| 0.13400456| 0.01047317|
|          M| 9312|      192|-0.18847512|0.014730334|
+-----------+-----+---------+-----------+-----------+

+------------+-----+---------+-------------+------------+
|FLAG_OWN_CAR|Event|Non-Event|          WOE|          IV|
+------------+-----+---------+-------------+------------+
|           Y|10339|      177|-0.0025102214| 2.639611E-6|
|           N|14373|      245| 0.0018095945|1.9028702E-6|
+------------+-----+---------+-------------+------------+

+---------------+-----+---------+-----------+-----------+
|FLAG_OWN_REALTY|Event|Non-Event|        WOE|         IV|
+---------------+-----+---------+-----------+-----------+
|              Y|16218|      243| 0.13077666|0.010521108|
|              N| 8494|      179|-0.21030942|0.016919596|
+---------------+-----+---------+---

In [130]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [140]:
housing_indexr = StringIndexer(inputCol='NAME_HOUSING_TYPE',   outputCol='NAME_HOUSING_TYPE_idx')      
family_indexr = StringIndexer(inputCol='NAME_FAMILY_STATUS',   outputCol='NAME_FAMILY_STATUS_idx')      
gender_indexer = StringIndexer(inputCol='CODE_GENDER',   outputCol='CODE_GENDER_idx')      
income_indexer = StringIndexer(inputCol='NAME_INCOME_TYPE',   outputCol='NAME_INCOME_TYPE_idx')      

one_hot_encoder = OneHotEncoderEstimator(inputCols=['NAME_HOUSING_TYPE_idx' ,'NAME_FAMILY_STATUS_idx',  'CODE_GENDER_idx', 'NAME_INCOME_TYPE_idx'],
                             outputCols=['OCCUPATION_TYPE_Enc' ,'NAME_FAMILY_STATUS_Enc', 'CODE_GENDER_Enc', 'NAME_INCOME_TYPE_Enc'])
assembler = VectorAssembler(inputCols=['OCCUPATION_TYPE_Enc' ,'NAME_FAMILY_STATUS_Enc', 'CODE_GENDER_Enc', 'NAME_INCOME_TYPE_Enc',"Income_Buckets", "Age_Buckets", "CNT_FAM_MEMBERS"],
                            outputCol="features")

pipeline = Pipeline(stages=[housing_indexr,family_indexr,gender_indexer,income_indexer, one_hot_encoder, assembler])

anlysis_data_input = pipeline.fit(analysis_data).transform(analysis_data)

In [141]:
#Split data into train and Test
train, test = anlysis_data_input.randomSplit([0.7,0.3])

In [142]:
#Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='Target')
model = lr.fit(train)
evaluation = model.evaluate(test)
evaluation.recallByLabel

[1.0, 0.064]

In [144]:
evaluation.accuracy

0.9845727848101266

In [145]:
evaluation.areaUnderROC

0.560657997050543

In [155]:
preds_and_targets  = evaluation.predictions.select(['prediction', 'Target'])

In [157]:
#Print confusion metrics
preds_and_targets.groupBy('prediction', 'Target').count().show()

+----------+------+-----+
|prediction|Target|count|
+----------+------+-----+
|       0.0|     0| 7459|
|       0.0|     1|  117|
|       1.0|     1|    8|
+----------+------+-----+



In [159]:
tp = preds_and_targets[(preds_and_targets.Target == 1) & (preds_and_targets.prediction == 1)].count()
tn = preds_and_targets[(preds_and_targets.Target == 0) & (preds_and_targets.prediction == 0)].count()
fp = preds_and_targets[(preds_and_targets.Target == 0) & (preds_and_targets.prediction == 1)].count()
fn = preds_and_targets[(preds_and_targets.Target == 1) & (preds_and_targets.prediction == 0)].count()

r = float(tp)/(tp + fn)
print("Recall: {}".format(r))

p = float(tp) / (tp + fp)
print("Precision: {}".format(p))


Recall: 0.064
Precision: 1.0
