From f21c3b4fe13b8708b9f9df8a8b2f07be22972593 Mon Sep 17 00:00:00 2001 From: aji-aju Date: Fri, 27 Mar 2026 16:14:51 +0530 Subject: [PATCH 1/2] feat: Add Reverse Metadata Agent pipeline type support Add OSS-level support for the Reverse Metadata Agent pipeline, enabling reverse metadata operations (description, tag, owner sync) to run as an ingestion workflow instead of an application. - Add reverseMetadataPipeline JSON schema with sync toggles and filter patterns - Add REVERSE_METADATA pipeline type to ingestionPipeline schema - Add reverseMetadata command to ingestion CLI (cmd.py) - Add deployReverseMetadata to PipelineServiceClientInterface Co-Authored-By: Claude Opus 4.6 (1M context) --- ingestion/src/metadata/cmd.py | 7 ++ .../sdk/PipelineServiceClientInterface.java | 4 +- .../ingestionPipelines/ingestionPipeline.json | 2 +- .../reverseMetadataPipeline.json | 80 +++++++++++++++++++ .../schema/metadataIngestion/workflow.json | 3 + 5 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 openmetadata-spec/src/main/resources/json/schema/metadataIngestion/reverseMetadataPipeline.json diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index 735ea369291f..f8ea687c5423 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -53,6 +53,7 @@ class MetadataCommands(Enum): LINEAGE = "lineage" APP = "app" AUTO_CLASSIFICATION = "classify" + REVERSE_METADATA = "reverse_metadata" SCAFFOLD_CONNECTOR = "scaffold-connector" @@ -165,6 +166,12 @@ def get_parser(args: Optional[List[str]] = None): help="Workflow for running auto classification", ) ) + create_common_config_parser_args( + sub_parser.add_parser( + MetadataCommands.REVERSE_METADATA.value, + help="Reverse Metadata Workflow", + ) + ) webhook_args( sub_parser.add_parser( MetadataCommands.WEBHOOK.value, diff --git a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java index f56475286148..bb1498dfccd8 100644 --- a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java +++ b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java @@ -65,7 +65,9 @@ public interface PipelineServiceClientInterface { PipelineType.DATA_INSIGHT.toString(), "data_insight_task", PipelineType.APPLICATION.toString(), - "application_task"); + "application_task", + PipelineType.REVERSE_METADATA.toString(), + "reverse_metadata_task"); URL validateServiceURL(String serviceURL); diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json index 835bebffa633..fd7a56199c3d 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json @@ -11,7 +11,7 @@ "description": "Type of Pipeline - metadata, usage", "type": "string", "javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType", - "enum": ["metadata", "usage", "lineage", "profiler", "autoClassification", "TestSuite", "dataInsight", "elasticSearchReindex", "dbt", "application"] + "enum": ["metadata", "usage", "lineage", "profiler", "autoClassification", "TestSuite", "dataInsight", "elasticSearchReindex", "dbt", "application", "reverseMetadata"] }, "pipelineStatus": { "type": "object", diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/reverseMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/reverseMetadataPipeline.json new file mode 100644 index 000000000000..106e3f92488c --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/reverseMetadataPipeline.json @@ -0,0 +1,80 @@ +{ + "$id": "https://open-metadata.org/schema/metadataIngestion/reverseMetadataPipeline.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ReverseMetadataPipeline", + "javaType": "org.openmetadata.schema.metadataIngestion.ReverseMetadataPipeline", + "description": "Reverse Metadata Agent Pipeline Configuration. Controls which metadata changes (descriptions, tags, owners) are propagated back to source systems.", + "type": "object", + "definitions": { + "reverseMetadataConfigType": { + "description": "Reverse Metadata Pipeline type", + "type": "string", + "enum": [ + "ReverseMetadata" + ], + "default": "ReverseMetadata" + } + }, + "properties": { + "type": { + "description": "Pipeline type", + "$ref": "#/definitions/reverseMetadataConfigType", + "default": "ReverseMetadata" + }, + "syncDescriptions": { + "description": "Enable syncing description changes back to the source system.", + "type": "boolean", + "default": true, + "title": "Sync Descriptions" + }, + "syncTags": { + "description": "Enable syncing tag changes back to the source system.", + "type": "boolean", + "default": true, + "title": "Sync Tags" + }, + "syncOwners": { + "description": "Enable syncing owner changes back to the source system.", + "type": "boolean", + "default": false, + "title": "Sync Owners" + }, + "databaseFilterPattern": { + "description": "Regex to only include/exclude databases that match the pattern.", + "$ref": "../type/filterPattern.json#/definitions/filterPattern", + "title": "Database Filter Pattern" + }, + "schemaFilterPattern": { + "description": "Regex to only include/exclude schemas that match the pattern.", + "$ref": "../type/filterPattern.json#/definitions/filterPattern", + "title": "Schema Filter Pattern" + }, + "tableFilterPattern": { + "description": "Regex to only include/exclude tables that match the pattern.", + "$ref": "../type/filterPattern.json#/definitions/filterPattern", + "title": "Table Filter Pattern" + }, + "useFqnForFiltering": { + "description": "Regex will be applied on fully qualified name (e.g service_name.db_name.schema_name.table_name) instead of raw name (e.g. table_name).", + "type": "boolean", + "default": false, + "title": "Use FQN For Filtering" + }, + "service": { + "description": "Service to apply reverse metadata operations on.", + "$ref": "../type/entityReference.json" + }, + "operations": { + "description": "List of operations to be performed on the service. Populated at runtime by the handler.", + "type": "array", + "items": { + "$ref": "reverseIngestionPipeline.json#/definitions/operation" + }, + "default": [] + } + }, + "additionalProperties": false, + "required": [ + "type" + ] +} diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json index 070a84ed496e..4343bd7b6dac 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json @@ -68,6 +68,9 @@ }, { "$ref": "reverseIngestionPipeline.json" + }, + { + "$ref": "reverseMetadataPipeline.json" } ] } From 2f81fc7fef5b54ba5e6f860fa77135a61c8ca5ab Mon Sep 17 00:00:00 2001 From: harshsoni2024 Date: Wed, 15 Apr 2026 10:37:47 +0530 Subject: [PATCH 2/2] run path method change --- ingestion/src/metadata/cmd.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index f8ea687c5423..dbbc1133e391 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -29,6 +29,7 @@ from metadata.cli.ingest_dbt import run_ingest_dbt from metadata.cli.lineage import run_lineage from metadata.cli.profile import run_profiler +from metadata.cli.reverse_metadata import run_reverse_metadata from metadata.cli.scaffold import ( AUTH_CHOICES, CAPABILITY_CHOICES, @@ -66,6 +67,7 @@ class MetadataCommands(Enum): MetadataCommands.TEST.value: run_test, MetadataCommands.APP.value: run_app, MetadataCommands.AUTO_CLASSIFICATION.value: run_classification, + MetadataCommands.REVERSE_METADATA.value: run_reverse_metadata, }