In [0]:
# Connecting to GP table
# Loads the GP hierarchy export by pointing Spark at the whole GP_Hierarchies_All folder.

# NOTE(review): the storage account key is hard-coded in notebook source; load it from a
# secret scope instead before sharing or committing this notebook.
spark.conf.set("fs.azure.account.key.udalstdatacuratedprod.dfs.core.windows.net", "<correct-account-key>")

parquet_path = "abfss://reporting@udalstdatacuratedprod.dfs.core.windows.net/unrestricted/reference/UKHD/ODS/GP_Hierarchies_All/"
df_gp = spark.read.format("parquet").load(parquet_path)
display(df_gp)

In [0]:
# Shaping GP view
# Builds one mapping table with a row per active practice plus a row per distinct PCN.
# Mapping_type tells the two kinds of row apart.

from pyspark.sql.functions import lit

# Both row kinds are built only from relationships flagged active.
df_gp_active = df_gp.filter(df_gp.Rel_Active == 'True')

# Practice rows: the practice is the organisation and its PCN is the parent.
df_practice = df_gp_active.select(
    df_gp_active.GP_Code.alias("Org_Code"),
    df_gp_active.GP_Name.alias("Org_Name"),
    df_gp_active.GP_PCN_Code.alias("PCN_Code"),
    df_gp_active.GP_PCN_Name.alias("PCN_Name"),
    df_gp_active.PCN_STP_Code,
    df_gp_active.Rel_Active,
    lit('Practice').alias("Mapping_type"),
)

# PCN rows: one per distinct PCN/STP/flag combination, and the PCN is its own parent.
pcn_keys = df_gp_active.select("GP_PCN_Code", "GP_PCN_Name", "PCN_STP_Code", "Rel_Active").dropDuplicates()
df_pcn = pcn_keys.select(
    pcn_keys.GP_PCN_Code.alias("Org_Code"),
    pcn_keys.GP_PCN_Name.alias("Org_Name"),
    pcn_keys.GP_PCN_Code.alias("PCN_Code"),
    pcn_keys.GP_PCN_Name.alias("PCN_Name"),
    pcn_keys.PCN_STP_Code,
    pcn_keys.Rel_Active,
    lit('PCN').alias("Mapping_type"),
)

df_gp_mapped = df_practice.unionByName(df_pcn)

# Spark writes a folder named GP_mapped.csv that contains part files.
(
    df_gp_mapped.write
    .format("csv")
    .mode("overwrite")
    .option("header", True)
    .save("abfss://analytics-projects@udalstdataanalysisprod.dfs.core.windows.net/ElectiveRecovery/EROC_Collection_Queries/GP_mapped.csv")
)

In [0]:
# Connecting to TFC tables
# eRS TFC mappings reference table, read from a single published parquet part file.

# NOTE(review): account key is hard-coded; move it to a secret scope.
spark.conf.set("fs.azure.account.key.udalstdatacuratedprod.dfs.core.windows.net", "<actual-account-key>")

parquet_path = "abfss://unrestricted@udalstdatacuratedprod.dfs.core.windows.net/reference/Internal/Reference/eRS_TFC_Mappings/Published/1/Internal_Reference_eRS_TFC_Mappings_00000.parquet"
df_tfc = spark.read.format("parquet").load(parquet_path)
display(df_tfc)

In [0]:
# Saving the TFC view
# Exports df_tfc unchanged as CSV with a header row. Spark creates a folder of part files at this path.
(
    df_tfc.write
    .format("csv")
    .mode("overwrite")
    .option("header", True)
    .save("abfss://analytics-projects@udalstdataanalysisprod.dfs.core.windows.net/ElectiveRecovery/EROC_Collection_Queries/TFC_mapped.csv")
)

In [0]:
# Connecting to Commissioner Hierarchies
# ICB commissioner hierarchy reference table.
# NOTE(review): only part file _00000 is read. Confirm the folder never holds more parts.

# NOTE(review): account key is hard-coded; move it to a secret scope.
spark.conf.set("fs.azure.account.key.udalstdatacuratedprod.dfs.core.windows.net", "<actual-account-key>")

parquet_path = "abfss://reporting@udalstdatacuratedprod.dfs.core.windows.net/unrestricted/reference/UKHD/ODS/Commissioner_Hierarchies_ICB/UKHD_ODS_Commissioner_Hierarchies_ICB_00000.parquet"
df_icb = spark.read.format("parquet").load(parquet_path)
display(df_icb)

