In [None]:
! pip install kagglehub[pandas-datasets]
! pip install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, explode, from_json, from_utc_timestamp, regexp_replace, split, trim, concat, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DateType, DoubleType

# Single SparkSession for the whole notebook; getOrCreate() returns the
# already-running session when this cell is re-executed.
spark = SparkSession.builder.appName("CCsample").getOrCreate()

# Location of the raw transaction sample (relative to the notebook's working dir).
CC_SAMPLE_PATH = "cc_sample/cc_sample_transaction.json"

# Spark's JSON reader always infers the schema from the data (`samplingRatio`
# controls how many records it samples), so no schema-inference option is needed;
# `inferSchema` is a CSV-reader option and has no effect on JSON.
# NOTE(review): by default the reader expects JSON Lines (one object per line);
# if the file is a single multi-line JSON document, add .option("multiLine", True).
df_raw = spark.read.json(CC_SAMPLE_PATH)

df_raw.printSchema()

In [None]:
# Schema of the JSON string stored in the `personal_detail` column.
# `address` is itself a JSON string, so it is declared as StringType here and
# parsed separately with `address_schema` below.
personal_detail_schema = StructType([
    StructField("person_name", StringType()),
    StructField("gender", StringType()),
    StructField("address", StringType()),
    StructField("lat", StringType()),
    StructField("long", StringType()),
    StructField("city_pop", IntegerType()),
    StructField("job", StringType()),
    StructField("dob", DateType())
])

# Schema of the JSON string found at `personal_detail.address`.
address_schema = StructType([
    StructField("street", StringType()),
    StructField("city", StringType()),
    StructField("state", StringType()),
    StructField("zip", StringType())
])

# Parse `personal_detail` from a JSON STRING into a STRUCT (same column name).
df_flat_pd = df_raw.withColumn("personal_detail", from_json(col("personal_detail"), personal_detail_schema))

# Parse the nested address string into a new top-level `address` STRUCT column.
df_flat_address = df_flat_pd.withColumn("address", from_json(col("personal_detail.address"), address_schema))

# Derive `first` / `last` from `person_name`: every non-letter becomes a space,
# whitespace runs collapse to one space, then the first two tokens are taken.
# NOTE(review): a name with 3+ tokens keeps only its 2nd token as `last`, and a
# single-token name has no index 1 (null, or an error under ANSI mode) — confirm
# this is acceptable for the data.
name_tokens = split("clean_name", " ")
df_clean_name = (
    df_flat_address
    .withColumn("clean_name", regexp_replace("personal_detail.person_name", "[^a-zA-Z]", " "))
    .withColumn("clean_name", trim(regexp_replace("clean_name", r"\s+", " ")))
    .withColumn("first", name_tokens[0])
    .withColumn("last", name_tokens[1])
)


def epoch_to_timestamp(col_name, tz="Asia/Singapore"):
    """Build a Column converting an epoch value in `col_name` to a timestamp in `tz`.

    The unit is chosen from the number of characters in the value:

    * 10 -> seconds
    * 11 / 12 -> milliseconds with trailing digits missing: the value is
      right-padded with "00" / "0" to 13 digits, then divided by 1000
      (NOTE(review): assumes trailing zeros were lost upstream — confirm)
    * 13 -> milliseconds
    * 16 -> microseconds

    Any other length yields null.

    `from_unixtime` takes whole seconds, so the sub-second part of ms/µs values
    is dropped. `from_utc_timestamp` shifts the result from UTC into `tz`.
    NOTE(review): the shown values are `tz` wall-clock only when
    spark.sql.session.timeZone is UTC.

    Args:
        col_name: name of the column holding the epoch value. Presumably a string
            or integral type: a double would stringify as e.g. "1.6E12" and break
            the length check.
        tz: time-zone id passed to `from_utc_timestamp`.

    Returns:
        An unevaluated pyspark Column expression.
    """
    epoch = F.col(col_name)
    n_chars = F.length(epoch)

    def _to_tz(epoch_seconds):
        # Epoch seconds -> 'yyyy-MM-dd HH:mm:ss' -> shifted from UTC into `tz`.
        return from_utc_timestamp(F.from_unixtime(epoch_seconds), tz)

    return (
        F.when(n_chars == 10, _to_tz(epoch))
        .when(n_chars == 11, _to_tz(concat(epoch, lit("00")) / 1000))
        .when(n_chars == 12, _to_tz(concat(epoch, lit("0")) / 1000))
        .when(n_chars == 13, _to_tz(epoch / 1000))
        .when(n_chars == 16, _to_tz(epoch / 1_000_000))
        .otherwise(None)  # unrecognised length -> null
    )


# Each cleaned column is derived from its OWN source column. The two conversions
# previously lived in duplicated when-chains, and the eff-time chain read
# `merch_last_update_time` for its 11/12/13-length branches.
df_clean_epoch = (
    df_clean_name
    .withColumn("clean_eff_time", epoch_to_timestamp("merch_eff_time"))
    .withColumn("clean_merch_last_update_time", epoch_to_timestamp("merch_last_update_time"))
)


df_clean_epoch.show()


In [None]:
# Flatten the cleaned frame into one wide table. Nested struct fields are promoted
# to top-level columns named after their leaf field, and the cleaned merchant
# timestamps replace the raw epoch columns under the original column names.
address_leaf_fields = ["street", "city", "state", "zip"]
personal_leaf_fields = ["lat", "long", "city_pop", "job", "dob"]
merchant_tail_fields = ["trans_num", "merch_lat", "merch_long", "is_fraud", "merch_zipcode"]

df_flat = df_clean_epoch.select(
    col("Unnamed: 0"),
    from_utc_timestamp(col("trans_date_trans_time"), "Asia/Singapore").alias("trans_date_trans_time"),
    *[col(name) for name in ("cc_num", "merchant", "category")],
    col("amt").cast(DoubleType()),
    *[col(name) for name in ("first", "last")],
    col("personal_detail.gender").alias("gender"),
    *[col(f"address.{leaf}").alias(leaf) for leaf in address_leaf_fields],
    *[col(f"personal_detail.{leaf}").alias(leaf) for leaf in personal_leaf_fields],
    *[col(name) for name in merchant_tail_fields],
    col("clean_merch_last_update_time").alias("merch_last_update_time"),
    col("clean_eff_time").alias("merch_eff_time"),
    col("cc_bic"),
)

# Display every row Spark shows by default, without truncating cell contents.
df_flat.show(truncate=False)