In [1]:
import findspark
import pandas as pd

In [2]:
findspark.init("/opt/manual/spark/")

In [3]:
from pyspark.sql import SparkSession, functions as F

# Create SparkSession

In [4]:
spark = (SparkSession.builder
        .appName("Model with Spark")
        .master("yarn")
        .enableHiveSupport()
        .getOrCreate())
# .config("spark.jars","xgboost4j_2.12-1.2.0.jar,xgboost4j-spark_2.12-1.2.0.jar")

# Read data

In [5]:
train_df = spark.sql("select * from homecredit.application_train_orc_snappy")

In [6]:
train_df.limit(5).toPandas()

Unnamed: 0,sk_id_curr,target,name_contract_type,code_gender,flag_own_car,flag_own_realty,cnt_children,amt_income_total,amt_credit,amt_annuity,...,flag_document_18,flag_document_19,flag_document_20,flag_document_21,amt_req_credit_bureau_hour,amt_req_credit_bureau_day,amt_req_credit_bureau_week,amt_req_credit_bureau_mon,amt_req_credit_bureau_qrt,amt_req_credit_bureau_year
0,100002,1,Cash loans,M,N,Y,0,202500.0,406597.5,24700.5,...,0,0,0,0,0.0,0.0,0.0,0.0,0.0,1.0
1,100003,0,Cash loans,F,N,N,0,270000.0,1293502.5,35698.5,...,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0
2,100004,0,Revolving loans,M,Y,Y,0,67500.0,135000.0,6750.0,...,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0
3,100006,0,Cash loans,F,N,Y,0,135000.0,312682.5,29686.5,...,0,0,0,0,,,,,,
4,100007,0,Cash loans,M,N,Y,0,121500.0,513000.0,21865.5,...,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0


In [7]:
train_df.count()

307511

In [8]:
test_df = spark.sql("select * from homecredit.application_test_orc_snappy")

In [9]:
test_df.count()

48744

In [10]:
print(len(train_df.columns))
print(len(test_df.columns))

122
121


# Schema

In [11]:
train_df.printSchema()

root
 |-- sk_id_curr: integer (nullable = true)
 |-- target: integer (nullable = true)
 |-- name_contract_type: string (nullable = true)
 |-- code_gender: string (nullable = true)
 |-- flag_own_car: string (nullable = true)
 |-- flag_own_realty: string (nullable = true)
 |-- cnt_children: integer (nullable = true)
 |-- amt_income_total: double (nullable = true)
 |-- amt_credit: double (nullable = true)
 |-- amt_annuity: double (nullable = true)
 |-- amt_goods_price: double (nullable = true)
 |-- name_type_suite: string (nullable = true)
 |-- name_income_type: string (nullable = true)
 |-- name_education_type: string (nullable = true)
 |-- name_family_status: string (nullable = true)
 |-- name_housing_type: string (nullable = true)
 |-- region_population_relative: double (nullable = true)
 |-- days_birth: integer (nullable = true)
 |-- days_employed: integer (nullable = true)
 |-- days_registration: double (nullable = true)
 |-- days_id_publish: integer (nullable = true)
 |-- own_car_ag

In [12]:
# There is no complex type

# Null check

In [13]:
# Explore null values

In [14]:
# This dict is for pandas dataframe to explore more comfortable null value exploration
null_dict = {
    "columns":train_df.columns,
    "null_count": [],
    "has_null":[],
    "null_ratio":[]
}

# To calculate null_ratio we need to know record count
df_count = float(train_df.count())

