data reading from source



In [0]:
df=spark.sql("select * from databricks_cata.silver.acute_discharge_situation")

In [0]:
from pyspark.sql.functions import col, count

# 1. List all columns
all_cols = df.columns

# 2. Group by every column, count duplicates
dups = (
    df.groupBy(all_cols)
      .agg(count("*").alias("cnt"))
      .filter(col("cnt") > 1)
)

# 3. Show you any truly identical rows
dups.show(truncate=False)


In [0]:
dups.count()

In [0]:
from pyspark.sql import functions as F

# 1. Compute true min/max dates as before
min_max = (
    df
    .withColumn("period_date", F.to_date("period", "yyyy-MM-dd"))
    .agg(
        F.min("period_date").alias("start_date"),
        F.max("period_date").alias("end_date")
    )
    .collect()[0]
)
start = min_max["start_date"]
end   = min_max["end_date"]

# 2. Generate one seed row, then sequence & explode
calendar_df = (
    spark.range(1)   # single row with id = 0
         .selectExpr(
             f"sequence(to_date('{start}'), to_date('{end}'), interval 1 day) AS dt"
         )
         .select(F.explode("dt").alias("date_key"))
         .withColumn("year",        F.year("date_key"))
         .withColumn("month",       F.month("date_key"))
         .withColumn("day",         F.dayofmonth("date_key"))
         .withColumn("day_of_week", F.dayofweek("date_key"))
         .withColumn("is_weekend",  F.expr("day_of_week IN (1,7)"))
)

display(calendar_df)  # you should now see one row per date


In [0]:
calendar_df.write.mode("overwrite").format("delta").saveAsTable("databricks_cata.gold.dim_date")

In [0]:
%sql
select * from databricks_cata.gold.dim_date

In [0]:
dim_region = (df
  .filter("level = 'Region'")
  .selectExpr("org_code as region_code", "org_name as region_name")
  .distinct()
)

dim_region.write.mode("overwrite").format("delta").saveAsTable("databricks_cata.gold.dim_region")


In [0]:
%sql
select * from databricks_cata.gold.dim_region

In [0]:
dim_icb = (df
  .filter("level = 'ICB'")
  .selectExpr("org_code as icb_code", "org_name as icb_name", "region as region_code")
  .distinct()
)

dim_icb.write.mode("overwrite").format("delta").saveAsTable("databricks_cata.gold.dim_icb")


In [0]:
%sql
select * from databricks_cata.gold.dim_icb

In [0]:
dim_metric = (df
  .select("metric_group","metric","metric_type")
  .distinct()
)

dim_metric.write.mode("overwrite").format("delta").saveAsTable("databricks_cata.gold.dim_metric")


In [0]:
%sql
select * from databricks_cata.gold.dim_metric

In [0]:
df_silver = df.withColumn("period_date", F.to_date("period", "yyyy-MM-dd"))

In [0]:
display(df_silver)

In [0]:
from pyspark.sql.functions import to_date

spark.table("databricks_cata.silver.acute_discharge_situation") \
  .filter("level = 'ICB'") \
  .selectExpr(
    "to_date(period,'yyyy-MM-dd') AS date_key",
    "org_code",
    "metric_group",
    "value",
    "year",
    "month"
  ) \
  .write \
  .mode("overwrite") \
  .partitionBy("year","month") \
  .format("delta") \
  .saveAsTable("databricks_cata.gold.fact_daily_sitrep")


In [0]:
%sql
select * from databricks_cata.gold.fact_daily_sitrep

In [0]:
from pyspark.sql.functions import expr, col

fact_wide = (
  spark.table("databricks_cata.gold.fact_daily_sitrep")
    .groupBy("date_key", "year", "month", "org_code")
    .pivot("metric_group")
    .agg(expr("sum(value)"))
)

# Rename columns to remove spaces
for col_name in fact_wide.columns:
    new_col_name = col_name.replace(" ", "_")
    fact_wide = fact_wide.withColumnRenamed(col_name, new_col_name)

fact_wide.write.mode("overwrite").format("delta").saveAsTable("databricks_cata.gold.vw_fact_wide")

In [0]:
%sql
select * from databricks_cata.gold.vw_fact_wide

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1a) Add year/month columns
fact_monthly = (
    spark.table("vw_fact_wide")
      .withColumn("year",   F.year("date_key"))
      .withColumn("month",  F.month("date_key"))
)

