# Mentoring 3

In [1]:
# Library
import pyspark
import os
import sys

from helper.utils import init_spark_session
from helper.utils import logging_process
from pyspark.sql.functions import regexp_replace,to_date,year,month,dayofmonth,when,make_date



## Extraction

### Database

In [2]:
from extract.extract import extract_data

In [3]:
#Education
education_df = extract_data('education_status','db')

In [4]:
education_df.show(5)

+------------+---------+--------------------+--------------------+
|education_id|    value|          created_at|          updated_at|
+------------+---------+--------------------+--------------------+
|           1| tertiary|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           2|secondary|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           3|  unknown|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           4|  primary|2025-02-28 15:31:...|2025-02-28 15:31:...|
+------------+---------+--------------------+--------------------+



In [5]:
#marital
marital_df = extract_data('marital_status','db')
marital_df.show(5)

+----------+--------+--------------------+--------------------+
|marital_id|   value|          created_at|          updated_at|
+----------+--------+--------------------+--------------------+
|         1| married|2025-02-28 15:31:...|2025-02-28 15:31:...|
|         2|  single|2025-02-28 15:31:...|2025-02-28 15:31:...|
|         3|divorced|2025-02-28 15:31:...|2025-02-28 15:31:...|
+----------+--------+--------------------+--------------------+



In [6]:
#marketing
marketing_df = extract_data('marketing_campaign_deposit','db')
marketing_df.show(5)

+------------+---+------------+----------+------------+-------+-------+-------+-----+-------+---+-----+--------+--------+-----+--------+--------+------------------+--------------------+--------------------+
|loan_data_id|age|         job|marital_id|education_id|default|balance|housing| loan|contact|day|month|duration|campaign|pdays|previous|poutcome|subscribed_deposit|          created_at|          updated_at|
+------------+---+------------+----------+------------+-------+-------+-------+-----+-------+---+-----+--------+--------+-----+--------+--------+------------------+--------------------+--------------------+
|           1| 58|  management|         1|           1|  false|  $2143|   true|false|unknown|  5|  may|     261|       1|   -1|       0| unknown|             false|2025-02-28 15:59:...|2025-02-28 15:59:...|
|           2| 44|  technician|         2|           2|  false|    $29|   true|false|unknown|  5|  may|     151|       1|   -1|       0| unknown|             false|2025-02-

### CSV

In [7]:
bank_df = extract_data('new_bank_transaction','csv')
bank_df.show(5)


+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|      T642232|  C1010028|    25/8/88|         F|       DELHI|         296828.37|        29/8/16|          95212|                    557|
|       T87414|  C1010035|     2/3/92|         M|      MUMBAI|           7284.42|         1/8/16|         111917|                     50|
|      T560676|C1010035_2|     9/6/80|         M| NAVI MUMBAI|         378013.09|        27/8/16|         185011|                    700|
|      T610204|  C1010036|    26/2/96|         M|     GURGAON|         355430.17|        26/8/16|          95203|                    208|
|      T957663|  C1010041|     6/9

In [8]:
bank_df.count()

1048567

## Exploration

In [9]:
from helper.utils import check_null_values

### Education

In [10]:

education_df.printSchema()

root
 |-- education_id: integer (nullable = true)
 |-- value: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [11]:
education_df.show(5)

+------------+---------+--------------------+--------------------+
|education_id|    value|          created_at|          updated_at|
+------------+---------+--------------------+--------------------+
|           1| tertiary|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           2|secondary|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           3|  unknown|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           4|  primary|2025-02-28 15:31:...|2025-02-28 15:31:...|
+------------+---------+--------------------+--------------------+



In [12]:
check_null_values(education_df)

There's no null value


### Marital Status

In [13]:
marital_df.printSchema()

root
 |-- marital_id: integer (nullable = true)
 |-- value: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [14]:
marital_df.count()

3

In [15]:
marital_df.show(3)

+----------+--------+--------------------+--------------------+
|marital_id|   value|          created_at|          updated_at|
+----------+--------+--------------------+--------------------+
|         1| married|2025-02-28 15:31:...|2025-02-28 15:31:...|
|         2|  single|2025-02-28 15:31:...|2025-02-28 15:31:...|
|         3|divorced|2025-02-28 15:31:...|2025-02-28 15:31:...|
+----------+--------+--------------------+--------------------+



In [16]:
check_null_values(marital_df)

There's no null value


### Marketing