for col in train_df.columns:
    # If a column has null, nan or empty value calculate how many?
    null_count = train_df.filter( (train_df[col].isNull()) |  
                                 (train_df[col] == "") | 
                                 (F.isnan(F.col(col))) |
                                 (train_df[col] == "null") |
                                 (train_df[col] == "NULL")
                                ).count()
    # Append this calculated count to dictionary
    null_dict["null_count"].append(null_count)
    # If null_count is gt 0 that means this column has atleast one null, nan or empty value.
    if  null_count > 0 :
        null_ratio = null_count/df_count
        null_dict["null_ratio"].append(null_ratio)
        null_dict["has_null"].append(True)
        print("{} has {} null and null ratio {}".format(col, null_count, round(null_ratio, 4)))
    else:
        # Add zero for non-null columns
        null_dict["null_ratio"].append(0.0)
        # Add False for non-null columns
        null_dict["has_null"].append(False)

amt_annuity has 12 null and null ratio 0.0
amt_goods_price has 278 null and null ratio 0.0009
name_type_suite has 1292 null and null ratio 0.0042
own_car_age has 202929 null and null ratio 0.6599
occupation_type has 96391 null and null ratio 0.3135
cnt_fam_members has 2 null and null ratio 0.0
ext_source_1 has 173378 null and null ratio 0.5638
ext_source_2 has 660 null and null ratio 0.0021
ext_source_3 has 60965 null and null ratio 0.1983
apartments_avg has 156061 null and null ratio 0.5075
basementarea_avg has 179943 null and null ratio 0.5852
years_beginexpluatation_avg has 150007 null and null ratio 0.4878
years_build_avg has 204488 null and null ratio 0.665
commonarea_avg has 214865 null and null ratio 0.6987
elevators_avg has 163891 null and null ratio 0.533
entrances_avg has 154828 null and null ratio 0.5035
floorsmax_avg has 153020 null and null ratio 0.4976
floorsmin_avg has 208642 null and null ratio 0.6785
landarea_avg has 182590 null and null ratio 0.5938
livingapartments_a

In [16]:
# Create apandas dataframe from null_dictionary
null_df = pd.DataFrame.from_dict(null_dict)

In [17]:
# To prevent display row truncation
pd.options.display.max_rows = 999
null_df[null_df.has_null == True] \
.sort_values("null_ratio", ascending=False) \
.head(100)

Unnamed: 0,columns,null_count,has_null,null_ratio
76,commonarea_medi,214865,True,0.698723
48,commonarea_avg,214865,True,0.698723
62,commonarea_mode,214865,True,0.698723
84,nonlivingapartments_medi,213514,True,0.69433
70,nonlivingapartments_mode,213514,True,0.69433
56,nonlivingapartments_avg,213514,True,0.69433
86,fondkapremont_mode,210295,True,0.683862
68,livingapartments_mode,210199,True,0.68355
82,livingapartments_medi,210199,True,0.68355
54,livingapartments_avg,210199,True,0.68355


In [18]:
# Filter out at least one null columns (has_null = True)
null_df = null_df[null_df.has_null == True]

# Drop higher ratio null columns

In [19]:
# I decide to drop columns that null rate greater than %48

In [20]:
# Select columns gt %48 null_ratio
null_df.loc[null_df.null_ratio > 0.48].loc[:,"columns"].head(100)

21                     own_car_age
41                    ext_source_1
44                  apartments_avg
45                basementarea_avg
46     years_beginexpluatation_avg
47                 years_build_avg
48                  commonarea_avg
49                   elevators_avg
50                   entrances_avg
51                   floorsmax_avg
52                   floorsmin_avg
53                    landarea_avg
54            livingapartments_avg
55                  livingarea_avg
56         nonlivingapartments_avg
57               nonlivingarea_avg
58                 apartments_mode
59               basementarea_mode
60    years_beginexpluatation_mode
61                years_build_mode
62                 commonarea_mode
63                  elevators_mode
64                  entrances_mode
65                  floorsmax_mode
66                  floorsmin_mode
67                   landarea_mode
68           livingapartments_mode
69                 livingarea_mode
70        nonlivinga

In [21]:
# Convert to list columns gt %48 null_ratio
null_cols_to_drop = null_df.loc[null_df.null_ratio > 0.48].loc[:,"columns"].tolist()
null_cols_to_stay = null_df.loc[null_df.null_ratio < 0.48].loc[:,"columns"].tolist()

