# OpenLineage Demo


Marquez is a reference implementation of the OpenLineage specification. You can access the Marquez components at the following URLs:
- Marquez Web UI: http://localhost:3000
- Marquez GraphQL Playground: http://localhost:5002/graphql-playground
- Marquez Admin UI: http://localhost:5001

First, we will create a Spark session with lineage tracking enabled. This may take a while, since Spark also downloads the required packages.

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Ref: https://github.com/OpenLineage/OpenLineage/blob/1.22.0/website/docs/integrations/spark/installation.md
spark = (SparkSession.builder
    .appName("OpenLineage Demo")
    # Download the OpenLineage Spark integration (Scala 2.12 build)
    .config("spark.jars.packages", "io.openlineage:openlineage-spark_2.12:1.22.0")
    # Register the listener that emits lineage events for each Spark job
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
    # Send the events over HTTP to the Marquez API
    .config("spark.openlineage.transport.type", "http")
    .config("spark.openlineage.transport.url", "http://host.docker.internal:5002")
    # Namespace under which the jobs appear in Marquez
    .config("spark.openlineage.namespace", "demo")
    .getOrCreate()
)
```
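If you want to reuse these settings across notebooks, one option is to collect them in a plain dictionary. The helper below is a hypothetical sketch (the name `openlineage_spark_conf` is ours, not part of OpenLineage); it only reproduces the config keys and values used in the cell above.

```python
# Hypothetical helper: gathers the OpenLineage settings from the cell above
# so they can be reused. Key names are exactly the ones used in that cell.
def openlineage_spark_conf(transport_url: str, namespace: str,
                           version: str = "1.22.0",
                           scala_version: str = "2.12") -> dict:
    """Return the Spark config entries that enable the OpenLineage listener."""
    return {
        # Maven coordinates of the OpenLineage Spark integration
        "spark.jars.packages": f"io.openlineage:openlineage-spark_{scala_version}:{version}",
        # Listener class that emits lineage events for each Spark job
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        # Send events over HTTP to the Marquez API
        "spark.openlineage.transport.type": "http",
        "spark.openlineage.transport.url": transport_url,
        # Namespace under which jobs are registered in Marquez
        "spark.openlineage.namespace": namespace,
    }


conf = openlineage_spark_conf("http://host.docker.internal:5002", "demo")
for key, value in conf.items():
    print(f"{key}={value}")
```

Each entry can then be passed to `SparkSession.builder.config(key, value)` before calling `getOrCreate()`. Keep in mind that `spark.jars.packages` only takes effect when a new session (and JVM) is started, not on a session that already exists.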

## Read datasets
Let's read the sample product, customer, and raw sales data.

```python
input_dir = "/home/jovyan/data"
output_dir = "/home/jovyan/output"
product = spark.read.option("header", True).csv(f"{input_dir}/product")
customer = spark.read.option("header", True).csv(f"{input_dir}/customer")
sales_raw = spark.read.option("header", True).csv(f"{input_dir}/sales_raw")
```

Marquez registers the lineage events that the OpenLineage listener emits for these Spark jobs.
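To get a feel for what such an event contains, here is a hand-built, minimal OpenLineage `RunEvent` as a Python dict. This is an illustrative sketch only: the job name `hypothetical_read_job` and the `producer` URL are placeholders, the `schemaURL` version is an assumption, and the real events emitted by the Spark listener carry many more facets.

```python
import json
import uuid
from datetime import datetime, timezone


def minimal_run_event(namespace: str, job_name: str, input_paths: list) -> dict:
    """Build a minimal OpenLineage-style RunEvent for a job that reads files."""
    return {
        "eventType": "COMPLETE",  # one of START, RUNNING, COMPLETE, ABORT, FAIL, OTHER
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},  # unique id for this run
        "job": {"namespace": namespace, "name": job_name},
        # Local-mode Spark reports file datasets under the "file" namespace
        "inputs": [{"namespace": "file", "name": path} for path in input_paths],
        "outputs": [],
        # Placeholder producer; the Spark listener fills in its own value
        "producer": "https://example.com/hypothetical-producer",
        # Assumed spec version; check the OpenLineage spec for the current one
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
    }


event = minimal_run_event("demo", "hypothetical_read_job", ["/home/jovyan/data/product"])
print(json.dumps(event, indent=2))
```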

### Jobs

We can see the jobs at [http://localhost:3000](http://localhost:3000). Make sure to select the `demo` namespace. Two jobs have been created.

![](demo_images/jobs_1.png)
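The same job list can be fetched from the Marquez REST API (`GET /api/v1/namespaces/{namespace}/jobs`). The sketch below assumes the REST API is served on port 5002, the same host and port as the GraphQL endpoint; the helper names are hypothetical. Only `list_jobs` talks to the network, so the URL building and response parsing can be checked offline.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

# Assumption: the Marquez REST API shares host/port with the GraphQL endpoint.
MARQUEZ_API = "http://localhost:5002/api/v1"


def jobs_url(namespace: str, base: str = MARQUEZ_API) -> str:
    """URL of the Marquez list-jobs endpoint for one namespace."""
    return f"{base}/namespaces/{quote(namespace, safe='')}/jobs"


def job_names(payload: dict) -> list:
    """Extract job names from a list-jobs response body."""
    return [job["name"] for job in payload.get("jobs", [])]


def list_jobs(namespace: str) -> list:
    """Fetch the job names in a namespace (requires a running Marquez)."""
    with urlopen(jobs_url(namespace)) as resp:
        return job_names(json.load(resp))
```

With Marquez running, `list_jobs("demo")` should return the names of the jobs shown in the UI.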

### Datasets

We can see the datasets at [http://localhost:3000/datasets](http://localhost:3000/datasets). Make sure to select the `file` namespace, because OpenLineage registers the datasets under `file` for any Spark job running with `--master=local` ([GitHub issue](https://github.com/OpenLineage/OpenLineage/issues/2709)).

![](demo_images/datasets_1.png)
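Because datasets in the `file` namespace are named after their paths, fetching one through the REST API (`GET /api/v1/namespaces/{namespace}/datasets/{dataset}`) requires percent-encoding the slashes in the name. A minimal sketch, assuming the dataset name is the absolute path shown in the UI and that the API is served on port 5002; `dataset_url` is a hypothetical helper.

```python
from urllib.parse import quote

# Assumption: the Marquez REST API shares host/port with the GraphQL endpoint.
MARQUEZ_API = "http://localhost:5002/api/v1"


def dataset_url(name: str, namespace: str = "file", base: str = MARQUEZ_API) -> str:
    """URL of the Marquez get-dataset endpoint for one dataset."""
    # Names in the "file" namespace are paths, so "/" is encoded too (safe="")
    # to keep the whole name inside a single URL path segment.
    return (f"{base}/namespaces/{quote(namespace, safe='')}"
            f"/datasets/{quote(name, safe='')}")


print(dataset_url("/home/jovyan/data/product"))
# http://localhost:5002/api/v1/namespaces/file/datasets/%2Fhome%2Fjovyan%2Fdata%2Fproduct
```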

### GraphQL

We can also query GraphQL at [http://localhost:5002/graphql-playground](http://localhost:5002/graphql-playground). The GraphQL endpoint is [http://localhost:5002/api/v1-beta/graphql](http://localhost:5002/api/v1-beta/graphql).

Sample request:

```graphql
query {
  datasets {
    name
    fields { name }
  }
}
```

![](demo_images/graphql_1.png)
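Outside the playground, the same query can be sent from Python as a regular GraphQL-over-HTTP POST with a JSON body of the form `{"query": ...}`. This is a sketch using only the standard library; the helper names are hypothetical, and only `run_query` needs a running Marquez.

```python
import json
from urllib.request import Request, urlopen

GRAPHQL_ENDPOINT = "http://localhost:5002/api/v1-beta/graphql"

QUERY = """
query {
  datasets {
    name
    fields { name }
  }
}
"""


def graphql_request(query: str, endpoint: str = GRAPHQL_ENDPOINT) -> Request:
    """Build a POST request carrying the query as a JSON body."""
    body = json.dumps({"query": query}).encode("utf-8")
    return Request(endpoint, data=body,
                   headers={"Content-Type": "application/json"}, method="POST")


def run_query(query: str) -> dict:
    """Send the query and decode the JSON response (requires a running Marquez)."""
    with urlopen(graphql_request(query)) as resp:
        return json.load(resp)
```

With Marquez running, `run_query(QUERY)["data"]["datasets"]` should hold the same list the playground shows.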


## Generate new datasets

Let's create new datasets for US customers and SG customers.

```python
(customer
    .filter(f.col("country") == 'US')
    .drop(f.col("country"))
    .write.mode("overwrite")
    .option("header", True)
    .csv(f"{output_dir}/customer_us")
)

(customer
    .filter(f.col("country") == 'SG')
    .drop(f.col("country"))
    .write.mode("overwrite")
    .option("header", True)
    .csv(f"{output_dir}/customer_sg")
)
```

### Datasets

[http://localhost:3000/datasets](http://localhost:3000/datasets) will now show `customer_sg` and `customer_us`.

![](demo_images/datasets_2.png)
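The lineage graph around a dataset can also be requested from the Marquez lineage endpoint, which takes a node ID of the form `dataset:<namespace>:<name>`. The sketch below assumes that format, a REST API on port 5002, and that the dataset name is its absolute output path; `lineage_url` and `fetch_lineage` are hypothetical helpers.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

# Assumption: the Marquez REST API shares host/port with the GraphQL endpoint.
MARQUEZ_API = "http://localhost:5002/api/v1"


def lineage_url(dataset_name: str, namespace: str = "file", base: str = MARQUEZ_API) -> str:
    """URL of the Marquez lineage endpoint for one dataset node."""
    node_id = f"dataset:{namespace}:{dataset_name}"
    # urlencode percent-encodes ":" and "/" inside the query value
    return f"{base}/lineage?{urlencode({'nodeId': node_id})}"


def fetch_lineage(dataset_name: str) -> dict:
    """Fetch the lineage graph as JSON (requires a running Marquez)."""
    with urlopen(lineage_url(dataset_name)) as resp:
        return json.load(resp)


print(lineage_url("/home/jovyan/output/customer_us"))
```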

We can also see the column lineage, which shows that the `country` column is dropped.

![](demo_images/datasets_2_col.png)
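Column lineage is carried in the OpenLineage `columnLineage` dataset facet, which maps each output column to its `inputFields` (`namespace`, `name`, `field`). The sketch below finds input columns that feed no output column directly. The facet and the customer column list are hypothetical, and treating an `INDIRECT`/`FILTER` transformation as "not used directly" is our assumption about how a filter on `country` might be reported.

```python
def directly_used(facet: dict, input_name: str) -> set:
    """Fields of dataset `input_name` that directly feed some output column."""
    used = set()
    for spec in facet.get("fields", {}).values():
        for src in spec.get("inputFields", []):
            if src.get("name") != input_name:
                continue
            kinds = {t.get("type") for t in src.get("transformations") or []}
            # No transformation info: treat the mapping as direct
            if not kinds or "DIRECT" in kinds:
                used.add(src["field"])
    return used


def dropped_columns(input_columns, facet: dict, input_name: str) -> set:
    """Input columns that never reach the output directly."""
    return set(input_columns) - directly_used(facet, input_name)


# Hypothetical facet for customer_us (not captured from Marquez)
CUSTOMER = "/home/jovyan/data/customer"
facet = {"fields": {
    "id": {"inputFields": [{"namespace": "file", "name": CUSTOMER, "field": "id"}]},
    "name": {"inputFields": [
        {"namespace": "file", "name": CUSTOMER, "field": "name"},
        {"namespace": "file", "name": CUSTOMER, "field": "country",
         "transformations": [{"type": "INDIRECT", "subtype": "FILTER"}]},
    ]},
}}

print(sorted(dropped_columns(["id", "name", "country"], facet, CUSTOMER)))  # ['country']
```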

## Generate joined dataset

Next, we will check how lineage works when multiple sources are joined.

```python
sales_report = (sales_raw
    .join(product, sales_raw.product_id == product.id)
    .join(customer, sales_raw.customer_id == customer.id)
    .select(
        customer["name"].alias("customer_name"),
        product["name"].alias("product_name"),
        sales_raw["qty"],
    )
)
sales_report.write.mode("overwrite").option("header", True).csv(f"{output_dir}/sales_report")
sales_report.show()
```

```text
+-------------+-------------+---+
|customer_name| product_name|qty|
+-------------+-------------+---+
|        Alice|Awesome Apple|  1|
|          Bob|Awesome Apple| 10|
|          Bob|   Big Banana|  3|
+-------------+-------------+---+
```

### Datasets

[http://localhost:3000/datasets](http://localhost:3000/datasets) will now show `sales_report`.

![](demo_images/datasets_3.png)

We can also see the column lineage. It appears to include the join columns as part of the lineage, even though they are not selected in the output.

![](demo_images/datasets_3_col.png)
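One possible explanation is that the column lineage facet can tag each input field with `transformations` of a `type` (`DIRECT` or `INDIRECT`) and `subtype` (for example `IDENTITY` or `JOIN`), so join keys show up as indirect inputs. The sketch below groups inputs that way; the facet is hypothetical, and whether the Spark integration 1.22.0 emits exactly this shape here is an assumption.

```python
import posixpath
from collections import defaultdict


def inputs_by_transformation(facet: dict) -> dict:
    """For each output column, group its input fields by (type, subtype)."""
    result = {}
    for out_col, spec in facet.get("fields", {}).items():
        groups = defaultdict(list)
        for src in spec.get("inputFields", []):
            # Label inputs as "<dataset basename>.<field>", e.g. "customer.id"
            label = f"{posixpath.basename(src['name'])}.{src['field']}"
            # Inputs without transformation info are treated as direct here
            kinds = src.get("transformations") or [{"type": "DIRECT", "subtype": None}]
            for t in kinds:
                groups[(t.get("type"), t.get("subtype"))].append(label)
        result[out_col] = dict(groups)
    return result


# Hypothetical facet for sales_report.customer_name (not captured from Marquez)
facet = {"fields": {"customer_name": {"inputFields": [
    {"namespace": "file", "name": "/home/jovyan/data/customer", "field": "name",
     "transformations": [{"type": "DIRECT", "subtype": "IDENTITY"}]},
    {"namespace": "file", "name": "/home/jovyan/data/customer", "field": "id",
     "transformations": [{"type": "INDIRECT", "subtype": "JOIN"}]},
    {"namespace": "file", "name": "/home/jovyan/data/sales_raw", "field": "customer_id",
     "transformations": [{"type": "INDIRECT", "subtype": "JOIN"}]},
]}}}

for out_col, groups in inputs_by_transformation(facet).items():
    for (kind, subtype), fields in groups.items():
        print(out_col, kind, subtype, fields)
```

Filtering on `("DIRECT", ...)` keys would then give the "value-carrying" lineage, while the `("INDIRECT", "JOIN")` group lists the join keys seen in the UI.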