In [0]:
# List the Databricks secret scopes available to this notebook (discovery aid;
# only scope names are returned, never secret values).
dbutils.secrets.listScopes()

[SecretScope(name='abcbank_secretScope')]

In [0]:
# List the keys stored in the "abcbank_secretScope" scope. Only key metadata is
# returned; the secret values themselves are not displayed.
dbutils.secrets.list("abcbank_secretScope")

[SecretMetadata(key='client-id'),
 SecretMetadata(key='client-secret'),
 SecretMetadata(key='directory-id')]

In [0]:
# Fetch the client id, client secret and directory id from the secret scope so
# that no credential value is hard-coded in the notebook. The three lookups run
# in the same order as the names on the left-hand side.
client_id, client_secret, directory_id = (
    dbutils.secrets.get("abcbank_secretScope", secret_key)
    for secret_key in ("client-id", "client-secret", "directory-id")
)

In [0]:
# OAuth client-credentials settings handed to dbutils.fs.mount as extra_configs.
configs = {}
configs["fs.azure.account.auth.type"] = "OAuth"
configs["fs.azure.account.oauth.provider.type"] = "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
configs["fs.azure.account.oauth2.client.id"] = client_id
configs["fs.azure.account.oauth2.client.secret"] = client_secret
# The token endpoint is specific to the directory (tenant) id read from the secret scope.
configs["fs.azure.account.oauth2.client.endpoint"] = f"https://login.microsoftonline.com/{directory_id}/oauth2/token"

In [0]:
# Storage account and the two containers used by this notebook:
# "silver" is read as the raw input, "gold" receives the processed output.
storage_account = "dlsaabcbank"
container_raw, container_processed = "silver", "gold"

# ABFSS URIs identifying each container on the storage account.
source_raw = f"abfss://{container_raw}@{storage_account}.dfs.core.windows.net/"
source_processed = f"abfss://{container_processed}@{storage_account}.dfs.core.windows.net/"

# DBFS mount points, one per container, namespaced by storage account.
mount_point_raw = f"/mnt/{storage_account}/{container_raw}"
mount_point_processed = f"/mnt/{storage_account}/{container_processed}"

In [0]:
# (Re)mount the raw container. Any existing mount at the same mount point is
# removed first so the new mount uses the current source and OAuth settings.
if any(mount.mountPoint == mount_point_raw for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point_raw)
    print(f"Unmount existing mount at {mount_point_raw}")

try:
    dbutils.fs.mount(
        source=source_raw,
        mount_point=mount_point_raw,
        extra_configs=configs,
    )
    print(f"Mounted successfully at {mount_point_raw}")
except Exception as e:
    # Surface the underlying cause and stop here: every later cell reads from
    # this mount, so continuing would only fail later with a less helpful error.
    print(f"Error mounting: {mount_point_raw}: {e}")
    raise

/mnt/dlsaabcbank/silver has been unmounted.
Unmount existing mount at /mnt/dlsaabcbank/silver
Mounted successfully at /mnt/dlsaabcbank/silver


In [0]:
# (Re)mount the processed container. Any existing mount at the same mount point
# is removed first so the new mount uses the current source and OAuth settings.
if any(mount.mountPoint == mount_point_processed for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point_processed)
    print(f"Unmount existing mount at {mount_point_processed}")

try:
    dbutils.fs.mount(
        source=source_processed,
        mount_point=mount_point_processed,
        extra_configs=configs,
    )
    print(f"Mounted successfully at {mount_point_processed}")
except Exception as e:
    # Surface the underlying cause and stop here: the write cells target this
    # mount, so continuing would only fail later with a less helpful error.
    print(f"Error mounting: {mount_point_processed}: {e}")
    raise

/mnt/dlsaabcbank/gold has been unmounted.
Unmount existing mount at /mnt/dlsaabcbank/gold
Mounted successfully at /mnt/dlsaabcbank/gold


In [0]:
# Relative location and full path (under the raw mount) of each source Delta
# table, grouped per table.

# customers
delta_customers_location = "delta/customers"
delta_customers_table_path = f"{mount_point_raw}/{delta_customers_location}"

# accounts
delta_accounts_location = "delta/accounts"
delta_accounts_table_path = f"{mount_point_raw}/{delta_accounts_location}"

