This project demonstrates how to build an ELT pipeline using dbt, Snowflake, and Airflow. Follow the steps below to set up your environment, configure dbt, create models, macros, tests, and deploy on Airflow.
- Create Accounts and Roles
-- One-time Snowflake setup for the dbt project: warehouse, database, role, grants, schema.
-- accountadmin is used to create the objects and to grant the role to a user.
use role accountadmin;

-- `if not exists` keeps the script re-runnable, matching the database and role statements.
create warehouse if not exists dbt_wh with warehouse_size = 'x-small';
create database if not exists dbt_db;
create role if not exists dbt_role;

-- List the privileges currently granted on the warehouse (runs before the grants below).
show grants on warehouse dbt_wh;

-- Replace <USER_NAME> with the Snowflake user that should receive dbt_role.
grant role dbt_role to user <USER_NAME>;
grant usage on warehouse dbt_wh to role dbt_role;
grant all on database dbt_db to role dbt_role;

-- Create the schema while acting as dbt_role.
use role dbt_role;
create schema if not exists dbt_db.dbt_schema;
-- Teardown: removes the warehouse, database, and role created for this project.
-- Each drop uses `if exists`, so the script can be run even when an object is already gone.
use role accountadmin;
drop warehouse if exists dbt_wh;
drop database if exists dbt_db;
drop role if exists dbt_role;
# dbt_project.yml: per-folder model defaults for the snowflake_workshop project.
models:
  snowflake_workshop:
    # Models under staging/ are built as views on the dbt_wh warehouse.
    staging:
      materialized: view
      snowflake_warehouse: dbt_wh
    # Models under marts/ are built as tables on the dbt_wh warehouse.
    marts:
      materialized: table
      snowflake_warehouse: dbt_wh
# Declares the tpch source (snowflake_sample_data.tpch_sf1) and the tests on its key columns.
version: 2

sources:
  - name: tpch
    database: snowflake_sample_data
    schema: tpch_sf1
    tables:
      - name: orders
        columns:
          - name: o_orderkey
            tests:
              - unique
              - not_null
      - name: lineitem
        columns:
          - name: l_orderkey
            tests:
              # Every l_orderkey must match an o_orderkey in the orders source table.
              - relationships:
                  to: source('tpch', 'orders')
                  field: o_orderkey
-- Staging model for the tpch orders source: selects five columns and gives them readable names.
select
    o_orderkey as order_key,
    o_custkey as customer_key,
    o_orderstatus as status_code,
    o_totalprice as total_price,
    o_orderdate as order_date
from
    {{ source('tpch', 'orders') }}
-- Staging model for the tpch lineitem source: renames columns and adds a surrogate key.
select
    -- Surrogate key derived from the order key and the line number.
    {{ dbt_utils.generate_surrogate_key(['l_orderkey', 'l_linenumber']) }} as order_item_key,
    l_orderkey as order_key,
    l_partkey as part_key,
    l_linenumber as line_number,
    l_quantity as quantity,
    l_extendedprice as extended_price,
    l_discount as discount_percentage,
    l_tax as tax_rate
from {{ source('tpch', 'lineitem') }}
{#
    Renders a SQL expression for a discount amount: the negated product of
    extended_price and discount_percentage, cast to decimal(16, scale).

    Arguments:
        extended_price: SQL expression (for example a column name) for the price.
        discount_percentage: SQL expression for the discount rate.
        scale: number of decimal places in the cast; defaults to 2.

    Each argument is wrapped in parentheses so that a compound expression such as
    `a + b` keeps its intended precedence once interpolated.
#}
{% macro discounted_amount(extended_price, discount_percentage, scale=2) %}
    (-1 * ({{ extended_price }}) * ({{ discount_percentage }}))::decimal(16, {{ scale }})
{% endmacro %}
-- Intermediate model: joins staged orders to staged line items on order_key and
-- adds the per-item discount amount computed by the discounted_amount macro.
select
    line_item.order_item_key,
    line_item.part_key,
    line_item.line_number,
    line_item.extended_price,
    orders.order_key,
    orders.customer_key,
    orders.order_date,
    {{ discounted_amount('line_item.extended_price', 'line_item.discount_percentage') }} as item_discount_amount
from {{ ref('stg_tpch_orders') }} as orders
inner join {{ ref('stg_tpch_line_items') }} as line_item
    on line_item.order_key = orders.order_key
order by orders.order_date
-- Intermediate model: sums extended price and discount amount per order_key.
select
    order_items.order_key,
    sum(order_items.extended_price) as gross_item_sales_amount,
    sum(order_items.item_discount_amount) as item_discount_amount
from {{ ref('int_order_items') }} as order_items
group by order_items.order_key
-- Fact model: staged orders joined to their per-order item sales and discount totals.
-- The order columns are listed explicitly (instead of orders.*) so that a change to
-- stg_tpch_orders cannot silently change the columns of this model.
select
    orders.order_key,
    orders.customer_key,
    orders.status_code,
    orders.total_price,
    orders.order_date,
    order_item_summary.gross_item_sales_amount,
    order_item_summary.item_discount_amount
from {{ ref('stg_tpch_orders') }} as orders
inner join {{ ref('int_order_items_summary') }} as order_item_summary
    on orders.order_key = order_item_summary.order_key
order by orders.order_date
# Generic tests for the fct_orders model.
models:
  - name: fct_orders
    columns:
      - name: order_key
        tests:
          - unique
          - not_null
          # order_key must exist in stg_tpch_orders; a violation is reported as a warning.
          - relationships:
              to: ref('stg_tpch_orders')
              field: order_key
              severity: warn
      - name: status_code
        tests:
          - accepted_values:
              values: ['P', 'O', 'F']
-- Singular test: selects fct_orders rows whose item_discount_amount is positive.
-- dbt fails a singular test when its query returns any rows.
select
    *
from
    {{ ref('fct_orders') }}
where
    item_discount_amount > 0
-- Singular test: selects fct_orders rows dated before 1990-01-01 or after today.
select *
from {{ ref('fct_orders') }}
where
    date(order_date) < date('1990-01-01')
    or date(order_date) > current_date()
pip install --no-cache-dir dbt-snowflake && deactivate
apache-airflow-providers-snowflake
{
  "account": "<account_locator>-<account_name>",
  "warehouse": "dbt_wh",
  "database": "dbt_db",
  "role": "dbt_role",
  "insecure_mode": false
}
from datetime import datetime
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile