In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd

In [2]:
# Configuration variables
MINIO_ENDPOINT = 'http://minio:9000'
MINIO_ACCESS_KEY = 'minioadmin'
MINIO_SECRET_KEY = 'minioadmin123'
MYSQL_HOST = 'mysql'
MYSQL_PORT = '3306'
MYSQL_DATABASE = 'finance_dw'
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'root123'

In [3]:
builder = (
    SparkSession.builder.appName("DataWarehouse-ETL")
    # Memory configurations for ETL processing
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Jars for Delta Lake, S3, and MySQL
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.4.1,com.amazonaws:aws-java-sdk-bundle:1.12.262," \
    "io.delta:delta-spark_2.13:4.0.0," \
    "com.mysql:mysql-connector-j:8.0.33")
    # Delta Lake
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # MinIO (S3A) - Source
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # S3A performance configs
    .config("spark.hadoop.fs.s3a.connection.timeout", "60000")
    .config("spark.hadoop.fs.s3a.connection.request.timeout", "60000")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.attempts.maximum", "3")
    .config("spark.hadoop.fs.s3a.retry.limit", "3")
)

spark = builder.getOrCreate()



:: loading settings :: url = jar:file:/home/airflow/.local/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/airflow/.ivy2.5.2/cache
The jars for the packages stored in: /home/airflow/.ivy2.5.2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
io.delta#delta-spark_2.13 added as a dependency
com.mysql#mysql-connector-j added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4c4862d0-ef05-4d8f-b911-94cccd28efcf;1.0
	confs: [default]


	found org.apache.hadoop#hadoop-aws;3.4.1 in central
	found software.amazon.awssdk#bundle;2.24.6 in central
	found org.wildfly.openssl#wildfly-openssl;1.1.3.Final in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in central
	found com.mysql#mysql-connector-j;8.0.33 in central
	found com.google.protobuf#protobuf-java;3.21.9 in central
:: resolution report :: resolve 325ms :: artifacts dl 12ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	com.google.protobuf#protobuf-java;3.21.9 from central in [default]
	com.mysql#mysql-connector-j;8.0.33 from central in [default]
	io.delta#delta-spark_2.13;4.0.0 from central in [default]
	io.delta#delta-storage;4.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.13.1 from central in [default]
	org.apache.hadoop#hadoop-aws;3.4.1 from central in [

25/10/10 04:44:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
dw_mysql_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"
dw_mysql_properties = {
    "user": MYSQL_USER,
    "password": MYSQL_PASSWORD,
    "driver": "com.mysql.cj.jdbc.Driver"
}

# tables_config = {
#         'users': {
#             'path': 's3a://warehouse/dim_user/',
#             'partitions': None
#         },
#         'mcc_codes': {
#             'path': 's3a://warehouse/dim_mcc/',
#             'partitions': None
#         },
#         'cards': {
#             'path': 's3a://warehouse/dim_card/',
#             'partitions': ['card_brand']  # Partition by brand for better queries
#         },
#         'transactions': {
#             'path': 's3a://warehouse/fact_transactions/',
#             'partitions': ['year', 'month']  # Partition by date for performance
#         },
#         'fraud_labels': {
#             'path': 's3a://warehouse/fraud_labels/',
#             'partitions': None
#         }
#     }

In [5]:
users_df = spark.read.format("delta").load("s3a://rootdb/users/")
mcc_codes_df = spark.read.format("delta").load("s3a://rootdb/mcc_codes/")
cards_df = spark.read.format("delta").load("s3a://rootdb/cards/")
transactions_df = spark.read.format("delta").load("s3a://rootdb/transactions/")

print(f"users: {users_df.count()} rows loaded")
print(f"mcc_codes: {mcc_codes_df.count()} rows loaded")
print(f"cards: {cards_df.count()} rows loaded")
print(f"transactions: {transactions_df.count()} rows loaded")

25/10/10 04:44:44 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


25/10/10 04:44:48 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 0:>                                                          (0 + 1) / 1]

[Stage 2:>                                                        (0 + 16) / 50]



                                                                                

users: 2000 rows loaded


mcc_codes: 109 rows loaded


cards: 6146 rows loaded


transactions: 13305915 rows loaded


In [6]:
# POPULATE dim_mcc
dim_mcc_df = mcc_codes_df.select(
    col("mcc"),
    col("merchant_type")
)

