In [0]:
# --- ADLS via SAS (FixedSASTokenProvider) -----------------------------------
# Points this Spark session at the ADLS Gen2 container using a fixed SAS token,
# then lists the container root (seeding /bronze with a sanity file if the
# container is empty) and shows the listing as a table.
storage_account = "stnzrentdev"
container = "nz-rent"
dfs_fqdn = f"{storage_account}.dfs.core.windows.net"
abfss_url = f"abfss://{container}@{dfs_fqdn}/"

# SECURITY: never embed the SAS token in the notebook source. It is read from a
# Databricks secret scope at run time so it cannot leak through version control
# or notebook exports (secret values are also redacted in cell output).
# NOTE(review): the scope/key names below are placeholders -- TODO confirm or
# create them. Any token that was ever committed in this notebook should be
# revoked and reissued.
sas_secret_scope = "nz-rent"
sas_secret_key = "adls-sas-token"

# The token must be supplied WITHOUT a leading '?'; strip one defensively in
# case the stored value was pasted straight from the Azure portal.
sas_token_raw = (
    dbutils.secrets.get(scope=sas_secret_scope, key=sas_secret_key)
    .strip()
    .lstrip("?")
)

# 0) Clear possibly conflicting configs. This stays best-effort (a failure to
#    unset must not abort the cell), but the reason is reported, not swallowed.
for k in [
    f"fs.azure.account.key.{dfs_fqdn}",
    f"fs.azure.sas.{container}.{storage_account}.dfs.core.windows.net",
]:
    try:
        spark.conf.unset(k)
    except Exception as exc:
        print(f"Skipping unset of {k}: {exc}")

# 1) Tell Spark to use SAS with FixedSASTokenProvider
spark.conf.set(f"fs.azure.account.auth.type.{dfs_fqdn}", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{dfs_fqdn}",
               "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{dfs_fqdn}", sas_token_raw)

# 2) List container; bootstrap /bronze if empty
entries = dbutils.fs.ls(abfss_url)
if not entries:
    dbutils.fs.mkdirs(abfss_url + "bronze")
    dbutils.fs.put(abfss_url + "bronze/_sanity.txt", "hello databricks", overwrite=True)
    entries = dbutils.fs.ls(abfss_url)  # re-list so the table shows the new folder

# 3) Show as table
rows = [(e.path, e.size, e.modificationTime) for e in entries]
display(spark.createDataFrame(rows, ["path", "size", "mtime"]).orderBy("path"))

In [0]:
from pyspark.sql.functions import trim, col, year, month, quarter, when, round as rd
from pyspark.sql.types import *

# --- Silver layer: clean and type the raw rent CSV ---------------------------
# Reads the bronze CSV, trims the text columns, casts the measures and the
# month column, derives calendar parts, drops rows without a date or a rent
# value, and overwrites the silver Delta table.
bronze_path = f"{abfss_url}bronze/staging_rent_from_workspace.csv"

csv_reader = spark.read.option("header", True).option("inferSchema", True)
df_bronze = csv_reader.csv(bronze_path)


def _trimmed_or_null(column_name):
    """Return the trimmed column, or NULL when only an empty string remains."""
    trimmed = trim(col(column_name))
    return when(trimmed == "", None).otherwise(trimmed)


# Text columns: suburb and property type are only trimmed; region and
# territorial authority additionally map blank values to NULL.
text_cleaned = (
    df_bronze
    .withColumn("suburb_name", trim(col("suburb_name")))
    .withColumn("region", _trimmed_or_null("region"))
    .withColumn("territorial_authority", _trimmed_or_null("territorial_authority"))
    .withColumn("property_type", trim(col("property_type")))
)

# Measures get explicit numeric types.
measures_typed = (
    text_cleaned
    .withColumn("median_rent", col("median_rent").cast("double"))
    .withColumn("count_bonds", col("count_bonds").cast("int"))
)

# date_month is cast first so the calendar parts are derived from the date value.
with_calendar = (
    measures_typed
    .withColumn("date_month", col("date_month").cast("date"))
    .withColumn("year", year(col("date_month")))
    .withColumn("month", month(col("date_month")))
    .withColumn("quarter", quarter(col("date_month")))
)

# Keep only rows that have both a date and a rent value.
df_silver = (
    with_calendar
    .where(col("date_month").isNotNull())
    .where(col("median_rent").isNotNull())
)

silver_path = f"{abfss_url}silver/staging_rent_delta"
df_silver.write.format("delta").mode("overwrite").save(silver_path)