# 1b) Aggregate to month × ICB
monthly_agg = (
    fact_monthly
      .groupBy("org_code","year","month")
      .agg(
        F.sum(F.col("NCTR")).alias("total_delays"),       # NCTR = count of patients delayed
        F.sum(F.col("Discharges")).alias("total_discharges")   # DISCH = count of discharges (if available)
      )
      .withColumn(
        "delay_rate",
        # per-100 discharges; if DISCH is null/zero you’d need to handle separately
        F.when(
          F.col("total_discharges")>0,
          F.col("total_delays")/F.col("total_discharges")*100
        ).otherwise(None)
      )
)

monthly_agg.write.mode("overwrite").format("delta").saveAsTable("databricks_cata.gold.vw_monthly_rates")

In [0]:
%sql
select * from databricks_cata.gold.vw_monthly_rates order by  year asc, month asc

In [0]:
# 2a) Peer averages & stddev by month
peer_stats = (
    spark.table("databricks_cata.gold.vw_monthly_rates")
      .groupBy("year","month")
      .agg(
        F.mean("delay_rate").alias("peer_avg_rate"),
        F.stddev("delay_rate").alias("peer_stddev_rate")
      )
)

# 2b) Join back & compute Z-score
peer_benchmarked = (
    spark.table("databricks_cata.gold.vw_monthly_rates")
      .join(peer_stats, ["year","month"])
      .withColumn(
        "z_score",
        (F.col("delay_rate") - F.col("peer_avg_rate")) / F.col("peer_stddev_rate")
      )
)

peer_benchmarked.write.mode("overwrite").format("delta").saveAsTable("databricks_cata.gold.vw_peer_benchmarked")

In [0]:
%sql
select * from databricks_cata.gold.vw_peer_benchmarked order by year asc, month asc 

In [0]:
# 5a) Monthly sum of A8 per ICB
barrier_monthly = (
    fact_monthly
      .groupBy("org_code","year","month")
      .agg(F.sum("A8").alias("transport_delays"))
)

# 5b) Peer stats and z-score
barrier_peer = (
    barrier_monthly
      .join(
        barrier_monthly
          .groupBy("year","month")
          .agg(
            F.mean("transport_delays").alias("peer_avg"),
            F.stddev("transport_delays").alias("peer_std")
          ),
        ["year","month"]
      )
      .withColumn(
        "z_score_transport",
        (F.col("transport_delays") - F.col("peer_avg")) / F.col("peer_std")
      )
      .select("year","month","transport_delays","peer_avg","z_score_transport")
)
display(barrier_peer)


In [0]:
from pyspark.sql import functions as F

# 1) unpivot A1–A8 into barrier, count
barrier_unpivot = (
  spark.table("databricks_cata.gold.vw_fact_wide")
    .selectExpr(
      "date_key", "org_code as icb_code",
      "year", "month",
      """
      stack(
        8,
        'A1', A1,
        'A2', A2,
        'A3', A3,
        'A4', A4,
        'A5', A5,
        'A6', A6,
        'A7', A7,
        'A8', A8
      ) AS (barrier, delay_count)
      """
    )
    .filter("delay_count IS NOT NULL")
)

# 2) aggregate to month×ICB×barrier
barrier_monthly = (
  barrier_unpivot
    .groupBy("icb_code","year","month","barrier")
    .agg(F.sum("delay_count").alias("total_barrier"))
)

# 3) compute peer avg & stddev per barrier×month
peer_barrier_stats = (
  barrier_monthly
    .groupBy("barrier","year","month")
    .agg(
      F.mean("total_barrier").alias("peer_avg"),
      F.stddev("total_barrier").alias("peer_std")
    )
)

# 4) join back & z-score
barrier_zscores = (
  barrier_monthly
    .join(peer_barrier_stats, ["barrier","year","month"])
    .withColumn(
      "z_score_barrier",
      (F.col("total_barrier") - F.col("peer_avg")) / F.col("peer_std")
    )
)

barrier_zscores.createOrReplaceTempView("vw_barrier_zscores")


In [0]:
%sql
SELECT * 
FROM vw_barrier_zscores
ORDER BY year, month, barrier, icb_code


