In [0]:
#Import the tools
#from env import env, not used for this notebook
from src import utils, excel

import openpyxl
import pandas as pd

from pyspark.sql import functions as F
from datetime import datetime
from openpyxl.styles import NamedStyle


In [0]:
# Load PIFU data
from pyspark.sql import functions as F

# Location of the Delta table that is read into df_pifu_qa.
pifu_validation_path = 'abfss://analytics-projects@udalstdataanalysisprod.dfs.core.windows.net/ElectiveRecovery/Projects/PIFU_Validation'

delta_reader = spark.read.format('delta')
df_pifu_qa = delta_reader.load(pifu_validation_path)

# Show the loaded frame so the input can be inspected before any filtering.
df_pifu_qa.display()

In [0]:
from pyspark.sql import functions as F

# Monthly & organisation sheet data: total EROC_Value for the PIFUSRTMV01 and
# PIFUSRTDC01 metrics, per month / provider / TFC / region / metric.
moved_discharged_metrics = ["PIFUSRTMV01", "PIFUSRTDC01"]
sheet_group_columns = [
    "EROC_DerMonth",
    "EROC_DerProviderCode",
    "EROC_DerTFC",
    "RegionName",
    "EROC_DerMetric",
]

# Keep only the two metrics, months after 2023-03-01, and rows flagged as
# latest (EROC_Latest_Flag == 1, confirmed correct).
sheet_source_rows = df_pifu_qa.filter(
    F.col("EROC_DerMetric").isin(moved_discharged_metrics)
    & (F.col("EROC_DerMonth") > '2023-03-01')
    & (F.col("EROC_Latest_Flag") == 1)
)

sheet_totals = sheet_source_rows.groupBy(*sheet_group_columns).agg(
    F.sum("EROC_Value").alias("Value")
)
df_moved_discharged = sheet_totals.orderBy("EROC_DerMonth", "EROC_DerProviderCode")

df_moved_discharged.display()


In [0]:
# Switch Spark's date/time pattern parsing to the LEGACY policy.
# NOTE(review): presumably needed for the to_date(..., "yyyy-MM-dd") call
# further down the notebook — confirm it is still required.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Create the pivoted data frame: one row per month / region / provider, with
# one column per EROC_DerMetric value.
#
# df_moved_discharged is also grouped by EROC_DerTFC, so several rows can share
# the same month / region / provider / metric. They are summed here; F.first
# would keep just one arbitrary TFC row and under-report the total.
df_pivoted = (
    df_moved_discharged
    .groupBy("EROC_DerMonth", "RegionName", "EROC_DerProviderCode")
    .pivot("EROC_DerMetric")  # creates a column for each metric
    .agg(F.sum("Value"))      # total across all TFC rows
    .na.fill(0)               # combinations with no rows become 0 instead of null
    .orderBy("EROC_DerMonth", "RegionName", "EROC_DerProviderCode")
)

df_pivoted.display()


In [0]:
# Remove every widget currently defined on this notebook; the filter widgets
# are created again in a later cell.
dbutils.widgets.removeAll()

In [0]:
# Creating the chart: prepare the source frame and the values used by the filters
from pyspark.sql.functions import to_date, col
from datetime import datetime
import plotly.express as px

# Step 0: Optional cleanup (reset widgets for proper order — safe in dashboards)
dbutils.widgets.removeAll()

# Step 1: Parse EROC_DerMonth with the yyyy-MM-dd pattern so it is a date column
month_as_date = to_date(col("EROC_DerMonth"), "yyyy-MM-dd")
df_moved_discharged = df_moved_discharged.withColumn("EROC_DerMonth", month_as_date)

# Step 2: Make the frame queryable from Spark SQL as "moved_discharged"
df_moved_discharged.createOrReplaceTempView("moved_discharged")

# Step 3: Base frame — only rows where month, provider and metric are all present
df_line_chart = spark.sql("""
    SELECT
        EROC_DerMonth,
        EROC_DerProviderCode,
        EROC_DerMetric,
        Value
    FROM
        moved_discharged
    WHERE
        EROC_DerMonth IS NOT NULL
        AND EROC_DerProviderCode IS NOT NULL
        AND EROC_DerMetric IS NOT NULL
""")

