In [1]:
# spark.stop()

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName('myApp') \
.setMaster('local')
sc = SparkContext(conf=conf)
spark = SparkSession(sc) 

24/06/15 07:45:14 WARN Utils: Your hostname, Christophers-Laptop.local resolves to a loopback address: 127.0.0.1; using 172.21.31.41 instead (on interface en0)
24/06/15 07:45:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/15 07:45:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql.functions import *
import pandas as pd
import math

In [4]:
df_application = spark.read.format('csv').options(header=True,inferSchema=True).load('application_record.csv')
df_application.show(30, truncate=50)
df_application.printSchema()

                                                                                

+-------+-----------+------------+---------------+------------+----------------+--------------------+-----------------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE|          NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|
+-------+-----------+------------+---------------+------------+----------------+--------------------+-----------------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|5008804|          M|           Y|              Y|           0|        427500.0|             Working|             Higher education|      Civil marriage| Rented apartment

In [5]:
# Check for null etc.
df_application.select([count((when(isnan(c) | isnull(c), 1))).alias(c) for c in df_application.columns]).show()
# No null in data so all data is valid

[Stage 3:>                                                          (0 + 1) / 1]

+---+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
| ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE|NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|
+---+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|  0|          0|           0|              0|           0|               0|               0|                  0|                 0|                0|         0|            0|         0|              0|         0|         0|     

                                                                                

In [6]:
# First Check the data in code_gender to check if there is any misdata
# Select and show distinct values for each column separately
# List of columns to check for distinct values
columns = [
    'code_gender',
    'flag_own_car',
    'flag_own_realty',
    'flag_mobil',
    'flag_work_phone',
    'flag_email',
    'flag_phone'
]

# Loop through each column and display distinct values
for column in columns:
    df_application.select(column).distinct().show()


+-----------+
|code_gender|
+-----------+
|          F|
|          M|
+-----------+

+------------+
|flag_own_car|
+------------+
|           Y|
|           N|
+------------+

+---------------+
|flag_own_realty|
+---------------+
|              Y|
|              N|
+---------------+

+----------+
|flag_mobil|
+----------+
|         1|
+----------+

+---------------+
|flag_work_phone|
+---------------+
|              1|
|              0|
+---------------+

+----------+
|flag_email|
+----------+
|         1|
|         0|
+----------+

+----------+
|flag_phone|
+----------+
|         1|
|         0|
+----------+



In [7]:
# After checking the flag_mobil is not important because the data is only 1 and no other variation. so in the cleaned data not getting the flag mobil
df_app_cleaned = df_application.select(['id','code_gender','flag_own_car','flag_own_realty','cnt_children','amt_income_total',
                                        'name_income_type','name_education_type','name_family_status','name_housing_type','days_birth',
                                        'days_employed','flag_work_phone','flag_phone','flag_email','occupation_type','cnt_fam_members'])

# AND the value is always 0 an 1 as the boolean so we want to make our data uniform so changing the Y as 1 and N as 0, F as 1 and M as 0
df_app_cleaned = df_app_cleaned.withColumn('code_gender', when(df_app_cleaned.code_gender == 'F',1).otherwise(0))
df_app_cleaned = df_app_cleaned.withColumn('flag_own_car', when(df_app_cleaned.flag_own_car == 'Y',1).otherwise(0))
df_app_cleaned = df_app_cleaned.withColumn('flag_own_realty', when(df_app_cleaned.flag_own_car == 'Y',1).otherwise(0))

# More data cleaning cnt_fam_members should be integer because there shouldn't be fam members that is half or less than 1
df_app_cleaned = df_app_cleaned.withColumn('cnt_fam_members', col('cnt_fam_members').cast('Integer'))

df_app_cleaned.show()
df_app_cleaned.select('id').distinct().count()

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+
|     id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|    name_income_type| name_education_type|  name_family_status|name_housing_type|days_birth|days_employed|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+
|5008804|          0|           1|              0|           0|        427500.0|             Working|    Higher education|      Civil marriage| Rented apartment|    -12005|        -4542|              1|         0|         0|     

