<h1>Chapter 1: Schema & Lineage Objects</h1>

<p>
Source and staging are pre-defined schemas that represent the first two stages of the data ingestion pipeline.
They are defined as follows: 
</p>

<span>Source - Raw, unprocessed source data pulled from source files</span>
<ul> <code>SELECT * FROM {source_file}</code> </ul>

<span>Staging - Processed data from source with column metadata information applied</span>
<ul><code>SELECT TRY_CAST({column} AS {column_metadata.data_type}) AS {column} FROM source.{table}</code></ul>

In [1]:
import tyr

# Required to get relative path of test datasets
import os

# Required to illustrate how metadata is used
import pandas as pd

from pprint import pprint

# About The Data

The data used in this tutorial is from the 2023 Formula 1 Singapore Grand Prix, courtesy of FastF1 and the OpenF1 api.
This data is useful as it provides multiple tables connected by common ids, and numeric data with which we can calculate
various metrics (averages, std_dev, etc.).

Thank you to both of these projects for providing such a fantastic data source. If you want to check them out, they
can be found at the following addresses:

- https://github.com/theOehrly/Fast-F1
- https://openf1.org/

In [2]:
# This is the full manual process to read in file metadata.
# There is a function to do this in source called read_file_metadata.

file_metadata_path = os.path.abspath(
    os.path.join(os.getcwd(), "..", "tests", "configurations", "file_metadata.tsv")
)

# Keep the raw table under its own name so the DataFrame and the dict of
# FileMetadata objects built from it are never confused with each other.
file_metadata_df = pd.read_csv(file_metadata_path, sep="\t")

display(file_metadata_df)

# One FileMetadata object per row, keyed by the row's dataset name
file_metadata = {
    row.dataset: tyr.lineage.schema.source.FileMetadata(row)
    for _, row in file_metadata_df.iterrows()
}

file_metadata

Unnamed: 0,schema,dataset,file_regex,delim,distinct
0,staging,car_location,/home/miles/tyr/tests/datasets/car_location_se...,t,True
1,staging,car_telemetry,/home/miles/tyr/tests/datasets/car_telemetry_s...,t,True
2,staging,circuits,/home/miles/tyr/tests/datasets/f1_circuits.geo...,geojson,True
3,staging,meetings,/home/miles/tyr/tests/datasets/meetings.tsv,t,True
4,staging,race_control,/home/miles/tyr/tests/datasets/race_control_se...,t,True
5,staging,session_status,/home/miles/tyr/tests/datasets/session_status_...,t,True
6,staging,track_status,/home/miles/tyr/tests/datasets/track_status_se...,t,True
7,staging,sessions,/home/miles/tyr/tests/datasets/sessions.tsv,t,True
8,staging,weather,/home/miles/tyr/tests/datasets/weather_session...,t,True
9,staging,results,/home/miles/tyr/tests/datasets/results_session...,t,True