# Step 4: Distinct provider / metric codes (as sorted strings) and the date range
provider_rows = df_line_chart.select("EROC_DerProviderCode").distinct().collect()
valid_providers = sorted({str(row[0]) for row in provider_rows})

metric_rows = df_line_chart.select("EROC_DerMetric").distinct().collect()
valid_metrics = sorted({str(row[0]) for row in metric_rows})

date_bounds = df_line_chart.selectExpr("min(EROC_DerMonth)", "max(EROC_DerMonth)").first()
min_date, max_date = date_bounds


In [0]:
# Step 5: Create the filter widgets from the values present in df_line_chart

# 1. Provider dropdown ("All" plus every distinct provider code)
provider_rows = df_line_chart.select("EROC_DerProviderCode").distinct().collect()
valid_providers = sorted({str(row[0]) for row in provider_rows})
dbutils.widgets.dropdown("1_Provider", "All", ["All", *valid_providers])

# 2. Metric dropdown ("All" plus every distinct metric code)
metric_rows = df_line_chart.select("EROC_DerMetric").distinct().collect()
valid_metrics = sorted({str(row[0]) for row in metric_rows})
dbutils.widgets.dropdown("2_Metric", "All", ["All", *valid_metrics])

# 3. Start date text box — defaults to the earliest month, or empty if there is none
min_date = df_line_chart.selectExpr("min(EROC_DerMonth)").first()[0]
if min_date:
    start_date_default = min_date.strftime("%Y-%m-%d")
else:
    start_date_default = ""
dbutils.widgets.text("3_StartDate", start_date_default)

# 4. End date text box — defaults to the latest month, or empty if there is none
max_date = df_line_chart.selectExpr("max(EROC_DerMonth)").first()[0]
if max_date:
    end_date_default = max_date.strftime("%Y-%m-%d")
else:
    end_date_default = ""
dbutils.widgets.text("4_EndDate", end_date_default)

# 5. Region dropdown (currently disabled)
# if "RegionName" in df_line_chart.columns:
#     valid_regions = sorted(set(str(r[0]) for r in df_line_chart.select("RegionName").distinct().collect() if r[0] is not None))
#     dbutils.widgets.dropdown("5_Region", "All", ["All"] + valid_regions)

In [0]:
# Step 6: Read the current provider, metric, start-date and end-date selections
selected_provider, selected_metric, start_date_str, end_date_str = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("1_Provider", "2_Metric", "3_StartDate", "4_EndDate")
)
# Region widget is disabled, so it is not read:
#end_date_str = dbutils.widgets.get("5_Region")

