In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, from_json, sum, when, lit, split, udf, trim, regexp_replace  # common function
from pyspark.sql.functions import date_format,from_unixtime, to_timestamp  # date function

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType


### Set up SPARK SESSION

In [None]:
conf = (
    pyspark.SparkConf().setAppName('cc_credit_card_transformation')
    .set("spark.executor.memory", "16g")
)

In [None]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json('raw_cc_credit_card/raw_cc_credit_card')

### Exploratory Data Analysis

This section performs EDA on the source dataset to understand distributions, detect anomalies, and identify data quality issues.

In [4]:
df.printSchema()

root
 |-- Unnamed: 0: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- category: string (nullable = true)
 |-- cc_bic: string (nullable = true)
 |-- cc_num: string (nullable = true)
 |-- is_fraud: string (nullable = true)
 |-- merch_eff_time: string (nullable = true)
 |-- merch_last_update_time: string (nullable = true)
 |-- merch_lat: string (nullable = true)
 |-- merch_long: string (nullable = true)
 |-- merch_zipcode: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- personal_detail: string (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- trans_num: string (nullable = true)



In [5]:
# check top 5 record of the dataframe
# Dataframe contain nested json 
df.show(5)
print(f'ROW COUNT: {df.count()}')

+----------+------+-------------+-----------+----------------+--------+----------------+----------------------+------------------+-----------+-------------+--------------------+--------------------+---------------------+--------------------+
|Unnamed: 0|   amt|     category|     cc_bic|          cc_num|is_fraud|  merch_eff_time|merch_last_update_time|         merch_lat| merch_long|merch_zipcode|            merchant|     personal_detail|trans_date_trans_time|           trans_num|
+----------+------+-------------+-----------+----------------+--------+----------------+----------------------+------------------+-----------+-------------+--------------------+--------------------+---------------------+--------------------+
|         0|  4.97|     misc_net|CITIUS33CHI|2703186189652095|       0|1325376018798532|         1325376018666|         36.011293| -82.048315|        28705|fraud_Rippin, Kub...|{"person_name":"J...|  2019-01-01 00:00:18|0b242abb623afc578...|
|         1|107.23|  grocery_pos

In [6]:
# select top 10 of nest json
df.select('personal_detail').show(10,truncate=False) 

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|personal_detail                                                                                                                                                                                                                                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"person_name":"Jennifer,Banks,eeeee","gender":"F","address":"{\"street\":\"561 Perry Cove\",\"city\":\"Moravian Falls\",\"state\":\"NC\",\"zip\":\"28654\"}","

In [7]:
# look for null value for each column
# seems like there is null value for merch zipcode

df.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
]).show()

+----------+---+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------+---------------------+---------+
|Unnamed: 0|amt|category|cc_bic|cc_num|is_fraud|merch_eff_time|merch_last_update_time|merch_lat|merch_long|merch_zipcode|merchant|personal_detail|trans_date_trans_time|trans_num|
+----------+---+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------+---------------------+---------+
|         0|  0|       0|     0|     0|       0|             0|                     0|        0|         0|       195973|       0|              0|                    0|        0|
+----------+---+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------+---------------------+---------+



In [8]:
# aware line 3 containing null string value
# change the condition to get all value where null 
# use lower case function to ignore the case

df.select([
    sum(when((lower(col(c)) == 'null' )| (lower(col(c)) == 'na'), 1).otherwise(0)).alias(c)
    for c in df.columns
]).show()

+----------+---+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------+---------------------+---------+
|Unnamed: 0|amt|category|cc_bic|cc_num|is_fraud|merch_eff_time|merch_last_update_time|merch_lat|merch_long|merch_zipcode|merchant|personal_detail|trans_date_trans_time|trans_num|
+----------+---+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------+---------------------+---------+
|         0|  0|       0|288599|     0|       0|             0|                     0|        0|         0|            0|       0|              0|                    0|        0|
+----------+---+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------+---------------------+---------+



In [9]:
df.where((lower(col('cc_bic')) == 'null' )| (lower(col('cc_bic')) == 'na')).show()