In [0]:
barrier_zscores.write.mode("overwrite").format("delta").saveAsTable("databricks_cata.gold.vw_barrier_zscores")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1) Define exactly which metric_group columns you want to include
metrics = [
    "NCTR",
    "A1",
    "A2",
    "A3",
    "A4",
    "A5",
    "A6",
    "A7",
    "A8",
    "Additional_bed_days_lost"
]

# 2) Aggregate each metric by month × ICB, then unpivot with stack()
reason_monthly = (
    spark
      .table("databricks_cata.gold.vw_fact_wide")
      .groupBy("org_code", "year", "month")
      .agg(
        *[F.sum(F.col(m)).alias(m) for m in metrics]
      )
      .selectExpr(
        "org_code", "year", "month",
        f"""
        stack(
          {len(metrics)},
          {', '.join([f"'{m}', `{m}`" for m in metrics])}
        ) AS (reason, count)
        """
      )
      .filter("count IS NOT NULL")      # drop any null‐count rows
)

# 3) Rank each reason within its ICB × month by descending count
window_spec = Window.partitionBy("org_code", "year", "month") \
                    .orderBy(F.desc("count"))

top_reasons = (
    reason_monthly
      .withColumn("rank", F.rank().over(window_spec))
      .filter("rank <= 3")
)

top_reasons.write.mode("overwrite").format("delta").saveAsTable("databricks_cata.gold.vw_top_reasons")


In [0]:
%sql
select * from databricks_cata.gold.vw_top_reasons

In [0]:
%sql
SELECT org_code, year, month, reason, count
FROM vw_top3_reasons
ORDER BY org_code, year, month, rank;


In [0]:
%sql
Select * from databricks_cata.gold.dim_metric

In [0]:
%sql
select * from databricks_cata.gold.vw_barrier_zscores

In [0]:
%sql
select * from databricks_cata.gold.fact_daily_sitrep

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 0) Start from your wide fact
fact = spark.table("databricks_cata.gold.vw_fact_wide")

# 1) Fill NULLs → 0 for A1–A8
for b in ["A1","A2","A3","A4","A5","A6","A7","A8"]:
    fact = fact.withColumn(b, F.coalesce(F.col(b), F.lit(0)))

# 2) Unpivot *including* zeros (no filter)
barrier_unpivot = fact.selectExpr(
    "org_code as icb_code", "year", "month",
    """
    stack(
      8,
      'A1', A1,
      'A2', A2,
      'A3', A3,
      'A4', A4,
      'A5', A5,
      'A6', A6,
      'A7', A7,
      'A8', A8
    ) AS (barrier, total_barrier)
    """
)

# 3) Compute peer stats & Z-score exactly as before
peer_barrier_stats = (
    barrier_unpivot
      .groupBy("barrier","year","month")
      .agg(
        F.mean("total_barrier").alias("peer_avg"),
        F.stddev("total_barrier").alias("peer_std")
      )
)

barrier_zscores = (
    barrier_unpivot
      .join(peer_barrier_stats, ["barrier","year","month"])
      .withColumn("z_score_barrier",
          (F.col("total_barrier") - F.col("peer_avg")) / F.col("peer_std")
      )
)

barrier_zscores.createOrReplaceTempView("vw_barrier_zscores")


In [0]:
%sql
select * from vw_barrier_zscores

In [0]:
barrier_zscores.write.format("delta").mode("overwrite").saveAsTable("databricks_cata.gold.vw_barrier_zscores")

In [0]:
%sql
select count(*) as ctr_count
from databricks_cata.gold.fact_daily_sitrep
where metric_group = 'CTR'

In [0]:
%sql
select * from databricks_cata.gold.fact_daily_sitrep where metric_group = "Discharge destination"

date_key,org_code,metric_group,value,year,month
2025-03-01,QRL,Discharge destination,421,2025,3
2025-03-01,QMJ,Discharge destination,80,2025,3
2025-03-01,QUY,Discharge destination,172,2025,3
2025-03-01,QT1,Discharge destination,56,2025,3
2025-03-01,QWU,Discharge destination,170,2025,3
2025-03-01,QKK,Discharge destination,199,2025,3
2025-03-01,QWO,Discharge destination,166,2025,3
2025-03-01,QHM,Discharge destination,304,2025,3
2025-03-01,QJM,Discharge destination,43,2025,3
2025-03-01,QOP,Discharge destination,157,2025,3


In [0]:
from pyspark.sql import functions as F

silver = spark.table("databricks_cata.silver.acute_discharge_situation")

