In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Bronze -> Silver: read each raw CSV with an explicit schema, replace nulls with
# type-based placeholders, drop exact duplicate rows, and overwrite the matching
# Delta table under the Silver root.
#
# NOTE(review): the mounts listing later in this notebook shows no /mnt/silver mount,
# so these Delta tables land on DBFS root rather than in cloud storage — confirm intent.

BRONZE_ROOT = "/mnt/bronze"
SILVER_ROOT = "/mnt/silver"

# List of all datasets
datasets = ["accounts", "customers", "loan_payments", "loans", "transactions"]

# Define schema for each dataset
schemas = {
    "accounts": StructType([
        StructField("account_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("account_type", StringType(), True),
        StructField("balance", DoubleType(), True)
    ]),
    "customers": StructType([
        StructField("customer_id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("city", StringType(), True)
    ]),
    "loan_payments": StructType([
        StructField("payment_id", StringType(), True),
        StructField("loan_id", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("payment_date", StringType(), True)
    ]),
    "loans": StructType([
        StructField("loan_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("loan_type", StringType(), True),
        StructField("loan_amount", DoubleType(), True)
    ]),
    "transactions": StructType([
        StructField("transaction_id", StringType(), True),
        StructField("account_id", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("transaction_date", StringType(), True)
    ])
}

# Placeholder written in place of null, per column type. Columns of any other type keep their nulls.
# NOTE(review): ID columns (account_id, customer_id, ...) are StringType, so null IDs become
# "Unknown" and are later aggregated together under that fake key in Gold — consider dropping
# rows with null keys instead.
NULL_PLACEHOLDERS = {
    DoubleType: 0.0,
    IntegerType: 0,
    StringType: "Unknown",
}


def _fill_values_for(schema):
    """Return a ``fillna`` mapping {column_name: placeholder} for the columns of ``schema``.

    Only columns whose type appears in NULL_PLACEHOLDERS are included.
    """
    fill_values = {}
    for field in schema.fields:
        for data_type, placeholder in NULL_PLACEHOLDERS.items():
            if isinstance(field.dataType, data_type):
                fill_values[field.name] = placeholder
                break
    return fill_values


def _clean(df_raw, schema):
    """Fill nulls with type-based placeholders, then drop exact duplicate rows.

    Filling happens before de-duplication so that rows differing only by a null
    versus its placeholder (e.g. balance null vs. 0.0) collapse into a single row.
    """
    return df_raw.fillna(_fill_values_for(schema)).dropDuplicates()


# Dictionary to store processed DataFrames
df_dict = {}

# Process each dataset
for dataset in datasets:
    print(f"\nProcessing {dataset}...")
    schema = schemas[dataset]

    # Read data from Bronze layer with schema.
    # NOTE: with Spark's default PERMISSIVE CSV mode, values that do not parse against the
    # schema are read as null and therefore also become placeholders below.
    df_bronze = (
        spark.read.option("header", "true")
        .schema(schema)
        .csv(f"{BRONZE_ROOT}/{dataset}.csv")
    )

    df_clean = _clean(df_bronze, schema)

    # Store cleaned DataFrame
    df_dict[dataset] = df_clean

    # Show schema & sample data
    df_clean.printSchema()
    df_clean.show(5)

    # Save to Silver Layer in Delta format
    df_clean.write.format("delta").mode("overwrite").save(f"{SILVER_ROOT}/{dataset}")

    print(f"{dataset} successfully processed and saved to Silver layer!")

print("\n All datasets processed, cleaned, and saved to Silver Layer!")



Processing accounts...
root
 |-- account_id: string (nullable = false)
 |-- customer_id: string (nullable = false)
 |-- account_type: string (nullable = false)
 |-- balance: double (nullable = false)

+----------+-----------+------------+-------+
|account_id|customer_id|account_type|balance|
+----------+-----------+------------+-------+
|         3|         78|     Savings| 1500.0|
|         2|         12|    Checking|2500.75|
|         5|         56|     Savings|  500.0|
|         6|         23|    Checking| 1200.5|
|         4|         34|    Checking|3000.25|
+----------+-----------+------------+-------+
only showing top 5 rows

accounts successfully processed and saved to Silver layer!

Processing customers...
root
 |-- customer_id: string (nullable = false)
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)
 |-- city: string (nullable = false)

+-----------+-------+---+--------------+
|customer_id|   name|age|          city|
+-----------+-------+---+-------

In [0]:
# List the entries under the Silver root (one directory per Delta table).
silver_entries = dbutils.fs.ls("/mnt/silver/")
display(silver_entries)


path,name,size,modificationTime
dbfs:/mnt/silver/accounts/,accounts/,0,1741398028000
dbfs:/mnt/silver/customers/,customers/,0,1741398051000
dbfs:/mnt/silver/loan_payments/,loan_payments/,0,1741398056000
dbfs:/mnt/silver/loans/,loans/,0,1741398059000
dbfs:/mnt/silver/transactions/,transactions/,0,1741398063000


In [0]:
# Silver -> Gold: build per-customer and per-account aggregates and overwrite each
# as a Delta table under the Gold root.
#
# Import the functions module under an alias: importing `sum` by name would shadow
# Python's built-in sum() for every cell executed after this one.
from pyspark.sql import functions as F

SILVER_ROOT = "/mnt/silver"
GOLD_ROOT = "/mnt/gold"

# Load cleaned Silver Layer datasets
df_accounts = spark.read.format("delta").load(f"{SILVER_ROOT}/accounts")
df_customers = spark.read.format("delta").load(f"{SILVER_ROOT}/customers")
df_loans = spark.read.format("delta").load(f"{SILVER_ROOT}/loans")
df_transactions = spark.read.format("delta").load(f"{SILVER_ROOT}/transactions")


def _write_gold(df_agg, table_name):
    """Overwrite the Delta table at ``<GOLD_ROOT>/<table_name>`` with ``df_agg``."""
    df_agg.write.format("delta").mode("overwrite").save(f"{GOLD_ROOT}/{table_name}")


# 1. Aggregate Balance per Customer
df_customer_balance = df_accounts.groupBy("customer_id").agg(
    F.sum("balance").alias("total_balance")
)
_write_gold(df_customer_balance, "customer_balance")

# 2. Total Loan Amount per Customer
df_customer_loans = df_loans.groupBy("customer_id").agg(
    F.sum("loan_amount").alias("total_loan_amount")
)
_write_gold(df_customer_loans, "customer_loans")

# 3. Total Transactions per Account
df_account_transactions = df_transactions.groupBy("account_id").agg(
    F.count("transaction_id").alias("total_transactions"),
    F.sum("amount").alias("total_transaction_amount")
)
_write_gold(df_account_transactions, "account_transactions")

# Print Success Message
print("\nAll transformations completed! Data saved to Gold Layer!")



All transformations completed! Data saved to Gold Layer!


In [0]:
# List the entries under the Gold root (one directory per aggregate table).
gold_entries = dbutils.fs.ls("/mnt/gold/")
display(gold_entries)


path,name,size,modificationTime
dbfs:/mnt/gold/account_transactions/,account_transactions/,0,1741398163000
dbfs:/mnt/gold/customer_balance/,customer_balance/,0,1741398153000
dbfs:/mnt/gold/customer_loans/,customer_loans/,0,1741398160000


In [0]:
# Read the Gold customer_balance Delta table and print its first five rows.
df_gold = spark.read.load("/mnt/gold/customer_balance", format="delta")
df_gold.show(n=5)


+-----------+-------------+
|customer_id|total_balance|
+-----------+-------------+
|         51|       250.25|
|          7|       2900.0|
|         15|       3900.5|
|         54|       850.25|
|         11|       2600.0|
+-----------+-------------+
only showing top 5 rows



In [0]:
# List the files of the Gold customer_balance Delta table (_delta_log/ plus Parquet parts).
# Uses an absolute path, matching the other /mnt/... listings in this notebook.
display(dbutils.fs.ls("/mnt/gold/customer_balance/"))


path,name,size,modificationTime
dbfs:/mnt/gold/customer_balance/_delta_log/,_delta_log/,0,1741398153000
dbfs:/mnt/gold/customer_balance/part-00000-0a7277cf-69c0-424f-b856-601ae35c4d33-c000.snappy.parquet,part-00000-0a7277cf-69c0-424f-b856-601ae35c4d33-c000.snappy.parquet,1807,1741398157000


In [0]:
# List the files of the Gold customer_balance Delta table, using an absolute path.
# TODO: this repeats an earlier identical listing cell; consider deleting one of them.
display(dbutils.fs.ls("/mnt/gold/customer_balance/"))


path,name,size,modificationTime
dbfs:/mnt/gold/customer_balance/_delta_log/,_delta_log/,0,1741398153000
dbfs:/mnt/gold/customer_balance/part-00000-0a7277cf-69c0-424f-b856-601ae35c4d33-c000.snappy.parquet,part-00000-0a7277cf-69c0-424f-b856-601ae35c4d33-c000.snappy.parquet,1807,1741398157000


In [0]:
# Sanity check: list both layer roots, using absolute paths like the rest of the notebook.
display(dbutils.fs.ls("/mnt/silver/"))  # Check if Silver has files
display(dbutils.fs.ls("/mnt/gold/"))    # Check if Gold has files


path,name,size,modificationTime
dbfs:/mnt/silver/accounts/,accounts/,0,1741398028000
dbfs:/mnt/silver/customers/,customers/,0,1741398051000
dbfs:/mnt/silver/loan_payments/,loan_payments/,0,1741398056000
dbfs:/mnt/silver/loans/,loans/,0,1741398059000
dbfs:/mnt/silver/transactions/,transactions/,0,1741398063000


path,name,size,modificationTime
dbfs:/mnt/gold/account_transactions/,account_transactions/,0,1741398163000
dbfs:/mnt/gold/customer_balance/,customer_balance/,0,1741398153000
dbfs:/mnt/gold/customer_loans/,customer_loans/,0,1741398160000


In [0]:
# List the Delta transaction log of the Gold customer_balance table (commit JSON/CRC files).
display(dbutils.fs.ls("/mnt/gold/customer_balance/_delta_log/"))


path,name,size,modificationTime
dbfs:/mnt/gold/customer_balance/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2791,1741398159000
dbfs:/mnt/gold/customer_balance/_delta_log/00000000000000000000.json,00000000000000000000.json,1712,1741398158000
dbfs:/mnt/gold/customer_balance/_delta_log/__tmp_path_dir/,__tmp_path_dir/,0,1741398158000
dbfs:/mnt/gold/customer_balance/_delta_log/_commits/,_commits/,0,1741398153000


In [0]:
# Compare the file layout of one Gold table and one Silver table (absolute paths).
display(dbutils.fs.ls("/mnt/gold/customer_balance/"))
display(dbutils.fs.ls("/mnt/silver/accounts/"))


path,name,size,modificationTime
dbfs:/mnt/gold/customer_balance/_delta_log/,_delta_log/,0,1741398153000
dbfs:/mnt/gold/customer_balance/part-00000-0a7277cf-69c0-424f-b856-601ae35c4d33-c000.snappy.parquet,part-00000-0a7277cf-69c0-424f-b856-601ae35c4d33-c000.snappy.parquet,1807,1741398157000


path,name,size,modificationTime
dbfs:/mnt/silver/accounts/_delta_log/,_delta_log/,0,1741398028000
dbfs:/mnt/silver/accounts/part-00000-7ff8df6c-3f27-4ac9-88c0-42a01d030672-c000.snappy.parquet,part-00000-7ff8df6c-3f27-4ac9-88c0-42a01d030672-c000.snappy.parquet,2773,1741398036000


In [0]:
# Same check as the previous cell in the opposite order, using absolute paths.
# TODO: duplicate of an earlier listing cell; consider deleting one of them.
display(dbutils.fs.ls("/mnt/silver/accounts/"))  # Check Silver
display(dbutils.fs.ls("/mnt/gold/customer_balance/"))  # Check Gold


path,name,size,modificationTime
dbfs:/mnt/silver/accounts/_delta_log/,_delta_log/,0,1741398028000
dbfs:/mnt/silver/accounts/part-00000-7ff8df6c-3f27-4ac9-88c0-42a01d030672-c000.snappy.parquet,part-00000-7ff8df6c-3f27-4ac9-88c0-42a01d030672-c000.snappy.parquet,2773,1741398036000


path,name,size,modificationTime
dbfs:/mnt/gold/customer_balance/_delta_log/,_delta_log/,0,1741398153000
dbfs:/mnt/gold/customer_balance/part-00000-0a7277cf-69c0-424f-b856-601ae35c4d33-c000.snappy.parquet,part-00000-0a7277cf-69c0-424f-b856-601ae35c4d33-c000.snappy.parquet,1807,1741398157000


In [0]:
# List the Gold customer_balance Delta log, using an absolute path.
# TODO: repeats an earlier identical cell; consider deleting one of them.
display(dbutils.fs.ls("/mnt/gold/customer_balance/_delta_log/"))


path,name,size,modificationTime
dbfs:/mnt/gold/customer_balance/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2791,1741398159000
dbfs:/mnt/gold/customer_balance/_delta_log/00000000000000000000.json,00000000000000000000.json,1712,1741398158000
dbfs:/mnt/gold/customer_balance/_delta_log/__tmp_path_dir/,__tmp_path_dir/,0,1741398158000
dbfs:/mnt/gold/customer_balance/_delta_log/_commits/,_commits/,0,1741398153000


In [0]:
# List the Gold customer_balance table files, using an absolute path.
# TODO: repeats earlier identical cells; consider deleting the duplicates.
display(dbutils.fs.ls("/mnt/gold/customer_balance/"))


path,name,size,modificationTime
dbfs:/mnt/gold/customer_balance/_delta_log/,_delta_log/,0,1741398153000
dbfs:/mnt/gold/customer_balance/part-00000-0a7277cf-69c0-424f-b856-601ae35c4d33-c000.snappy.parquet,part-00000-0a7277cf-69c0-424f-b856-601ae35c4d33-c000.snappy.parquet,1807,1741398157000


In [0]:
# Load the Gold customer_balance Delta table into `df` and render it as a table.
df = spark.read.load("dbfs:/mnt/gold/customer_balance/", format="delta")
display(df)


customer_id,total_balance
51,250.25
7,2900.0
15,3900.5
54,850.25
11,2600.0
69,550.25
29,1300.25
42,5500.5
87,650.25
73,625.75


In [0]:
# Show every active DBFS mount point together with its backing source.
active_mounts = dbutils.fs.mounts()
display(active_mounts)


mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/Volumes,UnityCatalogVolumes,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/bronze,wasbs://bronze@bootcstorage.blob.core.windows.net/,
/Volume,DbfsReserved,
/volumes,DbfsReserved,
/,DatabricksRoot,
/volume,DbfsReserved,


In [0]:
# Write-permission probe: put a small text file under /mnt/gold, overwriting any existing copy.
# NOTE(review): the mounts listing just above has no /mnt/gold entry, so at this point the
# file lands on DBFS root, not in the storage container. Remove the probe (and test.txt) when done.
probe_path = "dbfs:/mnt/gold/test.txt"
dbutils.fs.put(probe_path, "this is a test file", overwrite=True)


Wrote 19 bytes.


True

In [0]:
# Re-list the Gold root to confirm the probe file was written.
gold_listing = dbutils.fs.ls("dbfs:/mnt/gold/")
display(gold_listing)


path,name,size,modificationTime
dbfs:/mnt/gold/account_transactions/,account_transactions/,0,1741398163000
dbfs:/mnt/gold/customer_balance/,customer_balance/,0,1741398153000
dbfs:/mnt/gold/customer_loans/,customer_loans/,0,1741398160000
dbfs:/mnt/gold/test.txt,test.txt,19,1741404315000


In [0]:
# Export the Gold customer_balance table as plain Parquet (for consumers that cannot read Delta).
# The input is loaded explicitly rather than relying on the global `df`, which other cells
# (e.g. the Bronze->Silver loop) also assign. That dependency would silently export the wrong
# data if cells ran out of order.
customer_balance_gold = spark.read.format("delta").load("dbfs:/mnt/gold/customer_balance/")
customer_balance_gold.write.format("parquet").mode("overwrite").save("dbfs:/mnt/gold/customer_balance_parquet/")


In [0]:
# Mount the Azure Blob Storage "gold" container at /mnt/gold.
#
# SECURITY: a storage account key was previously hard-coded in this cell. It is exposed in the
# notebook's history and outputs and must be rotated. The key is now read from a Databricks
# secret scope; create the scope/secret first (the names below are placeholders).
#
# NOTE(review): earlier cells wrote Gold tables to /mnt/gold before this mount existed (the
# first mounts listing has no /mnt/gold entry), i.e. onto DBFS root. Mounting over that path
# presumably hides those files — confirm, and move mounts (including one for /mnt/silver)
# to the top of the notebook so all writes land in storage.
storage_account_name = "bootcstorage"
container_name = "gold"
mnt_name = "/mnt/gold"
secret_scope = "bootcstorage-scope"  # TODO: set to your secret scope name
secret_key_name = "storage-account-key"  # TODO: set to the secret holding the account key

if any(mount.mountPoint == mnt_name for mount in dbutils.fs.mounts()):
    # dbutils.fs.mount fails when the mount point is already in use, so guard re-runs.
    print(f"{mnt_name} is already mounted; skipping.")
else:
    storage_account_key = dbutils.secrets.get(scope=secret_scope, key=secret_key_name)
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
        mount_point=mnt_name,
        extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key},
    )


True

In [0]:
# Re-list mount points to confirm /mnt/gold now points at the storage container.
mounts_after = dbutils.fs.mounts()
display(mounts_after)


mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/mnt/gold,wasbs://gold@bootcstorage.blob.core.windows.net/,
/Volumes,UnityCatalogVolumes,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/bronze,wasbs://bronze@bootcstorage.blob.core.windows.net/,
/Volume,DbfsReserved,
/volumes,DbfsReserved,
/,DatabricksRoot,
