# 🏭 DLT pipeline

This Delta Live Tables (DLT) definition is executed by the pipeline defined in `resources/turbines.pipeline.yml`.

### 📦 Import libraries

In [0]:
# Import DLT and src/turbines
import dlt
import sys
# from databricks.sdk.runtime import spark
# NOTE(review): `spark` is never imported in this notebook; presumably the DLT
# runtime injects it as a global, which is why the import above is disabled.

# Make the bundle's source directory importable so the `turbines` package can
# be resolved. The location is read from the "bundle.sourcePath" Spark conf key
# and falls back to the current directory when the key is not set.
sys.path.append(spark.conf.get("bundle.sourcePath", "."))
# The wildcard imports supply the transformation helpers used by the table
# definitions below (presumably add_file_metadata_column, drop_audit_columns,
# quarantine and summarise_daily — confirm against the turbines package).
from turbines.transformations.bronze import *
from turbines.transformations.silver import *
from turbines.transformations.gold import *
from turbines.dq.expectations import get_rules_for_table

### 🖊️ Get parameters & DQ rules

In [None]:
# Get parameters
# Both keys are read from the Spark conf without a default, so they must be
# supplied by the pipeline configuration.
landing_path = spark.conf.get("landing.path")  # source location loaded by the bronze table
expectations_table = spark.conf.get("expectations.table")  # name of the table holding the DQ rules

# Get all the DQ rules from the expectations table
# The full table is read once here; get_rules_for_table() is later called on
# this DataFrame to select the rules for an individual table.
rules_df = spark.read.table(expectations_table)

### 💬 Define table names

In [None]:
# Names under which the DLT tables are registered. They are passed to
# @dlt.table(name=...) and to dlt.read_stream(...) in the cells below, so the
# producer and its downstream readers always agree on the name.
bronze_table_name = "bronze_turbine"
silver_table_name = "silver_turbine"
quarantine_table_name = "quarantine_turbine"
gold_table_name = "gold_turbine_daily_stats"

### 🥉 Bronze table

In [None]:
@dlt.table(
    name=bronze_table_name,
    comment="Raw turbine telemetry data",
)
def bronze_turbine():
    """Stream raw turbine telemetry CSV files from the landing path.

    Files are read with the ``cloudFiles`` streaming source as comma-delimited
    CSV with a header row, column types inferred, and dates parsed as
    ``yyyy-MM-dd``. The resulting stream is passed through
    ``add_file_metadata_column`` before being returned.

    Returns:
        A streaming DataFrame backing the bronze table.
    """
    # Reader settings kept together so the ingestion contract is easy to scan.
    reader_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": True,
        "delimiter": ",",
        "header": True,
        "dateFormat": "yyyy-MM-dd",
    }
    raw_stream = (
        spark.readStream.format("cloudFiles")
        .options(**reader_options)
        .load(landing_path)
    )
    return raw_stream.transform(add_file_metadata_column)

### 🥈 Silver table

In [None]:
# Get drop rules for the silver table (also required by quarantine table)
# The rules are selected from rules_df for the silver table name and the
# "drop" action. The result is passed to @dlt.expect_all_or_drop and to the
# quarantine transformation, so both tables are driven by the same rule set.
# NOTE(review): presumably a dict of {rule name: SQL constraint}, which is what
# expect_all_or_drop expects — confirm against get_rules_for_table.
silver_drop_rules = get_rules_for_table(rules_df, silver_table_name, "drop")

# Silver table: bronze rows with audit columns removed, filtered by the drop rules
@dlt.expect_all_or_drop(silver_drop_rules)
@dlt.table(
    name=silver_table_name,
    comment="Cleaned turbine telemetry data",
)
def silver_turbine():
    """Stream the bronze table with its audit columns removed.

    The ``expect_all_or_drop`` decorator applies ``silver_drop_rules`` to the
    returned stream, so records failing any of those rules are dropped from
    the silver table.

    Returns:
        A streaming DataFrame backing the silver table.
    """
    return dlt.read_stream(bronze_table_name).transform(drop_audit_columns)

### ⛔ Quarantine table

In [None]:
@dlt.table(
    name=quarantine_table_name,
    comment="Bad turbine telemetry data",
)
def quarantine_turbine():
    """Stream the bronze records selected by the ``quarantine`` transformation.

    Starts from the same input as the silver table (bronze stream with audit
    columns dropped) and then applies ``quarantine`` with ``silver_drop_rules``.
    NOTE(review): presumably this keeps the rows that violate the drop rules,
    i.e. the ones the silver table discards — confirm against ``quarantine``.

    Returns:
        A streaming DataFrame backing the quarantine table.
    """
    bronze_stream = dlt.read_stream(bronze_table_name)
    without_audit = bronze_stream.transform(drop_audit_columns)
    # Extra positional arguments to transform() are forwarded to the function.
    return without_audit.transform(quarantine, silver_drop_rules)

### 🥇 Gold table

In [None]:
@dlt.table(
    name=gold_table_name,
    comment="Turbine telemetry data summarised each 24-hour period",
)
def gold_turbine_daily_stats():
    """Stream the silver table through the ``summarise_daily`` transformation.

    NOTE(review): the input is read as a stream; if ``summarise_daily``
    aggregates, confirm it is valid as a streaming aggregation (for example
    that it defines a watermark or window).

    Returns:
        A streaming DataFrame backing the gold table.
    """
    silver_stream = dlt.read_stream(silver_table_name)
    return silver_stream.transform(summarise_daily)