In [0]:
# Connecting to Provider Hierarchies
# Provider hierarchy reference table.
# NOTE(review): only part file _00000 is read. Confirm the folder never holds more parts.

# NOTE(review): account key is hard-coded; move it to a secret scope.
spark.conf.set("fs.azure.account.key.udalstdatacuratedprod.dfs.core.windows.net", "<actual-account-key>")

parquet_path = "abfss://reporting@udalstdatacuratedprod.dfs.core.windows.net/unrestricted/reference/UKHD/ODS/Provider_Hierarchies/UKHD_ODS_Provider_Hierarchies_00000.parquet"
df_provider = spark.read.format("parquet").load(parquet_path)
display(df_provider)

In [0]:
# Connecting to GPs Hierarchies
# Reads the whole GP_Hierarchies_All folder, the same location the "Connecting to GP table"
# cell loads, instead of only its first part file (UKHD_ODS_GP_Hierarchies_All_00000.parquet).
# Pointing at one part file silently drops every row stored in any other part file of the export.

# NOTE(review): account key is hard-coded; move it to a secret scope.
spark.conf.set(
    "fs.azure.account.key.udalstdatacuratedprod.dfs.core.windows.net",
    "<actual-account-key>"
)

parquet_path = "abfss://reporting@udalstdatacuratedprod.dfs.core.windows.net/unrestricted/reference/UKHD/ODS/GP_Hierarchies_All/"
df_gps = spark.read.parquet(parquet_path)

display(df_gps)

In [0]:
# Connecting to Optical Hierarchies
# Optical sites SCD table. Is_Latest and the parent-date columns are used later when building the master table.
# NOTE(review): only part file _00000 is read. Confirm the folder never holds more parts.

# NOTE(review): account key is hard-coded; move it to a secret scope.
spark.conf.set("fs.azure.account.key.udalstdatacuratedprod.dfs.core.windows.net", "<actual-account-key>")

parquet_path = "abfss://unrestricted@udalstdatacuratedprod.dfs.core.windows.net/reference/UKHD/ODS/Optical_Sites_SCD/Published/1/UKHD_ODS_Optical_Sites_SCD_00000.parquet"
df_optical = spark.read.format("parquet").load(parquet_path)
display(df_optical)

In [0]:
# Connecting to Dental Hierarchies
# General dental practices SCD table.
# NOTE(review): only part file _00000 is read. Confirm the folder never holds more parts.

# NOTE(review): account key is hard-coded; move it to a secret scope.
spark.conf.set("fs.azure.account.key.udalstdatacuratedprod.dfs.core.windows.net", "<actual-account-key>")

parquet_path = "abfss://unrestricted@udalstdatacuratedprod.dfs.core.windows.net/reference/UKHD/ODS/General_Dental_Practices_SCD/Published/1/UKHD_ODS_General_Dental_Practices_SCD_00000.parquet"
df_dental = spark.read.format("parquet").load(parquet_path)
display(df_dental)

In [0]:
# Connecting to API
# ODS API organisation view. Judging by its name it holds latest records with roles; confirm.
# It is the source of the roles frame (ODS_Code / Role_ID) used when building the master table.
# NOTE(review): only part file _00000 is read. Confirm the folder never holds more parts.

# NOTE(review): account key is hard-coded; move it to a secret scope.
spark.conf.set("fs.azure.account.key.udalstdatacuratedprod.dfs.core.windows.net", "<actual-account-key>")

parquet_path = "abfss://unrestricted@udalstdatacuratedprod.dfs.core.windows.net/reference/UKHD/ODS_API/vwOrganisation_SCD_IsLatestEqualsOneWithRole/Published/1/UKHD_ODS_API_vwOrganisation_SCD_IsLatestEqualsOneWithRole_00000.parquet"
df_api = spark.read.format("parquet").load(parquet_path)
display(df_api)





In [0]:
from pyspark.sql.functions import col, lit, coalesce, row_number, to_date
from pyspark.sql.window import Window
from pyspark.sql.types import DateType, StructType, StructField, StringType

# ------------------------------------------------------------
# 0) Roles DataFrame, taken from df_api (loaded in an earlier cell)
# ------------------------------------------------------------
# Trim to the two columns the RO261 filter needs, but only when both exist.
# Otherwise keep df_api as-is.
df_roles = (
    df_api.select("ODS_Code", "Role_ID")
    if {"ODS_Code", "Role_ID"}.issubset(df_api.columns)
    else df_api
)

