In [None]:
import requests
from pyspark.sql import functions as F
from pyspark.sql import types as T

# -----------------------------------------
# Configuration
# -----------------------------------------
# Endpoint that serves the customer list; change this to target another deployment.
FLASK_API_URL = "https://databricks-approval-backend.onrender.com/customers"

# The target table name is assembled as <catalog>.<schema>.<table>.
CATALOG = "mlops_prod"
CONTROL_SCHEMA = "platform"
CUSTOMER_TABLE = ".".join((CATALOG, CONTROL_SCHEMA, "customers_master"))

# Seconds to wait for the API before the request is abandoned.
TIMEOUT = 45

# Echo the effective settings so the run output records what was targeted.
print(
    f"‚úÖ Target customer table: {CUSTOMER_TABLE}",
    f"‚úÖ Reading customers from API: {FLASK_API_URL}",
    sep="\n",
)


In [None]:

# =========================================
# 1. CALL FLASK API
# =========================================
# Download the customer list from the Flask API. A transport failure, a
# non-2xx status, or a body that cannot be decoded as JSON aborts the cell
# with a RuntimeError.
try:
    response = requests.get(FLASK_API_URL, timeout=TIMEOUT)
    response.raise_for_status()
    api_customers = response.json()
except Exception as e:
    # Chain explicitly so the underlying error is reported as the direct cause.
    raise RuntimeError(f"‚ùå Failed to fetch customers from API: {e}") from e

# The steps below iterate over the payload and index every item by
# "customer_id". Reject anything that is not a JSON array here, so a JSON
# object (e.g. an error envelope) fails with a clear message instead of an
# unrelated error further down.
if not isinstance(api_customers, list):
    raise RuntimeError(
        "Failed to fetch customers from API: expected a JSON list, "
        f"got {type(api_customers).__name__}"
    )

print(f"‚úÖ Received {len(api_customers)} customers from API")

# =========================================
# 2. LOAD EXISTING CUSTOMERS FROM DATABRICKS
# =========================================
# Only the key column is read; its values are collected to the driver and
# held in a set for the membership checks that follow.
existing_df = (
    spark
    .table(CUSTOMER_TABLE)
    .select("customer_id")
)
existing_ids = set(record["customer_id"] for record in existing_df.collect())
print(f"‚úÖ Found {len(existing_ids)} existing customers in Databricks")

# =========================================
# 3. FILTER ONLY NEW CUSTOMERS
# =========================================
# Keep the first API record for every customer_id that is not already in the
# table. Ids accepted from this payload are tracked too, so a customer_id that
# the API returns more than once is onboarded once rather than appended once
# per occurrence.
seen_ids = set(existing_ids)
new_customers = []
for c in api_customers:
    cid = c["customer_id"]
    if cid not in seen_ids:
        seen_ids.add(cid)
        new_customers.append(c)

if not new_customers:
    print("‚úÖ No new customers found ‚Äî onboarding skipped.")
    # Nothing to onboard: end the notebook run, returning this status string.
    dbutils.notebook.exit("NO_NEW_CUSTOMERS")

print(f"üöÄ New customers to onboard: {len(new_customers)}")

# =========================================
# 4. CONVERT NEW CUSTOMERS TO SPARK DF
# =========================================
# Column layout of the rows built below, as (name, Spark type, nullable).
field_specs = [
    ("customer_id", T.StringType, False),
    ("customer_name", T.StringType, True),
    ("schema_name", T.StringType, True),
    ("cloud_provider", T.StringType, True),
    ("data_path", T.StringType, True),
    ("is_active", T.BooleanType, True),
    ("onboarding_status", T.StringType, True),
    ("last_pipeline_run", T.TimestampType, True),
    ("created_by", T.StringType, True),
    ("created_at", T.TimestampType, True),
    ("updated_at", T.TimestampType, True),
]
spark_schema = T.StructType([
    T.StructField(col_name, col_type(), nullable)
    for col_name, col_type, nullable in field_specs
])

# One tuple per new customer, in schema order. customer_id is required (a
# missing key raises KeyError); the other API fields fall back to None.
spark_rows = [
    (
        customer["customer_id"],
        customer.get("customer_name"),
        customer.get("schema_name"),
        customer.get("cloud_provider"),
        customer.get("data_path"),
        # Treated as active when the API omits the flag.
        # NOTE(review): bool() turns any non-empty string (even "false") into
        # True - confirm the API sends a JSON boolean or number here.
        bool(customer.get("is_active", 1)),
        "ACTIVE",                # onboarding_status
        None,                    # last_pipeline_run
        "flask_api_onboarding",  # created_by
        None,                    # created_at, overwritten below
        None,                    # updated_at, overwritten below
    )
    for customer in new_customers
]

