In [0]:
**Outpatient Pipeline Summary (Steps 9-14)**

This pipeline processes outpatient activity data into a clean, multi-level dataset ready for analysis, benchmarking, and reporting.

**Step 9 Wide Table & Provider Mapping**
The raw outpatient snapshot is filtered for relevant years, attendance types, and administrative categories. Treatment Function Codes are validated and grouped, and key metrics (totals, first/follow-up appointments, procedures, DNAs, 2WW) are aggregated by month, provider, and treatment group. Provider mergers are applied, and ICB and region codes are mapped for organisational context. Month-end dates are derived for consistent time-series analysis.

**Step 10 & 10A Derived Metrics & Benchmarks**
Key performance metrics (e.g., DNA rates, procedure percentages, remote vs face-to-face ratios) are calculated with guards against missing columns and zero denominators. The 25th percentile of Remote_Total is computed for each month and provider as a lower benchmark for remote activity.

**Step 11 & 13 Long / OPRT Format**
Metrics are reshaped into a long format, turning each measure into a key–value pair while retaining identifiers like month, provider, ICB, region, and treatment group. This structure supports flexible reporting, visualization, and dashboard integration.

**Step 12 Multi-Level Aggregation**
Metrics are aggregated across Organisation, ICB, and Region levels. Derived rates are recalculated for each level, producing a consistent view of performance from local to regional scale.

**Step 14 Internal Metric IDs**
Each metric is linked with its Treatment Function Group and assigned an InternalID for consistent tracking and reporting. The final dataset is saved in Parquet format for downstream use.

In [0]:
#1 Importing Tools 
import openpyxl
import pandas as pd

from pyspark.sql import functions as F
from datetime import datetime
from openpyxl.styles import NamedStyle

In [0]:
#2 Reduce risk of a timeout by increasing limit to 30 minutes
spark.conf.set("spark.databricks.execution.timeout", "1800")

In [0]:
#3 Loading the master hierarchies table from the lake mart
df_master_hierarchies = spark.read.option("header", "true").csv("abfss://analytics-projects@udalstdataanalysisprod.dfs.core.windows.net/ElectiveRecovery/EROC_Collection_Queries/master_hierarchies_table.csv")
display(df_master_hierarchies.limit(10))
print(f"Number of rows in master hierarchies: {df_master_hierarchies.count()}")

In [0]:
#4 loading ICB to Region table
df_icb_region = spark.read.option("header", "true").csv("abfss://analytics-projects@udalstdataanalysisprod.dfs.core.windows.net/ElectiveRecovery/EROC_Collection/EROC/EROC_ICB_Region_DisplayNames.csv")  # Ensure proper Azure credentials are configured for ADLS access.
display(df_icb_region.limit(10))
print(f"Number of rows in icb_region: {df_icb_region.count()}")

In [0]:
#5 loading list of merged providers
df_merged_providers = spark.read.option("header", "true").csv("abfss://analytics-projects@udalstdataanalysisprod.dfs.core.windows.net/ElectiveRecovery/EROC_Collection/EROC/EROC_Merged_Providers.csv")
display(df_merged_providers.limit(10))
print(f"Number of rows in merged providers: {df_merged_providers.count()}")

In [0]:
#6 Provider code mapping: reuse the merged-providers table already loaded in #5
provider_code_mapping = df_merged_providers
display(provider_code_mapping.limit(10))
print(f"Number of rows in provider code mapping: {provider_code_mapping.count()}")

In [0]:
#7 importing MHS metric list and internal ID
mhs_metric_list = spark.read.option("header", "true").csv("abfss://analytics-projects@udalstdataanalysisprod.dfs.core.windows.net/ElectiveRecovery/EROC_Collection/MHS")
display(mhs_metric_list.limit(10))
print(f"Number of rows in mhs_metric_list: {mhs_metric_list.count()}")

In [0]:
#8 Loading the core monthly snapshot data
# (F is already imported in #1; the CSV-only "header" option is not needed for Parquet)
df_op_activity_snapshot = spark.read.option("recursiveFileLookup", "true").parquet(
    "abfss://reporting@udalstdatacuratedprod.dfs.core.windows.net/restricted/patientlevel/MESH/OPA/OPA_Core_Monthly_Snapshot/Published/1"
)
display(df_op_activity_snapshot.limit(10))

# Show number of rows in the raw data
row_count = df_op_activity_snapshot.count()
print(f"Number of rows in raw data: {row_count}")

In [0]:
**Step 9 Creating the Wide Table and Provider Mapping**

Builds a wide, aggregated outpatient activity table from the snapshot dataset. Treatment Function Codes are validated and grouped, and the data is filtered for relevant years, attendance types, and administrative categories. Key metrics are aggregated by month, provider, and treatment group, including totals, first/follow-up, procedures, DNAs, and 2WW appointments. The table also accounts for provider mergers, maps to ICB and Region codes, and derives month-end dates, producing a clean, regionally aligned dataset ready for analysis.
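
The row-level derivations described above can be illustrated outside Spark. The following is a minimal plain-Python sketch, not the pipeline's implementation: the group membership mirrors the code cell that follows, the provider codes in the usage note are hypothetical, and `Der_Activity_Month` is assumed to be a `YYYYMM` string.

```python
import calendar
from datetime import date

# Valid Treatment Function Codes, as listed in the Step 9 code cell.
VALID_CODES = set(
    "100 101 102 104 105 106 108 110 111 115 120 130 140 144 145 301 302 303 "
    "307 320 330 340 361 400 410 420 430 501 502 560 650".split()
)

# Codes that are rolled up into a named reporting group.
GROUPS = {
    "GS": {"100", "102", "104", "105", "106"},
    "OMFS": {"140", "144", "145"},
    "T&O": {"110", "111", "115"},
}


def treatment_group(code):
    """Return the reporting group for a Treatment Function Code."""
    if code not in VALID_CODES:
        return "Other"
    for group, members in GROUPS.items():
        if code in members:
            return group
    return code  # valid codes outside a named group keep their own code


def adjusted_provider(code, mergers):
    """Map a predecessor provider code to its successor, else keep it."""
    return mergers.get(code, code)


def month_end(activity_month):
    """Convert an assumed 'YYYYMM' string to the last day of that month."""
    year, month = int(activity_month[:4]), int(activity_month[4:6])
    return date(year, month, calendar.monthrange(year, month)[1])
```

For example, `treatment_group("104")` falls into the GS group, an unlisted code such as `"999"` becomes `"Other"`, and `adjusted_provider("OLD1", {"OLD1": "NEW1"})` (hypothetical codes) returns the successor code while unmapped providers pass through unchanged.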

In [0]:
#9 Creating the wide table & inserting new column for merged providers with new merger codes and mapping to ICB and Region codes
from pyspark.sql.functions import when, col, lit, create_map, coalesce
import pyspark.sql.functions as F

# Define valid treatment function codes
VALID_TREATMENT_CODES = [
    '100', '101', '102', '104', '105', '106', '108', '110', '111', '115', '120', '130', '140',
    '144', '145', '301', '302', '303', '307', '320', '330', '340', '361', '400', '410', '420',
    '430', '501', '502', '560', '650'
]

# Adding in the Treatment_Function_Code_New column
opa_with_tfc = df_op_activity_snapshot.withColumn(
    "Treatment_Function_Code_New",
    when(col("Treatment_Function_Code").isin(VALID_TREATMENT_CODES), col("Treatment_Function_Code")).otherwise("Other")
)

# Add Treatment_Function_Group column using VALID_TREATMENT_CODES groupings
opa_with_groups = opa_with_tfc.withColumn(
    "Treatment_Function_Group",
    when(col("Treatment_Function_Code_New").isin("100", "102", "104", "105", "106"), "GS")
     .when(col("Treatment_Function_Code_New").isin("140", "144", "145"), "OMFS")
     .when(col("Treatment_Function_Code_New").isin("110", "111", "115"), "T&O")
     .otherwise(col("Treatment_Function_Code_New"))
)

# Filter dataset for relevant years, admin category, TFC, and attendance
opa_filtered = opa_with_groups.filter(
    (col("Der_Financial_Year").isin("2023/24", "2024/25", "2025/26")) &  # This can be updated manually
    (col("Administrative_Category") == "01") &
    (col("Treatment_Function_Code") != "812") &
    (col("First_Attendance").isin("1", "2", "3", "4"))
)

# Aggregates the metrics by month, provider, and Treatment_Function_Group
opa_agg = opa_filtered.groupBy(
    "Der_Activity_Month",
    "Der_Provider_Code",
    "Treatment_Function_Group"
).agg(
    # All contacts
    F.sum(when((col("Attendance_Status").isin("5", "6")) & (col("First_Attendance").isin("1", "2", "3", "4")), 1).otherwise(0)).alias("All_Total"),
    F.sum(when((col("Attendance_Status").isin("5", "6")) & (col("First_Attendance").isin("1", "3")), 1).otherwise(0)).alias("All_First"),
    F.sum(when((col("Attendance_Status").isin("5", "6")) & (col("First_Attendance").isin("2", "4")), 1).otherwise(0)).alias("All_FU"),
    F.sum(when((col("Der_Number_Procedure") > 0) & (col("Attendance_Status").isin("5", "6")) & (col("First_Attendance").isin("1", "2", "3", "4")), 1).otherwise(0)).alias("All_Proc"),
    F.sum(when((col("Der_Number_Procedure") == 0) & (col("Attendance_Status").isin("5", "6")) & (col("First_Attendance").isin("1", "2", "3", "4")), 1).otherwise(0)).alias("All_NoProc"),
    F.sum(when((col("Der_Number_Procedure") > 0) & (col("Attendance_Status").isin("5", "6")) & (col("First_Attendance").isin("2", "4")), 1).otherwise(0)).alias("All_FU_Proc"),
    F.sum(when((col("Der_Number_Procedure") == 0) & (col("Attendance_Status").isin("5", "6")) & (col("First_Attendance").isin("2", "4")), 1).otherwise(0)).alias("All_FU_NoProc"),
    # Face-to-face
    F.sum(when((col("Attendance_Status").isin("5", "6")) & (col("First_Attendance").isin("1", "2")), 1).otherwise(0)).alias("F2F_Total"),
    F.sum(when((col("Attendance_Status").isin("5", "6")) & (col("First_Attendance") == "1"), 1).otherwise(0)).alias("F2F_First"),
    F.sum(when((col("Attendance_Status").isin("5", "6")) & (col("First_Attendance") == "2"), 1).otherwise(0)).alias("F2F_FU"),
    # Remote
    F.sum(when((col("Attendance_Status").isin("5", "6")) & (col("First_Attendance").isin("3", "4")), 1).otherwise(0)).alias("Remote_Total"),
    F.sum(when((col("Attendance_Status").isin("5", "6")) & (col("First_Attendance") == "3"), 1).otherwise(0)).alias("Remote_First"),
    F.sum(when((col("Attendance_Status").isin("5", "6")) & (col("First_Attendance") == "4"), 1).otherwise(0)).alias("Remote_FU"),
    # Did not attends (DNAs)
    F.sum(when((col("Attendance_Status").isin("3", "7")) & (col("First_Attendance").isin("1", "2", "3", "4")), 1).otherwise(0)).alias("All_DNA"),
    F.sum(when((col("Attendance_Status").isin("3", "7")) & (col("First_Attendance").isin("1", "3")), 1).otherwise(0)).alias("All_First_DNA"),
    F.sum(when((col("Attendance_Status").isin("3", "7")) & (col("First_Attendance").isin("2", "4")), 1).otherwise(0)).alias("All_FU_DNA"),
    F.sum(when((col("Attendance_Status").isin("3", "7")) & (col("First_Attendance").isin("1", "2")), 1).otherwise(0)).alias("F2F_DNA"),
    F.sum(when((col("Attendance_Status").isin("3", "7")) & (col("First_Attendance").isin("3", "4")), 1).otherwise(0)).alias("Remote_DNA"),
    # 2WW DNA
    F.sum(when((col("Attendance_Status").isin("3", "7")) & (col("First_Attendance").isin("1", "2", "3", "4")) & (col("Priority_Type") == "3"), 1).otherwise(0)).alias("All_2WW_DNA"),
    F.sum(when((col("Attendance_Status").isin("3", "7")) & (col("First_Attendance").isin("1", "3")) & (col("Priority_Type") == "3"), 1).otherwise(0)).alias("All_First_2WW_DNA"),
    F.sum(when((col("Attendance_Status").isin("3", "7")) & (col("First_Attendance").isin("2", "4")) & (col("Priority_Type") == "3"), 1).otherwise(0)).alias("All_FU_2WW_DNA"),
    # All 2WW appointments
    F.sum(when((col("Attendance_Status").isin("5", "6", "3", "7")) & (col("First_Attendance").isin("1", "2", "3", "4")) & (col("Priority_Type") == "3"), 1).otherwise(0)).alias("All_2WW"),
    F.sum(when((col("Attendance_Status").isin("5", "6", "3", "7")) & (col("First_Attendance").isin("1", "3")) & (col("Priority_Type") == "3"), 1).otherwise(0)).alias("All_First_2WW"),
    F.sum(when((col("Attendance_Status").isin("5", "6", "3", "7")) & (col("First_Attendance").isin("2", "4")) & (col("Priority_Type") == "3"), 1).otherwise(0)).alias("All_FU_2WW")
)

# Add "All" TFC totals by month and provider
METRIC_COLS = [c for c in opa_agg.columns if c not in ["Der_Activity_Month", "Der_Provider_Code", "Treatment_Function_Group"]]

opa_all_tfc = opa_agg.groupBy("Der_Activity_Month", "Der_Provider_Code").agg(
    *[F.sum(col(c)).alias(c) for c in METRIC_COLS]
).withColumn("Treatment_Function_Group", lit("All"))

opa_final = opa_agg.unionByName(opa_all_tfc)

# Order by results
opa_final_ordered = opa_final.orderBy("Der_Activity_Month", "Der_Provider_Code", "Treatment_Function_Group")

# Build a Spark map expression of old -> new provider codes from df_merged_providers
provider_code_mapping_dict = {
    row['Old_Provider_Code']: row['New_Provider_Code']
    for row in df_merged_providers.select("Old_Provider_Code", "New_Provider_Code").distinct().collect()
}

mapping_list = []
for k, v in provider_code_mapping_dict.items():
    mapping_list.append(lit(k))
    mapping_list.append(lit(v))

mapping_expr = create_map(mapping_list)

# Add "Adj Org Code" column based on provider_code_mapping
opa_final_ordered_with_adj = opa_final_ordered.withColumn(
    "Adj Org Code",
    coalesce(mapping_expr.getItem(col("Der_Provider_Code")), col("Der_Provider_Code"))
)

# Add "ICB" column by joining to df_master_hierarchies on Organisation_Code and returning STP_Code
opa_final_ordered_with_icb = opa_final_ordered_with_adj.join(
    df_master_hierarchies.select(
        F.col("Organisation_Code").alias("join_org_code"),
        F.col("STP_Code").alias("ICB")
    ),
    opa_final_ordered_with_adj["Adj Org Code"] == F.col("join_org_code"),
    "left"
).drop("join_org_code")

# Add "Region" column by joining to df_icb_region on ICB column and returning Region_Code
opa_final_ordered_with_icb_region = opa_final_ordered_with_icb.join(
    df_icb_region.select(
        F.col("ICB_Code").alias("join_icb"),
        F.col("Region_Code")
    ),
    opa_final_ordered_with_icb["ICB"] == F.col("join_icb"),
    "left"
).drop("join_icb")

from pyspark.sql.functions import last_day, to_date, concat_ws

# Derive the month-end date (assumes Der_Activity_Month is formatted as YYYYMM)
opa_final_ordered_with_icb_region = opa_final_ordered_with_icb_region.withColumn(
    "Der_Activity_Month_Date",
    last_day(
        to_date(
            concat_ws(
                '-',
                col("Der_Activity_Month").substr(1, 4),
                col("Der_Activity_Month").substr(5, 2),
                lit("01")
            )
        )
    )
)

#display(opa_final_ordered_with_icb_region.limit(10))

# Aggregate metrics over the identifier columns and sort the final table
# (the unused row count that was computed here triggered an extra Spark job)
id_cols = ["Der_Activity_Month_Date", "Treatment_Function_Group", "Region_Code", "ICB", "Adj Org Code"]

# Determine metric columns (exclude identifiers and the three columns to drop)
metric_cols = [
    c for c in opa_final_ordered_with_icb_region.columns
    if c not in id_cols + ["Der_Activity_Month", "Der_Provider_Code", "Treatment_Function_Code_New"]
]

opa_final_processed = (
    opa_final_ordered_with_icb_region
    .groupBy(*[F.col(c) for c in id_cols])
    .agg(*[F.sum(F.col(c)).alias(c) for c in metric_cols])
    .orderBy("Der_Activity_Month_Date", "Region_Code", "ICB", "Adj Org Code", "Treatment_Function_Group")
)

display(opa_final_processed.limit(10))
print(f"Number of rows in opa_final_processed: {opa_final_processed.count()}")

In [0]:
**Step 10 Derived Metrics**

This step enhances the outpatient wide table with additional performance metrics while ensuring robustness against missing columns. A reusable safe_add function checks column availability before computing new ratios (e.g., DNA %, procedure %, remote %, and face-to-face rates). The script also creates duplicate columns for downstream compatibility and safely aggregates related measures into combined totals. By using conditional logic and dynamic column creation, this process guarantees consistent metric generation even when source fields are incomplete or evolve over time.
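
The guarded ratio logic can be shown in isolation. This is a minimal plain-Python sketch of the idea, assuming the same formulas as the Spark expressions in this step (DNA rate is DNAs divided by attendances plus DNAs); the helper names `safe_pct` and `dna_rate` are illustrative and do not exist in the notebook.

```python
def safe_pct(numerator, denominator):
    """Return numerator as a percentage of denominator, or None when the
    ratio is undefined (missing input or zero denominator)."""
    if numerator is None or denominator is None or denominator == 0:
        return None
    return 100 * numerator / denominator


def dna_rate(attended, dna):
    """DNA rate: DNAs as a share of all booked appointments (attended + DNA)."""
    if attended is None or dna is None:
        return None
    return safe_pct(dna, attended + dna)
```

Returning `None` rather than zero keeps "no activity" distinguishable from "0% DNA rate", which matches the behaviour of a Spark `when` expression with no `otherwise` branch.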

In [0]:

# 10 Safe metric calculation (robust to missing columns)
from pyspark.sql import functions as F

df = opa_final_ordered_with_icb_region

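# Helper to add a derived column only if its dependencies exist;
# otherwise add a typed null so the schema stays consistent
def safe_add(df, new_col, expr_fn, required_cols):
    if all(c in df.columns for c in required_cols):
        return df.withColumn(new_col, expr_fn(df))
    # An untyped F.lit(None) yields a NullType column, which the Parquet writer may reject
    return df.withColumn(new_col, F.lit(None).cast("double"))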

# Now build metrics safely
metrics = [
    ("All_DNA_Over_All_Total", lambda d: F.when((F.col("All_Total")+F.col("All_DNA"))!=0,
                                                (F.col("All_DNA")/(F.col("All_Total")+F.col("All_DNA")))*100),
     ["All_Total", "All_DNA"]),
    ("All_First_DNA_Over_All_First", lambda d: F.when((F.col("All_First")+F.col("All_First_DNA"))!=0,
                                                      (F.col("All_First_DNA")/(F.col("All_First")+F.col("All_First_DNA")))*100),
     ["All_First", "All_First_DNA"]),
    ("All_FU_DNA_Over_All_FU", lambda d: F.when((F.col("All_FU")+F.col("All_FU_DNA"))!=0,
                                                (F.col("All_FU_DNA")/(F.col("All_FU")+F.col("All_FU_DNA")))*100),
     ["All_FU", "All_FU_DNA"]),
    ("All_FU_Over_All_Total", lambda d: F.when(F.col("All_Total")!=0, (F.col("All_FU")/F.col("All_Total"))*100),
     ["All_FU", "All_Total"]),
    ("All_First_Over_All_Total", lambda d: F.when(F.col("All_Total")!=0, (F.col("All_First")/F.col("All_Total"))*100),
     ["All_First", "All_Total"]),
    ("All_NoProc_Over_All_Total", lambda d: F.when(F.col("All_Total")!=0, (F.col("All_NoProc")/F.col("All_Total"))*100),
     ["All_NoProc", "All_Total"]),
    ("All_Proc_Over_All_Total", lambda d: F.when(F.col("All_Total")!=0, (F.col("All_Proc")/F.col("All_Total"))*100),
     ["All_Proc", "All_Total"]),
    ("Remote_Total_Over_All_Total", lambda d: F.when(F.col("All_Total")!=0, (F.col("Remote_Total")/F.col("All_Total"))*100),
     ["Remote_Total", "All_Total"]),
    ("Remote_FU_Over_All_FU", lambda d: F.when(F.col("All_FU")!=0, (F.col("Remote_FU")/F.col("All_FU"))*100),
     ["Remote_FU", "All_FU"]),
    ("Remote_First_Over_All_First", lambda d: F.when(F.col("All_First")!=0, (F.col("Remote_First")/F.col("All_First"))*100),
     ["Remote_First", "All_First"]),
    ("F2F_DNA_Over_F2F_Total", lambda d: F.when((F.col("F2F_Total")+F.col("F2F_DNA"))!=0,
                                                (F.col("F2F_DNA")/(F.col("F2F_Total")+F.col("F2F_DNA")))*100),
     ["F2F_Total", "F2F_DNA"]),
    ("Remote_DNA_Over_Remote_Total", lambda d: F.when((F.col("Remote_Total")+F.col("Remote_DNA"))!=0,
                                                      (F.col("Remote_DNA")/(F.col("Remote_Total")+F.col("Remote_DNA")))*100),
     ["Remote_Total", "Remote_DNA"]),
]

# Apply metrics dynamically
for name, expr, req in metrics:
    df = safe_add(df, name, expr, req)

# Add simple duplicates & combinations (only if columns exist)
simple_copies = [
    ("All_DNA1", "All_DNA"),
    ("All_DNA2", "All_DNA"),
    ("All_First1", "All_First"),
    ("All_First2", "All_First"),
    ("All_First3", "All_First"),
    ("All_FU1", "All_FU"),
    ("All_FU2", "All_FU"),
    ("All_FU3", "All_FU"),
    ("All_FU4", "All_FU"),
    ("All_FU5", "All_FU"),
    ("All_Total1", "All_Total"),
    ("All_Total2", "All_Total"),
    ("All_Total3", "All_Total"),
    ("All_Total4", "All_Total"),
    ("All_Total5", "All_Total"),
    ("All_Total6", "All_Total"),
    ("Remote_Total1", "Remote_Total"),
    ("Remote_Total2", "Remote_Total"),
]
for newc, base in simple_copies:
    if base in df.columns:
        df = df.withColumn(newc, F.col(base))
    else:
        # Typed null keeps the column compatible with the count columns
        df = df.withColumn(newc, F.lit(None).cast("long"))

# Add a few combined sums (safe)
combos = [
    ("All_First_plus_All_First_DNA", ["All_First", "All_First_DNA"]),
    ("All_FU_plus_All_FU_DNA", ["All_FU", "All_FU_DNA"]),
    ("All_Total_plus_All_DNA", ["All_Total", "All_DNA"]),
    ("F2F_Total_plus_F2F_DNA", ["F2F_Total", "F2F_DNA"]),
    ("Remote_Total_plus_Remote_DNA", ["Remote_Total", "Remote_DNA"]),
]
for newc, cols in combos:
    if all(c in df.columns for c in cols):
        df = df.withColumn(newc, F.col(cols[0]).cast("long") + F.col(cols[1]).cast("long"))
    else:
        # Typed null matches the long type produced by the branch above
        df = df.withColumn(newc, F.lit(None).cast("long"))

# Final rename to match previous convention
opa_final_with_added_metrics = df

display(opa_final_with_added_metrics.limit(10))
print(f"Container 10 complete: {opa_final_with_added_metrics.count()} rows, {len(opa_final_with_added_metrics.columns)} columns")

In [0]:
**Step 10A Remote Lower Benchmark**

This step adds a benchmarking metric for remote activity. It calculates the 25th percentile (lower quartile) of Remote_Total for each month and provider (Adj Org Code); because the grouping is per provider, the percentile is taken across that provider's Treatment Function Group rows for the month. The benchmark is then joined back to the main dataset and missing values are defaulted to zero. This gives each provider a consistent lower reference point for remote activity in every reporting period.
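
To make the grouping concrete, here is a minimal plain-Python sketch of a per-group 25th percentile. It uses a simple nearest-rank rule, which is an assumption for illustration; Spark's `percentile_approx` is an approximate algorithm and may return a different value on large groups. The row keys `month` and `org` and the sample values in the usage note are hypothetical.

```python
import math
from collections import defaultdict


def nearest_rank_percentile(values, fraction):
    """Return the value at the given percentile using the nearest-rank rule."""
    ordered = sorted(values)
    rank = max(1, math.ceil(fraction * len(ordered)))
    return ordered[rank - 1]


def remote_lower_benchmark(rows):
    """Compute the 25th percentile of Remote_Total per (month, org) group."""
    grouped = defaultdict(list)
    for row in rows:
        if row["Remote_Total"] is not None:  # skip missing values
            grouped[(row["month"], row["org"])].append(row["Remote_Total"])
    return {
        key: nearest_rank_percentile(values, 0.25)
        for key, values in grouped.items()
    }
```

Given four treatment-group rows for one provider and month with `Remote_Total` values 40, 10, 20 and 5, the benchmark for that provider and month is the lowest-quartile value among its own rows, not a comparison against other providers.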

In [0]:
# 10A – Add Remote Lower Benchmark
from pyspark.sql import functions as F

# Start from container 10 output
df_benchmark = opa_final_with_added_metrics

# Step 1: Calculate 25th percentile of Remote_Total by month and Adj Org Code
remote_lower = (
    df_benchmark
    .groupBy("Der_Activity_Month_Date", "Adj Org Code")
    .agg(
        F.expr("percentile_approx(Remote_Total, 0.25)").alias("Remote_Lower_Benchmark")
    )
)

# Step 2: Join benchmark back to main dataset
df_with_benchmark = df_benchmark.join(
    remote_lower,
    on=["Der_Activity_Month_Date", "Adj Org Code"],
    how="left"
)

# Step 3: For missing values, fill with 0
df_with_benchmark = df_with_benchmark.fillna({"Remote_Lower_Benchmark": 0})

# Final output
opa_final_with_remote_benchmark = df_with_benchmark

display(opa_final_with_remote_benchmark.limit(10))
print(f"Container 10A complete — {opa_final_with_remote_benchmark.count()} rows, {len(opa_final_with_remote_benchmark.columns)} columns")


In [0]:
**Step 11 Long Format**

This step reshapes the wide outpatient dataset into a long (tidy) format for easier analysis and visualization. Using PySpark's explode and struct functions, all numeric metric columns are unpivoted into key-value pairs (Metric_Name, Metric_Value) while retaining key identifiers like month, provider, ICB, and region. A combined metric label (Metric_Name_Treatment_Function_Group) is also generated to preserve clinical context. The final ordered table supports flexible reporting, dashboarding, and time-series comparisons across all activity metrics.
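
The wide-to-long reshape can be pictured with ordinary dictionaries. The sketch below is a plain-Python illustration of the unpivot, not the Spark code; the function name `unpivot` and the sample values in the usage note are assumptions made for the example.

```python
def unpivot(rows, id_cols):
    """Turn each non-identifier column of a wide row into its own long row."""
    long_rows = []
    for row in rows:
        ids = {c: row[c] for c in id_cols}
        for name, value in row.items():
            if name in id_cols:
                continue  # identifiers are repeated on every long row instead
            long_rows.append({
                **ids,
                "Metric_Name": name,
                "Metric_Value": value,
                # Combined label keeps the clinical context with the metric
                "Metric_Name_Treatment_Function_Group":
                    f"{name}_{row['Treatment_Function_Group']}",
            })
    return long_rows
```

A wide row with two metric columns therefore produces two long rows that share the same identifiers, so the long table has (number of wide rows) x (number of metric columns) rows.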

In [0]:
#11 reshapes the wide outpatient dataset into a long (tidy) format for easier analysis
from pyspark.sql.functions import col, explode, array, struct, lit, concat_ws

# ID columns to keep
id_cols = [
    "Der_Activity_Month_Date",
    "Der_Provider_Code",
    "Treatment_Function_Group",
    "Adj Org Code",
    "ICB",
    "Region_Code"
]

# Identify metric columns; Der_Activity_Month is a period label, not a metric
metric_cols = [
    c for c in opa_final_with_added_metrics.columns
    if c not in id_cols + ["Der_Activity_Month"]
]

# Unpivot the metrics; casting to double gives every struct the same value type
opa_long = (
    opa_final_with_added_metrics
    .select(
        *id_cols,
        explode(array(*[
            struct(lit(c).alias("Metric_Name"), col(c).cast("double").alias("Metric_Value"))
            for c in metric_cols
        ])).alias("kv")
    )
    .select(
        *id_cols,
        col("kv.Metric_Name"),
        col("kv.Metric_Value")
    )
)

# Create the combined metric name
opa_long = opa_long.withColumn(
    "Metric_Name_Treatment_Function_Group",
    concat_ws("_", col("Metric_Name"), col("Treatment_Function_Group"))
)

# Order by date
opa_long_ordered = opa_long.orderBy("Der_Activity_Month_Date")

display(opa_long_ordered.limit(10))
print(f"Number of rows in opa_long_ordered: {opa_long_ordered.count()}")

In [0]:
**Step 12 Multi-Level Aggregation**

This step consolidates the outpatient metrics into three hierarchical reporting levels: Organisation, ICB, and Region. Starting from the organisation-level dataset, core activity counts are summed across groups, and derived percentage metrics (e.g., DNA rates, procedure ratios, remote attendance rates) are recalculated for each level. The resulting dataframes are combined into a single unified dataset with a “Level” indicator and consistent organisation codes. This produces a complete, multi-level view of outpatient activity performance, ready for downstream reporting, benchmarking, and analytical use.
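
Recalculating rates from summed counts matters because averaging organisation-level percentages would weight small and large providers equally. The following plain-Python sketch shows the difference using made-up counts for two hypothetical organisations in one ICB; the function names are illustrative only.

```python
def dna_rate(attended, dna):
    """DNA rate as a percentage of attended + DNA, or None if undefined."""
    booked = attended + dna
    return None if booked == 0 else 100 * dna / booked


def pooled_dna_rate(orgs):
    """Sum the counts first, then derive the rate (the approach in this step)."""
    attended = sum(o["All_Total"] for o in orgs)
    dna = sum(o["All_DNA"] for o in orgs)
    return dna_rate(attended, dna)


def mean_of_org_rates(orgs):
    """Naive alternative: average the per-organisation rates."""
    rates = [dna_rate(o["All_Total"], o["All_DNA"]) for o in orgs]
    return sum(rates) / len(rates)


# Hypothetical counts: one large provider with a low DNA rate,
# one small provider with a high DNA rate.
orgs = [
    {"org": "ORG_A", "All_Total": 900, "All_DNA": 100},
    {"org": "ORG_B", "All_Total": 50, "All_DNA": 50},
]
```

With these figures the pooled rate sits much closer to the large provider's rate than the simple mean does, which is why the counts are aggregated before the percentages are derived at ICB and Region level.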

In [0]:
#12 – Aggregation and final metric derivation (Org, ICB, Region)
from pyspark.sql import functions as F
from pyspark.sql.functions import when, col, lit

# Start from the Org-level table with derived metrics from Container 10
df_org = opa_final_with_added_metrics.withColumnRenamed("Adj Org Code", "Adj_Org_Code")

# Base metric columns to sum (removed "F2F_DNA")
count_cols = [
    "All_Total","All_First","All_FU","All_Proc","All_NoProc",
    "All_FU_Proc","All_FU_NoProc",
    "F2F_Total","F2F_First","F2F_FU",
    "Remote_Total","Remote_First","Remote_FU",
    "All_DNA","All_First_DNA","All_FU_DNA",
    "Remote_DNA",
    "All_2WW","All_First_2WW","All_FU_2WW",
    "All_2WW_DNA","All_First_2WW_DNA","All_FU_2WW_DNA"
]

# Function to (re)calculate rates & derived metrics (removed F2F_DNA_Over_F2F_Total)
def add_rate_metrics(df):
    return (
        df
        .withColumn("All_DNA_Over_All_Total", F.when((F.col("All_Total")+F.col("All_DNA"))!=0,
            (F.col("All_DNA")/(F.col("All_Total")+F.col("All_DNA")))*100).otherwise(None))
        .withColumn("All_First_DNA_Over_All_First", F.when((F.col("All_First")+F.col("All_First_DNA"))!=0,
            (F.col("All_First_DNA")/(F.col("All_First")+F.col("All_First_DNA")))*100).otherwise(None))
        .withColumn("All_FU_DNA_Over_All_FU", F.when((F.col("All_FU")+F.col("All_FU_DNA"))!=0,
            (F.col("All_FU_DNA")/(F.col("All_FU")+F.col("All_FU_DNA")))*100).otherwise(None))
        .withColumn("All_2WW_DNA_Over_All_2WW", F.when(F.col("All_2WW")!=0,
            (F.col("All_2WW_DNA")/F.col("All_2WW"))*100).otherwise(None))
        .withColumn("All_First_2WW_DNA_Over_All_First_2WW", F.when(F.col("All_First_2WW")!=0,
            (F.col("All_First_2WW_DNA")/F.col("All_First_2WW"))*100).otherwise(None))
        .withColumn("All_FU_2WW_DNA_Over_All_FU_2WW", F.when(F.col("All_FU_2WW")!=0,
            (F.col("All_FU_2WW_DNA")/F.col("All_FU_2WW"))*100).otherwise(None))
        .withColumn("All_FU_Over_All_Total", F.when(F.col("All_Total")!=0,
            (F.col("All_FU")/F.col("All_Total"))*100).otherwise(None))
        .withColumn("All_First_Over_All_Total", F.when(F.col("All_Total")!=0,
            (F.col("All_First")/F.col("All_Total"))*100).otherwise(None))
        .withColumn("All_NoProc_Over_All_Total", F.when(F.col("All_Total")!=0,
            (F.col("All_NoProc")/F.col("All_Total"))*100).otherwise(None))
        .withColumn("All_Proc_Over_All_Total", F.when(F.col("All_Total")!=0,
            (F.col("All_Proc")/F.col("All_Total"))*100).otherwise(None))
        .withColumn("Remote_Total_Over_All_Total", F.when(F.col("All_Total")!=0,
            (F.col("Remote_Total")/F.col("All_Total"))*100).otherwise(None))
        .withColumn("Remote_FU_Over_All_FU", F.when(F.col("All_FU")!=0,
            (F.col("Remote_FU")/F.col("All_FU"))*100).otherwise(None))
        .withColumn("Remote_First_Over_All_First", F.when(F.col("All_First")!=0,
            (F.col("Remote_First")/F.col("All_First"))*100).otherwise(None))
        .withColumn("Remote_DNA_Over_Remote_Total", F.when((F.col("Remote_Total")+F.col("Remote_DNA"))!=0,
            (F.col("Remote_DNA")/(F.col("Remote_Total")+F.col("Remote_DNA")))*100).otherwise(None))
    )

# Aggregate to ICB
df_icb = (
    df_org.groupBy("Der_Activity_Month_Date", "ICB", "Treatment_Function_Group")
    .agg(*[F.sum(F.col(c)).alias(c) for c in count_cols])
)
df_icb = add_rate_metrics(df_icb).withColumn("Level", F.lit("ICB"))

# Aggregate to Region
df_region = (
    df_org.groupBy("Der_Activity_Month_Date", "Region_Code", "Treatment_Function_Group")
    .agg(*[F.sum(F.col(c)).alias(c) for c in count_cols])
)
df_region = add_rate_metrics(df_region).withColumn("Level", F.lit("Region"))

# Label Org-level rows
df_org = df_org.withColumn("Level", F.lit("Org"))

# Combine all levels into one dataset
final_output = (
    df_org.unionByName(df_icb, allowMissingColumns=True)
           .unionByName(df_region, allowMissingColumns=True)
)

# Adjust codes based on Level
final_output = final_output.withColumn(
    "Adj_Org_Code_Final",
    when(col("Level") == "Org", col("Adj_Org_Code"))
    .when(col("Level") == "ICB", col("ICB"))
    .when(col("Level") == "Region", col("Region_Code"))
)

# Save and display
final_output.write.mode("overwrite").parquet("/mnt/output/opa_final_all_levels")

display(final_output.limit(10))
print(f"Rows in final output: {final_output.count()}")

In [0]:
**Step 13 Long OPRT Format**

This step reshapes the multi-level outpatient dataset into a long (skinny) OPRT format, suitable for reporting or operational analytics. Using PySpark's explode and struct functions, all numeric metrics are unpivoted into key-value pairs (OPRT_Metric_Name, Metric_Value), while retaining identifiers such as month, region, ICB, organisation, and treatment function group. The resulting table provides a tidy, standardised structure for downstream processing, enabling easy filtering, aggregation, and integration with reporting tools or performance dashboards.

In [0]:
#13 – Convert to long/skinny OPRT format
from pyspark.sql.functions import col, lit, explode, array, struct

# Use the final_output table from container 12
df_wide = final_output

# Define identifier columns
id_cols = [
    "Der_Activity_Month_Date",
    "Region_Code",
    "ICB",
    "Adj_Org_Code_Final",
    "Treatment_Function_Group"
]

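# Identify metric columns (exclude identifiers, helper columns, and the raw
# month/provider labels carried over from Container 9, which are not metrics)
non_metric_cols = id_cols + ["Level", "Adj_Org_Code", "Der_Activity_Month", "Der_Provider_Code"]
metric_cols = [c for c in df_wide.columns if c not in non_metric_cols]

# Melt into long format; casting to double gives every struct the same value type
opa_oprt_long = (
    df_wide.select(
        *id_cols,
        explode(array(*[
            struct(lit(c).alias("OPRT_Metric_Name"), col(c).cast("double").alias("Metric_Value"))
            for c in metric_cols
        ])).alias("kv")
    )
    .select(
        *id_cols,
        col("kv.OPRT_Metric_Name"),
        col("kv.Metric_Value")
    )
)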

display(opa_oprt_long.limit(10))
print(f"Container 13 complete — {opa_oprt_long.count()} rows, {len(opa_oprt_long.columns)} columns")


In [0]:
**Step 14 Internal Metric IDs**

This final step enhances the long-format outpatient dataset by creating a combined metric identifier (OPRT_Metric_Name_TFC) that merges each metric with its Treatment Function Group. The dataset is then joined with mhs_metric_list to assign a corresponding InternalID for consistent internal tracking and reporting. The resulting table is written to Parquet, providing a fully standardised, identifier-enriched dataset ready for operational reporting, benchmarking, and downstream analytics.
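
The identifier lookup behaves like a left join on the combined metric name. Below is a minimal plain-Python sketch of that behaviour, assuming the lookup can be represented as a dictionary; the function name `attach_internal_id` and the `InternalID` values in the usage note are hypothetical and are not taken from mhs_metric_list.

```python
def attach_internal_id(long_rows, metric_lookup):
    """Add the combined metric key and its InternalID to each long row.

    Rows whose key is absent from the lookup keep InternalID = None,
    mirroring the unmatched side of a left join.
    """
    enriched = []
    for row in long_rows:
        key = f"{row['OPRT_Metric_Name']}_{row['Treatment_Function_Group']}"
        enriched.append({
            **row,
            "OPRT_Metric_Name_TFC": key,
            "InternalID": metric_lookup.get(key),
        })
    return enriched
```

For instance, with a lookup of `{"All_Total_GS": "M001"}` (a made-up ID), a row for `All_Total` in group `GS` receives `"M001"`, while a row for a metric missing from the lookup is kept with no ID. Counting such unmatched rows is a quick way to spot naming mismatches between the pipeline and the metric list.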

In [0]:
#14 – Add OPRT Metric Name_TFC and join to mhs_metric_list for InternalID
from pyspark.sql import functions as F
from pyspark.sql.functions import concat_ws, col

# Start from container 13 output
df_metrics = opa_oprt_long

# Step 1: Create combined OPRT metric name and TFC
df_metrics = df_metrics.withColumn(
    "OPRT_Metric_Name_TFC",
    concat_ws("_", col("OPRT_Metric_Name"), col("Treatment_Function_Group"))
)

# Step 2: Join with mhs_metric_list on combined name only
df_with_id = df_metrics.join(
    mhs_metric_list.select(
        F.col("OPRT Metric Name _TFC").alias("join_metric"),
        F.col("InternalID")
    ),
    df_metrics["OPRT_Metric_Name_TFC"] == F.col("join_metric"),
    "left"
).drop("join_metric")

# Step 3: Write and show final table
df_with_id.write.mode("overwrite").parquet("/mnt/output/opa_oprt_final")

display(df_with_id.limit(10))
print(f"Container 14 complete — {df_with_id.count()} rows, {len(df_with_id.columns)} columns")


In [0]:
#15 Saving the file to the lake mart for QA
df_with_id.coalesce(1).write \
    .format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save("abfss://analytics-projects@udalstdataanalysisprod.dfs.core.windows.net/ElectiveRecovery/Projects/OP_QA_STP_Region_new_metrics_v9.csv")

In [0]:
#16 Saving the file to the lake mart for QA (filtered for a small sample)
df_with_id.filter(
    (F.col("Der_Activity_Month_Date") == "2025-07-31") & 
    (F.col("Adj_Org_Code_Final") == "RH8")
).coalesce(1).write \
    .format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save("abfss://analytics-projects@udalstdataanalysisprod.dfs.core.windows.net/ElectiveRecovery/Projects/OP_QA_STP_Region_new_metrics_short_vi.csv")