print("Silver layer written successfully.")
display(df_silver.limit(5))

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# --- Gold layer: dimension tables --------------------------------------------
# Builds dim_time, dim_suburb and dim_property_type from the silver table,
# assigns each row a surrogate key and overwrites the gold Delta tables.
silver_path = f"{abfss_url}silver/staging_rent_delta"
df_silver = spark.read.format("delta").load(silver_path)

dim_time_path = f"{abfss_url}gold/dim_time"
dim_suburb_path = f"{abfss_url}gold/dim_suburb"
dim_property_type_path = f"{abfss_url}gold/dim_property_type"

# monotonically_increasing_id() yields unique but non-consecutive 64-bit ids.
# Spark documents it as non-deterministic (it depends on partition ids), so the
# ids that count are the ones persisted below, not those of a re-executed plan.
(df_silver
    .select("date_month", "year", "quarter", "month")
    .distinct()
    .orderBy("date_month")
    .withColumn("time_id", monotonically_increasing_id())
    .write.format("delta").mode("overwrite").save(dim_time_path))

(df_silver
    .select("suburb_name", "region", "territorial_authority", "lat", "lon")
    .distinct()
    .withColumn("suburb_id", monotonically_increasing_id())
    .write.format("delta").mode("overwrite").save(dim_suburb_path))

(df_silver
    .select(trim(col("property_type")).alias("property_type_name"))
    .distinct()
    .withColumn("property_type_id", monotonically_increasing_id())
    .write.format("delta").mode("overwrite").save(dim_property_type_path))

# Bind the names to the PERSISTED tables. Displaying the lazy plans instead
# would recompute them and could show surrogate keys that differ from the ones
# actually stored in Delta.
dim_time = spark.read.format("delta").load(dim_time_path)
dim_suburb = spark.read.format("delta").load(dim_suburb_path)
dim_property_type = spark.read.format("delta").load(dim_property_type_path)

print("Dimension tables created successfully.")
display(dim_suburb.limit(5))
display(dim_property_type.limit(5))
display(dim_time.limit(5))

In [0]:
# --- Gold layer: fact_rent ---------------------------------------------------
# Resolves every silver row to its time / suburb / property-type surrogate key
# and overwrites the gold fact table with the keys plus the two measures.
silver_path = f"{abfss_url}silver/staging_rent_delta"
df_silver = spark.read.format("delta").load(silver_path)

dim_time = spark.read.format("delta").load(f"{abfss_url}gold/dim_time")
dim_suburb = spark.read.format("delta").load(f"{abfss_url}gold/dim_suburb")
dim_property = spark.read.format("delta").load(f"{abfss_url}gold/dim_property_type")

# Suburb match:
#  * Null-safe equality is required because the silver step stores blank
#    region / territorial_authority as NULL; with a plain `==`, NULL never
#    matches NULL and the inner join silently drops those rows.
#  * dim_suburb is de-duplicated on all five columns below, so matching on all
#    of them yields at most one dimension row per silver row. Matching on
#    suburb_name + region alone duplicates fact rows whenever one suburb/region
#    pair appears with several territorial_authority / lat / lon values.
suburb_match = (
    col("r.suburb_name").eqNullSafe(col("s.suburb_name")) &
    col("r.region").eqNullSafe(col("s.region")) &
    col("r.territorial_authority").eqNullSafe(col("s.territorial_authority")) &
    col("r.lat").eqNullSafe(col("s.lat")) &
    col("r.lon").eqNullSafe(col("s.lon"))
)

# property_type is only trimmed upstream (never forced non-null), so match it
# null-safely as well; date_month is filtered to non-null in the silver step.
property_match = col("r.property_type").eqNullSafe(col("p.property_type_name"))

fact_rent = (
    df_silver.alias("r")
    .join(dim_time.alias("t"), col("r.date_month") == col("t.date_month"), "inner")
    .join(dim_suburb.alias("s"), suburb_match, "inner")
    .join(dim_property.alias("p"), property_match, "inner")
    .select(
        col("t.time_id"),
        col("s.suburb_id"),
        col("p.property_type_id"),
        col("r.median_rent").alias("median_rent"),
        col("r.count_bonds").alias("count_bonds")
    )
)

fact_path = f"{abfss_url}gold/fact_rent"
(fact_rent.write
         .format("delta")
         .mode("overwrite")
         .save(fact_path))

print("Fact table created successfully.")
display(fact_rent.limit(5))