### `Dependencies`

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import types as T
from pyspark.sql.types import *

import pandas as pd
import sys
import os

In [2]:
# Build (or reuse) the SparkSession used by every cell below.
spark = (
    SparkSession.builder
    .appName('abc')
    # Driver memory setting.
    .config('spark.driver.memory', '10g')
    # Legacy datetime parser policy; dates below are parsed with patterns
    # such as 'dd-MMM-yy' and 'ddMMyyyy'.
    .config('spark.sql.legacy.timeParserPolicy', 'LEGACY')
    # Whole-stage code generation is switched off.
    .config('spark.sql.codegen.wholeStage', 'false')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/08 07:33:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Never truncate the text of wide columns when pandas renders a frame.
pd.options.display.max_colwidth = None

In [None]:
# Base directory of the local "bureau" folder layout. In the notebook as shown
# it is only referenced by the commented-out os.mkdir cell further down.
data_dir = "./data/"

In [3]:
# Input / output locations.
# Each location can be overridden through an environment variable so the
# notebook is not tied to one user's home directory; the defaults are the
# original hard-coded paths, so behaviour is unchanged when nothing is set.
# os.path.join(..., '') guarantees the trailing separator that the
# `path + 'file'` concatenations in later cells rely on.
raw_data_path = os.path.join(
    os.environ.get('CIBIL_RAW_DATA_PATH', '/home/data/manu/sample/cibil/'),
    '',
)
prep_data_path = os.path.join(
    os.environ.get(
        'CIBIL_PREP_DATA_PATH',
        '/home/jupyter-monica.marmit/feature_enginerring/cibil/output/',
    ),
    '',
)

In [4]:
# Run date as a 'yyyy-MM-dd' string; both pipelines convert it with F.to_date
# into the `run_dt` column. NOTE(review): presumably this is the "Retro Date"
# the markdown sections refer to - confirm.
run_date = '2024-05-01'

In [None]:
# try:
#     os.mkdir(data_dir + "bureau")
#     os.mkdir(data_dir + "bureau/cir")
#     os.mkdir(data_dir + "bureau/cir/raw")
#     os.mkdir(data_dir + "bureau/cir/raw/tradeline")
#     os.mkdir(data_dir + "bureau/cir/raw/history")
#     os.mkdir(data_dir + "bureau/cir/raw/inquiry")
#     os.mkdir(data_dir + "bureau/scrub")
#     os.mkdir(data_dir + "bureau/scrub/raw")
#     os.mkdir(data_dir + "bureau/scrub/raw/tradeline")
#     os.mkdir(data_dir + "bureau/scrub/raw/history")
#     os.mkdir(data_dir + "bureau/scrub/raw/inquiry")
#     print("directory build sucessfull!!")
# except :
#     print("some of the directory already exits.\nplease clear all data in base directory and retry.")

### `Inquiry Data Preparation`

#### `Inquiry Steps`
    1. Read Inquiry Raw Files
    2. Format Date and Integer Columns. Dates before 1950-01-01 are replaced with Null
    3. Clean Account Type 
    4. Inquiry Data Filtering and Preprocessing
    5. Inquiry Data Save
    
#### `Inquiry Data Filters`
    1. Drop all duplicate records of Inquiry in a Day of Same Product Type keeping the one with Maximum Sanction Amount
    2. Retro Date and Inq Date cannot be null
    3. Retro Date > Inq Date
    4. Inquiries of Last 3 Years to be considered

In [19]:
# Explicit schema applied when reading the inquiry CSV (every field nullable).
# DateofEnquiry stays a string here and is parsed into a date in a later cell.
sch_cd_iq = StructType([
    StructField('EnquiryControlNumber', IntegerType(), True),
    StructField('MemberReference', StringType(), True),
    StructField('EnquiringMemberShortName', StringType(), True),
    StructField('DateofEnquiry', StringType(), True),
    StructField('EnquiryPurpose', StringType(), True),
    StructField('EnquiryAmount', IntegerType(), True),
])


In [20]:
### Step 1 - Read Inquiry File
# The schema is supplied explicitly and the first line is treated as a header.
# The logged CSVHeaderChecker warning shows that the file's EnquiryPurpose
# header is padded with trailing spaces.
inq_data0 = spark.read.csv(
    path=raw_data_path + 'iq.csv',
    header=True,
    schema=sch_cd_iq,
)


In [16]:
# Preview the raw inquiry frame with show()'s default settings.
inq_data0.show()

+--------------------+---------------+------------------------+-------------+--------------------+-------------+
|EnquiryControlNumber|MemberReference|EnquiringMemberShortName|DateofEnquiry|      EnquiryPurpose|EnquiryAmount|
+--------------------+---------------+------------------------+-------------+--------------------+-------------+
|            99401699|       79238541|    NOT DISCLOSED    ...|    25-Jun-24|Credit Card      ...|       250000|
|            99401699|       79238541|    NOT DISCLOSED    ...|    28-Dec-23|Credit Card      ...|         1400|
|            99401699|       79238541|    NOT DISCLOSED    ...|    29-May-23|Credit Card      ...|         1400|
|            99401699|       79238541|    NOT DISCLOSED    ...|     8-May-23|Credit Card      ...|         1400|
|            99401699|       79238541|    NOT DISCLOSED    ...|    14-Mar-23|Auto Loan (Person...|         3420|
|            99401699|       79238541|    NOT DISCLOSED    ...|    14-Mar-23|Auto Loan (Person..

25/05/05 11:46:55 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: EnquiryControlNumber, MemberReference, EnquiringMemberShortName, DateofEnquiry, EnquiryPurpose                               , EnquiryAmount
 Schema: EnquiryControlNumber, MemberReference, EnquiringMemberShortName, DateofEnquiry, EnquiryPurpose, EnquiryAmount
Expected: EnquiryPurpose but found: EnquiryPurpose                               
CSV file: file:///home/data/manu/sample/cibil/iq.csv


In [21]:
# Select list: every existing column plus DateofEnquiry left-padded with '0'
# to width 9, so a value such as '8-May-23' becomes '08-May-23'.
iq_date_ipad = [
    "*",
    F.lpad(F.col("DateofEnquiry"), 9, '0').alias("DateofEnquiry_pad"),
]

In [22]:
# Select list: every existing column plus the padded enquiry date parsed with
# the 'dd-MMM-yy' pattern into ENQUIRYDATE.
iq_cast_expr0 = [
    "*",
    F.to_date(F.col("DateofEnquiry_pad"), 'dd-MMM-yy').alias('ENQUIRYDATE'),
]

In [23]:
# Ranking window for de-duplication: one partition per member / enquiry date /
# purpose, largest EnquiryAmount first.
w = (
    Window
    .partitionBy("MemberReference", "ENQUIRYDATE", "EnquiryPurpose")
    .orderBy(F.col("EnquiryAmount").desc())
)

In [26]:
# Inquiry preparation:
#   1. drop exact duplicate rows and attach the run date,
#   2. pad + parse DateofEnquiry into ENQUIRYDATE, trim EnquiryPurpose,
#   3. keep rows with a parsed date and a purpose,
#   4. compute the month gap between run_dt and the enquiry month,
#   5. keep the first row per window `w` (largest EnquiryAmount per
#      member / date / purpose).
# NOTE(review): the "Inquiry Data Filters" markdown also lists
# "Retro Date > Inq Date" and "Inquiries of Last 3 Years"; neither filter is
# applied in this cell - confirm whether they belong here or downstream.
inq_data1 = (
    inq_data0
    .dropDuplicates()
    .withColumn("run_dt", F.to_date(F.lit(run_date)))
    .select(*iq_date_ipad)
    .select(*iq_cast_expr0)
    .withColumn("EnquiryPurpose", F.trim(F.col("EnquiryPurpose")))
    .filter("ENQUIRYDATE is not null")
    .filter("EnquiryPurpose is not null")
    .withColumn(
        "ReportToEnquiry",
        F.months_between(
            F.trunc('run_dt', 'month'),
            F.trunc('ENQUIRYDATE', 'month'),
        ),
    )
    .withColumn("row_num", F.row_number().over(w))
    .filter("row_num = 1")
    .drop("row_num")
)

In [27]:

# Persist the prepared inquiry data, replacing any previous output.
inq_data1.write.parquet(prep_data_path + 'prep_inq_data.parquet', mode='overwrite')

25/05/08 07:43:07 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: EnquiryControlNumber, MemberReference, EnquiringMemberShortName, DateofEnquiry, EnquiryPurpose                               , EnquiryAmount
 Schema: EnquiryControlNumber, MemberReference, EnquiringMemberShortName, DateofEnquiry, EnquiryPurpose, EnquiryAmount
Expected: EnquiryPurpose but found: EnquiryPurpose                               
CSV file: file:///home/data/manu/sample/cibil/iq.csv


### `Tradeline Data Preparation`

#### `Tradeline Steps`
    
    1. Read Tradeline Files
    2. Clean Account Type
    3. Create and Rename required columns
    4. Format and Clean Numerical Columns
    5. Format and Clean Date Columns. Replace Dates before 1950-01-01 with Null
    6. Create p_hist columns from Retro Month -1 Month till Retro Month - 36 Months from History DPD File
    7. Create DPD Buckets from Retro Date - 1 Month till Retro Date - 36 Months
    8. Data Preprocessing and Filtering
    9. Save Final Tradeline File
    
#### `Tradeline Column Format`
    
    1. Balance, Sanction Amount, High Credit and Past Due Amount < 0 are replaced with 0
    2. Credit Limit < 0 are replaced with Null
    3. Dates <= 1950-01-01 are replaced with Null 
    4. DPD Buckets after payment history start date or before payment history end date are replaced with -2
    5. In case of Payment History Start Date > Retro Date then replace Payment History Start Date with Retro Date - 1 Month
        and cut DPD string accordingly
    6. If Payment History Start Date > Retro Date then Balance & Past Due Amount are replaced with Null
    7. If Last Payment Date > Retro Date then Last Payment Date is replaced with Null
    8. If Date Closed > Retro Date then replace Date Closed with Null
    
#### `Tradeline Data Filters`
    
    1. Accounts for a customer are deduped at ID , Account Type , Open Date , Sanction Amount , High Credit & Credit Limit and
        Account with Latest Reporting Date/ Highest Balance/ Latest Date Closed
    2. Retro Date > Date Opened
    3. Retro Date > Payment End Date
    4. Retro Date , Date Opened & Payment History Start Date cannot be Null
    5. Date Closed > Date Opened or Date Closed = Null
    6. Hanging Trades are removed. Tradelines which have Retro Date - Date Reported > 36 Months are removed

In [28]:
# Explicit schema applied when reading the tradeline CSV: 53 nullable fields,
# listed in the same order as the original one-line definition.
# That one-liner was broken in the middle of the 'ExclusionCode2' string
# literal; listing one field per line keeps every literal intact and makes
# the column types reviewable.
# DateReported_trades and Last_payment_date are read as integers, the other
# date columns as strings; they are parsed into dates in later cells.
_tl_schema_fields = [
    ('EnquiryControlNumber', IntegerType()),
    ('MemberReference', StringType()),
    ('Loan_account_Number', StringType()),
    ('Loan_Status', StringType()),
    ('Loan_Type', StringType()),
    ('Loan_Classification', StringType()),
    ('Sanction_Amount', IntegerType()),
    ('High_Credit_Amount', IntegerType()),
    ('Out_standing_Balance', IntegerType()),
    ('EMI', IntegerType()),
    ('CREDITLIMIT', IntegerType()),
    ('CASH_LIMIT', IntegerType()),
    ('actual_paymt_amt', IntegerType()),
    ('Interest_Rates', DoubleType()),
    ('tenor', IntegerType()),
    ('Income', IntegerType()),
    ('monthly_annual_indicator', StringType()),
    ('net_gross_indicator', StringType()),
    ('occupation_code', IntegerType()),
    ('COLLATERALType', IntegerType()),
    ('COLLATERALVALUE', IntegerType()),
    ('DATE_OPENED', StringType()),
    ('DATE_CLOSED', StringType()),
    ('DateReported_trades', IntegerType()),
    ('Last_payment_date', IntegerType()),
    ('Pay_Hist_Start_Date', StringType()),
    ('Pay_Hist_END_Date', StringType()),
    ('Ownership_Type', StringType()),
    ('Over_due_amount', IntegerType()),
    ('Tenor_Frequency', IntegerType()),
    ('suit_filed_status', IntegerType()),
    ('Credit_Facility_Status', IntegerType()),
    ('writeoff_amt_tot', IntegerType()),
    ('writeoff_amt_prin', IntegerType()),
    ('settlement_amt', IntegerType()),
    ('Dpd_string', StringType()),
    ('PL_Propensity_Score', StringType()),
    ('NTC_Score', StringType()),
    ('TU_Score', StringType()),
    ('ExclusionCode1', StringType()),
    ('ExclusionCode2', StringType()),
    ('ExclusionCode3', StringType()),
    ('ExclusionCode4', StringType()),
    ('ExclusionCode5', StringType()),
    ('ExclusionCode6', StringType()),
    ('ExclusionCode7', StringType()),
    ('ReasonCode1', IntegerType()),
    ('ReasonCode2', IntegerType()),
    ('ReasonCode3', IntegerType()),
    ('ReasonCode4', IntegerType()),
    ('ReasonCode5', IntegerType()),
    ('ErrorCode', StringType()),
    ('Sector', StringType()),
]
sch_tl = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in _tl_schema_fields
])



In [29]:
# Read the raw tradeline CSV with the explicit schema; the first line is a header.
tl_data0 = spark.read.csv(
    path=raw_data_path + 'tl.csv',
    header=True,
    schema=sch_tl,
)


In [30]:
# Re-label the frame with whitespace-stripped column names.
new_column_names = list(map(str.strip, tl_data0.columns))
tl_data1 = tl_data0.toDF(*new_column_names)

In [9]:
# Names of the raw tradeline columns that contain "date" (case-insensitive).
columnList = list(filter(lambda name: 'date' in name.lower(), tl_data0.columns))
print(columnList)

['DATE_OPENED', 'DATE_CLOSED', 'DateReported_trades', 'Last_payment_date', 'Pay_Hist_Start_Date', 'Pay_Hist_END_Date']


In [10]:
# Show one row of the date-like columns to inspect their raw formats.
tl_data0.select(*map(F.col, columnList)).show(n=1)

                                                                                

+-----------+-----------+-------------------+-----------------+-------------------+-----------------+
|DATE_OPENED|DATE_CLOSED|DateReported_trades|Last_payment_date|Pay_Hist_Start_Date|Pay_Hist_END_Date|
+-----------+-----------+-------------------+-----------------+-------------------+-----------------+
|  14-Nov-11|  26-Nov-11|           30112011|             NULL|           1-Nov-11|         1-Nov-11|
+-----------+-----------+-------------------+-----------------+-------------------+-----------------+
only showing top 1 row



In [33]:
# Step A - zero-left-pad the raw date columns so they parse with a fixed-width
# pattern.
#   * DATE_OPENED / DATE_CLOSED / Pay_Hist_Start_Date are strings such as
#     '1-Nov-11' -> padded to width 9 ('01-Nov-11').
#   * Last_payment_date is declared IntegerType in sch_tl, exactly like
#     DateReported_trades (e.g. 30112011). A digit string can never match
#     'dd-MMM-yy', so the previous pad-to-9 + 'dd-MMM-yy' parse could not
#     produce a date. It is now padded to width 8 and parsed as 'ddMMyyyy',
#     the same treatment DateReported_trades receives.
#     NOTE(review): assumes the integer encodes ddMMyyyy like
#     DateReported_trades - confirm against the raw file.
vs_tl_date_ipad = [
    "*",
    F.lpad(F.col("DATE_OPENED"), 9, '0').alias("DATE_OPENED_pad"),
    F.lpad(F.col("DATE_CLOSED"), 9, '0').alias("DATE_CLOSED_pad"),
    F.lpad(F.col("Pay_Hist_Start_Date"), 9, '0').alias("Pay_Hist_Start_Date_pad"),
    F.lpad(F.col("Last_payment_date").cast(StringType()), 8, '0').alias("Last_payment_date_pad"),
]

# Step B - parse the padded strings into dates and expose the integer
# reported date as a string for the padding step in the pipeline.
# The pipeline drops the raw Last_payment_date before applying this list, so
# the parsed column can reuse that name.
vs_tl_cast_expr0 = [
    "*",
    F.col("datereported_trades").cast(StringType()).alias("datereported_trades_c"),
    F.to_date(F.col("DATE_OPENED_pad"), "dd-MMM-yy").alias("openDate"),
    F.to_date(F.col("DATE_CLOSED_pad"), "dd-MMM-yy").alias("DATECLOSE"),
    F.to_date(F.col("Pay_Hist_Start_Date_pad"), "dd-MMM-yy").alias("PaymentHistoryStartDate"),
    F.to_date(F.col("Last_payment_date_pad"), "ddMMyyyy").alias("Last_payment_date"),
]

# Step C - parse the 8-character reported date (datereported_trades1 is
# created by the pipeline with lpad(..., 8, '0')).
vs_tl_cast_expr1 = [
    "*",
    F.to_date(F.col("datereported_trades1"), "ddMMyyyy").alias("dateReported"),
]
 

In [46]:
# SQL expressions applied with selectExpr in the tradeline pipeline:
#   * PAYMT_DPD_BKT_1..36 - consecutive 3-character slices of dpd_string
#     (start positions 1, 4, 7, ... = 3*i - 2),
#   * PRODUCT_TYPE        - product family derived from Loan_Type,
#   * FLAG_SECURED_UNSEC  - 'Secured' / 'Unsecured' derived from Loan_Type.
#
# Loan_Type is matched case-insensitively (upper(Loan_Type) against
# upper-cased literals). The two original lists disagreed on spelling and
# case ('Home Loan' vs 'HOME LOAN', 'CV Loan' vs 'CV LOAN',
# 'Microfinance Housing Loan' vs 'Microfinance - Housing Loan'), so the same
# raw value could be recognised by one CASE and missed by the other. Every
# value matched before still matches; both microfinance housing spellings are
# now accepted by both expressions.

# (PRODUCT_TYPE label, Loan_Type values) in CASE evaluation order.
_PRODUCT_TYPE_LOAN_TYPES = [
    ('Auto_Loan', ['Auto Loan (Personal)']),
    ('Personal_Loan', [
        'Microfinance - Personal Loan',
        'Microfinance Personal Loan',
        'P2P Personal Loan',
        'Personal Loan',
    ]),
    ('Gold_Loan', ['Gold Loan']),
    ('Credit_Card', [
        'Corporate Credit Card',
        'Credit Card',
        'Secured Credit Card',
        'Fleet Card',
    ]),
    ('Business_Loan', [
        'Business Loan',
        'Business Loan - General',
        'Business Loan - Priority Sector - Agriculture',
        'Business Loan - Priority Sector - Others',
        'Business Loan - Priority Sector - Small Business',
        'Business Loan - Unsecured',
        'Business Loan - Secured',
        'Business Loan Against Bank Deposits',
        'Business Non-Funded Credit Facility - General',
        'Business Non-Funded Credit Facility - Priority Sector - Agriculture',
        'Business Non-Funded Credit Facility - Priority Sector - Small Business',
        'Microfinance - Business Loan',
    ]),
    ('Housing_Loan', [
        'Home Loan',
        'Housing Loan',
        'Microfinance Housing Loan',
        'Microfinance - Housing Loan',
    ]),
    ('Consumer_Loan', ['Consumer Loan']),
    ('LAP', ['Property Loan', 'LAP']),
    ('Two-wheeler_Loan', ['Two-wheeler Loan', 'TW']),
    ('Commercial_Vehicle', ['Commercial Vehicle Loan', 'CV Loan']),
    ('Used_Car_Loan', ['Used Car Loan', 'UCL']),
]

# Loan_Type values flagged as 'Secured' (a null Loan_Type is also 'Secured',
# as in the original expression).
_SECURED_LOAN_TYPES = [
    'Auto Loan (Personal)',
    'Business Loan Against Bank Deposits',
    'Commercial Vehicle Loan',
    'CV Loan',
    'Construction Equipment Loan',
    'Gold Loan',
    'Home Loan',
    'Housing Loan',
    'Leasing',
    'Loan Against Bank Deposits',
    'Loan Against Shares/Securities',
    'Microfinance - Housing Loan',
    'Microfinance Housing Loan',
    'Property Loan',
    'LAP',
    'Secured Credit Card',
    'Tractor Loan',
    'TW',
    'Two-wheeler Loan',
    'UCL',
    'Used Car Loan',
    'Business Loan - Secured',
]


def _sql_upper_in_list(values):
    """Render `values` as a comma-separated list of upper-cased SQL string literals."""
    return ", ".join("'" + value.upper() + "'" for value in values)


_product_type_case = (
    "case "
    + " ".join(
        "when upper(Loan_Type) in (" + _sql_upper_in_list(loan_types) + ") then '" + label + "'"
        for label, loan_types in _PRODUCT_TYPE_LOAN_TYPES
    )
    + " when Loan_Type is Null then Null else 'OTHERS' end as PRODUCT_TYPE"
)

_secured_flag_case = (
    "case when upper(Loan_Type) in (" + _sql_upper_in_list(_SECURED_LOAN_TYPES) + ")"
    " or Loan_Type is Null then 'Secured' else 'Unsecured' end as FLAG_SECURED_UNSEC"
)

vs_tl_expr2 = (
    ["*"]
    + ["substring(dpd_string," + str(3 * i - 2) + ",3) as PAYMT_DPD_BKT_" + str(i) for i in range(1, 37)]
    + [_product_type_case]
    + [_secured_flag_case]
)
 

In [43]:
# SQL expression list: WRITTEN_OFF_STATUS (1 when settlement_amt or
# writeoff_amt_tot is positive, else 0) plus PAYMT_DPD_1..36, which map each
# 3-character PAYMT_DPD_BKT_i code to a numeric DPD value.
# NOTE(review): this list references PRODUCT_DESC and is not applied by the
# tl_data2 pipeline shown in this notebook - confirm whether it is still needed.
# NOTE(review): 'DDD' is already caught by the earlier
# "in ('XXX','DDD','X/X') then -1" branch, so the later "= 'DDD' then -11"
# branch can never be reached.
vs_tl_expr3 = ["*"]\
+["case when settlement_amt > 0 or writeoff_amt_tot > 0 then 1 else 0 end as WRITTEN_OFF_STATUS"]\
+["case when PAYMT_DPD_BKT_"+str(i)+" in ('SUB','LSS','DBT') then 91 \
when PAYMT_DPD_BKT_"+str(i)+" = 'SMA' or (PAYMT_DPD_BKT_"+str(i)+" = 'STD' and UPPER(PRODUCT_DESC) != 'CC' and OVER_DUE_AMOUNT>5000) or (PAYMT_DPD_BKT_"+str(i)+" = 'STD' and UPPER(PRODUCT_DESC) = 'CC' and OVER_DUE_AMOUNT > 10000) then 89 \
when PAYMT_DPD_BKT_"+str(i)+" in ('XXX','DDD','X/X') then -1  when PAYMT_DPD_BKT_"+str(i)+" = 'STD' then 0 \
when PAYMT_DPD_BKT_"+str(i)+" = '901' and OVER_DUE_AMOUNT < 1000 then 0 \
when PAYMT_DPD_BKT_"+str(i)+" = '901' and OVER_DUE_AMOUNT >= 1000 and OVER_DUE_AMOUNT <= 2000 then 29 \
when PAYMT_DPD_BKT_"+str(i)+" = '901' and OVER_DUE_AMOUNT > 2000 and OVER_DUE_AMOUNT <= 5000 then 30 \
when PAYMT_DPD_BKT_"+str(i)+" = '901' and OVER_DUE_AMOUNT > 5000 then 90 \
when PAYMT_DPD_BKT_"+str(i)+" in ('902','903','904') then 91 \
when PAYMT_DPD_BKT_"+str(i)+" = 'DDD' then -11 \
when PAYMT_DPD_BKT_"+str(i)+" = '905' then 61 else cast(PAYMT_DPD_BKT_"+str(i)+" as int) end as PAYMT_DPD_"+str(i) for i in range(1,37)]
 
# SQL expression list adding ref_date, paymentend_date and DATECLOSE_new.
# NOTE(review): an identical vs1_pp_tl_expr is assigned again in a later cell
# of this notebook; that later assignment overwrites this one when the cells
# run top to bottom, so one of the two copies is redundant.
vs1_pp_tl_expr = ["*"]\
+["case when month_diff = 0 then date_add(trunc(add_months(run_dt, 1),'month'),-1) \
when month_diff = 1 then date_add(trunc(add_months(run_dt, 0),'month'),-1) \
when month_diff > 1 then date_add(trunc(add_months(run_dt, -1),'month'),-1) end as ref_date"]\
+["case when PaymentHistoryStartDate is not null then PaymentHistoryStartDate \
when PaymentHistoryStartDate is null and dateReported is not null then dateReported \
when PaymentHistoryStartDate is null and dateReported is null then create_date_n end as paymentend_date"]\
+["case when DATECLOSE < PaymentHistoryStartDate and DATECLOSE is not null then PaymentHistoryStartDate else DATECLOSE end as DATECLOSE_new"]
 
 

In [36]:
# SQL expression list producing:
#   * Last_payment_date_new - fallback for a missing last payment date,
#   * PAYHITS_NEW_1..36     - PAYHITS_i codes recoded to 3-character strings,
#   * MOB                   - months between openDate and DATECLOSE (when it is
#                             after openDate) or dateReported otherwise.
# NOTE(review): this list references PAYHITS_i and PRODUCT_DESC, which are not
# created anywhere in the notebook as shown, and it is not applied by the
# tl_data2 pipeline - confirm whether it is still needed.
# NOTE(review): in the Last_payment_date_new CASE the second branch repeats the
# first branch's condition (Last_payment_date = '') and therefore can never be
# reached.
vs2_tl_expr_payhits_new = ["*"]\
+["case when Last_payment_date ='' then DATECLOSE when Last_payment_date ='' and DATECLOSE = '' then dateReported else Last_payment_date end as Last_payment_date_new"]\
+["case when PAYHITS_"+str(i)+" in ('SUB','LSS','DBT') then '091' \
when PAYHITS_"+str(i)+" = 'SMA' or (PAYHITS_"+str(i)+" = 'STD' and UPPER(PRODUCT_DESC) != 'CC' and PastDueAmount_new>5000) or (PAYHITS_"+str(i)+" = 'STD' and UPPER(PRODUCT_DESC) = 'CC' and PastDueAmount_new > 10000) then '089' \
when PAYHITS_"+str(i)+" in ('XXX','DDD') then '-11'  \
when PAYHITS_"+str(i)+" = 'STD' then '000' \
when PAYHITS_"+str(i)+" = '901' and PastDueAmount_new < 1000 then '000' \
when PAYHITS_"+str(i)+" = '901' and PastDueAmount_new >= 1000 and PastDueAmount_new <= 2000 then '029' \
when PAYHITS_"+str(i)+" = '901' and PastDueAmount_new > 2000 and PastDueAmount_new <= 5000 then '030' \
when PAYHITS_"+str(i)+" = '901' and PastDueAmount_new > 5000 then '090' \
when PAYHITS_"+str(i)+" in ('902','903','904') then '091' \
when PAYHITS_"+str(i)+" = '905' then '061' else PAYHITS_"+str(i)+" end as PAYHITS_NEW_"+str(i) for i in range(1,37)]\
+["case when DATECLOSE is not null and DATECLOSE > openDate then months_between(trunc(DATECLOSE,'month'), trunc(openDate,'month')) \
else months_between(trunc(dateReported,'month'), trunc(openDate,'month')) end as MOB"]
 
 

In [37]:
# SQL expression list that concatenates payhits_new_1..36 into a single
# paymenthistory2 string.
# NOTE(review): not applied by the tl_data2 pipeline shown in this notebook.
vs3_tl_expr_paymenthistory2 = ["*"]\
+["concat(payhits_new_1,payhits_new_2,payhits_new_3,payhits_new_4,payhits_new_5,payhits_new_6,payhits_new_7, \
payhits_new_8,payhits_new_9,payhits_new_10,payhits_new_11,payhits_new_12,payhits_new_13,payhits_new_14, \
payhits_new_15,payhits_new_16,payhits_new_17,payhits_new_18,payhits_new_19,payhits_new_20,payhits_new_21, \
payhits_new_22,payhits_new_23,payhits_new_24,payhits_new_25,payhits_new_26,payhits_new_27, \
payhits_new_28,payhits_new_29,payhits_new_30,payhits_new_31,payhits_new_32,payhits_new_33, \
payhits_new_34,payhits_new_35,payhits_new_36) as paymenthistory2"]
 
# SQL expression list producing PAYHITS_NEW2_1..36: the same recoding of
# PAYHITS_i as vs2_tl_expr_payhits_new, but with integer results (anything
# not matched is cast to int).
# NOTE(review): references PAYHITS_i and PRODUCT_DESC, which are not created
# in the notebook as shown; not applied by the tl_data2 pipeline.
vs4_tl_expr_payhits_new2 = ["*"]\
+["case when PAYHITS_"+str(i)+" in ('SUB','LSS','DBT') then 91 \
when PAYHITS_"+str(i)+" = 'SMA' or (PAYHITS_"+str(i)+" = 'STD' and UPPER(PRODUCT_DESC) != 'CC' and PastDueAmount_new>5000) or (PAYHITS_"+str(i)+" = 'STD' and UPPER(PRODUCT_DESC) = 'CC' and PastDueAmount_new > 10000) then 89 \
when PAYHITS_"+str(i)+" in ('XXX','DDD') then -11  \
when PAYHITS_"+str(i)+" = 'STD' then 0 \
when PAYHITS_"+str(i)+" = '901' and PastDueAmount_new < 1000 then 0 \
when PAYHITS_"+str(i)+" = '901' and PastDueAmount_new >= 1000 and PastDueAmount_new <= 2000 then 29 \
when PAYHITS_"+str(i)+" = '901' and PastDueAmount_new > 2000 and PastDueAmount_new <= 5000 then 30 \
when PAYHITS_"+str(i)+" = '901' and PastDueAmount_new > 5000 then 90 \
when PAYHITS_"+str(i)+" in ('902','903','904') then 91 \
when PAYHITS_"+str(i)+" = '905' then 61 else cast(PAYHITS_"+str(i)+" as int) end as PAYHITS_NEW2_"+str(i) for i in range(1,37)]
 
 

In [54]:
# SQL expressions applied by the tradeline pipeline (via selectExpr):
#   * ref_date        - last day of run_dt's month when month_diff = 0, of the
#                       previous month when month_diff = 1, of the month before
#                       that when month_diff > 1 (null otherwise),
#   * paymentend_date - PaymentHistoryStartDate, else dateReported, else
#                       create_date_n,
#   * DATECLOSE_new   - DATECLOSE, raised to PaymentHistoryStartDate when it
#                       falls before it.
vs1_pp_tl_expr = [
    "*",
    (
        "case when month_diff = 0 then date_add(trunc(add_months(run_dt, 1),'month'),-1) "
        "when month_diff = 1 then date_add(trunc(add_months(run_dt, 0),'month'),-1) "
        "when month_diff > 1 then date_add(trunc(add_months(run_dt, -1),'month'),-1) end as ref_date"
    ),
    (
        "case when PaymentHistoryStartDate is not null then PaymentHistoryStartDate "
        "when PaymentHistoryStartDate is null and dateReported is not null then dateReported "
        "when PaymentHistoryStartDate is null and dateReported is null then create_date_n end as paymentend_date"
    ),
    "case when DATECLOSE < PaymentHistoryStartDate and DATECLOSE is not null then PaymentHistoryStartDate else DATECLOSE end as DATECLOSE_new",
]
 

# SQL expressions applied by the tradeline pipeline (via selectExpr):
#   * Last_payment_date_new - last payment date with fallbacks,
#   * p_hist_bucket_1..36   - severity bucket (0-7, 8 = anything else) for
#                             each 3-character PAYMT_DPD_BKT_i slice.
#
# Last_payment_date_new: the original CASE compared the parsed *date* columns
# with '' and its second branch repeated the first branch's condition, so it
# was unreachable. The evident intent - use DATECLOSE when the last payment
# date is missing, and dateReported when both are missing - is expressed with
# coalesce, which tests for NULL (what a missing parsed date actually is).
#
# NOTE(review): PAYMT_DPD_BKT_i is a string slice compared with numbers here;
# presumably non-numeric codes end up in the `else 8` branch - confirm.
dpd_exp1 = (
    ["*"]
    + ["coalesce(Last_payment_date, DATECLOSE, dateReported) as Last_payment_date_new"]
    + [
        f"""
        case
        when (PAYMT_DPD_BKT_{i} >= 360 and PAYMT_DPD_BKT_{i} <= 900) then 7
        when (PAYMT_DPD_BKT_{i} >= 180 and PAYMT_DPD_BKT_{i} < 360) or (PAYMT_DPD_BKT_{i} = 904) or (PAYMT_DPD_BKT_{i} = 903) then 6
        when (PAYMT_DPD_BKT_{i} >= 150 and PAYMT_DPD_BKT_{i} < 180) then 5
        when (PAYMT_DPD_BKT_{i} >= 90 and PAYMT_DPD_BKT_{i} < 150) or (PAYMT_DPD_BKT_{i} = 902) then 4
        when (PAYMT_DPD_BKT_{i} >= 60 and PAYMT_DPD_BKT_{i} < 90) or (PAYMT_DPD_BKT_{i} = 905) then 3
        when (PAYMT_DPD_BKT_{i} >= 30 and PAYMT_DPD_BKT_{i} < 60) then 2
        when (PAYMT_DPD_BKT_{i} >= 1 and PAYMT_DPD_BKT_{i} < 30) then 1
        when (PAYMT_DPD_BKT_{i} = 0) or (PAYMT_DPD_BKT_{i} = 901) then 0
        else 8 end as p_hist_bucket_{i}
        """
        for i in range(1, 37)
    ]
)

# SQL expression that glues p_hist_bucket_1..36 together (no separator) into
# the single string column phist_buckets.
_p_hist_bucket_cols = ", ".join(f"p_hist_bucket_{i}" for i in range(1, 37))
dpd_exp2 = ["*", f"concat_ws('', {_p_hist_bucket_cols}) as phist_buckets"]

In [50]:
# SQL expressions that read the i-th character of phist_buckets back as an
# integer column dpd_bucket_int_i, for i = 1..36.
tl_dpd_exp1 = ["*"] + [
    f"cast(substr(phist_buckets , {i} , 1) as int) as dpd_bucket_int_{i}"
    for i in range(1, 37)
]

# SQL expressions applied last in the tradeline pipeline:
#   * dpd_bucket_i   - dpd_bucket_int_i when it is 0..7; -2 when it is 8 and
#                      month i lies outside [mn_diff_retro_start,
#                      mn_diff_retro_end]; -1 otherwise,
#   * phist_bucket_i - p_hist_bucket_i with 8 recoded to -2 when
#                      i > mn_diff_start_end + 1, else to -1.
#
# Both families are generated for months 1..36. The original built
# phist_bucket_i with range(1, 7), i.e. months 1..6 only, while every other
# per-month list in this notebook (PAYMT_DPD_BKT_i, p_hist_bucket_i,
# dpd_bucket_int_i, dpd_bucket_i) covers 36 months.
# NOTE(review): the change only adds phist_bucket_7..36; if the 6-month limit
# was intentional, restore range(1, 7).
tl_dpd_exp2 = (
    ["*"]
    + [
        f"""
        case
        when (dpd_bucket_int_{i} >= 0) and (dpd_bucket_int_{i} <= 7) then dpd_bucket_int_{i}
        when dpd_bucket_int_{i} = 8 and ({i} < mn_diff_retro_start or {i} > mn_diff_retro_end) then -2
        else -1
        end as dpd_bucket_{i}
        """
        for i in range(1, 37)
    ]
    + [
        f"""
        case when (p_hist_bucket_{i} = 8 and {i} > mn_diff_start_end + 1) then -2
        when p_hist_bucket_{i} = 8 then -1
        else p_hist_bucket_{i}
        end as phist_bucket_{i}
        """
        for i in range(1, 37)
    ]
)

In [58]:
def _months_before_run_dt(date_col):
    """Months between the month of run_dt and the month of `date_col`, cast to int."""
    return F.months_between(
        F.date_trunc('month', F.col("run_dt")),
        F.trunc(F.col(date_col), "month"),
    ).cast(T.IntegerType())


# Tradeline preparation pipeline. The order of the steps matters: several
# columns are created under a temporary name, the original is dropped, and the
# temporary column is renamed back.
tl_data2 = (
    tl_data1
    # ---- row-level pre-filters and run date ------------------------------
    .withColumn("Loan_Type", F.trim(F.col("Loan_Type")))
    .filter("Ownership_type != 'Guarantor'")
    .filter("length(dpd_string) between 0 and 108")
    .withColumn('run_dt', F.to_date(F.lit(run_date)))
    .withColumn("dateReport", F.to_date(F.lit(run_date)))
    .filter("dateReport is not null")
    # ---- pad and parse the date columns ----------------------------------
    .select(*vs_tl_date_ipad)
    .drop('Last_payment_date')  # the raw column; re-created as a date next
    .select(*vs_tl_cast_expr0)
    .filter("openDate is not null")
    .filter("openDate <= run_dt")
    .filter("PaymentHistoryStartDate <= run_dt")
    .withColumn("datereported_trades1", F.lpad(F.col("datereported_trades_c"), 8, '0'))
    .select(*vs_tl_cast_expr1)
    # dateReported falls back to Last_payment_date when it could not be parsed
    .selectExpr("*", "case when dateReported is Null then Last_payment_date else dateReported end as dateReported_new")
    .drop('dateReported')
    .withColumnRenamed('dateReported_new', 'dateReported')
    .filter("dateReported is not Null")
    .withColumn(
        'month_diff',
        F.months_between(F.trunc('run_dt', 'month'), F.trunc('PaymentHistoryStartDate', 'month')),
    )
    .withColumn('create_date_n', F.col('run_dt'))
    # ---- DPD slices, product type, derived dates, severity buckets -------
    .selectExpr(vs_tl_expr2)
    .selectExpr(vs1_pp_tl_expr)
    .selectExpr(dpd_exp1)
    .drop('Last_payment_date')
    .withColumnRenamed('Last_payment_date_new', 'Last_payment_date')
    .selectExpr(dpd_exp2)
    .drop('DATECLOSE')
    .withColumnRenamed('DATECLOSE_new', 'DATECLOSE')
    .selectExpr("*", "case when Over_due_amount is null then 0 else Over_due_amount*1 end as PastDueAmount_new")
    # ---- amount columns: rename and clean --------------------------------
    .withColumnRenamed("Sanction_Amount", "SANCTIONAMOUNT")
    .withColumnRenamed("High_Credit_Amount", "HIGHCREDIT")
    .withColumnRenamed("Over_due_amount", "AMOUNT_OVERDUE")
    .withColumnRenamed("Out_standing_Balance", "CURRENT_BALANCE")
    .withColumn(
        "PASTDUEAMOUNT",
        F.when(F.col("AMOUNT_OVERDUE") <= 0, F.lit(0)).otherwise(F.col("AMOUNT_OVERDUE")),
    )
    # the flag must be computed before negative balances are floored to 0
    .withColumn(
        "NEGATIVE_BALANCE_FLAG",
        F.when(F.col("CURRENT_BALANCE") < 0, F.lit(1)).otherwise(F.lit(0)),
    )
    .withColumn(
        "CURRENT_BALANCE",
        F.when(F.col("CURRENT_BALANCE") <= 0, F.lit(0)).otherwise(F.col("CURRENT_BALANCE")),
    )
    .withColumn(
        "CREDITLIMIT",
        F.when(F.col("CREDITLIMIT") < 0, F.lit(None)).otherwise(F.col("CREDITLIMIT")),
    )
    # ---- remaining renames to the output column names --------------------
    .withColumnRenamed('CURRENT_BALANCE', 'BALANCE')
    .withColumnRenamed('CASH_LIMIT', 'CASHLIMIT')
    .withColumnRenamed('openDate', 'DATEOPENED')
    .withColumnRenamed('Last_payment_date', 'LASTPAYMENTDATE')
    .withColumnRenamed('DATECLOSE', 'DATECLOSED')
    .withColumnRenamed('dateReported', 'DATEREPORTED')
    .withColumnRenamed('actual_paymt_amt', 'lastpayment')
    .withColumnRenamed('tenor', 'repaymenttenure')
    .withColumnRenamed('EMI', 'actual_emi')
    .withColumnRenamed("Sector", "account_holder_type")
    .withColumnRenamed("Loan_account_Number", "acct_uniq_id")
    .withColumnRenamed("PaymentHistoryStartDate", "payment_history_start_date")
    .withColumnRenamed("paymentend_date", "payment_history_end_date")
    .withColumnRenamed("writeoff_amt_prin", "written_off_amount_principal")
    .withColumnRenamed("writeoff_amt_tot", "written_off_amount")
    .withColumnRenamed("COLLATERALType", "collateral_type")
    .withColumnRenamed("COLLATERALVALUE", "collateral_value")
    .withColumnRenamed("suit_filed_status", "is_suit_filed_or_wilful_default")
    .withColumnRenamed("Credit_Facility_Status", "is_written_off_or_settled")
    .withColumn("ACCOUNT_TYPE", F.col('Loan_Type'))
    # ---- month gaps between run_dt and each key date ---------------------
    .withColumn("mn_diff_retro_start", _months_before_run_dt('PAYMENT_HISTORY_START_DATE'))
    .withColumn("mn_diff_retro_end", _months_before_run_dt('PAYMENT_HISTORY_END_DATE'))
    .withColumn("mn_diff_retro_report", _months_before_run_dt("DATEREPORTED"))
    .withColumn("mn_diff_retro_open", _months_before_run_dt("DATEOPENED"))
    .withColumn("mn_diff_retro_close", _months_before_run_dt("DATECLOSED"))
    .withColumn("mn_diff_retro_payment", _months_before_run_dt("LASTPAYMENTDATE"))
    # ---- align phist_buckets with run_dt ---------------------------------
    # prefix (mn_diff_retro_start - 1) filler '8' characters
    .withColumn(
        "pre_fill",
        F.concat_ws('', F.array_repeat(F.lit("8"), F.col("mn_diff_retro_start") - F.lit(1))),
    )
    .withColumn("phist_buckets", F.concat(F.col("pre_fill"), F.col("phist_buckets")))
    # when the history starts in or after run_dt's month, cut the leading characters
    .withColumn(
        "dpd_cut_len",
        F.when(
            F.col("mn_diff_retro_start") <= 0,
            F.abs(F.col('mn_diff_retro_start')) + F.lit(1),
        ).otherwise(F.lit(0)),
    )
    .withColumn(
        'phist_buckets',
        F.expr("substring(phist_buckets , dpd_cut_len + 1 , length(phist_buckets) - dpd_cut_len)"),
    )
    # ... and move the start date to one month before run_dt
    .withColumn(
        "PAYMENT_HISTORY_START_DATE",
        F.when(
            F.col("mn_diff_retro_start") <= 0,
            F.add_months(F.col('run_dt'), months=-1),
        ).otherwise(F.col('PAYMENT_HISTORY_START_DATE')),
    )
    .withColumn(
        "mn_diff_start_end",
        F.months_between(
            F.date_trunc('month', F.col("PAYMENT_HISTORY_START_DATE")),
            F.trunc("PAYMENT_HISTORY_END_DATE", "month"),
        ).cast(T.IntegerType()),
    )
    # ---- final per-month bucket columns ----------------------------------
    .selectExpr(tl_dpd_exp1)
    .selectExpr(tl_dpd_exp2)
)
 

In [None]:
# Row count of the raw tradeline frame (before any pipeline filter).
tl_data1.count()

In [39]:
# NOTE(review): no expression in the notebook as shown creates a PRODUCT_DESC
# column (vs_tl_expr2 produces PRODUCT_TYPE), so this cell and its saved output
# appear to come from an earlier version of the pipeline and would presumably
# fail on a fresh Restart & Run All - confirm and remove or update.
tl_data2.groupBy("PRODUCT_DESC").count().show()

+------------+-----+
|PRODUCT_DESC|count|
+------------+-----+
|      OTHERS|   40|
|          PL|   49|
|          GL|   61|
|          HL|   18|
|          AL|   12|
|          CC|   28|
|         BIL|   20|
+------------+-----+



25/05/08 07:43:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Loan_Type                                    , DATE_OPENED, DateReported_trades, Last_payment_date, Pay_Hist_Start_Date, Ownership_type, Dpd_string                                                                                                  
 Schema: Loan_Type, DATE_OPENED, DateReported_trades, Last_payment_date, Pay_Hist_Start_Date, Ownership_Type, Dpd_string
Expected: Loan_Type but found: Loan_Type                                    
CSV file: file:///home/data/manu/sample/cibil/tl.csv


In [15]:
# Distribution of the derived PRODUCT_TYPE across the prepared tradelines.
tl_data2.groupBy("PRODUCT_TYPE").count().show()

25/05/08 07:35:13 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Loan_Type                                    , DATE_OPENED, DateReported_trades, Last_payment_date, Pay_Hist_Start_Date, Ownership_type, Dpd_string                                                                                                  
 Schema: Loan_Type, DATE_OPENED, DateReported_trades, Last_payment_date, Pay_Hist_Start_Date, Ownership_Type, Dpd_string
Expected: Loan_Type but found: Loan_Type                                    
CSV file: file:///home/data/manu/sample/cibil/tl.csv


+------------------+-----+
|      PRODUCT_TYPE|count|
+------------------+-----+
|            OTHERS|   15|
|         Gold_Loan|   61|
|         Auto_Loan|   11|
|  Two-wheeler_Loan|   13|
|     Used_Car_Loan|    1|
|     Business_Loan|   20|
|               LAP|    5|
|     Consumer_Loan|   12|
|Commercial_Vehicle|    3|
|       Credit_Card|   28|
|      Housing_Loan|   18|
|     Personal_Loan|   41|
+------------------+-----+



In [22]:
# Distribution of the raw Loan_Type values, printed without truncation.
loan_type_counts = tl_data1.groupBy("Loan_Type").count()
loan_type_counts.show(truncate=False)

+---------------------------------------------+-----+
|Loan_Type                                    |count|
+---------------------------------------------+-----+
|Used Car Loan                                |1    |
|Auto Loan (Personal)                         |11   |
|Consumer Loan                                |12   |
|Other                                        |6    |
|Commercial Vehicle Loan                      |3    |
|Loan Against Bank Deposits                   |7    |
|Construction Equipment Loan                  |1    |
|Housing Loan                                 |18   |
|Gold Loan                                    |61   |
|Loan to Professional                         |1    |
|Credit Card                                  |28   |
|Property Loan                                |5    |
|Personal Loan                                |41   |
|Business Loan - Priority Sector - Agriculture|20   |
|Two-wheeler Loan                             |13   |
+---------------------------

25/05/07 17:25:22 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Loan_Type                                    
 Schema: Loan_Type
Expected: Loan_Type but found: Loan_Type                                    
CSV file: file:///home/data/manu/sample/cibil/tl.csv


In [16]:
# Row count of the prepared tradeline frame (after all pipeline filters).
tl_data2.count()

25/05/08 07:39:57 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: DATE_OPENED, DateReported_trades, Last_payment_date, Pay_Hist_Start_Date, Ownership_type, Dpd_string                                                                                                  
 Schema: DATE_OPENED, DateReported_trades, Last_payment_date, Pay_Hist_Start_Date, Ownership_Type, Dpd_string
Expected: Dpd_string but found: Dpd_string                                                                                                  
CSV file: file:///home/data/manu/sample/cibil/tl.csv


228

In [123]:
# Print one raw tradeline record vertically, without truncating values.
tl_data1.show(n=1, truncate=False, vertical=True)

25/05/05 17:10:43 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: EnquiryControlNumber, MemberReference, Loan_account_Number, Loan_Status, Loan_Type                                    , Loan_Classification, Sanction_Amount, High_Credit_Amount, Out_standing_Balance, EMI   , CREDITLIMIT, CASH_LIMIT, actual_paymt_amt, Interest_Rates, tenor, Income, monthly_annual_indicator, net_gross_indicator, occupation_code, COLLATERALTYPE, COLLATERALVALUE, DATE_OPENED, DATE_CLOSED, DateReported_trades, Last_payment_date, Pay_Hist_Start_Date, Pay_Hist_END_Date, Ownership_type, Over_due_amount, Tenor_Frequency, suit_filed_status, Credit_Facility_Status, writeoff_amt_tot, writeoff_amt_prin, settlement_amt, Dpd_string                                                                                                  , PL_Propensity_Score, NTC_Score, TU_Score, ExclusionCode1, ExclusionCode2, ExclusionCode3, ExclusionCode4, ExclusionCode5, ExclusionCode6, ExclusionCode7, ReasonCode1,

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------
 EnquiryControlNumber     | 99401699                                                                                                     
 MemberReference          | 79238541                                                                                                     
 Loan_account_Number      | NOT DISCLOSED                                                                                                
 Loan_Status              | Closed                                                                                                       
 Loan_Type                | Gold Loan                                                                                                    
 Loan_Classification      | STD                                                                                                          
 Sanction_Amount          | 38500 

In [59]:

# Persist the prepared tradeline data, replacing any previous output.
tl_data2.write.parquet(prep_data_path + 'prep_tl_data.parquet', mode='overwrite')

25/05/08 09:36:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: EnquiryControlNumber, MemberReference, Loan_account_Number, Loan_Status, Loan_Type                                    , Loan_Classification, Sanction_Amount, High_Credit_Amount, Out_standing_Balance, EMI   , CREDITLIMIT, CASH_LIMIT, actual_paymt_amt, Interest_Rates, tenor, Income, monthly_annual_indicator, net_gross_indicator, occupation_code, COLLATERALTYPE, COLLATERALVALUE, DATE_OPENED, DATE_CLOSED, DateReported_trades, Last_payment_date, Pay_Hist_Start_Date, Pay_Hist_END_Date, Ownership_type, Over_due_amount, Tenor_Frequency, suit_filed_status, Credit_Facility_Status, writeoff_amt_tot, writeoff_amt_prin, settlement_amt, Dpd_string                                                                                                  , PL_Propensity_Score, NTC_Score, TU_Score, ExclusionCode1, ExclusionCode2, ExclusionCode3, ExclusionCode4, ExclusionCode5, ExclusionCode6, ExclusionCode7, ReasonCode1,

In [None]:
# Exit call at the end of the notebook.
# NOTE(review): the SparkSession is never stopped explicitly in the notebook as
# shown (no spark.stop()) - consider stopping it before exiting.
quit()