# **2 Prediction Pipeline**

---

## **2.1 Library**

In [1]:
import os
import time
import pyspark.sql.functions as F
import seaborn as sns

In [26]:
from kafka3 import KafkaAdminClient
from pyspark import SparkConf
from pyspark.sql import SparkSession,DataFrame,Window,window
from pyspark.sql.types import *
from pyspark.conf import SparkConf
from pyspark.ml import Pipeline,PipelineModel,Transformer,Estimator,Model
from pyspark.ml.feature import Imputer,Bucketizer
from pyspark.ml.param.shared import HasInputCols,HasOutputCol
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.util import DefaultParamsReadable,DefaultParamsWritable

---

## **2.2 Session Creation**

In [None]:
## ----------------------------------------------------- Create system variable
# Set before the session is created below, so the Kafka connector packages
# named here (Scala 2.12 builds, version 3.5.0) are requested at start-up.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-streaming-kafka-'
    '0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-'
    '0-10_2.12:3.5.0 pyspark-shell'
)
## ----------------------------------------------------- Params definition
topic = "stream_weather_data"             # Kafka topic the weather records are read from
host_ip = "kafka"                         # Kafka broker host name (port 9092 is appended below)
master = "local[4]"                       # Spark master URL
app_name = "data_transformation_pipeline" # Name shown for this Spark application
## ----------------------------------------------------- Configuration
config = (
    SparkConf()
    .setMaster(value=master)    # Set master and number of cores to use
    .setAppName(value=app_name) # Set the application name
    .set("spark.sql.session.timeZone", "Australia/Melbourne") # Set the time zone
    .set("spark.executor.memory", "10g")
    .set("spark.driver.memory", "10g")
)
## ----------------------------------------------------- Creation
# getOrCreate() reuses an already running session if the kernel has one.
spark_session = SparkSession.builder.config(conf = config).getOrCreate()
## ----------------------------------------------------- Stream DataFrame
# Raw Kafka source: only messages arriving after the query starts are read
# ("latest"), and each trigger is capped at 336 offsets.
DF_weather_stream = (
    spark_session
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f"{host_ip}:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 336)
    .load()
)
DF_weather_stream.printSchema()

---

## **2.3 Schema Definition**

In [None]:
## ----------------------------------------------------- Building Information
# Schema of the building metadata file, one (column name, Spark type) pair
# per column, in file order.
# NOTE(review): DecimalType() is used without arguments, i.e. with PySpark's
# default precision/scale - confirm that is precise enough for the latent_* columns.
data_schema_build = StructType([
    StructField(_name, _type)
    for _name, _type in (
        ("site_id",     IntegerType()),
        ("building_id", IntegerType()),
        ("primary_use", StringType()),
        ("square_feet", IntegerType()),
        ("floor_count", IntegerType()),
        ("row_id",      IntegerType()),
        ("year_built",  IntegerType()),
        ("latent_y",    DecimalType()),
        ("latent_s",    DecimalType()),
        ("latent_r",    DecimalType()),
    )
])

---

## **2.4 Streaming Query and Information Loading**

### **2.4.1 Weather Stream Definition**

In [None]:
## ----------------------------------------------------- Weather Schema
# Explanation:
#   The producer sends the records in the format:
#       {...:...}{...:...}{...:...}
#   A single record is described by:
#           StructType(
#               StructField(name, type)
#           )
#   Every field except weather_ts arrives as a string and is cast to its
#   real type in the last select below.
#   NOTE(review): confirm the producer serialises each message value as a
#   JSON array of such records.
data_schema_weather = StructType([
        StructField("site_id",            StringType()),
        StructField("timestamp",          StringType()),
        StructField("air_temperature",    StringType()),
        StructField("cloud_coverage",     StringType()),
        StructField("dew_temperature",    StringType()),
        StructField("sea_level_pressure", StringType()),
        StructField("wind_direction",     StringType()),
        StructField("wind_speed",         StringType()),
        StructField("weather_ts",         IntegerType())
])
DF_weather_stream = (
    ## ------------------------------------------------- Extract DataFrame from Value
    DF_weather_stream
    .select(
        F.from_json(
            col = F.col("value").cast("string"),
            # inline() below only accepts an array of structs, so the record
            # schema has to be wrapped in ArrayType; passing the bare
            # StructType makes the inline() call fail during analysis.
            schema  = ArrayType(data_schema_weather),
            # FAILFAST is used so that malformed input raises instead of
            # being hidden; only one input layout is expected.
            # If several layouts have to be accepted, use mode = PERMISSIVE.
            options = {"mode":"FAILFAST"}
        ).alias("array_value")
    )
    ## ------------------------------------------------- Expanding
    .select(
        F
        # pyspark.sql.functions.inline does two things:
        #   1. Explodes the elements of the array into one row each
        #   2. Expands the struct fields of each element into columns
        .inline("array_value")
    )
    ## ------------------------------------------------- Type casting
    .select(
        F.col("site_id").cast(IntegerType()).alias("site_id"),
        F.col("timestamp").cast(TimestampType()).alias("timestamp"),
        F.col("air_temperature").cast(DecimalType()).alias("air_temperature"),
        F.col("cloud_coverage").cast(DecimalType()).alias("cloud_coverage"),
        F.col("dew_temperature").cast(DecimalType()).alias("dew_temperature"),
        F.col("sea_level_pressure").cast(DecimalType()).alias("sea_level_pressure"),
        F.col("wind_direction").cast(DecimalType()).alias("wind_direction"),
        F.col("wind_speed").cast(DecimalType()).alias("wind_speed"),
        F.col("weather_ts").cast(TimestampType()).alias("weather_ts")
    )
)

### **2.4.2 Building Metadata**

In [None]:
## ----------------------------------------------------- Params Definition
file_path = "../DataStorage/BuildingMetadata/"
file_name = "new_building_information.csv"
## ----------------------------------------------------- DataFrame Read In
# Static (batch) read of the building metadata with the explicit schema;
# the first line of the file is treated as the header.
DF_building = (
    spark_session
    .read
    .schema(data_schema_build)
    .option("header", True)
    .csv(f"{file_path}{file_name}")
)

---

## **2.5 Watermark**

In [None]:
DF_weather_stream = (
    DF_weather_stream
    # Messages can arrive late.
    # The watermark tells Spark how far behind the newest weather_ts seen so far
    # a record may still be and be accepted into its window.
    # Without it:
    #   1. State for old windows would have to be kept indefinitely
    #   2. Windowed results could never be finalised for output
    .withWatermark(eventTime="weather_ts", delayThreshold="5 seconds") # 5 seconds as threshold
)

