![Title](images/title-page.png)

![Title](images/title-qr.png)

### What is the TPC?
The TPC is a non-profit corporation focused on developing data-centric benchmark standards and disseminating objective, verifiable data to the industry.

[tpc.org](https://www.tpc.org)

### What is TPC-DI?
The TPC-DI benchmark combines and transforms data extracted from an On-Line Transaction Processing (OLTP) system along with other sources of data, and loads it into a data warehouse.

[tpc.org/tpcdi](https://www.tpc.org/tpcdi/)

![ETL Diagram](images/tpc-di-etl-diagram.png)

TPC-DI provides `DIGen.jar` to generate the source files.

The JAR is dated and requires a 1.8 JDK.

In [None]:
!jenv local 1.8
!java -jar ~/dev/Tools/DIGen.jar --help

In [None]:
!rm -rf ~/dev/tpcdi-output
!mkdir -p ~/dev/tpcdi-output
!cd ~/dev/Tools && java -jar ~/dev/Tools/DIGen.jar -o ~/dev/tpcdi-output -sf 10

### The GitHub repository has a prebuilt CLI for easily loading the files.
[github.com/stewartbryson/dbt-tpcdi](https://www.github.com/stewartbryson/dbt-tpcdi)

In [None]:
!python tpcdi.py --help

In [None]:
!python tpcdi.py process-files --help

In [None]:
!python tpcdi.py process-files --output-directory ~/dev/tpcdi-output --file-name DailyMarket.txt --show

### If you get nothing else from this video, know that there's an easy way to load this dataset into Snowflake.

But I also wanted to show some interesting approaches using Snowpark.

All of the code samples below are snippets from the CLI with abstractions removed.

We start with a `credentials.json` file to store our Snowflake credentials. Something like this:

```json
{
    "account": "myaccount",
    "user": "myuser",
    "password": "mypassword",
    "role": "myrole",
    "warehouse": "stewart_dev",
    "database": "tpcdi",
    "schema": "digen"
}
```
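
Before building a session, it can help to check that the file has every key we expect. The snippet below is a minimal sketch: the key names come from the sample above, but treating all of them as required, and the `missing_credentials` helper itself, are assumptions made for illustration.

```python
import json

# Keys taken from the sample credentials.json above.
# Assumption: we treat every one of them as required.
REQUIRED_KEYS = {
    "account", "user", "password", "role",
    "warehouse", "database", "schema",
}

def missing_credentials(credentials):
    """Return the expected keys that are absent or empty, sorted by name."""
    return sorted(key for key in REQUIRED_KEYS if not credentials.get(key))

# An incomplete example, parsed the same way the notebook reads the file
example = json.loads(
    '{"account": "myaccount", "user": "myuser", "password": "mypassword"}'
)
print(missing_credentials(example))
```

An empty list means the dictionary can be passed on as-is.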

We create a Snowflake session.

In [None]:
import json
from snowflake.snowpark.types import *
from snowflake.snowpark.functions import *
from snowflake.snowpark import Session

# Read the credentials.json file
with open("credentials.json") as jsonfile:
    credentials_dict = json.load(jsonfile)

# build the session
session = (
    Session
    .builder
    .configs(credentials_dict)
    .create()
)

# Constants
source_path = '/Users/stewartbryson/dev/tpcdi-output/Batch1'

Most of the files generated by `DIGen.jar` are pipe-separated files, very similar to CSV files.

These are very simple to handle. First let's upload the file to a stage:

In [None]:
# Constants
stage_path = "@tpcdi/Batch1"

# Put the file
put_result = (
    session
    .file
    .put(
        f"{source_path}/DailyMarket.txt",
        f"{stage_path}/DailyMarket.txt",
        parallel=4,
        auto_compress=True,
    )
)

# Report back file results
for result in put_result:
    print(f"File {result.source}: {result.status}")

And now we'll create a table from that file:

In [None]:
table_name = 'daily_market'
# Define the schema
schema = StructType([
                StructField("DM_DATE", DateType(), False),
                StructField("DM_S_SYMB", StringType(), False),
                StructField("DM_CLOSE", FloatType(), False),
                StructField("DM_HIGH", FloatType(), False),
                StructField("DM_LOW", FloatType(), False),
                StructField("DM_VOL", FloatType(), False),
        ])

# create a table from a DataFrame
df = (
    session
    .read
    .schema(schema)
    .option("field_delimiter", '|')
    .csv(f"{stage_path}/DailyMarket.txt")
    .write
    .mode("overwrite")
    .save_as_table(table_name)
)

print(f"{table_name.upper()} table created.")

In [None]:
# show the table
df = (
    session 
    .table(table_name) 
    .show()
)
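
To make the file layout concrete without a Snowflake connection, here is a rough local equivalent using only the standard library. The column names follow the schema above; the sample row and the ISO date format are assumptions for illustration, not actual `DIGen.jar` output.

```python
import csv
import io
from datetime import date

# Column order follows the StructType defined above
COLUMNS = ["DM_DATE", "DM_S_SYMB", "DM_CLOSE", "DM_HIGH", "DM_LOW", "DM_VOL"]

# Made-up row; assumes dates are written as YYYY-MM-DD
sample = "2015-07-06|AAAAAAAAAAAABCD|25.46|28.12|24.90|1043210\n"

def parse_daily_market(text):
    """Parse pipe-separated text into a list of typed dictionaries."""
    rows = []
    for fields in csv.reader(io.StringIO(text), delimiter="|"):
        record = dict(zip(COLUMNS, fields))
        record["DM_DATE"] = date.fromisoformat(record["DM_DATE"])
        for name in ("DM_CLOSE", "DM_HIGH", "DM_LOW", "DM_VOL"):
            record[name] = float(record[name])
        rows.append(record)
    return rows

rows = parse_daily_market(sample)
print(rows[0])
```

The only difference from an ordinary CSV reader is the `delimiter="|"` argument, which plays the same role as the `field_delimiter` option in the Snowpark reader.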

The `DIGen.jar` utility generates a series of "finwire" files.

These files represent market history over time.

They are fixed-width, multi-format files.

Here's a sample of one of the files that has one of each type of record: `FIN`, `SEC`, and `CMP`:

In [None]:
!cat devrel/multi-record.txt

We'll start by uploading all the files:

In [None]:
from pathlib import Path

# File paths
stage_path = "@tpcdi/Batch1/FINWIRE"

# glob the files
pathlist = (
    Path(source_path)
    .glob("FINWIRE??????")
)

for file in pathlist:
    # put the file(s) in the stage
    put_result = (
        session 
        .file
        .put(
            str(file), 
            stage_path, 
            parallel=4, 
            auto_compress=True
        )
    )
    for result in put_result:
        print(f"File {result.source}: {result.status}")

The `CMP`, `SEC`, and `FIN` records all have two fields in common (a timestamp and a record type), so we want a generic DataFrame that contains the shared logic. We’ll save that DataFrame as a Snowflake temporary table called `FINWIRE`, using `WITH_COLUMN` and `SUBSTRING` to extract the two shared fields.

In [None]:
# These are fixed-width fields, so read the entire line in as "line"
schema = StructType([
        StructField("line", StringType(), False),
])

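# Generic DataFrame for all record types, saved as a temporary table.
# Using '|' as the delimiter seems safer than the default ',':
# the goal is to keep each fixed-width line in the single "line" column.
df = (
    session
    .read
    .schema(schema)
    .option('field_delimiter', '|')
    .csv(stage_path)
    .with_column(
        'pts', 
        to_timestamp(
            # SUBSTRING positions are 1-based: the timestamp is characters 1-15
            substring(col("line"), lit(1), lit(15)), 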
            lit("yyyymmdd-hhmiss")
        )
    )
    .with_column(
        'rec_type', 
        substring(col("line"), lit(16), lit(3))
    )
    .write
    .mode("overwrite")
    .save_as_table("finwire", table_type="temporary")
)

# let's see the table
df = (
    session 
    .table('finwire') 
    .show()
)
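
The same slicing can be tried locally in plain Python. This is only a sketch: `sf_substring` is a hypothetical helper that mimics a 1-based `SUBSTRING` with a Python slice, and the sample line is made up.

```python
from datetime import datetime

def sf_substring(text, start, length):
    """Mimic a 1-based SUBSTRING(text, start, length) with a 0-based slice."""
    return text[start - 1:start - 1 + length]

# Made-up line: 15-character timestamp, 3-character record type, then payload
line = "19670401-065923SEC" + "AAAAAAAAAAAABCD"

# Characters 1-15 hold the timestamp, characters 16-18 the record type
pts = datetime.strptime(sf_substring(line, 1, 15), "%Y%m%d-%H%M%S")
rec_type = sf_substring(line, 16, 3)
print(pts, rec_type)
```

Because every record type shares these first 18 characters, the type-specific fields always start at position 19.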

Now we can create the three separate tables from this temporary table using the `WITH_COLUMN` and `SUBSTRING` functions.

I'll only show the Security table as an example, but the other two are done the same way:

In [None]:
# SEC record types
table_name = 'sec'
df = (
    session
    .table('finwire')
    .where(col('rec_type') == 'SEC')
    .withColumn(
        'symbol', 
        substring(col("line"), lit(19), lit(15))
    )
    .withColumn(
        'issue_type', 
        substring(col("line"), lit(34), lit(6))
    )
    .withColumn(
        'status', 
        substring(col("line"), lit(40), lit(4))
    )
    .withColumn(
        'name', 
        substring(col("line"), lit(44), lit(70))
    )
    .withColumn(
        'ex_id', 
        substring(col("line"), lit(114), lit(6))
    )
    .withColumn(
        'sh_out', 
        substring(col("line"), lit(120), lit(13))
    )
    .withColumn(
        'first_trade_date', 
        substring(col("line"), lit(133), lit(8))
    )
    .withColumn(
        'first_exchange_date', 
        substring(col("line"), lit(141), lit(8))
    )
    .withColumn(
        'dividend', 
        substring(col("line"), lit(149), lit(12))
    )
    .withColumn(
        'co_name_or_cik', 
        substring(col("line"), lit(161), lit(60))
    )
    # these columns are no longer relevant
    .drop(col("line"), col("rec_type"))
    .write
    .mode("overwrite")
    .save_as_table(table_name)
)

print(f"{table_name.upper()} table created.")

In [None]:

# let's see the table
df = (
    session 
    .table('sec') 
    .show()
)
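
The start positions in the `SEC` cell are just running totals of the field widths, so they can be derived instead of typed by hand. Below is a minimal sketch of that idea; the widths are copied from the `substring` calls above, while the `layout` and `parse_record` helpers are hypothetical and not part of the CLI.

```python
# (column name, width) pairs in file order, copied from the substring calls above
SEC_FIELDS = [
    ("symbol", 15), ("issue_type", 6), ("status", 4), ("name", 70),
    ("ex_id", 6), ("sh_out", 13), ("first_trade_date", 8),
    ("first_exchange_date", 8), ("dividend", 12), ("co_name_or_cik", 60),
]

def layout(fields, first_position=19):
    """Return {name: (1-based start, width)} by accumulating the widths."""
    positions = {}
    start = first_position  # the shared timestamp and record type use 1-18
    for name, width in fields:
        positions[name] = (start, width)
        start += width
    return positions

def parse_record(line, positions):
    """Slice one fixed-width line into a dictionary of raw strings."""
    return {
        name: line[start - 1:start - 1 + width]
        for name, (start, width) in positions.items()
    }

sec_layout = layout(SEC_FIELDS)
for name, (start, width) in sec_layout.items():
    print(f"{name}: start={start}, width={width}")
```

A table like this could also drive the chain of `withColumn` calls in a loop, which keeps the three record types from drifting apart.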

The `DIGen.jar` utility creates a single XML file called `CustomerMgmt.xml`, with a sample below:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<TPCDI:Actions xmlns:TPCDI="http://www.tpc.org/tpc-di">
 <TPCDI:Action ActionType="NEW" ActionTS="2007-07-07T02:56:25">
  <Customer C_ID="0" C_TAX_ID="923-54-6498" C_GNDR="F" C_TIER="3" C_DOB="1940-12-02">
   <Name>
    <C_L_NAME>Joannis</C_L_NAME>
    <C_F_NAME>Adara</C_F_NAME>
    <C_M_NAME/>
   </Name>
   <Address>
    <C_ADLINE1>4779 Weller Way</C_ADLINE1>
    <C_ADLINE2/>
    <C_ZIPCODE>92624</C_ZIPCODE>
    <C_CITY>Columbus</C_CITY>
    <C_STATE_PROV>Ontario</C_STATE_PROV>
    <C_CTRY>Canada</C_CTRY>
   </Address>
   <ContactInfo>
    <C_PRIM_EMAIL>Adara.Joannis@moose-mail.com</C_PRIM_EMAIL>
    <C_ALT_EMAIL>Adara.Joannis@gmx.com</C_ALT_EMAIL>
    <C_PHONE_1>
     <C_CTRY_CODE>1</C_CTRY_CODE>
     <C_AREA_CODE>872</C_AREA_CODE>
     <C_LOCAL>523-8928</C_LOCAL>
     <C_EXT/>
    </C_PHONE_1>
    <C_PHONE_2>
     <C_CTRY_CODE/>
     <C_AREA_CODE/>
     <C_LOCAL>492-3961</C_LOCAL>
     <C_EXT/>
    </C_PHONE_2>
    <C_PHONE_3>
     <C_CTRY_CODE/>
     <C_AREA_CODE/>
     <C_LOCAL/>
     <C_EXT/>
    </C_PHONE_3>
   </ContactInfo>
   <TaxInfo>
    <C_LCL_TX_ID>CA3</C_LCL_TX_ID>
    <C_NAT_TX_ID>YT3</C_NAT_TX_ID>
   </TaxInfo>
   <Account CA_ID="0" CA_TAX_ST="1">
    <CA_B_ID>17713</CA_B_ID>
    <CA_NAME>CJlmMuFyibKOmKLHIaTeWugvCgZdmcfpDsYb</CA_NAME>
   </Account>
  </Customer>
 </TPCDI:Action>
</TPCDI:Actions>
```

The hierarchical representation of a `TPCDI:Action` record is shown below, with `@` signifying a node attribute as opposed to an element:

```
|-- TPCDI:Action
    |-- @ActionType: string
    |-- @ActionTS: timestamp
    |-- Customer
        |-- @C_ID: number
        |-- @C_TAX_ID: string
        |-- @C_GNDR: string
        |-- @C_TIER: number
        |-- @C_DOB: date
        |-- Name
            |-- C_F_NAME: string
            |-- C_L_NAME: string
            |-- C_M_NAME: string
        |-- Address
            |-- C_ADLINE1: string
            |-- C_ADLINE2: string
            |-- C_CITY: string
            |-- C_CTRY: string
            |-- C_STATE_PROV: string
            |-- C_ZIPCODE: string
        |-- ContactInfo
            |-- C_ALT_EMAIL: string
            |-- C_PHONE_1
                |-- C_AREA_CODE: number
                |-- C_CTRY_CODE: number
                |-- C_EXT: number
                |-- C_LOCAL: string
            |-- C_PHONE_2
                |-- C_AREA_CODE: number
                |-- C_CTRY_CODE: number
                |-- C_EXT: number
                |-- C_LOCAL: string
            |-- C_PHONE_3
                |-- C_AREA_CODE: number
                |-- C_CTRY_CODE: number
                |-- C_EXT: number
                |-- C_LOCAL: string
        |-- TaxInfo
            |-- C_LCL_TX_ID: string
            |-- C_NAT_TX_ID: string
        |-- Account
            |-- CA_B_ID: number
            |-- CA_NAME: string
            |-- @CA_ID: number
            |-- @CA_TAX_ST: number
```
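
To see the difference between attributes and elements outside of Snowflake, the record can be explored with Python's standard `xml.etree.ElementTree` module. This is just a local sketch on a trimmed copy of the sample above (XML declaration omitted); it is not how the CLI loads the file.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the sample record shown earlier
SAMPLE = """<TPCDI:Actions xmlns:TPCDI="http://www.tpc.org/tpc-di">
 <TPCDI:Action ActionType="NEW" ActionTS="2007-07-07T02:56:25">
  <Customer C_ID="0" C_TAX_ID="923-54-6498" C_GNDR="F" C_TIER="3" C_DOB="1940-12-02">
   <Name>
    <C_L_NAME>Joannis</C_L_NAME>
    <C_F_NAME>Adara</C_F_NAME>
    <C_M_NAME/>
   </Name>
  </Customer>
 </TPCDI:Action>
</TPCDI:Actions>
"""

# Only the Actions and Action nodes carry the TPCDI namespace prefix
NS = {"TPCDI": "http://www.tpc.org/tpc-di"}

root = ET.fromstring(SAMPLE)
action = root.find("TPCDI:Action", NS)
customer = action.find("Customer")

# Attributes (the @ entries in the tree) are read with .get()
print(action.get("ActionType"), action.get("ActionTS"))
print(customer.get("C_ID"), customer.get("C_DOB"))

# Elements are read by path; an empty element yields an empty string
print(customer.findtext("Name/C_L_NAME"))
print(repr(customer.findtext("Name/C_M_NAME")))
```

Empty elements such as `C_M_NAME` are common in this file, so anything that consumes it needs to handle blank values.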


Let's create a DataFrame from the XML file and see what we get:

In [None]:
# File paths
stage_path = "@tpcdi/Batch1"

# Put the file
put_result = (
    session
    .file
    .put(
        f"{source_path}/CustomerMgmt.xml",
        f"{stage_path}/CustomerMgmt.xml",
        parallel=4,
        auto_compress=True,
    )
)
for result in put_result:
    print(f"File {result.source}: {result.status}")

# Read the XML file into a DataFrame and show it
df = (
    session
    .read
    .option('STRIP_OUTER_ELEMENT', True) # Strips TPCDI:Actions
    .xml(f"{stage_path}/CustomerMgmt.xml")
    .show(1, 100)
)

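Snowflake does not support simple dot notation for XML the way it does for JSON.

Instead, we have to pair the `GET()` function with `XMLGET()`: `XMLGET()` pulls out a nested element, and `GET()` then reads that element's content (`$`) or one of its attributes (`@name`). This can be quite tedious.

So we can create helper functions to abstract the toil: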

In [None]:
# Simplifies retrieving XML elements
def get_xml_element(
        column:str,
        element:str,
        datatype:str,
        with_alias:bool = True
):
    new_element = (
        get(
            xmlget(
                col(column),
                lit(element),
            ),
            lit('$')
        )
        .cast(datatype)
    )

    # alias needs to be optional
    return (
        new_element.alias(element) if with_alias else new_element
    )

# Simplifies retrieving XML attributes
def get_xml_attribute(
        column:str,
        attribute:str,
        datatype:str,
        with_alias:bool = True
):
    new_attribute = (
        get(
            col(column),
            lit(f"@{attribute}")
        )
        .cast(datatype)
    )

    # alias needs to be optional
    return (
        new_attribute.alias(attribute) if with_alias else new_attribute
    )

# Constructs a phone number from multiple nested fields
def get_phone_number(
        phone_id:str,
        separator:str = '-'
):
    return (
        concat (
            get_xml_element(f"phone{phone_id}", 'C_CTRY_CODE', 'STRING', False),
            when(get_xml_element(f"phone{phone_id}", 'C_CTRY_CODE', 'STRING', False) == '', '')
            .otherwise(separator),
            get_xml_element(f"phone{phone_id}", 'C_AREA_CODE', 'STRING', False),
            when(get_xml_element(f"phone{phone_id}", 'C_AREA_CODE', 'STRING', False) == '', '')
            .otherwise(separator),
            get_xml_element(f"phone{phone_id}", 'C_LOCAL', 'STRING', False),
            when(get_xml_element(f"phone{phone_id}", 'C_EXT', 'STRING', False) == '', '')
            .otherwise(" ext: "),
            get_xml_element(f"phone{phone_id}", 'C_EXT', 'STRING', False)
        )
        .alias(f"c_phone_{phone_id}")
    )
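
The `concat`/`when` chain in `get_phone_number` is easier to reason about as plain string logic. The function below is a hypothetical pure-Python mirror of it, assuming that empty XML elements arrive as empty strings; the inputs come from the sample record shown earlier.

```python
def build_phone_number(ctry_code, area_code, local, ext, separator="-"):
    """Join the phone parts, adding a separator only after non-empty parts."""
    return (
        ctry_code
        + ("" if ctry_code == "" else separator)
        + area_code
        + ("" if area_code == "" else separator)
        + local
        + ("" if ext == "" else " ext: ")
        + ext
    )

# C_PHONE_1 and C_PHONE_2 from the sample CustomerMgmt.xml record
print(build_phone_number("1", "872", "523-8928", ""))
print(build_phone_number("", "", "492-3961", ""))
```

The separator is attached to the part that precedes it, which is why a missing country or area code does not leave a stray dash at the front.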

Now let's put it all together and create our `customer_mgmt` table:

In [None]:
table_name = 'customer_mgmt'
df = (
    session
    .read
    .option('STRIP_OUTER_ELEMENT', True) # Strips the TPCDI:Actions node
    .xml(f"{stage_path}/CustomerMgmt.xml")
    .select(
        # flatten out all of the nested elements
        xmlget(col('$1'), lit('Customer'), 0).alias('customer'),
        xmlget(col('customer'), lit('Name'), 0).alias('name'),
        xmlget(col('customer'), lit('Address'), 0).alias('address'),
        xmlget(col('customer'), lit('ContactInfo'), 0).alias('contact_info'),
        xmlget(col('contact_info'), lit('C_PHONE_1')).alias('phone1'),
        xmlget(col('contact_info'), lit('C_PHONE_2')).alias('phone2'),
        xmlget(col('contact_info'), lit('C_PHONE_3')).alias('phone3'),
        xmlget(col('customer'), lit('TaxInfo'), 0).alias('tax_info'),
        xmlget(col('customer'), lit('Account'), 0).alias('account'),
        # get the Action attributes
        get_xml_attribute('$1','ActionType','STRING'),
        get_xml_attribute('$1','ActionTS','STRING'),
    )
    .select(
        # Handling Action attributes
        to_timestamp(
            col('ActionTS'),
            lit('yyyy-mm-ddThh:mi:ss')
        ).alias('action_ts'),
        col('ActionType').alias('ACTION_TYPE'),
        # Get Customer Attributes
        get_xml_attribute('customer','C_ID','NUMBER'),
        get_xml_attribute('customer','C_TAX_ID','STRING'),
        get_xml_attribute('customer','C_GNDR','STRING'),
        # Had to disable auto-aliasing
        try_cast(
            get_xml_attribute('customer','C_TIER','STRING', False),
            'NUMBER'
        ).alias('c_tier'),
        get_xml_attribute('customer','C_DOB','DATE'),
        # Get Name elements
        get_xml_element('name','C_L_NAME','STRING'),
        get_xml_element('name','C_F_NAME','STRING'),
        get_xml_element('name','C_M_NAME','STRING'),
        # Get Address elements
        get_xml_element('address','C_ADLINE1','STRING'),
        get_xml_element('address', 'C_ADLINE2', 'STRING'),
        get_xml_element('address','C_ZIPCODE','STRING'),
        get_xml_element('address','C_CITY','STRING'),
        get_xml_element('address','C_STATE_PROV','STRING'),
        get_xml_element('address','C_CTRY','STRING'),
        # Get Contact Info elements
        get_xml_element('contact_info','C_PRIM_EMAIL','STRING'),
        get_xml_element('contact_info','C_ALT_EMAIL','STRING'),
        # Construct phone numbers from multi-nested elements
        get_phone_number('1'),
        get_phone_number('2'),
        get_phone_number('3'),
        # Get TaxInfo elements
        get_xml_element('tax_info','C_LCL_TX_ID','STRING'),
        get_xml_element('tax_info','C_NAT_TX_ID','STRING'),
        # Get Account Attributes
        get_xml_attribute('account','CA_ID','STRING'),
        get_xml_attribute('account','CA_TAX_ST','NUMBER'),
        # Get Account elements
        get_xml_element('account','CA_B_ID','NUMBER'),
        get_xml_element('account','CA_NAME','STRING'),
    )
    .write
    .mode("overwrite")
    .save_as_table(table_name)
)

print(f"{table_name.upper()} table created.")

In [None]:
df = (
    session
    .table('customer_mgmt')
    .select(
        col('action_ts'),
        col('c_id'),
        col('c_tier'),
        col('c_phone_1')
    )
    .show()
)

![Logical Model](images/tpc-di-logical-model.png)

# When we Google "dbt dynamic tables":

![Google Search](images/dbt-dynamic-tables.png)

# It's not as simple as this

![Conflict](images/refresh-conflict.png)

# dbt is more than just a job scheduler.
Dynamic Tables need to be (re)created in the correct order.
The dependent relationships become very complex as the number of dependencies increases.
We don't want to maintain a create script with all the tables in the correct order.

dbt understands your DAG and can infer the relationships.
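
The ordering problem itself is a topological sort. As a rough illustration (not how dbt is implemented), Python's standard `graphlib` can compute a valid creation order from a dependency map. The tables loaded earlier and `fact_trade` appear in this project; `dim_customer` and `dim_security`, and the dependencies between them, are hypothetical names chosen for the example.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each key depends on the tables in its set.
# dim_customer and dim_security are hypothetical model names.
dependencies = {
    "dim_customer": {"customer_mgmt"},
    "dim_security": {"sec"},
    "fact_trade": {"dim_customer", "dim_security", "daily_market"},
}

# static_order() yields every node after all of its dependencies
build_order = list(TopologicalSorter(dependencies).static_order())
print(build_order)
```

With a handful of tables this is easy to do by hand, but the map grows quickly, and a hand-maintained create script has to be reordered every time a dependency changes.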

![dbt DAG](images/dbt-dag.png)

### Enabling dynamic tables in our dbt project
We need to be on version `1.7.0` of `dbt-snowflake`, and we set a new materialization type in our `dbt_project.yml` file:

```yaml
models:
  dbt_tpcdi:
    example:
      +materialized: view
    bronze:
      +schema: bronze
      +materialized: dynamic_table
      +snowflake_warehouse: tpcdi_large
      +target_lag: downstream
    silver:
      +schema: silver
      +materialized: dynamic_table
      +snowflake_warehouse: tpcdi_large
      +target_lag: '10 minutes'
    gold:
      +schema: gold
      +materialized: dynamic_table
      +snowflake_warehouse: tpcdi_large
      +target_lag: '20 minutes'
    work:
      +schema: work
      +materialized: ephemeral
```

In [None]:
!dbt build

### dbt also has Tests

We can run them when we create the Dynamic Table:

In [None]:
!dbt build --select fact_trade

Or we can schedule them to run periodically:

In [None]:
!dbt test

### dbt is more than a scheduler

1. Cloud development environment for those who prefer it.
1. CI/CD workflows for promoting Dynamic Table changes into Production.
1. Perhaps there's promise in the Semantic Layer.

### Snowflake has its own version of the DAG
However, this only exists after all the Dynamic Tables have been created.
dbt understands the DAG _before_ it's created.

![Snowflake DAG](images/snowflake-dag.png)

# Clean-up

In [None]:
!python tpcdi.py drop-schema --schema dl_gold
!python tpcdi.py drop-schema --schema dl_silver
!python tpcdi.py drop-schema --schema dl_bronze
!python tpcdi.py drop-schema --schema dl_work