# Turbine Pipeline

In [0]:
import sys

import pyspark.sql.functions as sf

import dlt
from databricks.sdk.runtime import spark

sys.path.append(spark.conf.get("bundle.sourcePath", "."))
from turbine_ingestion import apply_latest_filter, compute_aggregates, flag_anomalies

# Location of the raw turbine CSV files, supplied via the pipeline configuration
# key "raw_turbine_data_path" (no default is given, so it must be set).
raw_path = spark.conf.get("raw_turbine_data_path")

In [None]:
dlt.create_streaming_table(
    name="bronze",
    comment="Bronze level turbine data",
)


@dlt.append_flow(target="bronze", name="turbines_bronze_ingest_flow")
def turbine_bronze_ingest_flow():
    """Incrementally load raw turbine CSV files from ``raw_path`` into bronze.

    Files are read with Auto Loader (``cloudFiles``) and column types are
    inferred. Every row is tagged with the modification time and name of the
    file it came from, taken from the ``_metadata`` column.
    """
    auto_loader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        # Pick files up again when they are modified in place; this can emit
        # several versions of the same record, which the downstream CDC flow
        # resolves by sequencing on file_modification_time.
        .option("cloudFiles.allowOverwrites", "true")
    )
    raw_rows = auto_loader.load(raw_path)
    with_mtime = raw_rows.withColumn(
        "file_modification_time", sf.col("_metadata.file_modification_time")
    )
    return with_mtime.withColumn("file_name", sf.col("_metadata.file_name"))

In [None]:
dlt.create_streaming_table(
    name="silver",
    # Rows violating any of these checks are dropped (not failed) on write.
    expect_all_or_drop={
        "no_rescued_data": "_rescued_data IS NULL",
        "valid_turbine_id": "turbine_id IS NOT NULL",
        "valid_power_output": "power_output IS NOT NULL",
    },
)


# Silver data: bronze rows streamed through unchanged. The expectations on the
# "silver" table drop rows where Auto Loader populated _rescued_data or where
# turbine_id / power_output is NULL.
@dlt.append_flow(target="silver", name="turbines_silver_clean_flow")
def turbines_silver_clean_flow():
    """Stream all bronze rows into silver; filtering is done by the table's expectations."""
    bronze_stream = dlt.read_stream("bronze")
    return bronze_stream

In [None]:
# Per-turbine summary statistics bucketed by day.
@dlt.table(name="gold_daily_aggregates")
def gold_daily_aggregates():
    """Build daily aggregates from the latest version of each silver record.

    ``apply_latest_filter`` and ``compute_aggregates`` come from the
    ``turbine_ingestion`` module; "DAY" is passed as the bucket granularity.
    """
    silver_snapshot = spark.table("silver")
    latest_rows = apply_latest_filter(silver_snapshot)
    return compute_aggregates(latest_rows, "DAY")

In [None]:
# Per-turbine summary statistics bucketed by hour.
@dlt.table(name="gold_hourly_aggregates")
def gold_hourly_aggregates():
    """Build hourly aggregates from the latest version of each silver record.

    ``apply_latest_filter`` and ``compute_aggregates`` come from the
    ``turbine_ingestion`` module; "HOUR" is passed as the bucket granularity.
    """
    silver_snapshot = spark.table("silver")
    latest_rows = apply_latest_filter(silver_snapshot)
    return compute_aggregates(latest_rows, "HOUR")

In [None]:
@dlt.view
def silver_joined():
    """Stream silver rows through ``flag_anomalies`` using the hourly gold stats.

    NOTE(review): ``flag_anomalies`` is defined in ``turbine_ingestion``; it is
    presumably what adds the ``is_anomaly`` column consumed by the
    silver_enriched CDC flow — confirm against that module.
    """
    silver_stream = dlt.read_stream("silver")
    hourly_stats = spark.table("gold_hourly_aggregates")
    return flag_anomalies(silver_stream, hourly_stats)


dlt.create_streaming_table(name="silver_enriched")


# Deduplicated, anomaly-flagged silver records. The CDC flow keys rows on
# (turbine_id, timestamp) and, as SCD type 1, keeps a single row per key: the
# one with the greatest file_modification_time (i.e. the most recently written
# source file wins when a file is overwritten).
dlt.create_auto_cdc_flow(
    target="silver_enriched",
    source="silver_joined",
    name="turbines_enriched_silver_flow",
    keys=["turbine_id", "timestamp"],
    sequence_by="file_modification_time",
    stored_as_scd_type=1,
    # Only these columns are written to the target table.
    column_list=[
        "timestamp",
        "turbine_id",
        "wind_speed",
        "wind_direction",
        "power_output",
        "file_modification_time",
        "is_anomaly",
    ],
)