# DELTA LIVE TABLES GOLD LAYER 

## Coaches DLT Pipeline

In [0]:
import dlt
from pyspark.sql.functions import *

## Expectations for Data Quality

In [0]:
# Data-quality expectations for the coaches table.
# Keys are the expectation names; values are SQL boolean constraints
# evaluated against each row.
expect_coaches = dict(
    rule1="code is not null",   # every coach row must carry a code
    rule2="current is true",    # the `current` column must evaluate to true
)

In [0]:
# Data-quality expectations for the NOCs table:
# expectation name -> SQL boolean constraint checked per row.
expect_nocs = dict(
    rule1="code is not null",   # every NOC row must carry a code
)

In [0]:
# Data-quality expectations for the events table:
# expectation name -> SQL boolean constraint checked per row.
expect_events = dict(
    rule1="event is not null",   # every row must name its event
)

In [0]:
@dlt.table
def source_coaches():
    """Return a streaming read of the `olympics.silver.coaches` table.

    Registered with `@dlt.table`, so the pipeline exposes the result
    under the name `source_coaches`.
    """
    return spark.readStream.table('olympics.silver.coaches')

Name,Type
code,string
current,string
name,string
gender,string
function,string
category,string
country_code,string
country,string
country_long,string
disciplines,string


In [0]:
@dlt.view
def view_coaches():
    """Return a streaming read of `LIVE.source_coaches`, exposed as a DLT view."""
    coaches_stream = spark.readStream.table('LIVE.source_coaches')
    return coaches_stream

Name,Type
code,string
current,string
name,string
gender,string
function,string
category,string
country_code,string
country,string
country_long,string
disciplines,string


In [0]:
@dlt.table
@dlt.expect_all(expect_coaches)
def coaches():
    """Return a streaming read of `LIVE.view_coaches` as the `coaches` table.

    Every constraint in `expect_coaches` is attached through
    `dlt.expect_all`.
    """
    return spark.readStream.table('LIVE.view_coaches')

Name,Type
code,string
current,string
name,string
gender,string
function,string
category,string
country_code,string
country,string
country_long,string
disciplines,string


## NOCS DLT Pipeline

In [0]:
@dlt.view
def source_nocs():
    """Return a streaming read of the `olympics.silver.nocs` table as a DLT view."""
    return spark.readStream.table('olympics.silver.nocs')

Name,Type
code,string
country_long,string
tag,string
note,string


In [0]:
@dlt.table
@dlt.expect_all_or_drop(expect_nocs)
def nocs():
    """Return a streaming read of `LIVE.source_nocs` as the `nocs` table.

    The constraints in `expect_nocs` are attached through
    `dlt.expect_all_or_drop`.
    """
    nocs_stream = spark.readStream.table('LIVE.source_nocs')
    return nocs_stream

Name,Type
code,string
country_long,string
tag,string
note,string


## EVENTS DLT Pipeline

In [0]:
@dlt.view
def source_events():
    """Return a streaming read of the `olympics.silver.events` table as a DLT view."""
    return spark.readStream.table('olympics.silver.events')

Name,Type
event,string
tag,string
sport,string
sport_code,string
sport_url,string


In [0]:
@dlt.table
@dlt.expect_all(expect_events)
def events():
    """Return a streaming read of `LIVE.source_events` as the `events` table.

    Every constraint in `expect_events` is attached through
    `dlt.expect_all`.
    """
    return spark.readStream.table('LIVE.source_events')

Name,Type
event,string
tag,string
sport,string
sport_code,string
sport_url,string


## CDC - Apply Changes (DLT)

In [0]:
@dlt.view
def source_athletes():
    """Return a streaming read of the `olympics.silver.athletes` table as a DLT view.

    This view is named as the `source` of the `dlt.apply_changes` call
    that targets the `athletes` table.
    """
    return spark.readStream.table('olympics.silver.athletes')

Name,Type
athlete_id,string
current,string
name,string
name_short,string
name_tv,string
gender,string
function,string
country_code,string
country,string
country_long,string


In [0]:
# Declare the streaming table that the apply_changes call writes into.
dlt.create_streaming_table(name='athletes')

In [0]:
# Apply changes from the `source_athletes` view to the `athletes` target,
# matching rows on `athlete_id` and storing the result as SCD type 1.
# NOTE(review): `sequence_by` is the column used to order change events;
# `height` looks unusual for that role (a timestamp or version column is
# typical) — confirm this is intended.
dlt.apply_changes(
    target="athletes",
    source="source_athletes",
    keys=["athlete_id"],
    sequence_by=col("height"),
    stored_as_scd_type=1,
)

Name,Type
athlete_id,string
current,string
name,string
name_short,string
name_tv,string
gender,string
function,string
country_code,string
country,string
country_long,string
