#### In this exercise we will cleanse and transform data using the credit card approval dataset.
#### The data comes from: `https://www.kaggle.com/datasets/rikdifos/credit-card-approval-prediction/data`

#### The steps are as follows:

1. *Load the data*
2. *Explore the data*
3. *Handle empty/NULL columns by filling in a default value*
4. *Aggregate the credit record data and join the result with the credit application data*
5. *Transform dates*

#### This is of course not an ideal way to compute credit risk; it only demonstrates some operations commonly used in data preprocessing, carried out with Spark and Hive.

In [1]:
%spark.pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
spark = SparkSession.builder.enableHiveSupport().getOrCreate()


### Prepare Data

#### Download Dataset


In [3]:
%sh
wget  -P data https://github.com/urfie/SparkSQL-dengan-Hive/raw/main/datasets/application_record.csv.gz

In [4]:
%sh
wget -P data https://github.com/urfie/SparkSQL-dengan-Hive/raw/main/datasets/credit_record.csv.gz

#### Upload Data to HDFS


In [6]:
%sh
hdfs dfs -mkdir /user/userdev/cc_data

In [7]:
%sh
hdfs dfs -mkdir /user/userdev/cc_data/credit

In [8]:
%sh
hdfs dfs -put data/credit_record.csv.gz /user/userdev/cc_data/credit

In [9]:
%sh
hdfs dfs -ls /user/userdev/cc_data/credit

In [10]:
%sh
hdfs dfs -mkdir /user/userdev/cc_data/app

In [11]:
%sh
hdfs dfs -put data/application_record.csv.gz /user/userdev/cc_data/app

In [12]:
%sh
hdfs dfs -ls /user/userdev/cc_data/app

In [13]:
%spark.pyspark
spark.sql("CREATE DATABASE user0;")


#### Create External table


In [15]:
%spark.pyspark
spark.sql("""CREATE EXTERNAL TABLE user0.credit_ext(
id STRING,
months_balance BIGINT,
status STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://myzoo/user/userdev/cc_data/credit'
""")

In [16]:
%spark.pyspark
spark.sql("select * from user0.credit_ext limit 10").show(truncate=False)

In [17]:
%spark.pyspark
spark.sql("""
CREATE EXTERNAL TABLE user0.app_ext(
id	STRING,
code_gender	STRING,
flag_own_car	STRING,
flag_own_realty	STRING,
cnt_children	STRING,
amt_income_total	STRING,
name_income_type	STRING,
name_education_type	STRING,
name_family_status	STRING,
name_housing_type	STRING,
days_birth	INT,
days_employed	INT,
flag_mobil	INT,
flag_work_phone	INT,
flag_phone	INT,
flag_email	INT,
occupation_type	STRING,
cnt_fam_members	INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://myzoo/user/userdev/cc_data/app'
""")

In [18]:
%spark.pyspark
spark.sql("select * from user0.app_ext limit 5").show(truncate=False)

#### Create Managed Table


In [20]:
%spark.pyspark
spark.sql("""CREATE TABLE user0.credit(
id STRING,
months_balance BIGINT,
status STRING)
STORED AS ORC;
""")

In [21]:
%spark.pyspark
spark.sql("insert into user0.credit select * from user0.credit_ext")

In [22]:
%spark.pyspark
spark.sql("""
CREATE TABLE user0.app(
id	STRING,
code_gender	STRING,
flag_own_car	STRING,
flag_own_realty	STRING,
cnt_children	STRING,
amt_income_total	STRING,
name_income_type	STRING,
name_education_type	STRING,
name_family_status	STRING,
name_housing_type	STRING,
days_birth	INT,
days_employed	INT,
flag_mobil	INT,
flag_work_phone	INT,
flag_phone	INT,
flag_email	INT,
occupation_type	STRING,
cnt_fam_members	INT)
STORED AS ORC;
""")

In [23]:
%spark.pyspark
spark.sql("insert into user0.app select * from user0.app_ext")

### Clean Data

#### Data Exploration

In [25]:
%spark.pyspark
spark.sql("select distinct status from user0.credit_ext").show()

In [26]:
%spark.pyspark
spark.sql("select distinct name_education_type from user0.app_ext").show(truncate = False)

In [27]:
%spark.pyspark
spark.sql("select distinct code_gender from user0.app_ext").show(truncate = False)

In [28]:
%spark.pyspark
spark.sql("select distinct name_family_status from user0.app_ext").show(truncate = False)

In [29]:
%spark.pyspark
spark.sql("select distinct name_housing_type from user0.app_ext").show(truncate = False)

In [30]:
%spark.pyspark
spark.sql("select distinct occupation_type from user0.app_ext").show(truncate = False)

##### The results above show that the `occupation_type` column contains records with an empty value (empty string). Note: in this case Hive stores the empty column as an *empty string*, not as *NULL*.


#### Load Data into a DataFrame

In [33]:
%spark.pyspark
dfApp = spark.sql("select * from user0.app_ext")

#### Drop Duplicate Row

##### Remove duplicate data, i.e. records that share the same id


In [35]:
%spark.pyspark
dfApp.count()

In [36]:
%spark.pyspark
dfNodup = dfApp.drop_duplicates(["id"])

In [37]:
%spark.pyspark
dfNodup.count()
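
To make the semantics of `drop_duplicates(["id"])` concrete, here is a minimal plain-Python sketch (no Spark required) that keeps one row per `id`. The sample rows and the `drop_duplicates_by_key` helper are hypothetical and exist only for illustration. One difference to keep in mind: Spark does not guarantee which of the duplicate rows survives, whereas this sketch always keeps the first one it sees.

```python
def drop_duplicates_by_key(rows, key):
    """Keep the first row seen for each distinct value of `key`."""
    seen = set()
    unique_rows = []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            unique_rows.append(row)
    return unique_rows


# Hypothetical sample rows, not taken from the dataset.
sample_rows = [
    {"id": "A1", "occupation_type": "Drivers"},
    {"id": "A2", "occupation_type": ""},
    {"id": "A1", "occupation_type": "Managers"},  # same id as the first row
]

deduplicated = drop_duplicates_by_key(sample_rows, "id")
print(len(sample_rows), "->", len(deduplicated))
```

Comparing the two counts, as the cells above do with `count()`, is a quick way to see how many duplicate ids were removed.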

#### Fill in Default Values


In [39]:
%spark.pyspark
dfClean = dfNodup.withColumn('occupation_type', \
                              f.when(f.trim(dfNodup['occupation_type']) == '', 'Other') \
                              .otherwise(dfNodup['occupation_type']))

In [40]:
%spark.pyspark
dfClean.select('occupation_type').distinct().show(truncate=False)
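
The rule applied by the `when(...).otherwise(...)` expression can be sketched in plain Python as follows. The `fill_blank` helper is hypothetical; it only mirrors the logic of the cell above: values that are empty after trimming become `'Other'`, everything else is left unchanged. A missing value (`None` here, NULL in Spark) is also left unchanged, because in Spark the comparison on NULL is not true and the `otherwise` branch returns the original value.

```python
def fill_blank(value, default="Other"):
    """Return `default` for empty or whitespace-only strings.

    None is passed through unchanged, mirroring how the Spark
    expression leaves NULL values as they are.
    """
    if value is None:
        return None
    return default if value.strip() == "" else value


# Hypothetical sample values, not taken from the dataset.
for raw in ["Drivers", "", "   ", None]:
    print(repr(raw), "->", repr(fill_blank(raw)))
```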

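#### Reformat date of birth
##### We compute the date of birth as `current date + days_birth`. In this dataset `days_birth` counts backwards from the current day, so its values are negative and adding them moves the date into the past.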


In [42]:
%spark.pyspark
# days_birth is a negative day offset, so date_add moves back from today
dfClean = dfClean.withColumn('dob', f.expr("date_add(current_date(), days_birth)"))

In [43]:
%spark.pyspark
dfClean.show()
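
The date arithmetic can be checked without Spark using the standard `datetime` module. This is a minimal sketch: the `date_of_birth` helper and the fixed `reference_day` are assumptions made for illustration (the notebook itself uses `current_date()`, so its results depend on the day it is run).

```python
from datetime import date, timedelta


def date_of_birth(days_birth, today):
    """Add the (negative) day offset to `today`, like date_add in Spark SQL."""
    return today + timedelta(days=days_birth)


# Hypothetical fixed reference day so the result is reproducible.
reference_day = date(2024, 1, 1)

print(date_of_birth(-1, reference_day))    # 2023-12-31
print(date_of_birth(-365, reference_day))  # 2023-01-01
```

Because the offset is relative to the run date, persisting `dob` freezes a value that would differ if the notebook were run on another day.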

#### Reformat credit status

According to the dataset description, the credit status is one of C, X, or an integer 0-5, with the following meanings:

- 0: 1-29 days past due
- 1: 30-59 days past due
- 2: 60-89 days overdue
- 3: 90-119 days overdue
- 4: 120-149 days overdue
- 5: Overdue or bad debts, write-offs for more than 150 days
- C: paid off that month
- X: No loan for the month

We will group the credit status into 2 classes:

- 0 (Good Credit): X, C, and 0
- 1 (Bad Credit): 1 through 5


In [45]:
%spark.pyspark
dfCredit = spark.sql("select * from user0.credit")

In [46]:
%spark.pyspark
dfCredit = dfCredit.withColumn('status_new', \
                              f.when(dfCredit.status.isin(['X','C','0']), 0) \
                              .otherwise(1))
                              
# risk = number of monthly records per id that were flagged as bad credit
dfAggregated = dfCredit.groupBy("id").agg(f.sum("status_new").alias("risk"))
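
The mapping and aggregation can be traced by hand with a small plain-Python sketch. The records below are hypothetical, and `status_flag` is an illustrative helper that mirrors the `when(...).otherwise(...)` expression: statuses X, C and 0 map to 0, every other status maps to 1, and the flags are then summed per id.

```python
from collections import defaultdict

GOOD_STATUSES = {"X", "C", "0"}


def status_flag(status):
    """Return 0 for a good-credit status and 1 for anything else."""
    return 0 if status in GOOD_STATUSES else 1


# Hypothetical (id, status) monthly records, not taken from the dataset.
records = [
    ("A1", "C"), ("A1", "0"), ("A1", "2"),
    ("A2", "X"), ("A2", "X"),
    ("A3", "1"), ("A3", "5"),
]

# Equivalent of groupBy("id").agg(sum("status_new"))
risk_by_id = defaultdict(int)
for applicant_id, status in records:
    risk_by_id[applicant_id] += status_flag(status)

print(dict(risk_by_id))  # {'A1': 1, 'A2': 0, 'A3': 2}
```

Note that the resulting value is a count of bad months per applicant rather than a binary label, so an applicant with a long history can accumulate a higher number.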

#### Join with the application data



In [48]:
%spark.pyspark
# Left join: applicants without any credit record are kept, with risk = NULL
dfApp_joined = dfClean.join(dfAggregated,dfClean.id==dfAggregated.id, "left").select(dfClean["*"],dfAggregated["risk"])
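
The effect of the `"left"` join type can be illustrated with a short plain-Python sketch. The applicants and risk values are hypothetical; the point is that every applicant row is preserved, and an applicant with no matching credit record gets a missing risk value (`None` here, NULL in Spark).

```python
# Hypothetical inputs, not taken from the dataset.
applicants = [{"id": "A1"}, {"id": "A2"}, {"id": "A4"}]
risk_by_id = {"A1": 1, "A2": 0}  # A4 has no credit record

# Left join: keep every applicant; dict.get returns None when there is no match.
joined = [dict(applicant, risk=risk_by_id.get(applicant["id"])) for applicant in applicants]

for row in joined:
    print(row)
```

With an inner join, the row for `A4` would be dropped instead, so the choice of join type determines whether applicants without a credit history remain in the output table.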


#### Save to a table


In [50]:
%spark.pyspark
dfApp_joined.write.saveAsTable(name="user0.app_risk_1", mode="overwrite",format="orc")

In [51]:
%spark.pyspark
spark.sql("describe extended user0.app_risk_1").show(100, truncate = False)

In [52]:
%spark.pyspark
spark.sql("select * from user0.app_risk_1").show()

##### Cleanup: drop all tables and data before re-running the notebook from the start


In [54]:
%spark.pyspark
spark.sql("drop table user0.app")
spark.sql("drop table user0.app_ext")
spark.sql("drop table user0.credit")
spark.sql("drop table user0.credit_ext")
spark.sql("drop table user0.app_risk_1")
spark.sql("drop database user0")

In [55]:
%sh
hdfs dfs -rm -r -skipTrash /user/userdev/cc_data


In [56]:
%spark.pyspark
spark.sql("""
    SELECT app.*, credit.resiko 
        FROM 
        (SELECT id,code_gender,flag_own_car,flag_own_realty,cnt_children,amt_income_total,    
    name_income_type,name_education_type,name_family_status,name_housing_type,days_birth,
    days_employed,flag_mobil,flag_work_phone,flag_phone,flag_email,
              CASE WHEN trim(occupation_type) == '' THEN 'Other' ELSE occupation_type END as occupation_type,
            cnt_fam_members, 
              date_add(current_date(), days_birth) as dob FROM 
                (SELECT distinct * FROM user0.app)) as app 
        JOIN
        (SELECT id, sum(status_new) as resiko from 
            (SELECT id, status, CASE WHEN status in ('C','X','0') THEN 0 ELSE 1 END as status_new FROM user0.credit) 
        GROUP BY id) as credit
        ON app.id = credit.id""").write.saveAsTable(name="user0.app_risk_2", mode="overwrite")


In [57]:
%spark.pyspark
spark.sql("select * from user0.app_risk_2 limit 5").show()

In [58]:
%spark.pyspark
spark.sql("""
    SELECT app.*, credit.resiko 
        FROM 
        (SELECT id,code_gender,flag_own_car,flag_own_realty,cnt_children,amt_income_total,    
    name_income_type,name_education_type,name_family_status,name_housing_type,days_birth,
    days_employed,flag_mobil,flag_work_phone,flag_phone,flag_email,
              CASE WHEN trim(occupation_type) == '' THEN 'Other' ELSE occupation_type END as occupation_type,
            cnt_fam_members, 
              date_add(current_date(), days_birth) as dob FROM 
                (SELECT distinct * FROM user0.app)) as app 
        JOIN
        (SELECT id, sum(status_new) as resiko from 
            (SELECT id, status, CASE WHEN status in ('C','X','0') THEN 0 ELSE 1 END as status_new FROM user0.credit) 
        GROUP BY id) as credit
        ON app.id = credit.id""").write.saveAsTable(name="user0.app_risk_3", mode="overwrite")

