In [None]:
%py
# PySpark script to generate comprehensive test data for purgo_playground.d_product_revenue_clone table

from pyspark.sql.types import StructType, StructField, LongType, StringType, DateType, DoubleType
from pyspark.sql.functions import when, col, regexp_replace, lit

# Define the schema matching purgo_playground.d_product_revenue_clone.
# purchased_date and customer_satisfaction_score are declared nullable because the
# NULL-handling rows in test_data carry None in those positions; with nullable=False
# spark.createDataFrame rejects such rows during schema verification.
# NOTE(review): confirm the nullability of these columns against the target table.
schema = StructType([
    StructField("product_id", LongType(), False),
    StructField("product_name", StringType(), False),
    StructField("product_type", StringType(), False),
    StructField("revenue", LongType(), False),
    StructField("country", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("purchased_date", DateType(), True),
    StructField("invoice_date", DateType(), False),
    StructField("invoice_number", StringType(), True),
    StructField("is_returned", LongType(), False),
    StructField("customer_satisfaction_score", LongType(), True),
    StructField("product_details", StringType(), False),
    StructField("customer_first_purchased_date", DateType(), False),
    StructField("customer_first_product", StringType(), False),
    StructField("customer_first_revenue", DoubleType(), False),
])

# Test rows, one tuple per record, in schema column order:
#   product_id, product_name, product_type, revenue, country, customer_id,
#   purchased_date, invoice_date,
#   invoice_number, is_returned, customer_satisfaction_score, product_details,
#   customer_first_purchased_date, customer_first_product, customer_first_revenue
# Each tuple is split over two lines; the second line starts at invoice_number.
test_data = [
    # --- Happy path ---
    (1, "ProductA", "Type1", 1000, "USA", "CUST001", "2024-01-15", "2024-01-20",
     "1234234534", 0, 5, "DetailsA", "2023-12-01", "ProductA", 500.0),
    (2, "ProductB", "Type2", 2000, "Canada", "CUST002", "2024-02-10", "2024-02-15",
     "9876543210", 1, 4, "DetailsB", "2023-11-05", "ProductB", 750.0),

    # --- Edge cases: zero revenue, very large revenue ---
    (3, "ProductC", "Type3", 0, "UK", "CUST003", "2024-03-01", "2024-03-05",
     "555566667777", 0, 3, "DetailsC", "2023-10-20", "ProductC", 0.0),
    (4, "ProductD", "Type4", 999999999, "Germany", "CUST004", "2024-04-25", "2024-04-30",
     "1000", 1, 2, "DetailsD", "2023-09-15", "ProductD", 250.0),

    # --- Error cases: negative amounts, out-of-range flags and scores ---
    (5, "ProductE", "Type5", -500, "France", "CUST005", "2024-05-10", "2024-05-15",
     "ABCDE12345", 0, 6, "DetailsE", "2023-08-10", "ProductE", -100.0),
    (6, "ProductF", "Type6", 1500, "Spain", "CUST006", "2024-06-20", "2024-06-25",
     "9999999999999999", 2, -1, "DetailsF", "2023-07-05", "ProductF", 300.0),

    # --- NULL handling: NULL invoice_number, NULL customer_satisfaction_score ---
    (7, "ProductG", "Type7", 2500, "Italy", "CUST007", "2024-07-15", "2024-07-20",
     None, 0, 5, "DetailsG", "2023-06-01", "ProductG", 400.0),
    (8, "ProductH", "Type8", 3000, "Netherlands", "CUST008", "2024-08-10", "2024-08-15",
     "1234", 1, None, "DetailsH", "2023-05-05", "ProductH", 550.0),

    # --- Multi-byte characters in invoice_number ---
    (9, "ProductI", "Type9", 3500, "Sweden", "CUST009", "2024-09-05", "2024-09-10",
     "こんにちは1234", 0, 4, "DetailsI", "2023-04-20", "ProductI", 600.0),
    (10, "ProductJ", "Type10", 4000, "Brazil", "CUST010", "2024-10-25", "2024-10-30",
     "特殊字符5678", 1, 3, "DetailsJ", "2023-03-15", "ProductJ", 650.0),

    # --- More happy-path rows ---
    (11, "ProductK", "Type11", 4500, "Australia", "CUST011", "2024-11-10", "2024-11-15",
     "1122334455", 0, 5, "DetailsK", "2023-02-10", "ProductK", 700.0),
    (12, "ProductL", "Type12", 5000, "Japan", "CUST012", "2024-12-20", "2024-12-25",
     "6677889900", 1, 4, "DetailsL", "2023-01-05", "ProductL", 800.0),

    # --- invoice_number of exactly four digits ---
    (13, "ProductM", "Type13", 5500, "Mexico", "CUST013", "2025-01-15", "2025-01-20",
     "5678", 0, 2, "DetailsM", "2022-12-01", "ProductM", 850.0),

    # --- invoice_number containing non-numeric characters ---
    (14, "ProductN", "Type14", 6000, "India", "CUST014", "2025-02-10", "2025-02-15",
     "INV#1234", 1, 1, "DetailsN", "2022-11-05", "ProductN", 900.0),

    # --- NULL purchased_date ---
    (15, "ProductO", "Type15", 6500, "China", "CUST015", None, "2025-03-05",
     "9999", 0, 5, "DetailsO", "2022-10-20", "ProductO", 950.0),

    # --- Accented characters in product_name ---
    (16, "PrödüctP", "Type16", 7000, "Russia", "CUST016", "2025-04-25", "2025-04-30",
     "8888", 1, 4, "DetailsP", "2022-09-15", "PrödüctP", 1000.0),

    # --- Accented character in customer_id ---
    (17, "ProductQ", "Type17", 7500, "South Korea", "CÜST017", "2025-05-10", "2025-05-15",
     "7777", 0, 3, "DetailsQ", "2022-08-10", "ProductQ", 1050.0),

    # --- Very long strings (255, 1000 and 400 characters) ---
    (18, "P" * 255, "Type18", 8000, "Norway", "CUST018", "2025-06-20", "2025-06-25",
     "6666", 1, 2, "D" * 1000, "2022-07-05", "ProductQ" * 50, 1100.0),

    # --- Punctuation in customer_id ---
    (19, "ProductR", "Type19", 8500, "Switzerland", "CUST@019", "2025-07-15", "2025-07-20",
     "5555", 0, 1, "DetailsR", "2022-06-01", "ProductR", 1150.0),

    # --- NULL invoice_number, every other field populated ---
    (20, "ProductS", "Type20", 9000, "Belgium", "CUST020", "2025-08-10", "2025-08-15",
     None, 1, 5, "DetailsS", "2022-05-05", "ProductS", 1200.0),
]

# Create DataFrame with test data.
# test_data spells dates as ISO-8601 strings, but DateType fields only accept
# datetime.date objects during schema verification, so the strings in the date
# columns are parsed first. None values are passed through untouched.
import datetime

# Positions of the DateType columns, derived from the schema so the two stay in sync.
date_positions = {
    index
    for index, field in enumerate(schema.fields)
    if isinstance(field.dataType, DateType)
}
typed_rows = [
    tuple(
        datetime.date.fromisoformat(value)
        if index in date_positions and isinstance(value, str)
        else value
        for index, value in enumerate(row)
    )
    for row in test_data
]
df_test = spark.createDataFrame(data=typed_rows, schema=schema)

# Apply masking logic to the invoice_number column:
#   * four or more characters -> the last four characters become '****'
#   * fewer than four         -> every character becomes '*'
#   * NULL                    -> stays NULL (rlike on NULL is not true, and
#                                regexp_replace on NULL yields NULL)
# The replacement must be a plain string: regexp_replace does not accept a Python
# callable. The pattern is '.{4}$' rather than '(.+).{4}$' so that values of exactly
# four characters are masked as well.
df_masked = df_test.withColumn(
    "invoice_number",
    when(
        col("invoice_number").rlike(".{4}$"),
        regexp_replace(col("invoice_number"), ".{4}$", "****"),
    ).otherwise(
        regexp_replace(col("invoice_number"), ".", "*")
    ),
)

# Persist the masked rows into the clone table, replacing whatever it currently holds.
try:
    (
        df_masked.write
        .mode("overwrite")
        .saveAsTable("purgo_playground.d_product_revenue_clone")
    )
except Exception as e:
    # Best-effort write: any failure (for example missing columns or data type
    # mismatches) is reported on stdout and the script carries on without raising.
    print(f"Error while writing to purgo_playground.d_product_revenue_clone: {e}")