+----------+------+-------------+------+-------------------+--------+----------------+----------------------+------------------+------------------+-------------+--------------------+--------------------+---------------------+--------------------+
|Unnamed: 0|   amt|     category|cc_bic|             cc_num|is_fraud|  merch_eff_time|merch_last_update_time|         merch_lat|        merch_long|merch_zipcode|            merchant|     personal_detail|trans_date_trans_time|           trans_num|
+----------+------+-------------+------+-------------------+--------+----------------+----------------------+------------------+------------------+-------------+--------------------+--------------------+---------------------+--------------------+
|         2|220.11|entertainment|  Null|     38859492057661|       0|1325376051506840|         1325376051286|         43.150704|       -112.154481|        83236|fraud_Lind-Buckridge|{"person_name":"E...|  2019-01-01 00:00:51|a1a22d70485983eac...|
|         7|

### Data Transformation and Cleaning

This section focuses on transforming raw data into a structured format, handling missing values, correcting data types, and cleaning inconsistencies to ensure data quality and consistency.


In [10]:
transformed_df = df.select('*')

In [11]:
# Date transformation
# change all to same human readaber format
transformed_df = transformed_df.withColumn(
    "trans_date_trans_time",
    date_format(to_timestamp('trans_date_trans_time'), "yyyy-MM-dd HH:mm:ss.SSS")
)

transformed_df = transformed_df.withColumn(
    "merch_last_update_time",
    date_format(from_unixtime(col("merch_last_update_time") / 1000000), "yyyy-MM-dd HH:mm:ss.SSS")
)

transformed_df = transformed_df.withColumn(
    "merch_eff_time",
    date_format(from_unixtime(col("merch_eff_time") / 1000000), "yyyy-MM-dd HH:mm:ss.SSS")
)

In [12]:
# using from_json to explode the columns 
#   -> explode personal_detail 
#   -> explode personal_detail.address
# from_json required schema to explode a json string
# documentation : https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html


# create a function to take in schema + column to explode and return a dataframe
# expend the column and assign back to column nume, then explode into physical column and drop the original column 
# column state change: JsonString -> Json -> physical column 
def explode_json(df, col_name, schema):
    return df.withColumn(col_name, from_json(col(col_name), schema)).select(*df.columns,f'{col_name}.*').drop(col_name)

In [13]:
# define schema for personal details

personal_detail_schema = StructType([
    StructField("person_name", StringType()),
    StructField("gender", StringType()),
    StructField("address", StringType()),
    StructField("lat", StringType()),
    StructField("long", StringType()),
    StructField("city_pop", StringType()),
    StructField("job", StringType()),
    StructField("dob", StringType()),
])

address_schema =  StructType([
                    StructField("street", StringType()),
                    StructField("city", StringType()),
                    StructField("state", StringType()),
                    StructField("zip", StringType())
                ])


In [14]:
transformed_df = explode_json(transformed_df,'personal_detail',personal_detail_schema)

In [15]:
transformed_df.show(5)

+----------+------+-------------+-----------+----------------+--------+--------------------+----------------------+------------------+-----------+-------------+--------------------+---------------------+--------------------+--------------------+------+--------------------+-------+---------+--------+--------------------+----------+
|Unnamed: 0|   amt|     category|     cc_bic|          cc_num|is_fraud|      merch_eff_time|merch_last_update_time|         merch_lat| merch_long|merch_zipcode|            merchant|trans_date_trans_time|           trans_num|         person_name|gender|             address|    lat|     long|city_pop|                 job|       dob|
+----------+------+-------------+-----------+----------------+--------+--------------------+----------------------+------------------+-----------+-------------+--------------------+---------------------+--------------------+--------------------+------+--------------------+-------+---------+--------+--------------------+----------+
|

In [16]:
transformed_df = explode_json(transformed_df,'address',address_schema)

In [17]:
transformed_df.show(5, truncate=False)

+----------+------+-------------+-----------+----------------+--------+-----------------------+-----------------------+------------------+-----------+-------------+----------------------------------+-----------------------+--------------------------------+--------------------+------+-------+---------+--------+---------------------------------+----------+----------------------------+--------------+-----+-----+
|Unnamed: 0|amt   |category     |cc_bic     |cc_num          |is_fraud|merch_eff_time         |merch_last_update_time |merch_lat         |merch_long |merch_zipcode|merchant                          |trans_date_trans_time  |trans_num                       |person_name         |gender|lat    |long     |city_pop|job                              |dob       |street                      |city          |state|zip  |
+----------+------+-------------+-----------+----------------+--------+-----------------------+-----------------------+------------------+-----------+-------------+----------

In [18]:
#  splitting person_name into first name and last name using regular expression
#    -> data contain multiple delimiters
#    -> use \W+ to match any non-word character
#    -> \W+ : https://www.w3schools.com/jsref/jsref_obj_regexp.asp
transformed_df = transformed_df.withColumn('first',split(transformed_df['person_name'], '\W+').getItem(0))
transformed_df = transformed_df.withColumn('last',split(transformed_df['person_name'], '\W+').getItem(1))

In [19]:
# check data to ensure data extract correctly
transformed_df.select('person_name','first','last').show(20)

+--------------------+-----------+---------+
|         person_name|      first|     last|
+--------------------+-----------+---------+
|Jennifer,Banks,eeeee|   Jennifer|    Banks|
|Stephanie,Gill,eeeee|  Stephanie|     Gill|
|      Edward@Sanchez|     Edward|  Sanchez|
|     Jeremy/White, !|     Jeremy|    White|
|        Tyler@Garcia|      Tyler|   Garcia|
|Jennifer,Conner,e...|   Jennifer|   Conner|
|Kelsey, , Richard...|     Kelsey| Richards|
|    Steven, Williams|     Steven| Williams|
|Heather, , Chase ...|    Heather|    Chase|
|     Melissa@Aguilar|    Melissa|  Aguilar|
|     Eddie|Mendez!!!|      Eddie|   Mendez|
|   Theresa@Blackwell|    Theresa|Blackwell|
|   Charles|Robles!!!|    Charles|   Robles|
|           Jack@Hill|       Jack|     Hill|
|Christopher@Casta...|Christopher|Castaneda|
|       Ronald@Carson|     Ronald|   Carson|
|        Lisa, Mendez|       Lisa|   Mendez|
| Nathan,Thomas,eeeee|     Nathan|   Thomas|
|         Justin, Gay|     Justin|      Gay|
|Kenneth, 

In [20]:
# drop person_name column
transformed_df = transformed_df.drop('person_name')
transformed_df.show(5)

+----------+------+-------------+-----------+----------------+--------+--------------------+----------------------+------------------+-----------+-------------+--------------------+---------------------+--------------------+------+-------+---------+--------+--------------------+----------+--------------------+--------------+-----+-----+---------+-------+
|Unnamed: 0|   amt|     category|     cc_bic|          cc_num|is_fraud|      merch_eff_time|merch_last_update_time|         merch_lat| merch_long|merch_zipcode|            merchant|trans_date_trans_time|           trans_num|gender|    lat|     long|city_pop|                 job|       dob|              street|          city|state|  zip|    first|   last|
+----------+------+-------------+-----------+----------------+--------+--------------------+----------------------+------------------+-----------+-------------+--------------------+---------------------+--------------------+------+-------+---------+--------+--------------------+-------

#### TRIM text column with space

In [21]:
trim_column = ['merchant','job','street','city','state']
for column in trim_column:
    df = transformed_df.withColumn(column, trim(column))

In [22]:
# check data
# transformed_df.withColumn('merchant', trim('merchant')).show(5, truncate=False)

#### Unable to find any relation for cc_bic & merch_zipcode to map
##### However decide not to drop the NA/Empty/Null row due to each transaction may impact on the overall analysis</br>Fill all unable to define as 'Undefined'

In [23]:
transformed_df.select('cc_bic').distinct().show()

+-----------+
|     cc_bic|
+-----------+
|   ADMDUS41|
|   ACEEUS31|
|DEUTUS33TRF|
|         NA|
|CITIUS33CHI|
|   AIABUS31|
|           |
|       Null|
|   APBCUS61|
+-----------+



In [24]:
transformed_df.show(5, truncate=False)


+----------+------+-------------+-----------+----------------+--------+-----------------------+-----------------------+------------------+-----------+-------------+----------------------------------+-----------------------+--------------------------------+------+-------+---------+--------+---------------------------------+----------+----------------------------+--------------+-----+-----+---------+-------+
|Unnamed: 0|amt   |category     |cc_bic     |cc_num          |is_fraud|merch_eff_time         |merch_last_update_time |merch_lat         |merch_long |merch_zipcode|merchant                          |trans_date_trans_time  |trans_num                       |gender|lat    |long     |city_pop|job                              |dob       |street                      |city          |state|zip  |first    |last   |
+----------+------+-------------+-----------+----------------+--------+-----------------------+-----------------------+------------------+-----------+-------------+----------------

In [25]:
transformed_df = transformed_df.withColumn("cc_bic",when(col("cc_bic").isin("Null", "NA", ""), lit(None)).otherwise(col("cc_bic")))

In [26]:
# check null count
transformed_df.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in transformed_df.columns
]).show()

