

![Egeria Logo](https://raw.githubusercontent.com/odpi/egeria/main/assets/img/ODPi_Egeria_Logo_color.png)

### Egeria Workbook

# Receiving Open Lineage Events via Kafka

## Introduction

Open Lineage provides a proxy backend server that can distribute Open Lineage events via Apache Kafka.  This workbook configures Egeria to receive these events from a Kafka topic.
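For orientation, here is the kind of JSON payload that travels over the Kafka topic: a minimal Open Lineage `RunEvent`. The sketch uses only the standard library. The job namespace and name and the producer URI are hypothetical, and fields the specification also defines, such as `schemaURL`, are left out for brevity. Check the Open Lineage specification for the full schema.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

# Hypothetical producer URI, used for illustration only.
PRODUCER = "https://example.com/my-pipeline"

# Event types defined by the Open Lineage RunEvent schema.
EVENT_TYPES = {"START", "RUNNING", "COMPLETE", "ABORT", "FAIL", "OTHER"}


def make_run_event(event_type: str, namespace: str, job_name: str,
                   run_id: Optional[str] = None) -> dict:
    """Build a minimal Open Lineage RunEvent as a plain dictionary."""
    if event_type not in EVENT_TYPES:
        raise ValueError(f"unknown eventType: {event_type}")
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id or str(uuid.uuid4())},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [],
        "outputs": [],
        "producer": PRODUCER,
    }


# Kafka message values are bytes, so the event is serialized as UTF-8 encoded JSON.
event = make_run_event("START", "hypothetical-namespace", "hypothetical-job")
payload = json.dumps(event).encode("utf-8")
print(payload.decode("utf-8"))
```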

---

In [None]:
# Initialize pyegeria

%run ../../pyegeria/initialize-pyegeria.ipynb


In [None]:
from pyegeria import ClassificationManager

classification_manager = ClassificationManager(view_server, url, user_id, user_pwd)
token = classification_manager.create_egeria_bearer_token()

# Find the governance action processes whose qualified name contains "HarvestOpenLineage"
elements = classification_manager.find_elements_by_property_value(
    property_value="HarvestOpenLineage",
    property_names=['qualifiedName'],
    metadata_element_type_name="GovernanceActionProcess",
)

# pyegeria may return a string message instead of a list (for example, when nothing matches)
if isinstance(elements, str):
    print(elements)
else:
    for element in elements or []:
        properties = element.get('properties') if element else None
        if properties:
            qualified_name = properties.get('qualifiedName')
            description = properties.get('description') or ''
            print(f"* {qualified_name} - {description}")

----

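One of the results is a governance action process called *HarvestOpenLineageEvents:CreateAsCatalogTargetGovernanceActionProcess*.  It catalogs an Apache Kafka topic and adds it as a catalog target of the integration connector that listens for Open Lineage events.  The next cell displays the steps of this process.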

----

In [None]:
from pyegeria import GovernanceOfficer

governance_officer = GovernanceOfficer(view_server, url, user_id, user_pwd)
token = governance_officer.create_egeria_bearer_token()

# Qualified name of the governance action process that catalogs the Kafka topic
createAndCatalogServerName = "HarvestOpenLineageEvents:CreateAsCatalogTargetGovernanceActionProcess"

# Look up the process's unique identifier, then retrieve and display its steps
process_guid = classification_manager.get_element_guid_by_unique_name(createAndCatalogServerName)
process_graph = governance_officer.get_governance_process_graph(process_guid)

print_governance_action_process_graph(process_graph)


-----

Next, define the Apache Kafka server and the topic that other systems use to publish Open Lineage events.  Then start the governance action process, passing these values as request parameters.
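Checking the request parameters locally before starting the process can catch typos early.  The sketch below is a hypothetical helper and is not part of pyegeria.  It assembles the same keys that the next cell uses and rejects obviously bad values.  Two choices are assumptions of this sketch: every key is treated as required, and the port must be a numeric string between 1 and 65535.  The port is kept as text because Egeria governance action request parameters are string-to-string pairs.

```python
# Hypothetical helper: the key names mirror the request parameters used in this workbook.
REQUIRED_KEYS = (
    "serverName", "hostIdentifier", "portNumber", "description",
    "topicDescription", "versionIdentifier", "fullTopicName",
    "shortTopicName", "eventDirection",
)


def build_kafka_request_parameters(server_name, host, port, full_topic_name,
                                   short_topic_name=None,
                                   description="Receive open lineage events via Kafka Topic.",
                                   topic_description="Open Lineage Events for Backend Proxy",
                                   version="V1.0", event_direction="inOut"):
    """Return a dict of string request parameters, raising ValueError on obviously bad input."""
    port_text = str(port)
    if not port_text.isdigit() or not 0 < int(port_text) < 65536:
        raise ValueError(f"invalid port number: {port!r}")
    params = {
        "serverName": server_name,
        "hostIdentifier": host,
        "portNumber": port_text,
        "description": description,
        "topicDescription": topic_description,
        "versionIdentifier": version,
        "fullTopicName": full_topic_name,
        # Default the short topic name to the full name, as in this workbook's example.
        "shortTopicName": short_topic_name or full_topic_name,
        "eventDirection": event_direction,
    }
    missing = [key for key in REQUIRED_KEYS if not params.get(key)]
    if missing:
        raise ValueError(f"missing request parameters: {missing}")
    return params


params = build_kafka_request_parameters("LocalKafka2", "localhost", 9092, "openlineage.events")
print(params["portNumber"], params["shortTopicName"])  # 9092 openlineage.events
```

The resulting dictionary has the same shape as `requestParameters` in the cell below.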

----

In [None]:
from pyegeria import AutomatedCuration

automated_curation = AutomatedCuration(view_server, url, user_id, user_pwd)
token = automated_curation.create_egeria_bearer_token()

# Describe the Kafka broker and the topic that Open Lineage events are published to.
# Adjust these values to match your own Apache Kafka installation.
requestParameters = {
    "serverName" : "LocalKafka2",
    "hostIdentifier": "localhost",
    "portNumber": "9092",
    "description" : "Receive open lineage events via Kafka Topic.",
    "topicDescription": "Open Lineage Events for Backend Proxy",
    "versionIdentifier" : "V1.0",
    "fullTopicName": "openlineage.events",
    "shortTopicName": "openlineage.events",
    "eventDirection": "inOut"
}

# Start the governance action process; the remaining optional arguments are left as None
instance_guid = automated_curation.initiate_gov_action_process(createAndCatalogServerName, None, None, None, requestParameters, None, None)

print(instance_guid)

----

The command below displays the latest governance actions.  They should be in **ACTIONED** status.  Failures mean that either Apache Kafka is not running or the values describing its location are incorrect.  If Apache Kafka is down, restart it and re-run the cell above.  If one or more of the values describing the topic to listen on is wrong, correct the values and run the process again.
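If the actions fail, a plain TCP connection attempt is a quick way to check whether anything is listening at the broker address.  The sketch uses only Python's `socket` module.  A successful connection shows that the port accepts connections, not that the process behind it is a healthy Kafka broker.  The host and port repeat the values from the request parameters above.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures,
        # which are all OSError subclasses in Python 3.
        return False


# Host and port taken from the request parameters used earlier in this workbook.
if port_is_open("localhost", 9092):
    print("Something is accepting connections on localhost:9092")
else:
    print("Nothing is listening on localhost:9092; check that Apache Kafka is running")
```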

----

In [None]:

display_engine_activity_c(row_limit=2)


----

The process configures the *OpenLineageKafkaListener* integration connector to start listening for Open Lineage events.
The Target Element column shows the details of the Apache Kafka topics it is listening to.
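To show what arrives on these topics, the sketch below decodes one Kafka message value as an Open Lineage event and summarizes it.  It does not describe how *OpenLineageKafkaListener* processes events internally.  The helper name and the sample payload are hypothetical, and malformed messages are skipped rather than raising an error.

```python
import json
from typing import Optional


def summarize_event(message_value: bytes) -> Optional[str]:
    """Decode one Kafka message value as an Open Lineage event and summarize it.

    Returns None when the payload is not a JSON object with a job, so callers can skip it.
    """
    try:
        event = json.loads(message_value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(event, dict) or not isinstance(event.get("job"), dict):
        return None
    job = event["job"]
    return f'{event.get("eventType", "UNKNOWN")} {job.get("namespace")}/{job.get("name")}'


sample = b'{"eventType": "COMPLETE", "job": {"namespace": "demo", "name": "load_orders"}}'
print(summarize_event(sample))      # COMPLETE demo/load_orders
print(summarize_event(b"not json"))  # None
```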

----

In [None]:

display_integration_daemon_status(['OpenLineageKafkaListener'], paging=True, width=200)
