In [0]:
%sql
select * from workspace.default.shipments1

###Foreign Catalog Connection
A Foreign Catalog is a Unity Catalog object that mirrors a database (or metastore) that lives outside Databricks. Databricks can then reference and query its tables in place, without copying the data.

In this notebook, Databricks securely accesses an external MySQL database on Google Cloud SQL through a Unity Catalog-managed connection and a foreign catalog, without ingesting or copying the data.

![](/Workspace/Users/infoblisstech@gmail.com/databricks-code-repo/6_lakeflow_pipelines/connection_foreign_catalog.png)

**Source DB Side**
CREATE TABLE shipments (
    shipment_id INT PRIMARY KEY,
    first_name  VARCHAR(50),
    last_name   VARCHAR(50),
    age         INT,
    role        VARCHAR(50),
    insert_ts   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    update_ts   TIMESTAMP DEFAULT CURRENT_TIMESTAMP);

INSERT INTO shipments
(shipment_id, first_name, last_name, age, role)
VALUES
(5000001, 'Rajesh',  'Kumar', 35, 'Driver'),
(5000002, 'Anita',   'Sharma',29, 'Dispatcher'),
(5000003, 'Michael', 'Chen',  41, 'Warehouse Manager'),
(5000004, 'Suresh',  NULL,    52, 'Loader'),
(5000005, 'Priya',   'Iyer',  27, 'Analyst');


MySQL (shipments table) -> Foreign Catalog -> Databricks (foreign table exposed by the foreign catalog) -> CDC filter (insert_ts / update_ts) -> Bronze Delta table


####1. Source DB Readiness

**Create the following table in the source database**
create database if not exists logistics;

CREATE TABLE logistics.shipments1 (
  shipment_id INT PRIMARY KEY,
  first_name  VARCHAR(50),
  last_name   VARCHAR(50),
  age         INT,
  role        VARCHAR(50),
  updated_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);


INSERT INTO logistics.shipments1 VALUES
(5000001,'Rajesh','Kumar',35,'Driver',CURRENT_TIMESTAMP),
(5000002,'Anita','Sharma',29,'Dispatcher',CURRENT_TIMESTAMP),
(5000003,'Michael','Chen',41,'Warehouse Manager',CURRENT_TIMESTAMP),
(5000004,'Suresh',NULL,52,'Loader',CURRENT_TIMESTAMP),
(5000005,'Priya','Iyer',27,'Analyst',CURRENT_TIMESTAMP);


####2. Create Connection & Foreign Catalog

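A **Connection** stores the host, port, and credentials of the external database as a Unity Catalog object. Using a connection lets you: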
- Avoid hard-coding usernames/passwords in notebooks
- Enable centralized governance via Unity Catalog
- Allow multiple users and tables to reuse the same connection
- Support SQL-based external access

In [0]:
%sql
CREATE CONNECTION gcp_mysql_conn_we47
TYPE mysql
OPTIONS (
  host '34.123.166.158',
  port '3306',
  user 'devuser',
  password '<password>'  -- placeholder: the real value is shared in our group; never commit it
);

In [0]:
%sql
CREATE FOREIGN CATALOG gcp_mysql_fc_we471
USING CONNECTION gcp_mysql_conn_we47;

In [0]:
%sql
DESCRIBE CONNECTION gcp_mysql_conn_we47;

In [0]:
%sql
show catalogs;

In [0]:
%sql
select * from gcp_mysql_fc_we471.logistics.shipments1;

####3. Bronze table (Incremental ingestion)
Here we use the foreign catalog itself as the ingestion mechanism, instead of a traditional JDBC read or a third-party tool, to pull data from the source database.

In [0]:
%sql
-- Historical (one-time) load
CREATE TABLE IF NOT EXISTS catalog3_we47.schema3_we47.bronze_shipments1
USING DELTA
AS SELECT
  shipment_id,
  first_name,
  last_name,
  age,
  role,
  updated_at
FROM gcp_mysql_fc_we471.logistics.shipments1;

In [0]:
%sql
select * from catalog3_we47.schema3_we47.bronze_shipments1

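**Exercise:** try to build an SCD Type 2 history from this bronze table. Because the incremental load below appends every new or changed row instead of overwriting it, the bronze table keeps each version of a record.
####Insert a row into the source database table, then run the incremental load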
INSERT INTO logistics.shipments1 VALUES (5000006,'Bala','Chander',35,'DE',CURRENT_TIMESTAMP);
####Update a row in the source database table, then run the incremental load
update logistics.shipments1 set role='Databricks Data Engineer',updated_at=CURRENT_TIMESTAMP where shipment_id=5000006;

In [0]:
%sql
-- Incremental load: append rows inserted or updated since the last load
INSERT INTO catalog3_we47.schema3_we47.bronze_shipments1
SELECT
  shipment_id,
  first_name,
  last_name,
  age,
  role,
  updated_at
FROM gcp_mysql_fc_we471.logistics.shipments1
WHERE updated_at >
(
  -- High-watermark: latest updated_at already in bronze (epoch if bronze is empty)
  SELECT COALESCE(MAX(updated_at), TIMESTAMP '1970-01-01 00:00:00')
  FROM catalog3_we47.schema3_we47.bronze_shipments1
);
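The high-watermark logic in the incremental load cell can be checked locally. The sketch below is a minimal simulation, assuming Python's built-in `sqlite3` as a stand-in for both the MySQL source (read through the foreign catalog) and the bronze Delta table. The table names `source_shipments` and `bronze_shipments`, the helper `incremental_load`, and all timestamps are hypothetical, and the schema is cut down to three columns. It replays the historical load, then the insert and update steps from this notebook, and shows that each change appends exactly one row while a rerun with no changes appends nothing.

```python
import sqlite3

# One in-memory SQLite database stands in for both sides (illustration only):
# source_shipments plays the MySQL table read through the foreign catalog,
# bronze_shipments plays the bronze Delta table.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE source_shipments (
    shipment_id INTEGER PRIMARY KEY,
    role        TEXT,
    updated_at  TEXT  -- fixed-width 'YYYY-MM-DD HH:MM:SS' strings sort chronologically
);
INSERT INTO source_shipments VALUES
    (5000001, 'Driver',     '2024-01-01 09:00:00'),
    (5000002, 'Dispatcher', '2024-01-01 09:00:00');
""")

# Historical (one-time) load: CTAS copies the current source rows.
# IF NOT EXISTS turns a rerun into a no-op instead of an error.
conn.execute("""
CREATE TABLE IF NOT EXISTS bronze_shipments AS
SELECT shipment_id, role, updated_at FROM source_shipments
""")

# Incremental load: append only rows newer than the bronze high-watermark.
INCREMENTAL_LOAD = """
INSERT INTO bronze_shipments
SELECT shipment_id, role, updated_at
FROM source_shipments
WHERE updated_at > (
    SELECT COALESCE(MAX(updated_at), '1970-01-01 00:00:00')
    FROM bronze_shipments
)
"""


def incremental_load() -> int:
    """Run the incremental load and return the number of rows appended."""
    cur = conn.execute(INCREMENTAL_LOAD)
    conn.commit()
    return cur.rowcount


print(incremental_load())  # 0: nothing changed since the historical load

# Insert a new shipment in the source, then load: one new row is appended.
conn.execute("INSERT INTO source_shipments VALUES (5000006, 'DE', '2024-01-02 10:00:00')")
print(incremental_load())  # 1

# Update it in the source (bumping updated_at), then load: a second version is appended.
conn.execute(
    "UPDATE source_shipments SET role = 'Databricks Data Engineer', "
    "updated_at = '2024-01-02 11:00:00' WHERE shipment_id = 5000006"
)
print(incremental_load())  # 1

versions = conn.execute(
    "SELECT COUNT(*) FROM bronze_shipments WHERE shipment_id = 5000006"
).fetchone()[0]
print(versions)  # 2
```

Because the filter only compares `updated_at`, this pattern relies on the source bumping `updated_at` on every change (as the `UPDATE` statement in this notebook does explicitly), and it does not capture deleted rows.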


In [0]:
%sql
-- Latest version of each shipment; drop the QUALIFY clause to see the full history
SELECT *,
       ROW_NUMBER() OVER (PARTITION BY shipment_id ORDER BY updated_at DESC) AS rno
FROM catalog3_we47.schema3_we47.bronze_shipments1
QUALIFY rno = 1;
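Turning the append-only bronze history into an SCD Type 2 shape mostly means giving each version a validity range. The sketch below is a minimal example, assuming Python's built-in `sqlite3` (window functions need SQLite 3.25 or newer) and hypothetical sample rows and timestamps modeled on this notebook's data. `LEAD` takes the next version's `updated_at` as the current row's `valid_to`, and the row with no successor is flagged as current. The column names `valid_from`, `valid_to`, and `is_current` are illustrative choices, not a Databricks convention.

```python
import sqlite3

# Hypothetical bronze rows shaped like the notebook's: 5000006 has two versions.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE bronze_shipments (shipment_id INTEGER, role TEXT, updated_at TEXT);
INSERT INTO bronze_shipments VALUES
    (5000001, 'Driver',                   '2024-01-01 09:00:00'),
    (5000006, 'DE',                       '2024-01-02 10:00:00'),
    (5000006, 'Databricks Data Engineer', '2024-01-02 11:00:00');
""")

# Each version is valid from its own updated_at until the next version's updated_at.
# The latest version has no successor, so valid_to is NULL and it is flagged current.
SCD2_QUERY = """
SELECT shipment_id,
       role,
       updated_at AS valid_from,
       LEAD(updated_at) OVER (PARTITION BY shipment_id ORDER BY updated_at) AS valid_to,
       CASE WHEN LEAD(updated_at) OVER (PARTITION BY shipment_id ORDER BY updated_at) IS NULL
            THEN 1 ELSE 0 END AS is_current
FROM bronze_shipments
ORDER BY shipment_id, valid_from
"""

rows = conn.execute(SCD2_QUERY).fetchall()
for row in rows:
    print(row)
# (5000001, 'Driver', '2024-01-01 09:00:00', None, 1)
# (5000006, 'DE', '2024-01-02 10:00:00', '2024-01-02 11:00:00', 0)
# (5000006, 'Databricks Data Engineer', '2024-01-02 11:00:00', None, 1)
```

Filtering on `is_current = 1` returns the same latest versions as the `QUALIFY` query above. The query itself uses only standard window functions, so it should carry over to Databricks SQL with little change.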

**Later, when we move to the cloud, we will cover the complete (and very important) cycle of CDC -> CDF -> SCD1 and SCD2, using features Databricks provides that need very little code.**