+----------+---+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------------+---------+------+---+----+--------+---+---+------+----+-----+---+-----+----+
|Unnamed: 0|amt|category|cc_bic|cc_num|is_fraud|merch_eff_time|merch_last_update_time|merch_lat|merch_long|merch_zipcode|merchant|trans_date_trans_time|trans_num|gender|lat|long|city_pop|job|dob|street|city|state|zip|first|last|
+----------+---+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------------+---------+------+---+----+--------+---+---+------+----+-----+---+-----+----+
|         0|  0|       0|432980|     0|       0|             0|                     0|        0|         0|       195973|       0|                    0|        0|     0|  0|   0|       0|  0|  0|     0|   0|    0|  0|    0|   0|
+----------+---+--------+------+------+--------+--------------+---------------------

In [27]:
transformed_df = transformed_df.fillna('undefined')

In [28]:
transformed_df.show(5)

+----------+------+-------------+-----------+----------------+--------+--------------------+----------------------+------------------+-----------+-------------+--------------------+---------------------+--------------------+------+-------+---------+--------+--------------------+----------+--------------------+--------------+-----+-----+---------+-------+
|Unnamed: 0|   amt|     category|     cc_bic|          cc_num|is_fraud|      merch_eff_time|merch_last_update_time|         merch_lat| merch_long|merch_zipcode|            merchant|trans_date_trans_time|           trans_num|gender|    lat|     long|city_pop|                 job|       dob|              street|          city|state|  zip|    first|   last|
+----------+------+-------------+-----------+----------------+--------+--------------------+----------------------+------------------+-----------+-------------+--------------------+---------------------+--------------------+------+-------+---------+--------+--------------------+-------