438510

In [8]:
# Changing the days birth into age Integer so that it will be classification into its correspondence age
df_app_cleaned = df_app_cleaned.withColumn('age', floor(col('days_birth')/365.25) * -1)

# Check value because age should not be negative or even > 100
df_app_cleaned.select('age').where('age < 0 or age > 100').show()

# Changing the days birth into age Integer so that it will be classification into its correspondence age
df_app_cleaned = df_app_cleaned.withColumn('employed', floor(col('days_employed')/365.25) * -1)

# Check value because age should not be negative or even > 100 as well as the occupation
df_app_cleaned.select(['name_income_type','occupation_type','employed']).where('employed < 0 or employed > 100').distinct().show()

+---+
|age|
+---+
+---+

+----------------+---------------+--------+
|name_income_type|occupation_type|employed|
+----------------+---------------+--------+
|       Pensioner|           NULL|    -999|
+----------------+---------------+--------+



In [9]:
# because the data is said pensioner than it is possible that the employed is negative but for calculation sake it is better to make it into 0
df_app_cleaned = df_app_cleaned.withColumn('employed', when(df_app_cleaned.employed < 0,0).otherwise(df_app_cleaned.employed))

df_app_cleaned.show()

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+---+--------+
|     id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|    name_income_type| name_education_type|  name_family_status|name_housing_type|days_birth|days_employed|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|age|employed|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+---+--------+
|5008804|          0|           1|              0|           0|        427500.0|             Working|    Higher education|      Civil marriage| Rented apartment|    -12005|        -4542|    

In [10]:
# there is occupation type that is null so we need to fill it up
df_app_cleaned.select('occupation_type').where(isnull(col('occupation_type'))).count() 
# There are no possibility to

134203

In [11]:
# check for occupation type that is null
df_app_cleaned.select(['amt_income_total','name_income_type','occupation_type','employed']).where(isnull(df_app_cleaned.occupation_type)).show()
# there is possibility to use the amt_income_total & name_income_type to classified the occupation_type typically will have the same job
# so need to find the exact number to it

+----------------+----------------+---------------+--------+
|amt_income_total|name_income_type|occupation_type|employed|
+----------------+----------------+---------------+--------+
|        427500.0|         Working|           NULL|      13|
|        427500.0|         Working|           NULL|      13|
|        283500.0|       Pensioner|           NULL|       0|
|        283500.0|       Pensioner|           NULL|       0|
|        283500.0|       Pensioner|           NULL|       0|
|        112500.0|         Working|           NULL|       5|
|        112500.0|         Working|           NULL|       5|
|        112500.0|         Working|           NULL|       5|
|        315000.0|       Pensioner|           NULL|       0|
|        225000.0|         Working|           NULL|       8|
|        225000.0|         Working|           NULL|       8|
|        225000.0|         Working|           NULL|       8|
|        225000.0|         Working|           NULL|       8|
|        157500.0|      

In [12]:
# use amt_income total to get the kind of occupation type check first
# the possibility is that the amt_income_total,name_income_type, and employed can fill the occupation_type that is null
df_app_cleaned.select(['amt_income_total','name_income_type','occupation_type','employed']).where((df_app_cleaned.amt_income_total == 225000.0) & (df_app_cleaned.name_income_type == 'Working')).show()
# This data further proven the point that it will be able to be filled up

+----------------+----------------+---------------+--------+
|amt_income_total|name_income_type|occupation_type|employed|
+----------------+----------------+---------------+--------+
|        225000.0|         Working|           NULL|       8|
|        225000.0|         Working|           NULL|       8|
|        225000.0|         Working|           NULL|       8|
|        225000.0|         Working|           NULL|       8|
|        225000.0|         Working|       Managers|       3|
|        225000.0|         Working|       Managers|       3|
|        225000.0|         Working|       Managers|       3|
|        225000.0|         Working|       Managers|       3|
|        225000.0|         Working|       Managers|       3|
|        225000.0|         Working|       Managers|       3|
|        225000.0|         Working|       Managers|       3|
|        225000.0|         Working|       Managers|       3|
|        225000.0|         Working|       Managers|       3|
|        225000.0|      

