Initializing Python

In [1]:
import os
import time
import uuid
from datetime import datetime

import pytz
import requests


def get_now_formatted(when=None):
    """Return an ISO-8601 timestamp with millisecond precision.

    Example result: ``2023-11-10T12:57:10.706-03:00``.

    Args:
        when: Optional timezone-aware ``datetime`` to format. When omitted,
            the current time in the ``America/Sao_Paulo`` timezone is used.

    Returns:
        str: ``when`` rendered as ``YYYY-MM-DDTHH:MM:SS.mmm`` followed by the
        UTC offset written as ``+HH:MM`` / ``-HH:MM``. A naive ``datetime``
        is rendered without any offset suffix.
    """
    if when is None:
        when = datetime.now(tz=pytz.timezone("America/Sao_Paulo"))
    # isoformat() truncates (does not round) microseconds to milliseconds and
    # writes the offset with a colon, so no manual slicing of strftime output
    # is needed.
    return when.isoformat(timespec="milliseconds")


# Base URL of the Marquez REST API. The default targets a Marquez instance
# reachable through Docker's host gateway name; set the MARQUEZ_API_URL
# environment variable to point the notebook at a different instance.
marquez_url = os.environ.get("MARQUEZ_API_URL", "http://host.docker.internal:5000/api/v1")

Creating job payload

In [3]:
# COMPLETE run event for job "job_complex_data_type". The single output
# dataset declares complex column types (ARRAY[LONG], MAP, CUSTOM_MAP)
# directly in the "type" string of the schema facet.
event_a_complete_run = {
    # Event envelope.
    "eventTime": get_now_formatted(),
    "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
    "eventType": "COMPLETE",
    # Run: a fresh random run id plus a nominal-time facet.
    "run": {
        "runId": str(uuid.uuid4()),
        "facets": {
            "nominalTime": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/airflow",
                "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet",
                "nominalStartTime": get_now_formatted(),
                "nominalEndTime": get_now_formatted(),
            },
        },
    },
    # Job identity and ownership.
    "job": {
        "namespace": "complex_data_types_tests",
        "name": "job_complex_data_type",
        "type": "SERVICE",
        "facets": {
            "ownership": {
                "_producer": "https://some.producer.com/version/1.0",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipJobFacet.json",
                "owners": [{"name": "lyamada", "type": "MAINTAINER"}],
            },
        },
    },
    # Output dataset with its schema facet.
    "outputs": [
        {
            "namespace": "complex_data_types_tests",
            "name": "complex_data_type_dataset",
            "physicalName": "s3://my_bucket/complex_data_type_dataset",
            "type": "FILE",
            "facets": {
                "schema": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
                    # Each description is "this is the column <NAME IN UPPER CASE>".
                    "fields": [
                        {
                            "name": column,
                            "type": dtype,
                            "description": f"this is the column {column.upper()}",
                        }
                        for column, dtype in (
                            ("a", "VARCHAR"),
                            ("b", "INTEGER"),
                            ("c", "INTEGER"),
                            ("d", "DOUBLE"),
                            ("e", "TIMESTAMP"),
                            ("f", "ARRAY[LONG]"),
                            ("g", "MAP"),
                            ("h", "CUSTOM_MAP"),
                        )
                    ],
                },
            },
        },
    ],
}

Sending event to Marquez API

In [4]:
# Send event A to the Marquez lineage endpoint and report the HTTP status code.
try:
    response = requests.post(
        url=f'{marquez_url}/lineage',
        json=event_a_complete_run,
        # Without a timeout the cell would block indefinitely if the server
        # never answers; 10 seconds is generous for a local API call.
        timeout=10,
    )
except requests.exceptions.RequestException as e:
    # Only request-level failures (connection, timeout, invalid payload) are
    # reported here; unrelated programming errors are left to surface.
    print(f'An Exception happend: {e}')
else:
    print(f'{get_now_formatted()}, response status code: {response.status_code}')

2023-11-10T12:57:10.706-03:00, response status code: 201


Diving deeper into data type representations