In [22]:
print("null_cols_to_drop")
print(null_cols_to_drop)
print("null_cols_to_stay")
print(null_cols_to_stay)

null_cols_to_drop
['own_car_age', 'ext_source_1', 'apartments_avg', 'basementarea_avg', 'years_beginexpluatation_avg', 'years_build_avg', 'commonarea_avg', 'elevators_avg', 'entrances_avg', 'floorsmax_avg', 'floorsmin_avg', 'landarea_avg', 'livingapartments_avg', 'livingarea_avg', 'nonlivingapartments_avg', 'nonlivingarea_avg', 'apartments_mode', 'basementarea_mode', 'years_beginexpluatation_mode', 'years_build_mode', 'commonarea_mode', 'elevators_mode', 'entrances_mode', 'floorsmax_mode', 'floorsmin_mode', 'landarea_mode', 'livingapartments_mode', 'livingarea_mode', 'nonlivingapartments_mode', 'nonlivingarea_mode', 'apartments_medi', 'basementarea_medi', 'years_beginexpluatation_medi', 'years_build_medi', 'commonarea_medi', 'elevators_medi', 'entrances_medi', 'floorsmax_medi', 'floorsmin_medi', 'landarea_medi', 'livingapartments_medi', 'livingarea_medi', 'nonlivingapartments_medi', 'nonlivingarea_medi', 'fondkapremont_mode', 'housetype_mode', 'totalarea_mode', 'wallsmaterial_mode']
nu

In [23]:
# Drop columns gt %48 null_ratio
train_df2 = train_df.drop(*null_cols_to_drop)
test_df2 = test_df.drop(*null_cols_to_drop)

In [24]:
print(len(train_df2.columns))
print(len(test_df2.columns))

74
73


# Split numeric and categoric columns

In [25]:
categoric_cols = []
numeric_cols = []
label_col = ['target']

for col in train_df2.dtypes:
    if (col[0] not in label_col):
        if col[1] == 'string':
            categoric_cols.append(col[0])
        else:
            numeric_cols.append(col[0])

In [26]:
print(categoric_cols)

['name_contract_type', 'code_gender', 'flag_own_car', 'flag_own_realty', 'name_type_suite', 'name_income_type', 'name_education_type', 'name_family_status', 'name_housing_type', 'occupation_type', 'weekday_appr_process_start', 'organization_type', 'emergencystate_mode']


In [27]:
print(numeric_cols)

