### Instructions

Based on https://docs.databricks.com/_extras/notebooks/source/sql-server-cdc-connector-setup.html 
* Modify settings in "Configuration" cell as necessary.
* Execute the following cells one after another. Some are optional and can be skipped.

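# Prerequisites
- CDC and/or CT enabled on the source database (catalog)
- The UC connection referenced by `connection_name` has been created
- The Databricks secret scope `lfcddemo` contains the source database user credentials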

### Ensure a recent version of the Python SDK and the required libraries

In [None]:
%pip install --quiet databricks-sdk~=0.28.0 pymssql sqlalchemy jinja2 apscheduler
# Restart the Python process so the packages installed above are importable in this session.
try:
    dbutils.library.restartPython()
except:
    # Best effort. NOTE(review): presumably this guards against `dbutils` not existing
    # outside a Databricks notebook; confirm whether a narrower exception would suffice.
    pass

### Configuration

In [None]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import catalog, jobs, pipelines
from apscheduler.schedulers.background import BackgroundScheduler
import logging

# Workspace API client (constructed without explicit configuration).
w = WorkspaceClient()

# Background scheduler that runs the synthetic workload and the delayed cleanup jobs below.
scheduler = BackgroundScheduler()
scheduler.start()

# Show apscheduler's own DEBUG logging so job execution is visible in the cell output.
logging.basicConfig()
apscheduler_logger = logging.getLogger('apscheduler')
apscheduler_logger.setLevel(logging.DEBUG)

## Retrieve secrets

In [None]:
import re, time, base64
from datetime import datetime, timedelta

# Current user's name (part before '@', with '-', '.' and '@' removed). It is used as the
# source schema name and as a prefix for the objects this notebook creates.
WHOAMI=re.sub('[-.@]','', w.current_user.me().user_name.split('@')[0])
# Hex-encoded timestamp in 0.1 s units: a short, practically unique suffix for object names.
NINE_CHAR_ID=hex(time.time_ns() // 100_000_000)[2:]
SECRETS_SCOPE="lfcddemo"

# Secret-key suffix -> UC connection-name suffix, in order of preference.
PREFERRED_HOST_SUFFIX={ "-sq":"sq", "-gt":"gt", }
# Materialize the listing once. The search below walks it once per preferred suffix,
# which a one-shot iterator would not survive (later suffixes would see nothing).
scrts=list(w.secrets.list_secrets(scope=SECRETS_SCOPE))

# Pick the first secret key that ends with a preferred suffix (earlier suffixes win).
# `host_suffix` stays bound to the matching suffix; the configuration cell uses it.
SECRETS_KEY=""
for host_suffix in PREFERRED_HOST_SUFFIX:
    SECRETS_KEY=next((s.key for s in scrts if s.key.endswith(host_suffix)), "")
    if SECRETS_KEY:
        break
if not SECRETS_KEY:
    raise ValueError(f"no secret in scope {SECRETS_SCOPE!r} ends with one of {list(PREFERRED_HOST_SUFFIX)}")
print(f"using databricks secrets {SECRETS_SCOPE=} {SECRETS_KEY=}")


def _parse_export_assignments(text):
    """Parse shell-style `export KEY='value'` assignments into a dict.

    Assignments may be separated by ';' or newlines. An optional leading `export`
    keyword is dropped and each segment is split at its FIRST '='. One pair of matching
    surrounding quotes (' or ") is then stripped from the value, so values may contain
    '=' or spaces. Segments without '=' are ignored.

    >>> _parse_export_assignments("export A='x=1';export B=2")
    {'A': 'x=1', 'B': '2'}
    """
    parsed = {}
    for segment in re.split(r"[;\n]", text):
        segment = re.sub(r"^export\s+", "", segment.strip())
        key, sep, value = segment.partition("=")
        key = key.strip()
        if not sep or not key:
            continue
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "'\"":
            value = value[1:-1]
        parsed[key] = value
    return parsed


# The secret value is base64; once decoded it is expected to hold `export KEY='value'`
# assignments (DB_HOST_FQDN, DB_PORT, USER_USERNAME, ... are read later).
scrt_key_values_str = base64.b64decode(w.secrets.get_secret(scope=SECRETS_SCOPE, key=SECRETS_KEY).value).decode('utf-8')
scrt_key_value_dict = _parse_export_assignments(scrt_key_values_str)

print(f"retrieved databricks secrets {SECRETS_SCOPE=} {SECRETS_KEY=}")

In [None]:

# ======================
# Setup
# ======================

# Helper: replicate a chosen list of tables that all live in one source catalog/schema.
def replicate_tables_from_db_schema(db_catalog_name, db_schema_name, db_table_names):
    """Return one table-level `pipelines.IngestionConfig` per name in `db_table_names`.

    Every table is read from `db_catalog_name`.`db_schema_name`. The destination is
    taken from the globals `target_catalog_name` / `target_schema_name` at call time.
    """
    configs = []
    for source_table in db_table_names:
        spec = pipelines.TableSpec(
            source_catalog=db_catalog_name,
            source_schema=db_schema_name,
            source_table=source_table,
            destination_catalog=target_catalog_name,
            destination_schema=target_schema_name,
        )
        configs.append(pipelines.IngestionConfig(table=spec))
    return configs

# Helper: replicate every table of one or more source DB schemas.
def replicate_full_db_schema(db_catalog_name, db_schema_names):
    """Return one schema-level `pipelines.IngestionConfig` per name in `db_schema_names`.

    Each schema is read from `db_catalog_name`. The destination is taken from the
    globals `target_catalog_name` / `target_schema_name` at call time.
    """
    configs = []
    for source_schema in db_schema_names:
        spec = pipelines.SchemaSpec(
            source_catalog=db_catalog_name,
            source_schema=source_schema,
            destination_catalog=target_catalog_name,
            destination_schema=target_schema_name,
        )
        configs.append(pipelines.IngestionConfig(schema=spec))
    return configs


# Change-capture mechanism to configure on the selected source tables: CDC | CT | BOTH | NONE.
# Tables without a primary key get CDC (modes CDC/BOTH) and tables with a primary key get
# CT (modes CT/BOTH). Otherwise the mechanism is disabled on those tables (see the CDC/CT cell).
mode="BOTH"
gateway_cluster_spec = None
# Uncomment the following to specify a cluster policy and/or Spark configuration for the Gateway pipeline
# gateway_cluster_spec = pipelines.PipelineCluster(
#   # Uncomment to specify a cluster policy
#   # label="default",
#   # policy_id="0011223344556677",
#   # apply_policy_default_values=True,
#
#   # Uncomment to customize the cluster Spark configuration
#   spark_conf={ }
#)

# The name of the UC connection with the credentials to access the source database.
# The suffix is derived from the secret key selected in the "Retrieve secrets" cell.
connection_name = f"lfcddemo-{PREFERRED_HOST_SUFFIX[host_suffix]}"

# The name of the UC catalog and schema to store the replicated tables
target_catalog_name = "main"
target_schema_name = f"{WHOAMI}_{NINE_CHAR_ID}"

# The name of the UC catalog and schema to store the staging volume with intermediate
# CDC and snapshot data.
# Use the destination catalog/schema by default
stg_catalog_name = target_catalog_name
stg_schema_name = target_schema_name

# The name of the Gateway pipeline to create
gateway_pipeline_name = f"{WHOAMI}_{NINE_CHAR_ID}_gw"

# The name of the Ingestion pipeline to create
ingestion_pipeline_name = f"{WHOAMI}_{NINE_CHAR_ID}_ig"

# Source database objects. An empty catalog name is filled in later from the secret
# (DB_CATALOG) or by prompting.
source_catalog_name=""
source_schema_name=f"{WHOAMI}"

# Customize who gets notified about failures
notifications = [
  pipelines.Notifications(
      email_recipients = [ w.current_user.me().user_name ],
      alerts = [ "on-update-failure", "on-update-fatal-failure", "on-flow-failure"]
      )
  ]

# Look up the UC connection; its host/port are used to reach the source database.
connections = w.connections.get(connection_name)

print(f"using databricks connections {connection_name=}")

# Fail fast when the secret and the UC connection point at different servers.
# (ValueError replaces the previous `raise exception(...)`, an undefined name that
# surfaced as a NameError instead of the intended message.)
if scrt_key_value_dict['DB_HOST_FQDN'] != connections.options['host']:
    raise ValueError(f"{connections.options['host']=} and {scrt_key_value_dict['DB_HOST_FQDN']=} do not match ")
if scrt_key_value_dict['DB_PORT'] != connections.options['port']:
    raise ValueError(f"{connections.options['port']=} and {scrt_key_value_dict['DB_PORT']=} do not match ")

# Create a connection pool to the source database

In [None]:
# Connect to the source with a login that can run the CDC / CT enable/disable commands.
run_cdc_ct=True

import sqlalchemy as sa, pandas as pd, getpass, jinja2
source_host_name=connections.options["host"]
source_port=connections.options["port"]

# Credentials and database name: keep values already set in this session, else use the
# parsed secret, else prompt interactively.
if "source_user_name" not in vars() or not source_user_name:
    try:
        source_user_name=scrt_key_value_dict['USER_USERNAME']
    except KeyError:
        source_user_name=input("dba source_user_name")
if "source_password" not in vars() or not source_password:
    try:
        source_password=scrt_key_value_dict['USER_PASSWORD']
    except KeyError:
        source_password=getpass.getpass("dba source_password")
if "source_catalog_name" not in vars() or not source_catalog_name:
    try:
        source_catalog_name=scrt_key_value_dict['DB_CATALOG']
    except KeyError:
        # The database name is not a secret, so echo it while typing.
        source_catalog_name=input("source_catalog_name")

# Build the URL from its parts so special characters in the user name or password
# ('@', ':', '/', ...) are escaped instead of corrupting a hand-formatted URL string.
sqlalchemy_url = sa.engine.URL.create(
    "mssql+pymssql",
    username=source_user_name,
    password=source_password,
    host=source_host_name,
    port=int(source_port),
    database=source_catalog_name,
)
engine = sa.create_engine(sqlalchemy_url, pool_size=20, max_overflow=0, pool_pre_ping=True, isolation_level="AUTOCOMMIT",
    connect_args={"login_timeout": 120})

# Azure SQL requires time to wake up after the first login attempt, so retry a bounded
# number of times. (The previous `while not conn or retry < 10` loop never gave up.)
max_connect_retries=10
connect_retry=0
conn=None
while conn is None and connect_retry < max_connect_retries:
    try:
        conn = engine.connect()
    except Exception as e:
        connect_retry += 1
        if connect_retry >= max_connect_retries:
            print(f"Giving up after {connect_retry} attempts.  Database error: {e}")
        else:
            print(f"Sleeping for retry.  Database error: {e}")
            time.sleep(60)
if conn is None:
    raise Exception(f"Could not connect to {source_user_name}:@{source_host_name}:{source_port}/{source_catalog_name}")

In [None]:
# Build the list of objects to replicate: by default, every table of the user's source schema.
# IMPORTANT: The letter case of the catalog, schema and table names MUST MATCH EXACTLY the case used in the source database system tables
schemas_to_replicate = [source_schema_name]
tables_to_replicate = replicate_full_db_schema(source_catalog_name, schemas_to_replicate)

# To add individual tables from other schemas, concatenate e.g.
#  + replicate_tables_from_db_schema(source_catalog_name, source_schema_name, ["table_name_1", "table_name_2"])

# Test the connection, create two tables, and populate them with data

In [None]:
# Smoke test: a trivial round trip proves the pooled connection works.
smoke_query = sa.text("select 1")
conn.execute(smoke_query).fetchall()

In [None]:
# Source schema that holds the demo tables (named after the current user).
DB_SCHEMA=f"{WHOAMI}"
# Create the schema only when missing; the batch returns 'created' or 'already exists'.
ensure_schema_sql = sa.text(f"""
IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name = '{DB_SCHEMA}')
BEGIN
    EXEC('CREATE SCHEMA [{DB_SCHEMA}]')
    select 'created'
END
ELSE
BEGIN
    select 'already exists'
END
""")
display(pd.read_sql(ensure_schema_sql, conn))

In [None]:
# Create the intpk demo table (IDENTITY primary key) unless it already exists.
intpk_ddl = sa.text(f"""
IF OBJECT_ID(N'[{DB_SCHEMA}].[intpk]', N'U') is NULL
BEGIN
    create table [{DB_SCHEMA}].[intpk] (pk int IDENTITY NOT NULL primary key, dt datetime)
    select 'created'
END
ELSE
BEGIN
    select 'exists'
END
""")
display(pd.read_sql(intpk_ddl, conn))

In [None]:
# Create the dtix demo table (no primary key) unless it already exists.
dtix_ddl = sa.text(f"""
IF OBJECT_ID(N'[{DB_SCHEMA}].[dtix]', N'U') is NULL
BEGIN
    create table [{DB_SCHEMA}].[dtix] (dt datetime)
    select 'created'
END
ELSE
BEGIN
    select 'exists'
END
""")
display(pd.read_sql(dtix_ddl, conn))

# Enable CDC / CT on the selected schema and list of tables

In [None]:
# Report (one row) whether CDC and CT are enabled at the database (catalog) level.
catalog_cdc_ct_status_sql = sa.text(f"""
BEGIN
DECLARE @ct_enabled_on_cat INT;
DECLARE @cdc_enabled_on_cat INT;
-- set if cdc or ct is enabled on the catalog

if exists(select is_cdc_enabled from sys.databases where name=db_name() and is_cdc_enabled=1)
    set @cdc_enabled_on_cat = 1;
else
    set @cdc_enabled_on_cat = 0;

if exists(select database_id from sys.change_tracking_databases where database_id=db_id())
    set @ct_enabled_on_cat = 1;
else
    set @ct_enabled_on_cat = 0;
select @cdc_enabled_on_cat as cdc_enabled_on_cat, @ct_enabled_on_cat as ct_enabled_on_cat
END
""")
display(pd.read_sql(catalog_cdc_ct_status_sql, conn))

In [None]:
def _sql_literal(value):
    """Quote `value` as a T-SQL string literal (embedded single quotes are doubled)."""
    return "'" + value.replace("'", "''") + "'"

# Map each source schema to the tables to include. An empty set means the whole schema;
# a non-empty set restricts the schema to those tables. A whole-schema spec takes
# precedence over table specs for the same schema, regardless of their order.
whole_schemas=set()
s_t_filter={}
for s_or_t in tables_to_replicate:
    if s_or_t.schema:
        whole_schemas.add(s_or_t.schema.source_schema)
        s_t_filter.setdefault(s_or_t.schema.source_schema, set())
    elif s_or_t.table:
        s_t_filter.setdefault(s_or_t.table.source_schema, set()).add(s_or_t.table.source_table)
for s_name in whole_schemas:
    s_t_filter[s_name] = set()

# Build the extra WHERE clause appended to the INFORMATION_SCHEMA.TABLES query in the
# cdc_cd_tsql template ("" when nothing is selected). Table names are sorted so the
# generated SQL is deterministic.
s_t_filter_sql_list=[]
for s_name, t_names in s_t_filter.items():
    schema_filter = f"table_schema={_sql_literal(s_name)}"
    if t_names:
        table_filter = ",".join(_sql_literal(t) for t in sorted(t_names))
        s_t_filter_sql_list.append(f"({schema_filter} and table_name in ({table_filter}))")
    else:
        s_t_filter_sql_list.append(f"({schema_filter})")
s_t_filter_sql = "and (" + " or ".join(s_t_filter_sql_list) + ")" if s_t_filter_sql_list else ""

# T-SQL fragment (not a standalone batch). The cdc_cd_tsql template below appends it after
# the cursor query when rendered with run_cdc_ct true. It relies on names declared by that
# template: cursor MyCursor, @mode, @TABLE_CAT/@TABLE_SCHEM/@TABLE_NAME/@PK/@CDC/@CT,
# @cdc_enabled_on_cat and @ct_enabled_on_cat.
# For each table row fetched from MyCursor:
#   - no primary key (@PK NULL): enable CDC when @mode is CDC/BOTH and CDC is enabled on
#     the database, otherwise disable CDC if it is currently on;
#   - primary key: enable CT (TRACK_COLUMNS_UPDATED = ON) when @mode is CT/BOTH and CT is
#     enabled on the database, otherwise disable CT if it is currently on.
# It ends with a single-row SELECT of the enabled / disabled / already-in-state counters.
cdc_ct_enable_tsql="""
DECLARE @cdc_enabled_count INT = 0;
DECLARE @cdc_disabled_count INT = 0;
DECLARE @ct_enabled_count INT = 0;
DECLARE @ct_disabled_count INT = 0;
DECLARE @cdc_enabled_already_count INT = 0;
DECLARE @cdc_disabled_already_count INT = 0;
DECLARE @ct_enabled_already_count INT = 0;
DECLARE @ct_disabled_already_count INT = 0;

OPEN MyCursor
FETCH NEXT FROM MyCursor INTO @TABLE_CAT, @TABLE_SCHEM, @TABLE_NAME, @PK, @CDC, @CT
WHILE @@FETCH_STATUS = 0
BEGIN
    if (@PK is NULL) 
      if (@mode='CDC' or @mode='BOTH') and (@cdc_enabled_on_cat = 1) 
        -- need to enable CDC if not enabled
        if @CDC is NULL 
        BEGIN
            exec sys.sp_cdc_enable_table @source_schema = @TABLE_SCHEM, @source_name = @TABLE_NAME,  @role_name = NULL, @supports_net_changes = 0;
            set @cdc_enabled_count = @cdc_enabled_count + 1;
        END
        else
        BEGIN
            set @cdc_enabled_already_count = @cdc_enabled_already_count + 1;
        END
      else 
        -- need to disable CDC if enabled
        if @CDC is not NULL
        BEGIN
            exec sys.sp_cdc_disable_table @source_schema = @TABLE_SCHEM, @source_name = @TABLE_NAME,  @capture_instance = 'all';
            set @cdc_disabled_count = @cdc_disabled_count + 1;
        END
        else
        BEGIN
            set @cdc_disabled_already_count = @cdc_disabled_already_count + 1;
        END

    if (@PK is NOT NULL)
      if (@mode='CT' or @mode='BOTH') and (@ct_enabled_on_cat = 1) 
        -- need to enable CT if not enabled
        if @CT is NULL 
        BEGIN
            exec('ALTER TABLE ['+@TABLE_SCHEM+'].['+@TABLE_NAME+'] ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)');
            set @ct_enabled_count = @ct_enabled_count + 1;
        END
        else
        BEGIN
            set @ct_enabled_already_count = @ct_enabled_already_count + 1;
        END
      else 
        -- need to disable CT if enabled
        if @CT is not NULL
        BEGIN
            exec('ALTER TABLE ['+@TABLE_SCHEM+'].['+@TABLE_NAME+'] DISABLE CHANGE_TRACKING')	
            set @ct_disabled_count = @ct_disabled_count + 1;
        END
        else
        BEGIN
            set @ct_disabled_already_count = @ct_disabled_already_count + 1;
        END
    -- fetch next    
    FETCH NEXT FROM MyCursor INTO @TABLE_CAT, @TABLE_SCHEM, @TABLE_NAME, @PK, @CDC, @CT;
END
CLOSE MyCursor;
DEALLOCATE MyCursor;

SELECT  
    @cdc_enabled_count cdc_enabled_count,
    @cdc_disabled_count cdc_disabled_count,
    @ct_enabled_count ct_enabled_count,
    @ct_disabled_count ct_disabled_count,
    @cdc_enabled_already_count cdc_enabled_already_count,
    @cdc_disabled_already_count cdc_disabled_already_count,
    @ct_enabled_already_count ct_enabled_already_count,
    @ct_disabled_already_count ct_disabled_already_count;
"""

# Jinja2 template for one T-SQL batch that lists, for every selected base table, its
# primary-key / CDC / CT status (the PK, CDC and CT columns hold the table name or NULL).
# Template variables: mode, source_schema_name, s_t_filter_sql (extra WHERE clause),
# run_cdc_ct and cdc_ct_enable_tsql.
#   - run_cdc_ct false: the batch just returns the status rows.
#   - run_cdc_ct true: the status query becomes cursor MyCursor and cdc_ct_enable_tsql is
#     appended to enable/disable CDC/CT per row.
# The pk/ct/cdc CTEs only look at @schema_name, so tables selected from other schemas
# report NULL for PK/CDC/CT.
# The pk CTE uses SELECT DISTINCT: it joins once per key column, and a composite primary
# key would otherwise list the same table several times, making the cursor re-run
# ENABLE/DISABLE on it.
cdc_cd_tsql=jinja2.Template("""
-- CHANGE schema_name to your schema name
BEGIN
DECLARE @mode NVARCHAR(10) = N'{{mode}}'; -- CDC | CT | BOTH
DECLARE @schema_name nvarchar(128) = N'{{source_schema_name}}';
DECLARE @TABLE_CAT nvarchar(128), @TABLE_SCHEM nvarchar(128), @TABLE_NAME nvarchar(128), @PK nvarchar(128), @CT nvarchar(128), @CDC nvarchar(128);

-- set if cdc or ct is enabled on the catalog
DECLARE @ct_enabled_on_cat INT;
DECLARE @cdc_enabled_on_cat INT;
if exists(select is_cdc_enabled from sys.databases where name=db_name() and is_cdc_enabled=1)
    set @cdc_enabled_on_cat = 1;
else
    set @cdc_enabled_on_cat = 0;

if exists(select database_id from sys.change_tracking_databases where database_id=db_id())
    set @ct_enabled_on_cat = 1;
else
    set @ct_enabled_on_cat = 0;

{% if run_cdc_ct %}
DECLARE MyCursor CURSOR FOR
{% endif %}
with 
tab as (
	select table_catalog TABLE_CAT, table_schema TABLE_SCHEM, table_name TABLE_NAME 
	from INFORMATION_SCHEMA.TABLES 
	where table_type='BASE TABLE'
	and table_name not in ('MSchange_tracking_history', 'systranschemas')
	{{s_t_filter_sql}}
	)
, pk as (
	-- PRIMARY KEY TABLES
    SELECT DISTINCT
        tc.constraint_catalog as TABLE_CAT, tc.constraint_schema as TABLE_SCHEM, tc.table_name as TABLE_NAME 
    FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc 
    JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu 
        on tc.constraint_schema = ccu.constraint_schema and tc.constraint_name = ccu.constraint_name 
    JOIN INFORMATION_SCHEMA.COLUMNS AS c 
        ON c.table_schema = tc.constraint_schema AND tc.table_name = c.table_name AND ccu.column_name = c.column_name
    where tc.constraint_type='PRIMARY KEY'
    and tc.constraint_schema = @schema_name
    )
, ct as (    
    -- CT enabled tables
    select db_name() TABLE_CAT, schema_name(t.schema_id) TABLE_SCHEM, t.name TABLE_NAME  
    from sys.change_tracking_tables ctt 
    left join sys.tables t on ctt.object_id = t.object_id
    where t.schema_id=schema_id(@schema_name)
)
, cdc as (
    -- CDC enabled table
    select db_name() TABLE_CAT, s.name TABLE_SCHEM, t.name as TABLE_NAME 
    from sys.tables t
    left join sys.schemas s on t.schema_id = s.schema_id
    where t.is_tracked_by_cdc=1 and 
    t.schema_id=schema_id(@schema_name)
)
select tab.TABLE_CAT, tab.TABLE_SCHEM, tab.TABLE_NAME, pk.TABLE_NAME PK, cdc.TABLE_NAME CDC, ct.TABLE_NAME CT 
from tab
left join pk  on pk.TABLE_CAT=tab.TABLE_CAT  and pk.TABLE_SCHEM=tab.TABLE_SCHEM  and pk.TABLE_NAME=tab.TABLE_NAME
left join ct  on ct.TABLE_CAT=tab.TABLE_CAT  and ct.TABLE_SCHEM=tab.TABLE_SCHEM  and ct.TABLE_NAME=tab.TABLE_NAME
left join cdc on cdc.TABLE_CAT=tab.TABLE_CAT and cdc.TABLE_SCHEM=tab.TABLE_SCHEM and cdc.TABLE_NAME=tab.TABLE_NAME
{% if run_cdc_ct %}
{{cdc_ct_enable_tsql}}
{% endif %}
END
""")

# Render the template twice from the same arguments: a read-only status query and the
# enable/disable batch.
template_args = dict(mode=mode, source_schema_name=source_schema_name, cdc_ct_enable_tsql=cdc_ct_enable_tsql, s_t_filter_sql=s_t_filter_sql)
sql_cmd_status = sa.text(cdc_cd_tsql.render(run_cdc_ct=False, **template_args))
sql_cmd_alter  = sa.text(cdc_cd_tsql.render(run_cdc_ct=True, **template_args))

# Show the current status; when run_cdc_ct is set, apply the changes and show the status again.
display(pd.read_sql(sql_cmd_status, conn))
if run_cdc_ct:
    for query in (sql_cmd_alter, sql_cmd_status):
        display(pd.read_sql(query, conn))


# Schedule an insert / update / delete workload on the source tables

In [None]:
# Background source workload, scheduled to start one minute from now.
# Each of the 1800 iterations (separated by a 1 s WAITFOR, so at least 30 minutes in total)
# does the following, touching a table only if it exists:
#   - inserts 3 rows into intpk, deletes the lowest-pk row, then updates the new lowest-pk row;
#   - inserts 3 rows into dtix.
# To check progress: conn.execute(sa.text(f"select max(pk) from [{DB_SCHEMA}].[intpk]")).fetchall()


def _execute_in_new_connection(statement):
    """Execute `statement` on its own pooled connection and return it to the pool afterwards.

    Jobs run by the BackgroundScheduler therefore do not share the notebook's `conn`,
    and no connection is left checked out once the job finishes.
    """
    with engine.connect() as job_conn:
        job_conn.execute(statement)


scheduler.add_job(_execute_in_new_connection, id="ins upd del", run_date=datetime.now() + timedelta(minutes=1),
args=[sa.text(f"""
BEGIN
DECLARE @Counter INT
SET @Counter=1
while ( @Counter <= 1800 )
begin
IF OBJECT_ID(N'{DB_SCHEMA}.intpk', N'U') IS NOT NULL
    begin
    insert into [{DB_SCHEMA}].[intpk] (dt) values (CURRENT_TIMESTAMP),(CURRENT_TIMESTAMP), (CURRENT_TIMESTAMP)
    delete from [{DB_SCHEMA}].[intpk] where pk=(select min(pk) from [{DB_SCHEMA}].[intpk])
    update [{DB_SCHEMA}].[intpk] set dt=CURRENT_TIMESTAMP where pk=(select min(pk) from [{DB_SCHEMA}].[intpk])
    end
IF OBJECT_ID(N'{DB_SCHEMA}.dtix', N'U') IS NOT NULL
    insert into [{DB_SCHEMA}].[dtix] (dt) values (CURRENT_TIMESTAMP),(CURRENT_TIMESTAMP),(CURRENT_TIMESTAMP)
WAITFOR DELAY '00:00:01'
SET @Counter  = @Counter  + 1
END
END
""")], replace_existing=True)

# Create the staging and target schemas (their deletion is scheduled further below)

In [None]:
from databricks.sdk.errors import NotFound


def _ensure_schema(catalog_name, schema_name):
    """Create UC schema `catalog_name.schema_name` if it does not exist yet.

    Returns 1 if this call created the schema and 0 if it already existed. Only
    schemas created here are scheduled for deletion later. Errors other than
    "not found" (e.g. missing permissions) propagate instead of triggering a create.
    """
    try:
        w.schemas.get(f"{catalog_name}.{schema_name}")
        return 0
    except NotFound:
        w.schemas.create(schema_name, catalog_name=catalog_name)
        return 1


# With the default configuration staging == target, so the second call finds the
# schema created by the first and reports 0.
stg_schema_created = _ensure_schema(stg_catalog_name, stg_schema_name)
target_schema_created = _ensure_schema(target_catalog_name, target_schema_name)

# Create Gateway Pipeline

In [None]:
# The gateway pipeline reads from the source through the UC connection and keeps its
# intermediate data in stg_catalog_name.stg_schema_name.
connection_id = connections.connection_id

gateway_def = pipelines.IngestionGatewayPipelineDefinition(
      connection_id=connection_id,
      gateway_storage_catalog=stg_catalog_name,
      gateway_storage_schema=stg_schema_name,
      gateway_storage_name = gateway_pipeline_name)

p = w.pipelines.create(
    name = gateway_pipeline_name,
    gateway_definition=gateway_def,
    notifications=notifications,
    development=True,  # for faster restart
    # Pass PipelineCluster objects, not dicts: the SDK serializes each entry itself
    # (calling .as_dict() on it), so a pre-serialized dict would fail.
    clusters=[gateway_cluster_spec] if gateway_cluster_spec is not None else None,
    )
gateway_pipeline_id = p.pipeline_id


print(f"Gateway pipeline {gateway_pipeline_name} created: {gateway_pipeline_id}")

# Create Ingestion Pipeline

In [None]:
# Use IngestionPipelineDefinition when the installed SDK provides it. Otherwise fall back
# to ManagedIngestionPipelineDefinition (presumably the name in older SDK releases).
ingestion_def_class = (
    pipelines.IngestionPipelineDefinition
    if hasattr(pipelines, 'IngestionPipelineDefinition')
    else pipelines.ManagedIngestionPipelineDefinition
)
ingestion_def = ingestion_def_class(
    ingestion_gateway_id=gateway_pipeline_id,
    objects=tables_to_replicate,
)

# Serverless, continuous ingestion pipeline fed by the gateway created above.
p = w.pipelines.create(
    name=ingestion_pipeline_name,
    ingestion_definition=ingestion_def,
    notifications=notifications,
    serverless=True,
    photon=True,
    continuous=True,
    development=True,   # for faster restart
)
ingestion_pipeline_id = p.pipeline_id

print(f"Ingestion pipeline {ingestion_pipeline_name} created: {ingestion_pipeline_id}")

# Wait for pipelines to be online

In [None]:
def wait_pipeline(pipeline_id, poll_seconds=60, timeout_seconds=None):
    """Poll a pipeline until one of its latest updates is up or finished.

    Each round fetches `w.pipelines.get(pipeline_id)` and prints the pipeline state and
    the newest update's state. It returns once any entry of `latest_updates` is in state
    IDLE, RUNNING, CANCELED or COMPLETED. While waiting it also prints the current max(pk)
    of `[DB_SCHEMA].intpk` (via `conn`) to show that the source workload is progressing.

    Args:
        pipeline_id: id of the pipeline to wait for.
        poll_seconds: delay between polls (default 60, as before).
        timeout_seconds: give up waiting after this many seconds and return with a message
            instead of raising, so the cleanup jobs scheduled after this call still get
            registered. None (default) waits indefinitely.
    """
    ready_states = ('IDLE', 'RUNNING', 'CANCELED', 'COMPLETED')
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    while True:
        pipeline_status = w.pipelines.get(pipeline_id)
        # Guard before indexing: latest_updates may presumably be None/empty, e.g. right
        # after the pipeline is created and before its first update exists.
        updates = pipeline_status.latest_updates or []
        newest_state = updates[0].state if updates else None
        print(f"{pipeline_status.state=} {newest_state=}")
        if any(u.state is not None and u.state.name in ready_states for u in updates):
            return
        if deadline is not None and time.monotonic() >= deadline:
            print(f"gave up waiting for pipeline {pipeline_id} after {timeout_seconds} seconds")
            return
        print(conn.execute(sa.text(f"select '{DB_SCHEMA}.intpk', max(pk) from [{DB_SCHEMA}].intpk")).fetchall(), end='\r') # to see the number of rows in the table
        time.sleep(poll_seconds)

wait_pipeline(gateway_pipeline_id)
wait_pipeline(ingestion_pipeline_id)

# For each pipeline (gateway first, then ingestion): stop it after 30 minutes, delete it after 60.
for pipeline_label, pipeline_to_clean in (('gateway', gateway_pipeline_id), ('ingestion', ingestion_pipeline_id)):
    scheduler.add_job(w.pipelines.stop, id=f'stop {pipeline_label}', run_date=datetime.now() + timedelta(minutes=30), args=[pipeline_to_clean], replace_existing=True)
    scheduler.add_job(w.pipelines.delete, id=f'delete {pipeline_label}', run_date=datetime.now() + timedelta(minutes=60), args=[pipeline_to_clean], replace_existing=True)

# Schedule deletion of the source tables

In [None]:
def _execute_in_new_connection(statement):
    """Execute `statement` on its own pooled connection and return it to the pool afterwards.

    Jobs run by the BackgroundScheduler therefore do not share the notebook's `conn`,
    which other cells keep using in the meantime.
    """
    with engine.connect() as job_conn:
        job_conn.execute(statement)


# 30 minutes from now, the following happens (the schema itself is left in place):
#   - change tracking is turned off on intpk, if it is CT-tracked;
#   - CDC is turned off on dtix, if it is CDC-tracked;
#   - both demo tables are dropped.
# The CT/CDC checks look at the specific table they act on, not at the whole schema.
scheduler.add_job(_execute_in_new_connection, id='drop tables', run_date=datetime.now() + timedelta(minutes=30), args=[
sa.text(f"""
BEGIN
IF EXISTS (select 1 from sys.change_tracking_tables
where object_id = OBJECT_ID(N'[{DB_SCHEMA}].[intpk]'))
BEGIN
    ALTER TABLE [{DB_SCHEMA}].[intpk] DISABLE CHANGE_TRACKING
    select 'disabled'
END
ELSE
BEGIN
    select 'already disabled'
END

IF EXISTS (select 1 from sys.tables
where is_tracked_by_cdc=1 and object_id = OBJECT_ID(N'[{DB_SCHEMA}].[dtix]'))
BEGIN
    EXEC sys.sp_cdc_disable_table @source_schema = N'{DB_SCHEMA}', @source_name = N'dtix', @capture_instance = N'all'
    select 'disabled'
END
ELSE
BEGIN
    select 'already disabled'
END

IF EXISTS (SELECT 1 FROM sys.schemas WHERE name = '{DB_SCHEMA}')
BEGIN
    IF OBJECT_ID(N'[{DB_SCHEMA}].[intpk]', N'U') is NOT NULL
    BEGIN
        EXEC('DROP table [{DB_SCHEMA}].[intpk]')
    END
    IF OBJECT_ID(N'[{DB_SCHEMA}].[dtix]', N'U') is NOT NULL
    BEGIN
        EXEC('DROP table [{DB_SCHEMA}].[dtix]')
    END
    -- EXEC('DROP SCHEMA [{DB_SCHEMA}]')
    -- leave the schema
    select 'dropped'
END
else
BEGIN
    select 'already dropped'
END
END
""")], replace_existing=True)


# Schedule deletion of the staging and target schemas

In [None]:
import inspect


def _schedule_schema_delete(job_id, full_name):
    """Schedule `w.schemas.delete(full_name, ...)` to run 60 minutes from now.

    `force=True` is passed only when the installed SDK's `w.schemas.delete` accepts a
    `force` parameter (presumably to allow deleting a non-empty schema; see the SDK docs).
    The check is done now because `add_job` never invokes the function. The previous
    try/except around `add_job` could not detect an unsupported keyword, which would
    only fail when the job ran.
    """
    try:
        accepts_force = 'force' in inspect.signature(w.schemas.delete).parameters
    except (TypeError, ValueError):
        accepts_force = False
    scheduler.add_job(w.schemas.delete, id=job_id, run_date=datetime.now() + timedelta(minutes=60),
                      args=[full_name], kwargs={'force': True} if accepts_force else {}, replace_existing=True)


# Only schemas created by this notebook are removed.
if stg_schema_created:
    _schedule_schema_delete('delete staging', f"{stg_catalog_name}.{stg_schema_name}")

if target_schema_created:
    _schedule_schema_delete('delete target', f"{target_catalog_name}.{target_schema_name}")

# Show scheduled jobs

In [None]:
# Print id, next run time, name, args and kwargs of every job registered with the scheduler.
for scheduled in scheduler.get_jobs():
    print(*(getattr(scheduled, attr) for attr in ('id', 'next_run_time', 'name', 'args', 'kwargs')))

# Show max(pk) of intpk (it grows with every insert; deletes do not lower it)

In [None]:
# Print the current max(pk) of intpk once per second until the cell is interrupted.
progress_query = sa.text(f"select '{DB_SCHEMA}.intpk', max(pk) from [{DB_SCHEMA}].intpk")
while True:
    print(conn.execute(progress_query).fetchall(), end='\r')
    time.sleep(1)

# More examples

## Update gateway pipeline to a bigger node

In [0]:
def gateway_update_to_larger_node(driver_node_type_id="r5.2xlarge"):
    """Move the gateway pipeline onto a larger driver node.

    The gateway's original settings (name, definition, notifications, development mode)
    are re-entered so that the cluster spec is the only change.

    Args:
        driver_node_type_id: node type for the gateway driver. The default "r5.2xlarge"
            appears to be an AWS instance type; pass a suitable type on other clouds.
    """
    # Use a distinct local name so the module-level `gateway_cluster_spec` is not shadowed.
    larger_cluster_spec = pipelines.PipelineCluster(
        label="default",
        driver_node_type_id=driver_node_type_id,
    )

    w.pipelines.update(
        pipeline_id=gateway_pipeline_id,
        name=gateway_pipeline_name,
        gateway_definition=gateway_def,
        notifications=notifications,
        development=True,  # for faster restart
        clusters=[larger_cluster_spec],
    )