# Data analytics pipeline for Financial Services data leveraging dbt and Snowflake

Build scalable data pipeline leveraging dbt and Snowflake to analyze trading performance of a company having trading desks to perform transactions for buying and selling financial intruments in different regions.

The dbt pipeline transforms Finance & Economics data product available in Snowflake Marketplace, establishes data tests, generates documentation, and orchestrates deployment to production.

The dbt transformations include:

- Stock trading history
- Foreign currency exchange rates
- Trading books with active trades of financial instruments
- Market Volume & Profit and Loss calculations


1. Create a new project in dbt Cloud. Navigate to __`Account settings`__ (by clicking on account name in the left side menu), and click __`+ New Project`__.

2. Enter a project name and click Continue.

3. For the __`warehouse`__, click __`Snowflake`__ then Next to set up dbt-Snowflake connection.

4. Enter __`Settings`__ for Snowflake: __`account identifier`__, __`role`__, __`database`__ (e.g., analytics), __`warehouse`__(e.g., transforming) and __`development credentials`__.

5. Link the project to a __`GitHub repository`__.

6. Select __`Develop`__ -> __`Cloud IDE`__ - this spins up the project for the first time as it establishes the git connection, clones the repo, and tests the connection to the warehouse (Snowflake).

7. Click __`Initialize dbt project`__ - this builds out the folder structure with example dbt models. Execute `dbt run` or `dbt test`.

8. Click __`Commit and sync`__ to commit the folder structure to GitHub repo `main` branch. 

9. Click __`+ Create New Branch`__ to check out a new git branch `dev` for development (separated from the main production branch).

From `Snowflake Marketplace` search for and get `Finance & Economics` data product that includes macroeconomic indicators, trading volumes and stock prices of US equities and ETFs executed on the Nasdaq, and banking sector data to give users a view of the current state of the economy & financial industry. A single, unified schema joins together datasets from various sources.

Sample queries:

```sql
USE ROLE ACCOUNTADMIN;
USE DATABASE FINANCE_ECONOMICS;
USE SCHEMA CYBERSYN;

-- SELECT * FROM STOCK_PRICE_TIMESERIES; -- 78.0M rows

SELECT * FROM STOCK_PRICE_TIMESERIES LIMIT 5;

SELECT * FROM STOCK_PRICE_TIMESERIES WHERE TICKER = 'AAPL' AND DATE = '2025-05-13';

SELECT DISTINCT VARIABLE FROM STOCK_PRICE_TIMESERIES;

-- YTD performance of a select group of stocks
WITH ytd_performance AS (
  SELECT
    ticker,
    MIN(date) OVER (PARTITION BY ticker) AS start_of_year_date,
    FIRST_VALUE(value) OVER (PARTITION BY ticker ORDER BY date ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS start_of_year_price,
    MAX(date) OVER (PARTITION BY ticker) AS latest_date,
    LAST_VALUE(value) OVER (PARTITION BY ticker ORDER BY date ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS latest_price
  FROM cybersyn.stock_price_timeseries
  WHERE
    ticker IN ('AAPL', 'MSFT', 'AMZN', 'GOOGL', 'META', 'TSLA', 'NVDA')
    AND date >= DATE_TRUNC('YEAR', CURRENT_DATE()) -- Truncates the current date to the start of the year
    AND variable_name = 'Post-Market Close'
)
SELECT
  ticker,
  start_of_year_date,
  start_of_year_price,
  latest_date,
  latest_price,
  (latest_price - start_of_year_price) / start_of_year_price * 100 AS percentage_change_ytd
FROM
  ytd_performance
GROUP BY
  ticker, start_of_year_date, start_of_year_price, latest_date, latest_price
ORDER BY percentage_change_ytd DESC;
;

select * from fx_rates_timeseries limit 5;
```


Edit generated `dbt_project.yml` configuration file. Name the project. Remove materialization setting for models in the example folder. 

Delete the example folder from the models folder.

