In [0]:
import os
from databricks import sql
from databricks.sdk.core import Config
import pandas as pd
import traceback

In [0]:
# Three-part (catalog.schema.table) name of the ingestion metadata table queried below.
CONFIG_TABLE = ".".join(("vr_demo_dev", "control", "meta_ingestion_config"))

In [0]:
def sqlQuery(query: str, warehouse_id: str = "dd43ee29fedd958d") -> pd.DataFrame:
    """Run ``query`` on a Databricks SQL warehouse and return the result as pandas.

    A new connection is opened (and closed) on every call.

    Args:
        query: SQL text to execute. It is sent verbatim, so never build it from
            untrusted input.
        warehouse_id: ID of the SQL warehouse to run on. Defaults to the
            warehouse this notebook was written against.

    Returns:
        The full result set, fetched as Arrow and converted to a DataFrame.
    """
    cfg = Config()  # Pull environment variables / config profile for host + auth
    with sql.connect(
        server_hostname=cfg.host,
        http_path=f"/sql/1.0/warehouses/{warehouse_id}",
        # Returns the bound cfg.authenticate method itself (not its result);
        # the connector calls it to obtain auth headers.
        credentials_provider=lambda: cfg.authenticate,
    ) as connection:
        with connection.cursor() as cursor:
            cursor.execute(query)
            return cursor.fetchall_arrow().to_pandas()

In [0]:
# Sorted list of the distinct ingestion groups defined in the config table.
(
    sqlQuery(f"""
        select distinct ingestion_group 
        from {CONFIG_TABLE} 
        order by ingestion_group
    """)
    .loc[:, "ingestion_group"]
    .tolist()
)

In [0]:
# Whole config table plus a 1-based row number `idx`, ordered by target table.
# Uses CONFIG_TABLE rather than repeating the table name.
df = sqlQuery(f"""
        select *, row_number() over (order by target_catalog, target_database, target_table) as idx
        from {CONFIG_TABLE}
        order by target_catalog, target_database, target_table
    """)
display(df)

In [0]:
# "catalog.database.table" string for every configured target, in df row order.
[
    "{}.{}.{}".format(catalog, database, table)
    for catalog, database, table in zip(
        df["target_catalog"], df["target_database"], df["target_table"]
    )
]

In [0]:
# Two frames that differ in exactly one cell, used to try out the comparison
# helpers. df1 is an explicit snapshot so any later in-place change to `df`
# cannot leak into this comparison (a bare `df1 = df` would only alias it).
df1 = df.copy()
df2 = df.copy()
# .at with a missing row label would append a new row instead of editing one,
# which makes df1/df2 non-comparable — require the label to exist.
assert 3 in df2.index, "df needs a row labelled 3 for this experiment"
df2.at[3, 'source_type'] = 'kafka'

In [0]:
# Only the differing cells, side by side ("self" = df1, "other" = df2).
df1.compare(other=df2)

In [0]:
# Full df1 rows for every row label that has at least one differing cell.
df1[df1.index.isin(df1.compare(df2).index)]

In [0]:
# Element-wise "cell differs" mask. A plain `df1 != df2` reports a cell that is
# NaN/NaT/None on BOTH sides as different (NaN never equals itself), so those
# cells are masked out — matching DataFrame.compare's treatment of missing values.
(df1 != df2) & ~(df1.isna() & df2.isna())

In [0]:
# Renumber rows so their idx continues after the current maximum (assuming df
# has the default 0-based RangeIndex, row 0 gets max_idx + 1).
# `assign` returns a new frame; the original `df2 = df` aliased df, so writing
# df2["idx"] silently overwrote df["idx"] (and anything else aliasing df).
max_idx = df["idx"].max()
df2 = df.assign(idx=df.index + max_idx + 1)
display(df2)

In [0]:
# df1: the full config table. df2: a single config row with target_table
# overwritten to 'table1'.
df1 = spark.table(CONFIG_TABLE).toPandas()
# NOTE(review): limit(1) without an ORDER BY may return any row between runs.
df2 = spark.table(CONFIG_TABLE).limit(1).toPandas()
df2['target_table'] = 'table1'

In [0]:
# Upsert df2 into df1 on the four key columns: keep every df1 row whose key is
# absent from df2, then append all of df2.
# The keyed frames get new names so df1/df2 keep their key columns; re-assigning
# `df1 = df1.set_index(keys)` made a second run fail with KeyError.
keys = ['ingestion_group', 'target_catalog', 'target_database', 'target_table']
existing_by_key = df1.set_index(keys)
incoming_by_key = df2.set_index(keys)
df3 = pd.concat(
    [existing_by_key[~existing_by_key.index.isin(incoming_by_key.index)], incoming_by_key]
).reset_index()

In [0]:
# Show the merged result: df1 rows whose key is not in df2, plus all rows of df2.
display(df3)

