In [27]:
!rm jaffle_shop.duckdb && dbt clean && dbt deps && dbt seed && dbt build 

[0m21:06:18  Running with dbt=1.5.1
        The default package install path has changed from `dbt_modules` to
`dbt_packages`.         Please update `clean-targets` in `dbt_project.yml` and
check `.gitignore` as well.         Or, set `packages-install-path: dbt_modules`
if you'd like to keep the current value.
[0m21:06:18  Checking target/*
[0m21:06:18  Cleaned target/*
[0m21:06:18  Checking dbt_modules/*
[0m21:06:18  Cleaned dbt_modules/*
[0m21:06:18  Checking logs/*
[0m21:06:18  Cleaned logs/*
[0m21:06:18  Finished cleaning all paths.
[0m21:06:19  Running with dbt=1.5.1
[0m21:06:20  Running with dbt=1.5.1
[0m21:06:20  Unable to do partial parsing because saved manifest not found. Starting full parse.
[0m21:06:21  Found 5 models, 20 tests, 0 snapshots, 0 analyses, 314 macros, 0 operations, 3 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
[0m21:06:21  
[0m21:06:21  Concurrency: 1 threads (target='dev')
[0m21:06:21  
[0m21:06:21  1 of 3 START seed file main.raw_

In [1]:
import os
import json
import sqlglot 
import time

from loguru import logger
from sqlglot import exp
from sqlglot import parse_one 
from sqlglot.lineage import lineage
from sqlglot.optimizer import optimize

In [2]:
raw_schemas = {
    "jaffle_shop": {
        "main": {
            "raw_payments": {
                "id": "int",
                "order_id": "int",
                "payment_method": "str",
                "amount": "int"
              },
            "raw_customers": {
                "id": "int",
                "first_name": "str",
                "last_name": "str"
            },
            "raw_orders": {
                "id": "int",
                "user_id": "int",
                "order_date": "date",
                "status": "str"
            }
        }
    }
}

In [3]:
def get_sources(starting_directory):
    sources = {}
    for root, dirs, files in os.walk(starting_directory):
        for file in files:
            file_path = os.path.join(root, file)
            if root.split('/')[-1] != 'schema.yml':
                with open(file_path, 'r') as f:
                    sql_content = f.read()
                    model_name_from_path = file_path.split("/")[-1].split(".")[0]
                    sources[f"jaffle_shop.main.{model_name_from_path}"] = sql_content
    return sources

In [4]:
sources = get_sources("./target/compiled/jaffle_shop/models/")

In [5]:
def get_final_columns(_sql, _schemas=None):
    if _schemas:
        return optimize(_sql, schema=_schemas).named_selects
    else:
        return optimize(_sql).named_selects

In [6]:
def get_leaf_nodes(_node): 
    leaf_nodes = []
    if not _node.downstream:
        leaf_nodes.append(_node)
    else:
        for child in _node.downstream:
            leaf_nodes.extend(get_leaf_nodes(child))
    return leaf_nodes

In [7]:
schemas_for_optimizer = {
    "jaffle_shop": {
        "main": {
            "stg_payments": {
                "payment_id": "int", 
                "order_id": "int", 
                "payment_method": "str", 
                "amount": "int"
            },
            "stg_orders": {
                "order_id": "int",
                "customer_id": "int", 
                "order_date": "date", 
                "status": "str"
            }
        }
    }
}

customers = sources.get("jaffle_shop.main.customers")
customers_columns = get_final_columns(customers, schemas_for_optimizer)
logger.info(customers_columns)

[32m2023-06-12 14:04:56.309[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m22[0m - [1m['customer_id', 'first_name', 'last_name', 'first_order', 'most_recent_order', 'number_of_orders', 'customer_lifetime_value'][0m


In [8]:
node = lineage("customer_id", customers, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [9]:
node = lineage("first_name", customers, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [10]:
node = lineage("last_name", customers, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [11]:
node = lineage("most_recent_order", customers, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [12]:
node = lineage("number_of_orders", customers, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [13]:
node = lineage("customer_lifetime_value", customers, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [14]:
all_leaves = set()
for col in customers_columns: 
    node = lineage(col, customers, sources=sources, schema=raw_schemas, dialect="duckdb")
    leaves = get_leaf_nodes(node)
    for leaf in leaves:
        all_leaves.add(leaf.name)

all_leaves

{'raw_customers.first_name',
 'raw_customers.id',
 'raw_customers.last_name',
 'raw_orders.id',
 'raw_orders.order_date',
 'raw_payments.amount'}

In [15]:
orders = sources.get("jaffle_shop.main.orders")

orders_columns = get_final_columns(orders)
logger.info(orders_columns)


[32m2023-06-12 14:05:03.293[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m4[0m - [1m['order_id', 'customer_id', 'order_date', 'status', 'credit_card_amount', 'coupon_amount', 'bank_transfer_amount', 'gift_card_amount', 'amount'][0m


In [16]:
node = lineage("order_id", orders, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [17]:
node = lineage("customer_id", orders, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [18]:
node = lineage("order_date", orders, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [19]:
node = lineage("status", orders, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [20]:
node = lineage("credit_card_amount", orders, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [21]:
node = lineage("coupon_amount", orders, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [22]:
node = lineage("bank_transfer_amount", orders, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [23]:
node = lineage("gift_card_amount", orders, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [24]:
node = lineage("amount", orders, sources=sources, schema=raw_schemas, dialect="duckdb")
node.to_html()

In [25]:
all_leaves = set()
for col in orders_columns: 
    node = lineage(col, orders, sources=sources, schema=raw_schemas, dialect="duckdb")
    leaves = get_leaf_nodes(node)
    for leaf in leaves:
        all_leaves.add(leaf.name)

all_leaves

{'raw_orders.id',
 'raw_orders.order_date',
 'raw_orders.status',
 'raw_orders.user_id',
 'raw_payments.amount',
 'raw_payments.payment_method'}