In [13]:
# so use the possibility of it and fill the occupation_type that got from the amt_income_total and name_income_type
from pyspark.sql import Window, functions as F

# Define a window specification
window_spec = Window.partitionBy('amt_income_total', 'name_income_type')

# Fill missing values of occupation_type with the most frequent value within each group
filled_occupation_type = F.first('occupation_type', ignorenulls=True).over(window_spec)

# Fill missing values using coalesce
df_app_cleaned = df_app_cleaned.withColumn('filled_occupation_type', filled_occupation_type)
df_app_cleaned = df_app_cleaned.withColumn('occupation_type', F.coalesce('occupation_type', 'filled_occupation_type')).drop('filled_occupation_type')

df_app_cleaned.show()

[Stage 44:>                                                         (0 + 1) / 1]

+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+--------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+---+--------+
|     id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|name_income_type| name_education_type|  name_family_status|name_housing_type|days_birth|days_employed|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|age|employed|
+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+--------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+---+--------+
|6093713|          1|           0|              0|           0|         26100.0|       Pensioner|Secondary / secon...|             Married|House / apartment|    -21003|       365243|              0|    

                                                                                

In [14]:
# check for occupation type that is null  and that the amt_income_total and name_income_type same, but still null
df_app_cleaned.select(['amt_income_total','name_income_type','occupation_type','employed']).where(isnull(df_app_cleaned.occupation_type) & (df_app_cleaned.employed != 0)).show()
# There are no exact data to fill the null in that missing value, so the possibility is the same name_income_type with the nearest amt_income_total
# Create a window specification to partition by 'name_income_type' and order by the absolute difference in 'amt_income_total'
window_spec = Window.partitionBy('name_income_type').orderBy(F.abs(F.col('amt_income_total') - F.col('amt_income_total_with_null')))

# Create a temporary column to store the amt_income_total of rows with null occupation_type
df_temp = df_app_cleaned.withColumn('amt_income_total_with_null', F.when(F.col('occupation_type').isNull(), F.col('amt_income_total')))

# Fill missing occupation_type using the nearest amt_income_total
filled_occupation_type = F.first('occupation_type', ignorenulls=True).over(window_spec)
df_temp = df_temp.withColumn('filled_occupation_type', filled_occupation_type)

# Use coalesce to fill the null values
df_app_cleaned = df_temp.withColumn('occupation_type', F.coalesce(F.col('occupation_type'), F.col('filled_occupation_type'))).drop('filled_occupation_type', 'amt_income_total_with_null')

df_app_cleaned.show()

+----------------+--------------------+---------------+--------+
|amt_income_total|    name_income_type|occupation_type|employed|
+----------------+--------------------+---------------+--------+
|         32400.0|             Working|           NULL|       5|
|         32400.0|             Working|           NULL|       5|
|         32400.0|             Working|           NULL|       5|
|         38250.0|       State servant|           NULL|       3|
|         38250.0|       State servant|           NULL|       3|
|         38250.0|       State servant|           NULL|       3|
|         38250.0|       State servant|           NULL|       3|
|         38250.0|       State servant|           NULL|       3|
|         38250.0|       State servant|           NULL|       3|
|         38250.0|       State servant|           NULL|       3|
|         38700.0|             Working|           NULL|      26|
|         41211.0|Commercial associate|           NULL|       6|
|         41211.0|Commerc

