In [9]:
import os
from datetime import datetime, timezone

import snowflake.connector
from diepvries.data_vault_load import DataVaultLoad
from diepvries.deserializers.snowflake_deserializer import (
    DatabaseConfiguration,
    SnowflakeDeserializer,
)

In [10]:
def get_load_sql():
    # Configuration for the Snowflake connection
    database_configuration = DatabaseConfiguration(
        database="diepvries_tutorial",
        user=os.environ.get("SNOWFLAKE_USER"),
        password=os.environ.get("SNOWFLAKE_PASSWORD"),
        warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE"),
        account=os.environ.get("SNOWFLAKE_ACCOUNT"),
    )

    # Automatically deserialize known Data Vault tables
    deserializer = SnowflakeDeserializer(
        target_schema="dv",
        target_tables=["h_customer", "hs_customer", "h_order", "hs_order"],
        database_configuration=database_configuration,
    )

    # Prepare data load
    dv_load = DataVaultLoad(
        extract_schema="dv_extract",
        extract_table="order_customer",
        staging_schema="dv_staging",
        staging_table="order_customer",
        extract_start_timestamp=datetime.utcnow().replace(tzinfo=timezone.utc),
        target_tables=deserializer.deserialized_target_tables,
        source="Data from diepvries tutorial",
    )

    # Show generated SQL statements
    return dv_load.sql_load_script

In [11]:
statements = get_load_sql()

In [12]:
con = snowflake.connector.connect(
    user=os.environ.get("SNOWFLAKE_USER"),
    password=os.environ.get("SNOWFLAKE_PASSWORD"),
    account=os.environ.get("SNOWFLAKE_ACCOUNT"),
    warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE"),
)

In [14]:
con.cursor().execute("USE DATABASE diepvries_tutorial")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f75590e3dc0>

In [15]:
for statement in statements:
    print(statement)
    con.cursor().execute(statement)

CREATE OR REPLACE TABLE dv_staging.order_customer_20210831_135940
  (h_customer_hashkey TEXT (32) NOT NULL, r_timestamp TIMESTAMP_NTZ NOT NULL, r_source TEXT (16777216) NOT NULL, customer_id TEXT (16777216) NOT NULL, h_order_hashkey TEXT (32) NOT NULL, order_id TEXT (16777216) NOT NULL, hs_customer_hashdiff TEXT (32) NOT NULL, firstname TEXT (16777216), lastname TEXT (16777216), hs_order_hashdiff TEXT (32) NOT NULL, create_ts TIMESTAMP_NTZ, quantity NUMBER (38, 0)) AS
  SELECT MD5(COALESCE(CAST(customer_id AS TEXT (16777216)), 'dv_unknown')) AS h_customer_hashkey, CAST('2021-08-31T13:59:40.540735Z' AS TIMESTAMP) AS r_timestamp, 'Data from diepvries tutorial' AS r_source, COALESCE(customer_id, 'dv_unknown') AS customer_id, MD5(COALESCE(CAST(order_id AS TEXT (16777216)), 'dv_unknown')) AS h_order_hashkey, COALESCE(order_id, 'dv_unknown') AS order_id, MD5(REGEXP_REPLACE(COALESCE(CAST(customer_id AS TEXT (16777216)), 'dv_unknown')||'|~~|'||COALESCE(CAST(firstname AS TEXT (16777216)), '')||