In [145]:
import os

from pyspark.sql import SparkSession

# Both variables must already be exported in the kernel's environment (shell
# config file or venv activation script); a variable that is missing prints as None.
for env_var in ("JAVA_HOME", "SPARK_HOME"):
    print(os.environ.get(env_var))


# Reuse the active Spark session if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home
/Users/lydialim/pyspark-eda-demo/.venv/lib/python3.10/site-packages/pyspark


In [146]:
# import libs
from pyspark.sql.functions import (
    col,
    concat,
    date_format,
    element_at,
    expr,
    from_unixtime,
    from_utc_timestamp,
    length,
    lit,
    regexp_replace,
    split,
    timestamp_micros,
    timestamp_millis,
    to_timestamp,
    to_utc_timestamp,
    when,
    year,
)

from pyspark.sql import DataFrame

In [147]:
# Additional Spark session configuration.
# The SQL session time zone is what Spark uses when it parses and renders
# timestamps, so it is pinned explicitly here rather than inherited from the host.
spark.conf.set("spark.sql.session.timeZone", "Asia/Singapore")  # UTC+8

In [148]:
# Load the sample transactions; the path is relative to the notebook's working directory.
df = spark.read.parquet("data_fixtures/cc_sample_transaction.parquet")
# Print the schema first: the column types decide which columns need cleaning below.
df.printSchema()
# Preview a handful of rows only.
df.show(5)

root
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- person_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- merch_lat: string (nullable = true)
 |-- merch_long: string (nullable = true)
 |-- is_fraud: string (nullable = true)
 |-- merch_zipcode: string (nullable = true)
 |-- merch_last_update_time: string (nullable = true)
 |-- merch_eff_time: string (nullable = true)
 |-- cc_bic: string (nullable = true)

+---------------------+-----------

In [149]:
# define some functions that will help us clean the data
def to_utc8_hm(c):
    """
    Normalize a mixed-format timestamp column to a UTC+8 display string.

    Accepts Column c whose values may be:
      • purely numeric, ≥15 digits → µs since epoch
      • purely numeric, 12–14 digits → ms since epoch
      • anything else → literal 'yyyy-MM-dd HH:mm:ss', taken as UTC wall-clock time
    Returns a Column of strings
      'yyyy-MM-dd HH:mm:ss.SSSSSS +08:00'

    Epoch values keep their sub-second digits. The result does not depend on
    the Spark session time zone: every branch is first expressed as UTC
    wall-clock time and only then shifted by +8 hours once.
    (The previous version used from_unixtime, which renders in the session
    time zone and truncates to whole seconds, so with a UTC+8 session the
    epoch branches were shifted twice.)

    Values that match neither epoch rule are handed to to_timestamp unchanged;
    what happens to unparseable text follows Spark's to_timestamp behaviour.
    """
    c_str = c.cast("string")
    is_digits = c_str.rlike("^[0-9]+$")
    n_digits = length(c_str)
    is_micros = is_digits & (n_digits >= 15)
    is_millis = is_digits & n_digits.between(12, 14)

    # 1a) epoch values → exact instants (integer arithmetic, no float rounding)
    epoch_value = c_str.cast("bigint")
    epoch_instant = when(is_micros, timestamp_micros(epoch_value)).when(
        is_millis, timestamp_millis(epoch_value)
    )

    # 1b) re-express the instant so that its wall-clock reading in the session
    #     time zone equals its UTC wall-clock reading; this puts epoch values
    #     on the same footing as the literal branch, whose text is parsed in
    #     the session time zone but represents UTC wall-clock time
    epoch_as_utc = to_utc_timestamp(epoch_instant, expr("current_timezone()"))

    # 1c) pick the branch; null / non-matching input falls through to the literal parse
    ts_utc = when(is_micros | is_millis, epoch_as_utc).otherwise(
        to_timestamp(c_str, "yyyy-MM-dd HH:mm:ss")
    )

    # 2) shift to UTC+8 (applied exactly once for every branch)
    ts_utc8 = from_utc_timestamp(ts_utc, "GMT+8")

    # 3) format with 6-digit fraction and append "+08:00"
    return concat(date_format(ts_utc8, "yyyy-MM-dd HH:mm:ss.SSSSSS"), lit(" +08:00"))


