In [0]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, lit, when, year, month, dayofmonth, hour, dayofweek, quarter,
    min, max, avg, stddev, variance, count, mean, to_date, date_format, 
    monotonically_increasing_id, isnan
)

Bronze Layer - Ingestion

In [0]:
# Define schema for Bronze Layer
bronze_schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("turbine_id", IntegerType(), True),
    StructField("wind_speed", DoubleType(), True),
    StructField("wind_direction", DoubleType(), True),
    StructField("power_output", DoubleType(), True)
])

# Step 1: Bronze Layer
bronze_df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(bronze_schema)
    .load("/FileStore/colibri/*.csv")
)

# Data Quality Checks

# 1. Check for duplicate rows
if bronze_df.count() != bronze_df.dropDuplicates().count():
    print("Soft warning: Duplicate rows found in Bronze Layer!")

# 2. Check for nulls in critical fields
critical_fields = ["timestamp", "turbine_id", "power_output"]
null_critical_fields = bronze_df.select(
    [col(f).isNull().alias(f"{f}_is_null") for f in critical_fields]
).filter(
    col(f"{critical_fields[0]}_is_null") |
    col(f"{critical_fields[1]}_is_null") |
    col(f"{critical_fields[2]}_is_null")
).count()

if null_critical_fields > 0:
    raise ValueError(f"Null values found in critical fields of Bronze Layer! ({null_critical_fields} rows)")

# 3. Check for invalid or null dates
invalid_dates = bronze_df.withColumn("date", to_date("timestamp")).filter(col("date").isNull()).count()
if invalid_dates > 0:
    print(f"Invalid or null timestamps found in {invalid_dates} rows!")

# 4. Check for unreasonable ranges in numerical fields
wind_speed_range = (0, 100)  # Example: 0 to 100 m/s
wind_direction_range = (0, 360)  # Example: 0 to 360 degrees
power_output_range = (0, 5000)  # Example: 0 to 5000 MW

out_of_range_count = bronze_df.filter(
    (col("wind_speed") < wind_speed_range[0]) | (col("wind_speed") > wind_speed_range[1]) |
    (col("wind_direction") < wind_direction_range[0]) | (col("wind_direction") > wind_direction_range[1]) |
    (col("power_output") < power_output_range[0]) | (col("power_output") > power_output_range[1])
).count()

if out_of_range_count > 0:
    print(f"Soft warning: Found {out_of_range_count} rows with out-of-range values in Bronze Layer!")

# 5. Check for non-numeric or NaN values in numerical fields
nan_count = bronze_df.select(
    [isnan(col(f)).alias(f"{f}_is_nan") for f in ["wind_speed", "wind_direction", "power_output"]]
).filter(
    col("wind_speed_is_nan") | col("wind_direction_is_nan") | col("power_output_is_nan")
).count()

if nan_count > 0:
    print(f"Soft warning: NaN values found in numerical fields ({nan_count} rows)!")

# Save Bronze Layer
bronze_df.write.format("delta").mode("overwrite").partitionBy("turbine_id").saveAsTable("bronze_turbine_data")
bronze_df.display()


timestamp,turbine_id,wind_speed,wind_direction,power_output
2022-03-01 00:00:00,11,9.1,269.0,2.9
2022-03-01 00:00:00,12,11.3,316.0,2.5
2022-03-01 00:00:00,13,11.2,148.0,3.7
2022-03-01 00:00:00,14,10.7,97.0,1.6
2022-03-01 00:00:00,15,11.0,81.0,4.4
2022-03-01 01:00:00,11,12.3,245.0,1.8
2022-03-01 01:00:00,12,11.0,293.0,2.2
2022-03-01 01:00:00,13,11.4,270.0,1.9
2022-03-01 01:00:00,14,10.4,140.0,2.3
2022-03-01 01:00:00,15,14.6,283.0,4.3


Silver layer - Processing

In [0]:
#Step 3: Silver 

bronze_df = bronze_df.dropDuplicates()

# Calculate average values within each turbine_id group
turbine_window = Window.partitionBy("turbine_id")
filled_df = (
    bronze_df
    .withColumn(
        "wind_speed",
        F.when(F.col("wind_speed").isNull(), F.avg("wind_speed").over(turbine_window)).otherwise(F.col("wind_speed"))
    )
    .withColumn(
        "wind_direction",
        F.when(F.col("wind_direction").isNull(), F.avg("wind_direction").over(turbine_window)).otherwise(F.col("wind_direction"))
    )
    .withColumn(
        "power_output",
        F.when(F.col("power_output").isNull(), F.avg("power_output").over(turbine_window)).otherwise(F.col("power_output"))
    )
)