In [5]:
# COMPLETE run event for job "job_complex_data_type_2". The standard schema
# facet keeps plain ARRAY / MAP types, while an additional "schema_details"
# facet describes the nested structure with flat, dot-separated field names.
event_b_complete_run = {
    # Event envelope.
    "eventTime": get_now_formatted(),
    "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
    "eventType": "COMPLETE",
    # Run: a fresh random run id plus a nominal-time facet.
    "run": {
        "runId": str(uuid.uuid4()),
        "facets": {
            "nominalTime": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/airflow",
                "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet",
                "nominalStartTime": get_now_formatted(),
                "nominalEndTime": get_now_formatted(),
            },
        },
    },
    # Job identity and ownership.
    "job": {
        "namespace": "complex_data_types_tests",
        "name": "job_complex_data_type_2",
        "type": "SERVICE",
        "facets": {
            "ownership": {
                "_producer": "https://some.producer.com/version/1.0",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipJobFacet.json",
                "owners": [{"name": "lyamada", "type": "MAINTAINER"}],
            },
        },
    },
    # Output dataset with the standard schema facet and the custom detail facet.
    "outputs": [
        {
            "namespace": "complex_data_types_tests",
            "name": "complex_data_type_dataset_2",
            "physicalName": "s3://my_bucket/complex_data_type_dataset_2",
            "type": "FILE",
            "facets": {
                "schema": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
                    # Each description is "this is the column <NAME IN UPPER CASE>".
                    "fields": [
                        {
                            "name": column,
                            "type": dtype,
                            "description": f"this is the column {column.upper()}",
                        }
                        for column, dtype in (
                            ("a", "VARCHAR"),
                            ("b", "INTEGER"),
                            ("c", "INTEGER"),
                            ("d", "DOUBLE"),
                            ("e", "TIMESTAMP"),
                            ("f", "ARRAY"),
                            ("g", "MAP"),
                        )
                    ],
                },
                "schema_details": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/CustomSchemaDatasetFacet",
                    # Nested members of column "g" are flattened into dotted names.
                    "fields": [
                        {"name": column, "type": dtype}
                        for column, dtype in (
                            ("f", "ARRAY[LONG]"),
                            ("g", "MAP"),
                            ("g.customer", "MAP"),
                            ("g.customer.id", "INTEGER"),
                            ("g.customer.name", "VARCHAR"),
                        )
                    ],
                },
            },
        },
    ],
}

In [6]:
# Send event B to the Marquez lineage endpoint and report the HTTP status code.
try:
    response = requests.post(
        url=f'{marquez_url}/lineage',
        json=event_b_complete_run,
        # Without a timeout the cell would block indefinitely if the server
        # never answers; 10 seconds is generous for a local API call.
        timeout=10,
    )
except requests.exceptions.RequestException as e:
    # Only request-level failures (connection, timeout, invalid payload) are
    # reported here; unrelated programming errors are left to surface.
    print(f'An Exception happend: {e}')
else:
    print(f'{get_now_formatted()}, response status code: {response.status_code}')

2023-11-10T13:03:59.123-03:00, response status code: 201


In [7]:
# COMPLETE run event for job "job_complex_data_type_3". The standard schema
# facet keeps plain ARRAY / MAP types, while the "schema_details" facet
# describes column "g" as a tree using nested "properties" lists.
event_c_complete_run = {
    # Event envelope.
    "eventTime": get_now_formatted(),
    "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
    "eventType": "COMPLETE",
    # Run: a fresh random run id plus a nominal-time facet.
    "run": {
        "runId": str(uuid.uuid4()),
        "facets": {
            "nominalTime": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/airflow",
                "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet",
                "nominalStartTime": get_now_formatted(),
                "nominalEndTime": get_now_formatted(),
            },
        },
    },
    # Job identity and ownership.
    "job": {
        "namespace": "complex_data_types_tests",
        "name": "job_complex_data_type_3",
        "type": "SERVICE",
        "facets": {
            "ownership": {
                "_producer": "https://some.producer.com/version/1.0",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipJobFacet.json",
                "owners": [{"name": "lyamada", "type": "MAINTAINER"}],
            },
        },
    },
    # Output dataset with the standard schema facet and the custom detail facet.
    "outputs": [
        {
            "namespace": "complex_data_types_tests",
            "name": "complex_data_type_dataset_3",
            "physicalName": "s3://my_bucket/complex_data_type_dataset_3",
            "type": "FILE",
            "facets": {
                "schema": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
                    # Each description is "this is the column <NAME IN UPPER CASE>".
                    "fields": [
                        {
                            "name": column,
                            "type": dtype,
                            "description": f"this is the column {column.upper()}",
                        }
                        for column, dtype in (
                            ("a", "VARCHAR"),
                            ("b", "INTEGER"),
                            ("c", "INTEGER"),
                            ("d", "DOUBLE"),
                            ("e", "TIMESTAMP"),
                            ("f", "ARRAY"),
                            ("g", "MAP"),
                        )
                    ],
                },
                "schema_details": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/CustomSchemaDatasetFacet",
                    "fields": [
                        {"name": "f", "type": "ARRAY[LONG]"},
                        # g -> customer -> (id, name), expressed as nested properties.
                        {
                            "name": "g",
                            "type": "MAP",
                            "properties": [
                                {
                                    "name": "customer",
                                    "type": "MAP",
                                    "properties": [
                                        {"name": "id", "type": "INTEGER"},
                                        {"name": "name", "type": "VARCHAR"},
                                    ],
                                },
                            ],
                        },
                    ],
                },
            },
        },
    ],
}

