# TVS Curated Transform Pipeline

This notebook reads from `lh-tvs-operational` (raw Dataverse data), applies transformations,
and writes curated analytics-ready tables to `lh-tvs-curated`.

**Transformations:**
- SCD Type 2 tracking for Accounts dimension
- Deduplication and type casting
- Time entry aggregation by account and month
- Account health scoring

**Schedule:** Daily at 4:00 AM UTC (after Paylocity sync at 2:00 AM)

In [None]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, lit, current_timestamp, sha2, concat_ws, when, coalesce,
    row_number, lead, sum as spark_sum, count, avg, max as spark_max,
    min as spark_min, datediff, months_between, round as spark_round,
    date_format, to_date, year, month
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType,
    BooleanType, DateType, TimestampType
)
from datetime import datetime, timedelta

# Configuration
# Lakehouse names, plus the abfss:// root URI derived from each name.
OPERATIONAL_LAKEHOUSE = "lh-tvs-operational"
CURATED_LAKEHOUSE = "lh-tvs-curated"

# Both roots share one URI shape; only the lakehouse segment differs.
_ONELAKE_URI_TEMPLATE = "abfss://{lakehouse}@onelake.dfs.fabric.microsoft.com"
OPERATIONAL_PATH = _ONELAKE_URI_TEMPLATE.format(lakehouse=OPERATIONAL_LAKEHOUSE)
CURATED_PATH = _ONELAKE_URI_TEMPLATE.format(lakehouse=CURATED_LAKEHOUSE)

# Session handle used by every read in the cells below.
spark = SparkSession.builder.getOrCreate()

# Run banner: start time plus source and target lakehouses.
_started_at = datetime.utcnow().isoformat()
print(f"Pipeline started at {_started_at}")
print(f"Reading from: {OPERATIONAL_LAKEHOUSE}")
print(f"Writing to: {CURATED_LAKEHOUSE}")

In [None]:
# ── Cell 2: Read operational data and apply deduplication / type casting ────


def _load_operational_table(table_name):
    """Return the Delta table ``Tables/<table_name>`` of the operational lakehouse."""
    return spark.read.format("delta").load(f"{OPERATIONAL_PATH}/Tables/{table_name}")


# Raw Dataverse tables, loaded without any filtering or casting.
df_accounts_raw = _load_operational_table("dv_accounts")
df_contacts_raw = _load_operational_table("dv_contacts")
df_subscriptions_raw = _load_operational_table("dv_subscriptions")
df_tasks_raw = _load_operational_table("dv_tasks")
df_time_entries_raw = _load_operational_table("dv_timeentries")
df_deliverables_raw = _load_operational_table("dv_deliverables")

# Row counts per source table. Each count() is a separate Spark action.
# Labels are left-padded to 15 characters so the numbers line up.
print(f"Raw counts:")
for _label, _raw_df in (
    ("Accounts:", df_accounts_raw),
    ("Contacts:", df_contacts_raw),
    ("Subscriptions:", df_subscriptions_raw),
    ("Tasks:", df_tasks_raw),
    ("Time Entries:", df_time_entries_raw),
    ("Deliverables:", df_deliverables_raw),
):
    print(f"  {_label:<15}{_raw_df.count()}")

# Deduplication: keep one row per business key, the one with the greatest modifiedon.


def _first_row_per_partition(df, window_spec):
    """Return ``df`` reduced to the row ranked 1 by ``row_number()`` in each partition."""
    ranked = df.withColumn("_row_num", row_number().over(window_spec))
    return ranked.filter(col("_row_num") == 1).drop("_row_num")


# Accounts: one row per tvs_accountid, newest modifiedon first.
window_latest = Window.partitionBy("tvs_accountid").orderBy(col("modifiedon").desc())
df_accounts_deduped = _first_row_per_partition(df_accounts_raw, window_latest)

