In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load(cos.url('credit_customer_history.csv', 'default-donotdelete-pr-bnveggkfxwmoac'))
df.take(5)


[Row(ACCOUNT_AGE=u'up to 1 YR', ACCOUNT_TYPE=u'above 1000 K USD', CREDIT_HISTORY=u'EXISTING CREDITS PAID BACK', EMI_TENURE=105, HAS_CO_APPLICANT=u'NO', HAS_GUARANTOR=u'YES', IS_DEFAULT=u'No', IS_STATE_BORDER=u'YES', IS_URBAN=u'YES', NUMBER_CREDITS=0, OTHER_INSTALMENT_PLAN=u'NO', OWN_CAR=u'YES', OWN_REAL_ESTATE=u'YES', OWN_RESIDENCE=u'YES', RFM_SCORE=4, SHIP_INTERNATIONAL=u'YES', STATE=u'CT', TRANSACTION_AMOUNT=25788, TRANSACTION_CATEGORY=u'EDUCATION'),
 Row(ACCOUNT_AGE=u'up to 1 YR', ACCOUNT_TYPE=u'UNKNOWN/NONE', CREDIT_HISTORY=u'EXISTING CREDITS PAID BACK', EMI_TENURE=105, HAS_CO_APPLICANT=u'NO', HAS_GUARANTOR=u'NO', IS_DEFAULT=u'No', IS_STATE_BORDER=u'NO', IS_URBAN=u'YES', NUMBER_CREDITS=0, OTHER_INSTALMENT_PLAN=u'YES', OWN_CAR=u'YES', OWN_REAL_ESTATE=u'NO', OWN_RESIDENCE=u'NO', RFM_SCORE=3, SHIP_INTERNATIONAL=u'YES', STATE=u'CT', TRANSACTION_AMOUNT=25788, TRANSACTION_CATEGORY=u'FURNITURE'),
 Row(ACCOUNT_AGE=u'up to 1 YR', ACCOUNT_TYPE=u'UNKNOWN/NONE', CREDIT_HISTORY=u'EXISTING CREDI

In [2]:
df.printSchema()

root
 |-- ACCOUNT_AGE: string (nullable = true)
 |-- ACCOUNT_TYPE: string (nullable = true)
 |-- CREDIT_HISTORY: string (nullable = true)
 |-- EMI_TENURE: integer (nullable = true)
 |-- HAS_CO_APPLICANT: string (nullable = true)
 |-- HAS_GUARANTOR: string (nullable = true)
 |-- IS_DEFAULT: string (nullable = true)
 |-- IS_STATE_BORDER: string (nullable = true)
 |-- IS_URBAN: string (nullable = true)
 |-- NUMBER_CREDITS: integer (nullable = true)
 |-- OTHER_INSTALMENT_PLAN: string (nullable = true)
 |-- OWN_CAR: string (nullable = true)
 |-- OWN_REAL_ESTATE: string (nullable = true)
 |-- OWN_RESIDENCE: string (nullable = true)
 |-- RFM_SCORE: integer (nullable = true)
 |-- SHIP_INTERNATIONAL: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- TRANSACTION_AMOUNT: integer (nullable = true)
 |-- TRANSACTION_CATEGORY: string (nullable = true)



In [3]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
import numpy as np

The dataset is available here: https://developer.ibm.com/blogs/snap-ml-use-cases-blog/ , credit default prediction. Quoting the text:
> The task in this use case is to predict whether a person who has credit will default (not be able to repay his credit). The data scientist is provided with a data set of 10 million transactions, each of which is characterized by 18 features (including account age, account type, credit history, owns car, transaction amount, and transaction category). Also provided are the labels of these transactions, default or not. The task is to build a model to predict whether transactions will default in an unseen data set, that is, a data set that has not been used to train the model and does not have labels.

# Looking for NULL or NaN

First create a temporary view so that we can run SQL queries.

In [4]:
df.createOrReplaceTempView("credit")

In [5]:
t = spark.sql("SELECT DISTINCT ACCOUNT_AGE, COUNT(ACCOUNT_AGE) AS age_counts FROM credit \
GROUP BY ACCOUNT_AGE \
ORDER BY age_counts")
t.show()

+-----------+----------+
|ACCOUNT_AGE|age_counts|
+-----------+----------+
| 1 to 4 YRS|      1093|
|        TBD|      1581|
| 4 to 7 YRS|    141090|
|above 7 YRS|    165109|
| up to 1 YR|    691127|
+-----------+----------+



In [6]:
t = spark.sql("SELECT DISTINCT ACCOUNT_TYPE, COUNT(ACCOUNT_TYPE) AS type_counts FROM credit \
GROUP BY ACCOUNT_TYPE \
ORDER BY type_counts")
t.show()

+-----------------+-----------+
|     ACCOUNT_TYPE|type_counts|
+-----------------+-----------+
|500 to 1000 K USD|       1472|
|  up to 100 K USD|       2578|
| 100 to 500 K USD|     141594|
|     UNKNOWN/NONE|     188476|
| above 1000 K USD|     665880|
+-----------------+-----------+



In [7]:
t = spark.sql("SELECT DISTINCT ACCOUNT_TYPE, IS_DEFAULT, COUNT(ACCOUNT_TYPE) FROM credit \
GROUP BY ACCOUNT_TYPE, IS_DEFAULT \
ORDER BY ACCOUNT_TYPE, IS_DEFAULT")
t.show()

+-----------------+----------+-------------------+
|     ACCOUNT_TYPE|IS_DEFAULT|count(ACCOUNT_TYPE)|
+-----------------+----------+-------------------+
| 100 to 500 K USD|        No|              94719|
| 100 to 500 K USD|       Yes|              46875|
|500 to 1000 K USD|        No|                667|
|500 to 1000 K USD|       Yes|                805|
|     UNKNOWN/NONE|        No|             127634|
|     UNKNOWN/NONE|       Yes|              60842|
| above 1000 K USD|        No|             475662|
| above 1000 K USD|       Yes|             190218|
|  up to 100 K USD|        No|               1318|
|  up to 100 K USD|       Yes|               1260|
+-----------------+----------+-------------------+



In [8]:
t = spark.sql("SELECT DISTINCT CREDIT_HISTORY, COUNT(CREDIT_HISTORY) FROM credit GROUP BY CREDIT_HISTORY")
t.show()

+--------------------+---------------------+
|      CREDIT_HISTORY|count(CREDIT_HISTORY)|
+--------------------+---------------------+
|          NONE TAKEN|                 7635|
|ALL CREDITS PAID ...|               456194|
|       DELAY IN PAST|                61922|
|EXISTING CREDITS ...|               473511|
|    CRITICAL ACCOUNT|                  738|
+--------------------+---------------------+



In [9]:
t = spark.sql("SELECT IS_DEFAULT, MEAN(EMI_TENURE), STD(EMI_TENURE) FROM credit GROUP BY IS_DEFAULT")
t.show()

+----------+-----------------+---------------------------------------+
|IS_DEFAULT|  avg(EMI_TENURE)|stddev_samp(CAST(EMI_TENURE AS DOUBLE))|
+----------+-----------------+---------------------------------------+
|        No|        116.77337|                     14.091804445397239|
|       Yes|93.11754666666667|                     18.168372237131955|
+----------+-----------------+---------------------------------------+



In [10]:
t = spark.sql("SELECT MEAN(EMI_TENURE), STD(EMI_TENURE) FROM credit")
t.show()

+---------------+---------------------------------------+
|avg(EMI_TENURE)|stddev_samp(CAST(EMI_TENURE AS DOUBLE))|
+---------------+---------------------------------------+
|     109.676623|                     18.855969714216904|
+---------------+---------------------------------------+



In [11]:
t = spark.sql("SELECT DISTINCT HAS_CO_APPLICANT, COUNT(HAS_CO_APPLICANT) FROM credit GROUP BY HAS_CO_APPLICANT")
t.show()

+----------------+-----------------------+
|HAS_CO_APPLICANT|count(HAS_CO_APPLICANT)|
+----------------+-----------------------+
|             YES|                 268613|
|              NO|                 731387|
+----------------+-----------------------+



In [12]:
t = spark.sql("SELECT DISTINCT HAS_GUARANTOR, COUNT(HAS_GUARANTOR) FROM credit GROUP BY HAS_GUARANTOR")
t.show()

+-------------+--------------------+
|HAS_GUARANTOR|count(HAS_GUARANTOR)|
+-------------+--------------------+
|          YES|              397798|
|           NO|              602202|
+-------------+--------------------+



In [13]:
t = spark.sql("SELECT DISTINCT IS_DEFAULT, COUNT(IS_DEFAULT) FROM credit GROUP BY IS_DEFAULT")
t.show()

+----------+-----------------+
|IS_DEFAULT|count(IS_DEFAULT)|
+----------+-----------------+
|        No|           700000|
|       Yes|           300000|
+----------+-----------------+



`No` has the higher frequency. Therefore, when using the StringIndexer, `No` will be given the index 0. `Yes` will be given the index 1.

In [14]:
t = spark.sql("SELECT DISTINCT IS_STATE_BORDER, COUNT(IS_STATE_BORDER) FROM credit GROUP BY IS_STATE_BORDER")
t.show()

+---------------+----------------------+
|IS_STATE_BORDER|count(IS_STATE_BORDER)|
+---------------+----------------------+
|            YES|                477983|
|             NO|                522017|
+---------------+----------------------+



In [15]:
t = spark.sql("SELECT DISTINCT IS_URBAN, COUNT(IS_URBAN) FROM credit GROUP BY IS_URBAN")
t.show()

+--------+---------------+
|IS_URBAN|count(IS_URBAN)|
+--------+---------------+
|     YES|         623097|
|      NO|         376903|
+--------+---------------+



In [16]:
t = spark.sql("SELECT DISTINCT NUMBER_CREDITS, COUNT(NUMBER_CREDITS) FROM credit GROUP BY NUMBER_CREDITS")
t.show()

+--------------+---------------------+
|NUMBER_CREDITS|count(NUMBER_CREDITS)|
+--------------+---------------------+
|             0|              1000000|
+--------------+---------------------+



No need to include `NUMBER_CREDITS` feature, it is only zeros.

In [17]:
t = spark.sql("SELECT DISTINCT OTHER_INSTALMENT_PLAN, COUNT(OTHER_INSTALMENT_PLAN) FROM credit GROUP BY OTHER_INSTALMENT_PLAN")
t.show()

+---------------------+----------------------------+
|OTHER_INSTALMENT_PLAN|count(OTHER_INSTALMENT_PLAN)|
+---------------------+----------------------------+
|                  YES|                      499274|
|                   NO|                      500726|
+---------------------+----------------------------+



In [18]:
t = spark.sql("SELECT DISTINCT OWN_CAR, COUNT(OWN_CAR) FROM credit GROUP BY OWN_CAR")
t.show()

+-------+--------------+
|OWN_CAR|count(OWN_CAR)|
+-------+--------------+
|    YES|        632143|
|     NO|        367857|
+-------+--------------+



In [19]:
t = spark.sql("SELECT DISTINCT OWN_REAL_ESTATE, COUNT(OWN_REAL_ESTATE) FROM credit GROUP BY OWN_REAL_ESTATE")
t.show()

+---------------+----------------------+
|OWN_REAL_ESTATE|count(OWN_REAL_ESTATE)|
+---------------+----------------------+
|            YES|                427453|
|             NO|                572547|
+---------------+----------------------+



In [21]:
t = spark.sql("SELECT DISTINCT OWN_RESIDENCE, COUNT(OWN_RESIDENCE) FROM credit GROUP BY OWN_RESIDENCE")
t.show()

+-------------+--------------------+
|OWN_RESIDENCE|count(OWN_RESIDENCE)|
+-------------+--------------------+
|          YES|              473235|
|           NO|              526765|
+-------------+--------------------+



In [22]:
t = spark.sql("SELECT DISTINCT RFM_SCORE, COUNT(RFM_SCORE) FROM credit GROUP BY RFM_SCORE ORDER BY RFM_SCORE")
t.show()

+---------+----------------+
|RFM_SCORE|count(RFM_SCORE)|
+---------+----------------+
|        1|           11460|
|        2|            3375|
|        3|          407275|
|        4|          577890|
+---------+----------------+



Only four different scores, but there is intrinsic ordering. So either use as-is. Might want to consider clubbing 1&2 and using as categorical.

In [23]:
t = spark.sql("SELECT DISTINCT SHIP_INTERNATIONAL, IS_DEFAULT, COUNT(SHIP_INTERNATIONAL) FROM credit \
GROUP BY SHIP_INTERNATIONAL, IS_DEFAULT \
ORDER BY SHIP_INTERNATIONAL, IS_DEFAULT")
t.show()

+------------------+----------+-------------------------+
|SHIP_INTERNATIONAL|IS_DEFAULT|count(SHIP_INTERNATIONAL)|
+------------------+----------+-------------------------+
|                NO|        No|                   333691|
|                NO|       Yes|                   142762|
|               YES|        No|                   366309|
|               YES|       Yes|                   157238|
+------------------+----------+-------------------------+



International or not probably has little to do with default directly. May have something to do with transaction amount?

In [24]:
t = spark.sql("SELECT DISTINCT SHIP_INTERNATIONAL, COUNT(SHIP_INTERNATIONAL), MEAN(TRANSACTION_AMOUNT), STD(TRANSACTION_AMOUNT) FROM credit \
GROUP BY SHIP_INTERNATIONAL \
ORDER BY SHIP_INTERNATIONAL")
t.show()

+------------------+-------------------------+-----------------------+-----------------------------------------------+
|SHIP_INTERNATIONAL|count(SHIP_INTERNATIONAL)|avg(TRANSACTION_AMOUNT)|stddev_samp(CAST(TRANSACTION_AMOUNT AS DOUBLE))|
+------------------+-------------------------+-----------------------+-----------------------------------------------+
|                NO|                   476453|     26789.872356769712|                              5102.042903503068|
|               YES|                   523547|     27153.019362158506|                             4098.4969321578565|
+------------------+-------------------------+-----------------------+-----------------------------------------------+



International shipping or not has little to do with transaction amount.

In [25]:
t = spark.sql("SELECT DISTINCT STATE, COUNT(STATE) as counts FROM credit GROUP BY STATE ORDER BY counts")
t.show()

+-----+------+
|STATE|counts|
+-----+------+
|   NJ|  4582|
|   NY| 13435|
|   PA|416961|
|   CT|565022|
+-----+------+



Interesting, only 4 states. NJ and NY have 'few' samples percentage-wise. Make as categorical. One would guess it shouldn't depend on the state. Also the 4 states border each other. May want to look at other features such as population/GDP just to see what it's like.

In [26]:
t = spark.sql("SELECT MEAN(TRANSACTION_AMOUNT), STD(TRANSACTION_AMOUNT) FROM credit GROUP BY IS_DEFAULT")
t.show()

+-----------------------+-----------------------------------------------+
|avg(TRANSACTION_AMOUNT)|stddev_samp(CAST(TRANSACTION_AMOUNT AS DOUBLE))|
+-----------------------+-----------------------------------------------+
|      28820.70300857143|                              3243.905091234831|
|            22685.01592|                              4456.413935434757|
+-----------------------+-----------------------------------------------+



In [27]:
t = spark.sql("SELECT TRANSACTION_CATEGORY, COUNT(TRANSACTION_CATEGORY) as counts FROM credit\
              GROUP BY TRANSACTION_CATEGORY \
              ORDER BY counts")
t.show()

+--------------------+------+
|TRANSACTION_CATEGORY|counts|
+--------------------+------+
|               OTHER|   686|
|             NEW CAR|   762|
|          RETRAINING| 27271|
|            USED CAR| 41880|
|           EDUCATION|186619|
|         ELECTRONICS|316553|
|           FURNITURE|426229|
+--------------------+------+



In [28]:
t = spark.sql("SELECT DISTINCT TRANSACTION_CATEGORY, IS_DEFAULT, COUNT(TRANSACTION_CATEGORY) FROM credit \
GROUP BY TRANSACTION_CATEGORY, IS_DEFAULT ORDER BY TRANSACTION_CATEGORY, IS_DEFAULT")
t.show()

+--------------------+----------+---------------------------+
|TRANSACTION_CATEGORY|IS_DEFAULT|count(TRANSACTION_CATEGORY)|
+--------------------+----------+---------------------------+
|           EDUCATION|        No|                      45615|
|           EDUCATION|       Yes|                     141004|
|         ELECTRONICS|        No|                     301451|
|         ELECTRONICS|       Yes|                      15102|
|           FURNITURE|        No|                     309836|
|           FURNITURE|       Yes|                     116393|
|             NEW CAR|        No|                        758|
|             NEW CAR|       Yes|                          4|
|               OTHER|        No|                          4|
|               OTHER|       Yes|                        682|
|          RETRAINING|        No|                        914|
|          RETRAINING|       Yes|                      26357|
|            USED CAR|        No|                      41422|
|       

Education, retraining and 'other' category more likely to default than not. Odds of defaulting:not defaulting for Electronics is 1:20, Furniture is 1:3, used car is 1:100. This is probably one of the first features we should model.

In [29]:
va1 = VectorAssembler(inputCols=['RFM_SCORE', 'TRANSACTION_AMOUNT', 'EMI_TENURE'], outputCol='rfm_amt_emi')
df2 = va1.transform(df)
ss = StandardScaler(withMean=True, inputCol='rfm_amt_emi', outputCol='rfm_amt_emi_scaled')
ssmodel = ss.fit(df2)
df2 = ssmodel.transform(df2)
print(ssmodel.mean, ssmodel.std)

(DenseVector([3.5516, 26979.9969, 109.6766]), DenseVector([0.5682, 4607.5721, 18.856]))


In [30]:
ss2 = StandardScaler(inputCol="rfm_amt_emi_scaled", outputCol="scaled_twice")
ssmodel2 = ss2.fit(df2)
print(ssmodel2.mean, ssmodel2.std)

(DenseVector([0.0, -0.0, 0.0]), DenseVector([1.0, 1.0, 1.0]))


In [31]:
indexer = StringIndexer(inputCol="IS_DEFAULT", outputCol="label")
ndf = indexer.fit(df2).transform(df2)
names = ["ACCOUNT_AGE", "ACCOUNT_TYPE", "CREDIT_HISTORY", 
         "HAS_CO_APPLICANT", "HAS_GUARANTOR", "IS_STATE_BORDER", 
         "IS_URBAN", "OTHER_INSTALMENT_PLAN","OWN_CAR", 
         "OWN_REAL_ESTATE", "OWN_RESIDENCE", "SHIP_INTERNATIONAL", 
         "STATE", "TRANSACTION_CATEGORY"]
feat_names = []
feat_base = []
for ind, name in enumerate(names):
    inpcol = name
    oupcol = name.lower() + "_ind"
    indexer = StringIndexer(inputCol = inpcol, outputCol = oupcol)
    model = indexer.fit(ndf)
    ndf = model.transform(ndf)
    this_labels = model.labels[:-1]
    [feat_names.append(inpcol + "(" + a_label + ")") for a_label in this_labels]
    feat_base.append(inpcol + "(" + model.labels[-1] + ")")
encoder = OneHotEncoder(inputCol="account_age_ind", outputCol="account_age_vec")
ndf = encoder.transform(ndf)
encoder = OneHotEncoder(inputCol="account_type_ind", outputCol="account_type_vec")
ndf = encoder.transform(ndf)
encoder = OneHotEncoder(inputCol="credit_history_ind", outputCol="credit_history_vec")
ndf = encoder.transform(ndf)
encoder = OneHotEncoder(inputCol="transaction_category_ind", outputCol="transaction_vec")
ndf = encoder.transform(ndf)
encoder = OneHotEncoder(inputCol="state_ind", outputCol="state_vec")
ndf = encoder.transform(ndf)

assembler = VectorAssembler(
    inputCols=["account_age_vec", "account_type_vec", "credit_history_vec",
               "has_co_applicant_ind", "has_guarantor_ind", "is_state_border_ind", 
               "is_urban_ind", "other_instalment_plan_ind", "own_car_ind", 
               "own_real_estate_ind", "own_residence_ind", "ship_international_ind",
               "state_vec", "transaction_vec",
               "rfm_amt_emi_scaled"],
    outputCol="features")

ndf = assembler.transform(ndf)
ndf.take(3)

[Row(ACCOUNT_AGE=u'up to 1 YR', ACCOUNT_TYPE=u'above 1000 K USD', CREDIT_HISTORY=u'EXISTING CREDITS PAID BACK', EMI_TENURE=105, HAS_CO_APPLICANT=u'NO', HAS_GUARANTOR=u'YES', IS_DEFAULT=u'No', IS_STATE_BORDER=u'YES', IS_URBAN=u'YES', NUMBER_CREDITS=0, OTHER_INSTALMENT_PLAN=u'NO', OWN_CAR=u'YES', OWN_REAL_ESTATE=u'YES', OWN_RESIDENCE=u'YES', RFM_SCORE=4, SHIP_INTERNATIONAL=u'YES', STATE=u'CT', TRANSACTION_AMOUNT=25788, TRANSACTION_CATEGORY=u'EDUCATION', rfm_amt_emi=DenseVector([4.0, 25788.0, 105.0]), rfm_amt_emi_scaled=DenseVector([0.7892, -0.2587, -0.248]), label=0.0, account_age_ind=0.0, account_type_ind=0.0, credit_history_ind=0.0, has_co_applicant_ind=0.0, has_guarantor_ind=1.0, is_state_border_ind=1.0, is_urban_ind=0.0, other_instalment_plan_ind=0.0, own_car_ind=0.0, own_real_estate_ind=1.0, own_residence_ind=1.0, ship_international_ind=0.0, state_ind=0.0, transaction_category_ind=2.0, account_age_vec=SparseVector(4, {0: 1.0}), account_type_vec=SparseVector(4, {0: 1.0}), credit_hist

In [32]:
feat_names_full = feat_names + ["scaled rfm", "scaled transaction amount", "scaled emi tenure"]
feat_base_full = feat_base + ["rfm = " + str(round(ssmodel.mean[0], 2)), 
                              "transaction amount = " + str(round(ssmodel.mean[1], 2)),
                              "emi tenure = " + str(round(ssmodel.mean[2], 2))]
print(feat_base_full)

[u'ACCOUNT_AGE(1 to 4 YRS)', u'ACCOUNT_TYPE(500 to 1000 K USD)', u'CREDIT_HISTORY(CRITICAL ACCOUNT)', u'HAS_CO_APPLICANT(YES)', u'HAS_GUARANTOR(YES)', u'IS_STATE_BORDER(YES)', u'IS_URBAN(NO)', u'OTHER_INSTALMENT_PLAN(YES)', u'OWN_CAR(NO)', u'OWN_REAL_ESTATE(YES)', u'OWN_RESIDENCE(YES)', u'SHIP_INTERNATIONAL(NO)', u'STATE(NJ)', u'TRANSACTION_CATEGORY(OTHER)', 'rfm = 3.55', 'transaction amount = 26980.0', 'emi tenure = 109.68']


In [33]:
np.savez("feat_names.npz", feat_base_full=feat_base_full, feat_names_full=feat_names_full,
        mean=ssmodel.mean.toArray(), std=ssmodel.std.toArray())

In [34]:
ndf.select("label", "features").write.mode('overwrite').save("credit_feats.parquet")