In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, date

# Read current eligibility data
print("=== CURRENT ELIGIBILITY DATA ===")
eligibility_df = spark.table("healthanalytics.eligibility")
eligibility_df.show(10)

# Check the current schema
eligibility_df.printSchema()

print("Current eligibility status distribution:")
eligibility_df.groupBy("eligibility_status").count().show()

In [0]:
def implement_eligibility_scd2(target_table, source_df, key_column="member_id"):
    """
    Implement SCD Type 2 for eligibility table with proper healthcare date handling
    
    This tracks:
    - Coverage terminations and reactivations
    - Plan changes
    - Eligibility period overlaps
    - Compliance audit trail
    """
    
    print("=== ELIGIBILITY SCD TYPE 2 IMPLEMENTATION ===")
    
    # Read current eligibility table
    try:
        current_eligibility = spark.table(target_table)
    except Exception:
        # If SCD2 table doesn't exist, create from base table
        current_eligibility = spark.table("healthanalytics.eligibility")
        
        # Add SCD2 columns to existing data
        current_eligibility = current_eligibility \
            .withColumn("record_start_date", col("eligibility_start_date")) \
            .withColumn("record_end_date", lit("9999-12-31")) \
            .withColumn("is_current_record", lit(True)) \
            .withColumn("change_reason", lit("Initial Load")) \
            .withColumn("version_number", lit(1)) \
            .withColumn("created_timestamp", current_timestamp())
        
        # Save initial SCD2 table
        current_eligibility.write.mode("overwrite").saveAsTable(f"{target_table}_scd2")
        print("Created initial SCD2 eligibility table")
    
    # Display current active eligibility records
    active_records = current_eligibility.filter(col("is_current_record") == True)
    print(f"Current active eligibility records: {active_records.count()}")
    active_records.show()
    
    # Prepare source data with SCD2 columns
    current_date = date.today().strftime("%Y-%m-%d")
    
    source_enhanced = source_df \
        .withColumn("record_start_date", col("eligibility_start_date")) \
        .withColumn("record_end_date", lit("9999-12-31")) \
        .withColumn("is_current_record", lit(True)) \
        .withColumn("created_timestamp", current_timestamp())
    
    print("Source eligibility changes:")
    source_enhanced.show()
    
    # Find records that have changed
    # Join current active records with source to identify changes
    comparison_df = active_records.alias("current").join(
        source_enhanced.alias("new"),
        col("current.member_id") == col("new.member_id"),
        "inner"
    )
    
    # Identify what constitutes a change for eligibility
    changed_records = comparison_df.filter(
        (col("current.eligibility_status") != col("new.eligibility_status")) |
        (col("current.eligibility_start_date") != col("new.eligibility_start_date")) |
        (col("current.eligibility_end_date") != col("new.eligibility_end_date"))
    ).select("new.*")
    
    print(f"Records with eligibility changes: {changed_records.count()}")
    
    if changed_records.count() > 0:
        changed_member_ids = changed_records.select("member_id").distinct()
        
        # Step 1: Close current records for changed members
        expired_records = current_eligibility.join(changed_member_ids, "member_id", "inner") \
            .withColumn("record_end_date", lit(current_date)) \
            .withColumn("is_current_record", lit(False))

In [0]:
from pyspark.sql.functions import col, lit, current_timestamp, when, max, to_date
from datetime import date
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