# ------------------------------------------------------------
# Helper to enforce date columns
# ------------------------------------------------------------
def ensure_date_cols(df):
    """Return ``df`` with Effective_From and Effective_To cast to DateType.

    Each source is passed through this before the union so the two date columns share
    one type everywhere. Each column is replaced in place, so column order is unchanged.
    """
    for date_col in ("Effective_From", "Effective_To"):
        df = df.withColumn(date_col, col(date_col).cast(DateType()))
    return df

# ------------------------------------------------------------
# 1-4) Hierarchy sources mapped onto the shared schema:
#      Organisation_Code, Organisation_Name, ICB_Code, Integrated_Care_Board_Name,
#      ODS_Organisation_Type, Effective_From, Effective_To, Source
# ------------------------------------------------------------

# 1) Commissioner Hierarchies: columns already carry the target names.
df_icb_sel = df_icb.select(
    "Organisation_Code",
    "Organisation_Name",
    "ICB_Code",
    "Integrated_Care_Board_Name",
    "ODS_Organisation_Type",
    "Effective_From",
    "Effective_To",
    lit("Commissioner_Hierarchies").alias("Source"),
)

# 2) Provider Hierarchies: STP columns are renamed to the ICB names used by the other sources.
df_provider_sel = df_provider.select(
    "Organisation_Code",
    "Organisation_Name",
    col("STP_Code").alias("ICB_Code"),
    col("STP_Name").alias("Integrated_Care_Board_Name"),
    "ODS_Organisation_Type",
    "Effective_From",
    "Effective_To",
    lit("Provider_Hierarchies").alias("Source"),
)

# 3) GP Practice: one row per practice/PCN relationship in df_gps.
#    Dates are the practice-to-PCN relationship start and end.
df_gps_practice = df_gps.select(
    df_gps["GP_Code"].alias("Organisation_Code"),
    df_gps["GP_Name"].alias("Organisation_Name"),
    df_gps["GP_STP_Code"].alias("ICB_Code"),
    df_gps["GP_STP_Name"].alias("Integrated_Care_Board_Name"),
    lit("GP Practice").alias("ODS_Organisation_Type"),
    df_gps["GP_PCN_Rel_Start_Date"].alias("Effective_From"),
    df_gps["GP_PCN_Rel_End_Date"].alias("Effective_To"),
    lit("GP_Hierarchies_All_1_GP").alias("Source"),
)

# 4) GP PCN: the same relationship rows, keyed by the PCN instead of the practice.
df_gps_pcn = df_gps.select(
    df_gps["GP_PCN_Code"].alias("Organisation_Code"),
    df_gps["GP_PCN_Name"].alias("Organisation_Name"),
    df_gps["PCN_STP_Code"].alias("ICB_Code"),
    df_gps["PCN_STP_Name"].alias("Integrated_Care_Board_Name"),
    lit("PCN").alias("ODS_Organisation_Type"),
    df_gps["GP_PCN_Rel_Start_Date"].alias("Effective_From"),
    df_gps["GP_PCN_Rel_End_Date"].alias("Effective_To"),
    lit("GP_Hierarchies_All_1_PCN").alias("Source"),
)

# ------------------------------------------------------------
# 5) Roles view filter (Role_ID = 'RO261')
# ------------------------------------------------------------
df_roles_ro261 = df_roles.filter(col("Role_ID") == "RO261").select("ODS_Code")


def _icb_linked_sites(df_sites, org_type, source_name):
    """Map an ODS site SCD table onto the master schema.

    Keeps only rows with Is_Latest == 1 whose High_Level_Health_Authority_Code appears
    in df_roles_ro261 (parents holding role RO261). The ICB name is left null and
    parent join/leave dates become the effective dates.

    A left-semi join replaces the former inner join. Semi-join only filters, so a parent
    code listed more than once under RO261 no longer duplicates site rows.

    Args:
        df_sites: site DataFrame. Assumed to have Organisation_Code, Organisation_Name,
            High_Level_Health_Authority_Code, Is_Latest, Join_Parent_Date and Left_Parent_Date.
        org_type: literal written to ODS_Organisation_Type.
        source_name: literal written to Source.
    """
    # Rename the role key so the semi-join can match on a shared column name.
    ro261_parents = df_roles_ro261.select(col("ODS_Code").alias("High_Level_Health_Authority_Code"))
    return (
        df_sites
        .where(col("Is_Latest") == 1)
        .join(ro261_parents, on="High_Level_Health_Authority_Code", how="left_semi")
        .select(
            col("Organisation_Code"),
            col("Organisation_Name"),
            col("High_Level_Health_Authority_Code").alias("ICB_Code"),
            lit(None).cast("string").alias("Integrated_Care_Board_Name"),
            lit(org_type).alias("ODS_Organisation_Type"),
            col("Join_Parent_Date").alias("Effective_From"),
            col("Left_Parent_Date").alias("Effective_To"),
            lit(source_name).alias("Source"),
        )
    )