dest = (
    silver
    .filter("level = 'ICB' AND metric_group = 'Discharge destination'")
    .select(
        F.to_date(F.col("period"), "yyyy-MM-dd").alias("date_key"),
        F.col("org_code"),
        F.col("value"),
        F.col("year"),
        F.col("month"),
        F.when(F.col("metric").startswith("Pathway 0:"), F.lit("P0"))
         .when(F.col("metric").startswith("Pathway 1:"), F.lit("P1"))
         .when(F.col("metric").startswith("Pathway 2:"), F.lit("P2"))
         .when(F.col("metric").startswith("Pathway 3:"), F.lit("P3"))
         .otherwise(F.lit(None))
         .alias("pathway_code")
    )
    .filter(F.col("pathway_code").isNotNull())
)

fact_discharge_dest = (
    dest
    .groupBy("date_key", "org_code", "year", "month")
    .pivot("pathway_code", ["P0", "P1", "P2", "P3"])
    .agg(F.expr("sum(value)"))
    .na.fill(0)
)

fact_discharge_dest.write \
    .mode("overwrite") \
    .format("delta") \
    .partitionBy("year", "month") \
    .saveAsTable("databricks_cata.gold.fact_discharge_destination")

In [0]:
%sql
select * from databricks_cata.gold.fact_discharge_destination

date_key,org_code,year,month,P0,P1,P2,P3
2025-04-01,QOC,2025,4,2208,232,224,58
2025-04-01,QMM,2025,4,4840,755,340,106
2025-04-01,QNX,2025,4,5678,535,266,104
2025-04-01,QT1,2025,4,6358,859,156,99
2025-04-01,QKS,2025,4,5931,602,361,169
2025-04-01,QHG,2025,4,4422,537,165,69
2025-04-01,QM7,2025,4,5487,494,221,109
2025-04-01,QJM,2025,4,1968,269,37,77
2025-04-01,QVV,2025,4,4367,361,152,44
2025-04-01,QYG,2025,4,12461,1659,467,354


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, col, when, lit

spark = SparkSession.builder.getOrCreate()

# 1a) Grab every unique free-text reason
unique_reasons = (
    spark.table("databricks_cata.silver.acute_discharge_situation")
         .filter("metric_group = 'Delay reason'")
         .select(trim(col("metric")).alias("metric_text"))
         .distinct()
)

In [0]:
from pyspark.sql.functions import when, lit, trim, col