In [8]:
# Send event C to the Marquez lineage endpoint and report the HTTP status code.
try:
    response = requests.post(
        url=f'{marquez_url}/lineage',
        json=event_c_complete_run,
        # Without a timeout the cell would block indefinitely if the server
        # never answers; 10 seconds is generous for a local API call.
        timeout=10,
    )
except requests.exceptions.RequestException as e:
    # Only request-level failures (connection, timeout, invalid payload) are
    # reported here; unrelated programming errors are left to surface.
    print(f'An Exception happend: {e}')
else:
    print(f'{get_now_formatted()}, response status code: {response.status_code}')

2023-11-10T13:09:42.547-03:00, response status code: 201


In [9]:
# COMPLETE run event for job "job_complex_data_type_4". Nested members of
# column "g" are listed directly in the standard schema facet as extra
# fields with dot-separated names; no custom facet is attached.
event_d_complete_run = {
    # Event envelope.
    "eventTime": get_now_formatted(),
    "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
    "eventType": "COMPLETE",
    # Run: a fresh random run id plus a nominal-time facet.
    "run": {
        "runId": str(uuid.uuid4()),
        "facets": {
            "nominalTime": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/airflow",
                "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet",
                "nominalStartTime": get_now_formatted(),
                "nominalEndTime": get_now_formatted(),
            },
        },
    },
    # Job identity and ownership.
    "job": {
        "namespace": "complex_data_types_tests",
        "name": "job_complex_data_type_4",
        "type": "SERVICE",
        "facets": {
            "ownership": {
                "_producer": "https://some.producer.com/version/1.0",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipJobFacet.json",
                "owners": [{"name": "lyamada", "type": "MAINTAINER"}],
            },
        },
    },
    # Output dataset with its schema facet.
    "outputs": [
        {
            "namespace": "complex_data_types_tests",
            "name": "complex_data_type_dataset_4",
            "physicalName": "s3://my_bucket/complex_data_type_dataset_4",
            "type": "FILE",
            "facets": {
                "schema": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
                    # Each description is "this is the column <NAME IN UPPER CASE>",
                    # including the dotted names (e.g. "G.CUSTOMER.ID").
                    "fields": [
                        {
                            "name": column,
                            "type": dtype,
                            "description": f"this is the column {column.upper()}",
                        }
                        for column, dtype in (
                            ("a", "VARCHAR"),
                            ("b", "INTEGER"),
                            ("c", "INTEGER"),
                            ("d", "DOUBLE"),
                            ("e", "TIMESTAMP"),
                            ("f", "ARRAY"),
                            ("g", "MAP"),
                            ("g.customer", "MAP"),
                            ("g.customer.id", "INTEGER"),
                            ("g.customer.name", "VARCHAR"),
                        )
                    ],
                },
            },
        },
    ],
}

In [10]:
# Send event D to the Marquez lineage endpoint and report the HTTP status code.
try:
    response = requests.post(
        url=f'{marquez_url}/lineage',
        json=event_d_complete_run,
        # Without a timeout the cell would block indefinitely if the server
        # never answers; 10 seconds is generous for a local API call.
        timeout=10,
    )
except requests.exceptions.RequestException as e:
    # Only request-level failures (connection, timeout, invalid payload) are
    # reported here; unrelated programming errors are left to surface.
    print(f'An Exception happend: {e}')
else:
    print(f'{get_now_formatted()}, response status code: {response.status_code}')

2023-11-10T13:15:30.426-03:00, response status code: 201
