In [0]:
# Raise the execution timeout to 4 hours, expressed in seconds (14400).
# Serverless notebooks default to a 2.5 hour timeout; this setting requests a
# longer one, but workspace and cluster policies may still decide what is
# actually enforced.
timeout_conf_key = "spark.databricks.execution.timeout"
timeout_conf_value = "14400"
spark.conf.set(timeout_conf_key, timeout_conf_value)

In [0]:
%pip install dbldatagen


In [0]:
%restart_python

In [0]:
# Load the existing curated table; its schema drives the synthetic generator
# built in a later cell.
table_name = "main.gtm_data.core_usecase_curated"
df = spark.table(table_name)
schema = df.schema

# Show a small sample (20 rows) of the source rows.
source_preview_df = df.limit(20)
display(source_preview_df)

In [0]:
import dbldatagen as dg
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Build a dbldatagen spec that mirrors the source table's schema, then generate
# `row_count` synthetic rows.
#   * Columns listed in VALUE_SAMPLED_COLUMNS reuse the distinct non-null values
#     observed in the source table.
#   * Date/timestamp columns are generated between the min and max observed in
#     the source table (fixed fallback range when the column has no data).
#   * Every other column is generated from a fixed range chosen by Spark type.

# Columns whose synthetic values are drawn from the values present in `df`.
# Names that are not in the table's schema are skipped rather than failing.
VALUE_SAMPLED_COLUMNS = [
    # region / sub-region fields
    "sales_region",
    "sales_subregion_level_1",
    "sales_subregion_level_2",
    "sales_subregion_level_3",
    "subregion_1",
    "subregion_2",
    "subregion_3",
    # free-text copy-over fields
    "demand_plan_stage_next_steps",
    "usecase_description",
    "implementation_notes",
    "business_impact",
    "sdr_bdr_notes",
    # people fields
    "account_executive",
    "field_manager",
    "sales_manager",
    "solution_architect",
]

# Spark typeName -> (generator type, minValue, maxValue) for plain numeric columns.
NUMERIC_RANGES = {
    "integer": ("int", 0, 10000),
    "long": ("long", 0, 100000),
    "double": ("double", 0.0, 10000.0),
}

# Format used for every begin/end bound handed to the generator. This is the
# form used in dbldatagen's documented begin/end examples.
# NOTE(review): confirm against the installed dbldatagen version.
BOUND_FORMAT = "%Y-%m-%d %H:%M:%S"

# Fallback (begin, end) bounds for temporal columns with no observed min/max.
DEFAULT_TEMPORAL_RANGES = {
    "timestamp": ("2020-01-01 00:00:00", "2025-12-31 23:59:59"),
    "date": ("2020-01-01 00:00:00", "2025-12-31 00:00:00"),
}


def distinct_non_null_values(source_df, column_name):
    """Return the distinct non-null values of `column_name` in `source_df` as a list."""
    rows = source_df.select(column_name).distinct().collect()
    return [row[0] for row in rows if row[0] is not None]


def format_bound(value):
    """Render a date or datetime as a BOUND_FORMAT string.

    str() is deliberately not used: it yields 'YYYY-MM-DD' for a date and
    appends fractional seconds to a datetime that carries microseconds, so the
    resulting strings would not share one format.
    """
    return value.strftime(BOUND_FORMAT)


# Collect the observed values for each value-sampled column that exists in the table.
schema_column_names = set(schema.names)
sampled_values = {}
for sampled_name in VALUE_SAMPLED_COLUMNS:
    if sampled_name not in schema_column_names:
        continue
    observed = distinct_non_null_values(df, sampled_name)
    # An all-null column has nothing to sample from; it is left out here so the
    # loop below generates it from its Spark type instead.
    if observed:
        sampled_values[sampled_name] = observed

# Compute min/max for all date and timestamp fields in a single aggregation.
temporal_names = [f.name for f in schema if f.dataType.typeName() in ("date", "timestamp")]
date_ranges = {}
if temporal_names:
    bound_exprs = []
    for temporal_name in temporal_names:
        bound_exprs.append(F.min(df[temporal_name]))
        bound_exprs.append(F.max(df[temporal_name]))
    bounds_row = df.agg(*bound_exprs).collect()[0]
    # The result row is laid out as (min_0, max_0, min_1, max_1, ...).
    for position, temporal_name in enumerate(temporal_names):
        lower = bounds_row[2 * position]
        upper = bounds_row[2 * position + 1]
        if lower is not None and upper is not None:
            date_ranges[temporal_name] = (lower, upper)

row_count = 1000  # Set desired number of synthetic rows

spec = dg.DataGenerator(spark, name="synthetic_core_usecase_curated", rows=row_count, partitions=4)

for field in schema:
    col_name = field.name
    type_name = field.dataType.typeName()
    nullable = field.nullable

    if col_name in sampled_values:
        # Value-sampled columns are always emitted as strings.
        spec = spec.withColumn(col_name, "string", values=sampled_values[col_name], nullable=nullable)
    elif type_name in NUMERIC_RANGES:
        generator_type, low, high = NUMERIC_RANGES[type_name]
        spec = spec.withColumn(col_name, generator_type, minValue=low, maxValue=high, nullable=nullable)
    elif type_name == "boolean":
        spec = spec.withColumn(col_name, "boolean", nullable=nullable)
    elif type_name in DEFAULT_TEMPORAL_RANGES:
        if col_name in date_ranges:
            begin = format_bound(date_ranges[col_name][0])
            end = format_bound(date_ranges[col_name][1])
        else:
            begin, end = DEFAULT_TEMPORAL_RANGES[type_name]
        spec = spec.withColumn(col_name, type_name, begin=begin, end=end, nullable=nullable)
    else:
        # Plain strings and every Spark type not handled above are generated as
        # a string column with a 5..20 range.
        spec = spec.withColumn(col_name, "string", minValue=5, maxValue=20, nullable=nullable)

synthetic_df = spec.build()

display(synthetic_df)

In [0]:
# Summary statistics for the generated data, as a quick sanity check.
synthetic_summary_df = synthetic_df.describe()
display(synthetic_summary_df)

In [0]:
# Persist the synthetic rows, replacing any previous contents of the target table.
target_table_name = "users.luis_herrera.synthetic_core_usecase_curated"
synthetic_writer = synthetic_df.write.mode("overwrite")
synthetic_writer.saveAsTable(target_table_name)