In [0]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### PySpark imports & utility functions

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

In [0]:

# Add the project root (two directories above the notebook's working directory)
# to the Python path so the shared transformation utilities under `utils/` can be imported.
import os
import sys

project_path = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))
# Only append once: re-running this cell must not keep growing sys.path.
if project_path not in sys.path:
    sys.path.append(project_path)

# Star imports are kept on purpose: later cells use description_lookup,
# category_lookup, location_lookup, note_lookup and the commbank_manual_* helpers
# by bare name (presumably provided by these two modules).
from utils.lookup_tables import *
from utils.manual_upds_and_dels_commbank import *

## 1) Source ingestion & transformation (Auto Loader / streaming)

In [0]:
# Incrementally ingest new parquet files from the bronze container with Auto Loader.
# Auto Loader options carry the `cloudFiles.` prefix; an unprefixed
# `schemaEvolutionMode` key is not read by Auto Loader.
# NOTE(review): schemaLocation is the same directory the silver writeStream uses as
# its checkpointLocation — confirm this sharing is intentional.
df_commbank = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@storagebudgetproject.dfs.core.windows.net/commbank/checkpoint")
    # Evolve the tracked schema when new source columns appear.
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@storagebudgetproject.dfs.core.windows.net/commbank")
)

In [0]:
# ----------------------------
# 1) base: drop unused columns, keep rows with a Date, normalise Date/Description/Amount
# ----------------------------
# The null filter runs on the raw Date string, before it is parsed as dd/MM/yyyy.
df_base = (
    df_commbank
    .drop("_rescued_data", "Balance")
    .filter(col("Date").isNotNull())
    .withColumns({
        "Date": to_date(col("Date"), "dd/MM/yyyy"),
        "Description": upper(trim(col("Description"))),
        "Amount": col("Amount").cast("double"),
    })
)

In [0]:
# ----------------------------
# 2) typed: classify each row by the sign of Amount
# ----------------------------
# Negative -> "Expenses", positive -> "Income". A zero or null Amount matches
# no branch, and a CASE WHEN with no matching branch yields null.
df_typed = df_base.withColumn(
    "Type",
    when(col("Amount") < 0, lit("Expenses"))
    .when(col("Amount") > 0, lit("Income")),
)

In [0]:
# ----------------------------
# 3) cleaned (description_lookup)
# ----------------------------
# Description clean-up is delegated to the shared `description_lookup` helper
# (star-imported from the utils package); DataFrame.transform just calls it on df_typed.
df_cleaned = df_typed.transform(description_lookup.apply)

In [0]:
# ----------------------------
# 4) categorized: hard-coded overrides first, otherwise keep the category
#    produced by category_lookup (null stays null; this cell adds no "n/a" default)
# ----------------------------

# Apply utility functions - lookup_tables
df_category_base = category_lookup.apply(df_cleaned)

# Apple.com charges at or above this absolute amount are treated as store purchases.
APPLE_STORE_MIN_AMOUNT = 5

is_apple = col("Description").contains("APPLE.COM")
# `abs` resolves to pyspark.sql.functions.abs through the star import at the top.
abs_amount = abs(col("Amount"))

df_categorized = df_category_base.withColumn(
    "category",
    when(is_apple & (abs_amount >= APPLE_STORE_MIN_AMOUNT), lit("Entertainment"))
    .when(is_apple & (abs_amount < APPLE_STORE_MIN_AMOUNT), lit("Subscription"))
    .when(col("Description").contains("CREDIT INTEREST REDIRECTION"), lit("Term Deposit Interest"))
    .when(col("Description") == "CREDIT INTEREST", lit("Saving Interest"))
    # No override matched: keep whatever the lookup produced.
    .otherwise(col("category")),
)




In [0]:
# ----------------------------
# 5) location (location_lookup)
# ----------------------------
# Location enrichment is delegated to the shared `location_lookup` helper
# (star-imported from the utils package); any default for unmatched rows is
# decided by that helper. DataFrame.transform just calls it on df_categorized.
df_location = df_categorized.transform(location_lookup.apply)