def clean_and_split_names(
    df: DataFrame,
    input_col: str = "person_name",
    first_col: str = "first",
    last_col: str = "last",
) -> DataFrame:
    """
    Derive first- and last-name columns from a raw name column.

    Every character of `input_col` that is not an ASCII letter is replaced by
    a space, the result is split on single spaces, empty tokens are discarded,
    and the first and second remaining tokens are written to `first_col` and
    `last_col`. Tokens after the second are ignored. `input_col` itself is
    left in place.

    The temporary columns `_cleaned` and `_tokens` are created along the way
    and dropped before returning.

    Assumptions:
    - Names are separated by a space
    - Names are always two words
    - Names are always in the format "First Last"
    - Names are always alphabetic
    - Names are never empty
    """
    # Anything outside A–Z/a–z becomes a separator.
    letters_only = regexp_replace(col(input_col), "[^A-Za-z]", " ")

    staged = df.withColumn("_cleaned", letters_only)
    staged = staged.withColumn("_tokens", split(col("_cleaned"), " "))
    # Consecutive separators leave empty strings behind; remove them.
    staged = staged.withColumn("_tokens", expr("filter(_tokens, x -> x <> '')"))

    # element_at is 1-based: position 1 → first name, position 2 → last name.
    for target_col, position in ((first_col, 1), (last_col, 2)):
        staged = staged.withColumn(target_col, element_at(col("_tokens"), position))

    return staged.drop("_cleaned", "_tokens")

First, we deal with the timestamp columns by standardizing the format.

In [150]:
# Columns whose raw values are standardized by to_utc8_hm.
timestamp_cols = ["merch_eff_time", "merch_last_update_time", "trans_date_trans_time"]

# Overwrites the existing timestamp columns in place (same column names, new values).
# NOTE: this cell is not idempotent. Re-running it feeds the already formatted
# strings back into to_utc8_hm, which does not list that output format among
# its accepted inputs. Reload `df` before re-running.
for ts in timestamp_cols:
    df = df.withColumn(ts, to_utc8_hm(col(ts)))

# truncate=False so the full formatted timestamp strings are visible.
df.show(5, truncate=False)

+---------------------------------+----------------+----------------------------------+-------------+------+--------------------+------+----------------------------+--------------+-----+-----+-------+---------+--------+---------------------------------+----------+--------------------------------+------------------+-----------+--------+-------------+---------------------------------+---------------------------------+-----------+
|trans_date_trans_time            |cc_num          |merchant                          |category     |amt   |person_name         |gender|street                      |city          |state|zip  |lat    |long     |city_pop|job                              |dob       |trans_num                       |merch_lat         |merch_long |is_fraud|merch_zipcode|merch_last_update_time           |merch_eff_time                   |cc_bic     |
+---------------------------------+----------------+----------------------------------+-------------+------+--------------------+------+

Then, we split the `person_name` column into first and last name columns.

In [151]:
# Add the `first` and `last` columns derived from `person_name`.
df = clean_and_split_names(df, "person_name", "first", "last")
# The raw combined name is no longer needed once it has been split.
# NOTE: after this line `person_name` is gone, so re-running this cell
# without reloading `df` will fail to find the input column.
df = df.drop("person_name")
df.show(5, truncate=False)

+---------------------------------+----------------+----------------------------------+-------------+------+------+----------------------------+--------------+-----+-----+-------+---------+--------+---------------------------------+----------+--------------------------------+------------------+-----------+--------+-------------+---------------------------------+---------------------------------+-----------+---------+-------+
|trans_date_trans_time            |cc_num          |merchant                          |category     |amt   |gender|street                      |city          |state|zip  |lat    |long     |city_pop|job                              |dob       |trans_num                       |merch_lat         |merch_long |is_fraud|merch_zipcode|merch_last_update_time           |merch_eff_time                   |cc_bic     |first    |last   |
+---------------------------------+----------------+----------------------------------+-------------+------+------+---------------------------

# PII Sanitization
Sanitizing PII is a tricky topic and depends on the downstream use cases and regulations surrounding the data.

# Considerations for data storage
- For the purposes of this technical assessment, the cardholder information and the merchant details are stored alongside the transaction data. Ideally, this data should be normalized: cardholder and merchant attributes should be stored in separate dimension tables and referenced from the transaction table by key.
- When storing the PII such as first and last name, we should store the hashed version of the name.
- The salt that is used for hashing should be stored separately from the hashed data and exposed only to the services or roles that need to compute or verify the hashes.


# Considerations for masking
If we choose to entirely mask or hash the credit card number, we would not be able to use it for any analysis, or for joining to BIN (Bank Identification Number) lists to identify the issuing bank / card type. For example, an e-commerce site may want to use the first 6 to 8 digits of the credit card number to identify the issuing bank / card type and offer rewards to the cardholder on eligible transactions.

Ref: https://www.chargeflow.io/blog/bank-identification-numbers-bin-the-backbone-of-payment-processing

# Considerations for hashing