['sk_id_curr', 'cnt_children', 'amt_income_total', 'amt_credit', 'amt_annuity', 'amt_goods_price', 'region_population_relative', 'days_birth', 'days_employed', 'days_registration', 'days_id_publish', 'flag_mobil', 'flag_emp_phone', 'flag_work_phone', 'flag_cont_mobile', 'flag_phone', 'flag_email', 'cnt_fam_members', 'region_rating_client', 'region_rating_client_w_city', 'hour_appr_process_start', 'reg_region_not_live_region', 'reg_region_not_work_region', 'live_region_not_work_region', 'reg_city_not_live_city', 'reg_city_not_work_city', 'live_city_not_work_city', 'ext_source_2', 'ext_source_3', 'obs_30_cnt_social_circle', 'def_30_cnt_social_circle', 'obs_60_cnt_social_circle', 'def_60_cnt_social_circle', 'days_last_phone_change', 'flag_document_2', 'flag_document_3', 'flag_document_4', 'flag_document_5', 'flag_document_6', 'flag_document_7', 'flag_document_8', 'flag_document_9', 'flag_document_10', 'flag_document_11', 'flag_document_12', 'flag_document_13', 'flag_document_14', 'flag_do

In [28]:
print(label_col)

['target']


In [29]:
print(len(categoric_cols)+len(numeric_cols)+len(label_col))

74


In [30]:
if len(train_df2.columns) == (len(categoric_cols)+len(numeric_cols)+len(label_col)):
    print("columns split is successful.")
else: print("there is a mistake column split ops.")

columns split is successful.


# Trim categorical columns

In [31]:
for col_name in categoric_cols:
    train_df2 = train_df2.withColumn(col_name, F.trim(F.col(col_name)))

# Explore categorical columns including nulls

In [32]:
null_cat_cols = list(set(categoric_cols).intersection(set(null_cols_to_stay)))
print(null_cat_cols)

['name_type_suite', 'emergencystate_mode', 'occupation_type']


In [33]:
for col in null_cat_cols:
    train_df2.groupBy(col).agg(F.count("*").alias("total_count")).orderBy(F.desc("total_count")).show()

+---------------+-----------+
|name_type_suite|total_count|
+---------------+-----------+
|  Unaccompanied|     248526|
|         Family|      40149|
|Spouse, partner|      11370|
|       Children|       3267|
|        Other_B|       1770|
|           null|       1292|
|        Other_A|        866|
|Group of people|        271|
+---------------+-----------+

+-------------------+-----------+
|emergencystate_mode|total_count|
+-------------------+-----------+
|                 No|     159428|
|               null|     145755|
|                Yes|       2328|
+-------------------+-----------+

+--------------------+-----------+
|     occupation_type|total_count|
+--------------------+-----------+
|                null|      96391|
|            Laborers|      55186|
|         Sales staff|      32102|
|          Core staff|      27570|
|            Managers|      21371|
|             Drivers|      18603|
|High skill tech s...|      11380|
|         Accountants|       9813|
|      Medicine

In [34]:
# There are three categorical null columns. Assign Unknown for nulls.

In [35]:
for col in null_cat_cols:
    train_df2 = train_df2.withColumn(col, F.when( ((train_df2[col].isNull()) |  
                                 (train_df2[col] == "") | 
                                 (F.isnan(F.col(col))) |
                                 (train_df2[col] == "null") |
                                 (train_df2[col] == "NULL")), "Unknown_").otherwise(F.col(col)))

In [36]:
for col in null_cat_cols:
    test_df2 = test_df2.withColumn(col, F.when( ((test_df2[col].isNull()) |  
                                 (test_df2[col] == "") | 
                                 (F.isnan(F.col(col))) |
                                 (test_df2[col] == "null") |
                                 (test_df2[col] == "NULL")), "Unknown_").otherwise(F.col(col)))

# Explore numeric columns including nulls

In [37]:
null_num_cols = list(set(numeric_cols).intersection(set(null_cols_to_stay)))
null_num_cols_dict = {
    "null_num_cols" : null_num_cols,
    "means": []
}
for col in null_num_cols:
    print(col)
    train_df2.select(F.mean(col).alias("mean")).show()
    col_mean1 = train_df2.select(F.mean(col).alias("mean")).head(1)
    col_mean = col_mean1[0].asDict()["mean"]
    null_num_cols_dict["means"].append(col_mean)
    

cnt_fam_members
+-----------------+
|             mean|
+-----------------+
|2.152665450442101|
+-----------------+

amt_req_credit_bureau_qrt
+-------------------+
|               mean|
+-------------------+
|0.26547414959848414|
+-------------------+

amt_req_credit_bureau_mon
+-------------------+
|               mean|
+-------------------+
|0.26739526000781977|
+-------------------+

ext_source_3
+------------------+
|              mean|
+------------------+
|0.5108529061800121|
+------------------+

obs_60_cnt_social_circle
+------------------+
|              mean|
+------------------+
|1.4052921791901856|
+------------------+

amt_req_credit_bureau_hour
+--------------------+
|                mean|
+--------------------+
|0.006402448193930645|
+--------------------+

ext_source_2
+------------------+
|              mean|
+------------------+
|0.5143926741308463|
+------------------+

def_60_cnt_social_circle
+-------------------+
|               mean|
+-------------------+
|0.100

In [38]:
null_num_cols_dict["means"]

[2.152665450442101,
 0.26547414959848414,
 0.26739526000781977,
 0.5108529061800121,
 1.4052921791901856,
 0.006402448193930645,
 0.5143926741308463,
 0.10004894123788705,
 0.0343619356973142,
 -962.8587883320868,
 27108.573909183444,
 0.1434206662533851,
 1.4222454239942575,
 0.0070002105326475985,
 1.899974435321363,
 538396.2074288895]

In [39]:
num_col_means_df = pd.DataFrame.from_dict(null_num_cols_dict)
num_col_means_df.head()

Unnamed: 0,null_num_cols,means
0,cnt_fam_members,2.152665
1,amt_req_credit_bureau_qrt,0.265474
2,amt_req_credit_bureau_mon,0.267395
3,ext_source_3,0.510853
4,obs_60_cnt_social_circle,1.405292


# Fill numeric nulls with mean

In [40]:
null_num_cols = list(set(numeric_cols).intersection(set(null_cols_to_stay)))
for col, mean in zip(null_num_cols, null_num_cols_dict["means"]):
    train_df2 = train_df2.withColumn(col, F.when( ((train_df2[col].isNull()) |  
                                 (train_df2[col] == "") | 
                                 (F.isnan(F.col(col))) |
                                 (train_df2[col] == "null") |
                                 (train_df2[col] == "NULL")), mean).otherwise(F.col(col)))
    print(col, mean)

cnt_fam_members 2.152665450442101
amt_req_credit_bureau_qrt 0.26547414959848414
amt_req_credit_bureau_mon 0.26739526000781977
ext_source_3 0.5108529061800121
obs_60_cnt_social_circle 1.4052921791901856
amt_req_credit_bureau_hour 0.006402448193930645
ext_source_2 0.5143926741308463
def_60_cnt_social_circle 0.10004894123788705
amt_req_credit_bureau_week 0.0343619356973142
days_last_phone_change -962.8587883320868
amt_annuity 27108.573909183444
def_30_cnt_social_circle 0.1434206662533851
obs_30_cnt_social_circle 1.4222454239942575
amt_req_credit_bureau_day 0.0070002105326475985
amt_req_credit_bureau_year 1.899974435321363
amt_goods_price 538396.2074288895


In [41]:
null_num_cols = list(set(numeric_cols).intersection(set(null_cols_to_stay)))
for col, mean in zip(null_num_cols, null_num_cols_dict["means"]):
    test_df2 = test_df2.withColumn(col, F.when( ((test_df2[col].isNull()) |  
                                 (test_df2[col] == "") | 
                                 (F.isnan(F.col(col))) |
                                 (test_df2[col] == "null") |
                                 (test_df2[col] == "NULL")), mean).otherwise(F.col(col)))
    print(col, mean)

cnt_fam_members 2.152665450442101
amt_req_credit_bureau_qrt 0.26547414959848414
amt_req_credit_bureau_mon 0.26739526000781977
ext_source_3 0.5108529061800121
obs_60_cnt_social_circle 1.4052921791901856
amt_req_credit_bureau_hour 0.006402448193930645
ext_source_2 0.5143926741308463
def_60_cnt_social_circle 0.10004894123788705
amt_req_credit_bureau_week 0.0343619356973142
days_last_phone_change -962.8587883320868
amt_annuity 27108.573909183444
def_30_cnt_social_circle 0.1434206662533851
obs_30_cnt_social_circle 1.4222454239942575
amt_req_credit_bureau_day 0.0070002105326475985
amt_req_credit_bureau_year 1.899974435321363
amt_goods_price 538396.2074288895


In [42]:
train_df2.limit(10).toPandas()

Unnamed: 0,sk_id_curr,target,name_contract_type,code_gender,flag_own_car,flag_own_realty,cnt_children,amt_income_total,amt_credit,amt_annuity,...,flag_document_18,flag_document_19,flag_document_20,flag_document_21,amt_req_credit_bureau_hour,amt_req_credit_bureau_day,amt_req_credit_bureau_week,amt_req_credit_bureau_mon,amt_req_credit_bureau_qrt,amt_req_credit_bureau_year
0,100002,1,Cash loans,M,N,Y,0,202500.0,406597.5,24700.5,...,0,0,0,0,0.0,0.0,0.0,0.0,0.0,1.0
1,100003,0,Cash loans,F,N,N,0,270000.0,1293502.5,35698.5,...,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0
2,100004,0,Revolving loans,M,Y,Y,0,67500.0,135000.0,6750.0,...,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0
3,100006,0,Cash loans,F,N,Y,0,135000.0,312682.5,29686.5,...,0,0,0,0,0.006402,0.007,0.034362,0.267395,0.265474,1.899974
4,100007,0,Cash loans,M,N,Y,0,121500.0,513000.0,21865.5,...,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0
5,100008,0,Cash loans,M,N,Y,0,99000.0,490495.5,27517.5,...,0,0,0,0,0.0,0.0,0.0,0.0,1.0,1.0
6,100009,0,Cash loans,F,Y,Y,1,171000.0,1560726.0,41301.0,...,0,0,0,0,0.0,0.0,0.0,1.0,1.0,2.0
7,100010,0,Cash loans,M,Y,Y,0,360000.0,1530000.0,42075.0,...,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0
8,100011,0,Cash loans,F,N,Y,0,112500.0,1019610.0,33826.5,...,0,0,0,0,0.0,0.0,0.0,0.0,0.0,1.0
9,100012,0,Revolving loans,M,N,Y,0,135000.0,405000.0,20250.0,...,0,0,0,0,0.006402,0.007,0.034362,0.267395,0.265474,1.899974


In [43]:
len(train_df2.columns)

74

# Distinct categories in categoric_cols

In [73]:
# We don't want too much categories if a column has more categories than max_distinct_cat
# we want to drop that column

max_distinct_cat=20
cat_cols_and_distinc_cls_dict = {}
drop_cols_too_many_clss = []
for col in categoric_cols:
    dist_count = train_df2.select(F.countDistinct(col).alias("distinct")).head(1)[0].asDict()["distinct"]
    if dist_count < max_distinct_cat:
        cat_cols_and_distinc_cls_dict[col] = dist_count
        print(col,dist_count)
    else:
        print("{} has dropped because it has too many ({}) caregories ".format(col, dist_count))
        drop_cols_too_many_clss.append(col)

name_contract_type 2
code_gender 3
flag_own_car 2
flag_own_realty 2
name_type_suite 8
name_income_type 8
name_education_type 5
name_family_status 6
name_housing_type 6
occupation_type 19
weekday_appr_process_start 7
organization_type has dropped because it has too many caregories 58
emergencystate_mode 3


In [76]:
print(cat_cols_and_distinc_cls_dict)

{'name_contract_type': 2, 'code_gender': 3, 'flag_own_car': 2, 'flag_own_realty': 2, 'name_type_suite': 8, 'name_income_type': 8, 'name_education_type': 5, 'name_family_status': 6, 'name_housing_type': 6, 'occupation_type': 19, 'weekday_appr_process_start': 7, 'emergencystate_mode': 3}


# Weak classes in categoric cols

In [45]:
# We put Unknown_. If you see Unknown it is already in original data source.

In [78]:
train_df2_count = train_df2.count()
for col in cat_cols_and_distinc_cls_dict.keys():
    train_df2.groupBy(col) \
    .agg(F.count("*").alias("total_count")) \
    .orderBy(F.desc("total_count")) \
    .withColumn("class_ratio", F.round( F.col("total_count") / F.lit(train_df2_count),4 )) \
    .show()

+------------------+-----------+-----------+
|name_contract_type|total_count|class_ratio|
+------------------+-----------+-----------+
|        Cash loans|     278232|     0.9048|
|   Revolving loans|      29279|     0.0952|
+------------------+-----------+-----------+

+-----------+-----------+-----------+
|code_gender|total_count|class_ratio|
+-----------+-----------+-----------+
|          F|     202448|     0.6583|
|          M|     105059|     0.3416|
|        XNA|          4|        0.0|
+-----------+-----------+-----------+

+------------+-----------+-----------+
|flag_own_car|total_count|class_ratio|
+------------+-----------+-----------+
|           N|     202924|     0.6599|
|           Y|     104587|     0.3401|
+------------+-----------+-----------+

+---------------+-----------+-----------+
|flag_own_realty|total_count|class_ratio|
+---------------+-----------+-----------+
|              Y|     213312|     0.6937|
|              N|      94199|     0.3063|
+---------------+

In [108]:
weak_class_ratio=0.0005
weak_classes_dict={}
for col in cat_cols_and_distinc_cls_dict.keys():
    # calculate weak class total count and class ratio than filter if class ration less than user specfied value
    dummy_df = train_df2.groupBy(col) \
    .agg(F.count("*").alias("total_count")) \
    .orderBy(F.desc("total_count")) \
    .withColumn("class_ratio", F.round( F.col("total_count") / F.lit(train_df2_count),4 )) \
    .filter(F.col("class_ratio") < weak_class_ratio )
    dummy_df.show()
    
    # Make a dictionary col name is key and weak classes value as another dictionary
    my_dict = dummy_df.select(dummy_df.columns[0]).toPandas().to_dict()
    print(my_dict)
    # Get weak classes as list
    weak_calss_list = list(my_dict[col].values())
    print(weak_calss_list)
    
    # filter out rows that contain weak clases
    train_df2 = train_df2.filter( ~ (F.col(col).isin(weak_calss_list))  )
    print(train_df2.count())
    
    # Collect deleted weak classes in a dictionary
    if weak_calss_list:
        weak_classes_dict[col] = weak_calss_list
    
    

{'name_contract_type': {}}
[]
307511
+------------------+-----------+-----------+
|name_contract_type|total_count|class_ratio|
+------------------+-----------+-----------+
+------------------+-----------+-----------+

{'code_gender': {0: 'XNA'}}
['XNA']
307507
+-----------+-----------+-----------+
|code_gender|total_count|class_ratio|
+-----------+-----------+-----------+
|        XNA|          4|        0.0|
+-----------+-----------+-----------+

{'flag_own_car': {}}
[]
307507
+------------+-----------+-----------+
|flag_own_car|total_count|class_ratio|
+------------+-----------+-----------+
+------------+-----------+-----------+

{'flag_own_realty': {}}
[]
307507
+---------------+-----------+-----------+
|flag_own_realty|total_count|class_ratio|
+---------------+-----------+-----------+
+---------------+-----------+-----------+

{'name_type_suite': {}}
[]
307507
+---------------+-----------+-----------+
|name_type_suite|total_count|class_ratio|
+---------------+-----------+----------

In [109]:
print(weak_classes_dict)

{'code_gender': ['XNA'], 'name_income_type': ['Unemployed', 'Student', 'Businessman', 'Maternity leave'], 'name_family_status': ['Unknown']}


In [93]:
my_dict = {'name_income_type': {0: 'Unemployed', 1: 'Student', 2: 'Businessman', 3: 'Maternity leave'}}

In [94]:
my_dict['name_income_type'].values()

dict_values(['Unemployed', 'Student', 'Businessman', 'Maternity leave'])

In [51]:
# Delete rows where;
# 1. code_gender == XNA  4
# 2. name_income_type == Maternity leave  5
# 3. name_family_status == Unknown 2

In [None]:
for col

In [52]:
train_df2.filter(~
                 ((F.col("code_gender") == 'XNA') | 
                 (F.col("name_income_type") == 'Maternity leave') | 
                 (F.col("name_family_status") == 'Unknown'))
                ).count()

307500

In [53]:
train_df2.count()

307511

In [54]:
train_df3 = train_df2.filter(~
                 ((F.col("code_gender") == 'XNA') | 
                 (F.col("name_income_type") == 'Maternity leave') | 
                 (F.col("name_family_status") == 'Unknown'))
                )

In [55]:
train_df3.count()

307500

In [56]:
test_df3 = test_df2.filter(~
                 ((F.col("code_gender") == 'XNA') | 
                 (F.col("name_income_type") == 'Maternity leave') | 
                 (F.col("name_family_status") == 'Unknown'))
                )

In [57]:
print(test_df2.count())
print(test_df3.count())

48744
48744


# ML Pipelines

# StringIndexer For Categorical

In [58]:
from pyspark.ml.feature import StringIndexer
string_indexer_dict = {
    "column_names": [],
    "onehot_column_names": []
}

for col in categoric_cols:
    string_indexer_dict["column_names"].append(col+"_str_idx")
    string_indexer_dict["onehot_column_names"].append(col+"_onehot")
    

In [59]:
print(string_indexer_dict["column_names"])

['name_contract_type_str_idx', 'code_gender_str_idx', 'flag_own_car_str_idx', 'flag_own_realty_str_idx', 'name_type_suite_str_idx', 'name_income_type_str_idx', 'name_education_type_str_idx', 'name_family_status_str_idx', 'name_housing_type_str_idx', 'occupation_type_str_idx', 'weekday_appr_process_start_str_idx', 'organization_type_str_idx', 'emergencystate_mode_str_idx']


In [60]:
print(string_indexer_dict["onehot_column_names"])

['name_contract_type_onehot', 'code_gender_onehot', 'flag_own_car_onehot', 'flag_own_realty_onehot', 'name_type_suite_onehot', 'name_income_type_onehot', 'name_education_type_onehot', 'name_family_status_onehot', 'name_housing_type_onehot', 'occupation_type_onehot', 'weekday_appr_process_start_onehot', 'organization_type_onehot', 'emergencystate_mode_onehot']


In [61]:
indexer = StringIndexer() \
.setInputCols(categoric_cols) \
.setOutputCols(string_indexer_dict["column_names"])

# OneHotEncoder

In [62]:
from pyspark.ml.feature import OneHotEncoder

In [63]:
encoder = OneHotEncoder() \
.setInputCols(string_indexer_dict["column_names"]) \
.setOutputCols(string_indexer_dict["onehot_column_names"])

# VectorAssembler

In [64]:
from pyspark.ml.feature import VectorAssembler

In [65]:
assembler = VectorAssembler() \
.setInputCols(numeric_cols+string_indexer_dict["onehot_column_names"]) \
.setOutputCol("non_scaled_features")

# Scaler

In [66]:
from pyspark.ml.feature import MinMaxScaler

In [67]:
scaler = MinMaxScaler() \
.setInputCol("non_scaled_features") \
.setOutputCol("features")

# Estimator

In [68]:
from pyspark.ml.classification import RandomForestClassifier

In [69]:
estimator = RandomForestClassifier() \
.setFeaturesCol("features") \
.setLabelCol("target")

# Pipeline

In [None]:
from pyspark.ml import Pipeline

In [None]:
pipeline_obj = Pipeline() \
.setStages([indexer, encoder, assembler, scaler, estimator])

# Train Model

In [None]:
pipeline_model = pipeline_obj.fit(train_df3)

In [None]:
pipeline_model.write().overwrite().save("/saved_models/homecredit-randomforest-pipeline-model")

# Test

In [None]:
transformed_test_df = pipeline_model.transform(test_df3)

In [None]:
transformed_test_df.limit(5).toPandas()

In [None]:
transformed_train_df = pipeline_model.transform(train_df3)

# Evaluation with train set

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
evaluator = BinaryClassificationEvaluator() \
.setLabelCol("target") \
.setMetricName("areaUnderROC")

In [None]:
evaluator.evaluate(transformed_train_df)