# dim_mcc_df.write \
#     .format("jdbc") \
#     .option("url", dw_mysql_url) \
#     .option("dbtable", "dim_mcc") \
#     .options(**dw_mysql_properties) \
#     .mode("append") \
#     .save()

dim_mcc_df.write.format("delta").mode("overwrite").save('s3a://lakehouse/dim_mcc/')

[Stage 31:>                                                         (0 + 1) / 1]

25/10/10 04:44:57 WARN S3ABlockOutputStream: Application invoked the Syncable API against stream writing to dim_mcc/part-00000-4c259f72-10f6-4e50-8363-0ee986986c45-c000.snappy.parquet. This is Unsupported
                                                                                

In [7]:
# POPULATE dim_date

min_date = transactions_df.select(min("trans_date")).collect()[0][0]
max_date = transactions_df.select(max("trans_date")).collect()[0][0]

date_range_df = spark.sql(f"""
    SELECT sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day) as date_array
""").select(explode(col("date_array")).alias("full_date"))

dim_date_df = date_range_df.select(
    # Create surrogate key as YYYYMMDD integer
    date_format(col("full_date"), "yyyyMMdd").cast("int").alias("date_sk"),
    col("full_date"),
    dayofmonth(col("full_date")).alias("day"),
    weekofyear(col("full_date")).alias("week"),
    month(col("full_date")).alias("month"),
    quarter(col("full_date")).alias("quarter"),
    year(col("full_date")).alias("year"),
    dayofweek(col("full_date")).alias("day_of_week"),
    when(dayofweek(col("full_date")).isin([1, 7]), 1).otherwise(0).alias("is_weekend")
)

# dim_date_df.write \
#     .format("jdbc") \
#     .option("url", dw_mysql_url) \
#     .option("dbtable", "dim_date") \
#     .options(**dw_mysql_properties) \
#     .mode("append") \
#     .save()

dim_date_df.write.format("delta").mode("overwrite").save('s3a://lakehouse/dim_date/')

[Stage 39:>                                                       (0 + 16) / 16][Stage 39:===>                                                    (1 + 15) / 16]





                                                                                

In [8]:
# 3. POPULATE dim_users

# Create customer dimension with surrogate keys
dim_user_df = users_df.select(
    col("client_id"),
    col("client_id").alias("user_sk"),  # Using natural key as surrogate for now
    col("birth_year"),
    col("gender"),
    col("yearly_income"),
    col("total_debt"),
    col("credit_score"),
    col("num_credit_cards"),
    col("latitude"),
    col("longitude"),
    # Create user segment based on credit score
    when(col("credit_score") >= 750, "Premium")
    .when(col("credit_score") >= 650, "Standard")
    .when(col("credit_score") >= 550, "Subprime")
    .otherwise("High Risk").alias("user_segment")
)

# Write to MySQL
# dim_user_df.write \
#     .format("jdbc") \
#     .option("url", dw_mysql_url) \
#     .option("dbtable", "dim_user") \
#     .options(**dw_mysql_properties) \
#     .mode("append") \
#     .save()

dim_user_df.write.format("delta").mode("overwrite").save('s3a://lakehouse/dim_user/')

In [9]:
# 4. POPULATE dim_merchant

# Check for duplicates in the source data
duplicate_check = transactions_df.groupBy("merchant_id").count().filter(col("count") > 1)
print(f"Duplicate merchant_ids in transactions: {duplicate_check.count()}")

# Extract unique merchants from transactions
dim_merchant_df = transactions_df.select(
    col("merchant_id"),
    col("merchant_id").alias("merchant_sk"),
    col("mcc"),
    col("merchant_city"),
    col("merchant_state"), 
    col("zip").alias("merchant_zip")
).distinct()  

print(f"Unique merchants found: {dim_merchant_df.count()}")

# Add merchant_type by joining with mcc_codes
dim_merchant_df = dim_merchant_df.join(
    mcc_codes_df.select(col("mcc"), col("merchant_type")), 
    on="mcc", 
    how="left"
)

# Check for duplicates after join
duplicate_after_join = dim_merchant_df.groupBy("merchant_id").count().filter(col("count") > 1)
print(f"Duplicates after join: {duplicate_after_join.count()}")

# If duplicates exist, show them
if duplicate_after_join.count() > 0:
    print("Sample duplicates:")
    duplicate_after_join.show(10)