# Cast types for a clean schema: three integer columns, then the onboarding date.
df_accounts_clean = df_accounts_deduped
for _int_column in ("tvs_monthlyhours", "tvs_tier", "tvs_status"):
    df_accounts_clean = df_accounts_clean.withColumn(
        _int_column, col(_int_column).cast(IntegerType())
    )
df_accounts_clean = df_accounts_clean.withColumn(
    "tvs_onboarddate", to_date(col("tvs_onboarddate"))
)

# Contacts: one row per tvs_contactid, newest modifiedon first (no casting).
window_contacts = Window.partitionBy("tvs_contactid").orderBy(col("modifiedon").desc())
df_contacts_clean = _first_row_per_partition(df_contacts_raw, window_contacts)

print(f"\nAfter deduplication:")
print(f"  Accounts: {df_accounts_clean.count()}")
print(f"  Contacts: {df_contacts_clean.count()}")

In [None]:
# ── Cell 3: SCD Type 2 for Accounts Dimension ────────────────────────────
#
# Maintains dim_accounts with row versioning: when any tracked column of an
# account changes, the account's current row is closed (is_current=False,
# valid_to=now) and a new current row is appended. dim_contacts is simply
# overwritten with the deduplicated contacts.

# Imported here so the cell is self-contained; pyspark.sql is already a
# dependency of this notebook.
from pyspark.sql.utils import AnalysisException

# Generate hash of trackable columns for change detection
scd_columns = ["tvs_name", "tvs_industry", "tvs_tier", "tvs_status", "tvs_monthlyhours"]

# Each tracked column is cast to string and NULL is replaced by the text
# "NULL", so every column occupies its own slot in the "|"-joined string
# that is hashed with SHA-256.
df_accounts_hashed = df_accounts_clean.withColumn(
    "_row_hash",
    sha2(concat_ws("|", *[coalesce(col(c).cast(StringType()), lit("NULL")) for c in scd_columns]), 256)
)

# Try to load the existing dimension; if it cannot be resolved, this is a
# first run. Only AnalysisException is treated as "table missing". Any other
# failure (I/O, auth, cluster error) must propagate, because falling into the
# initial-load branch would overwrite dim_accounts and discard its history.
try:
    df_dim_existing = spark.read.format("delta").load(f"{CURATED_PATH}/Tables/dim_accounts")
except AnalysisException:
    has_existing = False
    print("No existing dim_accounts found, creating initial load.")
else:
    has_existing = True
    # The count runs outside the try so an action failure is never mistaken
    # for a missing table.
    print(f"Existing dim_accounts rows: {df_dim_existing.count()}")

if has_existing:
    # Find changed records: hash mismatch on active records
    df_current = df_dim_existing.filter(col("is_current") == True)

    df_changed = (
        df_accounts_hashed.alias("new")
        .join(df_current.alias("old"), col("new.tvs_accountid") == col("old.tvs_accountid"), "left")
        .filter(
            (col("old._row_hash").isNull()) |  # New records
            (col("new._row_hash") != col("old._row_hash"))  # Changed records
        )
        .select("new.*")
    )

    changed_count = df_changed.count()
    print(f"Changed/new accounts: {changed_count}")

    if changed_count > 0:
        # Close old records. The changed IDs are collected to the driver and
        # used as an isin() list; brand-new accounts are in the list too but
        # match no existing row.
        changed_ids = [row.tvs_accountid for row in df_changed.select("tvs_accountid").collect()]

        df_closed = (
            df_dim_existing
            .withColumn("is_current",
                when(col("tvs_accountid").isin(changed_ids) & (col("is_current") == True), lit(False))
                .otherwise(col("is_current"))
            )
            .withColumn("valid_to",
                when(col("tvs_accountid").isin(changed_ids) & (col("valid_to").isNull()), current_timestamp())
                .otherwise(col("valid_to"))
            )
        )

        # Create new version records: open-ended validity, flagged current
        df_new_versions = (
            df_changed
            .withColumn("valid_from", current_timestamp())
            .withColumn("valid_to", lit(None).cast(TimestampType()))
            .withColumn("is_current", lit(True))
        )

        # Union by name; columns present on only one side are filled with NULL
        df_dim_final = df_closed.unionByName(df_new_versions, allowMissingColumns=True)
    else:
        df_dim_final = df_dim_existing
        print("No changes detected, skipping SCD update.")
