# Databricks Platform Overview
This project aims to load the raw data from the wind turbines into Databricks, process and transform it, and provide a platform for analytics on this data.

## Database Objects
We use a Unity Catalog-enabled Databricks workspace with a catalog created for this project, `lakehouse_sbx`. Under this catalog is a dedicated schema for the project, `cd_edw`. This is a managed schema stored in a dedicated storage account container, and it serves as the central location for all objects created as part of this project.

The project also follows the medallion architecture of Bronze -> Silver -> Gold where possible.

# Ingest Raw Data
Ingest raw CSV data from Azure Data Lake. The landing location is exposed as a Unity Catalog volume at `/Volumes/lakehouse_sbx/cd_edw/landing`.
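The notebook refers to tables by their three-level Unity Catalog names (`catalog.schema.table`) and to landed files by their volume path (`/Volumes/<catalog>/<schema>/<volume>/...`). The two helpers below, `uc_table_name` and `uc_volume_path`, are hypothetical conveniences (not part of the notebook or of any Databricks API) that sketch how those names are composed from the catalog and schema described above.

```python
def uc_table_name(catalog: str, schema: str, table: str) -> str:
    """Three-level Unity Catalog table name: catalog.schema.table."""
    return f"{catalog}.{schema}.{table}"


def uc_volume_path(catalog: str, schema: str, volume: str, *parts: str) -> str:
    """Path of a location inside a Unity Catalog volume: /Volumes/<catalog>/<schema>/<volume>/..."""
    return "/".join(["/Volumes", catalog, schema, volume, *parts])


print(uc_table_name("lakehouse_sbx", "cd_edw", "brnz_turbine_data"))
print(uc_volume_path("lakehouse_sbx", "cd_edw", "landing", "cd_raw_data"))
```

`load_csv_incremental` concatenates the folder path and the file name directly, so a folder path built this way needs a trailing `/` appended before it is passed in.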

We specify the schema explicitly to avoid schema-inference issues, and also include a `_rescued_data` column to capture any values or new fields that do not match the schema.
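To illustrate the idea behind an explicit schema plus a rescue column, the sketch below is a simplified pure-Python imitation (an assumption for illustration, not how Databricks implements `rescuedDataColumn`): values that fail to convert to their declared type, and columns that are not in the schema, are set aside as a JSON string in `_rescued_data` instead of failing the load. `SCHEMA` and `parse_with_rescue` are hypothetical names.

```python
import csv
import io
import json
from datetime import datetime
from decimal import Decimal, InvalidOperation

# Hypothetical converters mirroring the notebook's StructType
SCHEMA = {
    "timestamp": datetime.fromisoformat,
    "turbine_id": int,
    "wind_speed": Decimal,
    "wind_direction": Decimal,
    "power_output": Decimal,
}


def parse_with_rescue(text):
    """Parse CSV text against SCHEMA, rescuing anything that does not fit."""
    rows = []
    # Extra values beyond the header are collected under the "_extra" key
    for record in csv.DictReader(io.StringIO(text), restkey="_extra"):
        row, rescued = {}, {}
        for name, raw in record.items():
            if name not in SCHEMA:
                rescued[name] = raw  # column not declared in the schema
                continue
            try:
                row[name] = SCHEMA[name](raw) if raw not in (None, "") else None
            except (ValueError, InvalidOperation):
                row[name] = None
                rescued[name] = raw  # value does not match the declared type
        row["_rescued_data"] = json.dumps(rescued) if rescued else None
        rows.append(row)
    return rows
```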

Since the source data is a set of append-only CSV files that are updated once daily, the following options were considered:
1. Batch loads, triggered every day after the files are updated, that read all raw data into a DataFrame and overwrite the Bronze table.
1. A Python function that tracks how many rows have been read from each file, so that every subsequent load skips the previously loaded lines in the CSV and loads only the new records (if any).
    - This decouples the integration layer from the data load layer, allowing the Databricks pipeline to run at any time and at any frequency, regardless of when data lands in the data lake.
    - Processing overhead stays low as the data grows, because only new records are processed on each run.
    - Caveat: this adds some complexity to the pipeline, because it needs an audit log Delta table that is read from and written to during each load run.

The notebook implements the second option.
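The second option boils down to remembering, per file, how many data rows have already been loaded and skipping that many on the next run. The sketch below shows only that bookkeeping in plain Python, with an in-memory list standing in for the audit Delta table; `load_new_rows` and the list-of-dicts log are illustrative assumptions, not the notebook's Spark code.

```python
import csv
import io
from datetime import datetime, timezone


def load_new_rows(file_name, text, audit_log):
    """Return the data rows of `text` that were not loaded on a previous run.

    `audit_log` is a list of dicts standing in for the audit Delta table; a new
    entry is appended only when new rows are found.
    """
    # Highest position recorded for this file (0 if it has never been loaded)
    last_position = max(
        (e["last_processed_position"] for e in audit_log if e["file_name"] == file_name),
        default=0,
    )
    reader = csv.reader(io.StringIO(text))
    next(reader, None)  # skip the header row
    new_rows = list(reader)[last_position:]  # skip rows loaded on earlier runs
    if new_rows:
        audit_log.append({
            "file_name": file_name,
            "last_processed_position": last_position + len(new_rows),
            "process_time": datetime.now(timezone.utc),
        })
    return new_rows
```

This only works because the files are append-only: if an existing row were edited or deleted, the stored position would no longer line up with the file contents.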

## Questions
1. Do we have control over the ingest pattern, so that new date-partitioned files are created instead of appending to a single file or group of files? This would allow us to use Auto Loader, which tracks processed files automatically through checkpoints and enables incremental loads.
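With one new, date-partitioned file per day, the unit of incremental work becomes the whole file, so no row offsets need to be tracked. Auto Loader handles this bookkeeping itself through its checkpoint; the plain-Python sketch below only illustrates the idea, using a set of already-processed paths as a stand-in for that checkpoint state. `new_files` is a hypothetical name.

```python
def new_files(listing, processed):
    """Return paths in `listing` not seen before, and record them as processed.

    `processed` is a set standing in for Auto Loader's checkpoint state.
    """
    fresh = sorted(path for path in listing if path not in processed)
    processed.update(fresh)
    return fresh
```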

In [0]:
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse_sbx.cd_edw.edw_turbine_ingest_audit_log (
    file_name STRING,
    file_path STRING,
    last_processed_position LONG,
    process_time TIMESTAMP
) USING DELTA
""")

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DecimalType
from pyspark.sql.functions import *  # also provides min/max/avg/stddev/... used in later cells

AUDIT_TABLE = "lakehouse_sbx.cd_edw.edw_turbine_ingest_audit_log"
BRONZE_TABLE = "lakehouse_sbx.cd_edw.brnz_turbine_data"

# Explicit schema for the raw turbine CSV files
schema = StructType([
  StructField("timestamp", TimestampType(), True),
  StructField("turbine_id", IntegerType(), True),
  StructField("wind_speed", DecimalType(6, 2), True),
  StructField("wind_direction", DecimalType(6, 2), True),
  StructField("power_output", DecimalType(6, 2), True)
])

def load_csv_incremental(schema, file_path):
    """Append the rows not yet loaded from each CSV file in file_path to the Bronze table."""
    for file in dbutils.fs.ls(file_path):
        if file.isDir():
            continue  # only process files, not sub-folders

        # Number of data rows already loaded from this file (0 on the first load)
        last_position = spark.sql(f"""
            SELECT COALESCE(MAX(last_processed_position), 0) AS pos
            FROM {AUDIT_TABLE}
            WHERE file_name = '{file.name}'
        """).collect()[0]["pos"]

        # Per the Databricks docs for skipRows, when header=True the header is the first
        # row *after* the skipped rows. Skipping last_position lines therefore drops the
        # original header plus the first last_position - 1 data rows; the last row already
        # loaded is consumed as the header, so only new rows remain.
        temp_df = (
            spark.read.format("csv")
            .option("header", True)
            .option("mode", "PERMISSIVE")
            .option("rescuedDataColumn", "_rescued_data")
            .option("skipRows", last_position)
            .schema(schema)
            .load(f"{file_path}{file.name}")
            .select(
                "*",
                col("_metadata.file_path").alias("raw_file_path"),
                col("_metadata.file_name").alias("raw_file_name"),
                current_timestamp().alias("bronze_processing_time"),
            )
        )

        new_rows = temp_df.count()  # count once: each count() re-reads the file
        if new_rows > 0:
            temp_df.write.mode("append").saveAsTable(BRONZE_TABLE)

            # Record the new position as a LONG (unquoted), matching the audit table column type
            spark.sql(f"""
                INSERT INTO {AUDIT_TABLE}
                VALUES ('{file.name}', '{file_path}', {last_position + new_rows}, current_timestamp())
            """)

load_csv_incremental(schema, "/Volumes/lakehouse_sbx/cd_edw/landing/cd_raw_data/")


# Create Transformation Tables for the Silver Layer
Create the tables required to clean and transform the turbine data. This performs the following steps:
1. Create a temporary DataFrame containing every hour in the imputation window for every turbine. On the first run the window covers the full history in the Bronze table; on later runs it covers the 30 days up to the latest reading. This grid is used to impute missing values.
1. Left join the grid against the Bronze table to get a "master" list of all expected datapoints in the window, with NULL readings where data is missing.
1. Impute the missing values by forward-filling from the most recent known reading, using `last(..., ignorenulls=True)` over a window of the current and previous 24 hourly rows. Gaps longer than 24 hours therefore remain NULL.
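The three steps above can be sketched in plain Python for a single run, assuming hourly readings keyed by `(timestamp, turbine_id)`. Like `last(..., ignorenulls=True)` over `rowsBetween(-24, 0)`, each missing value takes the most recent non-null reading among the current row and the previous 24 rows, so longer gaps stay `None`. The `impute` and `hourly_index` functions, their input shape, and the sample readings are assumptions for illustration.

```python
from datetime import datetime, timedelta

FIELDS = ("wind_speed", "wind_direction", "power_output")


def hourly_index(start, end):
    """Every hour from start to end, inclusive."""
    hours, t = [], start
    while t <= end:
        hours.append(t)
        t += timedelta(hours=1)
    return hours


def impute(readings, start, end, max_lookback=24):
    """Forward-fill readings onto a complete hourly grid.

    readings: {(timestamp, turbine_id): {field: value}} with hourly timestamps.
    Returns the same shape for every (hour, turbine) pair between start and end.
    """
    turbines = sorted({tid for _, tid in readings})
    hours = hourly_index(start, end)
    result = {}
    for tid in turbines:
        # "Left join": an empty dict (all fields None) where a reading is missing
        rows = [readings.get((h, tid), {}) for h in hours]
        for field in FIELDS:
            raw = [r.get(field) for r in rows]
            for i, h in enumerate(hours):
                # Current row plus up to max_lookback previous rows
                window = raw[max(0, i - max_lookback): i + 1]
                filled = next((v for v in reversed(window) if v is not None), None)
                result.setdefault((h, tid), {})[field] = filled
    return result


# Made-up sample readings for one turbine, with hours 1 and 2 missing
t0 = datetime(2022, 3, 1, 0)
readings = {
    (t0, 1): {"wind_speed": 11.8, "wind_direction": 169.0, "power_output": 2.7},
    (t0 + timedelta(hours=3), 1): {"wind_speed": 11.6, "wind_direction": 24.0, "power_output": 2.2},
}
filled = impute(readings, t0, t0 + timedelta(hours=3))
print(filled[(t0 + timedelta(hours=2), 1)]["power_output"])  # 2.7, carried forward
```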

In [0]:
from datetime import timedelta
from delta.tables import DeltaTable
from pyspark.sql.window import Window
from pyspark.sql.functions import coalesce, explode, expr, last, lit, sequence, to_date
from pyspark.sql.functions import max as spark_max, min as spark_min

##########################
# Load delta table into df
##########################
# Keep one reading per (timestamp, turbine_id): MERGE fails if several source rows match one target row
data_df = (
    spark.read.table("lakehouse_sbx.cd_edw.brnz_turbine_data")
    .select("timestamp", "turbine_id", "wind_speed", "wind_direction", "power_output")
    .dropDuplicates(["timestamp", "turbine_id"])
)

impute_table_name = "lakehouse_sbx.cd_edw.silver_impute_turbine_data"
min_date, max_date = data_df.select(spark_min("timestamp"), spark_max("timestamp")).first()
table_exists = spark.catalog.tableExists(impute_table_name)

# First run: impute over the full history; later runs: only the last 30 days
if table_exists:
    min_date = max_date - timedelta(days=30)

# One row per hour between min_date and max_date (inclusive), generated in Spark
all_hours = spark.range(1).select(
    explode(sequence(lit(min_date), lit(max_date), expr("INTERVAL 1 HOUR"))).alias("timestamp")
)

all_turbines = data_df.select("turbine_id").distinct()
complete_index = all_hours.crossJoin(all_turbines)

# Current row plus the previous 24 rows, i.e. 24 hours on the hourly grid
window_spec = Window.partitionBy("turbine_id").orderBy("timestamp").rowsBetween(-24, 0)

impute_df = (
    complete_index.join(data_df, ["timestamp", "turbine_id"], "left")
    .withColumn("wind_speed_filled", last("wind_speed", ignorenulls=True).over(window_spec))
    .withColumn("wind_direction_filled", last("wind_direction", ignorenulls=True).over(window_spec))
    .withColumn("power_output_filled", last("power_output", ignorenulls=True).over(window_spec))
)

final_filled_df = (
    impute_df
    .withColumn("wind_speed", coalesce("wind_speed", "wind_speed_filled"))
    .withColumn("wind_direction", coalesce("wind_direction", "wind_direction_filled"))
    .withColumn("power_output", coalesce("power_output", "power_output_filled"))
    .drop("wind_speed_filled", "wind_direction_filled", "power_output_filled")
    .withColumn("date", to_date("timestamp"))  # partition column for the Silver table
)

if not table_exists:
    (
        final_filled_df.write.format("delta")
        .mode("overwrite")
        .partitionBy("turbine_id", "date")
        .saveAsTable(impute_table_name)
    )
else:
    # Upsert the recomputed window into the existing Silver table
    (
        DeltaTable.forName(spark, impute_table_name).alias("target")
        .merge(
            final_filled_df.alias("source"),
            "target.timestamp = source.timestamp AND target.turbine_id = source.turbine_id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
stat01_table_name = "lakehouse_sbx.cd_edw.silver_turbine_statistics_01"
table_exists = spark.catalog.tableExists(stat01_table_name)
data_df = (
    spark.read.table("lakehouse_sbx.cd_edw.silver_impute_turbine_data")
    .select("timestamp", "turbine_id", "wind_speed", "wind_direction", "power_output")
)

daily = Window.partitionBy("turbine_id", "date")     # calendar day per turbine
monthly = Window.partitionBy("turbine_id", "month")  # calendar month per turbine

trfn_df = (
    data_df
    .withColumn("date", to_date(col("timestamp")))
    # Truncate to the start of the month so that the same month in different years stays separate
    .withColumn("month", date_trunc("month", col("timestamp")))
    .withColumn("24h_avg_output", avg("power_output").over(daily).cast("decimal(6,2)"))
    .withColumn("24h_stddev_output", stddev("power_output").over(daily).cast("decimal(7,3)"))
    .withColumn("1m_avg_output", avg("power_output").over(monthly).cast("decimal(6,2)"))
    .withColumn("1m_stddev_output", stddev("power_output").over(monthly).cast("decimal(7,3)"))
    .na.fill({"power_output": 0.0, "wind_speed": 0.0})
    # Flags readings more than 2 standard deviations above the daily mean (upper tail only)
    .withColumn("is_24h_anomaly", col("power_output") > (col("24h_avg_output") + 2 * col("24h_stddev_output")))
    .withColumn("silver_processing_time", current_timestamp())
)

trfn_df.where(col("turbine_id") == 1) \
    .sort("timestamp") \
    .display()

# if not table_exists:
#     trfn_df.write.format("delta") \
#         .mode("overwrite") \
#         .saveAsTable(stat01_table_name)
# else:
#     target_table = DeltaTable.forName(spark, stat01_table_name)
#     (
#         target_table.alias("target")
#         .merge(
#             trfn_df.alias("source"),
#             "target.timestamp = source.timestamp AND target.turbine_id = source.turbine_id"
#         )
#         .whenMatchedUpdateAll()
#         .whenNotMatchedInsertAll()
#         .execute()
#     )
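
The `is_24h_anomaly` flag in the cell above compares each reading with its turbine's mean and standard deviation for that calendar day. Below is a plain-Python sketch of the same rule, assuming a list of `(timestamp, turbine_id, power_output)` tuples. `flag_daily_anomalies` and the sample data are hypothetical, and `statistics.stdev` is the sample standard deviation, matching Spark's `stddev`.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean, stdev


def flag_daily_anomalies(readings, k=2.0):
    """readings: list of (timestamp, turbine_id, power_output) tuples.

    Returns (timestamp, turbine_id, power_output, is_24h_anomaly) tuples, where a
    reading is anomalous if it exceeds its turbine's daily mean by more than k
    sample standard deviations (upper tail only, like the Spark cell).
    """
    groups = defaultdict(list)
    for ts, tid, power in readings:
        groups[(tid, ts.date())].append(power)

    stats = {}
    for key, values in groups.items():
        # Sample standard deviation is undefined for a single value; never flag those
        sd = stdev(values) if len(values) > 1 else None
        stats[key] = (mean(values), sd)

    flagged = []
    for ts, tid, power in readings:
        avg, sd = stats[(tid, ts.date())]
        flagged.append((ts, tid, power, sd is not None and power > avg + k * sd))
    return flagged


# Made-up sample: ten steady hours and one spike for turbine 1 on one day
sample = [(datetime(2022, 3, 1, h), 1, 2.0) for h in range(10)]
sample.append((datetime(2022, 3, 1, 10), 1, 10.0))
flags = [row[3] for row in flag_daily_anomalies(sample)]
print(flags.count(True))  # 1
```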

In [0]:
final_filled_df.sort("turbine_id", "timestamp").display()

In [0]:
%sql
select * from lakehouse_sbx.cd_edw.brnz_turbine_data
where turbine_id = 1

In [0]:
impute_table_name = "lakehouse_sbx.cd_edw.brnz_turbine_data"
a = spark.catalog.tableExists(impute_table_name)
# Determine full or incremental load
# try:
#     spark.catalog.tableExists(impute_table_name)
# except AnalysisException:
#     table_exists = False

print(a)
# # Date range for imputation
# min_ts, max_ts = data_df.select(min("timestamp"), max("timestamp")).first()

# if table_exists:
#     min_date = max_ts - timedelta(days=30)
# else:
#     min_date = min_ts  # Use full range on first load