# Force single partition and ensure no duplicates
dim_merchant_df = dim_merchant_df.dropDuplicates(["merchant_id"]).coalesce(1)

# Write to MySQL
# dim_merchant_df.write.jdbc(
#     dw_mysql_url, 
#     "dim_merchant", 
#     mode="append", 
#     properties={
#         **dw_mysql_properties,
#         "batchsize": "1000",
#         "isolationLevel": "READ_UNCOMMITTED"
#     }
# )

dim_merchant_df.write.format("delta").mode("overwrite").save('s3a://lakehouse/dim_merchant/')

[Stage 58:>                                                       (0 + 16) / 16]

[Stage 58:===>                                                    (1 + 15) / 16]

                                                                                

Duplicate merchant_ids in transactions: 53611


[Stage 66:>                                                       (0 + 16) / 16]

[Stage 66:===>                                                    (1 + 15) / 16]





                                                                                

Unique merchants found: 211495


[Stage 76:>               (0 + 16) / 16][Stage 77:>                 (0 + 0) / 1]

[Stage 76:=>              (1 + 15) / 16][Stage 77:>                 (0 + 1) / 1]                                                                                







Duplicates after join: 16182


[Stage 91:>               (0 + 16) / 16][Stage 92:>                 (0 + 0) / 1]

[Stage 91:=>              (1 + 15) / 16][Stage 92:>                 (0 + 1) / 1]                                                                                





                                                                                

Sample duplicates:


[Stage 106:>              (0 + 16) / 16][Stage 107:>                (0 + 0) / 1]

[Stage 106:>              (1 + 15) / 16][Stage 107:>                (0 + 1) / 1]                                                                                

[Stage 106:===>                                                   (1 + 15) / 16]



                                                                                

+-----------+-----+
|merchant_id|count|
+-----------+-----+
|      99621|    4|
|      38311|   15|
|      68579|    2|
|      82529|   17|
|      81900|    9|
|      66010|  161|
|       7833|    3|
|      90461|   29|
|      25591|    3|
|      73470|    2|
+-----------+-----+
only showing top 10 rows


[Stage 117:>              (0 + 16) / 16][Stage 118:>                (0 + 0) / 1]

                                                                                





[Stage 120:>                                                      (0 + 16) / 17]



[Stage 123:>                                                        (0 + 1) / 1]

                                                                                

In [10]:
# 5. POPULATE dim_card

dim_card_df = cards_df.select(
    col("card_id"),
    col("card_id").alias("card_sk"),  # Using natural key as surrogate for now
    col("client_id"),
    col("card_brand"),
    col("card_type"),
    col("credit_limit"),
    col("acct_open_date"),
    col("has_chip"),
    col("card_on_dark_web")
)

# Write to MySQL
# dim_card_df.write \
#     .format("jdbc") \
#     .option("url", dw_mysql_url) \
#     .option("dbtable", "dim_card") \
#     .options(**dw_mysql_properties) \
#     .mode("append") \
#     .save()

dim_card_df.write.format("delta").mode("overwrite").save('s3a://lakehouse/dim_card/')

In [11]:
# 6. POPULATE fact_transactions

fact_df = transactions_df.select(
    col("transaction_id"),
    # Create date surrogate key (YYYYMMDD format)
    date_format(col("trans_date"), "yyyyMMdd").cast("int").alias("date_sk"),
    col("client_id").alias("user_sk"),
    col("card_id").alias("card_sk"), 
    col("merchant_id").alias("merchant_sk"),
    col("mcc").alias("mcc_sk"),
    col("amount"),
    col("use_chip"),
    col("errors"),
    # No fraud labels - set defaults
    lit(0).alias("is_fraud"),
    lit(None).cast("string").alias("fraud_label_source"),
    lit(1).alias("transaction_count")
)

print(f"Fact table prepared: {fact_df.count()} rows")

# Write in batches to avoid memory issues and deadlocks
# fact_df.coalesce(5).write \
#     .format("jdbc") \
#     .option("url", dw_mysql_url) \
#     .option("dbtable", "fact_transactions") \
#     .option("batchsize", "50000") \
#     .options(**dw_mysql_properties) \
#     .mode("append") \
#     .save()

fact_df.write.format("delta").mode("overwrite").save('s3a://lakehouse/fact_transactions/')

Fact table prepared: 13305915 rows


[Stage 139:>                                                      (0 + 16) / 16]

[Stage 139:===>                                                   (1 + 15) / 16]