---

## **2.6 Transformation Pipeline**

### **2.6.1 Weather Pipeline**

#### **2.6.1.1 Static Data**

In [None]:
## ============================================================================================= Implementation
## --------------------------------------------------------------------------------------------- Reading Data
filepath = "../dataset/"
filename = "weather.csv"

# Same columns as the streamed weather records, except that the static file
# has no weather_ts column.
schema_weather = (
    StructType()
    .add("site_id",            IntegerType())
    .add("timestamp",          TimestampType())
    .add("air_temperature",    DecimalType())
    .add("cloud_coverage",     DecimalType())
    .add("dew_temperature",    DecimalType())
    .add("sea_level_pressure", DecimalType())
    .add("wind_direction",     DecimalType())
    .add("wind_speed",         DecimalType())
)

DF_weather = spark_session.read.csv(
    path   = filepath + filename,
    schema = schema_weather,
    header = True
)

#### **2.6.1.2 Imputation**

In [None]:
## ============================================================================================= Implementation
## --------------------------------------------------------------------------------------------- Params Definition
target_table = DF_weather
imputation_strategy = "mean"
# Every column except the keys/timestamps is imputed. The list is derived in
# DataFrame column order so that it is identical on every kernel run
# (iterating over a set of strings is not order-stable between runs).
input_cols_impute  = [
    _name for _name in DF_weather.columns
    if _name not in ("site_id", "timestamp", "weather_ts")
]
# Imputed values go to new "<column>_impute" columns; the originals are kept.
output_cols_impute = [f"{_name}_impute" for _name in input_cols_impute]
## ============================================================================================= Pipeline List Update
imputer = Imputer(
    strategy     = imputation_strategy,
    inputCols    = input_cols_impute,
    outputCols   = output_cols_impute
)
pipeline_list01 = [imputer]

#### **2.6.1.3 Time Aggregation**

