Skip to content

Commit

Permalink
Fix #16692: Override Lineage Support for View & Dashboard Lineage (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 committed Jul 22, 2024
1 parent b37dc06 commit 5140203
Show file tree
Hide file tree
Showing 13 changed files with 317 additions and 8 deletions.
24 changes: 24 additions & 0 deletions ingestion/src/metadata/ingestion/models/ometa_lineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Custom wrapper for Lineage Request
"""

from typing import Optional

from pydantic import BaseModel

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest


class OMetaLineageRequest(BaseModel):
override_lineage: Optional[bool] = False
lineage_request: AddLineageRequest
19 changes: 19 additions & 0 deletions ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
To be used by OpenMetadata class
"""
import functools
import traceback
from copy import deepcopy
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
Expand Down Expand Up @@ -321,6 +322,24 @@ def delete_lineage_edge(self, edge: EntitiesEdge) -> None:
logger.debug(traceback.format_exc())
logger.error(f"Error {err.status_code} trying to DELETE linage for {edge}")

@functools.lru_cache(maxsize=LRU_CACHE_SIZE)
def delete_lineage_by_source(
self, entity_type: str, entity_id: str, source: str
) -> None:
"""
Remove the given Edge
"""
try:
self.client.delete(
f"{self.get_suffix(AddLineageRequest)}/{entity_type}/{entity_id}/"
f"type/{source}"
)
except APIError as err:
logger.debug(traceback.format_exc())
logger.error(
f"Error {err.status_code} trying to DELETE linage for {entity_id} of type {source}"
)