# transactions
delta_transactions_location = "delta/transactions"
delta_transactions_table_path = f"{mount_point_raw}/{delta_transactions_location}"

# loans
delta_loans_location = "delta/loans"
delta_loans_table_path = f"{mount_point_raw}/{delta_loans_location}"

# loan payments
delta_loan_payments_location = "delta/loan_payments"
delta_loan_payments_table_path = f"{mount_point_raw}/{delta_loan_payments_location}"

In [0]:
from delta.tables import DeltaTable

# Load every source Delta table from the raw mount into its own DataFrame.
df_customers = (
    spark.read.format("delta")
    .load(delta_customers_table_path)
)
df_accounts = (
    spark.read.format("delta")
    .load(delta_accounts_table_path)
)
df_transactions = (
    spark.read.format("delta")
    .load(delta_transactions_table_path)
)
df_loans = (
    spark.read.format("delta")
    .load(delta_loans_table_path)
)
df_loan_payments = (
    spark.read.format("delta")
    .load(delta_loan_payments_table_path)
)

# Left join keeps every account row even when no matching customer exists;
# only the columns needed for the accounts/customers view are retained.
df_accounts_customers = (
    df_accounts
    .join(df_customers, on="customer_id", how="left")
    .select("account_id", "customer_id", "first_name", "last_name", "balance")
)

# Preview the first 15 rows.
df_accounts_customers.show(15)

+----------+-----------+----------+---------+-------+
|account_id|customer_id|first_name|last_name|balance|
+----------+-----------+----------+---------+-------+
|        28|          7|     James| Martinez|2900.00|
|        24|         11| Alexander|   Thomas|2600.00|
|        13|         29|   Michael|  Collins|1300.25|
|        11|          3|   Michael|  Johnson|1100.75|
|        15|         47|    Andrew|     Gray| 700.75|
|        18|          5|     David|   Wilson|1600.50|
|        16|         18|    Amelia|    Adams|1400.00|
|        26|         25|    Daniel| Campbell|2800.50|
|        21|         53|     James|  Jenkins| 300.25|
|        29|         58|    Sophia|   Hughes|  75.25|
|        32|          9|   William|    Lopez|3300.00|
|        30|         32|    Sophia|   Morris|3100.50|
|        22|         37| Alexander|     Bell|2400.50|
|         2|         12|  Isabella|      Lee|2500.75|
|        20|         21|    Andrew| Mitchell|2000.00|
+----------+-----------+----------+---------+-------+

In [0]:
# Import the functions module under a namespace instead of importing `sum`
# directly, which would shadow Python's built-in sum() for every later cell.
from pyspark.sql import functions as F

# Customers inner-joined with their accounts; the ingestion_timestamp column
# of each input is dropped from the result.
df_joined = (
    df_customers.join(df_accounts, on="customer_id", how="inner")
    .drop(df_customers.ingestion_timestamp)
    .drop(df_accounts.ingestion_timestamp)
)

# One row per customer holding the sum of the balances of all their accounts.
df_aggregated = (
    df_joined.groupBy("customer_id", "first_name", "last_name")
    .agg(F.sum("balance").alias("total_balance"))
)

# Attach the per-customer total back onto every account-level row.
df_final = df_joined.join(df_aggregated, on=["customer_id", "first_name", "last_name"], how="inner")

# Preview the first 15 rows.
df_final.show(15)

+-----------+-----------+---------+---------------+-------------+-----+------+----------+------------+-------+-------------+
|customer_id| first_name|last_name|        address|         city|state|   zip|account_id|account_type|balance|total_balance|
+-----------+-----------+---------+---------------+-------------+-----+------+----------+------------+-------+-------------+
|          1|       John|      Doe|     123 Elm St|      Toronto|   ON|M4B1B3|        88|    Checking|8900.00|       8900.0|
|         10|        Ava| Anderson|909 Cypress Ave|  Quebec City|   QC|G1A0A1|        52|    Checking|5300.00|       5300.0|
|         11|  Alexander|   Thomas| 1010 Willow Rd|   St. John's|   NL|A1A0A1|        24|    Checking|2600.00|       2600.0|
|         12|   Isabella|      Lee| 1111 Poplar St|  Fredericton|   NB|E3B0A1|        64|    Checking|6500.00|      9000.75|
|         13|     Daniel|   Harris|  1212 Ash Blvd|Charlottetown|   PE|C1A0A1|        44|    Checking|4500.00|       4500.0|