[Stage 50:>                                                         (0 + 1) / 1]

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+---+--------+
|     id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|    name_income_type| name_education_type|name_family_status|name_housing_type|days_birth|days_employed|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|age|employed|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+---+--------+
|6014233|          1|           0|              0|           1|         28723.5|Commercial associate|Secondary / secon...|           Married|House / apartment|    -14361|         -138|            

                                                                                

In [15]:
# check for occupation type that is null
df_app_cleaned.select(['amt_income_total','name_income_type','occupation_type','employed']).where(isnull(df_app_cleaned.occupation_type)).show()
# Data already cleaned!!

+----------------+----------------+---------------+--------+
|amt_income_total|name_income_type|occupation_type|employed|
+----------------+----------------+---------------+--------+
+----------------+----------------+---------------+--------+



In [16]:
df_credit = spark.read.format('csv').options(header=True,inferSchema=True).load('credit_record.csv')
df_credit.show(30, truncate=50)
df_credit.printSchema()
print('Status \n\
      0 => 1-29 days past due \n\
      1 => 30-59 days past due \n\
      2 => 60-89 days past due \n\
      3 => 90-119 days past due \n\
      4 => 120-149 days past due \n\
      5 => overdue/ bad debts > 150 days \n\
      C => paid successfully \n\
      X => No loan \n\
')
# The schema is already correct with id and months_balance as Integer.

+-------+--------------+------+
|     ID|MONTHS_BALANCE|STATUS|
+-------+--------------+------+
|5001711|             0|     X|
|5001711|            -1|     0|
|5001711|            -2|     0|
|5001711|            -3|     0|
|5001712|             0|     C|
|5001712|            -1|     C|
|5001712|            -2|     C|
|5001712|            -3|     C|
|5001712|            -4|     C|
|5001712|            -5|     C|
|5001712|            -6|     C|
|5001712|            -7|     C|
|5001712|            -8|     C|
|5001712|            -9|     0|
|5001712|           -10|     0|
|5001712|           -11|     0|
|5001712|           -12|     0|
|5001712|           -13|     0|
|5001712|           -14|     0|
|5001712|           -15|     0|
|5001712|           -16|     0|
|5001712|           -17|     0|
|5001712|           -18|     0|
|5001713|             0|     X|
|5001713|            -1|     X|
|5001713|            -2|     X|
|5001713|            -3|     X|
|5001713|            -4|     X|
|5001713

                                                                                

In [17]:
# Check for null etc.
# Display the count of null values for each column
df_credit.select([count(when(isnan(c) | isnull(c), 1)).alias(c) for c in df_credit.columns]).show()

# List of status values to count
status_values = [0, 1, 2, 3, 4, 5, 'C', 'X']

# Loop through each status value and print the count
for status in status_values:
    count_value = df_credit.select('status').where(col('status') == status).count()
    print(f"{status}: {count_value}")

                                                                                

+---+--------------+------+
| ID|MONTHS_BALANCE|STATUS|
+---+--------------+------+
|  0|             0|     0|
+---+--------------+------+

0: 383120
1: 11090
2: 868
3: 320
4: 223
5: 1693
C: 442031
X: 209230


In [18]:
# convert each status to points
# X is 0 because there is no loan 
# 0 - 5 is negative value [-5,-10,-20,-50,-100]
# C is positive value of + 20
# Define the mapping from status to points
status_to_points = {
    'X': 0,
    0: -5,
    1: -10,
    2: -20,
    3: -50,
    4: -100,
    5: -1000,
    'C': 50
}

# Convert the mapping dictionary to a column expression
points_expr = when(col('status') == 'X', status_to_points['X']) \
    .when(col('status') == 'C', status_to_points['C'])

for status in range(6):
    points_expr = points_expr.when(col('status') == status, status_to_points[status])

# Add the new column with points to the DataFrame
df_credit_processed = df_credit.withColumn('points', points_expr)

df_credit_processed.show()

df_credit_processed.select('id').distinct().count()

