
## Gold Layer
The purpose of this layer is feature engineering for our scenario (i.e. the business use case)

In the silver layer, we already transformed and processed the data (filter, pivot, and forward fill to fix missing timestamps)


In this layer we now have the information needed to build data normalisation for each sensor


### Imports

In [0]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.functions import vector_to_array

from pyspark.sql import Window
from pyspark.sql.functions import col, first, last, to_timestamp, lower
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, MapType

import json
import pickle
import pandas as pd

from io import BytesIO


### Paths

In [0]:
# Root abfss:// URIs of the storage containers used by this notebook:
# the silver container is read from, the gold container is written to.
# NOTE: both values end with "/" and are joined below as f"{root}/sub/path",
# so the resulting URIs contain a double slash ("...net//sub/path").
silver_source = "abfss://silver@sembcorpete.dfs.core.windows.net/"
gold = "abfss://gold@sembcorpete.dfs.core.windows.net/"


### Helper functions

In [0]:
def build_normalization_pipeline(sensor_cols):
    """
    Build an (unfitted) Pipeline that min-max scales every sensor column

    Each sensor gets its own VectorAssembler -> MinMaxScaler pair, added to
    the pipeline in the order the sensors are given.

    Args:
        sensor_cols: List of sensor column names to normalise

    Returns:
        Pipeline, list of (scaled_col, normalised_col) pairs; scaled_col is
        the vector column written by the scaler, normalised_col is the name
        reserved for the scalar value to be extracted from it
    """
    stages = []
    column_pairs = []

    for name in sensor_cols:
        vector_name = f"{name}_vec"
        scaled_name = f"{name}_scaled_vec"

        # wrap the single sensor column in a one-element vector for the scaler
        stages.append(
            VectorAssembler(
                inputCols=[name],
                outputCol=vector_name,
                handleInvalid="keep"
            )
        )

        # rescale the vector to the range 0 to 1
        stages.append(
            MinMaxScaler(
                inputCol=vector_name,
                outputCol=scaled_name,
                min=0.0,
                max=1.0
            )
        )

        column_pairs.append((scaled_name, f"{name}_normalized"))

    return Pipeline(stages=stages), column_pairs


def fit_and_transform(pipeline, df, normalised_cols):
    """
    Fit pipeline on df, transform df and add one scalar column per sensor

    Args:
        pipeline: Unfitted Pipeline, e.g. from build_normalization_pipeline
        df: DataFrame used both to fit the pipeline and to be transformed
        normalised_cols: List of (scaled_col, normalised_col) pairs, where
            scaled_col is an existing vector column after the transform and
            normalised_col is the name of the scalar column to create

    Returns:
        fitted_model, transformed_df
    """
    # Fit model
    model = pipeline.fit(df)

    # Transform
    transformed = model.transform(df)

    # Extract normalised values from vectors.
    # The loop variable must match the name used in withColumn; the previous
    # version bound "normalized_col" but referenced "normalised_col".
    for scaled_col, normalised_col in normalised_cols:
        transformed = transformed.withColumn(
            normalised_col,
            # each scaled vector holds a single element, take it as a scalar
            vector_to_array(col(scaled_col))[0]
        )

    return model, transformed


## Read from silver container before building data normalisation

Requirement is to normalise to range 0 to 1 for each sensor

If we union all the sets and process them together, the result contains NULL values for dates that are earlier than some set's first date. This is a limitation of forward fill: it can only fill forward once data exists

   e.g. Sensors 1/2 only have dates on Jul 5th. If we include sensor 5 with sensors 1/2, there is nothing to fill forward for sensors 1/2, only NULL values

   e.g. Sensor 1 does not have values in June, so it cannot be filled forward




## Set with all sensors

In [0]:
# Load the forward-filled table holding all sensors from the silver container
all_silver_ffilled = (
    spark.read
    .format("delta")
    .load(f"{silver_source}/ffill/all_sensors_ffill/")
)


## All sensor union pipeline + modelling

In [0]:
# every column except the timestamp is treated as a sensor
sensor_cols = [
    column_name
    for column_name in all_silver_ffilled.columns
    if column_name != "timestamp"
]

# one assembler + scaler pair per sensor
pipeline_all, normalized_cols_all = build_normalization_pipeline(sensor_cols)

# fit on the full silver set and transform that same set
normalisation_model_all, gold_normalised_all = fit_and_transform(
    pipeline=pipeline_all,
    df=all_silver_ffilled,
    normalised_cols=normalized_cols_all,
)

In [0]:
# Keep the timestamp, the raw sensor readings and their normalised values;
# the intermediate *_vec / *_scaled_vec vector columns are left out.
# Uses the names actually produced by the pipeline cell
# (normalized_cols_all / gold_normalised_all); the previous names
# normalised_cols / gold_normalised were never defined.
final_cols = ["timestamp"] + sensor_cols + [nc for _, nc in normalized_cols_all]
gold_final = gold_normalised_all.select(final_cols)


## Reusability + saving DF and models

In [0]:
# Persist the fitted pipeline model so the same scaling can be reused later.
# The fitted model is bound to normalisation_model_all; the previous name
# normalisation_model was never defined.
normalisation_model_all.write() \
                       .overwrite() \
                       .save(f"{gold}/all_sensors/models/")

In [0]:
# Save the combined gold table (all sensors) as Delta, replacing existing data
(
    gold_final.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .save(f"{gold}/all_sensors/tables/")
)


#### Business-level data: we minimise the NULL values caused by the forward-fill limitation

In [0]:
# One frame per sensor: timestamp, raw reading and normalised reading,
# keeping only the rows where that sensor's raw reading is not null
gold_sensor1 = (
    gold_final
    .select("timestamp", "sens_1", "sens_1_normalized")
    .where(col("sens_1").isNotNull())
)

gold_sensor2 = (
    gold_final
    .select("timestamp", "sens_2", "sens_2_normalized")
    .where(col("sens_2").isNotNull())
)

gold_sensor4 = (
    gold_final
    .select("timestamp", "sens_4", "sens_4_normalized")
    .where(col("sens_4").isNotNull())
)

gold_sensor5 = (
    gold_final
    .select("timestamp", "sens_5", "sens_5_normalized")
    .where(col("sens_5").isNotNull())
)

In [0]:
# Save each per-sensor frame to its own Delta location in the gold container
# (same order as the frames were built: sensor 1, 2, 4, 5)
for _frame, _target in (
    (gold_sensor1, f"{gold}/sensor1/tables/"),
    (gold_sensor2, f"{gold}/sensor2/tables/"),
    (gold_sensor4, f"{gold}/sensor4/tables/"),
    (gold_sensor5, f"{gold}/sensor5/tables/"),
):
    (
        _frame.write
        .mode("overwrite")
        .format("delta")
        .option("mergeSchema", "true")
        .save(_target)
    )