In [17]:
marketing_df.printSchema()

root
 |-- loan_data_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital_id: integer (nullable = true)
 |-- education_id: integer (nullable = true)
 |-- default: boolean (nullable = true)
 |-- balance: string (nullable = true)
 |-- housing: boolean (nullable = true)
 |-- loan: boolean (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- subscribed_deposit: boolean (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [18]:
marketing_df.count()

45211

In [19]:
marketing_df.show(5)

+------------+---+------------+----------+------------+-------+-------+-------+-----+-------+---+-----+--------+--------+-----+--------+--------+------------------+--------------------+--------------------+
|loan_data_id|age|         job|marital_id|education_id|default|balance|housing| loan|contact|day|month|duration|campaign|pdays|previous|poutcome|subscribed_deposit|          created_at|          updated_at|
+------------+---+------------+----------+------------+-------+-------+-------+-----+-------+---+-----+--------+--------+-----+--------+--------+------------------+--------------------+--------------------+
|           1| 58|  management|         1|           1|  false|  $2143|   true|false|unknown|  5|  may|     261|       1|   -1|       0| unknown|             false|2025-02-28 15:59:...|2025-02-28 15:59:...|
|           2| 44|  technician|         2|           2|  false|    $29|   true|false|unknown|  5|  may|     151|       1|   -1|       0| unknown|             false|2025-02-

In [20]:
check_null_values(marketing_df)

There's no null value


### Bank Transaction

In [21]:
bank_df.printSchema()

root
 |-- TransactionID: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerDOB: string (nullable = true)
 |-- CustGender: string (nullable = true)
 |-- CustLocation: string (nullable = true)
 |-- CustAccountBalance: string (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- TransactionTime: string (nullable = true)
 |-- TransactionAmount (INR): string (nullable = true)



In [22]:
bank_df.count()

1048567

In [23]:
print(bank_df.columns)

['TransactionID', 'CustomerID', 'CustomerDOB', 'CustGender', 'CustLocation', 'CustAccountBalance', 'TransactionDate', 'TransactionTime', 'TransactionAmount (INR)']


In [24]:


check_null_values(bank_df)

CustGender has null values
CustLocation has null values
CustAccountBalance has null values


In [25]:
bank_df.groupBy("CustLocation").count().orderBy(bank_df['CustLocation'].asc()).show()

+--------------------+-----+
|        CustLocation|count|
+--------------------+-----+
|                NULL|  151|
|(154) BHASKOLA FA...|   11|
|(BEFORE YMCA BLDG...|    1|
|(BENAKA MDTS) BAN...|   21|
|(BRINDA BAN ) KOL...|    5|
|          (DT) HOSUR|   11|
|          (E) MUMBAI|   44|
|  (E) THANE SECTOR 3|    1|
|(EAST) DAHISAR MU...|    1|
|       (EAST) MUMBAI|   52|
|        (EAST) THANE|   22|
|     (INDIA ) NAGPUR|    8|
|(KH) TAL MULSHI PUNE|    4|
|(MCORP) 1/C NR HO...|    2|
|(METRO) NR RAM MA...|    2|
|     (MOHALI) KHARAR|   13|
|          (N) 24 PGS|    6|
|(NEAR AUDI SHOWRO...|    2|
|   (PIGEN) BANGALORE|    5|
|(PITAMPURA) NEW D...|    2|
+--------------------+-----+
only showing top 20 rows



we will remove null values

In [26]:
bank_df.groupBy("CustGender").count().orderBy(bank_df['CustGender'].asc()).show()

+----------+------+
|CustGender| count|
+----------+------+
|      NULL|  1100|
|         F|281936|
|         M|765530|
|         T|     1|
+----------+------+



we will remove null values

In [27]:
bank_df.groupBy("CustAccountBalance").count().orderBy(bank_df['CustAccountBalance'].asc()).show()

+------------------+-----+
|CustAccountBalance|count|
+------------------+-----+
|              NULL| 2369|
|                 0| 2711|
|              0.01|  300|
|              0.02|   79|
|              0.03|   44|
|              0.04|   86|
|              0.05|   79|
|              0.06|   18|
|              0.07|   37|
|              0.08|   18|
|              0.09|   14|
|               0.1|   42|
|              0.11|   23|
|              0.12|   33|
|              0.13|   50|
|              0.14|   20|
|              0.15|   55|
|              0.16|   19|
|              0.17|   43|
|              0.18|   34|
+------------------+-----+
only showing top 20 rows



we will remove null values

In [28]:
bank_df.filter(~bank_df["CustomerDOB"].contains("/")).show()

+-------------+----------+-----------+----------+--------------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|        CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+--------------------+------------------+---------------+---------------+-----------------------+
|       T93303|  C1012964|        nan|         F|              MUMBAI|          84098.47|        22/9/16|         145046|                18485.9|
|      T586187|  C1012971|        nan|         F|              MUMBAI|          84098.47|        25/8/16|         151540|                   2980|
|      T529584|  C1083540|        nan|         M|              BOISAR|           8093.64|        24/8/16|         195728|                   1190|
|      T231105|  C1112938|        nan|         F|              MUMBAI|          84098.47|         7/8/16|         132750|   

## Transformation

### Marketing

In [29]:
marketing_copy = marketing_df.alias('marketing_copy')
marketing_copy.printSchema()

root
 |-- loan_data_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital_id: integer (nullable = true)
 |-- education_id: integer (nullable = true)
 |-- default: boolean (nullable = true)
 |-- balance: string (nullable = true)
 |-- housing: boolean (nullable = true)
 |-- loan: boolean (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- subscribed_deposit: boolean (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [30]:
marketing_copy.select('balance').show(5)

+-------+
|balance|
+-------+
|  $2143|
|    $29|
|     $2|
|  $1506|
|     $1|
+-------+
only showing top 5 rows



In [31]:
# Changing data type balance
from pyspark.sql.functions import regexp_replace
marketing_copy = marketing_copy.withColumn("balance",regexp_replace("balance", r'\$', ''))
marketing_copy = marketing_copy.withColumn("balance",marketing_copy['balance'].cast("int"))
marketing_copy.select("balance").show(5)

+-------+
|balance|
+-------+
|   2143|
|     29|
|      2|
|   1506|
|      1|
+-------+
only showing top 5 rows



In [32]:
# duration
marketing_copy.select("duration").show(5)

+--------+
|duration|
+--------+
|     261|
|     151|
|      76|
|      92|
|     198|
+--------+
only showing top 5 rows



In [33]:
marketing_copy = marketing_copy.withColumn("duration_in_year",(marketing_copy['duration'] / 365).cast("int"))
marketing_copy.select(['duration','duration_in_year']).show(20)

+--------+----------------+
|duration|duration_in_year|
+--------+----------------+
|     261|               0|
|     151|               0|
|      76|               0|
|      92|               0|
|     198|               0|
|     139|               0|
|     217|               0|
|     380|               1|
|      50|               0|
|      55|               0|
|     222|               0|
|     137|               0|
|     517|               1|
|      71|               0|
|     174|               0|
|     353|               0|
|      98|               0|
|      38|               0|
|     219|               0|
|      54|               0|
+--------+----------------+
only showing top 20 rows



In [34]:
# rename columns
list_columns ={
    'pdays':'days_since_last_campaign',
    'previous':'previous_campaign_contacts',
    'poutcome':'previous_campaign_outcome'
}

marketing_copy = marketing_copy.withColumnsRenamed(list_columns)
marketing_copy.printSchema()

root
 |-- loan_data_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital_id: integer (nullable = true)
 |-- education_id: integer (nullable = true)
 |-- default: boolean (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: boolean (nullable = true)
 |-- loan: boolean (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- days_since_last_campaign: integer (nullable = true)
 |-- previous_campaign_contacts: integer (nullable = true)
 |-- previous_campaign_outcome: string (nullable = true)
 |-- subscribed_deposit: boolean (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- duration_in_year: integer (nullable = true)



In [35]:
from transform.transform_marketing import transform_marketing

marketing_df = transform_marketing(marketing_df)
marketing_df.printSchema()

root
 |-- loan_data_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital_id: integer (nullable = true)
 |-- education_id: integer (nullable = true)
 |-- default: boolean (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: boolean (nullable = true)
 |-- loan: boolean (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- days_since_last_campaign: integer (nullable = true)
 |-- previous_campaign_contacts: integer (nullable = true)
 |-- previous_campaign_outcome: string (nullable = true)
 |-- subscribed_deposit: boolean (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- duration_in_year: integer (nullable = true)



### Bank

#### Customers

In [36]:
bank_df = bank_df.dropna()

In [37]:
cust_copy = bank_df.select(['CustomerID','CustomerDOB','CustGender','CustLocation','CustAccountBalance'])
cust_copy.show(5)

+----------+-----------+----------+------------+------------------+
|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|
+----------+-----------+----------+------------+------------------+
|  C1010028|    25/8/88|         F|       DELHI|         296828.37|
|  C1010035|     2/3/92|         M|      MUMBAI|           7284.42|
|C1010035_2|     9/6/80|         M| NAVI MUMBAI|         378013.09|
|  C1010036|    26/2/96|         M|     GURGAON|         355430.17|
|  C1010041|     6/9/93|         F|       DELHI|          34119.48|
+----------+-----------+----------+------------+------------------+
only showing top 5 rows



In [38]:
# rename columns
list_cols = {
    'CustomerID':'customer_id',
    'CustomerDOB':'birth_date',
    'CustGender':'gender',
    'CustLocation':'location',
    'CustAccountBalance':'account_balance'
}
cust_copy = cust_copy.withColumnsRenamed(list_cols)
cust_copy.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- location: string (nullable = true)
 |-- account_balance: string (nullable = true)



In [39]:
# For checking if there's birth_date with different format
cust_copy = cust_copy.withColumn(
    "is_date",
    when(to_date(cust_copy["birth_date"], "d/M/yy").isNotNull(), True).otherwise(False)
)

In [40]:
cust_copy.filter(cust_copy['is_date'] == False).show(5)

+-----------+----------+------+---------+---------------+-------+
|customer_id|birth_date|gender| location|account_balance|is_date|
+-----------+----------+------+---------+---------------+-------+
|   C1012964|       nan|     F|   MUMBAI|       84098.47|  false|
|   C1012971|       nan|     F|   MUMBAI|       84098.47|  false|
|   C1083540|       nan|     M|   BOISAR|        8093.64|  false|
|   C1112938|       nan|     F|   MUMBAI|       84098.47|  false|
| C1114329_2|       nan|     M|NEW DELHI|     1088762.49|  false|
+-----------+----------+------+---------+---------------+-------+
only showing top 5 rows



since birth_date can have null values in database, we will change it into null, and wont be removed

In [41]:
from pyspark.sql.functions import to_date,year,month,dayofmonth,when,make_date

# cast type
cust_copy = cust_copy.withColumn(
    "birth_date",
    to_date(cust_copy["birth_date"], "d/M/yy")
)

# Adjust year if > 2025
cust_copy = cust_copy.withColumn(
    "birth_date",
    when(year(cust_copy["birth_date"]) >= 2025,
         make_date((year(cust_copy["birth_date"]) - 100), month(cust_copy["birth_date"]), dayofmonth(cust_copy["birth_date"]))
        ).otherwise(cust_copy["birth_date"])
)

In [42]:
# change birth date
test = (
    cust_copy.groupBy("birth_date")
    .count()
)
test.orderBy(test['birth_date'].asc()).show()

+----------+-----+
|birth_date|count|
+----------+-----+
|      NULL| 3333|
|1800-01-01|56292|
|1925-05-06|    1|
|1926-01-28|    1|
|1926-03-14|    1|
|1926-11-06|    2|
|1926-12-21|    6|
|1927-01-07|    1|
|1927-01-15|    5|
|1927-02-11|    4|
|1927-02-23|   13|
|1927-04-03|    2|
|1927-04-13|    1|
|1927-05-19|    5|
|1927-05-25|    8|
|1927-06-08|    4|
|1927-08-04|    1|
|1927-09-11|    4|
|1927-11-14|    1|
|1928-01-12|    4|
+----------+-----+
only showing top 20 rows



In [43]:
# gender
# filter gender

cust_copy = cust_copy.filter(cust_copy['gender'].isin(['M','F']))
gender_list = {
    'M':"Male",
    "F":"Female"
}
cust_copy = cust_copy.replace(gender_list,subset=['gender'])
cust_copy.show(5)

+-----------+----------+------+-----------+---------------+-------+
|customer_id|birth_date|gender|   location|account_balance|is_date|
+-----------+----------+------+-----------+---------------+-------+
|   C1010028|1988-08-25|Female|      DELHI|      296828.37|   true|
|   C1010035|1992-03-02|  Male|     MUMBAI|        7284.42|   true|
| C1010035_2|1980-06-09|  Male|NAVI MUMBAI|      378013.09|   true|
|   C1010036|1996-02-26|  Male|    GURGAON|      355430.17|   true|
|   C1010041|1993-09-06|Female|      DELHI|       34119.48|   true|
+-----------+----------+------+-----------+---------------+-------+
only showing top 5 rows



In [44]:
# account balance
from pyspark.sql.types import DecimalType
cust_copy = cust_copy.withColumn('account_balance',cust_copy['account_balance'].cast(DecimalType(10, 2)))
cust_copy.show(5)

+-----------+----------+------+-----------+---------------+-------+
|customer_id|birth_date|gender|   location|account_balance|is_date|
+-----------+----------+------+-----------+---------------+-------+
|   C1010028|1988-08-25|Female|      DELHI|      296828.37|   true|
|   C1010035|1992-03-02|  Male|     MUMBAI|        7284.42|   true|
| C1010035_2|1980-06-09|  Male|NAVI MUMBAI|      378013.09|   true|
|   C1010036|1996-02-26|  Male|    GURGAON|      355430.17|   true|
|   C1010041|1993-09-06|Female|      DELHI|       34119.48|   true|
+-----------+----------+------+-----------+---------------+-------+
only showing top 5 rows



In [45]:
from transform.transform_customer import transform_customer
bank_df = transform_customer(bank_df)
cust_df = bank_df.select(['customer_id','birth_date','gender','location','account_balance'])
cust_df.show(5)

+-----------+----------+------+-----------+---------------+
|customer_id|birth_date|gender|   location|account_balance|
+-----------+----------+------+-----------+---------------+
|   C1010028|1988-08-25|Female|      DELHI|      296828.37|
|   C1010035|1992-03-02|  Male|     MUMBAI|        7284.42|
| C1010035_2|1980-06-09|  Male|NAVI MUMBAI|      378013.09|
|   C1010036|1996-02-26|  Male|    GURGAON|      355430.17|
|   C1010041|1993-09-06|Female|      DELHI|       34119.48|
+-----------+----------+------+-----------+---------------+
only showing top 5 rows



In [46]:
cust_df.filter(year(cust_df['birth_date']) >= 2025).show()

+-----------+----------+------+--------+---------------+
|customer_id|birth_date|gender|location|account_balance|
+-----------+----------+------+--------+---------------+
+-----------+----------+------+--------+---------------+



#### Transaction

In [47]:
trans_df = bank_df.select(['TransactionID','customer_id','TransactionDate','TransactionTime','TransactionAmount (INR)'])
trans_copy = trans_df.alias('trans_df')
trans_copy.show(5)

+-------------+-----------+---------------+---------------+-----------------------+
|TransactionID|customer_id|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+-----------+---------------+---------------+-----------------------+
|      T642232|   C1010028|        29/8/16|          95212|                    557|
|       T87414|   C1010035|         1/8/16|         111917|                     50|
|      T560676| C1010035_2|        27/8/16|         185011|                    700|
|      T610204|   C1010036|        26/8/16|          95203|                    208|
|      T957663|   C1010041|        10/9/16|         162533|                  14500|
+-------------+-----------+---------------+---------------+-----------------------+
only showing top 5 rows



In [48]:
 # rename columns
list_cols = {
    'TransactionID':'transaction_id',
    'TransactionDate':'transaction_date',
    'TransactionTime':'transaction_time',
    'TransactionAmount (INR)':'transaction_amount'
}
trans_copy = trans_copy.withColumnsRenamed(list_cols)
trans_copy.show(5)

+--------------+-----------+----------------+----------------+------------------+
|transaction_id|customer_id|transaction_date|transaction_time|transaction_amount|
+--------------+-----------+----------------+----------------+------------------+
|       T642232|   C1010028|         29/8/16|           95212|               557|
|        T87414|   C1010035|          1/8/16|          111917|                50|
|       T560676| C1010035_2|         27/8/16|          185011|               700|
|       T610204|   C1010036|         26/8/16|           95203|               208|
|       T957663|   C1010041|         10/9/16|          162533|             14500|
+--------------+-----------+----------------+----------------+------------------+
only showing top 5 rows



In [49]:
trans_copy.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- transaction_time: string (nullable = true)
 |-- transaction_amount: string (nullable = true)



In [50]:
# Transaction Time
trans_copy.select('transaction_time').show(5)

+----------------+
|transaction_time|
+----------------+
|           95212|
|          111917|
|          185011|
|           95203|
|          162533|
+----------------+
only showing top 5 rows



In [51]:
from pyspark.sql.functions import to_timestamp, date_format,lpad

trans_copy = trans_copy.withColumn(
    "transaction_time",
    date_format(
        to_timestamp(
            lpad(trans_copy['transaction_time'], 6, "0"),
            "HHmmss"
        ),
        "HH:mm:ss"
    )
)
trans_copy.show(5)

+--------------+-----------+----------------+----------------+------------------+
|transaction_id|customer_id|transaction_date|transaction_time|transaction_amount|
+--------------+-----------+----------------+----------------+------------------+
|       T642232|   C1010028|         29/8/16|        09:52:12|               557|
|        T87414|   C1010035|          1/8/16|        11:19:17|                50|
|       T560676| C1010035_2|         27/8/16|        18:50:11|               700|
|       T610204|   C1010036|         26/8/16|        09:52:03|               208|
|       T957663|   C1010041|         10/9/16|        16:25:33|             14500|
+--------------+-----------+----------------+----------------+------------------+
only showing top 5 rows



In [52]:
trans_copy = trans_copy.withColumn('transaction_amount',trans_copy['transaction_amount'].cast(DecimalType(10, 2)))
trans_copy.show(5)

+--------------+-----------+----------------+----------------+------------------+
|transaction_id|customer_id|transaction_date|transaction_time|transaction_amount|
+--------------+-----------+----------------+----------------+------------------+
|       T642232|   C1010028|         29/8/16|        09:52:12|            557.00|
|        T87414|   C1010035|          1/8/16|        11:19:17|             50.00|
|       T560676| C1010035_2|         27/8/16|        18:50:11|            700.00|
|       T610204|   C1010036|         26/8/16|        09:52:03|            208.00|
|       T957663|   C1010041|         10/9/16|        16:25:33|          14500.00|
+--------------+-----------+----------------+----------------+------------------+
only showing top 5 rows



In [53]:
trans_copy.filter(year('transaction_date') >= 2025).show()

+--------------+-----------+----------------+----------------+------------------+
|transaction_id|customer_id|transaction_date|transaction_time|transaction_amount|
+--------------+-----------+----------------+----------------+------------------+
+--------------+-----------+----------------+----------------+------------------+



There's no transaction over 2025

In [54]:
from pyspark.sql.functions import to_date,year


# cast type
trans_copy = trans_copy.withColumn(
    "transaction_date",
    to_date(trans_copy["transaction_date"], "d/M/yy")
)

# filter transaction_date > 2025 
trans_copy = trans_copy.filter(year('transaction_date') <= 2025)
trans_copy.show(5)

+--------------+-----------+----------------+----------------+------------------+
|transaction_id|customer_id|transaction_date|transaction_time|transaction_amount|
+--------------+-----------+----------------+----------------+------------------+
|       T642232|   C1010028|      2016-08-29|        09:52:12|            557.00|
|        T87414|   C1010035|      2016-08-01|        11:19:17|             50.00|
|       T560676| C1010035_2|      2016-08-27|        18:50:11|            700.00|
|       T610204|   C1010036|      2016-08-26|        09:52:03|            208.00|
|       T957663|   C1010041|      2016-09-10|        16:25:33|          14500.00|
+--------------+-----------+----------------+----------------+------------------+
only showing top 5 rows



In [55]:
from transform.transform_transaction import transform_transaction

bank_df = transform_transaction(bank_df)
trans_df = bank_df.select(['transaction_id','customer_id','transaction_date','transaction_time','transaction_amount'])
trans_df.show(5)

+--------------+-----------+----------------+----------------+------------------+
|transaction_id|customer_id|transaction_date|transaction_time|transaction_amount|
+--------------+-----------+----------------+----------------+------------------+
|       T642232|   C1010028|      2016-08-29|        09:52:12|            557.00|
|        T87414|   C1010035|      2016-08-01|        11:19:17|             50.00|
|       T560676| C1010035_2|      2016-08-27|        18:50:11|            700.00|
|       T610204|   C1010036|      2016-08-26|        09:52:03|            208.00|
|       T957663|   C1010041|      2016-09-10|        16:25:33|          14500.00|
+--------------+-----------+----------------+----------------+------------------+
only showing top 5 rows



## Load

In [56]:
from load.load_data import load_data

load_data(education_df,'education_status')

In [57]:
load_data(marital_df,'marital_status')

In [58]:
load_data(marketing_df,'marketing_campaign_deposit')

In [59]:
load_data(cust_df,'customers')

In [60]:
load_data(trans_df,'transactions')