# 2) Hard-code the mapping above
dim_delay_reason = unique_reasons.withColumn(
    "reason_code",
    when(col("metric_text") == "Wellbeing concerns � Awaiting determination of mental capacity",               lit("B3"))
   .when(col("metric_text") == "Capacity � Other home-based social care service not yet available (Pathway 1)", lit("E"))
   .when(col("metric_text") == "Interface�process � Patient/family/carer choice discussions on package still underway", lit("C"))
   .when(col("metric_text") == "Interface�process � End of life care inc Fast-Track CHC arrangement still underway?(Pathway 1 or 3)", lit("C"))
   .when(col("metric_text") == "Capacity � Housing adaptations not yet completed (Pathway 1 or 3)",             lit("E"))
   .when(col("metric_text") == "Capacity � Housing provision not yet available (Pathway 0 or 1)",              lit("E"))
   .when(col("metric_text") == "Capacity � Home-based rehabilitation, reablement or recovery services not yet available�(Pathway 1)", lit("E"))
   .when(col("metric_text") == "Capacity � Equipment and associated training not yet delivered (Pathway 1-3)", lit("E"))
   .when(col("metric_text") == "Hospital process - Awaiting medical review of need for supported discharge",     lit("A1"))
   .when(col("metric_text") == "Interface process � Residential/nursing home care arrangements still underway (Pathway 3)", lit("C"))
   .when(col("metric_text") == "Capacity � Other home-based community health services not yet available (Pathway 1)", lit("E"))
   .when(col("metric_text") == "Capacity � Residential/nursing home care not yet available (Pathway 3)",       lit("E"))
   .when(col("metric_text") == "Wellbeing concerns � Patient�/family/carer concerns over discharge readiness",  lit("B2"))
   .when(col("metric_text") == "Interface�process � Self-funded care package arrangements still underway",      lit("C"))
   .when(col("metric_text") == "Hospital process � Awaiting therapy review of need for supported discharge",     lit("A7"))
   .when(col("metric_text") == "Hospital process � Awaiting medicines to take home, discharge letter�or other discharge documentation", lit("A5"))
   .when(col("metric_text") == "Hospital process � Awaiting referral to care transfer hub for supported discharge", lit("C"))
   .when(col("metric_text") == "Care transfer hub process� Awaiting necessary referrals by care transfer hub",   lit("C"))
   .when(col("metric_text") == "Hospital process � Remaining in hospital due to infection prevention and control restrictions", lit("B4"))
   .when(col("metric_text") == "Wellbeing concerns � Ongoing�safeguarding concern",                            lit("B3"))
   .when(col("metric_text") == "Interface�process � Out of area discharge arrangements requested but not completed", lit("C"))
   .when(col("metric_text") == "Capacity � Bed-based rehabilitation, reablement or recovery services not yet available (Pathway 2)", lit("E"))
   .when(col("metric_text") == "Hospital process � Awaiting formal decision to discharge (inc diagnostic test results)", lit("A1"))
   .when(col("metric_text") == "Care transfer hub process � Waiting for confirmation of immediate care needs and pathway", lit("C"))
   .when(col("metric_text") == "Hospital process � Awaiting patient transport services",                      lit("A8"))
   .when(col("metric_text") == "Capacity � End of life care inc Fast-Track CHC not yet available�(Pathway 1 or 3)", lit("E"))
   .when(col("metric_text") == "Capacity � Awaiting restart of existing social care arrangements (Pathway 0)", lit("E"))
   .when(col("metric_text") == "Capacity � Mental health admitted patient care not yet available (Pathway 2)", lit("E"))
   .when(col("metric_text") == "Interface�process � Further action requested by agreed provider",            lit("C"))
   .when(col("metric_text") == "Care transfer hub process � Awaiting confirmation of funding eligibility",     lit("C"))
   .when(col("metric_text") == "Interface process � Home based rehabilitation, reablement or recovery service arrangements still underway (Pathway 1)", lit("C"))
   .when(col("metric_text") == "Interface process � Bed-based rehabilitation, reablement or recovery service arrangements still underway (Pathway 2)", lit("C"))
   .when(col("metric_text") == "Interface�process � Housing provision arrangement for homelessness still underway�(Pathway 0 or 1)", lit("C"))
   .when(col("metric_text") == "Interface process � Other home-based social care service arrangements still underway (Pathway 1)", lit("C"))
   .when(col("metric_text") == "Interface�process � Homeless with no recourse to public funds?",         lit("B1"))
   .when(col("metric_text") == "Wellbeing concerns - Issues with discharge destination readiness",         lit("B2"))
   .when(col("metric_text") == "Interface process � Other home-based community health service arrangements still underway (Pathway 1)", lit("C"))
   .otherwise(lit("UNKNOWN"))
)

In [0]:
dim_delay_reason.write.format("delta").mode("overwrite").saveAsTable("databricks_cata.gold.dim_delay_reason")

In [0]:
%sql
select * from databricks_cata.gold.dim_delay_reason

metric_text,reason_code
Wellbeing concerns � Awaiting determination of mental capacity,B3
Capacity � Other home-based social care service not yet available (Pathway 1),E
Interface�process � Patient/family/carer choice discussions on package still underway,C
Interface�process � End of life care inc Fast-Track CHC arrangement still underway?(Pathway 1 or 3),C
Capacity � Housing adaptations not yet completed (Pathway 1 or 3),E
Capacity � Housing provision not yet available (Pathway 0 or 1),E
"Capacity � Home-based rehabilitation, reablement or recovery services not yet available�(Pathway 1)",E
Capacity � Equipment and associated training not yet delivered (Pathway 1-3),E
Hospital process - Awaiting medical review of need for supported discharge,A1
Interface process � Residential/nursing home care arrangements still underway (Pathway 3),C


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, sum as Fsum, avg as Favg, stddev as Fstd
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# 1) Read Silver delay-reason rows and add date_key
silver = (
    spark.table("databricks_cata.silver.acute_discharge_situation")
         .filter("metric_group = 'Delay reason'")
         .select(
             to_date("period", "yyyy-MM-dd").alias("date_key"),
             col("org_code"),
             col("metric"),
             col("value").alias("delay_count"),
             col("year"),
             col("month")
         )
)