+-------+--------------+------+------+
|     ID|MONTHS_BALANCE|STATUS|points|
+-------+--------------+------+------+
|5001711|             0|     X|     0|
|5001711|            -1|     0|    -5|
|5001711|            -2|     0|    -5|
|5001711|            -3|     0|    -5|
|5001712|             0|     C|    50|
|5001712|            -1|     C|    50|
|5001712|            -2|     C|    50|
|5001712|            -3|     C|    50|
|5001712|            -4|     C|    50|
|5001712|            -5|     C|    50|
|5001712|            -6|     C|    50|
|5001712|            -7|     C|    50|
|5001712|            -8|     C|    50|
|5001712|            -9|     0|    -5|
|5001712|           -10|     0|    -5|
|5001712|           -11|     0|    -5|
|5001712|           -12|     0|    -5|
|5001712|           -13|     0|    -5|
|5001712|           -14|     0|    -5|
|5001712|           -15|     0|    -5|
+-------+--------------+------+------+
only showing top 20 rows



45985

In [19]:
# then do aggregate data on the same id customer
df_credit_processed = df_credit_processed.drop('status')
df_credit_processed = df_credit_processed.groupby('id')\
                        .agg(
                            count('months_balance').alias('month_count'),
                            sum('points').alias('sum_points')
                        )
df_credit_processed = df_credit_processed.withColumn('avg_points', col('sum_points') / col('month_count'))
df_credit_processed = df_credit_processed.drop('month_count','sum_points')
df_credit_processed.show()

+-------+-------------------+
|     id|         avg_points|
+-------+-------------------+
|5001812|               -5.0|
|5001849|  7.222222222222222|
|5001921|               18.0|
|5003338|               -5.0|
|5003386|               -5.0|
|5003485| -4.705882352941177|
|5003623| 43.888888888888886|
|5004426|  42.72727272727273|
|5004485|  2.142857142857143|
|5004511|               -5.4|
|5004620| -4.705882352941177|
|5004650| 23.026315789473685|
|5004774| 25.344827586206897|
|5005000|-0.7142857142857143|
|5005607|  39.44444444444444|
|5005681| 18.870967741935484|
|5009033|-1.1764705882352942|
|5009304|  7.857142857142857|
|5009355|-4.8076923076923075|
|5009429|               -5.0|
+-------+-------------------+
only showing top 20 rows



                                                                                

In [20]:
# Count how many row in df_credit_processed
df_credit_processed.summary().show()

+-------+-----------------+------------------+
|summary|               id|        avg_points|
+-------+-----------------+------------------+
|  count|            45985|             45985|
|   mean|5070163.913058606|11.919731024252135|
| stddev|45433.63587058626|30.541103517001016|
|    min|          5001711|-947.6315789473684|
|    25%|          5026144|-4.090909090909091|
|    50%|          5065731|               0.0|
|    75%|          5114021| 32.32142857142857|
|    max|          5150487|              50.0|
+-------+-----------------+------------------+



In [21]:
# # count the positive and negative sum_points, 
# # the point distributed where good is called when point is > 10 why is that it is because below 10 you still commited blunder when paying so there are consequences.
# print(f"good : {df_credit_processed.select('avg_points').where(df_credit_processed.avg_points > 0 ).count()}")
# # print(f"normal : {df_credit_processed.select('avg_points').where((df_credit_processed.avg_points < 10) & (df_credit_processed.avg_points > -3)).count()}")
# print(f"bad : {df_credit_processed.select('avg_points').where(df_credit_processed.avg_points <= 0).count()}")

# # labeling the data
# df_credit_processed = df_credit_processed.withColumn('result',when(col('avg_points') > 0,'good').otherwise('bad'))
# df_credit_processed.show()

In [22]:
# join the data of application to credit data
df_joined_processed = df_app_cleaned.join(df_credit_processed,on='id',how='inner')