def add_lineage_by_query(
self,
database_service: DatabaseService,
Expand Down
40 changes: 40 additions & 0 deletions ingestion/src/metadata/ingestion/sink/metadata_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
TestCaseResolutionStatus,
)
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.schema import Topic
from metadata.ingestion.api.models import Either, Entity, StackTraceError
from metadata.ingestion.api.steps import Sink
Expand All @@ -59,6 +60,7 @@
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.life_cycle import OMetaLifeCycleData
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.models.ometa_topic_data import OMetaTopicSampleData
from metadata.ingestion.models.patch_request import (
ALLOWED_COMMON_PATCH_FIELDS,
Expand Down Expand Up @@ -250,6 +252,44 @@ def write_lineage(self, add_lineage: AddLineageRequest) -> Either[Dict[str, Any]
created_lineage = self.metadata.add_lineage(add_lineage, check_patch=True)
return Either(right=created_lineage["entity"]["fullyQualifiedName"])

@_run_dispatch.register
def write_override_lineage(
self, add_lineage: OMetaLineageRequest
) -> Either[Dict[str, Any]]:
"""
Writes the override lineage for the given lineage request.
Args:
add_lineage (OMetaLineageRequest): The lineage request containing the override lineage information.
Returns:
Either[Dict[str, Any]]: The result of the dispatch operation.
"""
if (
add_lineage.override_lineage is True
and add_lineage.lineage_request.edge.lineageDetails
and add_lineage.lineage_request.edge.lineageDetails.source
):
if (
add_lineage.lineage_request.edge.lineageDetails.pipeline
and add_lineage.lineage_request.edge.lineageDetails.source
== LineageSource.PipelineLineage
):
self.metadata.delete_lineage_by_source(
entity_type="pipeline",
entity_id=str(
add_lineage.lineage_request.edge.lineageDetails.pipeline.id.root
),
source=add_lineage.lineage_request.edge.lineageDetails.source.value,
)
else:
self.metadata.delete_lineage_by_source(
entity_type=add_lineage.lineage_request.edge.toEntity.type,
entity_id=str(add_lineage.lineage_request.edge.toEntity.id.root),
source=add_lineage.lineage_request.edge.lineageDetails.source.value,
)
return self._run_dispatch(add_lineage.lineage_request)

def _create_role(self, create_role: CreateRoleRequest) -> Optional[Role]:
"""
Internal helper method for write_user
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.models.patch_request import PatchRequest
from metadata.ingestion.models.topology import (
NodeStage,
Expand Down Expand Up @@ -345,17 +346,25 @@ def get_db_service_names(self) -> List[str]:

def yield_dashboard_lineage(
self, dashboard_details: Any
) -> Iterable[Either[AddLineageRequest]]:
) -> Iterable[Either[OMetaLineageRequest]]:
"""
Yields lineage if config is enabled.
We will look for the data in all the services
we have informed.
"""
yield from self.yield_datamodel_dashboard_lineage() or []
for lineage in self.yield_datamodel_dashboard_lineage() or []:
if lineage.right is not None:
yield Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideLineage,
)
)
else:
yield lineage

db_service_names = self.get_db_service_names()

for db_service_name in db_service_names or []:
yield from self.yield_dashboard_lineage_details(
dashboard_details, db_service_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from metadata.ingestion.api.models import Either
from metadata.ingestion.connections.session import create_and_bind_thread_safe_session
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
Expand Down Expand Up @@ -578,18 +579,27 @@ def yield_table(
)

@calculate_execution_time_generator()
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
def yield_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
logger.info("Processing Lineage for Views")
for view in [
v for v in self.context.get().table_views if v.view_definition is not None
]:
yield from get_view_lineage(
for lineage in get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.context.get().database_service,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
)
):
if lineage.right is not None:
yield Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideViewLineage,
)
)
else:
yield lineage

def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.topology import (
NodeStage,
Expand Down Expand Up @@ -177,10 +178,19 @@ def yield_pipeline_status(

def yield_pipeline_lineage(
self, pipeline_details: Any
) -> Iterable[Either[AddLineageRequest]]:
) -> Iterable[Either[OMetaLineageRequest]]:
"""Yields lineage if config is enabled"""
if self.source_config.includeLineage:
yield from self.yield_pipeline_lineage_details(pipeline_details) or []
for lineage in self.yield_pipeline_lineage_details(pipeline_details) or []:
if lineage.right is not None:
yield Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideLineage,
)
)
else:
yield lineage

def yield_tag(self, *args, **kwargs) -> Iterable[Either[OMetaTagAndClassification]]:
"""Method to fetch pipeline tags"""
Expand Down
24 changes: 24 additions & 0 deletions ingestion/tests/integration/ometa/test_ometa_lineage_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
EntityLineage,
LineageDetails,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference

from ..integration_base import generate_name, get_create_entity, get_create_service
Expand Down Expand Up @@ -286,6 +287,29 @@ def test_create(self):
len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 2
)

def test_delete_by_source(self):
"""
Test case for deleting lineage by source.
This method tests the functionality of deleting lineage by source. It retrieves the lineage
information for a specific table entity using its ID. Then, it records the original length of
the upstream edges in the lineage. After that, it deletes the lineage by specifying the source
type, table ID, and lineage source. Finally, it asserts that the length of the upstream edges
in the lineage has decreased by 1.
"""
lineage = self.metadata.get_lineage_by_id(
entity="table", entity_id=self.table2_entity.id.root
)
original_len = len(lineage.get("upstreamEdges") or [])
self.metadata.delete_lineage_by_source(
"table", self.table2_entity.id.root, LineageSource.Manual.value
)
lineage = self.metadata.get_lineage_by_id(
entity="table", entity_id=self.table2_entity.id.root
)
updated_len = len(lineage.get("upstreamEdges") or [])
self.assertEqual(updated_len, original_len - 1)

def test_table_datamodel_lineage(self):
"""We can create and get lineage for a table to a dashboard datamodel"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,44 @@ List<EntityRelationshipRecord> findFrom(
List<EntityRelationshipRecord> findFromPipeline(
@BindUUID("toId") UUID toId, @Bind("relation") int relation);

@ConnectionAwareSqlQuery(
value =
"SELECT toId, toEntity, fromId, fromEntity, relation FROM entity_relationship "
+ "WHERE JSON_UNQUOTE(JSON_EXTRACT(json, '$.source')) = :source AND (toId = :toId AND toEntity = :toEntity) "
+ "AND relation = :relation ORDER BY fromId",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT toId, toEntity, fromId, fromEntity, relation FROM entity_relationship "
+ "WHERE json->>'source' = :source AND (toId = :toId AND toEntity = :toEntity) "
+ "AND relation = :relation ORDER BY fromId",
connectionType = POSTGRES)
@RegisterRowMapper(RelationshipObjectMapper.class)
List<EntityRelationshipObject> findLineageBySource(
@BindUUID("toId") UUID toId,
@Bind("toEntity") String toEntity,
@Bind("source") String source,
@Bind("relation") int relation);

@ConnectionAwareSqlQuery(
value =
"SELECT toId, toEntity, fromId, fromEntity, relation FROM entity_relationship "
+ "WHERE JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipeline.id')) =:toId OR toId = :toId AND relation = :relation "
+ "AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.source')) = :source ORDER BY toId",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT toId, toEntity, fromId, fromEntity, relation FROM entity_relationship "
+ "WHERE json->'pipeline'->>'id' =:toId OR toId = :toId AND relation = :relation "
+ "AND json->>'source' = :source ORDER BY toId",
connectionType = POSTGRES)
@RegisterRowMapper(RelationshipObjectMapper.class)
List<EntityRelationshipObject> findLineageBySourcePipeline(
@BindUUID("toId") UUID toId,
@Bind("toEntity") String toEntity,
@Bind("source") String source,
@Bind("relation") int relation);

@SqlQuery(
"SELECT count(*) FROM entity_relationship WHERE fromEntity = :fromEntity AND toEntity = :toEntity")
int findIfAnyRelationExist(
Expand Down Expand Up @@ -957,6 +995,42 @@ void deleteTo(
@SqlUpdate("DELETE from entity_relationship WHERE fromId = :id or toId = :id")
void deleteAllWithId(@BindUUID("id") UUID id);

@ConnectionAwareSqlUpdate(
value =
"DELETE FROM entity_relationship "
+ "WHERE JSON_UNQUOTE(JSON_EXTRACT(json, '$.source')) = :source AND toId = :toId AND toEntity = :toEntity "
+ "AND relation = :relation ORDER BY fromId",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"DELETE FROM entity_relationship "
+ "WHERE json->>'source' = :source AND (toId = :toId AND toEntity = :toEntity) "
+ "AND relation = :relation ORDER BY fromId",
connectionType = POSTGRES)
void deleteLineageBySource(
@BindUUID("toId") UUID toId,
@Bind("toEntity") String toEntity,
@Bind("source") String source,
@Bind("relation") int relation);

@ConnectionAwareSqlUpdate(
value =
"DELETE FROM entity_relationship "
+ "WHERE JSON_UNQUOTE(JSON_EXTRACT(json, '$.pipeline.id')) =:toId OR toId = :toId AND relation = :relation "
+ "AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.source')) = :source ORDER BY toId",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"DELETE FROM entity_relationship "
+ "WHERE json->'pipeline'->>'id' =:toId OR toId = :toId AND relation = :relation "
+ "AND json->>'source' = :source ORDER BY toId",
connectionType = POSTGRES)
void deleteLineageBySourcePipeline(
@BindUUID("toId") UUID toId,
@Bind("toEntity") String toEntity,
@Bind("source") String source,
@Bind("relation") int relation);

class FromRelationshipMapper implements RowMapper<EntityRelationshipRecord> {
@Override
public EntityRelationshipRecord map(ResultSet rs, StatementContext ctx) throws SQLException {
Expand All @@ -978,6 +1052,19 @@ public EntityRelationshipRecord map(ResultSet rs, StatementContext ctx) throws S
.build();
}
}

class RelationshipObjectMapper implements RowMapper<EntityRelationshipObject> {
@Override
public EntityRelationshipObject map(ResultSet rs, StatementContext ctx) throws SQLException {
return EntityRelationshipObject.builder()
.fromId(rs.getString("fromId"))
.fromEntity(rs.getString("fromEntity"))
.toEntity(rs.getString("toEntity"))
.toId(rs.getString("toId"))
.relation(rs.getInt("relation"))
.build();
}
}
}

interface FeedDAO {
Expand Down
Loading

0 comments on commit 5140203

Please sign in to comment.