-
Hi, I am running the Airflow metadata ingestion, and some Pipelines end up showing no lineage even though their inlets/outlets appear as processed in the logs. What is going on?
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
One thing to consider here is that in recent releases, OpenMetadata updated Pipeline lineage to appear as an edge in the lineage graph rather than as a node. This means: (node) Table A -> (edge) Pipeline X -> (node) Table B. Now, the situation described here can happen if multiple Pipelines are flagged as edges between the same tables. For example, Pipeline X and Pipeline Y could both declare Table A as an inlet and Table B as an outlet.
In these cases, only the most recently processed Pipeline is used as the edge, since multiple edges between the same pair of nodes are not supported. In the case of Airflow, review the inlets/outlets configuration in your DAGs to check whether you are repeating information.
Beta Was this translation helpful? Give feedback.
-
As a bonus, if someone would like to replicate this scenario using the Python SDK:

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.services.createPipelineService import (
CreatePipelineServiceRequest,
)
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
BackendConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import (
AirflowConnection,
)
# Reproduce the "only the latest pipeline shows lineage" scenario with the
# OpenMetadata Python SDK: create two tables, link them with an edge whose
# lineage details reference pipeline `test`, then add the same edge again
# referencing pipeline `another_test`.

# --- Connect to the OpenMetadata server -------------------------------------
# Replace "<YOUR TOKEN>" with a valid JWT for your OpenMetadata instance.
server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="<YOUR TOKEN>"
    ),
)
metadata = OpenMetadata(server_config)
# Explicit check instead of `assert`: assertions are stripped when Python runs
# with -O, which would silently skip this guard.
if not metadata.health_check():
    raise RuntimeError(
        f"OpenMetadata health check failed for {server_config.hostPort}"
    )

# --- Create Service, Database, Schema and Tables ----------------------------
# The MySQL connection details are placeholders; the script only registers
# metadata entities and never connects to this database.
db_service = CreateDatabaseServiceRequest(
    name="test-service-db-lineage",
    serviceType=DatabaseServiceType.Mysql,
    connection=DatabaseConnection(
        config=MysqlConnection(
            username="username",
            password="password",
            hostPort="http://localhost:1234",
        )
    ),
)
db_service_entity = metadata.create_or_update(data=db_service)

create_db = CreateDatabaseRequest(
    name="test-db",
    service=db_service_entity.fullyQualifiedName,
)
create_db_entity = metadata.create_or_update(data=create_db)

create_schema = CreateDatabaseSchemaRequest(
    name="test-schema", database=create_db_entity.fullyQualifiedName
)
create_schema_entity = metadata.create_or_update(data=create_schema)

# Source table of the lineage edge.
table_a = CreateTableRequest(
    name="tableA",
    databaseSchema=create_schema_entity.fullyQualifiedName,
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)
table_a_entity = metadata.create_or_update(data=table_a)

# Target table of the lineage edge.
table_c = CreateTableRequest(
    name="tableC",
    databaseSchema=create_schema_entity.fullyQualifiedName,
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)
table_c_entity = metadata.create_or_update(data=table_c)

# --- Create Pipeline to be used as edge -------------------------------------
pipeline_service = CreatePipelineServiceRequest(
    name="test-service-pipeline",
    serviceType=PipelineServiceType.Airflow,
    connection=PipelineConnection(
        config=AirflowConnection(
            hostPort="http://localhost:8080",
            connection=BackendConnection(),
        ),
    ),
)
pipeline_service_entity = metadata.create_or_update(data=pipeline_service)

create_pipeline = CreatePipelineRequest(
    name="test",
    service=pipeline_service_entity.fullyQualifiedName,
)
pipeline_entity = metadata.create_or_update(data=create_pipeline)

# Column-level lineage uses fully qualified names built from the names above:
# <db service>.<database>.<schema>.<table>.<column>
column_lineage = ColumnLineage(
    fromColumns=["test-service-db-lineage.test-db.test-schema.tableA.id"],
    toColumn="test-service-db-lineage.test-db.test-schema.tableC.id",
)
# The `pipeline` reference is what makes pipeline `test` appear as the edge
# between tableA and tableC.
lineage_details = LineageDetails(
    sqlQuery="SELECT * FROM AWESOME",
    columnsLineage=[column_lineage],
    pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=table_a_entity.id, type="table"),
        toEntity=EntityReference(id=table_c_entity.id, type="table"),
        lineageDetails=lineage_details,
    ),
)
# After running this, you should see the lineage on the Pipeline `test`
created_lineage = metadata.add_lineage(data=add_lineage_request)

# --- Now replicate it with another pipeline ---------------------------------
create_pipeline2 = CreatePipelineRequest(
    name="another_test",
    service=pipeline_service_entity.fullyQualifiedName,
)
pipeline_entity2 = metadata.create_or_update(data=create_pipeline2)

# Unlike `lineage_details`, this carries no sqlQuery or columnsLineage: only
# the pipeline reference differs in purpose. NOTE(review): whether the server
# replaces or merges those other fields is not shown here; confirm if it
# matters for your case.
lineage_details2 = LineageDetails(
    pipeline=EntityReference(id=pipeline_entity2.id, type="pipeline"),
)
# Same fromEntity/toEntity pair as the first request. Only one edge is kept
# per pair of nodes, so this second request takes over the edge.
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=table_a_entity.id, type="table"),
        toEntity=EntityReference(id=table_c_entity.id, type="table"),
        lineageDetails=lineage_details2,
    ),
)
# After running this, the lineage will instead appear on `another_test`
created_lineage = metadata.add_lineage(data=add_lineage_request)
Beta Was this translation helpful? Give feedback.
One thing to consider here is that in recent releases, OpenMetadata updated the Pipeline lineage to appear as an Edge in the lineage graph, not as a node.
This means:
(node) Table A -> (edge) Pipeline X -> (node) Table B
Now, the situation described here can happen if multiple Pipelines are flagged as edges between the same tables. For example, Pipeline X and Pipeline Y could both declare Table A as an inlet and Table B as an outlet.
In these cases, only the most recently processed Pipeline is used as the edge, since multiple edges between the same pair of nodes are not supported.
In the case of Airflow, review the inlets/outlets configuration in your DAGs to check whether you are repeating information.