# 2) Join to Dim-Delay-Reason (which has metric_text→reason_code)
dim_delay = spark.table("databricks_cata.gold.dim_delay_reason")

fact_delay_reason = (
    silver
      .join(dim_delay,
            silver.metric == dim_delay.metric_text,
            how="inner")
      .select(
         "date_key",
         col("org_code").alias("icb_code"),
         col("reason_code"),
         "delay_count",
         "year","month"
      )
)



In [0]:
# Persist the fact table
(
  fact_delay_reason
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("databricks_cata.gold.fact_delay_reason")
)



In [0]:
%sql
select * from databricks_cata.gold.fact_delay_reason

date_key,icb_code,reason_code,delay_count,year,month
2025-04-01,R0A,C,34.0,2025,4
2025-04-01,RCD,A8,0.0,2025,4
2025-04-01,RWA,A5,0.0,2025,4
2025-04-01,RN5,B4,0.0,2025,4
2025-04-01,QMF,B2,4.0,2025,4
2025-04-01,RTR,B3,0.0,2025,4
2025-04-01,QU9,B2,1.0,2025,4
2025-04-01,RTX,C,0.0,2025,4
2025-04-01,QMM,C,0.0,2025,4
2025-04-01,RVW,C,0.0,2025,4


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# --- assume fact_delay_reason is already defined as:
# fact_delay_reason = spark.table("databricks_cata.gold.fact_delay_reason")

# 3a) Compute peer averages & stddev per reason × month
peer_stats = (
    fact_delay_reason
      .groupBy("reason_code","year","month")
      .agg(
        F.avg("delay_count").alias("peer_avg"),
        F.stddev("delay_count").alias("peer_std")
      )
)

# 3b) Join back & calculate z-score
barrier_zscores = (
    fact_delay_reason.alias("f")
      .join(peer_stats.alias("p"),
            on=[
              F.col("f.reason_code") == F.col("p.reason_code"),
              F.col("f.year")        == F.col("p.year"),
              F.col("f.month")       == F.col("p.month")
            ],
            how="inner"
      )
      .select(
        F.col("f.icb_code"),
        F.col("f.reason_code").alias("barrier"),
        F.col("f.year"),
        F.col("f.month"),
        F.col("f.delay_count").alias("total_barrier"),
        F.col("p.peer_avg"),
        (
          (F.col("f.delay_count") - F.col("p.peer_avg")) 
          / F.col("p.peer_std")
        ).alias("z_score_barrier")
      )
)

# **Write out as a Delta table**
barrier_zscores.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("databricks_cata.gold.fact_barrier_zscores")


# 4) Rank & pick Top-3 per ICB × month
window_spec = Window.partitionBy("icb_code","year","month") \
                    .orderBy(F.desc("total_barrier"))

top3 = (
    barrier_zscores
      .withColumn("rank", F.rank().over(window_spec))
      .filter("rank <= 3")
      .select("icb_code","year","month","barrier","total_barrier","rank")
)

# **Write out as a Delta table**
top3.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("databricks_cata.gold.fact_top3_delay_reasons")


In [0]:
%sql
select * from databricks_cata.gold.fact_barrier_zscores

icb_code,barrier,year,month,total_barrier,peer_avg,z_score_barrier
R0A,C,2025,4,34.0,6.14920634920635,0.9602052089717122
RCD,A8,2025,4,0.0,2.6785714285714284,-0.2777759980895458
RWA,A5,2025,4,0.0,4.303571428571429,-0.2819199723295389
RN5,B4,2025,4,0.0,0.75,-0.2561673778270786
QMF,B2,2025,4,4.0,2.738095238095238,0.1247411877588792
RTR,B3,2025,4,0.0,2.7916666666666665,-0.2690546408726303
QU9,B2,2025,4,1.0,2.738095238095238,-0.1718133340829846
RTX,C,2025,4,0.0,6.14920634920635,-0.2120047292577461
QMM,C,2025,4,0.0,6.14920634920635,-0.2120047292577461
RVW,C,2025,4,0.0,6.14920634920635,-0.2120047292577461


In [0]:
%sql
select * from databricks_cata.gold.fact_top3_delay_reasons