### ANOMALY REMOVAL ### 

# Define the columns to process
stats_columns = ["wind_speed", "wind_direction", "power_output"]

# Initialize cleaned DataFrame with the original filled DataFrame
cleaned_df = filled_df

for column in stats_columns:
    # Calculate Q1, Q3, and IQR for the column
    q1, q3 = filled_df.approxQuantile(column, [0.25, 0.75], 0)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    # Filter rows based on the IQR bounds
    silver_df = cleaned_df.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))

# Save Silver Layer
silver_df.write.format("delta").mode("overwrite").partitionBy("turbine_id").saveAsTable("silver_turbine_data")
silver_df.display()

timestamp,turbine_id,wind_speed,wind_direction,power_output
2022-03-07 04:00:00,1,9.7,44.0,2.7
2022-03-23 19:00:00,1,12.0,87.0,1.7
2022-03-24 15:00:00,1,14.8,311.0,3.8
2022-03-30 01:00:00,1,13.4,321.0,2.9
2022-03-15 00:00:00,1,13.1,216.0,2.6
2022-03-19 03:00:00,1,13.8,10.0,2.6
2022-03-30 14:00:00,1,10.5,195.0,2.1
2022-03-05 18:00:00,1,12.6,162.0,4.2
2022-03-28 12:00:00,1,12.2,223.0,2.4
2022-03-04 20:00:00,1,9.4,126.0,3.4


Gold layer - Processing & Warehousing 

In [0]:

# Step 3: Gold Layer (Star Schema)

# Create a fact table
silver_df_with_keys = silver_df \
    .withColumn("record_sid", monotonically_increasing_id()) \
    .withColumn("date_key", date_format(to_date("timestamp"), "yyyyMMdd"))

fact_turbine_metrics = silver_df_with_keys.select(
    "record_sid",         
    "turbine_id",    
    "wind_speed",         
    "wind_direction",    
    "power_output",
    "timestamp",
    "date_key"                
)

# Dimension: Time
dim_time_df = cleaned_df.select(to_date("timestamp").alias("date")).distinct() \
    .withColumn("date_key", date_format("date", "yyyyMMdd")) \
    .withColumn("year", year("date")) \
    .withColumn("month", month("date")) \
    .withColumn("day", dayofmonth("date")) \
    .withColumn("day_of_week", dayofweek("date")) \
    .withColumn("quarter", quarter("date")) \
    .select(
        "date_key", "date", "year", "month", "day", "day_of_week", "quarter"
    )

# Dimension: Turbine
dim_turbine_df = (
    cleaned_df.select("turbine_id")
    .distinct()
    .withColumn("location", lit("unknown"))      # Replace "unknown" with actual defaults if available
    .withColumn("manufacturer", lit("unknown")) # Replace "unknown" with actual defaults if available
    .withColumn("capacity", lit("unknown"))     # Replace "unknown" with actual defaults if available
)

# Save the fact and dimension tables
fact_turbine_metrics.write.format("delta").mode("overwrite").saveAsTable("gold_fact_turbine_metrics")
dim_time_df.write.format("delta").mode("overwrite").saveAsTable("gold_dim_time")
dim_turbine_df.write.format("delta").mode("overwrite").saveAsTable("gold_dim_turbine")

fact_turbine_metrics.display()

record_sid,turbine_id,wind_speed,wind_direction,power_output,timestamp,date_key
0,1,9.7,44.0,2.7,2022-03-07 04:00:00,20220307
1,1,12.0,87.0,1.7,2022-03-23 19:00:00,20220323
2,1,14.8,311.0,3.8,2022-03-24 15:00:00,20220324
3,1,13.4,321.0,2.9,2022-03-30 01:00:00,20220330
4,1,13.1,216.0,2.6,2022-03-15 00:00:00,20220315
5,1,13.8,10.0,2.6,2022-03-19 03:00:00,20220319
6,1,10.5,195.0,2.1,2022-03-30 14:00:00,20220330
7,1,12.6,162.0,4.2,2022-03-05 18:00:00,20220305
8,1,12.2,223.0,2.4,2022-03-28 12:00:00,20220328
9,1,9.4,126.0,3.4,2022-03-04 20:00:00,20220304


Summary statistics

In [0]:
# Define the time period (e.g., extract the date for grouping by 24 hours)
daily_summary_df = fact_turbine_metrics \
    .groupBy("turbine_id", "date_key") \
    .agg(
        min("power_output").alias("min_power_output"),
        max("power_output").alias("max_power_output"),
        avg("power_output").alias("avg_power_output")
    )
    
daily_summary_df.write.format("delta").mode("overwrite").saveAsTable("gold_daily_summary")
daily_summary_df.display()