# Step 7: Validate and parse dates
def safe_parse(date_str, fallback):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime.date``.

    Args:
        date_str: Text to parse, e.g. the raw value of a date widget.
        fallback: Value returned unchanged when ``date_str`` cannot be parsed.

    Returns:
        The parsed ``datetime.date``, or ``fallback`` when ``date_str`` is not
        a string or does not match ``%Y-%m-%d``.
    """
    try:
        return datetime.strptime(date_str, "%Y-%m-%d").date()
    except (TypeError, ValueError):
        # ValueError: text that is not a valid YYYY-MM-DD date (including "").
        # TypeError: a non-string value such as None.
        # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return fallback

# Parse the widget text; an unparseable value falls back to the data's own range.
start_date = safe_parse(start_date_str, min_date)
end_date = safe_parse(end_date_str, max_date)

# Step 8: Apply filtering (inclusive date window)
df_filtered = df_line_chart.filter(
    (col("EROC_DerMonth") >= start_date) & (col("EROC_DerMonth") <= end_date)
)

# A provider / metric filter is applied only for a specific, known selection;
# "All" (or a value missing from the valid list) leaves the data unfiltered.
if selected_provider != "All" and selected_provider in valid_providers:
    df_filtered = df_filtered.filter(col("EROC_DerProviderCode") == selected_provider)
if selected_metric != "All" and selected_metric in valid_metrics:
    df_filtered = df_filtered.filter(col("EROC_DerMetric") == selected_metric)

# Stop with a clear message when nothing matches. take(1) fetches at most one
# row, which is enough to know whether any exist, instead of counting them all.
if not df_filtered.take(1):
    raise ValueError("No data available for the selected filters. Please adjust your filter criteria.")

# Step 9: Pivot and display — one row per month, one column per metric
df_pivot = df_filtered.groupBy("EROC_DerMonth").pivot("EROC_DerMetric").sum("Value").orderBy("EROC_DerMonth")
display(df_pivot)

# Step 10: Optional Plotly line chart for selected metric
if selected_metric != "All":
    # df_filtered can hold several rows for the same month, so total them per
    # month first; plotting the raw rows would draw a zig-zag, not a trend.
    pdf = (
        df_filtered
        .groupBy("EROC_DerMonth")
        .sum("Value")
        .withColumnRenamed("sum(Value)", "Value")
        .orderBy("EROC_DerMonth")
        .toPandas()
    )
    if selected_provider != "All":
        chart_title = f"{selected_metric} Trend for {selected_provider}"
    else:
        chart_title = f"{selected_metric} Trend (All Providers)"
    fig1 = px.line(
        pdf,
        x="EROC_DerMonth",
        y="Value",
        title=chart_title
    )
    # The figure is created inside an if-block, so it must be shown explicitly.
    fig1.show()
   

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql import functions as F

# Read the current filter selections from the notebook widgets.
selected_provider, selected_metric, start_date_str, end_date_str = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("1_Provider", "2_Metric", "3_StartDate", "4_EndDate")
)
# The region widget is disabled, so no region selection is read:
# try:
#     selected_region = dbutils.widgets.get("5_Region")
# except:
#     selected_region = "All"


def safe_parse(date_str, fallback):
    """Convert ``date_str`` (expected as ``YYYY-MM-DD``) to a ``datetime.date``.

    Args:
        date_str: The text to convert.
        fallback: Returned as-is when the conversion fails.

    Returns:
        A ``datetime.date`` on success; otherwise ``fallback``.
    """
    try:
        return datetime.strptime(date_str, "%Y-%m-%d").date()
    except (TypeError, ValueError):
        # Only parsing failures are handled: ValueError for malformed or empty
        # text, TypeError for a non-string such as None. A bare ``except``
        # would also hide interrupts and unrelated bugs.
        return fallback

# The overall month range of df_pifu_qa is the fallback for unparseable widget dates.
min_date, max_date = df_pifu_qa.selectExpr("min(EROC_DerMonth)", "max(EROC_DerMonth)").first()
start_date = safe_parse(start_date_str, min_date)
end_date = safe_parse(end_date_str, max_date)

# PIFUTOTAL01 rows flagged as latest, inside the selected (inclusive) month window.
total_row_filter = (
    (F.col("EROC_DerMetric") == "PIFUTOTAL01")
    & (F.col("EROC_Latest_Flag") == 1)
    & (F.col("EROC_DerMonth") >= start_date)
    & (F.col("EROC_DerMonth") <= end_date)
)
df_PIFUTOTAL01 = df_pifu_qa.filter(total_row_filter)

# Narrow to one provider unless "All" is selected.
if selected_provider != "All":
    df_PIFUTOTAL01 = df_PIFUTOTAL01.filter(F.col("EROC_DerProviderCode") == selected_provider)
# Region filtering is disabled:
# if selected_region != "All":
#     df_PIFUTOTAL01 = df_PIFUTOTAL01.where(F.col("RegionName") == selected_region)

# Total EROC_Value per month / provider / metric (RegionName grouping is disabled).
total_by_provider = df_PIFUTOTAL01.groupBy(
    "EROC_DerMonth", "EROC_DerProviderCode", "EROC_DerMetric"
).agg(F.sum("EROC_Value").alias("Value"))
df_PIFUTOTAL01 = total_by_provider.orderBy("EROC_DerMonth", "EROC_DerProviderCode")

display(df_PIFUTOTAL01)

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql import functions as F

# Read the raw end-date text from the "4_EndDate" widget; it is parsed into a
# date further down in this cell.
end_date_str = dbutils.widgets.get("4_EndDate")
def safe_parse(date_str, fallback):
    """Return ``date_str`` parsed as a ``YYYY-MM-DD`` date, or ``fallback``.

    Args:
        date_str: Candidate date text.
        fallback: Value to return when ``date_str`` is not a parseable date.

    Returns:
        ``datetime.date`` for valid input, ``fallback`` otherwise.
    """
    try:
        return datetime.strptime(date_str, "%Y-%m-%d").date()
    except (TypeError, ValueError):
        # strptime raises ValueError for text in the wrong format and TypeError
        # for non-string input; other exceptions are left to propagate.
        return fallback

# Overall month range of the source data, read in a single pass; max_date is
# the fallback when the end-date widget is not a valid YYYY-MM-DD value.
min_date, max_date = df_pifu_qa.selectExpr("min(EROC_DerMonth)", "max(EROC_DerMonth)").first()
end_date = safe_parse(end_date_str, max_date)

# Check Submission_type values for debugging
# display(df_pifu_qa.select("Submission_type").distinct())

# Submission types matching this pattern are excluded below.
# Raw string: "\|" is a regex escape for a literal pipe, not a Python escape
# sequence (the non-raw literal triggers an invalid-escape warning).
submitted_return_pattern = r"^01\| Complete return:|^02\| Partial return:"

# Creating the monthly blank or null returns data frame for the selected end date only
df_blank_null = (
    df_pifu_qa
    .where(F.col("EROC_DerMetric").isin(
        "PIFUSRTMV01", "PIFUSRTDC01", "PIFUTOTAL01", "PIFUCOMP001", "PIFUBOOK001", "PIFUDNA0001"
    ))
    .where(F.col("EROC_DerMonth") == end_date)
    .where(F.col("EROC_Latest_Flag") == 1)
    # NOTE(review): rows whose Submission_type is NULL are dropped here as
    # well, because NOT (NULL RLIKE ...) evaluates to NULL — confirm intended.
    .where(~F.col("Submission_type").rlike(submitted_return_pattern))
    .groupBy(
        "EROC_DerMonth",
        "EROC_DerProviderCode",
        "Submission_type",
        "RegionName",
        # "EROC_DerMetric" is intentionally left out of the grouping
    )
    .agg(F.sum("EROC_Value").alias("Value"))
    .orderBy(
        "EROC_DerMonth",
        "RegionName",
        "EROC_DerProviderCode"
    )
)

display(df_blank_null)

In [0]:
from pyspark.sql import functions as F
from datetime import datetime

# Read the provider, metric, start-date and end-date widget values.
selected_provider, selected_metric, start_date_str, end_date_str = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("1_Provider", "2_Metric", "3_StartDate", "4_EndDate")
)

def safe_parse(date_str, fallback):
    """Parse ``date_str`` as ``%Y-%m-%d``; return ``fallback`` if that fails.

    Args:
        date_str: Date text, expected in ``YYYY-MM-DD`` form.
        fallback: Returned unchanged when parsing is not possible.

    Returns:
        The ``datetime.date`` for ``date_str``, or ``fallback``.
    """
    try:
        return datetime.strptime(date_str, "%Y-%m-%d").date()
    except (TypeError, ValueError):
        # Limit the handler to what strptime raises for bad input (ValueError
        # for bad/empty text, TypeError for non-strings) so that interrupts
        # and genuine bugs are not silently turned into the fallback.
        return fallback

# Earliest and latest month in df_pifu_qa act as defaults for the date window.
month_range = df_pifu_qa.selectExpr("min(EROC_DerMonth)", "max(EROC_DerMonth)").first()
min_date, max_date = month_range

# Widget text that does not parse as YYYY-MM-DD falls back to those defaults.
start_date, end_date = (
    safe_parse(start_date_str, min_date),
    safe_parse(end_date_str, max_date),
)

def get_filtered_df(latest_flag):
    """Total ``EROC_Value`` per month and metric for one ``EROC_Latest_Flag`` value.

    Reads the notebook-level ``df_pifu_qa``, ``start_date``, ``end_date``,
    ``selected_provider`` and ``selected_metric`` at call time.

    Args:
        latest_flag: Value that ``EROC_Latest_Flag`` must equal.

    Returns:
        A DataFrame with columns ``EROC_DerMonth``, ``EROC_DerMetric`` and
        ``Value``, ordered by month and then metric.
    """
    month = F.col("EROC_DerMonth")

    # Mandatory conditions: the requested flag and the inclusive date window.
    conditions = [
        F.col("EROC_Latest_Flag") == latest_flag,
        (month >= F.lit(start_date)) & (month <= F.lit(end_date)),
    ]
    # Optional conditions: skipped when the widget is left on "All".
    if selected_provider != "All":
        conditions.append(F.col("EROC_DerProviderCode") == selected_provider)
    if selected_metric != "All":
        conditions.append(F.col("EROC_DerMetric") == selected_metric)

    matching_rows = df_pifu_qa
    for condition in conditions:
        matching_rows = matching_rows.filter(condition)

    monthly_totals = matching_rows.groupBy("EROC_DerMonth", "EROC_DerMetric").agg(
        F.sum("EROC_Value").alias("Value")
    )
    return monthly_totals.orderBy("EROC_DerMonth", "EROC_DerMetric")

# Build one aggregated frame for EROC_Latest_Flag == 1 and one for == 0.
df_latest_1, df_latest_0 = (get_filtered_df(flag_value) for flag_value in (1, 0))

# Show each frame in turn.
# Stacked bar: x = EROC_DerMonth, series = EROC_DerMetric, y = Value
for latest_frame in (df_latest_1, df_latest_0):
    display(latest_frame)

In [0]:
# The combined chart

# Re-read every widget so this cell reflects the current selections.
selected_provider = dbutils.widgets.get("1_Provider")
selected_metric = dbutils.widgets.get("2_Metric")
start_date_str = dbutils.widgets.get("3_StartDate")
end_date_str = dbutils.widgets.get("4_EndDate")

# get_filtered_df reads the parsed start_date / end_date, not the raw widget
# text, so re-parse here; otherwise a changed date widget would be ignored and
# the previous date window silently reused.
start_date = safe_parse(start_date_str, min_date)
end_date = safe_parse(end_date_str, max_date)

# Tag each frame with the flag value it was built for, then stack them so a
# single chart can split or colour by Latest_Flag.
df_latest_1 = get_filtered_df(1).withColumn("Latest_Flag", F.lit(1))
df_latest_0 = get_filtered_df(0).withColumn("Latest_Flag", F.lit(0))

# Both frames come from get_filtered_df with the same column added, so the
# positional union lines the columns up correctly.
df_combined = df_latest_1.union(df_latest_0)

display(df_combined)


Databricks visualization. Run in Databricks to view.

In [0]:
# Splitting the chart

# Re-read every widget so this cell reflects the current selections.
selected_provider = dbutils.widgets.get("1_Provider")
selected_metric = dbutils.widgets.get("2_Metric")
start_date_str = dbutils.widgets.get("3_StartDate")
end_date_str = dbutils.widgets.get("4_EndDate")

# get_filtered_df filters on the parsed start_date / end_date, so refresh them
# from the widget text just read; without this the date widgets have no effect
# on this cell unless an earlier cell is re-run first.
start_date = safe_parse(start_date_str, min_date)
end_date = safe_parse(end_date_str, max_date)

# One frame per EROC_Latest_Flag value, each tagged with that value.
df_latest_1 = get_filtered_df(1).withColumn("Latest_Flag", F.lit(1))
df_latest_0 = get_filtered_df(0).withColumn("Latest_Flag", F.lit(0))

display(df_latest_0)  # Stacked bar: x = EROC_DerMonth, series = EROC_DerMetric, y = Value, filter Latest_Flag = 0
display(df_latest_1)  # Stacked bar: x = EROC_DerMonth, series = EROC_DerMetric, y = Value, filter Latest_Flag = 1

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.