fix(lineage): service nodes appearing in entity lineage view and empty By Service view #27258
…in/dataProduct lineage and add pipeline service edges

Bug #1: Service nodes (e.g., DatabaseService, MessagingService) were incorrectly appearing in entity-level lineage views.
Root cause: getOrCreateLineageDetails() in addServiceLineage(), addDomainLineage(), and addDataProductsLineage() was copying the pipeline annotation from entity-level LineageDetails to service/domain/dataProduct-level LineageDetails. This caused service entities to have upstreamLineage.pipeline.fqnHash set in their Elasticsearch documents, making them match the PIPELINE_AS_EDGE_KEY query during BFS traversal and incorrectly appear alongside actual data assets.
Fix: add .withPipeline(null) on each service/domain/dataProduct LineageDetails object to strip the pipeline annotation before persisting.

Bug #2: "By Service" view was empty when viewing lineage for pipeline entities that were stored as edge annotators (Case B: table → topic with pipeline=flink_pipeline in LineageDetails) rather than as actual nodes (Case A).
Root cause: addServiceLineage() only created database_service → kafka_service edges but no edges involving flink_pipeline_service.
Fix: add addPipelineServiceEdges() called from addServiceLineage() that creates fromService → pipelineService and pipelineService → toService edges when a pipeline annotation exists in the entity-level lineage details.

Also add unit tests covering both fixes to prevent regression.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
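The Bug #2 fix described above can be sketched roughly as follows. This is a minimal illustration, not the code in LineageRepository: the class name, the `ServiceEdge` record, and the `derivePipelineServiceEdges` helper are all hypothetical, and services are reduced to bare UUIDs. It only shows the rule stated in the commit message, that a pipeline annotation yields a fromService → pipelineService edge and a pipelineService → toService edge, with self-edges skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

final class PipelineServiceEdgeSketch {
  /** Hypothetical stand-in for a service-to-service lineage edge. */
  record ServiceEdge(UUID fromId, UUID toId) {}

  /**
   * Returns the extra service edges implied by a pipeline annotation.
   * pipelineServiceId is null when the entity-level edge has no pipeline.
   */
  static List<ServiceEdge> derivePipelineServiceEdges(
      UUID fromServiceId, UUID toServiceId, UUID pipelineServiceId) {
    List<ServiceEdge> edges = new ArrayList<>();
    if (pipelineServiceId == null) {
      return edges; // no annotation, nothing to add
    }
    addIfDistinct(edges, fromServiceId, pipelineServiceId);
    addIfDistinct(edges, pipelineServiceId, toServiceId);
    return edges;
  }

  /** Adds the edge unless both ends are the same service (avoids self-edges). */
  private static void addIfDistinct(List<ServiceEdge> edges, UUID from, UUID to) {
    if (!from.equals(to)) {
      edges.add(new ServiceEdge(from, to));
    }
  }

  public static void main(String[] args) {
    UUID databaseService = UUID.randomUUID();
    UUID kafkaService = UUID.randomUUID();
    UUID flinkService = UUID.randomUUID();
    System.out.println(
        derivePipelineServiceEdges(databaseService, kafkaService, flinkService));
  }
}
```

With these two extra edges present, a lineage query rooted at the pipeline's service has something to return, which is the condition the "By Service" view depends on.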
Pull request overview
Fixes a lineage visualization issue where service-level lineage edges were being annotated with pipeline details, causing services to “bleed into” default asset lineage and making “By Service” views behave incorrectly.
Changes:
- Strip `pipeline` from extended lineage edges (service/domain/data product) by forcing `LineageDetails.pipeline` to `null` before persisting/indexing those edges.
- Add pipeline-service intermediary service edges derived from the pipeline’s owning service.
- Add unit tests asserting `pipeline` can be absent from ES lineage data / lineage details.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java | Nulls pipeline on extended lineage edges; adds logic to insert pipeline-service service edges. |
| openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/LineageRepositoryTest.java | Adds tests around pipeline == null behavior in lineage ES data and LineageDetails. |
🟡 Playwright Results: all passed (29 flaky)
✅ 3634 passed · ❌ 0 failed · 🟡 29 flaky · ⏭️ 89 skipped
🟡 29 flaky test(s) (passed on retry)
How to debug locally:
    # Download playwright-test-results-<shard> artifact and unzip
    npx playwright show-trace path/to/trace.zip # view trace
…e/domain/dataProduct lineage edges The previous fix (e6df7a6) prevented new lineage from inheriting pipeline annotations on service/domain/dataProduct-level edges. However, existing data in the entity_relationship table already has pipeline set on those edges from before the fix, and Elasticsearch reindex reads from the DB — so reindex alone does not fix stale data. This migration removes the pipeline field from all service-to-service, domain-to-domain, and dataProduct-to-dataProduct lineage edges (relation=13/UPSTREAM) in entity_relationship. After upgrading and running this migration, operators should trigger an Elasticsearch/OpenSearch reindex so that the corrected DB records are reflected in the search index, which is what the lineage graph BFS traversal reads from. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Moves the data migration that removes the pipeline field from service/domain/dataProduct lineage edges in entity_relationship to the 1.13.0 migration scripts, which is the correct target version. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
….12.6 Creates a new 1.12.6 migration with the data fix that removes the pipeline field from service/domain/dataProduct lineage edges in entity_relationship, and removes it from 1.13.0 where it was previously placed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…es for existing data For installations upgrading to 1.12.6 with existing lineage data, service edges fromService→pipelineService and pipelineService→toService were never created (only added by the code fix for new lineage going forward). This migration reads service-level lineage edges that have a pipeline annotation, resolves the pipeline entity's service, and inserts the two missing service edges into entity_relationship (DB only). After the SQL migration strips pipeline from service edges and a reindex runs, the "By Service" lineage view for pipeline services correctly shows their upstream/downstream service connections. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The Java checkstyle failed. Please run […]. You can install the pre-commit hooks with […].
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When the json column in entity_relationship is NULL, JsonUtils.readValue returns null. getPipelineService now short-circuits on a null argument instead of throwing NullPointerException via entityLineageDetails.getPipeline(). Fixes NPE in deleteLineageByFQN and deleteLineage cleanup paths. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
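A minimal sketch of the null short-circuit described in this commit, under stated assumptions: the real `getPipelineService` signature is not shown on this page, so the `Details` and `EntityRef` records and the `pipelineOf` helper below are hypothetical stand-ins. The point is only that a null details object (as produced by deserializing a NULL json column) is handled before any getter is called.

```java
import java.util.Optional;

final class NullSafePipelineLookup {
  /** Hypothetical stand-ins for EntityReference and LineageDetails. */
  record EntityRef(String type, String fqn) {}

  record Details(EntityRef pipeline) {}

  /** Returns the pipeline reference, or empty when details or pipeline is null. */
  static Optional<EntityRef> pipelineOf(Details details) {
    if (details == null) {
      return Optional.empty(); // NULL json column: nothing to resolve
    }
    return Optional.ofNullable(details.pipeline());
  }

  public static void main(String[] args) {
    System.out.println(pipelineOf(null).isPresent());
    System.out.println(
        pipelineOf(new Details(new EntityRef("pipeline", "flink.job"))).isPresent());
  }
}
```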
page.request.get() sends browser cookies but OpenMetadata authenticates via JWT in localStorage, so those calls were unauthenticated (non-2xx). Replace with getToken + getAuthContext pattern used elsewhere. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| let topicFqn: string; | ||
| let pipelineFqn: string; | ||
|
|
||
| const LINEAGE_API = '/api/v1/lineage/getLineage?fqn=*'; |
The LINEAGE_API glob (/api/v1/lineage/getLineage?fqn=*) assumes fqn is the first query param. If the UI adds params before fqn (or changes ordering), waitForResponse(LINEAGE_API) can miss the request and make the test flaky/hang. Using a broader glob like /api/v1/lineage/getLineage?* (pattern used elsewhere in the Playwright suite) or a predicate on response.url().includes('/api/v1/lineage/getLineage') would be more robust.
| const LINEAGE_API = '/api/v1/lineage/getLineage?fqn=*'; | |
| const LINEAGE_API = '/api/v1/lineage/getLineage?*'; |
| 'mlmodelService', 'storageService', 'searchService', 'apiService') | ||
| AND toEntity IN ('databaseService', 'messagingService', 'pipelineService', 'dashboardService', | ||
| 'mlmodelService', 'storageService', 'searchService', 'apiService') |
There is driveService too
Directory, File, Spreadsheet, and Worksheet entities map to driveService, so service-level lineage edges between driveService instances could also have incorrectly inherited the pipeline annotation. Include driveService in the 1.12.6 cleanup migration for both MySQL and PostgreSQL. Also drops the stray trailing-newline changes from the 1.12.0 migration files — those edits were unnecessary. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
driveService-to-driveService edges must be skipped during the pipeline service edge migration scan, same as all other service-level edges. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| Entity.METADATA_SERVICE, | ||
| Entity.STORAGE_SERVICE, | ||
| Entity.SEARCH_SERVICE, | ||
| Entity.API_SERVICE, |
SERVICE_ENTITY_TYPES is missing Entity.DRIVE_SERVICE, even though the 1.12.6 SQL migrations treat driveService as a service entity. This can cause the Java migration to incorrectly treat driveService→* relationships as data-asset lineage edges during the scan (and attempt unnecessary Entity.getEntity(...) lookups). Add Entity.DRIVE_SERVICE to the set to keep behavior consistent with the SQL migration scripts and avoid misclassification.
| Entity.API_SERVICE, | |
| Entity.API_SERVICE, | |
| Entity.DRIVE_SERVICE); |
The rebase left MigrationUtil with duplicate imports and a missing closing brace on insertEdgeIfMissing. Merged both method sets cleanly and ran spotless. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Code Review ✅ Approved 3 resolved / 3 findings
Fixes service node visibility in entity lineage and empty service views by resolving null pointer exceptions and syntax errors. Comprehensive tests have been added to validate the new edge-handling logic.
✅ 3 resolved
✅ Quality: Missing tests for new addPipelineServiceEdges logic
✅ Bug: NullPointerException when lineageDetails is null in cleanup path
✅ Bug: Missing closing brace for insertEdgeIfMissing — won't compile
| private static LineageDetails buildServiceLineageDetails(LineageDetails source) { | ||
| return new LineageDetails() | ||
| .withCreatedAt(source.getCreatedAt()) | ||
| .withCreatedBy(source.getCreatedBy()) | ||
| .withUpdatedAt(source.getUpdatedAt()) | ||
| .withUpdatedBy(source.getUpdatedBy()) | ||
| .withSource(LineageDetails.Source.CHILD_ASSETS) | ||
| .withPipeline(null) | ||
| .withAssetEdges(1); |
buildServiceLineageDetails() always sets assetEdges to 1. If multiple entity-level lineage edges require the same derived service edge (e.g., many table→topic edges annotated with pipelines from the same pipeline service), the migrated service edge should reflect the aggregate assetEdges count; otherwise later cleanup can delete the edge prematurely when only one child edge is removed. Consider accumulating a count per ServiceEdge while scanning records and setting assetEdges accordingly (or updating existing edges by incrementing assetEdges).
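One way to read the suggestion above is to count, per derived service edge, how many entity-level edges contributed to it while scanning. The sketch below is an assumption-laden illustration, not the migration's code: `AssetEdgeCounter`, its `ServiceEdge` record (with string IDs), and `countAssetEdges` are hypothetical names, and the resulting count is what would be passed to `withAssetEdges(...)` instead of the constant 1.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AssetEdgeCounter {
  /** Hypothetical key for a derived service-to-service edge. */
  record ServiceEdge(String fromId, String toId) {}

  /** Counts how many child (entity-level) edges map to each service edge. */
  static Map<ServiceEdge, Integer> countAssetEdges(List<ServiceEdge> derivedEdges) {
    Map<ServiceEdge, Integer> counts = new LinkedHashMap<>();
    for (ServiceEdge edge : derivedEdges) {
      counts.merge(edge, 1, Integer::sum); // first sighting stores 1, later ones add 1
    }
    return counts;
  }

  public static void main(String[] args) {
    ServiceEdge dbToFlink = new ServiceEdge("db-service", "flink-service");
    ServiceEdge flinkToKafka = new ServiceEdge("flink-service", "kafka-service");
    System.out.println(countAssetEdges(List.of(dbToFlink, flinkToKafka, dbToFlink)));
  }
}
```

Because records implement `equals` and `hashCode` by value, two scans that derive the same from/to pair land on the same map key, so the count reflects all contributing child edges.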
| for (CollectionDAO.EntityRelationshipObject record : batch) { | ||
| if (SERVICE_ENTITY_TYPES.contains(record.getFromEntity())) { | ||
| continue; | ||
| } | ||
| String json = record.getJson(); | ||
| if (json == null || !json.contains("\"pipeline\"")) { | ||
| continue; | ||
| } | ||
| collectPipelineServiceEdges(record, edgesToCreate); | ||
| } | ||
| offset += batchSize; | ||
| } while (batch.size() == batchSize); | ||
|
|
||
| int created = 0; | ||
| for (Map.Entry<ServiceEdge, LineageDetails> entry : edgesToCreate.entrySet()) { | ||
| try { | ||
| if (insertEdgeIfMissing(collectionDAO, entry.getKey(), entry.getValue())) { | ||
| created++; | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.warn( | ||
| "Failed to insert pipeline service edge {} -> {}: {}", | ||
| entry.getKey().fromId(), | ||
| entry.getKey().toId(), | ||
| e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| LOG.info("Pipeline service edges migration complete: {} edges created", created); | ||
| } | ||
|
|
||
| private static void collectPipelineServiceEdges( | ||
| CollectionDAO.EntityRelationshipObject record, | ||
| Map<ServiceEdge, LineageDetails> edgesToCreate) { | ||
|
|
||
| try { | ||
| LineageDetails details = JsonUtils.readValue(record.getJson(), LineageDetails.class); | ||
| EntityReference pipelineRef = details.getPipeline(); | ||
| if (pipelineRef == null || pipelineRef.getId() == null) { | ||
| return; | ||
| } | ||
|
|
||
| EntityInterface fromEntity = | ||
| Entity.getEntity( | ||
| record.getFromEntity(), UUID.fromString(record.getFromId()), "service", Include.ALL); | ||
| EntityInterface toEntity = | ||
| Entity.getEntity( | ||
| record.getToEntity(), UUID.fromString(record.getToId()), "service", Include.ALL); | ||
| EntityInterface pipelineEntity = | ||
| Entity.getEntity(pipelineRef.getType(), pipelineRef.getId(), "service", Include.ALL); | ||
|
|
The migration scans any non-service entity_relationship row whose JSON contains a pipeline key, and then unconditionally calls Entity.getEntity(..., "service", ...) for record.getFromEntity() / record.getToEntity(). Rows like domain→domain or dataProduct→dataProduct (which are explicitly mentioned as affected by Bug #1) will throw IllegalArgumentException because those entity types don’t allow the service field, leading to noisy logs and extra work. Consider filtering upfront to only process relationships where both fromEntity and toEntity support service (e.g., Entity.entityHasField(type, "service")) and skipping everything else before attempting entity fetches.
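The upfront filter suggested in this comment might look roughly like the sketch below. Everything here is illustrative: `ServiceFieldFilter`, the `Row` record, and the set of entity types are hypothetical, and the `Predicate<String>` stands in for a check such as the `Entity.entityHasField(type, "service")` call named in the comment, whose exact signature is not confirmed here.

```java
import java.util.Set;
import java.util.function.Predicate;

final class ServiceFieldFilter {
  /** Hypothetical projection of an entity_relationship row. */
  record Row(String fromEntity, String toEntity, String json) {}

  /**
   * True only for rows that carry a pipeline key and whose endpoints both
   * support a "service" field, so no entity fetch is attempted for the rest.
   */
  static boolean shouldProcess(Row row, Predicate<String> hasServiceField) {
    return row.json() != null
        && row.json().contains("\"pipeline\"")
        && hasServiceField.test(row.fromEntity())
        && hasServiceField.test(row.toEntity());
  }

  public static void main(String[] args) {
    // Assumed, illustrative set of entity types that have a service field.
    Set<String> typesWithService = Set.of("table", "topic", "dashboard");
    Row tableToTopic = new Row("table", "topic", "{\"pipeline\":{}}");
    Row domainToDomain = new Row("domain", "domain", "{\"pipeline\":{}}");
    System.out.println(shouldProcess(tableToTopic, typesWithService::contains));
    System.out.println(shouldProcess(domainToDomain, typesWithService::contains));
  }
}
```

Rows that fail the predicate are skipped before any lookup, which avoids the exception-and-log path for domain→domain and dataProduct→dataProduct rows.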
Problem
Two lineage visualization bugs affecting OpenMetadata when using a pipeline as an edge annotator (i.e., lineage stored as `table → topic` with `pipeline` in `LineageDetails`, rather than as explicit pipeline nodes):

Bug 1: Service nodes bleeding into entity-level lineage view
In the default lineage view, service entities (`DatabaseService`, `MessagingService`, `PipelineService`) were incorrectly appearing in the graph.

Bug 2: "By Service" view is empty for pipeline entities
Switching to the "By Service" view returned an empty graph instead of showing lineage grouped by service.

Root Causes

Bug 1: `getOrCreateLineageDetails()` in `addServiceLineage()`, `addDomainLineage()`, and `addDataProductsLineage()` was copying the pipeline annotation from entity-level `LineageDetails` into service/domain/dataProduct-level `LineageDetails`. This caused service entities to have `upstreamLineage.pipeline.fqnHash` populated in their Elasticsearch documents. The BFS traversal in `ESLineageGraphBuilder` uses `PIPELINE_AS_EDGE_KEY = "upstreamLineage.pipeline.fqnHash.keyword"` to discover pipeline-connected nodes, so service entities incorrectly matched this query and appeared in the entity-level lineage view.

Bug 2: `addServiceLineage()` only created a direct `database_service → kafka_service` edge. When a pipeline is used as an annotator (not a node), the pipeline's own service (`flink_pipeline_service`) received zero service-level edges. Since the frontend fetches lineage for `entity.service.fullyQualifiedName` in "By Service" mode, Elasticsearch returned an empty result.

Changes

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java

Bug 1 fix: Added `.withPipeline(null)` on the `LineageDetails` returned by `getOrCreateLineageDetails()` in `addServiceLineage()`, `addDomainLineage()`, and `addDataProductsLineage()`. This strips the pipeline annotation before persisting service/domain/dataProduct-level edges, preventing service entities from matching the `PIPELINE_AS_EDGE_KEY` Elasticsearch query.

Bug 2 fix: Added three new private methods:
- `addPipelineServiceEdges()`: called from `addServiceLineage()`, creates additional service edges for the pipeline's own service
- `getPipelineService()`: resolves the pipeline entity's service reference from `LineageDetails`
- `insertServiceEdgeIfDistinct()`: inserts a service edge only if the two service IDs differ (avoids self-edges)

When entity lineage has a pipeline annotator, two additional service-level edges are created: `fromService → pipelineService` and `pipelineService → toService`, so the pipeline service has edges in the service-level lineage graph.

openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/LineageRepositoryTest.java
`LineageDetails` for service/domain/dataProduct-level edges never inherit the pipeline annotation from entity-level lineage.

openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/LineagePipelineAnnotatorIT.java

openmetadata-ui/src/main/resources/ui/playwright/e2e/Features/LineagePipelineAnnotator.spec.ts

Test Results