# df_joined_processed.show()
df_joined_processed.select([count((when(isnan(c) | isnull(c), 1))).alias(c) for c in df_joined_processed.columns]).show()
df_joined_processed.select([count(c).alias(c) for c in df_joined_processed.columns]).show()
# no null data

                                                                                

+---+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+---+--------+----------+
| id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|name_income_type|name_education_type|name_family_status|name_housing_type|days_birth|days_employed|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|age|employed|avg_points|
+---+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+---+--------+----------+
|  0|          0|           0|              0|           0|               0|               0|                  0|                 0|                0|         0|            0|              0

                                                                                

+-----+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+-----+--------+----------+
|   id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|name_income_type|name_education_type|name_family_status|name_housing_type|days_birth|days_employed|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|  age|employed|avg_points|
+-----+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+---------------+----------+----------+---------------+---------------+-----+--------+----------+
|36457|      36457|       36457|          36457|       36457|           36457|           36457|              36457|             36457|            36457|     36457|        36457| 

******Starting EDA Process over the Result******

In [None]:
# find the correlation matrix


**Extract Feature**
**--**
**Processing with Model**

In [23]:
# total_count = count_df.count()
# status_class_proportion = status_class_counts.withColumn('proportion', col('count') / lit(total_count))
# status_class_proportion.show()

In [24]:
# columns = data_join.columns
# # Membuat DataFrame dari daftar kolom
# information = spark.createDataFrame([(col,) for col in columns], ["variable"])

# # Menambahkan kolom 'IV' dengan nilai None
# information = information.withColumn('IV', lit(None).cast(StringType()))

# # Daftar kolom yang akan dihapus
# namelist = ['FLAG_MOBIL', 'begin_month', 'dep_value', 'target', 'ID']

# # Menghapus kolom-kolom yang ada dalam namelist dari information
# for name in namelist:
#     information = information.filter(information['variable'] != name)

# # Tampilkan hasil
# information.show()

In [25]:
# def calc_information_value(df: DataFrame, feature: str, target: str, pr: bool = False):
#     # Mengisi nilai NULL
#     df = df.withColumn(feature, when(col(feature).isNull(), "NULL").otherwise(col(feature)))
    
#     # Menghitung jumlah total, good, dan bad untuk setiap nilai feature
#     data = df.groupBy(feature).agg(
#         count("*").alias("All"),
#         count(when(col(target) == 0, 1)).alias("Good"),
#         count(when(col(target) == 1, 1)).alias("Bad")
#     )
    
#     # Menghitung Share, Bad Rate, Distribution Good, dan Distribution Bad
#     total_count = df.count()
#     total_bad = df.filter(col(target) == 1).count()
#     total_good = total_count - total_bad
    
#     data = data.withColumn("Share", col("All") / total_count)
#     data = data.withColumn("Bad Rate", col("Bad") / col("All"))
#     data = data.withColumn("Distribution Good", (col("All") - col("Bad")) / total_good)
#     data = data.withColumn("Distribution Bad", col("Bad") / total_bad)
    
#     # Menghitung WoE
#     data = data.withColumn("WoE", log(col("Distribution Good") / col("Distribution Bad")))
    
#     # Mengganti nilai inf dan -inf dengan 0
#     data = data.withColumn("WoE", when(col("WoE") == float("inf"), 0).otherwise(col("WoE")))
#     data = data.withColumn("WoE", when(col("WoE") == float("-inf"), 0).otherwise(col("WoE")))
    
#     # Menghitung information_value
#     data = data.withColumn("information_value", col("WoE") * (col("Distribution Good") - col("Distribution Bad")))
    
#     # Mengurutkan data berdasarkan feature dan value
#     data = data.orderBy(feature)
    
#     # Menghitung total information_value
#     information_value = data.agg(sum("information_value")).collect()[0][0]
    
#     if pr:
#         data.show()
#         print('information_value = ', information_value)
    
#     print(f"This variable's information_value is: {information_value}")
#     data.groupBy(feature).count().show()
    
#     return information_value, data

