![Title](images/title-page.png)

![Title](images/title-qr.png)

### What is the TPC?
The TPC (Transaction Processing Performance Council) is a non-profit corporation focused on developing data-centric benchmark standards and disseminating objective, verifiable data to the industry.

### What is TPC-DI?
The TPC-DI benchmark extracts data from an On-Line Transaction Processing (OLTP) system and other data sources, combines and transforms it, and loads it into a data warehouse.

![ETL Diagram](images/tpc-di-etl-diagram.png)

TPC-DI provides `DIGen.jar` to generate the source files.

The JAR is dated and requires JDK 1.8 (Java 8), so we pin the local Java version with `jenv` before running it.

In [28]:
!jenv local 1.8
!java -jar ~/dev/Tools/DIGen.jar --help

usage: DIGen
 -h                   print this message
 -jvm <JVM options>   JVM options. E.g. -jvm "-Xms1g -Xmx2g"
 -o <directory>       Specify output directory.  Default is output.
 -sf <sf>             Scale factor.  Default value is 5. (range: 3 -
                      2147483647
 -v                   print DIGen version


In [31]:
!rm -rf ~/dev/tpcdi-output
!mkdir -p ~/dev/tpcdi-output
!cd ~/dev/Tools && java -jar ~/dev/Tools/DIGen.jar -o ~/dev/tpcdi-output -sf 5

/Users/stewartbryson/dev/tpcdi-output
########################################################################################################################
                                                  PDGF v2.5_#1343_b4177
                                            Parallel Data Generation Framework
                (c)bankmark UG (haftungsbeschraenkt), Frank M., Danisch M., Rabl T. http://www.bankmark.de
########################################################################################################################
                                                   License information
                            The Software is provided to you as part of the TPC Benchmark DI. 
 When using this software you must agree to the license provided in LICENSE.TXT of this package. Use is restricted to TPC
DI benchmarking purposes as specified in LICENSE.TXT. If you would like to use the software for other purposes, you must
contact bankmark UG (haftungsbeschraenkt) (http://www.

### The GitHub repository [stewartbryson/dbt-tpcdi](https://github.com/stewartbryson/dbt-tpcdi) has a prebuilt CLI for easily loading the files.

In [33]:
!python tpcdi.py --help


 Usage: tpcdi.py [OPTIONS] COMMAND [ARGS]...

 A utility for loading TPC-DI generated files into Snowflake.

╭─ Options ────────────────────────────────────────────────────────────────────╮
│ --install-completion        [bash|zsh|fish|powershe  Install completion for  │
│                             ll|pwsh]                 the specified shell.    │
│

In [40]:
!python tpcdi.py process-files --help


 Usage: tpcdi.py process-files [OPTIONS]

 Upload a file or files into the stage and build the dependent tables.

╭─ Options ────────────────────────────────────────────────────────────────────╮
│ *  --output-directory                        TEXT     The output directory   │
│                                                       from the TPC-DI        │
│                                                       DIGen.jar execution.   │
│                                                       [default: N

In [6]:
!python tpcdi.py process-files --output-directory ~/dev/tpcdi-output --file-name DailyMarket.txt --show

File DailyMarket.txt: SKIPPED
----------------------------------------------------------------------------------
|"DM_DATE"   |"DM_S_SYMB"      |"DM_CLOSE"  |"DM_HIGH"  |"DM_LOW"  |"DM_VOL"     |
----------------------------------------------------------------------------------
|2015-07-06  |AAAAAAAAAAAAERN  |242.93      |284.42     |185.08    |111904727.0  |
|2015-07-06  |AAAAAAAAAAAAEYJ  |445.46      |522.3      |386.48    |78849320.0   |
|2015-07-06  |AAAAAAAAAAAAEVC  |910.59      |1148.89    |723.37    |807515829.0  |
|2015-07-06  |AAAAAAAAAAAACEZ  |647.07      |756.68     |473.3     |693226268.0  |
|2015-07-06  |AAAAAAAAAAAADOY  |385.01      |564.67     |295.63    |34628570.0   |
|2015-07-06  |AAAAAAAAAAAADSD  |28.01       |34.59      |23.66     |47032973.0   |
|2015-07-06  |AAAAAAAAAAAAELH  |186.85      |249.13     |170.26    |79305649.0   |
|2015-07-06  |AAAAAAAAAAAAAXX  |880.03      |990.35     |727.51    |353491380.0  |
|2015-07-06  |AAAAAAAAAAAABVO  |911.31      |1143.78    |

If you take nothing else away from this video, know that there's an easy way to load this dataset into Snowflake.

But I also wanted to show some interesting approaches using Snowpark.

All of the code samples below are snippets from the CLI with abstractions removed.

We start with a `credentials.json` file to store our Snowflake credentials. Something like this:

```json
{
    "account": "myaccount",
    "user": "myuser",
    "password": "mypassword",
    "role": "myrole",
    "warehouse": "stewart_dev",
    "database": "tpc_di",
    "schema": "digen"
}
```

Then we can make a connection to Snowflake.

In [1]:
import json
from snowflake.snowpark import Session, DataFrame
from snowflake.snowpark.types import *
from snowflake.snowpark.functions import *
from pathlib import Path

# Read the credentials.json file
with open("credentials.json") as jsonfile:
    credentials_dict = json.load(jsonfile)

# build the session
session = (
    Session
    .builder
    .configs(credentials_dict)
    .create()
)
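It can help to fail fast on a malformed `credentials.json` before handing the dictionary to `Session.builder.configs`. This is a minimal sketch that assumes the key set shown in the sample file above. The `load_credentials` helper and `REQUIRED_KEYS` set are hypothetical and are not part of the CLI.

```python
import json
from pathlib import Path

# Hypothetical helper: the key set mirrors the sample credentials.json shown above.
REQUIRED_KEYS = {"account", "user", "password", "role", "warehouse", "database", "schema"}


def load_credentials(path):
    """Read a Snowflake credentials file and fail early if an expected key is missing."""
    with open(Path(path)) as jsonfile:
        credentials = json.load(jsonfile)
    missing = REQUIRED_KEYS - set(credentials)
    if missing:
        raise KeyError(f"credentials file is missing keys: {sorted(missing)}")
    return credentials


# Usage (requires a real file and a Snowflake account):
# session = Session.builder.configs(load_credentials("credentials.json")).create()
```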

Most of the files generated by `DIGen.jar` are pipe-delimited, much like CSV files.

These are very simple to handle. First, let's upload the file to a stage:

In [15]:
# File paths
source_path = '/Users/stewartbryson/dev/tpcdi-output/Batch1'
stage_path = "@tpcdi/Batch1"

# Put the file
put_result = (
    session
    .file
    .put(
        f"{source_path}/DailyMarket.txt",
        f"{stage_path}/DailyMarket.txt",
        parallel=4,
        auto_compress=True,
    )
)
for result in put_result:
    print(f"File {result.source}: {result.status}")

File DailyMarket.txt: SKIPPED


And now we'll create a table from that file:

In [16]:
# Define the schema
schema = StructType([
                StructField("DM_DATE", DateType(), False),
                StructField("DM_S_SYMB", StringType(), False),
                StructField("DM_CLOSE", FloatType(), False),
                StructField("DM_HIGH", FloatType(), False),
                StructField("DM_LOW", FloatType(), False),
                StructField("DM_VOL", FloatType(), False),
        ])

# create a table from a DataFrame
df = (
    session
    .read
    .schema(schema)
    .option("field_delimiter", '|')
    .csv(f"{stage_path}/DailyMarket.txt")
    .write
    .mode("overwrite")
    .save_as_table('daily_market')
)

# show the table
df = (
    session 
    .table('daily_market') 
    .show()
)


----------------------------------------------------------------------------------
|"DM_DATE"   |"DM_S_SYMB"      |"DM_CLOSE"  |"DM_HIGH"  |"DM_LOW"  |"DM_VOL"     |
----------------------------------------------------------------------------------
|2016-12-18  |AAAAAAAAAAAABZF  |961.07      |1394.14    |847.64    |709969048.0  |
|2016-12-18  |AAAAAAAAAAAABGY  |134.65      |173.1      |106.55    |122085128.0  |
|2016-12-18  |AAAAAAAAAAAACFY  |497.2       |741.06     |427.71    |8263059.0    |
|2016-12-18  |AAAAAAAAAAAACJW  |697.83      |988.37     |630.96    |576146934.0  |
|2016-12-18  |AAAAAAAAAAAAELN  |512.4       |699.93     |508.04    |532344015.0  |
|2016-12-18  |AAAAAAAAAAAADQU  |115.56      |146.54     |71.32     |808265496.0  |
|2016-12-18  |AAAAAAAAAAAABSG  |38.99       |39.65      |29.36     |696226368.0  |
|2016-12-18  |AAAAAAAAAAAAAOO  |640.01      |905.5      |581.14    |828920058.0  |
|2016-12-18  |AAAAAAAAAAAACYZ  |396.78      |536.03     |233.16    |713253731.0  |
|201
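The following local sketch shows why a single `field_delimiter` option is all this file needs. It runs without Snowflake and uses Python's standard `csv` module with `delimiter="|"`. The sample line is a hypothetical reconstruction from the first row of the table output above, not a line copied from `DailyMarket.txt`. The `parse_daily_market` helper is for illustration only.

```python
import csv
import io
from datetime import date


def parse_daily_market(text):
    """Parse pipe-delimited DailyMarket rows into typed tuples.

    Column order follows the schema defined above:
    DM_DATE | DM_S_SYMB | DM_CLOSE | DM_HIGH | DM_LOW | DM_VOL
    """
    rows = []
    for fields in csv.reader(io.StringIO(text), delimiter="|"):
        if not fields:  # skip blank lines
            continue
        dm_date, symbol, close, high, low, vol = fields
        rows.append(
            (date.fromisoformat(dm_date), symbol, float(close), float(high), float(low), float(vol))
        )
    return rows


# Hypothetical line reconstructed from the first row of the table output above.
sample = "2016-12-18|AAAAAAAAAAAABZF|961.07|1394.14|847.64|709969048\n"
print(parse_daily_market(sample))
```

The schema declares `DM_VOL` as `FloatType()`, which is why the table output shows volumes such as `709969048.0`.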

The `DIGen.jar` utility generates a series of "finwire" files.

These files represent market history over time.

They are fixed-width, multi-format files.
For instance, the following sample has one of each type of record: `FIN`, `SEC`, and `CMP`:

In [52]:
!cat devrel/multi-record.txt

20151230-152248FIN201542015100120151230    4880880089.63    2473473307.30        4.82        4.42        0.51     200321223.47  139284472514.02    9402305760.19    512872010    5597926720000001595
20151230-152511SECAAAAAAAAAAAAKVDPREF_AACTVDJBJXyQHLBvn EEOGAOvUNgL XwrOxQUBMrgPv                                AMEX  982113436    1903022619730704        1.200000000254
20151230-163207CMPWWfcsOHprIDIUsPfRLrcLPlxaQ                                  0000004432ACTVMCA   1873092521088 Vessey Crescent                                                                                                                                           M5D 1Z1     Winnipeg                 AL                  United States of AmericaMoreno                                        rlRIDCNz dVGrEzomCXIvZVZzFzxCzbGYIEbAXJMJlsYUQEV
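Every record starts with the same 18-character header: a 15-character timestamp followed by a 3-character record type. This minimal local sketch extracts those two fields with plain Python slicing, using line prefixes copied from the sample above. The `common_fields` helper is hypothetical. SQL `SUBSTRING` positions are 1-based, while Python slices are 0-based and end-exclusive, so the offsets shift by one.

```python
from datetime import datetime

# Line prefixes copied from the three sample records above (trailing fields omitted).
SAMPLE_PREFIXES = [
    "20151230-152248FIN2015",
    "20151230-152511SECAAAA",
    "20151230-163207CMPWWfc",
]


def common_fields(line):
    """Return (pts, rec_type) from the shared fixed-width header of a FINWIRE line."""
    # In SQL terms: PTS is positions 1-15 and the record type is positions 16-18.
    # As 0-based, end-exclusive Python slices that becomes [0:15] and [15:18].
    pts = datetime.strptime(line[0:15], "%Y%m%d-%H%M%S")
    rec_type = line[15:18]
    return pts, rec_type


for prefix in SAMPLE_PREFIXES:
    print(common_fields(prefix))
```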

We'll start by uploading all the files:

In [19]:
# File paths
stage_path = "@tpcdi/Batch1/FINWIRE"

# glob the files
pathlist = (
    Path(source_path)
    .glob("FINWIRE??????")
)

for file in pathlist:
    # put the file(s) in the stage
    put_result = (
        session 
        .file
        .put(
            str(file), 
            stage_path, 
            parallel=4, 
            auto_compress=True
        )
    )
    for result in put_result:
        print(f"File {result.source}: {result.status}")

File FINWIRE2001Q3: SKIPPED
File FINWIRE2001Q4: SKIPPED
File FINWIRE1997Q4: SKIPPED
File FINWIRE1970Q3: SKIPPED
File FINWIRE1997Q3: SKIPPED
File FINWIRE1970Q4: SKIPPED
File FINWIRE1999Q2: SKIPPED
File FINWIRE1991Q1: SKIPPED
File FINWIRE2005Q2: SKIPPED
File FINWIRE1976Q1: SKIPPED
File FINWIRE1993Q2: SKIPPED
File FINWIRE1974Q2: SKIPPED
File FINWIRE2007Q1: SKIPPED
File FINWIRE1997Q2: SKIPPED
File FINWIRE1978Q1: SKIPPED
File FINWIRE1970Q2: SKIPPED
File FINWIRE2003Q1: SKIPPED
File FINWIRE1995Q1: SKIPPED
File FINWIRE2009Q1: SKIPPED
File FINWIRE2001Q2: SKIPPED
File FINWIRE1972Q1: SKIPPED
File FINWIRE1974Q3: SKIPPED
File FINWIRE1993Q4: SKIPPED
File FINWIRE1974Q4: SKIPPED
File FINWIRE1993Q3: SKIPPED
File FINWIRE1999Q4: SKIPPED
File FINWIRE2005Q3: SKIPPED
File FINWIRE2005Q4: SKIPPED
File FINWIRE1999Q3: SKIPPED
File FINWIRE1987Q2: SKIPPED
File FINWIRE1968Q1: SKIPPED
File FINWIRE2013Q1: SKIPPED
File FINWIRE1985Q1: SKIPPED
File FINWIRE2011Q2: SKIPPED
File FINWIRE1983Q4: SKIPPED
File FINWIRE1983Q3: 
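The `FINWIRE??????` pattern relies on `?` matching exactly one character. Only names made of `FINWIRE` plus six characters (a year and a quarter, such as `FINWIRE1997Q4` in the log above) are uploaded. `Path.glob` uses the same shell-style wildcards as the standard `fnmatch` module, so the filter can be checked locally. The candidate names below are illustrative. The `_audit.csv` name in particular is a hypothetical example of a longer file that the pattern excludes.

```python
from fnmatch import fnmatch

# "?" matches exactly one character, so the pattern accepts "FINWIRE" plus six characters.
PATTERN = "FINWIRE??????"

candidates = [
    "FINWIRE1997Q4",            # year + quarter, as in the upload log: matches
    "FINWIRE1997Q4_audit.csv",  # hypothetical longer name: no match
    "FINWIRE97Q4",              # too short: no match
    "DailyMarket.txt",          # different prefix: no match
]

matches = [name for name in candidates if fnmatch(name, PATTERN)]
print(matches)  # ['FINWIRE1997Q4']
```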

The CMP, SEC, and FIN records all share two fields: a timestamp (`PTS`) and the record type (`REC_TYPE`). We therefore create a generic DataFrame that contains this shared logic and save it as a Snowflake temporary table called FINWIRE:

In [20]:
# These are fixed-width fields, so read the entire line in as "line"
schema = StructType([
        StructField("line", StringType(), False),
])

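# Generic DataFrame for all record types, saved as a temporary table.
# '|' should not appear in the fixed-width FINWIRE lines, so using it as the
# delimiter keeps each whole line in the single "line" column.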
df = (
    session
    .read
    .schema(schema)
    .option('field_delimiter', '|')
    .csv(stage_path)
    .with_column(
        'pts', 
        to_timestamp(
            substring(col("line"), lit(1), lit(15)),
            lit("yyyymmdd-hhmiss")
        )
    )
    .with_column(
        'rec_type', 
        substring(col("line"), lit(16), lit(3))
    )
    .write
    .mode("overwrite")
    .save_as_table("finwire", table_type="temporary")
)

# let's see the table
df = (
    session 
    .table('finwire') 
    .show()
)

-----------------------------------------------------------------------------------------
|"LINE"                                              |"PTS"                |"REC_TYPE"  |
-----------------------------------------------------------------------------------------
|19670401-065923FIN196721967040119670401    9288...  |1967-04-01 06:59:23  |FIN         |
|19670401-161220FIN196721967040119670401    6180...  |1967-04-01 16:12:20  |FIN         |
|19670402-012108FIN196721967040119670402     818...  |1967-04-02 01:21:08  |FIN         |
|19670402-140519FIN196721967040119670402    3590...  |1967-04-02 14:05:19  |FIN         |
|19670403-051650FIN196721967040119670403    6457...  |1967-04-03 05:16:50  |FIN         |
|19670403-194201FIN196721967040119670403    6692...  |1967-04-03 19:42:01  |FIN         |
|19670404-011711FIN196721967040119670404    5352...  |1967-04-04 01:17:11  |FIN         |
|19670404-023010FIN196721967040119670404    7901...  |1967-04-04 02:30:10  |FIN         |
|19670404-

Now I can create the three separate tables from this temporary table using Snowpark's `with_column` and `substring`.

I'll only show the Security table as an example, but the other two are done the same way:

In [22]:
# SEC record types
table_name = 'sec'
df = (
    session
    .table('finwire')
    .where(col('rec_type') == 'SEC')
    .with_column(
        'symbol',
        substring(col("line"), lit(19), lit(15))
    )
    .with_column(
        'issue_type',
        substring(col("line"), lit(34), lit(6))
    )
    .with_column(
        'status',
        substring(col("line"), lit(40), lit(4))
    )
    .with_column(
        'name',
        substring(col("line"), lit(44), lit(70))
    )
    .with_column(
        'ex_id',
        substring(col("line"), lit(114), lit(6))
    )
    .with_column(
        'sh_out',
        substring(col("line"), lit(120), lit(13))
    )
    .with_column(
        'first_trade_date',
        substring(col("line"), lit(133), lit(8))
    )
    .with_column(
        'first_exchange_date',
        substring(col("line"), lit(141), lit(8))
    )
    .with_column(
        'dividend',
        substring(col("line"), lit(149), lit(12))
    )
    .with_column(
        'co_name_or_cik',
        substring(col("line"), lit(161), lit(60))
    )
    .drop(col("line"), col("rec_type"))
    .write
    .mode("overwrite")
    .save_as_table(table_name)
)

print(f"{table_name.upper()} table created.")

# let's see the table
df = (
    session 
    .table('sec') 
    .show()
)

SEC table created.
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"PTS"                |"SYMBOL"         |"ISSUE_TYPE"  |"STATUS"  |"NAME"                                              |"EX_ID"  |"SH_OUT"       |"FIRST_TRADE_DATE"  |"FIRST_EXCHANGE_DATE"  |"DIVIDEND"    |"CO_NAME_OR_CIK"                                    |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1968-06-12 02:43:03  |AAAAAAAAAAAAAJG  |COMMON        |ACTV      |rFALDSWBSGSnzzMwTwjF                           ...  |PCX      |792341095      |19230923            |19301212               |        0.
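Each `substring` call is just a (start, length) pair, so the SEC layout can also be expressed as data. This is a minimal sketch that assumes the offsets used in the cell above are correct. `SEC_LAYOUT`, `check_contiguous`, and `parse_fixed_width` are hypothetical helpers, and the record built at the end is synthetic.

```python
# (column, 1-based start, length), copied from the substring() calls in the SEC cell above.
SEC_LAYOUT = [
    ("symbol", 19, 15),
    ("issue_type", 34, 6),
    ("status", 40, 4),
    ("name", 44, 70),
    ("ex_id", 114, 6),
    ("sh_out", 120, 13),
    ("first_trade_date", 133, 8),
    ("first_exchange_date", 141, 8),
    ("dividend", 149, 12),
    ("co_name_or_cik", 161, 60),
]


def check_contiguous(layout):
    """Raise if any field does not start exactly where the previous one ends."""
    for (prev_name, prev_start, prev_len), (name, start, _) in zip(layout, layout[1:]):
        if prev_start + prev_len != start:
            raise ValueError(f"gap or overlap between {prev_name} and {name}")


def parse_fixed_width(line, layout):
    """Slice fields out of a fixed-width line, converting 1-based starts to 0-based slices."""
    return {
        name: line[start - 1:start - 1 + length].strip()
        for name, start, length in layout
    }


# Synthetic SEC record: the 18-character PTS + RecType header, then each field padded to width.
values = {
    "symbol": "AAAAAAAAAAAAKVD",
    "issue_type": "PREF_A",
    "status": "ACTV",
    "name": "Example Security Name",
    "ex_id": "AMEX",
    "sh_out": "982113436",
    "first_trade_date": "19030226",
    "first_exchange_date": "19730704",
    "dividend": "1.20",
    "co_name_or_cik": "0000000254",
}
line = "20151230-152511SEC" + "".join(values[name].ljust(length) for name, _, length in SEC_LAYOUT)

check_contiguous(SEC_LAYOUT)
print(parse_fixed_width(line, SEC_LAYOUT) == values)  # True
```

Keeping the layout as data makes an off-by-one gap easy to catch before it silently shifts every column after it. The same list could also drive the `with_column` calls in a loop.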

![ETL Diagram](images/tpc-di-logical-model.png)

# When we search on Google for "dbt dynamic tables":

![Google Search](images/dbt-dynamic-tables.png)

# Is it as simple as this?

![Conflict](images/refresh-conflict.png)

# Remember, there's more to dbt than just scheduling refresh jobs. There's a DAG to consider.

# Dynamic Tables need to be (re)created in the correct order. This can become very complex as the number of tables and dependencies increases.
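Choosing that order is a topological sort of the model DAG, which dbt derives from `ref()` and `source()` calls. The following sketch illustrates the idea with Python's standard `graphlib`. It is not dbt's implementation. The dependency graph is hypothetical: only `brokerage_trade`, `brokerage_daily_market`, and `fact_trade` are names taken from the dbt logs in this section, and the silver model names are placeholders.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical graph: each model maps to the models it reads from.
DEPENDENCIES = {
    "brokerage_trade": set(),
    "brokerage_daily_market": set(),
    "silver_trades": {"brokerage_trade"},               # placeholder name
    "silver_daily_market": {"brokerage_daily_market"},  # placeholder name
    "fact_trade": {"silver_trades", "silver_daily_market"},
}


def build_waves(graph):
    """Group models into waves; every model's upstream models are in earlier waves."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for number, wave in enumerate(build_waves(DEPENDENCIES), start=1):
    print(number, wave)
```

Models in the same wave have no dependencies on each other, so they could be built concurrently. Each wave has to wait for the previous one.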

In [2]:
!dbt docs generate
!dbt docs serve

16:40:18  Running with dbt=1.6.6
16:40:18  Registered adapter: snowflake=1.6.4
16:40:18  Unable to do partial parsing because of a version mismatch
16:40:19  Found 45 models, 17 sources, 0 exposures, 0 metrics, 489 macros, 0 groups, 0 semantic models
16:40:19
16:40:20  Concurrency: 20 threads (target='dev')
16:40:20
16:40:20  Building catalog
16:40:26  Catalog written to /Users/stewartbryson/Source/dbt-tpcdi/target/catalog.json
16:40:27  Running with dbt=1.6.6
Serving docs at 8080
To access from your browser, navigate to: http://localhost:8080



Press Ctrl+C to exit.
127.0.0.1 - - [27/Nov/2023 11:40:28] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [27/Nov/2023 11:40:28] "GET /manifest.json?cb=1701103228480 HTTP/1.1" 200 -
127.0.0.1 - - [27/Nov/2023 11:40:28] "GET /catalog.json?cb=1701103228480 HTTP/1.1" 200 -
^C
16:41:38  Encountered an error:

16:41:38  Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniconda/base/envs/tp

# Let's take a look at a standard dbt project, using the TPC-DI dataset.

# I'll just pull the `dbt_project.yml` file from the `standard-tables` branch, which materializes the models as regular tables:

In [27]:
!git restore --source standard-tables -- dbt_project.yml
!dbt build

23:19:20  Running with dbt=1.6.6
23:19:20  Registered adapter: snowflake=1.6.4
23:19:20  Found 45 models, 1 test, 17 sources, 0 exposures, 0 metrics, 489 macros, 0 groups, 0 semantic models
23:19:20
23:19:22  Concurrency: 20 threads (target='dev')
23:19:22
23:19:22  1 of 45 START sql table model dl_bronze.brokerage_cash_transaction ............. [RUN]
23:19:22  2 of 45 START sql table model dl_bronze.brokerage_daily_market ................. [RUN]
23:19:22  3 of 45 START sql table model dl_bronze.brokerage_holding_history .............. [RUN]
23:19:22  4 of 45 START sql table model dl_bronze.brokerage_trade ........................ [RUN]
23:19:22  5 of 45 START sql table model dl_bronze.brokerage_trade_history ................ [RUN]
23:19:22  6 of 45 START sql table model dl_bronze.brokerage_watch_history ................ [RUN]
23:19:22  7 of 45 START sql table model dl_bronze.crm_customer_mgmt ...................... [RUN]
23:1

# Now we'll restore our original `dbt_project.yml` file.

In [28]:
!git checkout dbt_project.yml

Updated 1 path from the index


# We can see all that's required to enable dynamic tables in our `dbt_project.yml` file:

```yaml
models:
  dbt_tpcdi:
    example:
      +materialized: view
    bronze:
      +schema: bronze
      +materialized: dynamic_table
      +snowflake_warehouse: tpcdi_xlarge
      +target_lag: '10 minutes'
    silver:
      +schema: silver
      +materialized: dynamic_table
      +snowflake_warehouse: tpcdi_xlarge
      +target_lag: '10 minutes'
    gold:
      +schema: gold
      +materialized: dynamic_table
      +snowflake_warehouse: tpcdi_xlarge
      +target_lag: '20 minutes'
    work:
      +schema: work
      +materialized: dynamic_table
      +snowflake_warehouse: tpcdi_xlarge
      +target_lag: downstream
```
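The `work` layer uses `target_lag: downstream` instead of a fixed interval. Here is a deliberately simplified model of that setting, based on our reading of Snowflake's documentation. A table marked `downstream` refreshes only when a dynamic table that depends on it refreshes, so its effective cadence follows its most demanding consumer. In this model, which is an assumption on our part, a `downstream` table that nothing reads from is never refreshed on a schedule. All model names below are hypothetical, and this is not Snowflake's scheduler.

```python
# Simplified model: a numeric target lag (minutes) is kept as-is; a "downstream"
# table inherits the shortest effective lag among the tables that read from it.
TARGET_LAG = {
    "wrk_example": "downstream",  # hypothetical work-layer model
    "silver_example": 10,         # hypothetical silver model
    "gold_example": 20,           # hypothetical gold model
    "wrk_unused": "downstream",   # hypothetical model nothing reads from
}

# Hypothetical model -> models that read from it.
CONSUMERS = {
    "wrk_example": ["silver_example", "gold_example"],
    "silver_example": ["gold_example"],
    "gold_example": [],
    "wrk_unused": [],
}


def effective_lag(model):
    """Return the effective lag in minutes, or None if nothing ever triggers a refresh."""
    lag = TARGET_LAG[model]
    if lag != "downstream":
        return lag
    consumer_lags = [effective_lag(consumer) for consumer in CONSUMERS[model]]
    consumer_lags = [value for value in consumer_lags if value is not None]
    return min(consumer_lags) if consumer_lags else None


for model in TARGET_LAG:
    print(model, effective_lag(model))
```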

In [29]:
!dbt build

23:23:01  Running with dbt=1.6.6
23:23:01  Registered adapter: snowflake=1.6.4
23:23:01  Unable to do partial parsing because a project config has changed
23:23:02  Found 45 models, 1 test, 17 sources, 0 exposures, 0 metrics, 489 macros, 0 groups, 0 semantic models
23:23:02
23:23:05  Concurrency: 20 threads (target='dev')
23:23:05
23:23:05  1 of 46 START sql dynamic_table model dl_bronze.brokerage_cash_transaction ..... [RUN]
23:23:05  2 of 46 START sql dynamic_table model dl_bronze.brokerage_daily_market ......... [RUN]
23:23:05  3 of 46 START sql dynamic_table model dl_bronze.brokerage_holding_history ...... [RUN]
23:23:05  4 of 46 START sql dynamic_table model dl_bronze.brokerage_trade ................ [RUN]
23:23:05  5 of 46 START sql dynamic_table model dl_bronze.brokerage_trade_history ........ [RUN]
23:23:05  6 of 46 START sql dynamic_table model dl_bronze.brokerage_watch_history ........ [RUN]
23:23:05  7 of 46 START s

Click this link to open results:

[Snowflake UI](https://app.snowflake.com/cxmdykz/hib36835/#/data/databases/TPCDI_DT)

# dbt also has Tests.

# We can run them when we create the Dynamic Table:

In [30]:
!dbt build --select fact_trade

23:28:07  Running with dbt=1.6.6
23:28:08  Registered adapter: snowflake=1.6.4
23:28:08  Found 45 models, 1 test, 17 sources, 0 exposures, 0 metrics, 489 macros, 0 groups, 0 semantic models
23:28:08
23:28:10  Concurrency: 20 threads (target='dev')
23:28:10
23:28:10  1 of 2 START sql dynamic_table model dl_gold.fact_trade ........................ [RUN]
23:28:12  1 of 2 OK created sql dynamic_table model dl_gold.fact_trade ................... [SUCCESS 1 in 2.19s]
23:28:12  2 of 2 START test fact_trade__unique_trade ..................................... [RUN]
23:28:12  2 of 2 PASS fact_trade__unique_trade ........................................... [PASS in 0.66s]
23:28:12
23:28:12  Finished running 1 dynamic_table model, 1 test in 0 hours 0 minutes and 4.60 seconds (4.60s).
23:28:12
23:28:12  Completed successfully
23:28:12
23:28:12  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
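A dbt data test is a query that selects failing rows, and the test passes when that query returns nothing. For a uniqueness check, the SQL is essentially a `GROUP BY` on the key with `HAVING COUNT(*) > 1`. Below is a local Python analogue of what a test such as `fact_trade__unique_trade` checks, assuming it asserts one row per trade. The `trade_id` column, the rows, and the `duplicate_keys` helper are hypothetical.

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return {value: count} for key values seen more than once (the 'failing rows')."""
    counts = Counter(row[key] for row in rows)
    return {value: count for value, count in counts.items() if count > 1}


# Hypothetical fact rows; "trade_id" is an illustrative column name.
rows = [
    {"trade_id": 1, "quantity": 100},
    {"trade_id": 2, "quantity": 250},
    {"trade_id": 2, "quantity": 250},
]

failures = duplicate_keys(rows, "trade_id")
print("PASS" if not failures else f"FAIL {failures}")  # FAIL {2: 2}
```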


# Or we can run them separately, for example as a periodically scheduled job:

In [31]:
!dbt test

23:29:18  Running with dbt=1.6.6
23:29:18  Registered adapter: snowflake=1.6.4
23:29:19  Found 45 models, 1 test, 17 sources, 0 exposures, 0 metrics, 489 macros, 0 groups, 0 semantic models
23:29:19
23:29:19  Concurrency: 20 threads (target='dev')
23:29:19
23:29:19  1 of 1 START test fact_trade__unique_trade ..................................... [RUN]
23:29:20  1 of 1 PASS fact_trade__unique_trade ........................................... [PASS in 0.57s]
23:29:20
23:29:20  Finished running 1 test in 0 hours 0 minutes and 1.29 seconds (1.29s).
23:29:20
23:29:20  Completed successfully
23:29:20
23:29:20  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1


# dbt Cloud will need to offer more than job scheduling, and it already does:

1. A cloud development environment for those who prefer it (this still needs to get better).
1. CI/CD workflows for promoting Dynamic Table changes into Production.
1. Perhaps there's promise in the Semantic Layer.

# Clean-up

In [1]:
!python tpcdi.py drop-schema --schema dl_gold
!python tpcdi.py drop-schema --schema dl_silver
!python tpcdi.py drop-schema --schema dl_bronze
!python tpcdi.py drop-schema --schema dl_work

Schema dl_gold dropped.
Schema dl_silver dropped.
Schema dl_bronze dropped.
Schema dl_work dropped.