def implement_eligibility_scd2(target_table, source_df, key_column="member_id"):
    """
    Implement SCD Type 2 for eligibility table with proper healthcare date handling
    
    This tracks:
    - Coverage terminations and reactivations
    - Plan changes
    - Eligibility period overlaps
    - Compliance audit trail
    """
    
    print("=== ELIGIBILITY SCD TYPE 2 IMPLEMENTATION ===")
    
    # Read current eligibility table
    try:
        current_eligibility = spark.table(target_table)
        
        # Add SCD2 columns if they don't exist
        if "is_current_record" not in current_eligibility.columns:
            current_eligibility = current_eligibility \
                .withColumn("record_start_date", col("eligibility_start_date")) \
                .withColumn("record_end_date", lit("9999-12-31")) \
                .withColumn("is_current_record", lit(True)) \
                .withColumn("change_reason", lit("Initial Load")) \
                .withColumn("version_number", lit(1)) \
                .withColumn("created_timestamp", current_timestamp())
    except Exception:
        # If SCD2 table doesn't exist, create from base table
        current_eligibility = spark.table("healthanalytics.eligibility")
        
        # Add SCD2 columns to existing data
        current_eligibility = current_eligibility \
            .withColumn("record_start_date", col("eligibility_start_date")) \
            .withColumn("record_end_date", lit("9999-12-31")) \
            .withColumn("is_current_record", lit(True)) \
            .withColumn("change_reason", lit("Initial Load")) \
            .withColumn("version_number", lit(1)) \
            .withColumn("created_timestamp", current_timestamp())
        
        # Save initial SCD2 table
        current_eligibility.write.mode("overwrite").saveAsTable(f"{target_table}_scd2")
        print("Created initial SCD2 eligibility table")
    
    # Display current active eligibility records
    active_records = current_eligibility.filter(col("is_current_record") == True)
    print(f"Current active eligibility records: {active_records.count()}")
    display(active_records)
    
    # Prepare source data with SCD2 columns
    current_date = date.today().strftime("%Y-%m-%d")
    
    source_enhanced = source_df \
        .withColumn("record_start_date", to_date(col("eligibility_start_date"), "M/d/yyyy")) \
        .withColumn("record_end_date", lit("9999-12-31")) \
        .withColumn("is_current_record", lit(True)) \
        .withColumn("created_timestamp", current_timestamp())
    
    print("Source eligibility changes:")
    display(source_enhanced)
    
    # Find records that have changed
    # Join current active records with source to identify changes
    comparison_df = active_records.alias("current").join(
        source_enhanced.alias("new"),
        col("current.member_id") == col("new.member_id"),
        "inner"
    )
    
    # Identify what constitutes a change for eligibility
    changed_records = comparison_df.filter(
        (col("current.eligibility_status") != col("new.eligibility_status")) |
        (col("current.eligibility_start_date") != col("new.eligibility_start_date")) |
        (col("current.eligibility_end_date") != col("new.eligibility_end_date"))
    ).select("new.*")
    
    print(f"Records with eligibility changes: {changed_records.count()}")
    
    if changed_records.count() > 0:
        changed_member_ids = changed_records.select("member_id").distinct()
        
        # Step 1: Close current records for changed members
        expired_records = current_eligibility.join(changed_member_ids, "member_id", "inner") \
            .withColumn("record_end_date", lit(current_date)) \
            .withColumn("is_current_record", lit(False))
        
        # Step 2: Keep unchanged records as-is
        unchanged_records = current_eligibility.join(changed_member_ids, "member_id", "left_anti")
        
        # Step 3: Get max version numbers for changed members
        max_versions = current_eligibility.groupBy("member_id").agg(
            max("version_number").alias("max_version")
        )
        
        # Step 4: Create new records with incremented version and change reason
        new_versions = changed_records.join(max_versions, "member_id", "left") \
            .withColumn("version_number", col("max_version") + 1) \
            .withColumn("change_reason", 
                when(col("eligibility_status") == "Terminated", "Coverage Terminated")
                .when(col("eligibility_status") == "Active", "Coverage Activated")
                .when(col("eligibility_status") == "Suspended", "Coverage Suspended")
                .otherwise("Eligibility Update")
            ).drop("max_version")
        
        # Step 5: Handle completely new members
        new_members = source_enhanced.join(
            current_eligibility.select("member_id").distinct(),
            "member_id",
            "left_anti"
        ).withColumn("version_number", lit(1)) \
         .withColumn("change_reason", lit("New Enrollment"))
        
        # Combine all records
        final_eligibility = unchanged_records \
            .union(expired_records) \
            .union(new_versions) \
            .union(new_members)
    
    else:
        # No changes, just add new members
        new_members = source_enhanced.join(
            current_eligibility.select("member_id").distinct(),
            "member_id", 
            "left_anti"
        ).withColumn("version_number", lit(1)) \
         .withColumn("change_reason", lit("New Enrollment"))
        
        final_eligibility = current_eligibility.union(new_members)
    
    # Save updated SCD2 table
    final_eligibility.write.mode("overwrite").saveAsTable(f"{target_table}_scd2")
    
    print("=== UPDATED ELIGIBILITY SCD2 TABLE ===")
    result_df = spark.table(f"{target_table}_scd2")
    result_df.orderBy("member_id", "version_number")
    display(result_df)
    
    # Show summary of changes
    print("=== CHANGE SUMMARY ===")
    change_summary = result_df.groupBy("change_reason").count().orderBy("count", ascending=False)
    display(change_summary)
    
    return result_df

# Create sample eligibility changes (realistic healthcare scenarios)
eligibility_changes = [
    # Member 111260497: Terminated coverage
    (1, 111260497, "1/1/2023", "6/30/2024", "Terminated"),
    
    # Member 123456789: Suspended then reactivated  
    (2, 123456789, "1/1/2023", "12/31/2024", "Suspended"),
    
    # Member 987654321: Extended coverage period
    (3, 987654321, "1/1/2023", "12/31/2025", "Active"),
    
    # New member enrollment
    (4, 555777888, "7/1/2024", "12/31/2024", "Active"),
    
    # Member with gap in coverage (terminated then reactivated)
    (5, 444555666, "8/1/2024", "12/31/2024", "Active")
]

eligibility_schema = StructType([
    StructField("eligibility_id", IntegerType(), True),
    StructField("member_id", IntegerType(), True),
    StructField("eligibility_start_date", StringType(), True),
    StructField("eligibility_end_date", StringType(), True),
    StructField("eligibility_status", StringType(), True)
])

eligibility_updates = spark.createDataFrame(eligibility_changes, eligibility_schema)

print("=== ELIGIBILITY CHANGES TO PROCESS ===")
display(eligibility_updates)