In [29]:
# change the data type for further analysis

transformed_df = transformed_df.withColumn("amt", col("amt").cast(DoubleType())) \
                                .withColumn("lat", col("lat").cast(DoubleType())) \
                                .withColumn("long", col("long").cast(DoubleType())) \
                                .withColumn("city_pop", col("city_pop").cast(IntegerType())) \
                                .withColumn("merch_lat", col("merch_lat").cast(DoubleType())) \
                                .withColumn("merch_long", col("merch_long").cast(DoubleType())) \
                                .withColumn("is_fraud", col("is_fraud").cast(IntegerType())) \
                                .withColumn("zip", col("zip").cast(StringType())) \
                                .withColumn("merch_zipcode", col("merch_zipcode").cast(StringType())) 

In [30]:
# select column to load as output
transformed_df = transformed_df.select(
                        col("Unnamed: 0"), 
                        col("trans_date_trans_time"),
                        col("cc_num"), 
                        col("merchant"),
                        col("category"),
                        col("amt"),
                        col("first"),
                        col("last"),
                        col("gender"),
                        col("street"),
                        col("city"),
                        col("state"),
                        col("zip"),
                        col("lat"),
                        col("long"),
                        col("city_pop"),
                        col("job"),
                        col("dob"),
                        col("trans_num"),
                        col("merch_lat"),
                        col("merch_long"),
                        col("is_fraud"),
                        col("merch_zipcode"),
                        col("merch_last_update_time"),
                        col("merch_eff_time"),
                        col("cc_bic")
                )

In [31]:
# check transformed schema
transformed_df.printSchema()

root
 |-- Unnamed: 0: string (nullable = false)
 |-- trans_date_trans_time: string (nullable = false)
 |-- cc_num: string (nullable = false)
 |-- merchant: string (nullable = false)
 |-- category: string (nullable = false)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = false)
 |-- last: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- street: string (nullable = false)
 |-- city: string (nullable = false)
 |-- state: string (nullable = false)
 |-- zip: string (nullable = false)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = false)
 |-- dob: string (nullable = false)
 |-- trans_num: string (nullable = false)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- merch_zipcode: string (nullable = false)
 |-- merch_last_update_time: string (nullable = false)
 |-- merch_eff_time: string

#### confirm the number before transformation and after transformation is same

In [32]:
transformed_df.count()

1296675

In [33]:
df.count()

1296675

#### loaded into currated zone for further transformation/ analysis </br> note: No data partition 

In [35]:
transformed_df.write.partitionBy("cc_bic", "category", "gender").mode('overwrite').parquet('currated_cc_credit_card/cc_credit_card_currated.parquet')

In [36]:
spark.stop()