In [26]:
# def convert_dummy(df: DataFrame, feature: str, rank: int = 0) -> DataFrame:
#     # Menghitung jumlah kemunculan setiap nilai pada feature
#     mode = df.groupBy(feature).count().orderBy(col('count').desc()).collect()[rank][0]
    
#     # Membuat dummy variables menggunakan pivot
#     dummies = df.groupBy("ID").pivot(feature).agg(lit(1)).na.fill(0)
    
#     # Mengganti nama kolom dummy
#     for col_name in dummies.columns:
#         if col_name != "ID":
#             new_name = f"{feature}_{col_name}"
#             dummies = dummies.withColumnRenamed(col_name, new_name)
    
#     # Menghapus kolom yang paling sering muncul
#     dummies = dummies.drop(f"{feature}_{mode}")
    
#     # Menghapus kolom asli dari DataFrame
#     df = df.drop(feature)
    
#     # Menggabungkan kembali dummies dengan DataFrame asli
#     df = df.join(dummies, on="ID", how="left")
    
#     return df

In [27]:
# def get_category(df, col, binsnum, labels, qcut=False):
#     # Menyalin DataFrame asli
#     localdf = df.select(col).toPandas()

#     if qcut:
#         # Quantile cut
#         localdf['bins'] = pd.qcut(localdf[col], q=binsnum, labels=labels)
#     else:
#         # Equal-length cut
#         localdf['bins'] = pd.cut(localdf[col], bins=binsnum, labels=labels)
    
#     # Membuat DataFrame dari hasil binning
#     bins_df = spark.createDataFrame(localdf[['bins']])
    
#     # Menggabungkan hasil binning ke DataFrame asli
#     name = f'gp_{col}'
#     bins_df = bins_df.withColumnRenamed('bins', name)
#     df = df.withColumn(name, bins_df[name].cast('string'))
    
#     return df


In [28]:
# # Mengganti nilai kolom CODE_GENDER
# data_join = data_join.withColumn('CODE_GENDER', when(col('CODE_GENDER') == 'F', 0).when(col('CODE_GENDER') == 'M', 1))
# # print(data_join['CODE_GENDER'].value_counts())
# # Menghitung value_counts
# gender_counts = data_join.groupBy('CODE_GENDER').count()
# gender_counts.show()
# # Menghitung information value
# iv, data = calc_information_value(data_join, 'CODE_GENDER', 'target')
# information = information.withColumn('IV', when(col('variable') == 'CODE_GENDER', lit(iv).cast(StringType())).otherwise(col('IV')))
# data.show()



**PREPROCESSING**

In [29]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml import Pipeline

In [30]:
# indexers = [StringIndexer(inputCol=col,outputCol=col+'_idx').fit(df_joined_processed) for col in ['name_income_type','name_family_status','name_housing_type']]
# pipeline = Pipeline(stages=indexers)
# final_df = pipeline.fit(df_joined_processed).transform(df_joined_processed)
# final_df.show()

In [31]:
# col = ['name_income_type_idx','amt_income_total','name_housing_type_idx']
# assembler = VectorAssembler(inputCols=col, outputCol= 'features')
# final_df = assembler.transform(final_df)
# final_df = final_df.withColumn('result',when(final_df.result == 'good',1).otherwise(0))
# final_df.show()

In [32]:
# (train,test) = final_df.randomSplit([0.8,0.2],seed=11)

In [33]:
# from pyspark.ml.classification import GBTClassifier

In [34]:
# model = GBTClassifier(featuresCol='features', labelCol='result').fit(train)

In [35]:
# data = model.transform(test)
# total_count = data.count()

# # Filter rows where 'prediction' matches 'Survived' and count them
# matching_count = data.filter(F.col('prediction') == F.col('result')).count()

# # Calculate the percentage
# percentage = (matching_count / total_count) * 100

# print("Percentage of similar predictions vs. 'Result' GBT: {:.2f}%".format(percentage))