In [1]:
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
import json
import copy as cp
import os

from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
import pyspark.sql.types as sparkTypes


data_folder = '../data'
filename = 'bureau.csv'

def dump(rdd, fn="out"):
    if isinstance(rdd, DataFrame):
        rdd = rdd.rdd
    with open(fn, 'w+') as f:
        text = rdd.map(lambda row: str(row) + "\n").reduce(lambda row1,row2: row1+ row2)
        f.write(text)
    return True

def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

def get_project_root_dir() -> str:
    # because the root of the project contains the .git/ repo
    while not os.path.isdir('.git/'):
        if os.getcwd() == '/':
            print('\nYou are trying to get the root folder of the big data project')
            print('but you are running this script outside of the project.')
            print('Navigate to your big data directory and try again')
            exit(1)
        else:
            os.chdir('..')

    return os.getcwd()+'/'

root_dir = get_project_root_dir()
bureau_csv = 'bureau.csv'
id_column = 'SK_ID_CURR'
spark = init_spark()



In [37]:
bureau_df = spark.read.csv(f'{root_dir}data/{bureau_csv}',header=True)
bureau_df.columns

['SK_ID_CURR',
 'SK_ID_BUREAU',
 'CREDIT_ACTIVE',
 'CREDIT_CURRENCY',
 'DAYS_CREDIT',
 'CREDIT_DAY_OVERDUE',
 'DAYS_CREDIT_ENDDATE',
 'DAYS_ENDDATE_FACT',
 'AMT_CREDIT_MAX_OVERDUE',
 'CNT_CREDIT_PROLONG',
 'AMT_CREDIT_SUM',
 'AMT_CREDIT_SUM_DEBT',
 'AMT_CREDIT_SUM_LIMIT',
 'AMT_CREDIT_SUM_OVERDUE',
 'CREDIT_TYPE',
 'DAYS_CREDIT_UPDATE',
 'AMT_ANNUITY']

In [38]:
total_loans = bureau_df.count()

previous_loan_count_df = (bureau_df
                          .groupBy(id_column)
                          .count().na.fill(0)
                          .withColumnRenamed('count','previous_loans_count')
                         )

ontime_loans_total = (bureau_df
                      .filter(bureau_df['CREDIT_DAY_OVERDUE']<=0)
                      .count()
                     )

late_loans_total = (bureau_df
                    .filter(bureau_df['CREDIT_DAY_OVERDUE']>0)
                    .count()
                   )

previous_loan_count_df.show()

print(f'ontime total {ontime_loans_total}')
print(f'late total {late_loans_total}')
print(f'total {total_loans}')

+----------+--------------------+
|SK_ID_CURR|previous_loans_count|
+----------+--------------------+
|    370925|                   1|
|    233505|                   4|
|    146433|                   6|
|    297765|                   3|
|    106090|                   1|
|    291034|                   4|
|    362212|                   4|
|    424856|                   8|
|    142039|                   1|
|    416451|                   3|
|    352959|                  18|
|    234035|                   6|
|    100320|                   6|
|    153814|                   2|
|    146827|                   4|
|    129041|                  13|
|    431323|                   2|
|    402445|                   6|
|    339710|                   2|
|    321579|                  10|
+----------+--------------------+
only showing top 20 rows

ontime total 1712211
late total 4217
total 1716428


In [39]:
x = ontime_loans_total + late_loans_total
print(f'x: {x} should be == total_loans: {total_loans}')
print(x == total_loans)

percent_late = round(late_loans_total/ontime_loans_total *100,4)
print(f'{percent_late}% of previous loans are late')

x: 1716428 should be == total_loans: 1716428
True
0.2463% of previous loans are late


In [40]:
late_df = (bureau_df
           .where(bureau_df['CREDIT_DAY_OVERDUE']>0)
           .groupBy(id_column)
           .count()
           .withColumnRenamed('count','late_loans')
          )
           
# late_df.cache()
late_df.show()
late_count = late_df.count()
print(f'late:{late_count}')