# ------------------------------------------------------------
# 6) Optical Sites
# ------------------------------------------------------------
df_optical_sel = _icb_linked_sites(df_optical, "Optical Site", "Optical_Sites_SCD_1")

# ------------------------------------------------------------
# 7) Dental Practices
# ------------------------------------------------------------
df_dental_sel = _icb_linked_sites(df_dental, "Dental Practice", "General_Dental_Practices_SCD_1")

# ------------------------------------------------------------
# 8) Manual entries
# ------------------------------------------------------------
# Hand-maintained organisations. All of them have null effective dates and Source "Manual",
# so only the five descriptive fields are listed per row.
_manual_orgs = [
    # (Organisation_Code, Organisation_Name, ICB_Code, Integrated_Care_Board_Name, ODS_Organisation_Type)
    ("T040", "COMMUNITY HEALTH AND EYECARE LTD", "UNK", "UNKNOWN", "Optical Site"),
    ("06P", "NHS LUTON CCG", "QHG", "NHS BEDFORDSHIRE, LUTON AND MILTON KEYNES INTEGRATED CARE BOARD", "CLINICAL COMMISSIONING GROUP"),
    ("R0E1C", "REFERRAL SUPPORT SERVICE", "QWU", "NHS COVENTRY AND WARWICKSHIRE INTEGRATED CARE BOARD", "CLINICAL COMMISSIONING GROUP"),
    ("M85046003", "SUTTON PARK SURGERY", "QHL", "NHS BIRMINGHAM AND SOLIHULL INTEGRATED CARE BOARD", "BRANCH SURGERY"),
    ("Y05286001", "LISTER HOUSE CHELLASTON", "QJ2", "NHS DERBY AND DERBYSHIRE INTEGRATED CARE BOARD", "BRANCH SURGERY"),
    ("M89017001", "SHIRLEY MEDICAL CENTRE", "QHL", "NHS BIRMINGHAM AND SOLIHULL INTEGRATED CARE BOARD", "BRANCH SURGERY"),
    ("B6S7Y", "MEDICUS HEALTH PARTNERS ALMA HEALTHCARE CENTRE", "QMJ", "NHS NORTH CENTRAL LONDON INTEGRATED CARE BOARD", "CLINICAL COMMISSIONING GROUP"),
]
manual_entries = [org + (None, None, "Manual") for org in _manual_orgs]

# Same column names and types as the other sources; every field is nullable.
_manual_columns = [
    ("Organisation_Code", StringType()),
    ("Organisation_Name", StringType()),
    ("ICB_Code", StringType()),
    ("Integrated_Care_Board_Name", StringType()),
    ("ODS_Organisation_Type", StringType()),
    ("Effective_From", DateType()),
    ("Effective_To", DateType()),
    ("Source", StringType()),
]
manual_schema = StructType([StructField(name, dtype, True) for name, dtype in _manual_columns])

df_manual = spark.createDataFrame(manual_entries, manual_schema)

# ------------------------------------------------------------
# 9) Enforce date columns
# ------------------------------------------------------------
# Cast the effective dates on every source (and rebind each name) so all inputs to the
# union carry DateType columns.
(
    df_icb_sel,
    df_provider_sel,
    df_gps_practice,
    df_gps_pcn,
    df_optical_sel,
    df_dental_sel,
    df_manual,
) = [
    ensure_date_cols(frame)
    for frame in (
        df_icb_sel,
        df_provider_sel,
        df_gps_practice,
        df_gps_pcn,
        df_optical_sel,
        df_dental_sel,
        df_manual,
    )
]