{'car_location': <tyr.lineage.schema.source.FileMetadata at 0x7f90000658e0>,
 'car_telemetry': <tyr.lineage.schema.source.FileMetadata at 0x7f900005f730>,
 'circuits': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa10cceb0>,
 'meetings': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa10cce80>,
 'race_control': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa1083160>,
 'session_status': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa10831f0>,
 'track_status': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa10832b0>,
 'sessions': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa10830d0>,
 'weather': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa1083070>,
 'results': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa1083220>,
 'team_radio': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa1083340>,
 'pit_stops': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa10830a0>,
 'position': <tyr.lineage.schema.source.FileMetadata at 0x7f8fa1083460>,
 'intervals': <tyr.lineage.

In [3]:
# This is the full manual process to read in column metadata.
# There is a function to do this in source called read_column_metadata.
column_metadata_path = os.path.abspath(
    os.path.join(os.getcwd(), "..", "tests", "configurations", "column_metadata.tsv")
)

# Columns that are cast to a specific non-string dtype
typed_columns = {
    "ordinal_position": int,
    "is_primary_key": bool,
    "is_event_time": bool,
}

# Keep the raw table under its own name; `column_metadata` is reserved for the
# nested dict of ColumnMetadata objects built at the end of this cell.
column_metadata_df = pd.read_csv(column_metadata_path, sep="\t").astype(typed_columns)

# Missing filter lists are filled with the string "[]"
column_metadata_df["filter_values"] = column_metadata_df["filter_values"].fillna("[]")

# Every remaining column is stored as a string, with missing values as ""
text_columns = [
    name
    for name in column_metadata_df.columns
    if name not in typed_columns and name != "filter_values"
]
for name in text_columns:
    column_metadata_df[name] = column_metadata_df[name].fillna("").astype(str)

display(column_metadata_df)

# {dataset: {column_name: ColumnMetadata}}; sort=False keeps datasets in the
# order they first appear in the file.
column_metadata = {
    dataset: {
        row.column_name: tyr.lineage.schema.source.ColumnMetadata(row)
        for _, row in dataset_rows.iterrows()
    }
    for dataset, dataset_rows in column_metadata_df.groupby("dataset", sort=False)
}

column_metadata

Unnamed: 0,schema,dataset,column_name,column_alias,var_type,data_type,on_null,is_primary_key,is_event_time,filter_values,on_filter,regex,source_unit,target_unit,scale_factor,precision,ordinal_position
0,staging,car_location,Date,event_ts,timestamp,TIMESTAMP,PASS,True,True,[],PASS,%Y-%m-%d %H:%M:%S.%g,,,,,0
1,staging,car_location,Status,status,categorical,VARCHAR,PASS,False,False,[],PASS,,,,,,2
2,staging,car_location,X,x,numeric,"DECIMAL(15,5)",PASS,False,False,[],PASS,,m^1,,,1dp,3
3,staging,car_location,Y,y,numeric,"DECIMAL(15,5)",PASS,False,False,[],PASS,,m^1,,,1dp,4
4,staging,car_location,Z,z,numeric,"DECIMAL(15,5)",PASS,False,False,[],PASS,,m^1,,,1dp,5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
104,staging,intervals,date,event_ts,timestamp,TIMESTAMP,PASS,True,True,[],PASS,,,,,,0
105,staging,intervals,session_key,,key,INTEGER,PASS,True,False,[],PASS,,,,,,1
106,staging,intervals,driver_number,,key,INTEGER,PASS,True,False,[],PASS,,,,,,2
107,staging,intervals,gap_to_leader,,numeric,FLOAT,PASS,False,False,[],PASS,,s^1,,,,3


{'car_location': {'Date': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa108e550>,
  'Status': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa108e520>,
  'X': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa10322b0>,
  'Y': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa1032850>,
  'Z': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa10449a0>,
  'Source': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa1032a30>,
  'Time': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa1032a00>,
  'SessionTime': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa103ea90>,
  'DriverNumber': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa1037370>},
 'car_telemetry': {'Date': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa10370a0>,
  'RPM': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa1037580>,
  'Speed': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa1044580>,
  'nGear': <tyr.lineage.schema.source.ColumnMetadata at 0x7f8fa1044550>,
  'Throttle': <tyr.

_________________________________________________________________________________________________________________
# Editing Metadata

We're going to build source to illustrate how column metadata affects the construction of a staging table.

**This is bad practice.** 

Always update column and file metadata in the configuration files and rebuild from
the configuration files. We're only doing this to demonstrate how changing column metadata
changes the staging query.

In [4]:
source = tyr.lineage.schema.source.Source(
    settings=tyr.lineage.schema.source.SourceSettings(
        file_metadata=file_metadata,
        expected_column_metadata=column_metadata,
        extensions=["spatial"],
    )
)

car_location_subquery = tyr.lineage.tables.Subquery(source.tables.car_location)

# Metadata object for column "X" of car_location; this is what we edit below
x_metadata = source.tables.car_location.expected_column_metadata["X"]


def build_single_column_table(metadata):
    """Build a one-column table by applying the staging transform to car_location.

    Staging columns are built using lineage.macros.columns.staging_column_transform().
    Here it is called on a single source column. Any object downstream of the
    metadata must be reinitialized after the metadata changes, so call this
    again after every edit.

    :param metadata: ColumnMetadata object to apply to the source column
    :return: ``tyr.lineage.tables.Core`` named ``test_column_metadata_change``
    """
    return tyr.lineage.tables.Core(
        name="test_column_metadata_change",
        source=car_location_subquery,
        columns=tyr.lineage.core.ColumnList(
            [
                tyr.lineage.macros.columns.staging_column_transform(
                    tyr.lineage.tables.Select(
                        source.tables.car_location
                    ).columns.list_columns_()[0],
                    metadata,
                )
            ]
        ),
    )


# Remember the configured precision so it can be restored afterwards
original_precision = x_metadata.precision

source_table = build_single_column_table(x_metadata)
print("Query with 1dp precision")
print(source_table.sql)

try:
    # Let's update the precision of X from 1dp to 0dp
    x_metadata.precision = "0dp"

    # Any object downstream relying on this information must be reinitialized
    source_table = build_single_column_table(x_metadata)

    print("\n")
    print("Query with 0dp precision")
    print(source_table.sql)
finally:
    # The column metadata is not re-read from file when this cell is re-run, so
    # without this reset the precision would stay at 0dp. Restoring in `finally`
    # guarantees the reset even if building or printing the table fails.
    x_metadata.precision = original_precision

Query with 1dp precision
SELECT ROUND(TRY_CAST("X" AS DECIMAL(15, 5)), 1) AS x
FROM
  (SELECT DISTINCT *
   FROM READ_CSV(CAST('/home/miles/tyr/tests/datasets/car_location_session_*.tsv' AS VARCHAR), union_by_name=TRUE, header=TRUE, all_varchar=TRUE)) AS car_location


Query with 0dp precision
SELECT ROUND(TRY_CAST("X" AS DECIMAL(15, 5)), 0) AS x
FROM
  (SELECT DISTINCT *
   FROM READ_CSV(CAST('/home/miles/tyr/tests/datasets/car_location_session_*.tsv' AS VARCHAR), union_by_name=TRUE, header=TRUE, all_varchar=TRUE)) AS car_location


Experiment with changing parameters in the configuration files:
- <code>tyr/tests/configurations/column_metadata.tsv</code>
- <code>tyr/tests/configurations/file_metadata.tsv</code>

A full list of parameters can be queried in the ColumnMetadata object docs



In [5]:
# Show the parameter documentation attached to the ColumnMetadata class
column_metadata_class = tyr.lineage.schema.source.ColumnMetadata
print(column_metadata_class.__doc__)


    The ColumnMetadata object takes the following pd.Series as an argument

    :param schema: Schema containing table
    :type schema: str
    :param dataset: Dataset name. Will be used as table name in schema
    :type dataset: str
    :param column_name: Column name in source.
    :type column_name: str
    :param column_alias: Column alias.
    :type column_alias: str
    :param var_type: Options: Variable type. Used to determine which validation tests are run.
     ``'numeric'``/``'categorical'``/``'string'``/``'timedelta'``/``'datetime'``/``'key'``/``'sequential'``/``''``
     Default: ``''``
    :type var_type: str
    :param data_type: SQL data_type of column to cast to
    :type data_type: str
    :param source_unit: Unit of measurement of column in source file. Default: ``''``
    :type source_unit: str
    :param target_unit: Target unit of measurement to convert to in source table. Default: ``''``
    :type target_unit: str
    :param precision: Precision to round column 

________________________________________________________________________________________________________________
# Building Source & Staging

<p>Now that we have our metadata objects read in, we can construct source and staging. It is important to note that, although source takes <code>expected_column_metadata</code> as an argument, this isn't used in the construction of the SQL query. It is stored within the source table object and used downstream by staging. The <code>SourceSettings</code> and <code>StagingSettings</code> objects are required for their respective schemas to read in the information correctly. The <code>extensions</code> parameter is used to install any necessary duckdb extensions. In this case, <code>spatial</code> will be required as geometry objects exist in the files.</p>

In [6]:
# Settings for the source schema: file metadata, the column metadata that
# staging will use downstream, and the duckdb extensions to install.
source_settings = tyr.lineage.schema.source.SourceSettings(
    file_metadata=file_metadata,
    expected_column_metadata=column_metadata,
    extensions=["spatial"],
)
source = tyr.lineage.schema.source.Source(settings=source_settings)

# Staging is built on top of the source schema
staging_settings = tyr.lineage.schema.staging.StagingSettings(
    name="staging", extensions=["spatial"]
)
staging = tyr.lineage.schema.staging.Staging(
    source=source,
    settings=staging_settings,
)

_________________________________________________________________________________________________________
# Saving schemas

We can save schemas to recall them in other files without re-reading metadata. 
Note that this will mean that changing the metadata files will not update the schema. 
Only recreating them from the files will do this.

This practice is useful for version controlling schemas.

In [7]:
# Note that after this operation, the saved schema will appear in the tests/saved_schema/ directory
# Directory that receives the saved schemas (tests/saved_schema/)
saved_schema_dir = os.path.abspath(
    os.path.join(os.getcwd(), "..", "tests/saved_schema")
)

# Write both schemas to disk
source.save(saved_schema_dir)
staging.save(saved_schema_dir)

# Read them back from the .pkl files written above
source = tyr.lineage.schema.core.load_schema_from_pkl(
    os.path.join(saved_schema_dir, "source.pkl")
)
staging = tyr.lineage.schema.core.load_schema_from_pkl(
    os.path.join(saved_schema_dir, "staging.pkl")
)

___________________________________________________________________________________________________________________________________________________
# Interacting with lineage objects
<p>Now we can interact with the table objects and retrieve their attributes as follows: </p>

In [8]:
# All examples below inspect the staging sessions table
sessions = staging.tables.sessions

# Table name
print("\n")
print(f"Table name: {sessions.name}")

# Names of all columns
print("\n")
print("Column Names:")
print(sessions.columns.list_names_())

# A column can be accessed by attribute name or by string index
print("\n")
print("Column from attribute name:")
print(sessions.columns.start_date.__dict__)

print("\n")
print("Column from string index:")
print(sessions.columns["start_date"].__dict__)

# Names of the primary key columns
print("\n")
print("Primary Key:")
print(sessions.primary_key.list_names_())

# SQL query of the table
print("\n")
print("SQL:")
print(sessions.sql)

# Name of the schema the table belongs to
print("\n")
print("Schema name:")
print(sessions.schema.name)

# Schema-qualified name of the table's source
sessions_source = sessions.source
print("\n")
print("Source table name:")
print(f"{sessions_source.schema.name}.{sessions_source.name}")



Table name: sessions


Column Names:
['session_key', 'type', 'name', 'start_date', 'end_date', 'gmt_offset', 'path']


Column from attribute name:
{'source': <tyr.lineage.functions.datetime.StringToTimestamp object at 0x7f8fa06c0700>, 'name': 'start_date', 'data_type': <tyr.lineage.values.Datatype object at 0x7f8fa06c0b50>, 'var_type': 'timestamp', 'on_null': 'PASS', 'is_primary_key': False, 'is_event_time': False, 'current_table': <tyr.lineage.tables.Core object at 0x7f8fa06acb80>, 'unit': <units.core.Unit object at 0x7f8fa0af73d0>, 'sql': 'STRPTIME(TRY_CAST("StartDate" AS VARCHAR), CAST(\'%Y-%m-%d %H:%M:%S\' AS VARCHAR))', '_node_data': {'label': 'start_date', 'name': 'start_date', 'data_type': 'TIMESTAMP', 'var_type': 'timestamp', 'type': "<class 'tyr.lineage.columns.Core'>", 'base': "<class 'tyr.lineage.core._Column'>", 'unit': '', 'macro_group': '', 'sql': 'STRPTIME(TRY_CAST("StartDate" AS VARCHAR), CAST(\'%Y-%m-%d %H:%M:%S\' AS VARCHAR))'}, 'graph': <tyr.lineage.core.LineageGra

______________________________________________________________________________________________________________________________________________________
# Retrieving attributes to root
Attributes can be chained from the schema all the way down to the root object, which has no further source.

In [9]:
# Schema object, followed by its name
print(staging, staging.name, sep="\n")

<tyr.lineage.schema.staging.Staging object at 0x7f8f9e45be80>
staging


In [10]:
# Tables held by the schema, followed by their names
schema_tables = staging.tables
print(schema_tables)
print(schema_tables.list_names_())

<tyr.lineage.core.TableList object at 0x7f8fa0ae8a90>
['car_location', 'car_telemetry', 'circuits', 'meetings', 'race_control', 'session_status', 'track_status', 'sessions', 'weather', 'results', 'team_radio', 'pit_stops', 'position', 'intervals']


In [11]:
# car_telemetry table, accessed by attribute
telemetry_table = staging.tables.car_telemetry
print(telemetry_table)

# The same TableList also accepts a string index
telemetry_by_key = staging.tables["car_telemetry"]
print(telemetry_by_key.name)

<tyr.lineage.tables.Core object at 0x7f8fa0a30040>
car_telemetry


In [12]:
# Columns of car_telemetry, followed by their names
telemetry_columns = staging.tables.car_telemetry.columns
print(telemetry_columns)
print(telemetry_columns.list_names_())

<tyr.lineage.core.ColumnList object at 0x7f8fa0a30220>
['event_ts', 'rpm', 'kmh', 'n_gear', 'throttle', 'brake', 'drs', 'source', 'meeting_time', 'session_time', 'driver_number']


In [13]:
# kmh column in car_telemetry
kmh = staging.tables.car_telemetry.columns.kmh
print(kmh)

# kmh column name
print(kmh.name)

# Attributes of kmh
print(f"var_type: {kmh.var_type}")
print(f"data_type: {kmh.data_type.value}")
print(f"unit: {kmh.unit.name}")
print(f"sql: {kmh.sql}")

<tyr.lineage.columns.Core object at 0x7f8fa09d6bb0>
kmh
var_type: numeric
data_type: DECIMAL(15,5)
unit: km^1h^-1
sql: ROUND(TRY_CAST("Speed" AS DECIMAL(15, 5)), 1)


In [14]:
# Source of the kmh column in car_telemetry - the math ROUND function
round_function = staging.tables.car_telemetry.columns.kmh.source
print(round_function)

# SQL can be queried at any point; it displays the SQL down to the root level
print(round_function.sql)

<tyr.lineage.functions.math.Round object at 0x7f8fa09d6be0>
ROUND(TRY_CAST("Speed" AS DECIMAL(15, 5)), 1)


In [15]:
# First argument of the math ROUND function - the data_type TRYCAST function
try_cast_function = staging.tables.car_telemetry.columns.kmh.source.args[0]
print(try_cast_function)
print(try_cast_function.sql)

<tyr.lineage.functions.data_type.TryCast object at 0x7f8fa09d6c10>
TRY_CAST("Speed" AS DECIMAL(15, 5))


In [16]:
# First argument of the data_type TRYCAST function - the AS expression
as_expression = staging.tables.car_telemetry.columns.kmh.source.args[0].args[0]
print(as_expression)
print(as_expression.sql)

<tyr.lineage.expressions.As object at 0x7f8fa09d6c40>
"Speed" AS DECIMAL(15, 5)


In [17]:
# Left object of the AS expression - utility function SourceWildToStaging
# <- This initializes the lineage tree for column objects
source_wild_to_staging = (
    staging.tables.car_telemetry.columns.kmh.source.args[0].args[0].left
)
print(source_wild_to_staging)

# The __dict__ attribute shows which attributes are available to query
pprint(source_wild_to_staging.__dict__)

<tyr.lineage.functions.utility.SourceWildToStagingColumn object at 0x7f8fa09d6d30>
{'_node_data': {'base': "<class 'tyr.lineage.core._Function'>",
                'data_type': '',
                'label': 'SOURCE_WILD_TO_STAGING_COLUMN',
                'macro_group': 'StagingColumnTransform - 140254857691632',
                'sql': '"Speed"',
                'type': '<class '
                        "'tyr.lineage.functions.utility.SourceWildToStagingColumn'>",
                'unit': 'km^1h^-1',
                'var_type': ''},
 'args': [<tyr.lineage.columns.WildCard object at 0x7f8fa0838910>,
          <tyr.lineage.schema.source.ColumnMetadata object at 0x7f8fa0c82ca0>],
 'data_type': None,
 'distinct': False,
 'framing': None,
 'graph': <tyr.lineage.core.LineageGraph object at 0x7f8fa09d6d60>,
 'is_event_time': False,
 'is_primary_key': False,
 'name': 'SOURCE_WILD_TO_STAGING_COLUMN',
 'order_by': <tyr.lineage.core.OrderBy object at 0x7f8fa0ae8fa0>,
 'partition_by': <tyr.lineage.co

In [18]:
# WildCard column: the first argument of SourceWildToStaging
wildcard_column = (
    staging.tables.car_telemetry.columns.kmh.source.args[0].args[0].left.args[0]
)
print(wildcard_column)

# The current table of this object tells us which source table it comes from
wildcard_table = wildcard_column.current_table
print("Source table: " + wildcard_table.schema.name + "." + wildcard_table.name)

<tyr.lineage.columns.WildCard object at 0x7f8fa0838910>
Source table: source.car_telemetry


In [19]:
# The WildCard value is the root object and therefore has no source
wildcard_column = (
    staging.tables.car_telemetry.columns.kmh.source.args[0].args[0].left.args[0]
)
print(wildcard_column.source.__dict__)

{'value': <class 'tyr.lineage.operators.WildCard'>, 'name': '*', 'data_type': <tyr.lineage.values.Datatype object at 0x7f8fa0838970>, 'var_type': None, 'unit': <units.core.Unit object at 0x7f8fa08389a0>, 'sql': '*', '_node_data': {'label': 'WILDCARD', 'data_type': 'WILDCARD', 'var_type': '', 'type': "<class 'tyr.lineage.values.WildCard'>", 'base': "<class 'tyr.lineage.core._Value'>", 'unit': '', 'macro_group': '', 'sql': '*'}, 'graph': <tyr.lineage.core.LineageGraph object at 0x7f8fa0838e50>}
