Fixes #27538: feat(openlineage) add AWS Glue, Kusto, and Cosmos DB dataset naming support #27533
mohittilala wants to merge 1 commit into main
Code Review ✅ Approved
Expands OpenLineage dataset naming support to include AWS Glue, Kusto, and Cosmos DB. No issues found.
Pull request overview
This PR improves the OpenLineage ingestion connector’s ability to parse non-standard dataset naming formats emitted by AWS Glue EMR, Azure Data Explorer (Kusto), and Azure Cosmos DB, so lineage edges can be created instead of dropped due to unparseable dataset names.
Changes:
- Add namespace-aware parsing dispatch in `OpenlineageSource._get_table_details` for Glue/Kusto/Cosmos naming formats.
- Introduce dedicated parsers for Glue (`table/{db}/{table}`), Kusto (`{db}/{table}`), and Cosmos (`/dbs/{db}` + `colls/{collection}`).
- Add unit tests covering the new parsers and namespace dispatch behavior.
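The dispatch described in the list above can be sketched roughly as follows. This is a minimal, standalone illustration, not the PR's code: `TableDetails` is a stand-in dataclass, the function names are hypothetical, the `arn:aws:glue:` and `azurekusto://` prefixes are assumptions taken from the OpenLineage naming conventions rather than from this diff, and the dot-split fallback is a simplified guess at the existing behavior.

```python
import re
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class TableDetails:
    # Stand-in for the connector's model; only the two fields seen in the PR.
    name: str
    schema: str


def parse_slash(namespace: str, name: str) -> Optional[TableDetails]:
    # Kusto-style {db}/{table}; empty segments are dropped before checking.
    parts = [part for part in name.split("/") if part]
    if len(parts) < 2:
        return None
    return TableDetails(name=parts[-1].lower(), schema=parts[-2].lower())


def parse_glue(namespace: str, name: str) -> Optional[TableDetails]:
    # Glue-style table/{db}/{table}.
    if not name.startswith("table/"):
        return None
    return parse_slash(namespace, name[len("table/"):])


def parse_cosmos(namespace: str, name: str) -> Optional[TableDetails]:
    # Database comes from the namespace, collection from the name.
    db_match = re.search(r"/dbs/([^/]+)", namespace)
    coll_match = re.fullmatch(r"colls/([^/]+)", name)
    if not db_match or not coll_match:
        return None
    return TableDetails(
        name=coll_match.group(1).lower(), schema=db_match.group(1).lower()
    )


def parse_dotted(namespace: str, name: str) -> Optional[TableDetails]:
    # Assumed shape of the pre-existing fallback: schema.table split on dots.
    parts = name.split(".")
    if len(parts) < 2:
        return None
    return TableDetails(name=parts[-1], schema=parts[-2])


Parser = Callable[[str, str], Optional[TableDetails]]

# Namespace prefixes are assumptions (OpenLineage naming conventions).
PARSERS: List[Tuple[str, Parser]] = [
    ("arn:aws:glue:", parse_glue),
    ("azurekusto://", parse_slash),
    ("azurecosmos://", parse_cosmos),
]


def get_table_details(namespace: str, name: str) -> Optional[TableDetails]:
    # First matching namespace prefix wins; otherwise use the dot-split fallback.
    for prefix, parser in PARSERS:
        if namespace.startswith(prefix):
            return parser(namespace, name)
    return parse_dotted(namespace, name)
```

In this sketch a namespace match is final: if the dedicated parser rejects the name, the result is `None` rather than a second attempt with the dot-split logic. Whether the PR behaves the same way is not visible in the excerpts here.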
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py | Adds namespace-based dataset-name parsing and new parser helpers for Glue/Kusto/Cosmos. |
| ingestion/tests/unit/topology/pipeline/test_openlineage.py | Adds unit tests validating the new parsing logic and dispatch behavior. |
    if not name.startswith("table/"):
        return None
    parts = name[len("table/") :].split("/")
    if len(parts) < 2:
        return None
    return TableDetails(name=parts[-1].lower(), schema=parts[-2].lower())
The Glue name parser can return empty schema/table when the input has empty path segments (e.g., trailing slash table/db/table/ or double slashes). That would later build an invalid FQN and potentially create/lookup wrong entities. Consider filtering out empty segments (or stripping trailing slashes) and returning None when database/table are missing.
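One way to act on this comment, shown as a minimal sketch rather than the PR's actual fix, is to reject any name that contains an empty path segment. `TableDetails` is a stand-in type and `parse_glue_table_name` is a hypothetical standalone function, not the method in `metadata.py`.

```python
from typing import NamedTuple, Optional


class TableDetails(NamedTuple):
    # Stand-in for the connector's model (assumed fields).
    name: str
    schema: str


def parse_glue_table_name(name: str) -> Optional[TableDetails]:
    """Parse table/{db}/{table}, rejecting names with empty segments."""
    if not name.startswith("table/"):
        return None
    parts = name[len("table/"):].split("/")
    # all(parts) is False when any segment is "", which covers both
    # trailing slashes ("table/db/table/") and doubles ("table/db//table").
    if len(parts) < 2 or not all(parts):
        return None
    return TableDetails(name=parts[-1].lower(), schema=parts[-2].lower())
```

Rejecting outright is the stricter of the two options the comment mentions; it drops the lineage edge for a malformed name instead of guessing which segments were intended.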
    parts = name.split("/")
    if len(parts) < 2:
        return None
    return TableDetails(name=parts[-1].lower(), schema=parts[-2].lower())
_parse_slash_table_name has the same empty-segment issue as the Glue parser: inputs like db/table/ or db//table can yield an empty schema/table (since it blindly takes the last two split parts). Consider normalizing by stripping/filtering empty segments and returning None when the required parts are missing.
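The alternative the comment mentions, filtering empty segments instead of rejecting the whole name, might look like the sketch below. It is illustrative only: `TableDetails` is a stand-in type and `parse_slash_table_name` is a hypothetical standalone version of the helper.

```python
from typing import NamedTuple, Optional


class TableDetails(NamedTuple):
    # Stand-in for the connector's model (assumed fields).
    name: str
    schema: str


def parse_slash_table_name(name: str) -> Optional[TableDetails]:
    """Parse {db}/{table}, ignoring empty segments from stray slashes."""
    # Drop "" segments so "db/table/" and "db//table" normalize to db, table.
    parts = [part for part in name.split("/") if part]
    if len(parts) < 2:
        return None
    return TableDetails(name=parts[-1].lower(), schema=parts[-2].lower())
```

Filtering is more forgiving: `db/table/` still resolves to `db` and `table`, while a name with only one real segment returns `None`. The trade-off is that a doubled slash is treated as a typo rather than as a signal that the name is in some other format.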
    def test_parse_cosmos_table_name_happy_path(self):
        """Cosmos OL naming: db from namespace /dbs/{db}, name colls/{coll}; source: Naming.java CosmosNaming."""
        result = OpenlineageSource._parse_cosmos_table_name(
            "azurecosmos://myaccount.documents.azure.com/dbs/mydb",
            "colls/mycollection",
        )
        self.assertEqual(result.name, "mycollection")
        self.assertEqual(result.schema, "mydb")

    def test_parse_cosmos_table_name_normalizes_to_lowercase(self):
        """Cosmos database and collection names are normalized to lowercase for FQN matching."""
        result = OpenlineageSource._parse_cosmos_table_name(
            "azurecosmos://host/dbs/MyDB", "colls/MyCollection"
        )
        self.assertEqual(result.name, "mycollection")
        self.assertEqual(result.schema, "mydb")
The new dataset-name parsers are tested for happy paths, but there are no tests asserting they reject malformed inputs that would currently yield empty schema/table (e.g., trailing slashes) or, for Cosmos, names that don't match the documented colls/{collection} pattern. Adding these negative tests would help prevent incorrect lineage edges when events contain unexpected naming variants.
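Negative tests of the kind requested here could be sketched as below. To keep the example self-contained it tests a hypothetical standalone `parse_cosmos_table_name` that already enforces the `colls/{collection}` pattern; the namespace regex and the `TableDetails` stand-in are assumptions, and in the real suite the assertions would target `OpenlineageSource._parse_cosmos_table_name` instead.

```python
import re
import unittest
from typing import NamedTuple, Optional


class TableDetails(NamedTuple):
    # Stand-in for the connector's model (assumed fields).
    name: str
    schema: str


def parse_cosmos_table_name(namespace: str, name: str) -> Optional[TableDetails]:
    # Hypothetical strict parser: db from /dbs/{db}, collection from colls/{coll}.
    db_match = re.search(r"/dbs/([^/]+)$", namespace)
    coll_match = re.fullmatch(r"colls/([^/]+)", name)
    if not db_match or not coll_match:
        return None
    return TableDetails(
        name=coll_match.group(1).lower(), schema=db_match.group(1).lower()
    )


class CosmosNegativeTests(unittest.TestCase):
    NAMESPACE = "azurecosmos://host/dbs/mydb"

    def test_rejects_malformed_names(self):
        # Each of these deviates from colls/{collection} in a different way.
        bad_names = ("mycollection", "colls/", "colls/a/b", "docs/mycollection", "")
        for bad_name in bad_names:
            with self.subTest(name=bad_name):
                self.assertIsNone(
                    parse_cosmos_table_name(self.NAMESPACE, bad_name)
                )

    def test_rejects_namespace_without_database(self):
        self.assertIsNone(
            parse_cosmos_table_name("azurecosmos://host", "colls/mycollection")
        )
```

Using `subTest` reports each malformed input separately, so a regression on one naming variant does not hide the others. The same pattern extends to the Glue and Kusto parsers with inputs such as trailing or doubled slashes.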
    database = match.group(1).lower()
    collection = name.split("/")[-1].lower() if "/" in name else name.lower()
_parse_cosmos_table_name currently returns a TableDetails for any name value (including ones not in the documented colls/{collection} format). Because _get_table_details dispatches on azurecosmos:// namespace, this can mis-parse unrelated Cosmos dataset names and produce incorrect lineage. Consider validating the name prefix/pattern (e.g., require colls/ with a non-empty collection) and returning None when it doesn't match.
    - database = match.group(1).lower()
    - collection = name.split("/")[-1].lower() if "/" in name else name.lower()
    + collection_match = re.fullmatch(r"colls/([^/]+)", name)
    + if not collection_match:
    +     return None
    + database = match.group(1).lower()
    + collection = collection_match.group(1).lower()
The Python checkstyle failed. Please run You can install the pre-commit hooks with
🟡 Playwright Results: all passed (22 flaky)
✅ 3665 passed · ❌ 0 failed · 🟡 22 flaky · ⏭️ 89 skipped

🟡 22 flaky test(s) (passed on retry)

How to debug locally

    # Download playwright-test-results-<shard> artifact and unzip
    npx playwright show-trace path/to/trace.zip    # view trace



Describe your changes:
Fixes #27538
OpenLineage events from AWS Glue EMR, Azure Data Explorer (Kusto), and Azure Cosmos DB use non-standard dataset name formats that the connector couldn't parse, producing no lineage edges. This adds namespace-aware dispatch in `_get_table_details` to handle each format before falling back to the existing dot-split logic. All new parsers are sourced from OpenLineage's `Naming.java` and covered by unit tests.

Type of change:
- Improvement

Checklist:
- Fixes <issue-number>: <short explanation>