In this exercise we perform data cleansing and transformation on a credit card approval dataset.
The data comes from: https://www.kaggle.com/datasets/rikdifos/credit-card-approval-prediction/data

The steps are:
- Load the data
- Explore the data
- Handle empty/NULL columns by filling in a default value
- Aggregate the credit record data and join the result with the credit application data
- Transform dates

This is of course not an ideal credit risk calculation. The goal is only to demonstrate some operations that are common in data preprocessing, carried out with Spark and Hive.

In [31]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

In [32]:
spark = SparkSession.builder.appName('Hive Basics').enableHiveSupport().getOrCreate()

## Prepare data

### Download dataset

In [6]:
!wget  -P data https://github.com/urfie/SparkSQL-dengan-Hive/raw/main/datasets/application_record.csv.gz

--2023-10-06 07:47:54--  https://github.com/urfie/SparkSQL-dengan-Hive/raw/main/datasets/application_record.csv.gz
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/urfie/SparkSQL-dengan-Hive/main/datasets/application_record.csv.gz [following]
--2023-10-06 07:47:55--  https://raw.githubusercontent.com/urfie/SparkSQL-dengan-Hive/main/datasets/application_record.csv.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3175168 (3,0M) [application/octet-stream]
Saving to: ‘data/application_record.csv.gz.1’


2023-10-06 07:47:56 (4,07 MB/s) - ‘data/application_record.csv.gz.1’ saved [3175168/3175168]

In [5]:
!wget -P data https://github.com/urfie/SparkSQL-dengan-Hive/raw/main/datasets/credit_record.csv.gz 

--2023-10-06 07:47:34--  https://github.com/urfie/SparkSQL-dengan-Hive/raw/main/datasets/credit_record.csv.gz
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/urfie/SparkSQL-dengan-Hive/main/datasets/credit_record.csv.gz [following]
--2023-10-06 07:47:34--  https://raw.githubusercontent.com/urfie/SparkSQL-dengan-Hive/main/datasets/credit_record.csv.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2684548 (2,6M) [application/octet-stream]
Saving to: ‘data/credit_record.csv.gz.1’


2023-10-06 07:47:36 (2,65 MB/s) - ‘data/credit_record.csv.gz.1’ saved [2684548/2684548]
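Before copying the files to HDFS it can help to look at the first few rows locally. The following is a minimal sketch in plain Python (standard library only); the helper name `peek_gzip_csv` is hypothetical and not part of the original notebook.

```python
import csv
import gzip
from itertools import islice


def peek_gzip_csv(path, n=5):
    """Return the first n rows of a gzip-compressed CSV file as lists of strings."""
    # "rt" opens the compressed stream in text mode so csv.reader receives str lines.
    with gzip.open(path, "rt", encoding="utf-8", newline="") as fh:
        return list(islice(csv.reader(fh), n))
```

For example, `peek_gzip_csv("data/credit_record.csv.gz", 3)` would return the first three records of the downloaded credit file, assuming the file sits at that path.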



### Load to HDFS

In [None]:
!hdfs dfs -ls /user/hadoop

In [None]:
!hdfs dfs -mkdir /user/hadoop/cc_data

In [None]:
!hdfs dfs -mkdir /user/hadoop/cc_data/credit

In [None]:
!hdfs dfs -put data/credit_record.csv.gz /user/hadoop/cc_data/credit

In [34]:
!hdfs dfs -ls /user/hadoop/cc_data/credit

Found 1 items
-rw-r--r--   1 hadoop supergroup    2684548 2023-10-06 07:26 /user/hadoop/cc_data/credit/credit_record.csv.gz


In [None]:
!hdfs dfs -mkdir /user/hadoop/cc_data/app

In [None]:
!hdfs dfs -put data/application_record.csv.gz /user/hadoop/cc_data/app

In [35]:
!hdfs dfs -ls /user/hadoop/cc_data/app

Found 1 items
-rw-r--r--   1 hadoop supergroup    3175168 2023-10-06 07:25 /user/hadoop/cc_data/app/application_record.csv.gz


In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS user0")

### Create external tables

In [None]:
spark.sql("""CREATE EXTERNAL TABLE user0.credit_ext(
id STRING,
months_balance BIGINT,
status STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://127.0.0.1:9000/user/hadoop/cc_data/credit');
""")

In [33]:
spark.sql("select * from user0.credit_ext").show()

+-------+--------------+------+
|     id|months_balance|status|
+-------+--------------+------+
|5001711|             0|     X|
|5001711|            -1|     0|
|5001711|            -2|     0|
|5001711|            -3|     0|
|5001712|             0|     C|
|5001712|            -1|     C|
|5001712|            -2|     C|
|5001712|            -3|     C|
|5001712|            -4|     C|
|5001712|            -5|     C|
|5001712|            -6|     C|
|5001712|            -7|     C|
|5001712|            -8|     C|
|5001712|            -9|     0|
|5001712|           -10|     0|
|5001712|           -11|     0|
|5001712|           -12|     0|
|5001712|           -13|     0|
|5001712|           -14|     0|
|5001712|           -15|     0|
+-------+--------------+------+
only showing top 20 rows



In [None]:
#spark.sql("drop table user0.app_ext")

In [None]:
spark.sql("""
CREATE EXTERNAL TABLE user0.app_ext(
id	STRING,
code_gender	STRING,
flag_own_car	STRING,
flag_own_realty	STRING,
cnt_children	STRING,
amt_income_total	STRING,
name_income_type	STRING,
name_education_type	STRING,
name_family_status	STRING,
name_housing_type	STRING,
days_birth	INT,
days_employed	INT,
flag_mobil	INT,
flag_work_phone	INT,
flag_phone	INT,
flag_email	INT,
occupation_type	STRING,
cnt_fam_members	INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://127.0.0.1:9000/user/hadoop/cc_data/app';
""")

In [36]:
spark.sql("select * from user0.app_ext limit 5").show(truncate=False)

+-------+-----------+------------+---------------+------------+----------------+--------------------+-----------------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|id     |code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|name_income_type    |name_education_type          |name_family_status  |name_housing_type|days_birth|days_employed|flag_mobil|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|
+-------+-----------+------------+---------------+------------+----------------+--------------------+-----------------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|5008804|M          |Y           |Y              |0           |427500.0        |Working             |Higher education             |Civil marriage      |Rented apartment 

### Create managed tables

In [None]:
spark.sql("""CREATE TABLE user0.credit(
id STRING,
months_balance BIGINT,
status STRING)
STORED AS ORC;
""")

In [None]:
spark.sql("insert into user0.credit select * from user0.credit_ext")

In [None]:
spark.sql("""
CREATE TABLE user0.app(
id	STRING,
code_gender	STRING,
flag_own_car	STRING,
flag_own_realty	STRING,
cnt_children	STRING,
amt_income_total	STRING,
name_income_type	STRING,
name_education_type	STRING,
name_family_status	STRING,
name_housing_type	STRING,
days_birth	INT,
days_employed	INT,
flag_mobil	INT,
flag_work_phone	INT,
flag_phone	INT,
flag_email	INT,
occupation_type	STRING,
cnt_fam_members	INT)
STORED AS ORC;
""")

In [None]:
spark.sql("insert into user0.app select * from user0.app_ext")

## Clean Data 

### Data exploration

In [43]:
spark.sql("select distinct status from user0.credit_ext").show()

+------+
|status|
+------+
|     3|
|     0|
|     5|
|     C|
|     X|
|     1|
|     4|
|     2|
+------+



                                                                                

In [44]:
spark.sql("select distinct occupation_type from user0.app_ext").show(truncate = False)

+---------------------+
|occupation_type      |
+---------------------+
|Managers             |
|HR staff             |
|Medicine staff       |
|Accountants          |
|Laborers             |
|Cleaning staff       |
|Private service staff|
|Drivers              |
|Sales staff          |
|Realty agents        |
|IT staff             |
|Security staff       |
|Secretaries          |
|Low-skill Laborers   |
|                     |
|Core staff           |
|Cooking staff        |
|High skill tech staff|
|Waiters/barmen staff |
+---------------------+



                                                                                

In [45]:
spark.sql("select distinct name_education_type from user0.app_ext").show(truncate = False)

+-----------------------------+
|name_education_type          |
+-----------------------------+
|Academic degree              |
|Incomplete higher            |
|Secondary / secondary special|
|Lower secondary              |
|Higher education             |
+-----------------------------+



                                                                                

In [46]:
spark.sql("select distinct code_gender from user0.app_ext").show(truncate = False)

+-----------+
|code_gender|
+-----------+
|F          |
|M          |
+-----------+



                                                                                

In [47]:
spark.sql("select distinct name_family_status from user0.app_ext").show(truncate = False)

+--------------------+
|name_family_status  |
+--------------------+
|Separated           |
|Married             |
|Single / not married|
|Widow               |
|Civil marriage      |
+--------------------+



                                                                                

In [48]:
spark.sql("select distinct name_housing_type from user0.app_ext").show(truncate = False)

+-------------------+
|name_housing_type  |
+-------------------+
|House / apartment  |
|Municipal apartment|
|Co-op apartment    |
|Rented apartment   |
|Office apartment   |
|With parents       |
+-------------------+



                                                                                

The results above show that the `occupation_type` column contains records with an empty value (empty string).
Note: in this case Hive stores the empty field as an empty string, not NULL.
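To see why an empty field shows up as an empty string, it helps to model how a comma-delimited text line is split into fields. The sketch below is a simplified plain-Python model (no quoting or escaping), not the actual Hive reader; the function names and the sample rows are made up for illustration.

```python
def parse_delimited_line(line, delimiter=","):
    """Split one text line on the delimiter.

    Simplified model of a delimited text table: no quoting, no escaping.
    Two adjacent delimiters produce an empty string, not None.
    """
    return line.rstrip("\n").split(delimiter)


def count_empty(rows, column_index):
    """Count rows whose field at column_index is empty after trimming."""
    return sum(1 for row in rows if row[column_index].strip() == "")


# Hypothetical three-column sample: id, gender, occupation.
sample = [
    parse_delimited_line("1,M,Managers\n"),
    parse_delimited_line("2,F,\n"),
    parse_delimited_line("3,F,Drivers\n"),
]
print(count_empty(sample, 2))
```

A count like this makes it easy to decide whether filling in a default value is worth it or whether the rows should be dropped instead.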

### Load data into a DataFrame

In [37]:
dfApp = spark.sql("select * from user0.app_ext")

### Drop duplicate rows

Remove duplicate data, that is, records that share the same id.

In [38]:
dfApp.count()

438557

In [39]:
dfNodup = dfApp.drop_duplicates(["id"])

In [40]:
dfNodup.count()

                                                                                

438510
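The two counts differ by 438557 - 438510 = 47, so 47 rows shared an id with another row. The logic of deduplicating on a key can be sketched in plain Python as follows. Note one assumption: this sketch keeps the first row seen for each id, whereas Spark's `drop_duplicates` on a subset of columns keeps one row per key without promising which one. The function name and sample rows are hypothetical.

```python
def drop_duplicates_by_key(rows, key):
    """Keep one row per distinct value of row[key] (here: the first one seen)."""
    seen = set()
    unique = []
    for row in rows:
        value = row[key]
        if value not in seen:
            seen.add(value)
            unique.append(row)
    return unique


# Made-up sample with one repeated id.
rows = [
    {"id": "1", "occupation_type": "Managers"},
    {"id": "2", "occupation_type": "Drivers"},
    {"id": "1", "occupation_type": "Core staff"},
]
deduped = drop_duplicates_by_key(rows, "id")
print(len(rows) - len(deduped))  # number of rows removed
```

If it matters which of the duplicate rows survives, the rows would need an explicit ordering before deduplication.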

### Fill in default values

In [41]:
dfClean = dfNodup.withColumn('occupation_type', \
                              f.when(f.trim(dfNodup['occupation_type']) == '', 'Other') \
                              .otherwise(dfNodup['occupation_type']))

In [None]:
dfClean.select('occupation_type').distinct().show(truncate=False)
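The `when(...).otherwise(...)` expression above replaces blank values with `'Other'` and leaves everything else untouched, including NULL. A plain-Python sketch of the same rule, with a hypothetical helper name, looks like this:

```python
def fill_occupation(value, default="Other"):
    """Return default for blank strings; leave other values (and None) as they are.

    Note: str.strip() removes all whitespace, while Spark's trim() removes
    spaces only, so this is an approximation of the Spark expression.
    """
    if value is not None and value.strip() == "":
        return default
    return value


print([fill_occupation(v) for v in ["Managers", "", "   ", None]])
```

If NULL values should also become `'Other'`, the condition would have to cover `None` as well; the Spark cell above does not do that.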

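### Reformat date of birth

We compute the date of birth as the current date plus `days_birth`. In this dataset `days_birth` counts backwards from the current day, so its values are negative and adding them moves the date into the past.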

In [None]:
dfClean = dfClean.withColumn('dob', f.date_add(f.current_date(), dfClean.days_birth))

In [None]:
dfClean.show()
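The date arithmetic can be checked outside Spark with the standard library. This is a minimal sketch; the function name `date_of_birth` and the optional `today` parameter are assumptions added so the calculation can be reproduced with a fixed reference date.

```python
from datetime import date, timedelta


def date_of_birth(days_birth, today=None):
    """Add days_birth (zero or negative, counted back from today) to today."""
    if today is None:
        today = date.today()
    return today + timedelta(days=days_birth)


# With a fixed reference date the result is reproducible.
print(date_of_birth(-365, today=date(2023, 10, 6)))
```

Because the Spark cell uses `current_date()`, the computed `dob` shifts by one day for every day that passes between runs. Pinning the reference date, as the `today` parameter does here, avoids that.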

### Reformat credit status

According to the dataset description, the credit status is one of C, X, or an integer from 0 to 5, with the following meanings:
- 0: 1-29 days past due 
- 1: 30-59 days past due
- 2: 60-89 days overdue
- 3: 90-119 days overdue
- 4: 120-149 days overdue
- 5: Overdue or bad debts, write-offs for more than 150 days
- C: paid off that month
- X: No loan for the month

We group the credit status into two classes:
- 0 (Good Credit): X, C, and 0
- 1 (Bad Credit): 1 through 5

In [None]:
dfCredit = spark.sql("select * from user0.credit")

In [None]:
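# status is a STRING column, so compare against string literals only.
# X, C and 0 map to 0 (good); every other status maps to 1 (bad).
dfCredit = dfCredit.withColumn('status_new', \
                              f.when(dfCredit.status.isin('X', 'C', '0'), 0) \
                              .otherwise(1))
# Sum the flags per id: risk = number of months with a bad status.
dfAggregated = dfCredit.groupBy("id").agg(f.sum("status_new").alias("risk"))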
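The mapping and the aggregation in the cell above can be mirrored in plain Python to make the meaning of `risk` explicit: statuses X, C and 0 count as 0, every other status (including 1) counts as 1, and `risk` is the per-id sum, that is, the number of months with a bad status. The names below and the sample records are made up for illustration.

```python
from collections import defaultdict

# Statuses treated as good credit; all values are strings, like the table column.
GOOD_STATUSES = {"X", "C", "0"}


def status_flag(status):
    """Return 0 for a good status and 1 for any other status."""
    return 0 if status in GOOD_STATUSES else 1


def risk_per_id(records):
    """Sum the bad-status flags per id from (id, status) pairs."""
    risk = defaultdict(int)
    for customer_id, status in records:
        risk[customer_id] += status_flag(status)
    return dict(risk)


# Hypothetical monthly records for two ids.
records = [("A", "X"), ("A", "0"), ("B", "C"), ("B", "1"), ("B", "3")]
print(risk_per_id(records))
```

An id that only ever has good statuses ends up with a risk of 0, which is different from an id that has no credit records at all.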

### Join with application data

In [None]:
dfApp_joined = dfClean.join(dfAggregated,dfClean.id==dfAggregated.id, "left").select(dfClean["*"],dfAggregated["risk"])

In [None]:
#dfApp_joined.show()
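A left join keeps every application row, whether or not a matching credit record exists. Applications without any credit history therefore get a NULL `risk`, which is not the same as a risk of 0. The following plain-Python sketch illustrates this with made-up data; the function name is hypothetical.

```python
def left_join_risk(applications, risk_by_id):
    """Attach risk to each application; None when the id has no credit records."""
    return [{**app, "risk": risk_by_id.get(app["id"])} for app in applications]


# Made-up sample: id "3" has no entry in the aggregated credit data.
applications = [{"id": "1"}, {"id": "2"}, {"id": "3"}]
risk_by_id = {"1": 0, "2": 4}
joined = left_join_risk(applications, risk_by_id)
print(joined)
```

Whether to keep these rows, drop them, or fill `risk` with a default depends on how the resulting table will be used downstream.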

### Save to a table

In [None]:
dfApp_joined.write.saveAsTable(name="user0.app_risk_1", mode="overwrite",format="orc")

In [None]:
spark.sql("describe extended user0.app_risk_1").show(100, truncate = False)

In [None]:
spark.sql("select * from user0.app_risk_1").show()