# Create Airflow DAG Lineage

This notebook provides an example for creating lineage between an Airflow DAG and table in Secoda. To get started, you will need to [obtain an API key](https://app.secoda.co/settings/api)

Input your API key and Secoda API endpoint URL below. The URL for the cloud instance is `https://api.secoda.co`. If you are self-hosting Secoda or on the EU instance, you will have to update variable.

In [None]:

import requests

API_KEY = "api_key"
SECODA_API_URL = "https://api.secoda.co"

session = requests.Session()
session.headers.update(dict(
    Authorization=f"Bearer {API_KEY}"
))


def build_url(url: str):
    return f"{SECODA_API_URL}{url}"

In [2]:
def find_table(title, schema, database, *args, **kwargs):
    res = session.get(
        build_url(f"/table/tables/?title={title}&parent__title={schema}&parent__parent__title={database}")
    )
    results = res.json().get("results")
    if len(results) >= 1:
        return results[0]
    return None

def find_job(title, *args, **kwargs):
    res = session.get(
        build_url(f"/job/jobs/?title={title}")
    )
    results = res.json().get("results")
    if len(results) >= 1:
        return results[0]
    return None

def set_lineage(upstream_table_id, downstream_table_id):
    session.post(
        build_url(f"/lineage/manual/"),
        json=dict(
            from_entity=upstream_table_id,
            to_entity=downstream_table_id,
            direction="DOWNSTREAM",
        )
    )

lineage = [
    {
        "upstream_dag": {
            "title": "tutorial",
        },
        "downstream_table": {
            "title": "raw_payments",
            "schema": "lightdash",
            "database": "secodaredshift",
        },
    }
]

for lineage_pair in lineage:
    upstream_dag = find_job(
        **lineage_pair["upstream_dag"],
    )
    
    downstream_table = find_table(
        **lineage_pair["downstream_table"],
    )

    set_lineage(upstream_dag.get("id", ""), downstream_table.get("id", ""))