From 7bf96cdbf0a24ee83876885f6c6653193d47baab Mon Sep 17 00:00:00 2001 From: Mohit Tilala Date: Thu, 16 Apr 2026 10:44:38 +0530 Subject: [PATCH 1/2] deps(ingestion): upgrade collate-sqllineage to >=2.1.1 with expanded lineage test coverage --- ingestion/setup.py | 2 +- .../queries/test_specific_dialect_queries.py | 309 ++++++++++++++++-- .../tests/unit/lineage/test_sql_lineage.py | 5 - 3 files changed, 274 insertions(+), 42 deletions(-) diff --git a/ingestion/setup.py b/ingestion/setup.py index e9a63d4d3fff..e1b51a7f8be0 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -165,7 +165,7 @@ "requests>=2.23", "requests-aws4auth~=1.1", # Only depends on requests as external package. Leaving as base. "sqlalchemy>=2.0.0,<3", - "collate-sqllineage>=2.0.2", + "collate-sqllineage>=2.1.1", "tabulate==0.9.0", "typing-inspect", "packaging", # For version parsing diff --git a/ingestion/tests/unit/lineage/queries/test_specific_dialect_queries.py b/ingestion/tests/unit/lineage/queries/test_specific_dialect_queries.py index 27d3d0c13cab..3a2e68906849 100644 --- a/ingestion/tests/unit/lineage/queries/test_specific_dialect_queries.py +++ b/ingestion/tests/unit/lineage/queries/test_specific_dialect_queries.py @@ -12,7 +12,6 @@ ------------------- 1. PostgreSQL COPY command - Not supported, returns empty source tables - test_postgres_copy_with_jsonb: test_sqlglot=False - - test_postgres_copy_with_jsonb_to_target: test_sqlglot=False 2. CREATE PROCEDURE syntax - Not supported (Oracle, SQL Server) - test_oracle_create_procedure_insert_select: test_sqlglot=False @@ -28,25 +27,25 @@ SqlFluff Limitations: -------------------- -1. ClickHouse CREATE TABLE AS SELECT with CTEs - Returns empty source tables - - test_clickhouse_create_table_with_ctes: test_sqlfluff=False - -2. PostgreSQL DDL statements - UnsupportedStatementException for SET/ALTER SEQUENCE +1. PostgreSQL DDL statements - UnsupportedStatementException for SET/ALTER SEQUENCE - test_postgres_ddl_statements: test_sqlfluff=False -3. Snowflake bind parameters - InvalidSyntaxException with :param syntax in INSERT +2. Snowflake bind parameters - InvalidSyntaxException with :param syntax in INSERT - test_snowflake_insert_with_cte_and_sequence: test_sqlfluff=False - test_snowflake_insert_parse_xml: test_sqlfluff=False -4. Snowflake LATERAL FLATTEN - IndexError when parsing JSON flattening syntax +3. Snowflake LATERAL FLATTEN - IndexError when parsing JSON flattening syntax - test_snowflake_lateral_flatten_json: test_sqlfluff=False -5. Oracle CREATE PROCEDURE - InvalidSyntaxException for procedure syntax +4. Oracle CREATE PROCEDURE - InvalidSyntaxException for procedure syntax - test_oracle_create_procedure_insert_select: test_sqlfluff=False -6. Nested subquery wildcards - KeyError in wildcard handler for complex nested queries +5. Nested subquery wildcards - SubQuery error on wildcard handling in deeply nested queries - test_copy_grants_with_complex_case: test_sqlfluff=False +6. Deeply nested UNION ALL column lineage - Returns empty column lineage (~5% of runs) + - test_complex_postgres_view: test_sqlfluff=False + SqlParse Limitations: -------------------- 1. CTE name confusion - Incorrectly includes CTE names as source tables @@ -59,25 +58,26 @@ 3. Complex UPDATE with subqueries - Returns empty source tables - test_snowflake_update_with_nested_select: test_sqlparse=False -4. JSON path expressions - Doesn't parse Snowflake JSON paths correctly +4. JSON path expressions - Returns raw alias (v → v) instead of resolved column names - test_snowflake_lateral_flatten_json: test_sqlparse=False 5. CREATE PROCEDURE - Not supported, returns empty source tables - test_oracle_create_procedure_insert_select: test_sqlparse=False -6. COPY FROM file - Doesn't recognize COPY FROM as a write operation - - test_postgres_copy_with_jsonb_to_target: test_sqlparse=False +6. BigQuery CLONE statement - Returns empty source tables + - test_bigquery_clone_table_with_digit_starting_name: test_sqlparse=False Graph Comparison Skips (skip_graph_check=True): ----------------------------------------------- Used when parsers produce valid lineage but with different internal graph structures: 1. test_postgres_copy_with_jsonb - Different node structures between SqlFluff/SqlParse -2. test_snowflake_insert_with_cte_and_sequence - Different CTE handling SqlGlot/SqlParse -3. test_snowflake_insert_parse_xml - Different bind parameter handling -4. test_postgres_create_table - Different DDL representations -5. test_bigquery_with_cte_window_functions - Different CTE graph structures -6. test_complex_postgres_view - Same nodes/edges but different graph structure +2. test_postgres_copy_with_jsonb_to_target (column) - SqlFluff graph differs from SqlGlot/SqlParse +3. test_snowflake_insert_with_cte_and_sequence - Different CTE handling SqlGlot/SqlParse +4. test_snowflake_insert_parse_xml - Different bind parameter handling +5. test_postgres_create_table - Different DDL representations +6. test_bigquery_with_cte_window_functions - Different CTE graph structures +7. test_clickhouse_ctas_engine_union_all_not_in - SqlFluff graph differs (24n/33e vs 26n/35e) Column Lineage Categories: -------------------------- @@ -96,7 +96,7 @@ Test Coverage: ------------- -- Total Tests: 18 +- Total Tests: 28 - Dialects: Snowflake, BigQuery, MySQL, ClickHouse, PostgreSQL, T-SQL, Oracle - Parsers: SqlGlot, SqlFluff, SqlParse - All tests validate both table lineage AND column lineage @@ -104,7 +104,6 @@ from unittest import TestCase -import pytest from collate_sqllineage.core.models import Location, Path from ingestion.tests.unit.lineage.queries.helpers import ( @@ -187,13 +186,13 @@ def test_copy_grants_with_complex_case(self): ) # All columns are masked literals - no meaningful source column lineage - # SqlFluff crashes with KeyError on wildcard handling for nested subqueries + # SqlFluff crashes with SubQuery error on wildcard handling for nested subqueries assert_column_lineage_equal( query, [], dialect=Dialect.SNOWFLAKE.value, test_sqlparse=False, - test_sqlfluff=False, # SqlFluff crashes with KeyError on SubQuery wildcard handling + test_sqlfluff=False, ) def test_dbt_model_style_create_view(self): @@ -360,8 +359,6 @@ def test_clickhouse_create_table_with_ctes(self): }, {"atlas.dbt.int_inventory_juvo"}, dialect=Dialect.CLICKHOUSE.value, - # SqlFluff returns empty source tables for ClickHouse CREATE TABLE AS SELECT with CTEs - test_sqlfluff=False, # SqlParse incorrectly includes CTE name 'sku_cost' as source table test_sqlparse=False, ) @@ -391,7 +388,6 @@ def test_clickhouse_create_table_with_ctes(self): ), ], dialect=Dialect.CLICKHOUSE.value, - test_sqlfluff=False, test_sqlparse=False, ) @@ -474,19 +470,15 @@ def test_postgres_copy_with_jsonb_to_target(self): {Path("/data/exports/customers.csv")}, {"public.customer_data"}, dialect=Dialect.POSTGRES.value, - # SqlGlot does not support PostgreSQL COPY command - test_sqlglot=False, - # SqlParse doesn't recognize COPY FROM as a write operation - test_sqlparse=False, ) # No column lineage expected - COPY FROM file + # SqlFluff column graph differs from SqlGlot/SqlParse, skip graph check assert_column_lineage_equal( query, [], dialect=Dialect.POSTGRES.value, - test_sqlglot=False, - test_sqlparse=False, + skip_graph_check=True, ) def test_column_lineage_extraction(self): @@ -515,9 +507,6 @@ def test_column_lineage_extraction(self): dialect=Dialect.MYSQL.value, ) - @pytest.mark.skip( - "SqlFluff returning empty column lineage unexpectedly in rare cases (5% of runs)" - ) def test_complex_postgres_view(self): """Test complex PostgreSQL CREATE VIEW with UNION ALL, nested subqueries, and JSON functions""" query = """create view stg_globalv2_default.b2c_order_operational_converted as @@ -659,7 +648,7 @@ def test_complex_postgres_view(self): ], dialect=Dialect.POSTGRES.value, test_sqlglot=False, # SqlGlot doesn't extract column lineage for UNION ALL - skip_graph_check=True, # SqlFluff and SqlParse have same nodes/edges but different graph structure + test_sqlfluff=False, # SqlFluff returns empty column lineage for deeply nested UNION ALL (~5% of runs) ) def test_postgres_ddl_statements(self): @@ -909,7 +898,7 @@ def test_snowflake_lateral_flatten_json(self): ], dialect=Dialect.SNOWFLAKE.value, test_sqlfluff=False, - # SqlParse doesn't parse JSON path expressions correctly, returns just "v" column + # SqlParse returns v → v (the raw column) instead of v → named output columns test_sqlparse=False, ) @@ -958,7 +947,8 @@ def test_snowflake_update_with_nested_select(self): ], dialect=Dialect.SNOWFLAKE.value, test_sqlparse=False, - test_sqlglot=False, # SqlGlot doesn't extract column lineage for UPDATE + # SqlGlot returns empty column lineage for UPDATE statements + test_sqlglot=False, ) def test_snowflake_insert_parse_xml(self): @@ -1170,3 +1160,250 @@ def test_snowflake_copy_into_fully_qualified_stage(self): [], dialect=Dialect.SNOWFLAKE.value, ) + + # ----------------------------------------------------------------------- + # collate-sqllineage 2.1.1 regression tests + # Release: https://github.com/open-metadata/collate-sqllineage/releases/tag/2.1.1-release + # ----------------------------------------------------------------------- + + def test_ctas_union_all_inside_cte_column_lineage(self): + """Test CTAS where the CTE body is a UNION ALL — column lineage maps both branches. + + Verifies that when a CTE wraps a UNION ALL and the outer SELECT reads from that + CTE, column lineage correctly flows from both UNION ALL input tables to the + CTAS write target (not to the CTE name or a wrong intermediate table). + All 3 parsers produce identical graphs (19n/26e). + """ + query = """CREATE TABLE analytics.fact_orders AS +WITH combined_data AS ( + SELECT order_id, amount, status FROM staging.orders_source_a + UNION ALL + SELECT order_id, amount, status FROM staging.orders_source_b +) +SELECT order_id, amount, status FROM combined_data""" + + assert_table_lineage_equal( + query, + {"staging.orders_source_a", "staging.orders_source_b"}, + {"analytics.fact_orders"}, + ) + + assert_column_lineage_equal( + query, + [ + ( + TestColumnQualifierTuple("order_id", "staging.orders_source_a"), + TestColumnQualifierTuple("order_id", "analytics.fact_orders"), + ), + ( + TestColumnQualifierTuple("amount", "staging.orders_source_a"), + TestColumnQualifierTuple("amount", "analytics.fact_orders"), + ), + ( + TestColumnQualifierTuple("status", "staging.orders_source_a"), + TestColumnQualifierTuple("status", "analytics.fact_orders"), + ), + ( + TestColumnQualifierTuple("order_id", "staging.orders_source_b"), + TestColumnQualifierTuple("order_id", "analytics.fact_orders"), + ), + ( + TestColumnQualifierTuple("amount", "staging.orders_source_b"), + TestColumnQualifierTuple("amount", "analytics.fact_orders"), + ), + ( + TestColumnQualifierTuple("status", "staging.orders_source_b"), + TestColumnQualifierTuple("status", "analytics.fact_orders"), + ), + ], + ) + + def test_clickhouse_ctas_engine_union_all_not_in(self): + """Test ClickHouse CTAS with ENGINE clause, UNION ALL, and NOT IN subquery. + + Regression for https://github.com/open-metadata/OpenMetadata/issues/21953. + Verifies that CTAS queries combining ENGINE = ..., CTEs, UNION ALL and a NOT IN + subfilter produce correct source/target table lineage and column lineage. + SqlFluff graph structure differs from SqlGlot/SqlParse (24n/33e vs 26n/35e), + requiring skip_graph_check. + """ + query = """CREATE TABLE analytics_mart.dim_entity ENGINE = ReplacingMergeTree() AS +WITH source_a AS ( + SELECT entity_id, entity_name, source_system FROM staging.int_entity__source_a +), +source_b AS ( + SELECT entity_id, entity_name, source_system FROM staging.int_entity__source_b +) +SELECT entity_id, entity_name, source_system FROM source_a +WHERE entity_id NOT IN (SELECT entity_id FROM source_b) +UNION ALL +SELECT entity_id, entity_name, source_system FROM source_b""" + + assert_table_lineage_equal( + query, + {"staging.int_entity__source_a", "staging.int_entity__source_b"}, + {"analytics_mart.dim_entity"}, + dialect=Dialect.CLICKHOUSE.value, + skip_graph_check=True, + ) + + assert_column_lineage_equal( + query, + [ + ( + TestColumnQualifierTuple( + "entity_id", "staging.int_entity__source_a" + ), + TestColumnQualifierTuple("entity_id", "analytics_mart.dim_entity"), + ), + ( + TestColumnQualifierTuple( + "entity_name", "staging.int_entity__source_a" + ), + TestColumnQualifierTuple( + "entity_name", "analytics_mart.dim_entity" + ), + ), + ( + TestColumnQualifierTuple( + "source_system", "staging.int_entity__source_a" + ), + TestColumnQualifierTuple( + "source_system", "analytics_mart.dim_entity" + ), + ), + ( + TestColumnQualifierTuple( + "entity_id", "staging.int_entity__source_b" + ), + TestColumnQualifierTuple("entity_id", "analytics_mart.dim_entity"), + ), + ( + TestColumnQualifierTuple( + "entity_name", "staging.int_entity__source_b" + ), + TestColumnQualifierTuple( + "entity_name", "analytics_mart.dim_entity" + ), + ), + ( + TestColumnQualifierTuple( + "source_system", "staging.int_entity__source_b" + ), + TestColumnQualifierTuple( + "source_system", "analytics_mart.dim_entity" + ), + ), + ], + dialect=Dialect.CLICKHOUSE.value, + skip_graph_check=True, + ) + + def test_bigquery_clone_table_with_digit_starting_name(self): + """Test BigQuery CREATE OR REPLACE TABLE ... CLONE where source name starts with digit. + + Regression for https://github.com/open-metadata/OpenMetadata/issues/23338. + BigQuery allows identifiers that start with digits (e.g. 1st_layer___name). + SqlParse returns empty sources for CLONE statements so it is excluded. + SqlGlot and SqlFluff produce isomorphic graphs (3n/2e). + """ + query = "CREATE OR REPLACE TABLE analytics_ref.region_summary_v2 CLONE analytics_source.1st_layer___region_summary_v2" + + assert_table_lineage_equal( + query, + {"analytics_source.1st_layer___region_summary_v2"}, + {"analytics_ref.region_summary_v2"}, + dialect=Dialect.BIGQUERY.value, + test_sqlparse=False, + ) + + assert_column_lineage_equal( + query, + [], + dialect=Dialect.BIGQUERY.value, + test_sqlparse=False, + ) + + def test_snowflake_copy_into_table_with_column_list_from_stage_subquery(self): + """Test COPY INTO table (col1, col2) FROM (SELECT ... FROM @stage) with explicit column list. + + Regression for https://github.com/open-metadata/OpenMetadata/issues/27380. + Verifies that the stage reference is resolved as a Location source even when the + COPY INTO target specifies an explicit column list and the subquery uses Snowflake + positional column syntax ($1:field). Internal graph structures differ across parsers. + """ + query = """COPY INTO PROD_DB.STAGING.RAW_EVENTS (EVENT_ID, EVENT_DATA) +FROM (SELECT $1:event_id, $2:event_data FROM @PROD_DB.STAGING.STG_EVENTS_ROOT) +FILE_FORMAT = (TYPE = PARQUET)""" + + assert_table_lineage_equal( + query, + {Location("@prod_db.staging.stg_events_root")}, + {"prod_db.staging.raw_events"}, + dialect=Dialect.SNOWFLAKE.value, + skip_graph_check=True, + ) + + assert_column_lineage_equal( + query, + [], + dialect=Dialect.SNOWFLAKE.value, + skip_graph_check=True, + ) + + def test_snowflake_copy_into_stage_subpath_with_external_file_format(self): + """Test COPY INTO from @stage/subpath/file.csv with an external named FILE_FORMAT. + + Regression for https://github.com/open-metadata/OpenMetadata/issues/27380. + Verifies that the stage subpath (CDL/delivery_data/file.csv) is stripped so the + source resolves to the stage root (@stage), and that a fully-qualified external + FILE_FORMAT reference (db.schema.format) does not interfere with lineage. + Internal graph structures differ across parsers. + """ + query = """COPY INTO LOAD_DB.PUBLIC.FACT_DELIVERIES +FROM (SELECT $1, $2, $3 FROM @LOAD_DB.STAGING.STG_DELIVERIES/CDL/delivery_data/file.csv) +FILE_FORMAT = LOAD_DB.PUBLIC.CSV_FORMAT +FORCE = TRUE +ON_ERROR = CONTINUE""" + + assert_table_lineage_equal( + query, + {Location("@load_db.staging.stg_deliveries")}, + {"load_db.public.fact_deliveries"}, + dialect=Dialect.SNOWFLAKE.value, + skip_graph_check=True, + ) + + assert_column_lineage_equal( + query, + [], + dialect=Dialect.SNOWFLAKE.value, + skip_graph_check=True, + ) + + def test_snowflake_copy_into_stage_subpath_date_partitioned(self): + """Test COPY INTO from @stage/YYYY/MM/DD/file.csv date-partitioned path. + + Regression for https://github.com/open-metadata/OpenMetadata/issues/27380. + Verifies that date-partitioned stage subpaths (e.g. /2026/04/11/events.csv) are + stripped so the source resolves to the stage root rather than the full path. + Internal graph structures differ across parsers. + """ + query = """COPY INTO ANALYTICS_DB.PUBLIC.FACT_EVENTS +FROM (SELECT $1 FROM @ANALYTICS_DB.PUBLIC.STG_EVENTS/2026/04/11/events.csv) +FILE_FORMAT = (TYPE = CSV)""" + + assert_table_lineage_equal( + query, + {Location("@analytics_db.public.stg_events")}, + {"analytics_db.public.fact_events"}, + dialect=Dialect.SNOWFLAKE.value, + skip_graph_check=True, + ) + + assert_column_lineage_equal( + query, + [], + dialect=Dialect.SNOWFLAKE.value, + skip_graph_check=True, + ) diff --git a/ingestion/tests/unit/lineage/test_sql_lineage.py b/ingestion/tests/unit/lineage/test_sql_lineage.py index e3a35e0f43b0..6c2320635a0e 100644 --- a/ingestion/tests/unit/lineage/test_sql_lineage.py +++ b/ingestion/tests/unit/lineage/test_sql_lineage.py @@ -126,11 +126,6 @@ def test_populate_column_lineage_map_select_all(self): lineage_map, {"testdb.public.target": {"testdb.public.users": [("*", "*")]}} ) - # TODO: since default parser is sqlglot, which fails to parse CTEs properly, - # we need to either fix sqlglot or change the default parser to test this case - @pytest.mark.skip( - reason="SqlGlot does not handle CTEs properly yet for column lineage." - ) def test_populate_column_lineage_map_ctes(self): """ Method to test column lineage map populate func with ctes From 23efcc69a084738082549001cd402e3926504a59 Mon Sep 17 00:00:00 2001 From: Mohit Tilala Date: Thu, 16 Apr 2026 12:05:43 +0530 Subject: [PATCH 2/2] address copilot comments --- .../unit/lineage/queries/test_specific_dialect_queries.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ingestion/tests/unit/lineage/queries/test_specific_dialect_queries.py b/ingestion/tests/unit/lineage/queries/test_specific_dialect_queries.py index 3a2e68906849..a195048b9eef 100644 --- a/ingestion/tests/unit/lineage/queries/test_specific_dialect_queries.py +++ b/ingestion/tests/unit/lineage/queries/test_specific_dialect_queries.py @@ -1338,7 +1338,7 @@ def test_snowflake_copy_into_table_with_column_list_from_stage_subquery(self): assert_table_lineage_equal( query, - {Location("@prod_db.staging.stg_events_root")}, + {Location("@PROD_DB.STAGING.STG_EVENTS_ROOT")}, {"prod_db.staging.raw_events"}, dialect=Dialect.SNOWFLAKE.value, skip_graph_check=True, @@ -1368,7 +1368,7 @@ def test_snowflake_copy_into_stage_subpath_with_external_file_format(self): assert_table_lineage_equal( query, - {Location("@load_db.staging.stg_deliveries")}, + {Location("@LOAD_DB.STAGING.STG_DELIVERIES")}, {"load_db.public.fact_deliveries"}, dialect=Dialect.SNOWFLAKE.value, skip_graph_check=True, @@ -1395,7 +1395,7 @@ def test_snowflake_copy_into_stage_subpath_date_partitioned(self): assert_table_lineage_equal( query, - {Location("@analytics_db.public.stg_events")}, + {Location("@ANALYTICS_DB.PUBLIC.STG_EVENTS")}, {"analytics_db.public.fact_events"}, dialect=Dialect.SNOWFLAKE.value, skip_graph_check=True,