Options for sanitizing names:
- Partial anonymization: J*** D**. Easy to implement but weak: identity thieves can still infer an identity by combining it with a few more personal details.
- Full masking: XXXX XXX, strong, but irreversible and cannot be used for any analysis.
- Hashing (unsalted): Pros: Same input ⇒ same hash ⇒ you can still group or join records by name. Cons: Can be reverse-engineered / vulnerable to rainbow table (pre-computed key-value pairs) attacks if leaked.
- Hashing (salted): Prevents rainbow table attacks since knowing the hash alone is not enough to reverse-engineer the original input. 


In [152]:
def mask_credit_card(df: DataFrame, input_col: str, output_col: str) -> DataFrame:
    """
    Mask the middle of a card number so that at least one character is always hidden.

    `input_col` is first cast to string in place; the masked value is then
    written to `output_col` (which may be the same column).

    Masking rules, by length of the string value:
      • more than 12 characters → first 8 and last 4 are kept, everything in
        between is replaced by '*'
        ('1234567890123456' → '12345678****3456')
      • 9 to 12 characters → first 8 are kept, the 9th is replaced by '*',
        the remainder is kept ('123456789012' → '12345678*012')
      • 8 characters or fewer → every character is replaced by '*', because
        keeping the first 8 would expose the whole value
      • null stays null

    The output always has the same length as the input. (The previous
    short-number branch inserted a '*' without removing any digit, so nothing
    was actually masked.)

    `input_col` is interpolated into a SQL expression, so it must be a plain
    column identifier.
    """
    df = df.withColumn(input_col, col(input_col).cast("string"))
    masked = expr(
        f"""CASE
             WHEN length({input_col}) > 12 THEN
               concat(
                 substr({input_col}, 1, 8),
                 repeat('*', length({input_col}) - 12),
                 substr({input_col}, -4, 4)
               )
             WHEN length({input_col}) > 8 THEN
               concat(
                 substr({input_col}, 1, 8),
                 '*',
                 substr({input_col}, 10)
               )
             ELSE
               repeat('*', length({input_col}))
           END"""
    )
    return df.withColumn(output_col, masked)

In [153]:
# Truncate dob to year: the `dob` column keeps its name but now holds only the
# year of birth, which removes the exact birth date from the data set.
# NOTE: this overwrites `dob` in place, so the cell should be run once per load of `df`.
df = df.withColumn("dob", year(col("dob")))
df.show(50, truncate=False)

+---------------------------------+-------------------+-------------------------------------------+-------------+------+------+------------------------------+------------------------+-----+-----+-------+------------------+--------+---------------------------------------------+----+--------------------------------+------------------+------------------+--------+-------------+---------------------------------+---------------------------------+-----------+-----------+----------+
|trans_date_trans_time            |cc_num             |merchant                                   |category     |amt   |gender|street                        |city                    |state|zip  |lat    |long              |city_pop|job                                          |dob |trans_num                       |merch_lat         |merch_long        |is_fraud|merch_zipcode|merch_last_update_time           |merch_eff_time                   |cc_bic     |first      |last      |
+---------------------------------+-----

In [154]:
# Mask the card number in place: input and output column are both `cc_num`,
# so the unmasked value is no longer available in `df` afterwards.
df = mask_credit_card(df, "cc_num", "cc_num")
df.show(5, truncate=False)

+---------------------------------+----------------+----------------------------------+-------------+------+------+----------------------------+--------------+-----+-----+-------+---------+--------+---------------------------------+----+--------------------------------+------------------+-----------+--------+-------------+---------------------------------+---------------------------------+-----------+---------+-------+
|trans_date_trans_time            |cc_num          |merchant                          |category     |amt   |gender|street                      |city          |state|zip  |lat    |long     |city_pop|job                              |dob |trans_num                       |merch_lat         |merch_long |is_fraud|merch_zipcode|merch_last_update_time           |merch_eff_time                   |cc_bic     |first    |last   |
+---------------------------------+----------------+----------------------------------+-------------+------+------+----------------------------+----------

In [155]:
# Convert the spark df into a pandas df for visualization
# df_pandas = df.toPandas()

In [156]:
# Check whether the BIC column is usable by listing its distinct values.
# The printed list mixes real-looking codes with placeholders such as 'NA'
# and 'Null' (and what looks like a blank entry), so the column needs cleaning first.
unique_values = df.select("cc_bic").distinct().collect()
for bic_row in unique_values:
    print(bic_row["cc_bic"])

ADMDUS41
ACEEUS31
DEUTUS33TRF
NA
CITIUS33CHI
AIABUS31

Null
APBCUS61


25/05/25 22:53:16 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 916001 ms exceeds timeout 120000 ms
25/05/25 22:53:16 WARN SparkContext: Killing executors is not supported by current scheduler.
25/05/25 22:53:18 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$