Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ Track progress on adding real-world data pipeline example notebooks.

Discovered during CDC/SCD design review (see `docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md` §Risks 5, 6). Both are cross-cutting changes that must land before the CDC/SCD notebook (item B) can honestly showcase lineage.

- [ ] **Gap 4. Self-referencing target across statements** (statement-scoped table versioning)
- Today: `depends_on_tables` / `depends_on_units` reference tables by name only; no N-vs-N+1 snapshot distinction (see `pipeline_lineage_builder.py:76-108`).
- Symptom: in SCD2, Step 2's `LEFT JOIN dim_customer t` collapses onto the same node that Step 1 (MERGE) just wrote — the pipeline graph shows a self-loop instead of "read prior state, then overwrite."
- Needs its own design doc.
- [x] **Gap 4. Self-referencing target across statements** (statement-scoped table versioning)
- Implemented in PR #61: self-read node detection via AST node identity, cycle-safe dependency resolution, query-scoped `{query_id}:self_read:{table}.{col}` naming, column-granular cross-query wiring, edge role/order annotations.
- Design: `docs/superpowers/specs/2026-04-13-gap4-self-referencing-target-design.md`

- [ ] **Gap 7. JOIN ON predicate columns not recorded in column lineage**
- Today: JOIN ON predicates produce **zero** column-lineage edges (no handling in `lineage_builder` for ON clause columns beyond the equi-join's identity resolution).
Expand Down
11 changes: 11 additions & 0 deletions src/clgraph/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,10 @@ class ColumnEdge:
tvf_info: Optional["TVFInfo"] = None # Full TVF specification
is_tvf_output: bool = False # True if this edge is from a TVF output

# ─── Self-Reference / Pipeline Ordering Metadata ───
statement_order: Optional[int] = None # Topological sort index of the query
edge_role: Optional[str] = None # "prior_state_read", "cross_query_self_ref", or None

def __hash__(self):
return hash((self.from_node.full_name, self.to_node.full_name, self.edge_type))

Expand Down Expand Up @@ -834,6 +838,7 @@ class SQLOperation(Enum):
MERGE = "MERGE"
DELETE_AND_INSERT = "DELETE+INSERT" # Common pattern
UPDATE = "UPDATE"
DELETE = "DELETE"

# DQL Operations
SELECT = "SELECT" # Query-only, no table creation/modification
Expand All @@ -857,6 +862,11 @@ class ParsedQuery:
destination_table: Optional[str] # Table being created/modified (None for SELECT-only)
source_tables: Set[str] # Tables being read

# Self-referencing tables (tables that appear as both destination and source)
self_referenced_tables: Set[str] = field(default_factory=set)
# Mapping SQL alias -> resolved table name for self-referenced tables only
self_ref_aliases: Dict[str, str] = field(default_factory=dict)

# Query-level lineage
query_lineage: Optional["ColumnLineageGraph"] = None # Single-query lineage graph

Expand Down Expand Up @@ -885,6 +895,7 @@ def is_dml(self) -> bool:
SQLOperation.MERGE,
SQLOperation.DELETE_AND_INSERT,
SQLOperation.UPDATE,
SQLOperation.DELETE,
]

def is_dql(self) -> bool:
Expand Down
59 changes: 48 additions & 11 deletions src/clgraph/multi_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

import re
from typing import Dict, List, Optional, Set, Tuple
from typing import Dict, List, Optional, Tuple