icb_code,year,month,barrier,total_barrier,rank
ENG,2024,6,E,827.0,1
ENG,2024,6,E,804.0,2
ENG,2024,6,C,640.0,3
ENG,2024,7,E,891.0,1
ENG,2024,7,E,807.0,2
ENG,2024,7,C,645.0,3
ENG,2024,8,E,884.0,1
ENG,2024,8,E,814.0,2
ENG,2024,8,C,668.0,3
ENG,2024,9,E,677.0,1


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import to_date, col, when, lit

# 1. Read your silver table
silver_a = spark.table("databricks_cata.silver.acute_discharge_situation")

# 2. Filter down to just the Additional‐bed‐days rows for ICBs, and parse out sub‐groups
additional = (
    silver_a
      .filter("level = 'ICB' AND metric_group = 'Additional bed days lost'")
      .withColumn("date_key", to_date(col("period"), "yyyy-MM-dd"))
      .withColumn(
         "length_band",
         when(col("metric").rlike("7\\+? days"),  "bed_days_7_plus")
         .when(col("metric").rlike("14\\+? days"), "bed_days_14_plus")
         .when(col("metric").rlike("21\\+? days"), "bed_days_21_plus")
         .otherwise(lit("other"))
      )
)




In [0]:
# 3. Pivot to wide form
fact_additional_bed_days = (
    additional
      .groupBy("date_key","org_code")
      .pivot("length_band", ["bed_days_7_plus","bed_days_14_plus","bed_days_21_plus"])
      .agg(F.sum("value").alias("total_days_lost"))
      .withColumn("year",  F.year("date_key"))
      .withColumn("month", F.month("date_key"))
)



In [0]:
display(fact_additional_bed_days)

date_key,org_code,bed_days_7_plus,bed_days_14_plus,bed_days_21_plus,year,month
2024-04-08,QUE,2630,2229,1852,2024,4
2024-09-02,QUE,1678,1460,1164,2024,9
2025-01-20,QUE,1789,1531,1203,2025,1
2025-05-26,QOQ,1098,906,648,2025,5
2024-04-29,QHM,3336,3047,2567,2024,4
2024-07-15,QMF,1636,1561,1436,2024,7
2024-12-23,QOP,11087,10611,10010,2024,12
2025-01-13,QU9,1512,1315,1048,2025,1
2025-04-14,QYG,13255,12575,11415,2025,4
2025-02-10,QWE,3801,3615,3308,2025,2


In [0]:
# 4. Persist as a Delta table in your gold layer
fact_additional_bed_days.write\
    .mode("overwrite")\
    .format("delta")\
    .partitionBy("year","month")\
    .saveAsTable("databricks_cata.gold.fact_additional_bed_days")

In [0]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# read your pivoted bed-days fact
fact_bed = spark.table("databricks_cata.gold.fact_additional_bed_days")

# 1) Unpivot to long form
fact_bed_long = fact_bed.selectExpr(
  "org_code as icb_code",
  "year", "month", "date_key",
  # stack(#cols, 'colName1', expr1, 'colName2', expr2, ...) → (length_band, total_days_lost)
  """
  stack(
    3,
    'bed_days_7_plus',   bed_days_7_plus,
    'bed_days_14_plus',  bed_days_14_plus,
    'bed_days_21_plus',  bed_days_21_plus
  ) as (length_band, total_days_lost)
  """
)

# 2) Compute peer means & stddev by bucket × year × month
peer_stats_bed = (
  fact_bed_long
    .groupBy("length_band","year","month")
    .agg(
      F.avg("total_days_lost").alias("peer_avg_days"),
      F.stddev("total_days_lost").alias("peer_std_days")
    )
)

# 3) Join back & compute Z-score
bed_zscores = (
  fact_bed_long.alias("f")
    .join(peer_stats_bed.alias("p"),
          on=["length_band","year","month"], how="inner")
    .select(
      F.col("f.icb_code"),
      F.col("f.date_key"),
      F.col("f.year"),
      F.col("f.month"),
      F.col("f.length_band"),
      F.col("f.total_days_lost"),
      F.col("p.peer_avg_days"),
      F.col("p.peer_std_days"),
      (
        (F.col("f.total_days_lost") - F.col("p.peer_avg_days"))
        / F.col("p.peer_std_days")
      ).alias("z_score_bed_days")
    )
)

# 4) Persist as a Delta table for Power BI consumption
bed_zscores.write \
  .format("delta") \
  .mode("overwrite") \
  .saveAsTable("databricks_cata.gold.fact_bed_days_zscores")