In [0]:
# Relative location and full path (under the processed mount) of each output
# Delta table, grouped per table.

# accounts joined with customers
delta_accounts_customers_location = "delta/accounts_customers"
delta_accounts_customers_table_path = f"{mount_point_processed}/{delta_accounts_customers_location}"

# account rows enriched with the per-customer total balance
delta_final_location = "delta/final"
delta_final_table_path = f"{mount_point_processed}/{delta_final_location}"

In [0]:
from delta.tables import DeltaTable

# Write df_accounts_customers to the processed mount: create the Delta table
# when the target path is not one yet, otherwise upsert keyed on account_id.
if not DeltaTable.isDeltaTable(spark, delta_accounts_customers_table_path):
    # Target path is not a Delta table: write the DataFrame out as one.
    df_accounts_customers.write.format("delta").mode("overwrite").save(delta_accounts_customers_table_path)
else:
    existing_data = DeltaTable.forPath(spark, delta_accounts_customers_table_path)

    # Matching account_id rows are fully updated; unmatched rows are inserted.
    (
        existing_data.alias("existing")
        .merge(df_accounts_customers.alias("new"), "existing.account_id = new.account_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Read the table back and preview the first 15 rows.
spark.read.format("delta").load(delta_accounts_customers_table_path).show(15)

+----------+-----------+----------+----------+-------+
|account_id|customer_id|first_name| last_name|balance|
+----------+-----------+----------+----------+-------+
|        41|         51|    Daniel|      Ross| 250.25|
|        28|          7|     James|  Martinez|2900.00|
|        38|         15|   Matthew|      King|3900.50|
|        89|         54|     Emily|     Perry| 850.25|
|        24|         11| Alexander|    Thomas|2600.00|
|        13|         29|   Michael|   Collins|1300.25|
|        65|         69|    Joseph|      Diaz| 550.25|
|        54|         42| Charlotte|Richardson|5500.50|
|        71|         73|    Andrew|  Hamilton| 625.75|
|        73|         87|   William|  McDonald| 650.25|
|        14|         64|  Isabella|  Gonzalez|3200.50|
|        11|          3|   Michael|   Johnson|1100.75|
|        80|         30| Elizabeth|   Stewart|8100.00|
|         4|         34|    Olivia|      Reed|3000.25|
|        69|         59|      John|    Flores| 600.25|
+----------+-----------+----------+----------+-------+

In [0]:
# Write df_final to the processed mount: create the Delta table when the target
# path is not one yet, otherwise upsert keyed on account_id.
if not DeltaTable.isDeltaTable(spark, delta_final_table_path):
    # Target path is not a Delta table: write the DataFrame out as one.
    df_final.write.format("delta").mode("overwrite").save(delta_final_table_path)
else:
    existing_data = DeltaTable.forPath(spark, delta_final_table_path)

    # Matching account_id rows are fully updated; unmatched rows are inserted.
    (
        existing_data.alias("existing")
        .merge(df_final.alias("new"), "existing.account_id = new.account_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Read the table back and preview the first 15 rows.
spark.read.format("delta").load(delta_final_table_path).show(15)

+-----------+-----------+---------+---------------+-------------+-----+------+----------+------------+-------+-------------+
|customer_id| first_name|last_name|        address|         city|state|   zip|account_id|account_type|balance|total_balance|
+-----------+-----------+---------+---------------+-------------+-----+------+----------+------------+-------+-------------+
|          1|       John|      Doe|     123 Elm St|      Toronto|   ON|M4B1B3|        88|    Checking|8900.00|       8900.0|
|         10|        Ava| Anderson|909 Cypress Ave|  Quebec City|   QC|G1A0A1|        52|    Checking|5300.00|       5300.0|
|         11|  Alexander|   Thomas| 1010 Willow Rd|   St. John's|   NL|A1A0A1|        24|    Checking|2600.00|       2600.0|
|         12|   Isabella|      Lee| 1111 Poplar St|  Fredericton|   NB|E3B0A1|        64|    Checking|6500.00|      9000.75|
|         13|     Daniel|   Harris|  1212 Ash Blvd|Charlottetown|   PE|C1A0A1|        44|    Checking|4500.00|       4500.0|
