- Create a Lakebase database instance
- Create synced tables (parts catalog and orders backlog)
- Create a Lakebase native (Postgres) table

In [0]:
%pip install --upgrade databricks-sdk

In [0]:
# Restart the Python process; this follows the `%pip install --upgrade databricks-sdk`
# cell, presumably so the upgraded SDK is the one imported below.
dbutils.library.restartPython()

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import DatabaseInstance, SyncedTableSpec, SyncedTableSchedulingPolicy, SyncedDatabaseTable

In [0]:
# --- Lakebase database instance ---
# Name of the database instance.
lakebase_instance_name = "mfg-production-scheduling-demo"
# Size of the database instance.
cu = "CU_1"
# Number of database nodes; at least 2 are needed for failover.
node_count = 1

# --- Namespaces ---
# UC catalog name.
catalog_name = "zg"
# Lakebase database name (similar to the UC catalog level).
pg_db_name = "pg_mfg_db"
# Name shared by the UC schema and the Postgres schema.
schema_name = "production_scheduling_demo"

# --- Orders backlog: source table, synced destination, primary key ---
orders_source_table_full_name = ".".join((catalog_name, schema_name, "orders_backlog"))
orders_destination_table_name = orders_source_table_full_name + "_synced_table"
orders_primary_key_columns = ["order_id"]

# --- Parts catalog: source table, synced destination, primary key ---
parts_catalog_source_table_full_name = ".".join((catalog_name, schema_name, "parts_catalog"))
parts_catalog_destination_table_name = parts_catalog_source_table_full_name + "_synced_table"
parts_catalog_primary_key_columns = ["part_id"]

# --- Native Postgres table ---
pg_override_table_name = "assignment_overrides"

In [0]:
# Create the Databricks SDK workspace client. `w` is the handle the following
# cells use for every `w.database.*` and `w.pipelines.*` call. No explicit
# configuration is passed here.
w = WorkspaceClient()

In [0]:
# Describe the Lakebase database instance to provision: its name, capacity
# (size) and node count all come from the configuration cell.
database_instance = DatabaseInstance(
    node_count=node_count,
    capacity=cu,
    name=lakebase_instance_name,
)

# Submit the creation request for that instance.
w.database.create_database_instance(database_instance=database_instance)

In [0]:
# Wait for the database instance named `lakebase_instance_name` to become
# available before creating the synced tables in the cells below.
w.database.wait_get_database_instance_database_available(name=lakebase_instance_name)

In [0]:
# Turn on the Delta Change Data Feed table property for the parts catalog source table.
spark.sql(
    f"alter table {parts_catalog_source_table_full_name} "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"
)

In [0]:
# Turn on the Delta Change Data Feed table property for the orders backlog source table.
spark.sql(
    f"alter table {orders_source_table_full_name} "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"
)

In [0]:
# Synced table for the parts catalog.
# The spec names the source table, its primary key and a TRIGGERED scheduling policy.
table_spec = SyncedTableSpec(
    scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED,
    primary_key_columns=parts_catalog_primary_key_columns,
    source_table_full_name=parts_catalog_source_table_full_name,
)

# The synced table is registered under the destination name and targets the
# Lakebase instance and logical Postgres database from the configuration cell.
synced_table = SyncedDatabaseTable(
    spec=table_spec,
    logical_database_name=pg_db_name,
    database_instance_name=lakebase_instance_name,
    name=parts_catalog_destination_table_name,
)

w.database.create_synced_database_table(synced_table=synced_table)

In [0]:
# Look up the parts catalog synced table and read the id of its
# synchronization pipeline; `pipeline_id` is reused by the cells below.
# NOTE(review): `data_synchronization_status` / `pipeline_id` may not be populated
# right after the synced table is created — confirm before relying on the value.
parts_synced_table_name_info = w.database.get_synced_database_table(name=parts_catalog_destination_table_name)
pipeline_id = parts_synced_table_name_info.data_synchronization_status.pipeline_id

In [0]:
# Synced table for the orders backlog. It passes the parts catalog table's
# pipeline id as `existing_pipeline_id` so both tables share the same DLT
# pipeline, to optimize resources.
table_spec = SyncedTableSpec(
    scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED,
    primary_key_columns=orders_primary_key_columns,
    source_table_full_name=orders_source_table_full_name,
    existing_pipeline_id=pipeline_id,
)

# Same Lakebase instance and logical Postgres database as the parts catalog table.
synced_table = SyncedDatabaseTable(
    spec=table_spec,
    logical_database_name=pg_db_name,
    database_instance_name=lakebase_instance_name,
    name=orders_destination_table_name,
)

w.database.create_synced_database_table(synced_table=synced_table)

In [0]:
# Start an update of the shared synchronization pipeline (no full refresh)
# to populate the orders backlog table.
run = w.pipelines.start_update(full_refresh=False, pipeline_id=pipeline_id)

In [0]:
# Create a native Lakebase (Postgres) table: (re)creates
# `{schema_name}.{pg_override_table_name}` and reports whether it is visible.
import uuid

import psycopg2

# Connect as the user running this notebook.
user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()

# Look up the instance (for its read/write DNS name) and generate a database
# credential whose token is used as the Postgres password.
instance = w.database.get_database_instance(name=lakebase_instance_name)
cred = w.database.generate_database_credential(
    request_id=str(uuid.uuid4()),
    instance_names=[lakebase_instance_name],
)

# Connection parameters
conn = psycopg2.connect(
    host=instance.read_write_dns,
    dbname=pg_db_name,
    user=user,
    password=cred.token,
    sslmode="require",
)

# try/finally guarantees the connection is closed even when a statement fails;
# closing without a commit discards the uncommitted work.
try:
    with conn.cursor() as cur:
        # Identifiers cannot be bound as query parameters, so the schema and
        # table names (notebook configuration values) are interpolated here.
        cur.execute(f"""
        DROP TABLE IF EXISTS {schema_name}.{pg_override_table_name}
        """)

        cur.execute(f"""
        CREATE TABLE {schema_name}.{pg_override_table_name} (
          order_id VARCHAR(255),
          machine_id VARCHAR(255),
          assigned_by VARCHAR(255),
          assigned_at TIMESTAMP,
          notes TEXT
        );
        """)

        # Verify the table is visible; the values are passed as bound
        # parameters rather than formatted into the SQL text.
        cur.execute(
            """
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = %s AND table_name = %s
            );
            """,
            (schema_name, pg_override_table_name),
        )
        created = cur.fetchone()[0]
        if created:
            print(f"Table {schema_name}.{pg_override_table_name} has been created or already exists.")
        else:
            print("Table creation failed or is not visible.")

    # Persist the DROP/CREATE only after every statement has succeeded.
    conn.commit()
finally:
    conn.close()