# Both audit columns are replaced with the current timestamp expression.
spark_new_df = (
    spark.createDataFrame(spark_rows, schema=spark_schema)
    .withColumn("created_at", F.current_timestamp())
    .withColumn("updated_at", F.current_timestamp())
)

# =========================================
# 5. INSERT INTO customers_master
# =========================================
# Append the newly built rows to the existing customer table.
spark_new_df.write.mode("append").saveAsTable(CUSTOMER_TABLE)

# Report the row count from the driver-side list the DataFrame was built from.
# spark_new_df.count() would launch a further Spark action only to recover a
# number that is already known.
print(f"‚úÖ Inserted {len(spark_rows)} new customers into {CUSTOMER_TABLE}")

In [None]:

# =========================================
# 2. LOAD EXISTING CUSTOMERS FROM DATABRICKS
# =========================================
# Read the customer_id column of the target table and gather its values into
# a driver-side set.
existing_df = spark.table(CUSTOMER_TABLE).select("customer_id")
existing_ids = set()
for id_row in existing_df.collect():
    existing_ids.add(id_row["customer_id"])
print(f"‚úÖ Found {len(existing_ids)} existing customers in Databricks")

# =========================================
# 3. FILTER ONLY NEW CUSTOMERS
# =========================================
# Select API records whose customer_id is not yet in the table, keeping only
# the first record per id. Without tracking the ids accepted so far, an id
# repeated in the API payload would be selected (and later appended) once for
# each occurrence.
seen_ids = set(existing_ids)
new_customers = []
for c in api_customers:
    cid = c["customer_id"]
    if cid in seen_ids:
        continue
    seen_ids.add(cid)
    new_customers.append(c)

if not new_customers:
    print("‚úÖ No new customers found ‚Äî onboarding skipped.")
    # Nothing to onboard: end the notebook run, returning this status string.
    dbutils.notebook.exit("NO_NEW_CUSTOMERS")

print(f"üöÄ New customers to onboard: {len(new_customers)}")


In [None]:

# =========================================
# 4. CONVERT NEW CUSTOMERS TO SPARK DF
# =========================================
# Schema for the rows to be appended. Only customer_id is declared
# non-nullable; every other column accepts nulls.
spark_schema = (
    T.StructType()
    .add("customer_id", T.StringType(), False)
    .add("customer_name", T.StringType(), True)
    .add("schema_name", T.StringType(), True)
    .add("cloud_provider", T.StringType(), True)
    .add("data_path", T.StringType(), True)
    .add("is_active", T.BooleanType(), True)
    .add("onboarding_status", T.StringType(), True)
    .add("last_pipeline_run", T.TimestampType(), True)
    .add("created_by", T.StringType(), True)
    .add("created_at", T.TimestampType(), True)
    .add("updated_at", T.TimestampType(), True)
)


In [None]:

# This cell can be executed with a stale `new_customers` list - for example
# right after the combined cell above has already appended the same customers,
# or when this cell is simply run twice. Re-read the ids currently in the table
# and drop every customer that is already present (or repeated within the
# list) so the append below cannot insert duplicates.
current_ids = {
    row["customer_id"]
    for row in spark.table(CUSTOMER_TABLE).select("customer_id").collect()
}
pending_customers = []
for c in new_customers:
    if c["customer_id"] not in current_ids:
        current_ids.add(c["customer_id"])
        pending_customers.append(c)

# One tuple per customer still to be inserted, in spark_schema column order.
spark_rows = []
for c in pending_customers:
    spark_rows.append((
        c["customer_id"],
        c.get("customer_name"),
        c.get("schema_name"),
        c.get("cloud_provider"),
        c.get("data_path"),
        bool(c.get("is_active", 1)),   # treated as active when the flag is absent
        "ACTIVE",                      # onboarding_status
        None,                          # last_pipeline_run
        "flask_api_onboarding",        # created_by
        None,                          # created_at, overwritten below
        None                           # updated_at, overwritten below
    ))

# Both audit columns are replaced with the current timestamp expression.
spark_new_df = spark.createDataFrame(spark_rows, schema=spark_schema) \
    .withColumn("created_at", F.current_timestamp()) \
    .withColumn("updated_at", F.current_timestamp())

# =========================================
# 5. INSERT INTO customers_master
# =========================================
# Skip the write entirely when the re-check left nothing to insert.
if spark_rows:
    spark_new_df.write.mode("append").saveAsTable(CUSTOMER_TABLE)

print("‚úÖ ‚úÖ CUSTOMER ONBOARDING FROM FLASK API COMPLETED ‚úÖ")
# The count comes from the driver-side list, so no extra Spark action is needed.
print(f"‚úÖ Inserted {len(spark_rows)} new customers into {CUSTOMER_TABLE}")
