This notebook orchestrates metadata-driven data ingestion workflows in Databricks. It enables users to trigger immediate or scheduled ingestion jobs for tables defined in metadata, supporting both ad-hoc and automated ingestion scenarios.

## Key Features

- **Connection and Table Metadata Management:**  
  Loads connection and table metadata from Delta tables (`connection_metadata`, `table_metadata`) to drive ingestion logic.

- **User Input via Widgets:**  
  Uses Databricks widgets to capture user input for connection selection, table selection (optional), schema, ingestion mode (immediate or scheduled), cron expression for scheduling, and run ID.

- **Run Metadata Logging:**  
  Each ingestion run is logged in the `run_metadata` table, capturing run ID, connection, user, timestamp, and ingestion mode.

- **Immediate Ingestion:**  
  If "immediate" mode is selected, the notebook triggers ingestion for the specified table or all tables under the selected connection by invoking a child notebook (`metadata_driven_ingestion`).

- **Scheduled Ingestion:**  
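  If "schedule" mode is selected, the notebook writes a new schedule entry to the `ingestion_schedule` table, including the cron expression and the computed next run time. The entry is skipped if an active entry already exists for the same connection, table, and schedule type.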

- **Extensibility:**  
  The notebook is modular, with a `MetadataManager` class for metadata access and clear separation of orchestration logic.

## Workflow

1. **Metadata Loading:**  
   Loads connection and table metadata using the `MetadataManager` class.

2. **Widget Setup:**  
   Prompts the user to select a connection, optionally a table, specify schema, ingestion mode, cron expression, and run ID.

3. **Parameter Extraction:**  
   Reads the widget values and generates a UUID run ID if none is provided.

4. **Connection and Table Selection:**  
   Filters metadata to identify the selected connection and relevant tables.

5. **Run Metadata Logging:**  
   Logs the orchestration run in the `run_metadata` table for audit and tracking.

6. **Ingestion Execution:**  
   - **Immediate Mode:** Runs ingestion for the selected table(s) by calling the child notebook.
   - **Schedule Mode:** Writes a new schedule entry to the `ingestion_schedule` table for future execution.
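
Steps 3 and 4 above can be sketched in plain Python, without Spark. This is a minimal illustration, not the notebook's actual code: the helper names `resolve_run_id` and `select_table_ids` and the sample rows are assumptions made for the example.

```python
import uuid
from typing import Dict, List


def resolve_run_id(run_id_param: str) -> str:
    """Return the supplied run ID, or a fresh UUID string when it is blank."""
    return run_id_param if run_id_param else str(uuid.uuid4())


def select_table_ids(
    table_id: str,
    table_rows: List[Dict[str, str]],
    connection_id: str,
) -> List[str]:
    """Pick a single table when table_id is set, else every table of the connection."""
    if table_id:
        return [table_id]
    return [r["table_id"] for r in table_rows if r["connection_id"] == connection_id]


# Hypothetical stand-in for rows of the table_metadata Delta table
rows = [
    {"table_id": "t_album", "connection_id": "conn_a"},
    {"table_id": "t_artist", "connection_id": "conn_a"},
    {"table_id": "t_orders", "connection_id": "conn_b"},
]

print(select_table_ids("", rows, "conn_a"))         # all tables for conn_a
print(select_table_ids("t_album", rows, "conn_a"))  # only the requested table
```

Leaving the table ID blank is what switches the run from a single table to every table registered under the connection.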

## Usage

- **Immediate Ingestion:**  
  Select a connection, optionally a table, set mode to "immediate", and run the notebook to trigger ingestion.

- **Scheduled Ingestion:**  
  Select a connection, optionally a table, set mode to "schedule", specify a cron expression, and run the notebook to schedule ingestion.
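
When the notebook is triggered from another notebook or job rather than through the widget UI, the same inputs can be passed as a parameter dictionary. The sketch below assembles and validates such a dictionary. The helper `build_widget_params` is hypothetical, and the notebook path in the final comment is a placeholder; only the widget names and defaults come from this notebook.

```python
from typing import Dict

VALID_MODES = ("immediate", "schedule")


def build_widget_params(
    connection_id: str,
    ingest_mode: str = "immediate",
    table_id: str = "",
    schema: str = "",
    cron_expression: str = "0 2 * * *",
    run_id: str = "",
) -> Dict[str, str]:
    """Assemble widget values; keys mirror the widget names defined in this notebook."""
    if not connection_id:
        raise ValueError("connection_id is required")
    if ingest_mode not in VALID_MODES:
        raise ValueError(f"ingest_mode must be one of {VALID_MODES}")
    return {
        "connection_id": connection_id,
        "table_id": table_id,
        "schema": schema,
        "ingest_mode": ingest_mode,
        "cron_expression": cron_expression,
        "run_id": run_id,
    }


params = build_widget_params(
    "conn_postgres_chinook",
    ingest_mode="schedule",
    cron_expression="0 3 * * *",
)
print(params)

# On Databricks, a caller could then pass the values to this notebook, e.g.:
# dbutils.notebook.run("<path to this notebook>", 600, params)
```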

## Prerequisites

- Delta tables: `connection_metadata`, `table_metadata`, `run_metadata`, `ingestion_schedule`
- Child notebook: `metadata_driven_ingestion`
- Required libraries: `croniter` (used to compute the next run time from the cron expression)

## Notes

- The notebook uses Spark for all data operations, reading from and appending to Delta tables.
- All user interactions are handled via Databricks widgets.
- The notebook is designed for extensibility and integration into larger data engineering workflows.

In [0]:
# Databricks notebook: Ingestion Orchestration
import datetime
import uuid

from pyspark.sql import Row, SparkSession

In [0]:
class MetadataManager:
    def __init__(self, spark: SparkSession):
        self.spark = spark

    def load_connection_metadata(self):
        return self.spark.table('connection_metadata')

    def load_table_metadata(self):
        return self.spark.table('table_metadata')

In [0]:
spark = SparkSession.builder.getOrCreate()
metadata = MetadataManager(spark)

In [0]:
dbutils.widgets.text("connection_id", "")
#dbutils.widgets.text("connection_idx", "1", "Select connection (number)")
dbutils.widgets.text("table_id", "", "Table ID (optional, for single table)")
dbutils.widgets.text("schema", "")
dbutils.widgets.dropdown("ingest_mode", "immediate", ["immediate", "schedule"], "Ingestion Mode")
dbutils.widgets.text("cron_expression", "0 2 * * *", "Cron Expression (if scheduled)")
dbutils.widgets.text("run_id", "", "Run ID (leave blank to auto-generate)")
schema = dbutils.widgets.get("schema")

In [0]:
connection_id = dbutils.widgets.get("connection_id")
#connection_idx = int(dbutils.widgets.get("connection_idx")) - 1
table_id = dbutils.widgets.get("table_id")
ingest_mode = dbutils.widgets.get("ingest_mode")
cron_expression = dbutils.widgets.get("cron_expression")
run_id_param = dbutils.widgets.get("run_id")
run_id = run_id_param if run_id_param else str(uuid.uuid4())

In [0]:
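# 1. Load metadata and look up the selected connection
conn_df = metadata.load_connection_metadata()
table_df = metadata.load_table_metadata()
matches = conn_df.filter(conn_df["connection_id"] == connection_id).collect()
if not matches:
    # Fail with a clear message instead of an IndexError on an unknown connection
    raise ValueError(f"No connection found for connection_id={connection_id!r}")
connections = matches[0]
print(connections)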




Row(connection_id='conn_postgres_chinook', type='postgresql', host='ep-sweet-snow-aeztchbb-pooler.c-2.us-east-2.aws.neon.tech', port=5432, database='chinook', schema='public', username='neondb_owner', password='npg_7Bd4JRTiqnox', options='')
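
The printed connection row above includes the `password` field in clear text. One way to avoid that is to mask sensitive fields before printing. The following is a minimal sketch, assuming the row is first converted to a dictionary (for example with PySpark's `Row.asDict()`); the `redact` helper, the `SENSITIVE_KEYS` set, and the sample record are hypothetical.

```python
from typing import Any, Dict

SENSITIVE_KEYS = {"password"}  # assumption: extend with other secret fields as needed


def redact(record: Dict[str, Any], mask: str = "***") -> Dict[str, Any]:
    """Return a copy of record with sensitive values replaced by mask."""
    return {k: (mask if k in SENSITIVE_KEYS else v) for k, v in record.items()}


# Hypothetical connection record; in the notebook this could come from connections.asDict()
conn = {"connection_id": "conn_example", "username": "svc_user", "password": "not-a-real-secret"}
safe = redact(conn)
print(safe)
```

The original record is left unchanged, so the unmasked value stays available to the code that opens the connection.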


In [0]:
run_metadata_row = Row(
    run_id=str(run_id),
    connection_id=connection_id,
    dateandtime=datetime.datetime.now(),
    user=dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get(),
    ingest_mode=ingest_mode
)

print(run_metadata_row)
run_metadata_df = spark.createDataFrame([run_metadata_row])
run_metadata_df.write.format("delta").mode("append").saveAsTable("run_metadata")

Row(run_id='6af31011-b48c-4815-a4eb-15799f3b12c1', connection_id='conn_postgres_chinook', dateandtime=datetime.datetime(2025, 7, 31, 19, 20, 2, 745996), user='mustafasajidsd@gmail.com', ingest_mode='schedule')


In [0]:
selected_conn = connections
print(selected_conn)

Row(connection_id='conn_postgres_chinook', type='postgresql', host='ep-sweet-snow-aeztchbb-pooler.c-2.us-east-2.aws.neon.tech', port=5432, database='chinook', schema='public', username='neondb_owner', password='npg_7Bd4JRTiqnox', options='')


In [0]:
# 2. Immediate ingestion
if ingest_mode == "immediate":
    # If table_id is provided, ingest just that table; otherwise ingest all tables for the connection
    if table_id:
        table_ids = [table_id]
    else:
        conn_tables = table_df.filter(table_df.connection_id == selected_conn.connection_id)
        table_ids = [row.table_id for row in conn_tables.collect()]
    table_ids_str = ",".join(table_ids)
    # Run the child notebook with a 600-second timeout
    result = dbutils.notebook.run("metadata_driven_ingestion", 600, {
        "table_ids": table_ids_str,
        "run_id": run_id,
        "schema": schema
    })
    print(result)
    print(table_ids_str)
    print(f"Run ID: {run_id}")
    print("Ingestion complete.")
# Schedule mode is handled in the scheduling cell further down, which checks for
# duplicates, computes next_run with croniter, and writes to ingestion_schedule.

In [0]:
%pip install croniter

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
if ingest_mode == "schedule":
    from croniter import croniter

    schedule_id = str(uuid.uuid4())
    schedule_type = "table" if table_id else "connection"
    now = datetime.datetime.now()

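    from pyspark.sql import functions as F

    # Check for an active schedule with the same connection, table, and type.
    # Column expressions avoid interpolating widget values into a SQL string.
    existing = (
        spark.table("ingestion_schedule")
        .filter(
            (F.col("connection_id") == selected_conn.connection_id)
            & (F.col("table_id") == (table_id if table_id else ""))
            & (F.col("schedule_type") == schedule_type)
            & (F.col("active_flag") == "Y")
        )
        .limit(1)
        .collect()
    )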

    if existing:
        print("There is already an entry for this combination of connection_id, table_id, schedule_type, and active_flag.")
    else:
        # Calculate next_run using cron_expression
        next_run = croniter(cron_expression, now).get_next(datetime.datetime)

        schedule_row = Row(
            schedule_id=schedule_id,
            connection_id=selected_conn.connection_id,
            table_id=table_id if table_id else '',
            schedule_type=schedule_type,
            cron_expression=cron_expression,
            active_flag="Y",
            last_run=now,
            next_run=next_run,
            comments=f"Scheduled via orchestration notebook at {now}"
        )

        schedule_df = spark.createDataFrame([schedule_row])
        schedule_df.write.format("delta").mode("append").saveAsTable("ingestion_schedule")
        display(schedule_df)

schedule_id,connection_id,table_id,schedule_type,cron_expression,active_flag,last_run,next_run,comments
74787660-c5fc-43a5-9a3a-0acf9c47d6e9,conn_postgres_chinook,,connection,0 3 * * *,Y,2025-07-31T18:53:16.665Z,2025-08-01T03:00:00.000Z,Scheduled via orchestration notebook at 2025-07-31 18:53:16.665448
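
The duplicate rule applied in the scheduling cell can be illustrated without Spark. This is a sketch under stated assumptions: the helper names and the in-memory rows are hypothetical, and only the matching rule (same connection, table, schedule type, and `active_flag = 'Y'`) is taken from the cell above.

```python
from typing import Dict, List


def schedule_type_for(table_id: str) -> str:
    """'table' when a table ID is given, otherwise 'connection'."""
    return "table" if table_id else "connection"


def has_active_duplicate(
    existing: List[Dict[str, str]],
    connection_id: str,
    table_id: str,
) -> bool:
    """True if an active entry already exists for the same connection, table, and type."""
    wanted = (connection_id, table_id or "", schedule_type_for(table_id), "Y")
    return any(
        (e["connection_id"], e["table_id"], e["schedule_type"], e["active_flag"]) == wanted
        for e in existing
    )


# Hypothetical rows standing in for the ingestion_schedule table
existing = [
    {"connection_id": "conn_a", "table_id": "", "schedule_type": "connection", "active_flag": "Y"},
    {"connection_id": "conn_a", "table_id": "t_album", "schedule_type": "table", "active_flag": "N"},
]

print(has_active_duplicate(existing, "conn_a", ""))         # True: active connection-level entry
print(has_active_duplicate(existing, "conn_a", "t_album"))  # False: the only match is inactive
```

Because connection-level schedules store an empty string as `table_id`, the comparison normalizes a blank table ID to `""` before matching.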
