In [16]:
import yaml
from jsonschema import validate, ValidationError

# Load YAML file
def load_yaml(file_path):
    """Parse a YAML file and return its contents.

    Args:
        file_path: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the document.

    Raises:
        OSError: If the file cannot be opened.
    """
    # Decode explicitly as UTF-8: text-mode open() otherwise falls back to the
    # platform's locale encoding, which can garble non-ASCII content.
    with open(file_path, 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)

# Validate YAML file against a schema
def validate_yaml_schema(data, schema_path):
    """Validate already-parsed data against a schema stored in a YAML file.

    Prints a confirmation when validation passes. On failure the error is
    printed and then re-raised so the caller sees it.

    Args:
        data: Parsed document to check.
        schema_path: Path of the YAML file containing the schema.

    Raises:
        ValidationError: If ``data`` does not conform to the schema.
        OSError: If the schema file cannot be opened.
    """
    # Decode explicitly as UTF-8 rather than relying on the locale default.
    with open(schema_path, 'r', encoding='utf-8') as schema_file:
        schema = yaml.safe_load(schema_file)
    try:
        validate(instance=data, schema=schema)
    except ValidationError as e:
        print(f"Validation error: {e}")
        raise
    else:
        # Only reached when validate() returned without raising.
        print("YAML file is valid.")

# Parse the lineage definition file
lineage_data = load_yaml(file_path="lineage.yaml")

# Check the parsed lineage data against the schema in validation_schema.yaml
validate_yaml_schema(data=lineage_data, schema_path="validation_schema.yaml")


YAML file is valid.


In [17]:
import os
import yaml
from dotenv import load_dotenv
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)

# Load environment variables, then read the DataHub connection settings
# (each is None when the variable is not set)
load_dotenv()
DATAHUB_SERVER_URL = os.environ.get("DATAHUB_SERVER_URL")
DATAHUB_TOKEN = os.environ.get("DATAHUB_TOKEN")

# REST emitter used further down to send the lineage proposals
emitter = DatahubRestEmitter(
    gms_server=DATAHUB_SERVER_URL,
    token=DATAHUB_TOKEN,
)

# Load lineage data from the YAML file
def load_lineage(file_path):
    """Parse the lineage YAML file and return its contents.

    Args:
        file_path: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the document.

    Raises:
        OSError: If the file cannot be opened.
    """
    # Decode explicitly as UTF-8: text-mode open() otherwise falls back to the
    # platform's locale encoding, which can garble non-ASCII content.
    with open(file_path, 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)

# Parsed contents of lineage.yaml; the processing loop below iterates over its 'lineages' key.
lineage_data = load_lineage("lineage.yaml")

# Helper functions for URNs
def dataset_urn(platform, name, env):
    """Build a dataset URN string from a platform, dataset name and environment.

    Example:
        dataset_urn("mssql", "DWH.dbo.T", "PROD")
        -> "urn:li:dataset:(urn:li:dataPlatform:mssql,DWH.dbo.T,PROD)"
    """
    platform_urn = "urn:li:dataPlatform:{}".format(platform)
    return "urn:li:dataset:({},{},{})".format(platform_urn, name, env)

def field_urn(dataset_urn, field):
    """Return the schema-field URN for ``field`` within the given dataset URN.

    Thin wrapper over ``builder.make_schema_field_urn``. Note that the
    ``dataset_urn`` parameter shadows the module-level ``dataset_urn``
    function inside this body.
    """
    parent_urn, field_path = dataset_urn, field
    return builder.make_schema_field_urn(parent_urn, field_path)

# Process lineages
#
# An emitted UpstreamLineage aspect replaces the upstream lineage the target
# dataset already had. Entries that share a target are therefore merged first
# and each target is emitted exactly once; emitting per entry would leave only
# the last source attached to a shared target.
lineage_by_target = {}  # target URN -> {"sources": [...], "fine_grained": [...]}

for lineage in lineage_data['lineages']:
    source_urn = dataset_urn(lineage['source']['platform'], lineage['source']['dataset'], "PROD")
    target_urn = dataset_urn(lineage['target']['platform'], lineage['target']['dataset'], "PROD")

    print(f"Processing lineage from {source_urn} to {target_urn}")

    merged = lineage_by_target.setdefault(target_urn, {"sources": [], "fine_grained": []})
    # The same source may appear in several entries for one target; list it once.
    if source_urn not in merged["sources"]:
        merged["sources"].append(source_urn)

    # Prepare field-level lineage mappings
    for mapping in lineage['field_mappings']:
        source_field_urn = field_urn(source_urn, mapping['source_field'])
        target_field_urn = field_urn(target_urn, mapping['target_field'])

        print(f"Mapping {source_field_urn} -> {target_field_urn}")

        merged["fine_grained"].append(
            FineGrainedLineage(
                upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                upstreams=[source_field_urn],
                downstreamType=FineGrainedLineageDownstreamType.FIELD,
                downstreams=[target_field_urn],
            )
        )

# Create and emit one lineage aspect per target dataset, in first-seen order
for target_urn, merged in lineage_by_target.items():
    upstreams = [
        Upstream(dataset=upstream_urn, type=DatasetLineageType.TRANSFORMED)
        for upstream_urn in merged["sources"]
    ]
    lineage_aspect = UpstreamLineage(upstreams=upstreams, fineGrainedLineages=merged["fine_grained"])

    lineage_mcp = MetadataChangeProposalWrapper(entityUrn=target_urn, aspect=lineage_aspect)

    print("Emitting lineage MCP...")
    emitter.emit_mcp(lineage_mcp)

print("Lineage processing complete!")


Processing lineage from urn:li:dataset:(urn:li:dataPlatform:mssql,OpenData.dbo.Companies,PROD) to urn:li:dataset:(urn:li:dataPlatform:mssql,DWH.dbo.UnifiedCompanyList,PROD)
Mapping urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,OpenData.dbo.Companies,PROD),CompanyName) -> urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,DWH.dbo.UnifiedCompanyList,PROD),CompanyName)
Mapping urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,OpenData.dbo.Companies,PROD),CompanyNumber) -> urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,DWH.dbo.UnifiedCompanyList,PROD),CompanyNumber)
Emitting lineage MCP...
Lineage processing complete!