# ------------------------------------------------------------
# 10) Union all datasets (columns matched by name, sources appended in this order)
# ------------------------------------------------------------
df_all = df_icb_sel
for _source_df in (df_provider_sel, df_gps_practice, df_gps_pcn, df_optical_sel, df_dental_sel, df_manual):
    df_all = df_all.unionByName(_source_df)

# ------------------------------------------------------------
# 11) Latest record per Organisation_Code
# ------------------------------------------------------------
# Rank each organisation's rows so that row 1 is the one kept:
#   1. latest Effective_To first, with an open-ended (null) end date treated as 2999-12-31;
#   2. then latest Effective_From (nulls last);
#   3. then source priority, listed in the same order the sources are unioned;
#   4. then the remaining output columns, purely so the pick is repeatable.
# Ordering on Effective_To alone left ties (e.g. two sources both open-ended) to
# row_number()'s arbitrary choice, so the exported row could change between runs.
from pyspark.sql.functions import when

SOURCE_PRIORITY = [
    "Commissioner_Hierarchies",
    "Provider_Hierarchies",
    "GP_Hierarchies_All_1_GP",
    "GP_Hierarchies_All_1_PCN",
    "Optical_Sites_SCD_1",
    "General_Dental_Practices_SCD_1",
    "Manual",
]
# Any Source value not listed above ranks after all listed ones.
source_rank = coalesce(
    *[when(col("Source") == name, lit(rank)) for rank, name in enumerate(SOURCE_PRIORITY)],
    lit(len(SOURCE_PRIORITY)),
)

window_spec = Window.partitionBy("Organisation_Code").orderBy(
    coalesce(col("Effective_To"), to_date(lit("2999-12-31"))).desc(),
    col("Effective_From").desc_nulls_last(),
    source_rank.asc(),
    col("ICB_Code").asc_nulls_last(),
    col("Integrated_Care_Board_Name").asc_nulls_last(),
    col("Organisation_Name").asc_nulls_last(),
    col("ODS_Organisation_Type").asc_nulls_last(),
)
df_latest = df_all.withColumn("LatestRowNumber", row_number().over(window_spec))

# ------------------------------------------------------------
# 12) Final selection: keep only the top-ranked row per organisation and expose the
#     ICB columns under the STP_Code / STP_Name headers.
# ------------------------------------------------------------
master_hierarchies_table = (
    df_latest
    .where("LatestRowNumber = 1")
    .selectExpr(
        "Organisation_Code",
        "Organisation_Name",
        "ICB_Code AS STP_Code",
        "Integrated_Care_Board_Name AS STP_Name",
        "ODS_Organisation_Type",
        "Effective_From",
        "Effective_To",
        "Source",
    )
)

# ------------------------------------------------------------
# 13) Display
# ------------------------------------------------------------
display(master_hierarchies_table)

# Row count of data set.
# count() returns a plain Python int. display() is meant for DataFrames and rich objects,
# so the scalar is printed instead.
print(f"Row count: {master_hierarchies_table.count():,}")

In [0]:
# 14) Row count by Source

print("Row counts by Source:")
source_counts = master_hierarchies_table.groupBy("Source").count()
display(source_counts.orderBy(source_counts["count"].desc()))

# 15) Duplicate check by Organisation_Code (show full rows)
# Sanity check on the latest-record step: each Organisation_Code is expected exactly once.

dupe_codes = (
    master_hierarchies_table
    .groupBy("Organisation_Code")
    .count()
    .where(col("count") > 1)
    .select("Organisation_Code")
)

# Fetching a single row is enough to know whether any duplicate exists.
if not dupe_codes.take(1):
    print("✅ No duplicate Organisation_Codes found.")
else:
    print("⚠️ Duplicate Organisation_Codes found — showing all matching rows:")
    dupes_full = master_hierarchies_table.join(dupe_codes, on="Organisation_Code", how="inner")
    display(dupes_full.orderBy("Organisation_Code", "Source"))

In [0]:
# Export the master table as CSV with a header row. coalesce(1) collapses the data to one
# partition, so the folder Spark creates at this path holds a single part file.
(
    master_hierarchies_table
    .coalesce(1)
    .write
    .format("csv")
    .mode("overwrite")
    .option("header", True)
    .save("abfss://analytics-projects@udalstdataanalysisprod.dfs.core.windows.net/ElectiveRecovery/EROC_Collection_Queries/master_hierarchies_table.csv")
)