![Egeria Logo](https://raw.githubusercontent.com/odpi/egeria/master/assets/img/ODPi_Egeria_Logo_color.png)

### Egeria Hands-On Lab
# Welcome to the Data Lineage Lab

**NOTE - this lab is still under construction and should not be used**

## Introduction

Egeria is an open source project that provides open standards and implementation libraries to connect tools, catalogs and platforms together so they can share information about data and technology (called metadata).

In this hands-on lab you will get a chance to work with Egeria's lineage services to see how data flows between storage and processing.

## Ensure existing services are running

Before starting, make sure the following access services are already running on these servers in the cohort:

- Asset Catalog OMAS: cocoMDS1, cocoMDS2, cocoMDS3, cocoMDS4, cocoMDS6
- Asset Lineage OMAS: cocoMDS2, cocoMDS4
- Data Engine OMAS: cocoMDS1, cocoMDS6
- Glossary View OMAS: cocoMDS1, cocoMDS2, cocoMDS3, cocoMDS4, cocoMDS6
- Asset Owner OMAS: cocoMDS1, cocoMDS2, cocoMDS3, cocoMDS4, cocoMDS6

In [None]:
%run ../common/environment-check.ipynb
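
The list above is organized by access service. When checking a single server, it can help to read it the other way round. The following is a minimal sketch that inverts the list; `REQUIRED_SERVICES` and `services_by_server` are illustrative names and are not part of the lab's common functions.

```python
# Illustrative only: this mapping is copied from the list above.
ALL_SERVERS = ["cocoMDS1", "cocoMDS2", "cocoMDS3", "cocoMDS4", "cocoMDS6"]
REQUIRED_SERVICES = {
    "Asset Catalog OMAS": ALL_SERVERS,
    "Asset Lineage OMAS": ["cocoMDS2", "cocoMDS4"],
    "Data Engine OMAS": ["cocoMDS1", "cocoMDS6"],
    "Glossary View OMAS": ALL_SERVERS,
    "Asset Owner OMAS": ALL_SERVERS,
}

def services_by_server(required):
    """Invert {service: [servers]} into {server: [services]}, both sorted by name."""
    result = {}
    for service, servers in required.items():
        for server in servers:
            result.setdefault(server, []).append(service)
    return {server: sorted(services) for server, services in sorted(result.items())}

for server, services in services_by_server(REQUIRED_SERVICES).items():
    print(server + ": " + ", ".join(services))
```

This only restates the list; it does not contact any server.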

## Configure open lineage services

### Setup default variables

In [None]:
adminUserId       = "garygeeke"
organizationName  = "Coco Pharmaceuticals"
jsonContentHeader = {'content-type':'application/json'}
eventBusURLroot   = os.environ.get('eventBusURLroot', 'localhost:9092')
eventBusBody      = {
    "producer": {
        "bootstrap.servers": eventBusURLroot
    },
    "consumer":{
        "bootstrap.servers": eventBusURLroot
    }
}
inMemoryRepositoryOption = "in-memory-repository"
graphRepositoryOption    = "local-graph-repository"

# Use the repository type from the environment if set; otherwise default to the in-memory repository
metadataRepositoryType   = os.environ.get('repositoryType', inMemoryRepositoryOption)
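
The defaults above follow one pattern: read a value from the environment and fall back to a local default. Below is a minimal sketch of that pattern as two small functions. The function names are hypothetical, and the strict check on the repository type is an assumption of this sketch; the cell above does not validate the value.

```python
import os

def build_event_bus_body(bootstrap_servers):
    """Return producer and consumer settings that point at one Kafka bootstrap address."""
    return {
        "producer": {"bootstrap.servers": bootstrap_servers},
        "consumer": {"bootstrap.servers": bootstrap_servers},
    }

def resolve_repository_type(environ, default="in-memory-repository"):
    """Pick the repository type from an environment-like mapping.

    Assumption: only the two options named in this notebook are accepted.
    """
    allowed = ("in-memory-repository", "local-graph-repository")
    value = environ.get("repositoryType", default)
    if value not in allowed:
        raise ValueError("Unsupported repositoryType: " + value)
    return value

print(build_event_bus_body(os.environ.get("eventBusURLroot", "localhost:9092")))
print(resolve_repository_type({"repositoryType": "local-graph-repository"}))
```

Passing the mapping in as an argument, rather than reading `os.environ` inside the function, keeps the lookup easy to try with different values.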

### Configure Open Lineage Services server

As with the servers in the other notebooks, we will create the configuration through the dev platform.

In [None]:
olsServerName          = "open-lineage"
olsServerPlatform      = corePlatformURL
metadataCollectionName = "Open Lineage Services"

print("Configuring " + olsServerName + "...")

configurePlatformURL(adminPlatformURL, adminUserId, olsServerName, olsServerPlatform)
#configureMaxPageSize(adminPlatformURL, adminUserId, olsServerName, maxPageSize)
clearServerType(adminPlatformURL, adminUserId, olsServerName)
configureOwningOrganization(adminPlatformURL, adminUserId, olsServerName, organizationName)
#configureUserId(adminPlatformURL, adminUserId, mdrServerName, mdrServerUserId)
#configurePassword(adminPlatformURL, adminUserId, mdrServerName, mdrServerPassword)
#configureSecurityConnection(adminPlatformURL, adminUserId, mdrServerName, serverSecurityConnectionBody)
configureEventBus(adminPlatformURL, adminUserId, olsServerName, eventBusBody)
configureMetadataRepository(adminPlatformURL, adminUserId, olsServerName, metadataRepositoryType)
configureDescriptiveName(adminPlatformURL, adminUserId, olsServerName, metadataCollectionName)
configureCohortMembership(adminPlatformURL, adminUserId, olsServerName, cocoCohort)

print("\nDone.")

In [None]:
# TODO: consider placing the below configuration into function calls like the other configuration,
# once functions exist to do the open lineage configuration...

olsConfigDocument = """
{
    "class": "OpenLineageConfig",
    "openLineageId": 0,
    "openLineageDescription": "Open Lineage Service is used for the storage and querying of lineage",
    "inTopicName": "server.cocoMDS4.omas.assetlineage.outTopic",
    "inTopicConnection": {
        "class": "Connection",
        "headerVersion": 0,
        "connectorType": {
            "class": "ConnectorType",
            "headerVersion": 0,
            "type": {
                "class": "ElementType",
                "headerVersion": 0,
                "elementOrigin": "LOCAL_COHORT",
                "elementVersion": 0,
                "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                "elementTypeName": "ConnectorType",
                "elementTypeVersion": 1,
                "elementTypeDescription": "A set of properties describing a type of connector."
            },
            "guid": "3851e8d0-e343-400c-82cb-3918fed81da6",
            "qualifiedName": "Kafka Open Metadata Topic Connector",
            "displayName": "Kafka Open Metadata Topic Connector",
            "description": "Kafka Open Metadata Topic Connector supports string based events over an Apache Kafka event bus.",
            "connectorProviderClassName": "org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicProvider",
            "recognizedConfigurationProperties": [
                "producer",
                "consumer",
                "local.server.id",
                "sleepTime"
            ]
        },
        "endpoint": {
            "class": "Endpoint",
            "headerVersion": 0,
            "address": "server.cocoMDS4.omas.assetlineage.outTopic"
        },
        "configurationProperties": {
            "producer": {
                "bootstrap.servers": "lab-kafka:9092"
            },
            "local.server.id": "39b54be8-229f-4480-8a41-242b1c822669",
            "consumer": {
                "bootstrap.servers": "lab-kafka:9092"
            }
        }
    },
    "lineageGraphConnection": {
        "class": "Connection",
        "headerVersion": 0,
        "displayName": "Lineage Graph Connection",
        "description": "Used for storing lineage in the Open Metadata format",
        "connectorType": {
            "class": "ConnectorType",
            "headerVersion": 0,
            "connectorProviderClassName": "org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.graph.LineageGraphConnectorProvider"
        },
        "configurationProperties": {
            "storage.directory": "data/servers/open-lineage/lineage-repository/berkeley",
            "index.search.directory": "data/servers/open-lineage/lineage-repository/searchindex",
            "storage.backend": "berkeleyje",
            "gremlin.graph": "org.janusgraph.core.JanusGraphFactory",
            "index.search.backend": "lucene"
        }
    },
    "accessServiceConfig": {
        "serverName": "cocoMDS4",
        "serverPlatformUrlRoot": "https://lab-datalake:9443"
    },
    "backgroundJobs": [
        {
            "jobName": "LineageGraphJob",
            "jobInterval": 120,
            "jobEnabled": true
        },
        {
            "jobName": "AssetLineageUpdateJob",
            "jobInterval": 120,
            "jobEnabled": true,
            "jobDefaultValue": "2021-12-03T10:15:30"
        }
    ]
}
"""

def configureOpenLineage(platformURL, adminUserId, serverName, configDoc):
    """POST the open lineage configuration document to the server's admin endpoint."""
    print("   ... configuring the open lineage services...")
    adminCommandURLRoot = platformURL + '/open-metadata/admin-services/users/' + adminUserId + '/servers/'
    url = adminCommandURLRoot + serverName + '/open-lineage/configuration'
    # Uses the jsonContentHeader defined with the default variables above
    postAndPrintResult(url, json=configDoc, headers=jsonContentHeader)

# Parse the JSON text into a dict and reuse the variables set earlier instead of repeating literals
configureOpenLineage(adminPlatformURL, adminUserId, olsServerName, json.loads(olsConfigDocument))
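
Because the configuration document is hand-written JSON, a quick consistency check before posting can catch copy-and-paste slips. This is a minimal sketch; `check_lineage_config` is a hypothetical helper, and its checks are assumptions drawn from the document above (for example, that `inTopicName` and the topic endpoint address carry the same value), not rules enforced by Egeria.

```python
import json

def check_lineage_config(config):
    """Return a list of human-readable problems found in an open lineage config dict."""
    problems = []
    # Assumption: these top-level keys are expected because the document above sets them.
    for key in ("inTopicName", "inTopicConnection", "lineageGraphConnection", "accessServiceConfig"):
        if key not in config:
            problems.append("missing key: " + key)
    connection = config.get("inTopicConnection", {})
    if connection.get("endpoint", {}).get("address") != config.get("inTopicName"):
        problems.append("inTopicName does not match the topic connection endpoint address")
    props = connection.get("configurationProperties", {})
    producer = props.get("producer", {}).get("bootstrap.servers")
    consumer = props.get("consumer", {}).get("bootstrap.servers")
    if producer != consumer:
        problems.append("producer and consumer use different bootstrap servers")
    return problems

# A trimmed copy of the configuration above, kept short for illustration.
sample = json.loads("""
{
    "inTopicName": "server.cocoMDS4.omas.assetlineage.outTopic",
    "inTopicConnection": {
        "endpoint": {"address": "server.cocoMDS4.omas.assetlineage.outTopic"},
        "configurationProperties": {
            "producer": {"bootstrap.servers": "lab-kafka:9092"},
            "consumer": {"bootstrap.servers": "lab-kafka:9092"}
        }
    },
    "lineageGraphConnection": {},
    "accessServiceConfig": {"serverName": "cocoMDS4"}
}
""")
print(check_lineage_config(sample))
```

In the notebook, the same function could be applied to `json.loads(olsConfigDocument)`.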

### Start the Open Lineage Services

Now we will deploy the configuration from the dev platform to the core platform, where the Open Lineage Services will run, and then start (activate) the `open-lineage` server.

In [None]:
deployServerToPlatform(adminPlatformURL, adminUserId, olsServerName, corePlatformURL)
activateServerOnPlatform(olsServerName, "unused", corePlatformURL)

## Create some sample lineage

Now that we have the lineage services configured and running, we will create some example metadata that can be seen and used within the user interface.

### Register an external tool

We'll begin by registering an external data platform that was used to create the data files and processing that we will see in the lineage. Gary Geeke will carry out this step as an administrative user against cocoMDS2 where governance is done.

In [None]:
def registerTool(serverName, serverPlatformURL, userId, body):
    registerURL = serverPlatformURL + '/servers/' + serverName + '/open-metadata/access-services/data-engine/users/' + userId
    registerURL += '/registration'
    response = issuePost(registerURL, body)
    print("Response: ", response.json())
    guid = response.json().get('guid')
    if guid:
        return guid
    else:
        print ("No data engine created")
        processErrorResponse(serverName, serverName, serverPlatformURL, response)

engineDoc = """
{
    "dataEngine":
    {
        "qualifiedName": "(organization)=Coco Pharmaceuticals::(project)=ExternalDataPlatform",
        "displayName": "ExternalDataEngine",
        "description": "An external data engine capability",
        "engineType": "DataEngine",
        "engineVersion": "1",
        "enginePatchLevel": "2",
        "vendor": "Example",
        "version": "1",
        "source": "source"
    }
}
"""

registerTool("cocoMDS2", corePlatformURL, "garygeeke", json.loads(engineDoc))
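
The URL in `registerTool` is assembled by string concatenation and follows a fixed pattern. A minimal sketch of that pattern as a helper is shown here; `data_engine_url` is a hypothetical name, and the platform URL and user id in the example call are placeholders rather than values from the lab environment.

```python
def data_engine_url(server_platform_url, server_name, user_id, resource):
    """Build a Data Engine OMAS URL following the pattern used in registerTool."""
    return (
        server_platform_url.rstrip("/")
        + "/servers/" + server_name
        + "/open-metadata/access-services/data-engine/users/" + user_id
        + "/" + resource.lstrip("/")
    )

# Placeholder platform URL and user id, for illustration only.
print(data_engine_url("https://localhost:9443", "cocoMDS1", "exampleuser", "registration"))
```

Stripping the slashes at the join points avoids a doubled `/` when the platform URL already ends with one.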

### Create some example files

Next, we'll simulate that external tool sharing metadata about some of the files that it has processed: both the files themselves and the fields within them.

These will be created by Peter Profile against cocoMDS1 in the Data Lake platform.

In [None]:
def createFile(serverName, serverPlatformURL, userId, body):
    registerURL = serverPlatformURL + '/servers/' + serverName + '/open-metadata/access-services/data-engine/users/' + userId
    registerURL += '/data-files'
    response = issuePost(registerURL, body)
    print("Response: ", response.json())
    guid = response.json().get('guid')
    if guid:
        return guid
    else:
        print ("No data files created")
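        # No platform name is passed to this function, so the server name is used for both labels (as in registerTool)
        processErrorResponse(serverName, serverName, serverPlatformURL, response)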

file1 = """
{
    "externalSourceName": "(organization)=Coco Pharmaceuticals::(project)=ExternalDataPlatform",
    "file": {
        "fileType": "CSVFile",
        "qualifiedName": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=names.csv",
        "displayName": "names.csv",
        "pathName": "/home/files/names.csv",
        "columns": [
            {
                "qualifiedName": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=names.csv::(data_file_record)=names::(data_file_field)=Id",
                "displayName": "Id"
            },
            {
                "qualifiedName": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=names.csv::(data_file_record)=names::(data_file_field)=First",
                "displayName": "First"
            },
            {
                "qualifiedName": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=names.csv::(data_file_record)=names::(data_file_field)=Last",
                "displayName": "Last"
            },
            {
                "qualifiedName": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=names.csv::(data_file_record)=names::(data_file_field)=Location",
                "displayName": "Location"
            }
        ]
    }
}
"""

file2 = """
{
    "externalSourceName": "(organization)=Coco Pharmaceuticals::(project)=ExternalDataPlatform",
    "file": {
        "fileType": "CSVFile",
        "qualifiedName": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=emplname.csv",
        "displayName": "emplname.csv",
        "pathName": "/home/files/emplname.csv",
        "columns": [
            {
                "qualifiedName": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=emplname.csv::(data_file_record)=emplname::(data_file_field)=EMPID",
                "displayName": "EMPID"
            },
            {
                "qualifiedName": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=emplname.csv::(data_file_record)=emplname::(data_file_field)=FNAME",
                "displayName": "FNAME"
            },
            {
                "qualifiedName": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=emplname.csv::(data_file_record)=emplname::(data_file_field)=LNAME",
                "displayName": "LNAME"
            },
            {
                "qualifiedName": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=emplname.csv::(data_file_record)=emplname::(data_file_field)=LOCID",
                "displayName": "LOCID"
            }
        ]
    }
}
"""

In [None]:
createFile("cocoMDS1", dataLakePlatformURL, petersUserId, json.loads(file1))

In [None]:
createFile("cocoMDS1", dataLakePlatformURL, petersUserId, json.loads(file2))
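
The two file documents above differ only in the file name and the column names, and every qualified name follows the same pattern. The following minimal sketch generates such a document; `build_csv_file_doc` is a hypothetical helper, and its `host` and `folders` defaults are simply the values used in this lab.

```python
def build_csv_file_doc(file_name, columns, external_source, host="HOST", folders=("home", "files")):
    """Build a data-file request body using the qualified-name pattern from this lab."""
    record = file_name.rsplit(".", 1)[0]  # e.g. "names" for "names.csv"
    folder_part = "::".join("(data_file_folder)=" + folder for folder in folders)
    file_qn = "(host)=" + host + "::" + folder_part + "::(data_file)=" + file_name
    field_prefix = file_qn + "::(data_file_record)=" + record + "::(data_file_field)="
    return {
        "externalSourceName": external_source,
        "file": {
            "fileType": "CSVFile",
            "qualifiedName": file_qn,
            "displayName": file_name,
            "pathName": "/" + "/".join(folders) + "/" + file_name,
            "columns": [
                {"qualifiedName": field_prefix + name, "displayName": name}
                for name in columns
            ],
        },
    }

doc = build_csv_file_doc(
    "names.csv",
    ["Id", "First", "Last", "Location"],
    "(organization)=Coco Pharmaceuticals::(project)=ExternalDataPlatform",
)
print(doc["file"]["qualifiedName"])
```

The returned dict could be passed to `createFile` in place of `json.loads(file1)`.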

### Create an example process

And finally, we'll simulate the external data platform sharing metadata about the data processing it does to move data between these two files.

Once again, Peter Profile will submit the details for this process through the Data Lake platform (cocoMDS1).

In [None]:
def createProcess(serverName, serverPlatformURL, userId, body):
    registerURL = serverPlatformURL + '/servers/' + serverName + '/open-metadata/access-services/data-engine/users/' + userId
    registerURL += '/processes'
    response = issuePost(registerURL, body)
    print("Response: ", response.json())
    guids = response.json().get('guids')
    if guids:
        return guids
    else:
        print ("No processes created")
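        # No platform name is passed to this function, so the server name is used for both labels (as in registerTool)
        processErrorResponse(serverName, serverName, serverPlatformURL, response)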

processesDoc = """
{
    "processes": [
        {
            "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns",
            "displayName": "CopyColumns",
            "name": "CopyColumns",
            "description": "CopyColumns is a (sub)process that describes the low level implementation activities performed by a platform or tool.",
            "owner": "Platform User",
            "portImplementations": [
                {
                    "displayName": "NamesFileInputPort",
                    "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputPort",
                    "updateSemantic": "REPLACE",
                    "type": "INPUT_PORT",
                    "schema": {
                        "displayName": "NamesFileInputSchema",
                        "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema",
                        "author": "Platform User",
                        "columns": [
                            {
                                "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=Last",
                                "displayName": "Last",
                                "minCardinality": 0,
                                "maxCardinality": 0,
                                "allowsDuplicateValues": false,
                                "orderedValues": false,
                                "position": 0,
                                "dataType": "VARCHAR"
                            },
                            {
                                "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=First",
                                "displayName": "First",
                                "minCardinality": 0,
                                "maxCardinality": 0,
                                "allowsDuplicateValues": false,
                                "orderedValues": false,
                                "position": 1,
                                "dataType": "VARCHAR"
                            },
                            {
                                "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=Id",
                                "displayName": "Id",
                                "minCardinality": 0,
                                "maxCardinality": 0,
                                "allowsDuplicateValues": false,
                                "orderedValues": false,
                                "position": 2,
                                "dataType": "INTEGER"
                            },
                            {
                                "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=Location",
                                "displayName": "Location",
                                "minCardinality": 0,
                                "maxCardinality": 0,
                                "allowsDuplicateValues": false,
                                "orderedValues": false,
                                "position": 3,
                                "dataType": "INTEGER"
                            }
                        ]
                    }
                },
                {
                    "displayName": "EmplnameFileOutputPort",
                    "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputPort",
                    "updateSemantic": "REPLACE",
                    "type": "OUTPUT_PORT",
                    "schema": {
                        "displayName": "EmplnameFileOutputSchema",
                        "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema",
                        "author": "Platform User",
                        "columns": [
                            {
                                "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=EMPID",
                                "displayName": "EMPID",
                                "minCardinality": 0,
                                "maxCardinality": 0,
                                "allowsDuplicateValues": false,
                                "orderedValues": false,
                                "position": 0
                            },
                            {
                                "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=FNAME",
                                "displayName": "FNAME",
                                "minCardinality": 0,
                                "maxCardinality": 0,
                                "allowsDuplicateValues": false,
                                "orderedValues": false,
                                "position": 0
                            },
                            {
                                "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=LOCID",
                                "displayName": "LOCID",
                                "minCardinality": 0,
                                "maxCardinality": 0,
                                "allowsDuplicateValues": false,
                                "orderedValues": false,
                                "position": 0
                            },
                            {
                                "qualifiedName": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=LNAME",
                                "displayName": "LNAME",
                                "minCardinality": 0,
                                "maxCardinality": 0,
                                "allowsDuplicateValues": false,
                                "orderedValues": false,
                                "position": 0
                            }
                        ]
                    }
                }
            ],
            "lineageMappings": [
                {
                    "sourceAttribute": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=names.csv::(data_file_record)=names::(data_file_field)=Id",
                    "targetAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=Id"
                },
                {
                    "sourceAttribute": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=names.csv::(data_file_record)=names::(data_file_field)=First",
                    "targetAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=First"
                },
                {
                    "sourceAttribute": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=names.csv::(data_file_record)=names::(data_file_field)=Last",
                    "targetAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=Last"
                },
                {
                    "sourceAttribute": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=names.csv::(data_file_record)=names::(data_file_field)=Location",
                    "targetAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=Location"
                },
                {
                    "sourceAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=Id",
                    "targetAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=EMPID"
                },
                {
                    "sourceAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=First",
                    "targetAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=FNAME"
                },
                {
                    "sourceAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=Last",
                    "targetAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=LNAME"
                },
                {
                    "sourceAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputSchema::(column)=Location",
                    "targetAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=LOCID"
                },
                {
                    "sourceAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=EMPID",
                    "targetAttribute": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=emplname.csv::(data_file_record)=emplname::(data_file_field)=EMPID"
                },
                {
                    "sourceAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=FNAME",
                    "targetAttribute": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=emplname.csv::(data_file_record)=emplname::(data_file_field)=FNAME"
                },
                {
                    "sourceAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=LNAME",
                    "targetAttribute": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=emplname.csv::(data_file_record)=emplname::(data_file_field)=LNAME"
                },
                {
                    "sourceAttribute": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputSchema::(column)=LOCID",
                    "targetAttribute": "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)=emplname.csv::(data_file_record)=emplname::(data_file_field)=LOCID"
                }
            ],
            "updateSemantic": "REPLACE",
            "parentProcesses": [
                {
                    "qualifiedName": "(process)=CopyColumnsFlow",
                    "containmentType": "OWNED"
                }
            ]
        },
        {
            "qualifiedName": "(process)=CopyColumnsFlow",
            "displayName": "CopyColumnsFlow",
            "name": "CopyColumnsFlow",
            "description": "CopyColumnsFlow describes high level process input and output and mappings between (sub)processes (if any).",
            "owner": "Platform User",
            "portAliases": [
                {
                    "displayName": "ReadInputFilePortAlias",
                    "qualifiedName": "(process)=CopyColumnsFlow::(port)=ReadInputFilePortAlias",
                    "updateSemantic": "REPLACE",
                    "delegatesTo": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=NamesFileInputPort",
                    "type": "INPUT_PORT"
                },
                {
                    "displayName": "WriteOutputFilePortAlias",
                    "qualifiedName": "(process)=CopyColumnsFlow::(port)=WriteOutputFilePortAlias",
                    "updateSemantic": "REPLACE",
                    "delegatesTo": "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)=EmplnameFileOutputPort",
                    "type": "OUTPUT_PORT"
                }
            ],
            "lineageMappings": [],
            "updateSemantic": "REPLACE"
        }
    ],
    "externalSourceName": "(organization)=Coco Pharmaceuticals::(project)=ExternalDataPlatform"
}
"""

createProcess("cocoMDS1", dataLakePlatformURL, petersUserId, json.loads(processesDoc))
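
The `lineageMappings` in the document above describe three hops for each column: source file field to input schema column, input schema column to output schema column, and output schema column to target file field. The following minimal sketch generates those twelve entries from four column pairs; `build_lineage_mappings` and the prefix variable names are hypothetical, while the prefix values are copied from the document.

```python
def build_lineage_mappings(source_prefix, input_prefix, output_prefix, target_prefix, column_pairs):
    """Return file->input, input->output and output->file mappings for each column pair."""
    file_to_input, input_to_output, output_to_file = [], [], []
    for source_col, target_col in column_pairs:
        file_to_input.append({"sourceAttribute": source_prefix + source_col,
                              "targetAttribute": input_prefix + source_col})
        input_to_output.append({"sourceAttribute": input_prefix + source_col,
                                "targetAttribute": output_prefix + target_col})
        output_to_file.append({"sourceAttribute": output_prefix + target_col,
                               "targetAttribute": target_prefix + target_col})
    return file_to_input + input_to_output + output_to_file

FILE_PREFIX = "(host)=HOST::(data_file_folder)=home::(data_file_folder)=files::(data_file)="
PROCESS_PREFIX = "(process)=CopyColumnsFlow::(process)=CopyColumns::(port)="

mappings = build_lineage_mappings(
    FILE_PREFIX + "names.csv::(data_file_record)=names::(data_file_field)=",
    PROCESS_PREFIX + "NamesFileInputSchema::(column)=",
    PROCESS_PREFIX + "EmplnameFileOutputSchema::(column)=",
    FILE_PREFIX + "emplname.csv::(data_file_record)=emplname::(data_file_field)=",
    [("Id", "EMPID"), ("First", "FNAME"), ("Last", "LNAME"), ("Location", "LOCID")],
)
print(len(mappings))
```

Generating the mappings from the column pairs keeps the source and target names in step, which is easy to get wrong when the entries are edited by hand.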

## Access details on the user interface

It may now take a couple of minutes for the lineage to be picked up and processed for visualization.

Once available, you can run a search for `csv` through the Asset Catalog section of the user interface. Click on either of the results (for example, `names.csv`) and then the End2End button to see the end-to-end lineage that was ingested from the external data platform.