# Overview

This example showcases the API exposed by the data lineage package. The API can be used to build
a lineage graph by adding nodes and edges that represent columns and transformations. 

Note that the goal of this example is to explain the building blocks of the lineage graph.
In practical scenarios, use a pack (e.g. the query parser pack) to automate the process.

This example consists of the following sequence of operations:
* Start the Docker containers that run the demo. Refer to the [docs](https://tokern.io/docs/data-lineage/installation) for detailed instructions on installing demo-wikimedia.
* Register nodes from columns in the catalog.
* Register directed edges to represent that a column is the source of data for another column.
* Visualize the graph by visiting [Tokern UI](http://localhost:8000/).
* Analyze the graph.
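
To make the building blocks concrete, here is a minimal in-memory sketch of the same idea: columns as nodes, directed edges from a source column to the column it feeds, and a simple upstream query as the "analyze" step. The `LineageGraph` class and its methods are hypothetical and written only for illustration. They are not part of the data-lineage API, which stores the graph in the catalog instead.

```python
from collections import defaultdict


class LineageGraph:
    """Hypothetical toy model; not the data-lineage package's implementation."""

    def __init__(self):
        self.nodes = set()
        # Maps a target column to the set of columns that feed it directly.
        self.upstream = defaultdict(set)

    def add_node(self, column):
        self.nodes.add(column)

    def add_edge(self, source, target):
        # A directed edge means "source is the origin of data for target".
        self.add_node(source)
        self.add_node(target)
        self.upstream[target].add(source)

    def ancestors(self, column):
        """Return every column that directly or indirectly feeds `column`."""
        seen = set()
        stack = [column]
        while stack:
            current = stack.pop()
            for parent in self.upstream.get(current, ()):
                if parent not in seen:
                    seen.add(parent)
                    stack.append(parent)
        return seen


# Columns are identified by (source, schema, table, column), as in this example.
page_id = ("wikimedia", "public", "page", "page_id")
redirect_id = ("wikimedia", "public", "page_lookup_nonredirect", "redirect_id")

graph = LineageGraph()
graph.add_edge(page_id, redirect_id)

print(graph.ancestors(redirect_id))
```

The query walks edges backwards from a column, which is the basic operation behind questions such as "where does this column's data come from?".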

# Installation

This example requires the wikimedia demo to be running. Start the demo using the following instructions:

    # in a new directory run
    wget https://raw.githubusercontent.com/tokern/data-lineage/master/install-manifests/docker-compose/wikimedia-demo.yml -O docker-compose.yml
    # or run
    curl https://raw.githubusercontent.com/tokern/data-lineage/master/install-manifests/docker-compose/wikimedia-demo.yml -o docker-compose.yml


Run docker-compose:


    docker-compose up -d


Verify that the containers are running:


    docker container ls | grep tokern


In [1]:
# Required configuration for API and wikimedia database network address

docker_address = "http://127.0.0.1:8000"
wikimedia_db = {
  "username": "etldev",
  "password": "3tld3v",
  "uri": "tokern-demo-wikimedia",
  "port": "5432",
  "database": "wikimedia"
}
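
As a reading aid, the sketch below shows how the fields in `wikimedia_db` would combine into a conventional PostgreSQL connection URL. It assumes that `uri` holds a hostname (here it looks like a Docker service name, so it is probably resolved from inside the Docker network rather than from the host machine). The helper `to_postgres_url` is hypothetical and is not part of the SDK.

```python
from urllib.parse import quote

wikimedia_db = {
    "username": "etldev",
    "password": "3tld3v",
    "uri": "tokern-demo-wikimedia",
    "port": "5432",
    "database": "wikimedia",
}


def to_postgres_url(cfg):
    """Hypothetical helper: build a postgresql:// URL from the config dict."""
    # Percent-encode credentials so special characters cannot break the URL.
    user = quote(cfg["username"], safe="")
    password = quote(cfg["password"], safe="")
    return f"postgresql://{user}:{password}@{cfg['uri']}:{cfg['port']}/{cfg['database']}"


url = to_postgres_url(wikimedia_db)
print(url)  # postgresql://etldev:3tld3v@tokern-demo-wikimedia:5432/wikimedia
```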

In [2]:
# Set up a connection to the catalog using the SDK.
from data_lineage import Catalog

catalog = Catalog(docker_address)

In [3]:
# Register the wikimedia data warehouse with the data-lineage app.

source = catalog.add_source(name="wikimedia", source_type="postgresql", **wikimedia_db)

In [4]:
# Scan the wikimedia data warehouse and register all schemata, tables and columns.

catalog.scan_source(source)

True

In [5]:
# Create a job and a job execution that insert data from page into page_lookup_nonredirect

job = catalog.add_job("insert_into_page_lookup_nonredirect",
                      {
                          "sql": "insert into page_lookup_nonredirect(redirect_id) select page_id from page"
                      })

{'attributes': {'context': {'sql': 'insert into page_lookup_nonredirect(redirect_id) select page_id from page'}, 'name': 'insert_into_page_lookup_nonredirect'}, 'id': '1', 'links': {'self': 'http://tokern-api:4142/api/v1/catalog/jobs/1'}, 'type': 'jobs'}


In [6]:
import datetime
from dbcat.catalog.models import JobExecutionStatus

job_execution = catalog.add_job_execution(
    job=job,
    started_at=datetime.datetime.combine(
        datetime.date(2021, 4, 1), datetime.time(1, 0)
    ),
    ended_at=datetime.datetime.combine(
        datetime.date(2021, 4, 1), datetime.time(1, 15)
    ),
    status=JobExecutionStatus.SUCCESS,
)
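
The timestamps above describe a run that starts at 01:00 and ends at 01:15 on 2021-04-01. The standalone sketch below uses only the standard library to show the same values and a simple sanity check on them. It does not call the catalog, and the check is a suggestion rather than something the API is known to enforce.

```python
import datetime

started_at = datetime.datetime.combine(
    datetime.date(2021, 4, 1), datetime.time(1, 0)
)
ended_at = datetime.datetime.combine(
    datetime.date(2021, 4, 1), datetime.time(1, 15)
)

# combine(date, time) yields the same value as the direct constructor.
same_start = started_at == datetime.datetime(2021, 4, 1, 1, 0)

# A job execution should not end before it starts.
duration = ended_at - started_at
if duration < datetime.timedelta(0):
    raise ValueError("ended_at must not be earlier than started_at")

print(duration)  # 0:15:00
```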


In [8]:
# Add an edge between these two columns:
# ("wikimedia", "public", "page", "page_id") -> ("wikimedia", "public", "page_lookup_nonredirect", "redirect_id")

source_column = catalog.get_column(source_name="wikimedia", 
                                   schema_name="public", 
                                   table_name="page",
                                   column_name="page_id")
target_column = catalog.get_column(source_name="wikimedia", 
                                   schema_name="public", 
                                   table_name="page_lookup_nonredirect",
                                   column_name="redirect_id")

edge = catalog.add_column_lineage(source=source_column,
                                  target=target_column,
                                  job_execution_id=job_execution.id,
                                  context={})

Visit [Tokern UI](http://localhost:8000/) to view the resulting graph.

![One Task Graph](./one_task.png)