turbine_id,date_key,min_power_output,max_power_output,avg_power_output
1,20220307,1.9,4.3,3.283333333333333
1,20220323,1.7,4.4,3.225
1,20220324,1.6,4.3,2.9791666666666665
1,20220330,1.5,4.4,3.0500000000000003
1,20220315,1.6,4.4,3.0125
1,20220319,1.8,4.5,3.1250000000000004
1,20220305,1.6,4.3,3.245833333333332
1,20220328,1.6,4.5,3.0749999999999997
1,20220304,1.5,4.4,2.9874999999999994
1,20220320,1.6,3.8,2.716666666666667


In [0]:

# Step 1: Calculate mean and standard deviation for each turbine over a given period (e.g., daily)
stats_df = fact_turbine_metrics \
    .groupBy("turbine_id", "date_key") \
    .agg(
        mean("power_output").alias("mean_power_output"),
        stddev("power_output").alias("stddev_power_output")
    )

# Step 2: Join the calculated statistics back to the fact table
joined_df = fact_turbine_metrics.join(stats_df, on=["turbine_id", "date_key"], how="inner")

# Step 3: Identify anomalies (output outside ±2 standard deviations from the mean)
anomalies_df = joined_df.filter(
    (col("power_output") > col("mean_power_output") + 2 * col("stddev_power_output")) |
    (col("power_output") < col("mean_power_output") - 2 * col("stddev_power_output"))
).select("turbine_id", "date_key", "power_output", "mean_power_output", "stddev_power_output")

# Show the anomalies
anomalies_df.display()
anomalies_df.write.format("delta").mode("overwrite").saveAsTable("gold_anomalies")


turbine_id,date_key,power_output,mean_power_output,stddev_power_output
1,20220305,1.6,3.245833333333332,0.7150428182172436
1,20220329,1.5,3.1291666666666664,0.8110536178419356
1,20220311,4.4,2.5750000000000006,0.779771426932835
1,20220326,4.5,2.816666666666667,0.8354465958352776
2,20220331,1.6,3.2291666666666674,0.7123013325874079
2,20220322,4.5,2.920833333333334,0.7835144718103375
2,20220328,4.5,2.858333333333333,0.8139632865701364
2,20220302,4.2,2.4666666666666663,0.7545033878776428
3,20220317,1.5,3.25,0.8387361091961912
3,20220321,1.5,3.3416666666666663,0.8581662282811638


Machine learning

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Prepare the data
features = ["wind_speed", "wind_direction"]
assembler = VectorAssembler(inputCols=features, outputCol="features")
data = assembler.transform(fact_turbine_metrics).select("features", "power_output")

# Split into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Train a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="power_output")
lr_model = lr.fit(train_data)

# Evaluate the model
predictions = lr_model.transform(test_data)
predictions.select("features", "power_output", "prediction").display()


features,power_output,prediction
"Map(vectorType -> dense, length -> 2, values -> List(9.0, 12.0))",3.4,2.9960878042254326
"Map(vectorType -> dense, length -> 2, values -> List(9.0, 26.0))",1.8,2.9970752271879717
"Map(vectorType -> dense, length -> 2, values -> List(9.0, 31.0))",2.8,2.997427878246021
"Map(vectorType -> dense, length -> 2, values -> List(9.0, 50.0))",1.9,2.998767952266609
"Map(vectorType -> dense, length -> 2, values -> List(9.0, 67.0))",3.3,2.999966965863978
"Map(vectorType -> dense, length -> 2, values -> List(9.0, 92.0))",3.1,3.0017302211542254
"Map(vectorType -> dense, length -> 2, values -> List(9.0, 114.0))",4.5,3.003281885809644
"Map(vectorType -> dense, length -> 2, values -> List(9.0, 144.0))",3.0,3.0053977921579413
"Map(vectorType -> dense, length -> 2, values -> List(9.0, 194.0))",2.3,3.008924302738437
"Map(vectorType -> dense, length -> 2, values -> List(9.0, 201.0))",3.3,3.0094180142197064


In [0]:
from pyspark.ml.classification import LogisticRegression

# Label data for anomalies (e.g., power_output < threshold)
threshold = 100  # Example threshold
labeled_df = fact_turbine_metrics.withColumn(
    "label", when(col("power_output") < threshold, 1).otherwise(0)
)

# Prepare the data
features = ["wind_speed", "wind_direction"]
assembler = VectorAssembler(inputCols=features, outputCol="features")
data = assembler.transform(labeled_df).select("features", "label")

# Split into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Train a Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)

# Evaluate the model
predictions = lr_model.transform(test_data)
predictions.select("features", "label", "prediction", "probability").display()