In [0]:
# Reload the config and keep only rows whose key columns are all non-empty.
df1 = spark.table(CONFIG_TABLE).toPandas()
keys = ['ingestion_group', 'target_catalog', 'target_database', 'target_table']
# A *list* of boolean Series is not a row mask (pandas treats it as a list of
# column labels and fails); combine the per-column checks into one mask instead.
# NOTE(review): null keys satisfy `!= ""` and are kept — add `df1[keys].notna()`
# to the mask if they should be dropped as well.
df1 = df1[(df1[keys] != "").all(axis=1)]

In [0]:
%sql
-- Upsert three test rows into the ingestion config table, matching on
-- (ingestion_group, target_catalog, target_database, target_table).
-- TODO: convert pandas NaT values to SQL null when generating these VALUES rows.
-- NOTE(review): silver_time_key = 'eventid' / silver_merge_keys = 'processingTime'
-- look swapped — confirm.
WITH src AS (
  VALUES
    ('test-group-1', 'vr_demo', 'group1', 'table1', True, 'file', 's3://src-bucket/group1/table1', 's3://dst-bucket/control/group1/table1', '', '', '* EXCEPT (id)', 'eventid', 'processingTime', 'time,action', '2025-01-21 17:53:50.039637', null),
    ('test-group-1', 'vr_demo', 'group1', 'table2', True, 'file', 's3://src-bucket/group1/table2', 's3://dst-bucket/control/group1/table2', '', '', '* EXCEPT (id)', 'eventid', 'processingTime', 'time,action,asd', '2025-01-21 17:53:50.039685', null),
    ('test-group-1', 'vr_demo', 'group1', 'table3', True, 'file', 's3://src-bucket/group1/table3', 's3://dst-bucket/control/group1/table3', '', '', '* EXCEPT (id)', 'eventid', 'processingTime', 'time,action,asd', '2025-01-21 17:53:50.039696', null)
  AS src(
    ingestion_group, target_catalog, target_database, target_table,
    active, source_type, source_file_path, source_file_aux_path,
    source_kafka_topic, source_kafka_schema,
    silver_transformations, silver_time_key, silver_merge_keys, silver_clustering_keys,
    time_last_modified, time_last_promoted
  )
)
MERGE INTO vr_demo_dev.control.meta_ingestion_config tgt
USING src
ON tgt.ingestion_group = src.ingestion_group
  AND tgt.target_catalog = src.target_catalog
  AND tgt.target_database = src.target_database
  AND tgt.target_table = src.target_table
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

In [0]:
%sql
-- Upsert the same three test rows (with refreshed time_last_modified) into the
-- ingestion config table, matching on the four key columns.
-- Fix: the CTE is referenced as `USING s`. `USING VALUES s` does not refer to the
-- CTE (it is either rejected by the parser or resolved as a table named VALUES).
WITH s AS (
  VALUES
    ('test-group-1', 'vr_demo', 'group1', 'table1', True, 'file', 's3://src-bucket/group1/table1', 's3://dst-bucket/control/group1/table1', '', '', '* EXCEPT (id)', 'eventid', 'processingTime', 'time,action', '2025-01-21 20:28:03.174669', null),
    ('test-group-1', 'vr_demo', 'group1', 'table2', True, 'file', 's3://src-bucket/group1/table2', 's3://dst-bucket/control/group1/table2', '', '', '* EXCEPT (id)', 'eventid', 'processingTime', 'time,action,asd', '2025-01-21 20:28:03.174688', null),
    ('test-group-1', 'vr_demo', 'group1', 'table3', True, 'file', 's3://src-bucket/group1/table3', 's3://dst-bucket/control/group1/table3', '', '', '* EXCEPT (id)', 'eventid', 'processingTime', 'time,action,asd', '2025-01-21 20:28:03.174693', null)
  AS s(
    ingestion_group, target_catalog, target_database, target_table,
    active, source_type, source_file_path, source_file_aux_path,
    source_kafka_topic, source_kafka_schema,
    silver_transformations, silver_time_key, silver_merge_keys, silver_clustering_keys,
    time_last_modified, time_last_promoted
  )
)
MERGE INTO vr_demo_dev.control.meta_ingestion_config t
USING s
ON t.ingestion_group = s.ingestion_group
  AND t.target_catalog = s.target_catalog
  AND t.target_database = s.target_database
  AND t.target_table = s.target_table
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *


In [0]:
%sql
-- Remove the time_last_promoted column by rebuilding the table from its own rows.
-- NOTE(review): `* EXCEPT (time_last_promoted)` needs the column to exist, so this
-- cell presumably fails if run a second time. CREATE OR REPLACE also replaces the
-- table definition — confirm any table properties / clustering you rely on survive.
CREATE OR REPLACE TABLE vr_demo_dev.control.meta_ingestion_config AS
SELECT * EXCEPT (time_last_promoted)
FROM vr_demo_dev.control.meta_ingestion_config