Install [`dbt_utils`](https://hub.getdbt.com/dbt-labs/dbt_utils/latest/) package hosted on dbt Packages Hub to use some dbt_utils macros in writing more complex SQL in dbt models.

`dbt models` are .sql files that live in the models folder. `dbt macros` are snippets of reusable code written in Jinja, a templating language integrated into dbt. `dbt package` is a dbt project that can be installed onto own dbt project to gain access to the code and use it as own. 

To install dbt_utils package, create a `packages.yml` file at the same level as the dbt_project.yml file. 

```yml
packages:
  - package: dbt-labs/dbt_utils
    version: 1.3.0
```

Run `dbt deps` to install the package.


Declare`dbt sources`. 

`Sources` (src) refer to the raw table data that have been built in the Snowflake data platform through a loading process or available directly in Snowflake through Snowflake Marketplace without any ETL.

`Staging` folder: The folder where all source configurations and staging models and source configurations are stored. Further subfolders can be used to separate data by data source. 

In the `models` folder create a `staging` folder and a subfolder finance to hold the dbt sources.

Create a `_finance_sources.yml` file in the staging/finance folder.

```yml
version: 2

sources:
  - name: finance_economics
    description: Finance and economics data including stock prices
    database: finance_economics
    schema: cybersyn
    tables:
      - name: stock_price_timeseries
        description: Stock price timeseries data
```

Commit changes to `dev` branch.

Create `staging models`.

`Staging` (stg) refers to models that are built directly on top of sources. They have a one-to-one relationship with sources tables and are used for very light transformations (e.g. renaming columns). These models are used to clean and standardize the data before transforming data downstream. They are typically materialized as views.

Create a `stg_finance__stock_history.sql` in the models/staging/finance folder

```sql
with source as (

    select * from {{source('finance_economics','stock_price_timeseries')}}
), 

renamed as (

    select 

        TICKER as company_symbol,
        ASSET_CLASS as asset_class,
        PRIMARY_EXCHANGE_CODE as stock_exchange_code,
        PRIMARY_EXCHANGE_NAME as stock_exchange_name,
        VARIABLE as indicator,
        VARIABLE_NAME as indicator_name, 
        DATE as stock_date,
        VALUE as stock_value

    from source 

) 

select * from renamed
```

Click `Preview` and `Compile` buttons. View the `Lineage`.

Commit changes to `dev` branch.

To create the view in Snowflake, run the model in the staging folder with: 

```bash 
dbt run --select staging.*
```

In dbt, the default materialization for a model is a view. This means, when `dbt run` or `dbt build` are executed, all of the models will be built as a view in the Snowflake data platform.

Verify dbt model materialized as view in Snowflake:

```sql
-- verify dbt model materialized as view
select * from analytics.information_schema.views
where table_name = 'STG_FINANCE__STOCK_HISTORY';

select * 
  from analytics.dbt_lcarabet.stg_finance__stock_history
 where company_symbol ='AAPL' 
   and stock_date ='2025-05-13';
```

In the dataset, different indicators like close, open, high, low price and trading volume are represented by different rows. Transposing data into columns provides a cleaner data representation for downstream analytics.

| INDICATOR |
| --------- |
all-day_high
nasdaq_volume
post-market_close
pre-market_open
all-day_low

Create `intermediate models`.

In dbt, `intermediate` (int) models refer to any models that exist between final fact and dimension tables. They are built on staging models rather than directly on sources to leverage the data cleaning done in the staging layer.

`Marts` folder: The folder where all intermediate, fact, and dimension models are stored. Further subfolders can be used to separate data by business function.

Create a `marts` folder and `core/intermediate` subfolder in `models`.

Create a `int_finance__stock_history.sql` file in the models/marts/core/intermediate folder.

The model uses the `dbt_utils.pivot() SQL generator macro` to transpose the dataset from rows to columns, and `dbt_utils.get_column_values() introspective macro` to list the column values dynamically.

```sql
with stock_history as (

    select * from {{ ref('stg_finance__stock_history') }}
        where indicator in ('post-market_close', 'pre-market_open','all-day_high','all-day_low', 'nasdaq_volume') 

),

pivoted as (

    select 
        company_symbol, 
        asset_class, 
        stock_exchange_name, 
        stock_date,         
        {{ dbt_utils.pivot(
      column = 'indicator',
      values = dbt_utils.get_column_values(ref('stg_finance__stock_history'), 'indicator'),
      then_value = 'stock_value'
            ) }}
    
    from stock_history
    group by company_symbol, asset_class, stock_exchange_name, stock_date
)

select * from pivoted

```
Click Compile to view compiled code, results and model lineage.

The code makes use of the `ref` Jinja function which tells dbt how a model relates to another model. It is useful as:

- allows automatic generation of a lineage/directed acyclic graph (DAG) and promotion of code through different environments without having to update the code to change a (hardcoded) database object.

- allows to run based on dependencies, a model plus parent models in the DAG.

```bash
dbt run --select +int_finance__stock_history
```
Run summary:

1. stg_finance__stock_history

```bash
23:43:01 1 of 2 START sql view model dbt_lcarabet.stg_finance__stock_history ............ [RUN]
23:43:02 1 of 2 OK created sql view model dbt_lcarabet.stg_finance__stock_history ....... [SUCCESS 1 in 0.75s]
```

2. int_finance__stock_history

```bash
23:43:02 2 of 2 START sql table model dbt_lcarabet.int_finance__stock_history ........... [RUN]
23:43:32 2 of 2 OK created sql table model dbt_lcarabet.int_finance__stock_history ...... [`SUCCESS 1` in 30.90s]
```

Verify int dbt model materialized as table

```sql
select * from analytics.information_schema.tables
where table_name = 'INT_FINANCE__STOCK_HISTORY';

select * 
  from analytics.dbt_lcarabet.int_finance__stock_history
 where company_symbol ='AAPL' 
   and stock_date ='2025-05-13';
```

Commit changes to `dev` branch.

Update `_finance_sources.yml` file

```yml
version: 2

sources:
  - name: finance_economics
    description: Finance and economics data including stock prices and foreign exchage rates timeseries
    database: finance_economics
    schema: cybersyn
    tables:
      - name: stock_price_timeseries
        description: Stock price timeseries data - daily prices (open/close, high/low) & trading volumes of US securities executed on the Nasdaq
      - name: fx_rates_timeseries
        description: Foreign exchange rates timeseries for currency pairs (base and quote currencies)        
```

Create `stg_finance__fx_rates.sql` staging model

```sql
with source as (

    select * from {{source('finance_economics','fx_rates_timeseries')}}
),
 
renamed as (
 
select 
 
    BASE_CURRENCY_ID as base_currency_id,
    BASE_CURRENCY_NAME as base_currency_name,
    QUOTE_CURRENCY_ID as quote_currency_id,
    QUOTE_CURRENCY_NAME as quote_currency_name,
    BASE_CURRENCY_ID || '/' || QUOTE_CURRENCY_ID as indicator,
    VARIABLE_NAME AS indicator_name,
    DATE as exchange_date,
    VALUE as exchange_value

 
from source 
 
) 
 
select * from renamed
```

Create `int_finance__fx_rates.sql` intermediate model. Filter staged data by exchange date to match the start date of the stock prices history series. Stock prices timeseries data range from '2018-05-01' to '2025-05-15' while foreign exchange rates timeseries data range from '1953-08-10' to '2025-05-16'. 

```sql
select * from {{ ref('stg_finance__fx_rates') }} 
 where exchange_date >= '2018-05-01'
```

Create `int_finance__stock_history_major_currency.sql` intermediate model to join trading history and foreign exchange rates timeseries data.

```sql
with
stock_history as (
    select * from {{ ref('int_finance__stock_history')}}
),
 
fx_rates as (
    select * from {{ ref('int_finance__fx_rates') }}
),
 
fx_rates_gdp as (
    select * from fx_rates
        where indicator = 'USD/GBP'   
),
 
fx_rates_eur as (
    select * from fx_rates
        where indicator = 'USD/EUR' 
),
 
joined as (
    select 
        stock_history.*,
        fx_rates_gdp.exchange_value * stock_history."pre-market_open" as gbp_open,       
        fx_rates_gdp.exchange_value * stock_history."all-day_high" as gbp_high,     
        fx_rates_gdp.exchange_value * stock_history."all-day_low" as gbp_low,   
        fx_rates_gdp.exchange_value * stock_history."post-market_close" as gbp_close,     
        fx_rates_eur.exchange_value * stock_history."pre-market_open" as eur_open,       
        fx_rates_eur.exchange_value * stock_history."all-day_high" as eur_high,     
        fx_rates_eur.exchange_value *stock_history."all-day_low" as eur_low,
        fx_rates_eur.exchange_value * stock_history."post-market_close" as eur_close    
    from stock_history
    left join fx_rates_gdp on stock_history.stock_date = fx_rates_gdp.exchange_date
    left join fx_rates_eur on stock_history.stock_date = fx_rates_eur.exchange_date
)

select * from joined
```

Execute dbt run

```bash
dbt run --select +int_finance__stock_history_major_currency
```

Commit changes to `dev` branch.

In a dbt project, `seeds` are `CSV files` in the seeds directory, that dbt can load into the data warehouse using the `dbt seed` command.

Because these CSV files are located in the dbt linked repository, they are version controlled and code reviewable. 

Seeds are best suited to static data which changes infrequently.

Seeds can be referenced in downstream models the same way as referencing models — by using the ref function.

Create two CSV files for load into Snowflake by running dbt seed command.

`seeds/trading_book_gbp.csv`

```csv
Book,Date,Trader,Instrument,Action,Cost,Currency,Volume,Cost_Per_Share,Stock_exchange_name
B-GB1,2024-03-01,Alex V.,AAPL,BUY,-35800,GBP,200,179.0, NASDAQ CAPITAL MARKET
B-GB1,2024-03-01,Alex V.,AAPL,BUY,-656010,GBP,3700,177.3,NASDAQ CAPITAL MARKET
B-GB1,2024-01-26,Alex V.,AAPL,SELL,97400,GBP,-500,194.8,NASDAQ CAPITAL MARKET
B-GB1,2024-01-22,Alex V.,AAPL,BUY,-187866,GBP,980,191.7,NASDAQ CAPITAL MARKET
B-GB1,2024-01-22,Eugene C.,AAPL,SELL,97655150,GBP,-50,195.3,NASDAQ CAPITAL MARKET
B-GB1,2023-08-31,Eugene C.,AAPL,BUY,-18760,GBP,100,187.6,NASDAQ CAPITAL MARKET
B-GB1,2023-08-31,Eugene C.,AAPL,BUY,-9355,GBP,50,187.1,NASDAQ CAPITAL MARKET
```
`seeds/trading_book_eur.csv`

```csv
Book,Date,Trader,Instrument,Action,Cost,Currency,Volume,Cost_Per_Share,Stock_exchange_name
B-EU1,2024-03-01,Mark O.,AAPL,BUY,-35800,EUR,200,179.0, NASDAQ CAPITAL MARKET
B-EU1,2024-03-01,Mark O.,AAPL,BUY,-656010,EUR,3700,177.3,NASDAQ CAPITAL MARKET
B-EU1,2024-01-22,Mark O.,AAPL,BUY,-187866,EUR,980,191.7,NASDAQ CAPITAL MARKET
B-EU1,2023-08-31,Mark O.,AAPL,BUY,-18760,EUR,100,187.6,NASDAQ CAPITAL MARKET
```
Verify loading into Snowflake

```sql
select * from analytics.dbt_lcarabet.trading_book_gbp;
```

Model seeded data by unioning the seeds together using `dbt_utils.union_relations() macro` which aligns attributes by name and type, and combines the datasets with UNION ALL. Less code, and dynamic - if new columns were to be added to ref models, dbt takes care of it.

Create `int_finance__trading_books_unioned.sql` intermediate model.

```sql
with 
unioned as (
    {{ dbt_utils.union_relations(
        relations=[ref('trading_book_gbp'), ref('trading_book_eur')]
    ) }}
 
),
 
renamed as (
    select      
        Book,
        Date as book_date,
        Trader,
        Instrument,
        Action as book_action,
        Cost,
        Currency,
        Volume,
        Cost_Per_Share,
        Stock_exchange_name
    from unioned 
)
 
select * from renamed

```

Create the intermediate model by running

```bash
dbt run --select int_finance__trading_books_unioned
```
```bash
02:54:13 1 of 1 START sql table model dbt_lcarabet.int_finance__trading_books_unioned ... [RUN]
02:54:14 1 of 1 OK created sql table model dbt_lcarabet.int_finance__trading_books_unioned  [SUCCESS 1 in 1.87s]
```


Enhance `trading activities` beyond `BUY` and `SELL` to include `HOLD`.

First, create `int_finance__trading_daily_frequency_instrument_position.sql` intermediate model to calculate distinct total_shares per day per trader. Makes use of `dbt_utils.group_by() macro` that lists all columns to group by dynamically.

```sql
with 
stock_history as (
    select * from {{ ref('int_finance__stock_history_major_currency') }} 
), 

unioned_book as (
    select * from {{ ref('int_finance__trading_books_unioned') }}
),

cst_market_days as (
    select distinct stock_date
        from stock_history
        where stock_history.stock_date >= (select min(book_date) as min_dt from  unioned_book)
),

joined as (
    select 
        cst_market_days.stock_date,
        unioned_book.trader,
        unioned_book.stock_exchange_name,
        unioned_book.instrument,
        unioned_book.book,
        unioned_book.currency,
        sum(unioned_book.volume) as total_shares
    from cst_market_days
    inner join unioned_book on unioned_book.book_date = cst_market_days.stock_date
    where unioned_book.book_date <= cst_market_days.stock_date
    {{ dbt_utils.group_by(6) }}
)

select * from joined

```

Full trading activities, create `int_finance__trading_daily_frequency_instrument_position_with_trades.sql` intermediate model.

```sql
with unioned_book as (    
    
    select * from {{ ref('int_finance__trading_books_unioned') }}

),
 
daily_position as (
    select * from {{ ref('int_finance__trading_daily_frequency_instrument_position') }}
),
 
unioned as (
    select 
        book,
        book_date,
        trader,
        instrument,
        book_action,
        cost, 
        currency,
        volume, 
        cost_per_share, 
        stock_exchange_name,
        sum(unioned_book.volume) 
            over(
                partition by 
                    instrument, 
                    stock_exchange_name, 
                    trader 
                order by 
                    unioned_book.book_date rows unbounded preceding) 
                        as total_shares
    from unioned_book  
 
    union all 
 
    select  
        book,
        stock_date as book_date, 
        trader, 
        instrument, 
        'HOLD' as book_action,
        0 as cost,
        currency, 
        0 as volume, 
        0 as cost_per_share,
        stock_exchange_name,
        total_shares
    from daily_position
    where (book_date,trader,instrument,book,stock_exchange_name) 
        not in 
        (select book_date,trader,instrument,book,stock_exchange_name
            from unioned_book
        )
)
 
select * from unioned
```

Build int_finance__trading_books_unioned and it's child models. Execute

```bash
dbt run --select int_finance__trading_books_unioned+
```

Verify in Snowflake:

```sql
select * 
from analytics.dbt_lcarabet.int_finance__trading_daily_frequency_instrument_position_with_trades
where trader = 'Alex V.'
order by book_date
```


Create `core/facts/fct_finance__trading_mv_pnl.sql` `facts` model to calculate Market Value and Profit and Loss (PnL) change over time. Tracking PnL over time allows investors and businesses to understand their financial performance, identify trends and make more informed decisions. 

```sql
{{ 
config(
      tags = 'core'
      ) 
}}

with
daily_positions as (
    select * from {{ ref('int_finance__trading_daily_frequency_instrument_position_with_trades' )}}

),

stock_history as (
    select * from {{ ref('int_finance__stock_history_major_currency') }}

),

joined as (
    select 
        daily_positions.instrument, 
        daily_positions.stock_exchange_name, 
        daily_positions.book_date, 
        daily_positions.trader, 
        daily_positions.volume,
        daily_positions.cost, 
        daily_positions.total_shares,
        daily_positions.cost_per_share,
        daily_positions.currency,
        sum(cost) over(
                partition by 
                    daily_positions.instrument, 
                    daily_positions.stock_exchange_name, 
                    trader 
                order by
                    daily_positions.book_date rows unbounded preceding 
                    )
                as cash_cumulative,
       case when daily_positions.currency = 'GBP' then gbp_close
            when daily_positions.currency = 'EUR' then eur_close
            else 'post-market_close'
       end AS close_price_matching_currency, 
       daily_positions.total_shares  * close_price_matching_currency as market_value, 
       daily_positions.total_shares  * close_price_matching_currency + cash_cumulative as PnL
   from daily_positions
   inner join stock_history 
      on daily_positions.instrument = stock_history.company_symbol 
     and stock_history.stock_date = daily_positions.book_date 
     and daily_positions.stock_exchange_name = stock_history.stock_exchange_name
)

select * from joined
```

Run model

```bash
dbt run --select fct_finance__trading_mv_pnl
```

The model was materialized as a table. As the datasets are getting larger and larger, the runtimes are getting longer. To save on the query times when the table is queried, the size of the warehouse could be increased or, more efficiently, materialize the model as an incremental table.


With `incremental materialization`, the model will not be rebuilt each time, but rather only the latest rows transformed and added to the existing table. For incremental materialization, the model configuration is updated and the `is_incremental macro` is included and comes into action for incremental runs (and is ignored during initial run and full_refresh option). Incremental models provide perfomance improvement in production applications.

```sql
{{ 
config(
      materialized='incremental',
      unique_key= 'pk_key',
      tags = 'core'
      ) 
}}


with
daily_positions as (
    select * from {{ ref('int_finance__trading_daily_frequency_instrument_position_with_trades' )}}

),

stock_history as (
    select * from {{ ref('int_finance__stock_history_major_currency') }}

),

joined as (
    select 
        daily_positions.instrument, 
        daily_positions.stock_exchange_name, 
        daily_positions.book_date, 
        daily_positions.trader, 
        daily_positions.volume,
        daily_positions.cost, 
        daily_positions.total_shares,
        daily_positions.cost_per_share,
        daily_positions.currency,
        sum(cost) over(
                partition by 
                    daily_positions.instrument, 
                    daily_positions.stock_exchange_name, 
                    trader 
                order by
                    daily_positions.book_date rows unbounded preceding 
                    )
                as cash_cumulative,
       case when daily_positions.currency = 'GBP' then gbp_close
            when daily_positions.currency = 'EUR' then eur_close
            else 'post-market_close'
       end AS close_price_matching_currency, 
       daily_positions.total_shares  * close_price_matching_currency as market_value, 
       daily_positions.total_shares  * close_price_matching_currency + cash_cumulative as PnL
   from daily_positions
   inner join stock_history 
      on daily_positions.instrument = stock_history.company_symbol 
     and stock_history.stock_date = daily_positions.book_date 
     and daily_positions.stock_exchange_name = stock_history.stock_exchange_name
),

joined_primary_key as (
 
    select 
 
        {{ dbt_utils.surrogate_key([
                'trader', 
                'instrument', 
                'book_date', 
                'stock_exchange_name',
                'PnL', 
            ]) }} as pk_key,
                *
 
    from joined 
)

select * from joined_primary_key

{% if is_incremental() %}
  -- this filter will only be applied on an incremental run
  -- this is the database representation of the current model
   where book_date > (select max(book_date) from {{ this }})
 
{% endif %}
```

Run model twice with

```bash
dbt run --select fct_finance__trading_mv_pnl_incremental
```
For the first run, the where clause is not built into the sql statement. The first run of an incremental model builds the table to which new rows will be added to in subsequent runs.

```sql
joined_primary_key as (
 
    select 
 
        md5(cast(coalesce(cast(trader as TEXT), '_dbt_utils_surrogate_key_null_') || '-' || coalesce(cast(instrument as TEXT), '_dbt_utils_surrogate_key_null_') || '-' || coalesce(cast(book_date as TEXT), '_dbt_utils_surrogate_key_null_') || '-' || coalesce(cast(stock_exchange_name as TEXT), '_dbt_utils_surrogate_key_null_') || '-' || coalesce(cast(PnL as TEXT), '_dbt_utils_surrogate_key_null_') as TEXT)) as pk_key,
                *
 
    from joined 
)

select * from joined_primary_key
```

```bash
00:37:51 SQL status: SUCCESS 1 in 0.163 seconds
00:37:51 1 of 1 OK created sql incremental model dbt_lcarabet.fct_finance__trading_mv_pnl_incremental  [SUCCESS 1 in 2.23s]
00:37:51 Finished running node model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental
```

In subsequent runs, dbt is including the where clause to create a temporary table merged then into the existing table. 

```sql
joined_primary_key as (
 
    select 
 
        md5(cast(coalesce(cast(trader as TEXT), '_dbt_utils_surrogate_key_null_') || '-' || coalesce(cast(instrument as TEXT), '_dbt_utils_surrogate_key_null_') || '-' || coalesce(cast(book_date as TEXT), '_dbt_utils_surrogate_key_null_') || '-' || coalesce(cast(stock_exchange_name as TEXT), '_dbt_utils_surrogate_key_null_') || '-' || coalesce(cast(PnL as TEXT), '_dbt_utils_surrogate_key_null_') as TEXT)) as pk_key,
                *
 
    from joined 
)

select * from joined_primary_key


  -- this filter will only be applied on an incremental run
  -- this is the database representation of the current model
   where book_date > (select max(book_date) from analytics.dbt_lcarabet.fct_finance__trading_mv_pnl_incremental)
```
```bash
00:41:22 Using snowflake connection "model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental"
00:41:22 On model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental: /* {"app": "dbt", "dbt_version": "2025.5.13+c9cbe0f", "profile_name": "user", "target_name": "default", "node_id": "model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental"} */
merge into analytics.dbt_lcarabet.fct_finance__trading_mv_pnl_incremental as DBT_INTERNAL_DEST
        using analytics.dbt_lcarabet.fct_finance__trading_mv_pnl_incremental__dbt_tmp as DBT_INTERNAL_SOURCE
        on ((DBT_INTERNAL_SOURCE.pk_key = DBT_INTERNAL_DEST.pk_key))

    
    when matched then update set
        "PK_KEY" = DBT_INTERNAL_SOURCE."PK_KEY","INSTRUMENT" = DBT_INTERNAL_SOURCE."INSTRUMENT","STOCK_EXCHANGE_NAME" = DBT_INTERNAL_SOURCE."STOCK_EXCHANGE_NAME","BOOK_DATE" = DBT_INTERNAL_SOURCE."BOOK_DATE","TRADER" = DBT_INTERNAL_SOURCE."TRADER","VOLUME" = DBT_INTERNAL_SOURCE."VOLUME","COST" = DBT_INTERNAL_SOURCE."COST","TOTAL_SHARES" = DBT_INTERNAL_SOURCE."TOTAL_SHARES","COST_PER_SHARE" = DBT_INTERNAL_SOURCE."COST_PER_SHARE","CURRENCY" = DBT_INTERNAL_SOURCE."CURRENCY","CASH_CUMULATIVE" = DBT_INTERNAL_SOURCE."CASH_CUMULATIVE","CLOSE_PRICE_MATCHING_CURRENCY" = DBT_INTERNAL_SOURCE."CLOSE_PRICE_MATCHING_CURRENCY","MARKET_VALUE" = DBT_INTERNAL_SOURCE."MARKET_VALUE","PNL" = DBT_INTERNAL_SOURCE."PNL"
    

    when not matched then insert
        ("PK_KEY", "INSTRUMENT", "STOCK_EXCHANGE_NAME", "BOOK_DATE", "TRADER", "VOLUME", "COST", "TOTAL_SHARES", "COST_PER_SHARE", "CURRENCY", "CASH_CUMULATIVE", "CLOSE_PRICE_MATCHING_CURRENCY", "MARKET_VALUE", "PNL")
    values
        ("PK_KEY", "INSTRUMENT", "STOCK_EXCHANGE_NAME", "BOOK_DATE", "TRADER", "VOLUME", "COST", "TOTAL_SHARES", "COST_PER_SHARE", "CURRENCY", "CASH_CUMULATIVE", "CLOSE_PRICE_MATCHING_CURRENCY", "MARKET_VALUE", "PNL")

;
00:41:22 SQL status: SUCCESS 0 in 0.731 seconds
00:41:22 Using snowflake connection "model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental"
00:41:22 On model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental: /* {"app": "dbt", "dbt_version": "2025.5.13+c9cbe0f", "profile_name": "user", "target_name": "default", "node_id": "model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental"} */
COMMIT
00:41:23 SQL status: SUCCESS 1 in 0.345 seconds
00:41:23 Applying DROP to: analytics.dbt_lcarabet.fct_finance__trading_mv_pnl_incremental__dbt_tmp
00:41:23 Using snowflake connection "model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental"
00:41:23 On model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental: /* {"app": "dbt", "dbt_version": "2025.5.13+c9cbe0f", "profile_name": "user", "target_name": "default", "node_id": "model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental"} */
drop view if exists analytics.dbt_lcarabet.fct_finance__trading_mv_pnl_incremental__dbt_tmp cascade
00:41:23 SQL status: SUCCESS 1 in 0.189 seconds
00:41:23 1 of 1 OK created sql incremental model dbt_lcarabet.fct_finance__trading_mv_pnl_incremental  [SUCCESS 0 in 2.53s]
00:41:23 Finished running node model.dbt_snowflake_finance.fct_finance__trading_mv_pnl_incremental
```
dbt incremental models provide a way to build tables in Snowflake efficiently, especially for large datasets, by only processing new or changed data instead of rebuilding the entire table each time. 

Commit to dev branch.

`dbt testing` allows data/analytics engineers to ensure that SQL transformations produce models that meet certain assertions.

In dbt, there are two types of tests: `generic tests` and `singular tests`.

- `Generic tests` are used to validate data models and ensure data quality. These tests are predefined and can be applied to any column of the data models to check for common data issues. They are written in YAML files. dbt ships with four `built-in generic tests`: unique, not_null, accepted_values, relationships.
    - `Unique tests` to see if every value in a column is unique
    - `Not_null tests` to see if every value in a column is not null
    - `Accepted_values tests` to make sure every value in a column is equal to a value in a provided list
    - `Relationships tests` to ensure that every value in a column exists in a column in another model 
    
- `Singular tests` are data tests defined by writing specific SQL queries that return records which fail the test conditions. These tests are referred to as "singular" because they are one-off assertions that are uniquely designed for a single purpose or specific scenario within the data models.

Several way to execute the tests:

- `dbt test` runs all tests in the dbt project.
- `dbt test --select test_type:generic` runs all generic tests.
- `dbt test --select test_type:singular` runs all singular tests.
- `dbt test --select one_specific_model`

Create `intermediate/int_finance.yml` file to configure generic dbt tests for the intermediate models.

```yml
version: 2

models:
  - name: int_finance__fx_rates
    description: "An intermediate model that filters stg_finance__fx_rates staging model by exchange_date"
    columns:
      - name: indicator||exchange_date
        tests:
          - unique
          - not_null

  - name: int_finance__trading_books_unioned
    description: "An intermediate model that unions seeded data in CSV files"
    columns:
      - name: instrument
        tests:
          - not_null
          - relationships:
              to: ref('int_finance__stock_history')
              field: company_symbol

  - name: int_finance__stock_history
    description: "An intermediate model that pivots the stg_finance__stock_history model by indicator"
    columns:
      - name: company_symbol||stock_date
        tests:
          - not_null
          - unique
```

Create `staging/finance/stg_finance.yml` to configure generic tests for staging models which makes use of {{doc}} jinja function that references `docs blocks` in the descriptin field of .yml files.

```yml
version: 2

models:
  - name: stg_finance__stock_history
    description: Staged stock prices historical data.
    columns:      
      - name: indicator
        description: '{{doc("trading_indicator")}}'
        tests:
          - accepted_values:
              values:
                - all-day_high
                - all-day_low
                - pre-market_open
                - post-market_close
                - nasdaq_volume
```
Create `staging/finance/trading_indicator.md` to document trading indicator accepted values.

```md
{% docs trading_indicator %}

One of the following values: 

| status            | definition                                       |
|-------------------|--------------------------------------------------|
| all-day_high      | Daily high stock price                           |
| all-day_low       | Daily low stock price                            |
| pre-market_open   | Daily opening price                              |
| post-market_close | Daily closing price                              |
| nasdaq_volume     | Daily trading volume on stock exchange markets   |

{% enddocs %}
```

Create `tests/assert_stg_finance__fx_rates_exchange_value_positive.sql` for a singular test.

```sql
-- exchange rates should be >=0
select 
    indicator,
    exchange_date,
    exchange_value
from {{ ref ('stg_finance__fx_rates') }}
where exchange_value < 0
```

`dbt run`

```bash
06:21:04 Cloud CLI invocation created: fd4c5401-48b2-413c-8ad7-08bf4a9966a6
06:21:04 Running dbt...
06:21:05 Found 10 models, 2 seeds, 8 data tests, 2 sources, 592 macros
06:21:05 
06:21:05 Concurrency: 4 threads (target='default')
06:21:05 
06:21:53 
06:21:53 Finished running 1 incremental model, 7 table models, 2 view models in 0 hours 0 minutes and 48.65 seconds (48.65s).
06:21:53 
06:21:53 Completed successfully
06:21:53 
06:21:53 Done. PASS=10 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=10
```

Commit all changes to the `dev` branch of GitHub repo. Click `Commit and Sync` one last time in dbt Cloud IDE.


In dbt, models are built in SQL files. These models are documented in YML files (where generic tests also live) inside the same folder as the models. 
- For models, descriptions can happen at the model, source, or column level.
- If a longer form, more styled version of text would provide a strong description, `docs blocks` can be used to render markdown in the generated documentation.


`dbt docs generate` command generates a static webpage with a data dictionary by pulling in information from the dbt project as well as the Snowflake information_schema. Facilitates information sharing with internal teams, as it contains model, source, and column descriptions, tags, tests as well as the source and compiled SQL code for each model. It also provides an interactive DAG to visualize the full lineage graph of dbt models.



Merge all changes made in the `development environment` that makes use of `the development schema` (`dbt_lcarabet`) and the `dev branch` to `main branch` so that those changes can be used in `deployment`.

In dbt Cloud IDE, click on `Create a pull request on GitHub`. In GitHub, compare changes between dev and main for ability to merge and if everything checks out click `Create pull request` dev-to-main. If no conflicts with main/base branch, merging is performed automatically by GitHub by clicking `Merge pull request` button and `Confirm merge`.

Alternatively, merging can be done via command line:

- Connect to GitHub repo via https
- Clone the repository or update local repository with latest changes
```bash
git pull origin main
```
- Switch to the main/base branch of the pull request
```bash
git checkout main
```
- Merge the dev/head branch into the main/base branch
```bash
git merge dev
```
- Push the changes
```bash
git push -u origin main
```
Can then `Delete branch dev` if no longer needed.

In dbt, click `Change branch` and `Pull from main`.

`Deployment` in dbt is the process of running dbt on a schedule in a deployment/production environment for use to power dashboards, reporting, and drive key business decision-making processes. The `deployment environment` runs from the `main branch` and uses a dedicated `deployment schema` (e.g., `dbt_production`) different than the development schema. 

Click on `Deploy` menu in dbt Could IDE to:

- Create `deployment environment` -> `Environments` -> Setup general settings for environment name (`Production`) and type (`Deployment`), `Snowflake connection settings` (account/role/database/warehouse), `Snowflake deployment credentials` (authentication and deployment schema: `dbt_production`)

- `Schedule a job` to orchestrate the execution of models in production -> `Jobs` -> `Create job` -> 
    - set `job settings`: name (e.g., 'finance_economics_job'), set environment to 'Production', 
    - set `execution settings` & `commands` -> dbt seed -> dbt run -> dbt test; and check `Generate docs on run`
    - set the `schedule`: intervals, specific hours or cron scheduled job
    - set `advanced settings` `threads` (max number of paths) to greater than default 4 threads (e.g., 8) to minimize the runtime at the potential expense on the Snowflake warehouse

Turn off scheduling to run the job one time. Save -> Run now -> Click on Run #number to view execution -> Success

Verify deployment in Snowflake:

```sql
-- verify deployment

-- two views
select * from analytics.information_schema.views
where table_schema = 'DBT_PRODUCTION' and table_name like 'STG_FINANCE__%';

-- two tables
select * from analytics.information_schema.tables
where table_schema = 'DBT_PRODUCTION' and table_name like 'TRADING_%';

-- six tables
select * from analytics.information_schema.tables
where table_schema = 'DBT_PRODUCTION' and table_name like 'INT_FINANCE__%';
```