else:
    # Initial load: all records are current
    df_dim_final = (
        df_accounts_hashed
        .withColumn("valid_from", current_timestamp())
        .withColumn("valid_to", lit(None).cast(TimestampType()))
        .withColumn("is_current", lit(True))
    )

# Write dim_accounts (full overwrite, also when no changes were detected)
df_dim_final.write.format("delta").mode("overwrite").save(f"{CURATED_PATH}/Tables/dim_accounts")
print(f"dim_accounts written: {df_dim_final.count()} total rows")

# Write dim_contacts (simple overwrite, no SCD)
df_contacts_clean.write.format("delta").mode("overwrite").save(f"{CURATED_PATH}/Tables/dim_contacts")
print(f"dim_contacts written: {df_contacts_clean.count()} rows")

In [None]:
# ── Cell 4: Time Entry Aggregation and Fact Tables ─────────────────────────


def _apply_columns(df, column_specs):
    """Apply ``(name, expression)`` pairs to ``df`` via ``withColumn``, in order.

    Order matters: a later expression sees the columns produced by earlier ones.
    """
    for column_name, expression in column_specs:
        df = df.withColumn(column_name, expression)
    return df


def _publish_fact(df, table_name):
    """Overwrite ``Tables/<table_name>`` in the curated lakehouse and print its row count."""
    df.write.format("delta").mode("overwrite").save(f"{CURATED_PATH}/Tables/{table_name}")
    print(f"{table_name} written: {df.count()} rows")


# Fact: Time Entries. Typed hours and date, plus calendar breakdown columns
# derived from the already-converted tvs_date, and a boolean billable flag.
df_time_facts = _apply_columns(df_time_entries_raw, [
    ("tvs_hours", col("tvs_hours").cast(DoubleType())),
    ("tvs_date", to_date(col("tvs_date"))),
    ("year_month", date_format(col("tvs_date"), "yyyy-MM")),
    ("year", year(col("tvs_date"))),
    ("month", month(col("tvs_date"))),
    ("is_billable", col("tvs_billable").cast(BooleanType())),
])
_publish_fact(df_time_facts, "fact_time_entries")

# Fact: Subscriptions. Start/end as dates, hours as integer, price as double.
df_sub_facts = _apply_columns(df_subscriptions_raw, [
    ("tvs_startdate", to_date(col("tvs_startdate"))),
    ("tvs_enddate", to_date(col("tvs_enddate"))),
    ("tvs_monthlyhours", col("tvs_monthlyhours").cast(IntegerType())),
    ("tvs_monthlyprice", col("tvs_monthlyprice").cast(DoubleType())),
])
_publish_fact(df_sub_facts, "fact_subscriptions")

# Fact: Deliverables. Completion date as date, status as integer.
df_del_facts = _apply_columns(df_deliverables_raw, [
    ("tvs_completeddate", to_date(col("tvs_completeddate"))),
    ("tvs_status", col("tvs_status").cast(IntegerType())),
])
_publish_fact(df_del_facts, "fact_deliverables")

# ── Monthly Utilization Aggregate ────────────────────────────────────────

# Step 1: billable time entries summarised per task and calendar month.
_billable_entries = df_time_facts.where(col("is_billable") == True)
_task_month_keys = [
    col("tvs_taskid").alias("task_id"),
    col("year_month"),
    col("year"),
    col("month"),
]
df_monthly_util = _billable_entries.groupBy(*_task_month_keys).agg(
    spark_sum("tvs_hours").alias("total_hours"),
    count("*").alias("entry_count"),
    avg("tvs_hours").alias("avg_hours_per_entry"),
    spark_min("tvs_date").alias("first_entry_date"),
    spark_max("tvs_date").alias("last_entry_date"),
)

