# Create Source Database & Dynamic Table
First, create the destination database to use the Snowflake Connector for PostgreSQL to load data from PostgreSQL into Snowflake.

In [None]:
import warnings
warnings.filterwarnings("ignore")

from snowflake.snowpark.context import get_active_session
session = get_active_session()
session.query_tag = {"origin":"sf_sit-is", 
                     "name":"cdc_postgres", 
                     "version":{"major":1, "minor":0},
                     "attributes":{"is_quickstart":1, "source":"notebook"}}

In [None]:
USE ROLE accountadmin;

In [None]:
CREATE OR REPLACE DATABASE CONNECTOR_DEST_DB;
GRANT CREATE SCHEMA ON DATABASE CONNECTOR_DEST_DB TO APPLICATION SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL;

Add the data source `PSQLDS1` (PostgreSQL Data Source 1) by calling `ADD_DATA_SOURCE`. Afterwards, add the tables from PostgreSQL into Snowflake by calling `ADD_TABLES` using the data source, PostgreSQL schema, and PostgreSQL table names.

In [None]:
CALL SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL.PUBLIC.ADD_DATA_SOURCE('PSQLDS1', 'CONNECTOR_DEST_DB');
CALL SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL.PUBLIC.ADD_TABLES('PSQLDS1', 'raw_cdc', ARRAY_CONSTRUCT('customers'));
CALL SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL.PUBLIC.ADD_TABLES('PSQLDS1', 'raw_cdc', ARRAY_CONSTRUCT('merchants'));
CALL SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL.PUBLIC.ADD_TABLES('PSQLDS1', 'raw_cdc', ARRAY_CONSTRUCT('products'));
CALL SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL.PUBLIC.ADD_TABLES('PSQLDS1', 'raw_cdc', ARRAY_CONSTRUCT('transactions'));

Re-run the `check_replication_state` cell about every minute until both the `SCHEMA_INTROSPECTION_STATUS` and `SNAPSHOT_REPLICATION_STATUS` are `DONE`.

In [None]:
SELECT * FROM SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL.PUBLIC.REPLICATION_STATE;

Check to see the rows that have been added after the initial load. This is empty right after the initial load and filled with data once there are changes in the PostgreSQL tables that are replicated to Snowflake after the initial load.

In [None]:
SELECT * FROM SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL.PUBLIC.CONNECTOR_STATS;

Check that the base tables have been properly loaded.

In [None]:
SELECT * FROM CONNECTOR_DEST_DB."raw_cdc"."customers";

In [None]:
SELECT * FROM CONNECTOR_DEST_DB."raw_cdc"."merchants";

In [None]:
SELECT * FROM CONNECTOR_DEST_DB."raw_cdc"."products";

In [None]:
SELECT * FROM CONNECTOR_DEST_DB."raw_cdc"."transactions";

The Dynamic Table `customer_purchase_summary` is composed of specified columnrs from the `customers`, `merchants`, `products`, and `transactions` tables. When new transactions are inserted in PostgreSQL DB, the Dynamic Table will automatically populate with the new data from PostgreSQL in Snowflake.

In [None]:
CREATE OR REPLACE DYNAMIC TABLE cdc_prod.analytics.customer_purchase_summary
TARGET_LAG = '1 minute' 
WAREHOUSE = cdc_ds_wh
REFRESH_MODE = INCREMENTAL
AS
SELECT
    t.transaction_id
    , t.customer_id
    , c.age AS customer_age
    , t.product_id
    , p.product_name
    , p.product_category
    , t.merchant_id
    , m.merchant_name
    , m.merchant_category
    , t.transaction_date
    , t.transaction_time
    , t.quantity
    , t.quantity * p.price AS total_price
    , t.transaction_card
    , t.transaction_category
FROM
    CONNECTOR_DEST_DB."raw_cdc"."transactions" t
JOIN
    CONNECTOR_DEST_DB."raw_cdc"."customers" c ON t.customer_id = c.customer_id
JOIN
    CONNECTOR_DEST_DB."raw_cdc"."products" p ON t.product_id = p.product_id
JOIN
    CONNECTOR_DEST_DB."raw_cdc"."merchants" m ON t.merchant_id = m.merchant_id
AND
    m.merchant_category = p.product_category;

View the contents of the Dynamic Table.

In [None]:
SELECT * FROM cdc_prod.analytics.customer_purchase_summary;

When you're finished with this Quickstart, uncomment and run the code below to remove all Snowflake objects that were created.

In [None]:
-- USE ROLE ACCOUNTADMIN;
-- DROP USER IF EXISTS SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL_AGENT_USER;

-- DROP ROLE IF EXISTS POSTGRESQL_ADMINISTRATIVE_AGENT_ROLE;
-- DROP ROLE IF EXISTS POSTGRESQL_AGENT_ROLE;
-- DROP ROLE IF EXISTS SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL_AGENT_ROLE;
-- DROP ROLE IF EXISTS SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL_ADMINISTRATIVE_AGENT_ROLE;

-- DROP APPLICATION IF EXISTS SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL CASCADE;

-- DROP WAREHOUSE IF EXISTS SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL_COMPUTE_WH;
-- DROP WAREHOUSE IF EXISTS SNOWFLAKE_CONNECTOR_FOR_POSTGRESQL_OPS_WH;

-- DROP DATABASE IF EXISTS CONNECTOR_DEST_DB;