import sqlglot
from sqlglot import exp
Expand Down Expand Up @@ -185,8 +185,8 @@ def _parse_single_query(
# Determine operation type and destination table
operation, destination = self._extract_operation_and_destination(ast, tokenizer)

# Extract source tables
sources = self._extract_source_tables(ast, tokenizer)
# Extract source tables (now also returns self-reference info)
sources, self_referenced, self_ref_aliases = self._extract_source_tables(ast, tokenizer)

# Restore templates in SQL for lineage building
# Only restore if templates were resolved (template_context was provided)
Expand All @@ -205,6 +205,8 @@ def _parse_single_query(
operation=operation,
destination_table=destination,
source_tables=sources,
self_referenced_tables=self_referenced,
self_ref_aliases=self_ref_aliases,
original_sql=original_sql if is_templated else None,
is_templated=is_templated,
)
Expand Down Expand Up @@ -245,6 +247,10 @@ def _extract_operation_and_destination(
operation = SQLOperation.UPDATE
destination = self._get_table_name(ast.this, tokenizer)

elif isinstance(ast, exp.Delete):
operation = SQLOperation.DELETE
destination = self._get_table_name(ast.this, tokenizer)

# DQL: SELECT (query-only)
elif isinstance(ast, exp.Select):
operation = SQLOperation.SELECT
Expand All @@ -256,24 +262,41 @@ def _extract_operation_and_destination(

return operation, destination

def _extract_source_tables(self, ast: exp.Expression, tokenizer: TemplateTokenizer) -> Set[str]:
def _extract_source_tables(
self, ast: exp.Expression, tokenizer: TemplateTokenizer
) -> tuple[set[str], set[str], dict[str, str]]:
"""
Extract all source tables referenced in the query (excluding destination table).
Extract all source tables referenced in the query.

Tables that match the destination table but appear in the query body
(not as the direct target slot) are kept as source tables and also
recorded as self-referenced tables.

CTE aliases are filtered out so they don't leak into the table dependency
graph as phantom source tables. For example, in:
WITH source AS (SELECT * FROM raw.orders)
SELECT * FROM source
only `raw.orders` is returned, not the CTE alias `source`.

Returns:
Tuple of (source_tables, self_referenced_tables, self_ref_aliases)
"""
tables = set()
tables: set[str] = set()
self_referenced_tables: set[str] = set()
self_ref_aliases: dict[str, str] = {}

# For CREATE/INSERT/MERGE/UPDATE, the destination table is in ast.this
# We need to exclude it from source tables
# For CREATE/INSERT/MERGE/UPDATE/DELETE, the destination table is in ast.this
destination_table = None
if isinstance(ast, (exp.Create, exp.Insert, exp.Merge, exp.Update)):
target_table_node_ids: set[int] = set()

if isinstance(ast, (exp.Create, exp.Insert, exp.Merge, exp.Update, exp.Delete)):
if ast.this:
destination_table = self._get_table_name(ast.this, tokenizer)
# Collect AST node IDs in the target slot so we can distinguish
# the target reference from body references to the same table.
target_table_node_ids.add(id(ast.this))
for t in ast.this.find_all(exp.Table):
target_table_node_ids.add(id(t))

# Collect CTE alias names so we can exclude CTE references from source
# tables. sqlglot represents `FROM <cte_name>` as an exp.Table node,
Expand All @@ -283,15 +306,29 @@ def _extract_source_tables(self, ast: exp.Expression, tokenizer: TemplateTokeniz
# Find all Table nodes in the AST
for table_node in ast.find_all(exp.Table):
table_name = self._get_table_name(table_node, tokenizer)
if not table_name or table_name == destination_table:
if not table_name:
continue

# Skip the table node if it IS the target slot itself
if id(table_node) in target_table_node_ids:
continue

# A bare Table node whose name matches a CTE alias (and has no
# schema/db qualifier) is a CTE reference, not an external table.
if table_name in cte_aliases and not (table_node.db or table_node.catalog):
continue

tables.add(table_name)

return tables
# Detect self-references: table in the body that matches destination
if table_name == destination_table:
self_referenced_tables.add(table_name)
# Record alias if present
alias = table_node.alias
if alias:
self_ref_aliases[alias] = table_name

return tables, self_referenced_tables, self_ref_aliases

def _get_table_name(self, table_node: exp.Table, tokenizer: TemplateTokenizer) -> str:
"""
Expand Down
37 changes: 34 additions & 3 deletions src/clgraph/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ def get_column(
Column keys now include query_id prefix (e.g., "query_1:table.column")
for uniqueness. This method provides convenient lookup by table/column name.

When multiple candidates match (e.g., both input and output layers),
output-layer columns are preferred since they represent the written state.

Args:
table_name: The table name
column_name: The column name
Expand All @@ -215,11 +218,39 @@ def get_column(
Returns:
The ColumnNode if found, None otherwise
"""
best: Optional[ColumnNode] = None
for col in self.columns.values():
if col.table_name == table_name and col.column_name == column_name:
if query_id is None or col.query_id == query_id:
return col
return None
if query_id is not None and col.query_id != query_id:
continue
# Skip self-read nodes for the default lookup (they have
# node_type="self_read" and query-scoped full_names)
if col.node_type == "self_read":
continue
if best is None:
best = col
elif col.layer == "output" and best.layer != "output":
best = col
return best

def get_self_read_columns(self, table_name: str) -> List[ColumnNode]:
    """
    Collect every self-read node belonging to a physical table.

    A self-read node captures the *prior* state of a table that a query
    reads while simultaneously writing to that same table; such nodes are
    tagged with node_type="self_read" and carry a query-scoped full_name
    of the form "{query_id}:self_read:{table}.{column}".

    Args:
        table_name: The physical table name (e.g., "dim_customer")

    Returns:
        List of ColumnNode objects with node_type="self_read" for that table
    """
    marker = f":self_read:{table_name}."
    matches: List[ColumnNode] = []
    for node in self.columns.values():
        if node.node_type == "self_read" and marker in node.full_name:
            matches.append(node)
    return matches

def get_columns_by_table(self, table_name: str) -> List[ColumnNode]:
"""
Expand Down
Loading
Loading