In [0]:
# ----------------------------
# 6) noted: Apple/TPG overrides first, otherwise keep the note from note_lookup
#    (null stays null)
# ----------------------------

# Apply utility functions - lookup_tables
df_note_base = note_lookup.apply(df_location)

# Apple.com charges at or above this absolute amount are store purchases; below it, iCloud.
APPLE_STORE_MIN_AMOUNT = 5
# TPG charges up to and including this absolute amount are the phone plan; above it, home internet.
TPG_PHONE_MAX_AMOUNT = 35

is_apple = col("Description").contains("APPLE.COM")
is_tpg_internet = col("Description").contains("TPG INTERNET")
# `abs` resolves to pyspark.sql.functions.abs through the star import at the top.
abs_amount = abs(col("Amount"))

df_noted = df_note_base.withColumn(
    "note",
    when(is_apple & (abs_amount >= APPLE_STORE_MIN_AMOUNT), lit("apple store"))
    .when(is_apple & (abs_amount < APPLE_STORE_MIN_AMOUNT), lit("icloud"))
    .when(is_tpg_internet & (abs_amount > TPG_PHONE_MAX_AMOUNT), lit("internet/home"))
    .when(is_tpg_internet & (abs_amount <= TPG_PHONE_MAX_AMOUNT), lit("internet/phone"))
    # No override matched: keep whatever the lookup produced.
    .otherwise(col("note")),
)


In [0]:

# ----------------------------
# 7) Apply manual updates & deletes
# ----------------------------

# Hand-maintained corrections (presumably from utils.manual_upds_and_dels_commbank):
# updates are applied first, then deletes run on the already-updated rows.
df_silver = (
    df_noted
    .transform(commbank_manual_updates)
    .transform(commbank_manual_deletes)
)

In [0]:
# ----------------------------
# 8) Bank name and payment method
# ----------------------------
# payment:
#   - "Personal" when category is "Personal";
#   - otherwise "Online" when location is "online", "On-site" for any other location;
#   - null when location is null (and category is not "Personal").
# `when` returns the first matching branch, so the later branches already exclude
# Personal rows. Do not add a `category != 'Personal'` guard to them: with a null
# category it evaluates to null and the row would lose its payment method.
df_silver = (
    df_silver
    .withColumn("bank", lit("commbank"))
    .withColumn(
        "payment",
        when(col("category") == "Personal", lit("Personal"))
        .when(col("location") == "online", lit("Online"))
        .when(col("location") != "online", lit("On-site")),
    )
)

In [0]:
# Deterministic row key: SHA-256 of bank, Date, Amount and Description joined with "||".
# bank and Description are coalesced to "" so they always occupy a slot; concat_ws
# skips a null Date/Amount entirely.
# NOTE(review): genuinely identical transactions (same bank, day, amount and
# description) get the same transaction_id — confirm this is acceptable downstream.
transaction_key_parts = [
    coalesce(col("bank"), lit("")),
    col("Date").cast("string"),
    col("Amount").cast("string"),
    coalesce(col("Description"), lit("")),
]
df_silver = df_silver.withColumn(
    "transaction_id",
    sha2(concat_ws("||", *transaction_key_parts), 256),
)

In [0]:
# Append everything that arrived since the last checkpoint to the silver Delta table, then stop.
# trigger(availableNow=True) replaces the deprecated trigger(once=True). It also stops
# when the backlog is processed, but may split that backlog into several micro-batches.
silver_query = (
    df_silver.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storagebudgetproject.dfs.core.windows.net/commbank/checkpoint")
    .option("path", "abfss://silver@storagebudgetproject.dfs.core.windows.net/commbank/data")
    .trigger(availableNow=True)
    .toTable("budget_cata.silver.commbank")
)
# Wait for the run to finish, so that a failed query raises here instead of passing unnoticed.
silver_query.awaitTermination()

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7efc8e112c90>