# Step 2: map each task to its account and roll the task rows up to
# account/month level. The join is a left join, so task rows without a
# matching task record stay in the result with a NULL account_id.
df_task_accounts = df_tasks_raw.select(
    col("tvs_taskid").alias("task_id"),
    col("tvs_accountid").alias("account_id"),
)
_task_rows_with_account = df_monthly_util.join(df_task_accounts, "task_id", "left")
df_account_monthly = _task_rows_with_account.groupBy(
    "account_id", "year_month", "year", "month"
).agg(
    spark_sum("total_hours").alias("billable_hours"),
    spark_sum("entry_count").alias("total_entries"),
)

# Step 3: allotted hours per account from subscriptions whose tvs_status is
# 100000001 (presumably the "active" option value, going by the variable
# name; confirm against the Dataverse option set).
# NOTE(review): dropDuplicates keeps one row per account_id with no ordering
# applied, so an account with several matching subscriptions gets an
# arbitrary one of their tvs_monthlyhours values.
_subscription_status = 100000001
df_active_subs = (
    df_sub_facts
    .where(col("tvs_status") == _subscription_status)
    .select(
        col("tvs_accountid").alias("account_id"),
        col("tvs_monthlyhours").alias("allotted_hours"),
    )
    .dropDuplicates(["account_id"])
)

# Step 4: utilization = billable / allotted * 100, rounded to 2 decimals.
# Accounts without a matching subscription get allotted_hours = 0 and
# utilization_pct = 0.
_allotted = col("allotted_hours")
_billable = col("billable_hours")
_utilization_pct = when(
    _allotted > 0, spark_round(_billable / _allotted * 100, 2)
).otherwise(lit(0))

df_utilization = (
    df_account_monthly
    .join(df_active_subs, "account_id", "left")
    .withColumn("allotted_hours", coalesce(_allotted, lit(0)))
    .withColumn("utilization_pct", _utilization_pct)
    .withColumn("over_allotment", _billable > _allotted)
    .withColumn("calculated_at", current_timestamp())
)

_utilization_target = f"{CURATED_PATH}/Tables/agg_monthly_utilization"
df_utilization.write.format("delta").mode("overwrite").save(_utilization_target)
print(f"agg_monthly_utilization written: {df_utilization.count()} rows")

In [None]:
# ── Cell 5: Account Health Score ──────────────────────────────────────────

# Per-account inputs prepared here: latest-month utilization, task counts
# (total / completed / blocked / overdue) with completion rate, and the
# deliverable rate.

today = datetime.utcnow().date()
# Not referenced anywhere in this cell; kept in case other cells read it.
current_ym = today.strftime("%Y-%m")

# Latest utilization per account: the row with the greatest year_month.
# year_month is a "yyyy-MM" string, so descending string order is
# descending chronological order.
window_latest_util = Window.partitionBy("account_id").orderBy(col("year_month").desc())
df_latest_util = (
    df_utilization
    .withColumn("_rn", row_number().over(window_latest_util))
    .filter(col("_rn") == 1)
    .select("account_id", "billable_hours", "allotted_hours", "utilization_pct")
)

# Task completion rate per account.
# count(when(cond, True)) counts only rows where cond holds, because when()
# without otherwise() yields NULL and count() skips NULLs.
df_task_metrics = (
    df_tasks_raw
    .groupBy(col("tvs_accountid").alias("account_id"))
    .agg(
        count("*").alias("total_tasks"),
        count(when(col("tvs_status") == 100000004, True)).alias("completed_tasks"),
        count(when(col("tvs_status") == 100000002, True)).alias("blocked_tasks"),
        # Overdue: status 100000000 or 100000001 with a due date before today.
        # The due date is normalised with to_date and compared against a
        # typed date literal, like the other date columns in this notebook,
        # rather than against a stringified date that depends on implicit
        # coercion of the raw column's type.
        count(when(
            (col("tvs_status").isin([100000000, 100000001])) &
            (to_date(col("tvs_duedate")) < lit(today)),
            True
        )).alias("overdue_tasks")
    )
    .withColumn("task_completion_rate",
        when(col("total_tasks") > 0,
            spark_round(col("completed_tasks") / col("total_tasks") * 100, 2)
        ).otherwise(lit(0))
    )
)

