%md
# Transform Pipeline

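Read raw turbine readings from the table named by the `source_table` pipeline setting, apply the data-quality rules stored in `project.bronze.expectations`, and upsert the result into the deduplicated silver table `turbine_data`.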

In [0]:
import dlt
from pyspark.sql import functions as F

# Name of the streaming source table, supplied through the pipeline configuration
# (spark.conf), so the same notebook can be pointed at different inputs.
source_table: str = spark.conf.get("source_table")


In [0]:
# Catalog and schema holding the bronze-layer objects, including the
# `expectations` rules table that get_rules reads.
catalog, br_schema = "project", "bronze"

# Return the rules matching the tag in the format expected by the DLT expectation decorators.

def get_rules(tablename, quality):
  """
    Load data-quality rules for one table from the expectations table.

    Rules are read from the ``{catalog}.{br_schema}.expectations`` table (not a CSV
    file). Only rows whose ``tablename`` and ``quality`` columns equal the arguments
    are kept. The result is shaped as ``{name: constraint}``, which is what the
    ``dlt.expect_all*`` decorators take.

    :param tablename: value matched against the ``tablename`` column
    :param quality: value matched against the ``quality`` column (e.g. 'silver')
    :return: dict mapping rule name -> constraint expression; empty when no row matches.
      If several rows share a name, only one survives (whichever is collected last).
  """
  # Filter with Column expressions rather than an interpolated SQL string.
  # A quote inside tablename/quality can then no longer break or change the predicate.
  expectations = (
    spark.table(f"{catalog}.{br_schema}.expectations")
    .where((F.col("tablename") == tablename) & (F.col("quality") == quality))
    .select("name", "constraint")
  )
  return {row["name"]: row["constraint"] for row in expectations.collect()}

In [0]:
@dlt.view(
  name="bronze_data_view",
  comment="Raw data from the turbines"
)
@dlt.expect_all_or_drop(get_rules('turbine_data', 'silver'))
def bronze_data_view():
  """
    Stream the source table into the silver column layout.

    Each reading gets a ReadingKey: the SHA-1 of the timestamp text concatenated
    with the turbine id text. The remaining columns are renamed.

    Rows failing any 'silver' rule registered for 'turbine_data' are dropped by
    the expect_all_or_drop decorator.
  """
  # NOTE(review): F.concat inserts no separator and yields null if either part is null.
  # Rows with a null timestamp/turbine_id therefore get a null key. Distinct
  # (timestamp, turbine_id) pairs could also concatenate to the same text, e.g. with
  # fractional seconds. Changing this would re-key every existing row in turbine_data,
  # so it is left unchanged here.
  reading_key = F.sha1(
    F.concat(
      F.col("timestamp").cast("string"),
      F.col("turbine_id").cast("string"),
    )
  )

  # TODO: wind speed, wind direction and power output are passed through unchanged.
  # Smoothing them from the last successful reading is not implemented yet.
  renames = [
    ("timestamp", "ReadingDate"),
    ("turbine_id", "TurbineNumber"),
    ("wind_speed", "WindSpeed"),
    ("wind_direction", "WindDirection"),
    ("power_output", "PowerOutput"),
  ]
  columns = [reading_key.alias("ReadingKey")]
  columns += [F.col(src).alias(dst) for src, dst in renames]
  columns += [F.col("filename"), F.col("processedtime").alias("loadeddatetime")]

  return spark.readStream.table(source_table).select(*columns)

## TODO: handle missing wind speed, wind direction and power output values
## (bronze_data_view currently passes these columns through unchanged).

# Silver target table: the change flow below upserts one row per ReadingKey into it.
dlt.create_streaming_table(
  name="turbine_data",
  comment="Cleaned and deduplicated turbine data",
  table_properties={"quality": "silver"},
)

# Upsert rows from bronze_data_view into turbine_data, keyed on ReadingKey.
# No stored_as_scd_type is passed, so the DLT default applies.
# The bookkeeping columns loadeddatetime and filename are not written to the target.
# NOTE(review): ReadingKey is derived from ReadingDate, so rows sharing a key also share
# the sequence value. sequence_by therefore cannot order such duplicates. Consider a
# tie-breaker (e.g. a STRUCT that includes the load time), after confirming against the
# DLT docs how that interacts with except_column_list.
dlt.apply_changes(
  target="turbine_data",
  source="bronze_data_view",
  keys=["ReadingKey"],
  sequence_by=F.col("ReadingDate"),
  except_column_list=["loadeddatetime", "filename"],
)