ontime_df = (bureau_df
             .where(bureau_df['CREDIT_DAY_OVERDUE']<=0)
             .groupBy(id_column)
             .count()
             .withColumnRenamed('count','ontime_loans')
            )
# ontime_df.cache()
ontime_df.show()
ontime_count = ontime_df.count()
print(f'ontime:{ontime_count}')

+----------+----------+
|SK_ID_CURR|late_loans|
+----------+----------+
|    368629|         1|
|    387665|         1|
|    252858|         1|
|    408270|         1|
|    339803|         1|
|    120848|         2|
|    121007|         1|
|    335250|         1|
|    199579|         1|
|    383926|         1|
|    310629|         1|
|    261005|         1|
|    325091|         1|
|    292303|         1|
|    357324|         1|
|    444119|         1|
|    388098|         1|
|    222071|         1|
|    356406|         1|
|    374686|         1|
+----------+----------+
only showing top 20 rows

late:3864
+----------+------------+
|SK_ID_CURR|ontime_loans|
+----------+------------+
|    370925|           1|
|    233505|           4|
|    146433|           6|
|    297765|           3|
|    106090|           1|
|    291034|           4|
|    362212|           4|
|    424856|           8|
|    142039|           1|
|    416451|           3|
|    352959|          18|
|    234035|           6

In [41]:

joined = previous_loan_count_df.join(late_df, on=[id_column],how='outer').na.fill(0)
joined = joined.join(ontime_df,on=[id_column],how='outer').na.fill(0)

joined.filter(joined['late_loans']!=0).show(1000)


+----------+--------------------+----------+------------+
|SK_ID_CURR|previous_loans_count|late_loans|ontime_loans|
+----------+--------------------+----------+------------+
|    120848|                   9|         2|           7|
|    121007|                   1|         1|           0|
|    146392|                   7|         1|           6|
|    199579|                   4|         1|           3|
|    222071|                  10|         1|           9|
|    252858|                  13|         1|          12|
|    261005|                   2|         1|           1|
|    292303|                   4|         1|           3|
|    310629|                  10|         1|           9|
|    325091|                   8|         1|           7|
|    335250|                   9|         1|           8|
|    339803|                   5|         1|           4|
|    356406|                   7|         1|           6|
|    357324|                   3|         1|           2|
|    368629|  

In theory, for each id (each record), the values in columns late_loans and ontime_loans should add up to the value in the previous_loans_count column 

In [47]:
print(joined.columns)
def validator(row):
    '''validate the assertion'''
    
    late = row['late_loans']
    ontime = row['ontime_loans']
    total = row['previous_loans_count']
    
    assert(late +ontime == total ), 'some id is corrupt'
    
    return Row(SK_ID_CURR=row[id_column],ontime=ontime,late=late,total=total,validated=True)

resulting_df = joined.rdd.map(validator).toDF().
resulting_df.show(1000)


['SK_ID_CURR', 'previous_loans_count', 'late_loans', 'ontime_loans']
+----------+----+------+-----+---------+
|SK_ID_CURR|late|ontime|total|validated|
+----------+----+------+-----+---------+
|    100010|   0|     2|    2|     true|
|    100227|   0|     4|    4|     true|
|    100263|   0|     4|    4|     true|
|    100320|   0|     6|    6|     true|
|    100553|   0|     1|    1|     true|
|    100704|   0|     5|    5|     true|
|    100735|   0|     5|    5|     true|
|    100768|   0|     2|    2|     true|
|    100964|   0|     1|    1|     true|
|    101021|   0|     3|    3|     true|
|    101122|   0|     2|    2|     true|
|    101205|   0|     1|    1|     true|
|    101261|   0|     1|    1|     true|
|    101272|   0|     4|    4|     true|
|    102113|   0|     3|    3|     true|
|    102521|   0|     1|    1|     true|
|    102536|   0|     7|    7|     true|
|    102684|   0|     5|    5|     true|
|    102745|   0|     5|    5|     true|
|    102944|   0|     5|    5

In [3]:
'''Now we could start using this information as a feature to be incorporated during model training'''

'Now we could start using this information as a feature to be incorporated during model training'