# Lineage Demo

Source Dataset: https://www.kaggle.com/datasets/pschale/mlb-pitch-data-20152018?select=2019_atbats.csv



In [0]:
%python
# Unity Catalog location used by the rest of this notebook.
target_catalog: str = "lgeorge"      # catalog that holds the demo schema
target_schema: str = "lineage_demo"  # schema that holds the demo tables

In [0]:
%py
# Create the target catalog and schema when they do not exist yet.
# The strings must be f-strings: without the `f` prefix Spark receives the
# literal text "{target_catalog}" instead of the configured name.
# The schema name is qualified with the catalog because the session's current
# catalog is only switched (USE CATALOG) in the following cell.
spark.sql(f"CREATE CATALOG IF NOT EXISTS {target_catalog}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_catalog}.{target_schema}")

In [0]:
%python
# Make the demo catalog/schema the session defaults so that unqualified
# object names in later cells resolve against them.
use_catalog_stmt = f"USE CATALOG {target_catalog}"
use_schema_stmt = f"USE SCHEMA {target_schema}"
spark.sql(use_catalog_stmt)
spark.sql(use_schema_stmt)


In [0]:
-- Create the `raw_data` volume if it is missing. The name is unqualified, so it
-- resolves against the session's current catalog and schema.
CREATE VOLUME IF NOT EXISTS raw_data

In [0]:
%py
# Filesystem path of the `raw_data` volume. Built from the configured catalog
# and schema so it cannot drift from target_catalog / target_schema.
volume_path = f"/Volumes/{target_catalog}/{target_schema}/raw_data"

In [0]:
%python
from zipfile import ZipFile

# Unpack data_files.zip from the volume into the volume's root directory.
archive_path = f"{volume_path}/data_files.zip"
extract_dir = f"{volume_path}/"

with ZipFile(archive_path, 'r') as archive:
    archive.extractall(path=extract_dir)

In [0]:
%py
import glob
import os

# Load every CSV in the volume root into a Delta table named after the file
# (file name without its extension). Files whose name contains "2019" are
# skipped.
for filepath in glob.glob(f"{volume_path}/*.csv"):
    file_name = os.path.basename(filepath)

    # Test the file name only, not the whole path, so a "2019" appearing in a
    # directory component can never exclude every file.
    if '2019' in file_name:
        continue

    table_name = os.path.splitext(file_name)[0]

    df = (
        spark.read.format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .load(filepath)
    )
    # Overwrite keeps the cell re-runnable.
    df.write.format("delta").mode("overwrite").saveAsTable(table_name)


In [0]:
-- One row per at-bat, with batter and pitcher names resolved from
-- player_names and game-level attributes joined in from games.
-- CREATE OR REPLACE keeps the cell re-runnable, like the other setup steps
-- in this notebook; a plain CREATE TABLE fails once the table exists.
CREATE OR REPLACE TABLE atbat_general
AS
SELECT
  ab.g_id as game_id,
  ab.ab_id,
  concat_ws(" ", b.first_name, b.last_name) as batter_name,
  concat_ws(" ", p.first_name, p.last_name) as pitcher_name,  
  g.umpire_HP,
  ab.event as atbat_event,
  ab.inning,
  ab.o as out,
  ab.top as top_of_inning,
  g.home_team,
  g.away_team,
  g.weather,
  g.wind
FROM
  atbats ab
-- LEFT JOINs keep every at-bat even when a player or game lookup is missing.
LEFT JOIN player_names b ON ab.batter_id = b.id
LEFT JOIN player_names p ON ab.pitcher_id = p.id
LEFT JOIN games g ON ab.g_id = g.g_id

In [0]:
-- Number of ejections per player, with the player's full name.
-- CREATE OR REPLACE keeps the cell re-runnable, like the other setup steps
-- in this notebook; a plain CREATE TABLE fails once the table exists.
CREATE OR REPLACE TABLE ejected_players_summary
AS
SELECT
  e.player_id as player_id,
  concat_ws(" ", p.first_name, p.last_name) as ejected_player_name,
  count(*) as ejected_count
FROM
  player_names p
-- INNER JOIN: only players with at least one ejection row appear.
INNER JOIN ejections e ON p.id = e.player_id
GROUP BY player_id, ejected_player_name
ORDER BY ejected_count DESC;

In [0]:
%py
import requests, json

def make_request(tpe, data):
  """Send ``data`` as a JSON body to the custom lineage endpoint and print the reply.

  Args:
    tpe: HTTP method name handed to ``requests.request`` (e.g. "POST").
    data: JSON-serialisable payload; it is encoded with ``json.dumps``.

  The response body is pretty-printed when it is valid JSON and printed
  verbatim otherwise (e.g. an empty body), so a non-JSON reply no longer
  raises and hides what the server returned.
  """
  # NOTE: YOUR_HOST is a placeholder and must be replaced with the workspace host.
  url = "https://YOUR_HOST.azuredatabricks.net/api/2.0/lineage-tracking/custom"
  headers = {
    'Content-Type': 'application/json',
    # Bearer token read from the notebook context.
    'Authorization': f'Bearer {dbutils.entry_point.getDbutils().notebook().getContext().apiToken().get()}'
  }
  # A timeout prevents the cell from hanging forever on an unreachable host.
  response = requests.request(tpe, url, headers=headers, data=json.dumps(data), timeout=60)
  if not response.ok:
    print(f"Request failed with HTTP {response.status_code}")

  res = response.text
  try:
    parsed = json.loads(res)
  except ValueError:
    # Empty or non-JSON body: show it as-is instead of crashing.
    print(res)
  else:
    print(json.dumps(parsed, indent=2))

def byol_create(data):
  """Issue a POST request with ``data`` via ``make_request``."""
  make_request("POST", data)


def byol_update(data):
  """Issue a PATCH request with ``data`` via ``make_request``."""
  make_request("PATCH", data)


def byol_delete(data):
  """Issue a DELETE request with ``data`` via ``make_request``."""
  make_request("DELETE", data)


def byol_list(data):
  """Issue a GET request with ``data`` via ``make_request``."""
  make_request("GET", data)

# Short aliases for the configured catalog and schema.
catalog, schema = target_catalog, target_schema


def table(name):
  """Return ``name`` qualified as ``<catalog>.<schema>.<name>``."""
  qualified_name = f'{catalog}.{schema}.{name}'
  return qualified_name


In [0]:
%py
# Register a custom "PowerBI Report" entity and link the atbat_general table
# to it as an upstream source.

# Identifier shared by the entity definition and the relationship target.
report_entity_id = {
  "provider_type": "CUSTOM",
  "guid": "mlb2323"
}

# Free-form properties, passed as a JSON-formatted string.
# NOTE(review): "name" reads "Sales Report" although the entity is displayed as
# "Baseball Report" — confirm whether that is intended.
report_properties = """{
  		"name": "Sales Report",
  		"datasetId": "abc12345-6789-def0-1234-56789abcdef0",
  		"settings": {
    			"theme": "Light",
    			"autoRefresh": true,
    			"refreshInterval": 300
  		}
}"""

report_entity = {
  "entity_id": report_entity_id,
  "entity_type": "PowerBI Report",
  "display_name": "Baseball Report",
  "url": "https://app.powerbi.com/",
  "description": "Insights report in PBI for Baseball",
  "properties": report_properties
}

# Source side of the relationship: the Databricks table.
source_table = {
  "provider_type": "DATABRICKS",
  "databricks_type": "TABLE",
  "guid": table("atbat_general")
}

byol_create({
  "entities": [report_entity],
  "relationships": [
    {"source": source_table, "target": report_entity_id}
  ]
})