# Deliverable metrics: share of deliverables whose status is 100000003 or
# 100000004 (labelled "approved_delivered" below).
df_del_metrics = (
    df_deliverables_raw
    .groupBy(col("tvs_accountid").alias("account_id"))
    .agg(
        count("*").alias("total_deliverables"),
        count(when(col("tvs_status").isin([100000003, 100000004]), True)).alias("approved_delivered")
    )
    .withColumn("deliverable_rate",
        when(col("total_deliverables") > 0,
            spark_round(col("approved_delivered") / col("total_deliverables") * 100, 2)
        ).otherwise(lit(0))
    )
)

# Compose health score (0-100)
#
# score = utilization_score + task_score + deliverable_score
#         + overdue_penalty + 20, rounded to a whole number, clamped to
#         [0, 100], then bucketed into a category.

_utilization = col("utilization_pct")
_overdue_count = coalesce(col("overdue_tasks"), lit(0))
_score = col("health_score")

# 50-120 % → 30 points, above 120 % → 15, 20-50 % → 20, anything else
# (including a missing utilization) → 5. Exactly 50 % is caught by the first branch.
_utilization_score = (
    when(_utilization.between(50, 120), lit(30))  # Healthy utilization
    .when(_utilization > 120, lit(15))  # Over-utilized
    .when(_utilization.between(20, 50), lit(20))  # Under-utilized
    .otherwise(lit(5))  # Very low engagement
)
# Rates are percentages; a missing rate counts as 0.
_task_score = spark_round(coalesce(col("task_completion_rate"), lit(0)) * 0.3, 2)
_deliverable_score = spark_round(coalesce(col("deliverable_rate"), lit(0)) * 0.2, 2)
# More than 3 overdue tasks costs 10 points, 1-3 costs 5.
_overdue_penalty = (
    when(_overdue_count > 3, lit(-10))
    .when(_overdue_count > 0, lit(-5))
    .otherwise(lit(0))
)
_total_score = spark_round(
    col("utilization_score") + col("task_score") + col("deliverable_score")
    + col("overdue_penalty") + lit(20),
    0,
)
_clamped_score = (
    when(_score > 100, lit(100))
    .when(_score < 0, lit(0))
    .otherwise(_score)
)
_health_category = (
    when(_score >= 80, lit("Healthy"))
    .when(_score >= 60, lit("Needs Attention"))
    .when(_score >= 40, lit("At Risk"))
    .otherwise(lit("Critical"))
)

# Applied in this order: the total needs the component columns, the clamp
# needs the total, and the category needs the clamped value.
_health_columns = [
    ("utilization_score", _utilization_score),
    ("task_score", _task_score),
    ("deliverable_score", _deliverable_score),
    ("overdue_penalty", _overdue_penalty),
    ("health_score", _total_score),
    ("health_score", _clamped_score),
    ("health_category", _health_category),
    ("calculated_at", current_timestamp()),
]

# One row per deduplicated account, left-joined to each metric set so that
# accounts lacking a metric are still scored.
df_health = (
    df_accounts_clean
    .select(col("tvs_accountid").alias("account_id"), "tvs_name", "tvs_tier", "tvs_status")
    .join(df_latest_util, "account_id", "left")
    .join(df_task_metrics, "account_id", "left")
    .join(df_del_metrics, "account_id", "left")
)
for _column_name, _expression in _health_columns:
    df_health = df_health.withColumn(_column_name, _expression)

_health_target = f"{CURATED_PATH}/Tables/agg_account_health"
df_health.write.format("delta").mode("overwrite").save(_health_target)

# Summary
print(f"\nagg_account_health written: {df_health.count()} accounts")
print("\nHealth Distribution:")
df_health.groupBy("health_category").count().show()
print(f"\nPipeline completed at {datetime.utcnow().isoformat()}")