In [None]:
## ============================================================================================= Explanation
# About Transformer:
#   Transformer is the abstract base class of every transformer;
#   each subclass has to implement the method _transform.
#   Source:
#       https://github.com/apache/spark/blob/master/python/pyspark/ml/base.py
#   Its parent class is Params, which implements the method self._set
#   (it returns the instance itself).
#   Source:
#       https://github.com/apache/spark/blob/master/python/pyspark/ml/param/__init__.py
# About DefaultParamsReadable and DefaultParamsWritable:
#   Both are inherited so that the pipeline model can be saved and loaded.
#   Source:
#       DefaultParamsReadable:
#       https://archive.apache.org/dist/spark/docs/3.5.0/api/python/_modules/pyspark/ml/util.html#DefaultParamsReadable
#       DefaultParamsWritable:
#       https://archive.apache.org/dist/spark/docs/3.5.0/api/python/_modules/pyspark/ml/util.html#DefaultParamsWritable
## ============================================================================================= Implementation
class TimeAggregator(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """
    Aggregate imputed weather readings into 6-hour blocks per site and day.
    """

    @staticmethod
    def _transform(dataset:DataFrame) -> DataFrame:
        """
        Average the *_impute columns per (site_id, year, month, day, start_hour).
        :param dataset: Weather table with a timestamp column and the six *_impute columns (type:pyspark.sql.DataFrame)
        :return: One row per group with the avg_*_impute columns;
                 when the input has a weather_ts column the rows are additionally
                 grouped by a 7 second window over it (type:pyspark.sql.DataFrame)
        """
        ## ------------------------------------------------------------------------------------- Expressions
        event_time = F.col("timestamp")
        hour_of_day = F.col("hour")
        # Start hour of the 6-hour block the reading falls into (0, 6, 12 or 18)
        block_start = (
            F.when((hour_of_day >=  0) & (hour_of_day <  6),  0)
            .when((hour_of_day >=  6) & (hour_of_day < 12),  6)
            .when((hour_of_day >= 12) & (hour_of_day < 18), 12)
            .when(hour_of_day >= 18, 18)
            .otherwise(None)
        )

        ## ------------------------------------------------------------------------------------- Time Extraction and Binning
        with_time_parts = (
            dataset
            .withColumns({
                "year" : F.year(event_time).cast("integer"),
                "month": F.month(event_time).cast("integer"),
                "day"  : F.day(event_time).cast("integer"),
                "hour" : hour_of_day_from(event_time) if False else F.hour(event_time).cast("integer")
            })
            .withColumn("start_hour", block_start)
        )

        ## ------------------------------------------------------------------------------------- Grouping Keys
        group_keys = [
            F.col(_key) for _key in ("site_id", "year", "month", "day", "start_hour")
        ]
        if "weather_ts" in with_time_parts.columns:
            # Only the streamed records carry weather_ts
            group_keys.append(F.window(F.col("weather_ts"), "7 seconds"))

        ## ------------------------------------------------------------------------------------- Aggregation
        averaged_cols = (
            "air_temperature_impute",
            "cloud_coverage_impute",
            "dew_temperature_impute",
            "sea_level_pressure_impute",
            "wind_direction_impute",
            "wind_speed_impute",
        )
        return with_time_parts.groupBy(*group_keys).agg(
            *[F.mean(F.col(_col)).alias(f"avg_{_col}") for _col in averaged_cols]
        )
## ============================================================================================= Pipeline List Update
time_aggregator = TimeAggregator()
pipeline_list02 = pipeline_list01 + [time_aggregator]

#### **2.6.1.4 Quantile Extraction**

In [None]:
## ============================================================================================= Implementation
def quantile_bucket_splits(
        targetDF:DataFrame,
        targetCol:str,
        bucketRange:tuple,
        quantileList:list=None
) -> list:
    """
    Get the quantile bucket splits.

    The column is cut at a break point into two parts:
      - the "target" part (on the open side of bucketRange) is split at the
        requested quantiles, or only at the break point when quantileList is [];
      - in the remaining part every distinct value becomes its own split point.

    :param targetDF: Target DataFrame (type:pyspark.sql.DataFrame)
    :param targetCol: Target feature (type:str)
    :param bucketRange: The bucket value range, exactly one side must be None,
                        e.g., greater than 35 -> (35,None) (type:tuple)
    :param quantileList: The quantile list, default is 5 equal quantile bucketing,
                         [] means split at the break point only (type:list)
    :return: Strictly increasing bucket break point list, enclosed by -inf and +inf (type:list)
    :raises ValueError: If both or neither side of bucketRange is None
    """
    import math  # local import: only needed to screen out NaN / infinite split points

    if quantileList is None:
        quantileList = [.2,.4,.6,.8]

    targetFeature = targetDF.select(F.col(targetCol).cast("double").alias(targetCol))

    ## ----------------------------------------------------------------------------------------- Data Splitting
    if bucketRange[0] is None and bucketRange[1] is not None:
        # Pattern like [None,breakPoint]
        breakPoint = bucketRange[1]
        targetRange = targetFeature.filter(F.col(targetCol) <= breakPoint)
        remainRange = targetFeature.filter(F.col(targetCol) >  breakPoint)
    elif bucketRange[1] is None and bucketRange[0] is not None:
        # Pattern like [breakPoint,None]
        breakPoint = bucketRange[0]
        targetRange = targetFeature.filter(F.col(targetCol) >= breakPoint)
        remainRange = targetFeature.filter(F.col(targetCol) <  breakPoint)
    else:
        raise ValueError("The range is invalid, please check your range.")

    ## ----------------------------------------------------------------------------------------- Quantile Part
    if quantileList != []:
        # Which means bucket by quantile
        quantileBucket = list(targetRange\
            .toPandas()\
            .quantile(quantileList)\
            .iloc[:,0]) # Get the point list
    else:
        # Which means bucket by specific value
        quantileBucket = [breakPoint]

    ## ----------------------------------------------------------------------------------------- Normal Part
    # Every distinct value outside the target range is a split point of its own
    normalBucket = list(remainRange\
        .distinct()\
        .toPandas()\
        .iloc[:,0])

    ## ----------------------------------------------------------------------------------------- Assembling
    # Bucketizer requires strictly increasing, NaN-free splits:
    #   - equal quantiles (heavily repeated values) would produce duplicates,
    #   - an empty target range yields NaN quantiles,
    #   - infinite data values would collide with the -inf / +inf sentinels.
    # The set removes duplicates, isfinite removes NaN and infinities.
    bucketSplits = sorted({
        float(_element)
        for _element in normalBucket + quantileBucket
        if math.isfinite(float(_element))
    })

    return [-float("inf")] + bucketSplits + [float("inf")]

## ============================================================================================= Extraction
## --------------------------------------------------------------------------------------------- Parameter Definition
target_col = "avg_cloud_coverage_impute"
bucket_range = (5,None)   # values >= 5 form the quantile part
quantile_list = [0.5]     # split that part at its median
## --------------------------------------------------------------------------------------------- Cleaning
# Impute and aggregate the static weather data first, because the splits are
# taken from the aggregated avg_cloud_coverage_impute column.
tmp_pipeline = Pipeline(stages = [*pipeline_list01, TimeAggregator()])
tmp_DF = tmp_pipeline.fit(DF_weather).transform(DF_weather)
## --------------------------------------------------------------------------------------------- Extracting
quantile_cloud_coverage = quantile_bucket_splits(
    tmp_DF,
    target_col,
    bucket_range,
    quantile_list
)

#### **2.6.1.5 Peak Month Assigning**

In [None]:
## ============================================================================================= Explanation
## --------------------------------------------------------------------------------------------- Resource
# Why I define two classes:
#   I want to use pyspark.ml.Pipeline,
#   when calling fit, it will call ._fit function for estimator
#   and ._transform for transformer,
#   therefore, I need to define two class for different stages
#   Refer to:
#       https://archive.apache.org/dist/spark/docs/3.5.0/api/python/_modules/pyspark/ml/pipeline.html#Pipeline
#   To be specific, Model class just another transformer class,
#   Refer to:
#       https://archive.apache.org/dist/spark/docs/3.5.0/api/python/_modules/pyspark/ml/base.html#Model
## --------------------------------------------------------------------------------------------- Design
# I first group the data by site_id and month,
# then calculate the mean of the air_temperature in each group.
# Then I define two pyspark.sql.Window,
# all partitioning the data by site_id,
# and sort the data in descending and ascending order.
# Now, the index for each month is the rank for the highest and lowest month.
# Finally, assign 1 to is_peak for the months that rank in the top 3 of either ordering (the 3 warmest or the 3 coldest months of a site),
# and 0 to the others.
## ============================================================================================= Implementation
class PeakMonthExtractorModel(Model, DefaultParamsReadable, DefaultParamsWritable):
    """
    Fitted stage that attaches the per (site_id, month) lookup table it was
    built with to the dataset being transformed.
    NOTE(review): the lookup table is a plain attribute, not a Param -
    confirm whether saving/loading this stage is expected to restore it.
    """

    def __init__(self,isPeakDF:DataFrame):
        """
        :param isPeakDF: Lookup table keyed by site_id and month (type:pyspark.sql.DataFrame)
        """
        super().__init__()
        self._is_peak_DF = isPeakDF

    def _transform(self, dataset:DataFrame) -> DataFrame:
        """
        Mark the peak month in DF.
        :param dataset: Table with site_id and month columns (type:pyspark.sql.DataFrame)
        :return: Lookup table inner-joined with dataset; rows without a
                 matching (site_id, month) are dropped (type:pyspark.sql.DataFrame)
        """
        ## ------------------------------------------------------------------------------------- Assigning
        join_keys = ["site_id","month"]
        peak_lookup = self._is_peak_DF
        return peak_lookup.join(dataset, join_keys, "inner")

class PeakMonthExtractor(
    Estimator,
    DefaultParamsReadable,
    DefaultParamsWritable
):
    """
    Estimator that learns, per site, which months are temperature peaks
    (the 3 coldest and the 3 warmest months by mean air temperature).
    """

    @staticmethod
    def _fit(
            dataset:DataFrame
    ) -> "PeakMonthExtractorModel":
        """
        Build the (site_id, month, is_peak) lookup table and wrap it in a model.
        :param dataset: Table with site_id, month and avg_air_temperature_impute (type:pyspark.sql.DataFrame)
        :return: Model holding the lookup table (type:PeakMonthExtractorModel)
        """
        ## ------------------------------------------------------------------------------------- Window Definition
        # Both windows rank the months of one site by their mean temperature.
        window_asce = Window.partitionBy(["site_id"]).orderBy("temp")         # rank 1 = coldest month
        window_desc = Window.partitionBy(["site_id"]).orderBy(F.desc("temp")) # rank 1 = warmest month

        ## ------------------------------------------------------------------------------------- Peak Month Extraction
        _is_peak_DF = (
            dataset.
            select(["site_id","month","avg_air_temperature_impute"]).
            groupBy(["site_id","month"]).
            agg(F.mean(F.col("avg_air_temperature_impute")).alias("temp")).
            withColumns({
                "coldest_rank": F.row_number().over(window_asce),
                "warmest_rank": F.row_number().over(window_desc)
            }).
            withColumn(
                colName = "is_peak",
                # 1 when the month is among the 3 coldest or the 3 warmest of its site
                col     = F.when(condition=(F.col("coldest_rank") <= 3) | (F.col("warmest_rank") <= 3), value=1).
                            otherwise(value=0)
            ).
            # Only site_id, month and is_peak are kept for the lookup table
            drop("coldest_rank", "warmest_rank", "temp")
        )

        ## ------------------------------------------------------------------------------------- Initialization Model
        return PeakMonthExtractorModel(isPeakDF=_is_peak_DF)

## ============================================================================================= Pipeline List Update
peak_month_extractor = PeakMonthExtractor()
pipeline_list03 = pipeline_list02 + [peak_month_extractor]

#### **2.6.1.6 Bucketizer**

In [None]:
## ============================================================================================= Implementation
## --------------------------------------------------------------------------------------------- Params definition
# A single column is bucketized here; the output column gets the "_bucket" suffix.
input_cols_bucket  = "avg_cloud_coverage_impute"
output_cols_bucket = f"{input_cols_bucket}_bucket"
## ============================================================================================= Pipeline List Update
# The split points are the ones extracted into quantile_cloud_coverage
# by quantile_bucket_splits.
bucketer = Bucketizer(
    inputCol  = input_cols_bucket,
    outputCol = output_cols_bucket,
    splits    = quantile_cloud_coverage
)
# Final stage order: imputer -> time aggregation -> peak month -> bucketizer
pipeline_list04 = pipeline_list03 + [bucketer]

#### **2.6.1.7 Main Pipeline**

In [None]:
# Fit the whole weather pipeline on the static weather data; the fitted
# PipelineModel is what gets applied to the stream afterwards.
data_transformation_model_weather = Pipeline(stages = pipeline_list04).fit(dataset = DF_weather)

#### **2.6.1.8 Transformation and Cleaning**

In [None]:
# Apply the fitted weather pipeline to the streamed records.
# The un-bucketized cloud coverage average is dropped; its "_bucket"
# counterpart produced by the Bucketizer stage is kept.
DF_weather_final =  (
    data_transformation_model_weather.transform(DF_weather_stream).
    drop("avg_cloud_coverage_impute")
)

### **2.6.2 Building Metadata**

#### **2.6.2.1 Class Combination**

In [None]:
## ============================================================================================= Implementation
## --------------------------------------------------------------------------------------------- Transformer Definition
def class_combination(targetDF:DataFrame):
    """
    Merge rare primary_use classes into larger ones.

    Rules, applied in this order:
      1. Warehouse, Services and Parking become "Education".
      2. Anything that is not one of Community, Entertainment, Residential,
         Office or Education (nor one of the three above) becomes "Residential".
      3. Every other value is kept unchanged.

    :param targetDF: Table with a primary_use column (type:pyspark.sql.DataFrame)
    :return: Table with primary_use replaced by primary_use_modify (type:pyspark.sql.DataFrame)
    """
    usage = F.col("primary_use")
    merged_into_education = ["Warehouse", "Services", "Parking"]
    kept_unchanged = ["Community", "Entertainment", "Residential", "Office", "Education"]

    ## Feature creation and transformation
    recoded_usage = (
        F.when(usage.isin(merged_into_education), "Education")
        .when(~usage.isin(kept_unchanged + merged_into_education), "Residential")
        .otherwise(usage)
    )

    return (
        targetDF
        .withColumn("primary_use_modify", recoded_usage)
        .drop("primary_use")
    )

## --------------------------------------------------------------------------------------------- Transforming
DF_building02 = class_combination(targetDF=DF_building)

#### **2.6.2.2 Bucketizing**

In [None]:
## ============================================================================================= Implementation
## --------------------------------------------------------------------------------------------- Params definition
# floor_count: two buckets, split at 1.1
floor_count_bucket = [-float("inf"),1.1,float("inf")]

# latent_s: two buckets, split at 4.5
latent_s_bucket = [-float("inf"),4.5,float("inf")]

# latent_y: values >= 35 are split by the default quantiles,
# every distinct value below 35 is its own split point
latent_y_bucket = quantile_bucket_splits(
    targetDF    = DF_building,
    targetCol   = "latent_y",
    bucketRange = (35,None)
)

# Bucketizing params
input_cols_bucket  = ["floor_count","latent_s","latent_y"]
output_cols_bucket = [f"{_col}_bucket" for _col in input_cols_bucket]
splits = [floor_count_bucket,latent_s_bucket,latent_y_bucket]
## --------------------------------------------------------------------------------------------- Bucketizing
bucketer_building = Bucketizer(
    inputCols   = input_cols_bucket,
    outputCols  = output_cols_bucket,
    splitsArray = splits
)
DF_building_final = (
    bucketer_building.transform(dataset=DF_building02).
    withColumns({
        # Indicator: 4e05 < square_feet <= 6e05
        "is_square_feet_4e05_6e05": F.when(
            condition=(F.col("square_feet")>4e05) & (F.col("square_feet")<=6e05),
            value=1
        ).otherwise(value=0),
        # Indicator: square_feet > 6e05
        "is_square_feet_6e05_inf": F.when(
            condition=(F.col("square_feet")>6e05),
            value=1
        ).otherwise(value=0),
        # square_feet capped at 4e05
        "square_feet_bucket": F.when(
            condition=F.col("square_feet")>4e05,
            value=4e05
        ).otherwise(value=F.col("square_feet"))
    }).
    # Drop the raw columns that now have a bucketized/derived counterpart, plus row_id.
    # (The former chain also dropped year_use and avg_cloud_coverage_impute,
    # which never exist in the building table, and listed three columns twice.)
    drop("latent_s", "latent_y", "square_feet", "floor_count", "row_id")
)

### **2.6.3 Joining and Further Transformation**

In [None]:
## ============================================================================================= Implementation
## --------------------------------------------------------------------------------------------- Joining
# Every aggregated weather row is paired with all buildings of the same site.
DF_join = DF_weather_final.join(
    other = DF_building_final,
    on    = "site_id",
    how   = "inner"
)
## --------------------------------------------------------------------------------------------- Final Features Extraction
DF_final = (
    DF_join.
    # Building age at the time of the reading.
    withColumn("year_use", F.col("year") - F.col("year_built")).
    # Ages above 57 are collapsed into the single value 58.
    # This is a separate withColumn call on purpose: year_use must already
    # exist as a column before it is referenced. Defining both columns in one
    # withColumns call only resolves through implicit lateral column aliases.
    withColumn("year_use_bucket", F.when(F.col("year_use") > 57, 58).otherwise(F.col("year_use"))).
    drop("year", "year_use", "year_built")
)

---

## **2.7 Prediction and Aggregation**

### **2.7.1 Loading Pipeline And Model**

In [None]:
## ============================================================================================= Implementation
## --------------------------------------------------------------------------------------------- Evaluator
class RMSLE(Evaluator, DefaultParamsReadable, DefaultParamsWritable):
    """
    Root mean squared logarithmic error evaluator:
        RMSLE = sqrt(mean((log(y_true + 1) - log(y_pred + 1))^2))
    """

    def __init__(self, predictionCol:str="value_pred", labelCol:str="value") -> None:
        """
        :param predictionCol: Name of the prediction column (type: str)
        :param labelCol: Name of the label column (type: str)
        """
        super().__init__()
        self.predictionCol = predictionCol
        self.labelCol = labelCol

    def _evaluate(self, dataset) -> float:
        """
        Define the RMSLE metrics.
        :param dataset: DataStorage contain labels and predictions (type: pyspark.sql.DataFrame)
        :return: Metrics (type: float)
        """
        log_label = F.log(F.col(self.labelCol) + 1)
        log_prediction = F.log(F.col(self.predictionCol) + 1)
        squared_log_error = F.power(log_label - log_prediction, F.lit(2))
        metric = F.sqrt(F.mean(squared_log_error)).alias("RMSLE")

        # The aggregate yields a single row with a single column:
        # collect() returns [Row(RMSLE=...)], so [0][0] is the metric value
        result_rows = dataset.select(metric).collect()
        return result_rows[0][0]

    def isLargerBetter(self) -> bool:
        """
        Tell Spark that a smaller RMSLE is the better one.
        :return: False
        """
        return False
## --------------------------------------------------------------------------------------------- Pipeline Model
# Load the previously saved pipeline model from disk.
model = PipelineModel.load("../fine_tuning_step_one")
## --------------------------------------------------------------------------------------------- Prediction
DF_pred_final = (
    model.transform(dataset = DF_final)
    .withColumns({
        # Bounds of the 7 second window created in TimeAggregator, as integers
        "start_weather_ts": F.col("window.start").cast("int"),
        "end_weather_ts": F.col("window.end").cast("int"),
        # Negative predictions are replaced by 0
        # (presumably value_pred is the model's output column - verify against the saved pipeline)
        "value_pred_adjust": F.when(F.col("value_pred") < 0, 0).otherwise(F.col("value_pred"))
    })
)
## --------------------------------------------------------------------------------------------- Stream manager class
class QueryStream:
    """
    Manage one streaming query built from a streaming DataFrame.

    Two sinks are supported:
      - memory : rows are appended to the in-memory table "test" (read it with get_data)
      - parquet: rows are appended to parquet files, with a checkpoint directory

    Errors caused by wrong usage (ValueError) are printed instead of raised,
    so that a notebook cell keeps running.
    """
    def __init__(self, targetStreamDF:DataFrame, sparkSession:SparkSession):
        """
        :param targetStreamDF: Streaming DataFrame to write out (type: pyspark.sql.DataFrame)
        :param sparkSession: Session used to query the in-memory table (type: pyspark.sql.SparkSession)
        """
        self._check_point_path = None
        self._parquet_path = None
        self._trigger_time = None
        self._DF = targetStreamDF
        # Guard flag: True only while start_stream is calling _create_stream
        self._is_called = False
        # True while a started query is running
        self._is_stream = False
        self._is_memory:bool = False
        self._is_parquet:bool = False
        # Holds the writer while it is being built, then the started query
        self._query = None
        self._spark_session = sparkSession

    def _create_stream(self) -> None:
        """
        Internal function to build the stream writer (not started yet) for the selected mode.
        :raises ValueError: When called from outside start_stream, or when no mode is selected
        """
        if not self._is_called:
            raise ValueError("Do not call internal function directly")

        if self._is_memory:
            # Using memory
            print("Using memory mode")
            self._query = (
                self._DF
                .writeStream
                .outputMode("append")
                # Why not use foreachBatch:
                #   In micro-batch mode Spark may process a single batch several times.
                #   foreachBatch only guarantees that each batch is processed 'at least once',
                #   not exactly once, whereas the built-in sink can guarantee exactly once.
                #   Exactly once can still be implemented with foreachBatch by using
                #   the batch ID to roll back or skip a batch manually.
                .format("memory")
                # .foreachBatch(for_each_batch_function)
                .trigger(processingTime=f"{self._trigger_time} seconds")
                # Watermark behaviour under each output mode:
                #   1. Append:   A window is only output once the watermark has passed its end
                #   2. Update:   Results are output as soon as they are processed and updated when new data
                #                arrive; state is dropped once the watermark has passed the window
                #   3. Complete: State is never dropped and everything is recomputed when new data arrive,
                #                so the watermark is of little use in this mode
                .queryName("test")
            )
        elif self._is_parquet:
            # Using parquet
            print("Using parquet mode")
            self._query = (
                self._DF
                .writeStream
                .option("checkpointLocation", self._check_point_path)
                .format("parquet")
                .trigger(processingTime=f"{self._trigger_time} seconds")
                .outputMode("append")
                .option("path", self._parquet_path)
            )
        else:
            raise ValueError("Please set a single mode")

    def start_stream(
            self,
            isMemory:bool  = None,
            isParquet:bool = None,
            triggerTime:int|str  = 7,
            parquetPath:str|None = None,
            checkPointPath:str|None = None
    ):
        """
        Configure and start the stream.
        :param isMemory: Write to the in-memory table (type: bool)
        :param isParquet: Write to parquet files (type: bool)
        :param triggerTime: Processing-time trigger interval in seconds, must be positive (type: int|str)
        :param parquetPath: Output folder for parquet mode, default "../parquet/weather" (type: str|None)
        :param checkPointPath: Checkpoint folder for parquet mode, default "../checkpoint" (type: str|None)
        :return: The started query, or None when a ValueError was printed
        When neither mode is given, the mode of the previous start is reused.
        """
        try:
            if self._is_stream:
                raise ValueError("The stream is on, please stop first.")

            # Mode
            if isMemory and isParquet:
                raise ValueError("Please select a single mode")
            if not isMemory and not isParquet and not self._is_memory and not self._is_parquet:
                raise ValueError("Please select at least one mode")
            # An explicit choice replaces the previous mode (both flags are updated,
            # otherwise a stale memory flag would win over a newly requested parquet mode);
            # without an explicit choice the previous mode is kept untouched
            if isMemory:
                self._is_memory, self._is_parquet = True, False
            elif isParquet:
                self._is_memory, self._is_parquet = False, True

            # Trigger time
            triggerTime = int(triggerTime)
            if triggerTime > 0:
                # Trigger time should be a positive integer
                self._trigger_time = str(triggerTime)
            elif self._trigger_time is None:
                # No earlier valid value to fall back on: refuse to build a "None seconds" trigger
                raise ValueError("Trigger time should be a positive integer")

            # Parquet path
            if isinstance(parquetPath, str):
                self._parquet_path = parquetPath
            elif parquetPath is None and self._parquet_path is None:
                self._parquet_path = "../parquet/weather"

            # Checkpoint path
            if isinstance(checkPointPath, str):
                self._check_point_path = checkPointPath
            elif checkPointPath is None and self._check_point_path is None:
                self._check_point_path = "../checkpoint"

            # Create a query; the guard flag is always reset, even when creation fails
            self._is_called = True
            try:
                self._create_stream()
            finally:
                self._is_called = False

            # Start the query; the manager is only marked as streaming once start() succeeded
            try:
                self._query = self._query.start()
            except Exception:
                self._query = None
                raise
            self._is_stream = True
            return self._query
        except ValueError as e:
            print("Error when open the stream:")
            print("---------------------------")
            print(str(e))

    def end_stream(self) -> None:
        """ Stop the running stream; print an error when no stream is running """
        try:
            if self._is_stream:
                self._query.stop()
                self._query = None
                self._is_stream = False
                print("Stream has been stop")
            else:
                raise ValueError("The stream has been already stopped")
        except ValueError as e:
            print("Error when end the stream:")
            print("--------------------------")
            print(str(e))

    def get_data(self) -> DataFrame:
        """
        Get the current content of the in-memory table "test".
        :return: DataFrame with the rows received so far, or None when not in memory mode
        """
        try:
            if not self._is_memory:
                raise ValueError("You could only get the data in memory mode")
            return self._spark_session.sql("SELECT * FROM test")
        except ValueError as e:
            print("Error when get the data:")
            print("------------------------")
            print(str(e))

### **2.7.2 Debug**

In [21]:
## ============================================================================================= Explanation
# This part is for debugging: it helps me understand the input rate of each turn (trigger)
## ============================================================================================= Implementation
## --------------------------------------------------------------------------------------------- Query Creation
trigger_time = 5
stream_manager = QueryStream(targetStreamDF=DF_pred_final, sparkSession=spark_session)
query = stream_manager.start_stream(isMemory=True, triggerTime=trigger_time)
current_end = None
# The monitoring loop below only runs when start is True; it is switched off here
start = False
i = 0
## --------------------------------------------------------------------------------------------- Streaming Output
try:
    while start:
        print("waiting...")
        time.sleep(trigger_time)
        print("sending...")
        # lastProgress is None until the query has reported its first progress update,
        # so it must not be subscripted blindly
        last_progress = query.lastProgress
        progress = last_progress["sources"] if last_progress is not None else None
        if progress:
            print("-"*50)
            print(f"Turn {i}")
            print(
                f"\tstartOffset:            {progress[0]['startOffset']}\n"
                f"\tendOffset:              {progress[0]['endOffset']}\n"
                f"\tnumInputRows:           {progress[0]['numInputRows']}\n"
                f"\tinputRowsPerSecond:     {progress[0]['inputRowsPerSecond']}\n"
                f"\tprocessedRowsPerSecond: {progress[0]['processedRowsPerSecond']}"
            )
        else:
            print("\tNo progress")
        i += 1
except Exception as e:
    print("Error when reading DF:")
    print(str(e))
finally:
    # Always stop the query, also when the loop is interrupted from the keyboard
    stream_manager.end_stream()

Using memory mode
Stream has been stop


### **2.7.3 Aggregation**

#### **2.7.3.1 Hourly Load Forecasting for Energy Consumption**

In [None]:
## ============================================================================================= Implementation
## --------------------------------------------------------------------------------------------- DataFrame Definition
# Columns kept for the hourly prediction output
DF_prediction = (
    DF_pred_final
    .select("window","month","day","site_id","building_id","start_hour","value_pred_adjust")
)
## --------------------------------------------------------------------------------------------- Query Creation
trigger_time = 5
stream_manager = QueryStream(targetStreamDF=DF_prediction, sparkSession=spark_session)
query = stream_manager.start_stream(isMemory=True, triggerTime=trigger_time)
current_end = None
i = 0
start = True
## --------------------------------------------------------------------------------------------- Streaming Output
# The loop runs until the cell is interrupted; the finally clause then stops the query
try:
    while True:
        print("waiting...")
        time.sleep(trigger_time)
        print("sending...")
        # lastProgress is None until the query has reported its first progress update,
        # so it must not be subscripted blindly
        last_progress = query.lastProgress
        progress = last_progress["sources"] if last_progress is not None else None
        if progress and start:
            # Debug message
            print("="*50)
            print(f"Turn {i}")
            print("Debug" + "-" * 40)
            print(
                f"\tstartOffset:            {progress[0]['startOffset']}\n"
                f"\tendOffset:              {progress[0]['endOffset']}\n"
                f"\tnumInputRows:           {progress[0]['numInputRows']}\n"
                f"\tinputRowsPerSecond:     {progress[0]['inputRowsPerSecond']}\n"
                f"\tprocessedRowsPerSecond: {progress[0]['processedRowsPerSecond']}"
            )
            print("Result" + "-" * 41)
            print("First 5")
            # .show() prints the table itself and returns None,
            # so it is not wrapped in print() (that would add a stray "None" line)
            (
                stream_manager
                .get_data()
                .orderBy(
                    F.col("window.start"),F.col("site_id"),
                    F.col("building_id"), F.col("start_hour")
                )
                .select("window.start","site_id","building_id","start_hour","value_pred_adjust")
                .show(5)
            )
            print("Last 5")
            (
                stream_manager
                .get_data()
                .orderBy(
                    F.col("window.start").desc(),F.col("site_id").desc(),
                    F.col("building_id").desc(), F.col("start_hour").desc()
                )
                .select("window.start","site_id","building_id","start_hour","value_pred_adjust")
                .show(5)
            )
        else:
            print("\tNo progress")
        i += 1
except Exception as e:
    print("Error when reading DF:")
    print(str(e))
finally:
    stream_manager.end_stream()

#### **2.7.3.2 Weekly Load Forecasting for Energy Consumption**

In [None]:
## ============================================================================================= Implementation
## --------------------------------------------------------------------------------------------- DataFrame Definition
# Sum of the adjusted predictions per window, building and start hour
DF_weekly = (
    DF_pred_final
    .groupBy("window","building_id","start_hour")
    .agg(F.sum("value_pred_adjust").alias("total_value_pred_adjust"))
)
## --------------------------------------------------------------------------------------------- Query Creation
trigger_time = 7
stream_manager = QueryStream(targetStreamDF=DF_weekly, sparkSession=spark_session)
query = stream_manager.start_stream(isMemory=True, triggerTime=trigger_time)
current_end = None
i = 0
start = True
## --------------------------------------------------------------------------------------------- Streaming Output
# The loop runs until the cell is interrupted; the finally clause then stops the query
try:
    while True:
        print("waiting...")
        time.sleep(trigger_time)
        print("sending...")
        # lastProgress is None until the query has reported its first progress update,
        # so it must not be subscripted blindly
        last_progress = query.lastProgress
        progress = last_progress["sources"] if last_progress is not None else None
        if progress and start:
            # Debug message
            print("="*50)
            print(f"Turn {i}")
            print("Debug" + "-" * 40)
            print(
                f"\tstartOffset:            {progress[0]['startOffset']}\n"
                f"\tendOffset:              {progress[0]['endOffset']}\n"
                f"\tnumInputRows:           {progress[0]['numInputRows']}\n"
                f"\tinputRowsPerSecond:     {progress[0]['inputRowsPerSecond']}\n"
                f"\tprocessedRowsPerSecond: {progress[0]['processedRowsPerSecond']}"
            )
            print("Result" + "-" * 41)
            print("First 5 weeks")

            # .show() prints the table itself and returns None,
            # so it is not wrapped in print() (that would add a stray "None" line)
            (
                stream_manager
                .get_data()
                .orderBy(F.col("window.start"),F.col("building_id"),F.col("start_hour"))
                .select(F.col("window.start"),"building_id","start_hour","total_value_pred_adjust")
                .show(5)
            )
            print("Last 5 weeks")
            # NOTE(review): start_hour is ascending here while the other keys are descending,
            # confirm this ordering is intended
            (
                stream_manager
                .get_data()
                .orderBy(F.col("window.start").desc(),F.col("building_id").desc(),F.col("start_hour"))
                .select(F.col("window.start"),"building_id","start_hour","total_value_pred_adjust")
                .show(5)
            )
        else:
            print("\tNo progress")
        i += 1
except Exception as e:
    print("Error when reading DF:")
    print(str(e))
finally:
    stream_manager.end_stream()

#### **2.7.3.3 Daily Load Forecasting for Energy Consumption**

In [None]:
## ============================================================================================= Implementation
# Two-step average: first the mean per 14-second window, site and start hour,
# then the mean of those values per window and site
DF_daily = (
    DF_pred_final
    .groupBy(F.window("window","14 second"),"site_id","start_hour")
    .agg(F.mean("value_pred_adjust").alias("mean_value_pred_adjust"))
    .groupBy("window","site_id")
    .agg(F.mean("mean_value_pred_adjust").alias("daily_value_pred_adjust"))
)
## --------------------------------------------------------------------------------------------- Query Creation
trigger_time = 14
stream_manager = QueryStream(targetStreamDF=DF_daily, sparkSession=spark_session)
query = stream_manager.start_stream(isMemory=True, triggerTime=trigger_time)
current_end = None
i = 0
start = True
## --------------------------------------------------------------------------------------------- Streaming Output
# The loop runs until the cell is interrupted; the finally clause then stops the query
try:
    while True:
        print("waiting...")
        time.sleep(trigger_time)
        print("sending...")
        # lastProgress is None until the query has reported its first progress update,
        # so it must not be subscripted blindly
        last_progress = query.lastProgress
        progress = last_progress["sources"] if last_progress is not None else None
        if progress and start:
            # Debug message
            print("="*50)
            print(f"Turn {i}")
            print("Debug" + "-" * 40)
            print(
                f"\tstartOffset:            {progress[0]['startOffset']}\n"
                f"\tendOffset:              {progress[0]['endOffset']}\n"
                f"\tnumInputRows:           {progress[0]['numInputRows']}\n"
                f"\tinputRowsPerSecond:     {progress[0]['inputRowsPerSecond']}\n"
                f"\tprocessedRowsPerSecond: {progress[0]['processedRowsPerSecond']}"
            )
            print("Result" + "-" * 41)
            # .show() prints the table itself and returns None,
            # so it is not wrapped in print() (that would add a stray "None" line)
            (
                stream_manager
                .get_data()
                .orderBy("window.start","site_id")
                .select(
                    F.col("window.start").cast("int").alias("start"),
                    F.col("window.end").cast("int").alias("end"),
                    "site_id","daily_value_pred_adjust"
                )
                .show(5)
            )
        else:
            print("\tNo progress")
        i += 1
except Exception as e:
    print("Error when reading DF:")
    print(str(e))
finally:
    stream_manager.end_stream()

### **2.7.4 Data Persistence**

In [None]:
## ============================================================================================= Definition
# Hourly output: the selected prediction columns, no aggregation
DF_prediction = DF_pred_final.select(
    "window",
    "month",
    "day",
    "site_id",
    "building_id",
    "start_hour",
    "value_pred_adjust"
)
# Weekly output: sum of the adjusted predictions per window, building and start hour
DF_weekly = (
    DF_pred_final
    .groupBy("window", "building_id", "start_hour")
    .agg(
        F.sum("value_pred_adjust").alias("total_value_pred_adjust")
    )
)
# Daily output: mean per 14-second window, site and start hour,
# averaged again per window and site
DF_daily = (
    DF_pred_final
    .groupBy(F.window("window", "14 second"), "site_id", "start_hour")
    .agg(
        F.mean("value_pred_adjust").alias("mean_value_pred_adjust")
    )
    .groupBy("window", "site_id")
    .agg(
        F.mean("mean_value_pred_adjust").alias("daily_value_pred_adjust")
    )
)
## ============================================================================================= Start Stream
# Each output gets its own manager, parquet folder and checkpoint folder
## --------------------------------------------------------------------------------------------- Hourly
stream_manager01 = QueryStream(targetStreamDF=DF_prediction, sparkSession=spark_session)
_ = stream_manager01.start_stream(
    isParquet      = True,
    parquetPath    = "../DataStorage/EnergyConsumption/Prediction/hourly",
    checkPointPath = "../CheckPoint/writing/hourly"
)
## --------------------------------------------------------------------------------------------- Weekly
stream_manager02 = QueryStream(targetStreamDF=DF_weekly, sparkSession=spark_session)
_ = stream_manager02.start_stream(
    isParquet      = True,
    parquetPath    = "../DataStorage/EnergyConsumption/Prediction/weekly",
    checkPointPath = "../CheckPoint/writing/weekly"
)
## --------------------------------------------------------------------------------------------- Daily
stream_manager03 = QueryStream(targetStreamDF=DF_daily, sparkSession=spark_session)
_ = stream_manager03.start_stream(
    isParquet      = True,
    parquetPath    = "../DataStorage/EnergyConsumption/Prediction/daily",
    checkPointPath = "../CheckPoint/writing/daily"
)

In [None]:
## ============================================================================================= End Stream
# Stop the three parquet-writing queries (hourly, weekly, daily);
# end_stream prints an error instead of raising when a manager is not running
stream_manager01.end_stream()
stream_manager02.end_stream()
stream_manager03.end_stream()

### **2.7.5 Dashboard Producer**

In [None]:
## ============================================================================================= Schema Definition
## --------------------------------------------------------------------------------------------- Daily
# Schema used to read the daily parquet output
daily_schema = (
    StructType()
    .add("window", StructType().add("start", TimestampType()).add("end", TimestampType()))
    .add("site_id", IntegerType())
    .add("daily_value_pred_adjust", DoubleType())
)
## --------------------------------------------------------------------------------------------- Weekly
# Schema used to read the weekly parquet output
weekly_schema = (
    StructType()
    .add("window", StructType().add("start", TimestampType()).add("end", TimestampType()))
    .add("building_id", IntegerType())
    .add("start_hour", IntegerType())
    .add("total_value_pred_adjust", DoubleType())
)
## --------------------------------------------------------------------------------------------- Hourly
# Schema used to read the hourly parquet output
prediction_schema = (
    StructType()
    .add("window", StructType().add("start", TimestampType()).add("end", TimestampType()))
    .add("site_id", IntegerType())
    .add("building_id", IntegerType())
    .add("start_hour", IntegerType())
    .add("day", IntegerType())
    .add("month", IntegerType())
    .add("value_pred_adjust", DoubleType())
)
## ============================================================================================= Loading
## --------------------------------------------------------------------------------------------- Function
def get_stream_parquet(
        folderName:str,
        schema:StructType,
        sparkSession:SparkSession
) -> DataFrame:
    """
    Open a parquet folder of the prediction storage as a streaming DataFrame.
    :param folderName: Folder below ../DataStorage/EnergyConsumption/Prediction/ (type: str)
    :param schema: Schema applied to the parquet files (type: StructType)
    :param sparkSession: Session used to create the stream reader (type: SparkSession)
    :return: The streaming DataFrame (type: pyspark.sql.DataFrame)
    """
    parquet_reader = sparkSession.readStream
    parquet_reader = parquet_reader.format("parquet")
    parquet_reader = parquet_reader.schema(schema)
    # The folder name is appended as is, no path separator is inserted
    source_folder = "../DataStorage/EnergyConsumption/Prediction/" + folderName
    return parquet_reader.load(source_folder)
## --------------------------------------------------------------------------------------------- Daily
# Streaming reader over the daily parquet folder
daily_stream = get_stream_parquet(folderName="daily/", schema=daily_schema, sparkSession=spark_session)
## --------------------------------------------------------------------------------------------- Weekly
# Streaming reader over the weekly parquet folder
weekly_stream = get_stream_parquet(folderName="weekly/", schema=weekly_schema, sparkSession=spark_session)
## --------------------------------------------------------------------------------------------- Hourly
# Streaming reader over the hourly parquet folder
prediction_stream = get_stream_parquet(folderName="hourly/", schema=prediction_schema, sparkSession=spark_session)

In [None]:
## ============================================================================================= Sender Definition
## --------------------------------------------------------------------------------------------- Transformation Function
def get_sending_stream(
        streamDF:DataFrame,
        targetKey:str,
        bootstrapServers:str = "kafka:9092"
) -> "DataStreamWriter":
    """
    Build (without starting) the Kafka writer that sends a stream to the dashboard.
    Every row is serialised to one JSON string stored in the "value" column.
    :param streamDF: Streaming DataFrame to send (type: pyspark.sql.DataFrame)
    :param targetKey: Suffix of the topic "prediction_<targetKey>" and of the
                      checkpoint folder "../CheckPoint/loading/<targetKey>" (type: str)
    :param bootstrapServers: Kafka bootstrap servers, default "kafka:9092" (type: str)
    :return: The configured writer; call .start() on it to begin sending
             (type: pyspark.sql.streaming.DataStreamWriter)
    """
    # All columns of a row are packed into a struct and converted to JSON
    json_stream = streamDF.selectExpr("to_json(struct(*)) AS value")
    return (
        json_stream
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("topic", f"prediction_{targetKey}")
        .option("checkpointLocation", "../CheckPoint/loading/" + targetKey)
        .outputMode("append")
    )
## --------------------------------------------------------------------------------------------- Hourly
# Writer for the hourly predictions (not started yet)
prediction_query = get_sending_stream(
    streamDF  = prediction_stream,
    targetKey = "hourly"
)
## --------------------------------------------------------------------------------------------- Weekly
# Writer for the weekly predictions (not started yet)
weekly_query = get_sending_stream(
    streamDF  = weekly_stream,
    targetKey = "weekly"
)
## --------------------------------------------------------------------------------------------- Daily
# Writer for the daily predictions (not started yet)
daily_query = get_sending_stream(
    streamDF  = daily_stream,
    targetKey = "daily"
)

In [None]:
## ============================================================================================= Sending
## --------------------------------------------------------------------------------------------- Hourly
# Start the hourly writer and keep the returned handle so that it can be stopped later
prediction_query_start = prediction_query.start()
## --------------------------------------------------------------------------------------------- Weekly
# Start the weekly writer
weekly_query_start = weekly_query.start()
## --------------------------------------------------------------------------------------------- Daily
# Start the daily writer
daily_query_start = daily_query.start()

In [None]:
## ============================================================================================= Stopping
## --------------------------------------------------------------------------------------------- Hourly
# Stop the hourly sending query
prediction_query_start.stop()
## --------------------------------------------------------------------------------------------- Weekly
# Stop the weekly sending query
weekly_query_start.stop()
## --------------------------------------------------------------------------------------------- Daily
# Stop the daily sending query
daily_query_start.stop()