# Apply SCD Type 2 to eligibility
eligibility_scd2_result = implement_eligibility_scd2(
    "healthanalytics.eligibility",
    eligibility_updates
)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

def implement_provider_scd1(target_table, source_df, key_column="provider_id"):
    """
    Implement SCD Type 1 for provider table - practice location changes
    
    This overwrites:
    - Practice location changes (current location is sufficient)
    - Contact information updates  
    - Administrative corrections
    
    This preserves:
    - Provider specialty (might need history)
    - PCP status (might need history)
    """
    
    print("=== PROVIDER SCD TYPE 1 IMPLEMENTATION ===")
    
    # Read current provider table
    current_providers = spark.table(target_table)
    print("Current provider table:")
    current_providers.show()
    
    print("Provider location changes to apply:")
    source_df.show()
    
    # Identify what columns to update (SCD Type 1 columns)
    scd1_columns = ["practice_location", "name"]  # Location and name corrections
    
    # Show before state
    print("=== BEFORE CHANGES ===")
    before_changes = current_providers.filter(
        col(key_column).isin([row[key_column] for row in source_df.collect()])
    )
    before_changes.show()
    
    # For SCD Type 1, we simply merge/upsert the changes
    # Get records that are not being updated
    unchanged_providers = current_providers.join(
        source_df.select(key_column),
        key_column,
        "left_anti"
    )
    
    # Combine unchanged records with all source records (updated + new)
    updated_providers = unchanged_providers.union(source_df)
    
    # Alternative approach using Delta merge (if available)
    try:
        from delta.tables import DeltaTable
        
        # Try Delta Lake merge for better performance
        delta_table = DeltaTable.forName(spark, target_table)
        
        # Build update dictionary for SCD1 columns
        update_dict = {col: f"source.{col}" for col in scd1_columns}
        update_dict["Provider_Plan_ID"] = "source.Provider_Plan_ID"
        update_dict["specialty"] = "source.specialty"
        update_dict["is_pcp"] = "source.is_pcp"
        update_dict["ID"] = "source.ID"
        update_dict["TaxID"] = "source.TaxID"
        
        delta_table.alias("target") \
            .merge(
                source_df.alias("source"),
                f"target.{key_column} = source.{key_column}"
            ) \
            .whenMatchedUpdate(set=update_dict) \
            .whenNotMatchedInsertAll() \
            .execute()
        
        print("Used Delta Lake merge for SCD Type 1")
        result_df = spark.table(target_table)
        
    except Exception as e:
        print(f"Delta merge not available, using DataFrame operations: {e}")
        
        # Save using DataFrame operations
        updated_providers.write.mode("overwrite").saveAsTable(f"{target_table}_scd1_updated")
        result_df = spark.table(f"{target_table}_scd1_updated")
    
    # Show after state
    print("=== AFTER CHANGES ===")
    after_changes = result_df.filter(
        col(key_column).isin([row[key_column] for row in source_df.collect()])
    )
    after_changes.show()
    
    # Show change summary
    print("=== CHANGE SUMMARY ===")
    total_providers = result_df.count()
    updated_count = source_df.count()
    new_providers = source_df.join(current_providers.select(key_column), key_column, "left_anti").count()
    
    print(f"Total providers after update: {total_providers}")
    print(f"Records processed: {updated_count}")
    print(f"New providers added: {new_providers}")
    print(f"Existing providers updated: {updated_count - new_providers}")
    
    return result_df

# Create realistic provider location changes
provider_changes = [
    # Dr. James Wilson moved to new location
    (2001, "Dr. James Wilson", "Family Medicine", "Downtown Medical Center", 1, "PR0001", 1, 957974335),
    
    # Dr. Sarah Johnson name correction and location change  
    (2002, "Dr. Sarah M. Johnson", "Pediatrics", "Children's Hospital East Wing", 1, "PR0002", 2, 123456789),
    
    # Existing provider location change
    (2003, "Dr. Michael Chen", "Cardiology", "Heart & Vascular Institute", 0, "PR0003", 3, 987654321),
    
    # New provider joining the network
    (2004, "Dr. Lisa Rodriguez", "Orthopedics", "Sports Medicine Center", 0, "PR0004", 4, 555666777),
    
    # Provider location consolidation
    (2005, "Dr. Robert Kim", "Internal Medicine", "North Clinic", 1, "PR0005", 5, 111222333)
]

provider_schema = StructType([
    StructField("Provider_Plan_ID", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("specialty", StringType(), True),
    StructField("practice_location", StringType(), True),
    StructField("is_pcp", IntegerType(), True),
    StructField("provider_id", StringType(), True),
    StructField("ID", IntegerType(), True),
    StructField("TaxID", IntegerType(), True)
])

provider_updates = spark.createDataFrame(provider_changes, provider_schema)

print("=== PROVIDER LOCATION CHANGES TO PROCESS ===")
provider_updates.show(truncate=False)

# Apply SCD Type 1 to provider data
provider_scd1_result = implement_provider_scd1(
    "default.providers",
    provider_updates
)