# ETL

The first column, "node", contains the name of the node where the data are measured (e.g. c7102). For the chiller data, this column takes the values "1" and "2" (chiller 1 and chiller 2, respectively) for the temperature and pressure data, and "basement" for the consumption data (chiller 1 corresponds to the "Power13" column and chiller 2 to "Power14").
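As a hedged illustration of these conventions, the sketch below classifies a raw row by its `node` value and maps each chiller to its consumption column. The helper names (`row_kind`, `CHILLER_POWER_COLUMN`) are hypothetical and not part of the original notebook.

```python
# Hypothetical helpers illustrating the "node" column conventions described above.
CHILLER_POWER_COLUMN = {1: "Power13", 2: "Power14"}  # chiller number -> consumption column


def row_kind(node: str) -> str:
    """Return which kind of series a row with this `node` value carries."""
    if node in ("1", "2"):
        return "chiller_sensors"  # temperature and pressure of chiller 1 or 2
    if node == "basement":
        return "chiller_power"  # Power13 / Power14 consumption of both chillers
    return "cluster_node"  # e.g. "c7102": consumption in the "power" column


for value in ("c7102", "1", "basement"):
    print(value, "->", row_kind(value))
```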

The second and third columns contain the date range of the time series in that row. As stated earlier, this is from 2018/01/01 to 2021/01/06.

Column 4, "power", contains the consumption of the cluster nodes (i.e. rows with node="cxxxx"). Each cell holds a time series stored as a dictionary whose keys are the timestamps of the readings.
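To make the cell format concrete, here is a plain-Python sketch of what exploding one `power` cell produces. The key format `yyyy-MM-dd HH:mm:ss` is an assumption based on the sample output further below, and the readings are illustrative values.

```python
from datetime import datetime
from typing import Dict, List, Tuple

# Illustrative "power" cell: timestamp string -> reading in Watts (key format assumed).
power_cell: Dict[str, float] = {
    "2018-01-01 00:01:57": 54.0,
    "2018-01-01 00:00:44": 54.0,
    "2018-01-01 00:03:09": 72.0,
}


def explode_series(cell: Dict[str, float]) -> List[Tuple[datetime, float]]:
    """Turn one dictionary cell into (timestamp, value) rows.

    Like EXPLODE(power) in the Spark SQL queries, each key/value pair becomes
    one row; sorting by time is an extra step added here for readability.
    """
    rows = [
        (datetime.strptime(key, "%Y-%m-%d %H:%M:%S"), float(value))
        for key, value in cell.items()
    ]
    return sorted(rows)


for ts, watts in explode_series(power_cell):
    print(ts, watts)
```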

Columns 5 and 6, 'Power13' and 'Power14', hold the consumption of the chillers (measured at the PM13 and PM14 panels), as explained for the node column. They should only contain data for node="basement". Each cell holds a time series stored as a dictionary whose keys are the timestamps of the readings.

Columns 7 to 10 follow the water path 'in' -(free cooling)-> 'evaporator' -(compressors)-> 'out', while 'ambient' is the outdoor temperature. They contain the temperatures measured in the chillers: inlet ('in'), evaporator, outlet ('out') and ambient. They should only contain data for node="1" or "2", depending on the chiller. Each cell holds a time series stored as a dictionary whose keys are the timestamps of the readings.

Columns 11 and 12, 'Compressor1' and 'Compressor2', are analogous to the previous ones but hold the pressure of each of the two compressors of each chiller. They should only contain data for node="1" or "2", depending on the chiller. Each cell holds a time series stored as a dictionary whose keys are the timestamps of the readings.
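Since each series column is only expected to hold data for certain rows, a quick consistency check can flag violations. This is a minimal sketch over plain dictionaries; representing a row as a `{column: series}` dict and the `unexpected_columns` helper are assumptions for illustration.

```python
from typing import Any, Dict, List

# Which series columns are expected to hold data for each kind of row,
# following the column descriptions above.
EXPECTED_COLUMNS = {
    "cluster_node": {"power"},
    "basement": {"Power13", "Power14"},
    "chiller": {"in", "evaporator", "out", "ambient", "Compressor1", "Compressor2"},
}


def unexpected_columns(node: str, row: Dict[str, Any]) -> List[str]:
    """Return the series columns that contain data although they should be empty."""
    if node in ("1", "2"):
        kind = "chiller"
    elif node == "basement":
        kind = "basement"
    else:
        kind = "cluster_node"
    # Empty dictionaries, empty lists and None all count as "no data"
    filled = {column for column, series in row.items() if series}
    return sorted(filled - EXPECTED_COLUMNS[kind])


row = {"power": {"2018-01-01 00:00:44": 54.0}, "in": {"2018-01-01 00:00:50": 17.5}}
print(unexpected_columns("c7102", row))  # "in" should be empty for a cluster node
```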

## Objectives:
We will use the following time series (aggregated every 30 minutes):
### Sum of the node consumption - originally in Watts
### Sum of the consumption of the two chillers - kW (Power13: chiller 1, Power14: chiller 2)
### Maximum pressure of the 4 compressors - in Pascals (2 chillers, 4 compressors; in principle a list with two dictionaries)
### Number of active compressors (compressors with a pressure above 15 bar)
### The in, evaporator, out and ambient temperatures of the active chiller (the one consuming more than 10 kW); for reference, the hot water returning from the data center (CPD) is at around 18 degrees
### The difference between the ambient temperature and the setpoint (the setpoint can be estimated as the mean of the out temperature)
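The computation of these series for a single 30-minute slot can be sketched as follows. Several points are assumptions not fixed by the text: pressures are taken to be stored in Pa (as stated above), so the 15 bar threshold becomes 1.5e6 Pa; the "active" chiller is the one with the highest consumption, provided it exceeds 10 kW; and the setpoint is passed in as a precomputed mean of the `out` temperature. The function name, key names and example values are hypothetical.

```python
from typing import Dict, List, Optional

BAR_TO_PA = 100_000  # 1 bar = 1e5 Pa
ACTIVE_PRESSURE_PA = 15 * BAR_TO_PA  # compressor counted as active above 15 bar
ACTIVE_POWER_KW = 10.0  # chiller counted as active above 10 kW
TEMPERATURES = ("in", "evaporator", "out", "ambient")


def slot_features(
    node_avgs_w: List[float],
    chiller_kw: Dict[int, float],
    pressures_pa: List[float],
    temps: Dict[int, Dict[str, float]],
    setpoint: float,
) -> Dict[str, Optional[float]]:
    """Derive the 9 target series for one 30-minute slot.

    node_avgs_w:  30-minute average consumption of each cluster node (W)
    chiller_kw:   {1: Power13 average, 2: Power14 average} (kW)
    pressures_pa: pressures of the 4 compressors (2 chillers x 2 compressors)
    temps:        {chiller: {"in": ..., "evaporator": ..., "out": ..., "ambient": ...}}
    setpoint:     estimated beforehand as the mean "out" temperature
    """
    features: Dict[str, Optional[float]] = {
        "nodes_total_W": sum(node_avgs_w),
        "chillers_total_kW": sum(chiller_kw.values()),
        "max_pressure_Pa": max(pressures_pa),
        "active_compressors": sum(p > ACTIVE_PRESSURE_PA for p in pressures_pa),
    }
    # Active chiller: the one consuming the most, provided it exceeds 10 kW
    busiest = max(chiller_kw, key=chiller_kw.get)
    active = temps[busiest] if chiller_kw[busiest] > ACTIVE_POWER_KW else None
    for name in TEMPERATURES:
        features[name + "_temp"] = active[name] if active else None
    features["ambient_minus_setpoint"] = (active["ambient"] - setpoint) if active else None
    return features


example = slot_features(
    node_avgs_w=[54.0, 72.0, 90.0],
    chiller_kw={1: 35.0, 2: 2.0},
    pressures_pa=[1.2e6, 1.6e6, 0.4e6, 0.3e6],
    temps={
        1: {"in": 18.0, "evaporator": 14.0, "out": 12.0, "ambient": 25.0},
        2: {"in": 17.0, "evaporator": 17.0, "out": 17.0, "ambient": 25.0},
    },
    setpoint=12.5,
)
print(example)
```

Returning `None` for the temperatures when no chiller exceeds 10 kW is one possible choice; imputing or dropping those slots would be equally reasonable.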

In total we have 9 time series (it remains to be decided whether it is worth keeping both the ambient temperature and the difference between ambient temperature and setpoint).

We will attempt:
- CU1: 24-hour-ahead forecast of the total chiller consumption (it will depend on how much free cooling can be used)
- CU2: 24-hour-ahead forecast of the maximum pressure
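With one value every 30 minutes, a 24-hour horizon corresponds to 48 steps. A minimal sketch of how (input, target) pairs could be built for such a forecast is shown below; using a single current value as the input is a simplifying assumption, since the notebook does not fix the model inputs.

```python
from typing import List, Sequence, Tuple

STEP_MINUTES = 30
HORIZON_STEPS = 24 * 60 // STEP_MINUTES  # 24 h ahead = 48 half-hour steps


def make_pairs(series: Sequence[float], horizon: int = HORIZON_STEPS) -> List[Tuple[float, float]]:
    """Pair each value x_t with the value y_{t+horizon} observed `horizon` steps later."""
    return [(series[t], series[t + horizon]) for t in range(len(series) - horizon)]


# Two days of made-up half-hourly values leave 96 - 48 = 48 training pairs
pairs = make_pairs([float(i) for i in range(96)])
print(HORIZON_STEPS, len(pairs), pairs[0])
```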

To train CU1 we can use data from the whole year, although winter data will be more informative (free cooling can only be used when the ambient temperature is below the 'in' temperature, and in summer this is only likely to happen at night).

To train CU2, only the data from the summer months are relevant.
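These two selection criteria can be expressed as simple filters. In this sketch the free-cooling condition follows the text (ambient below the `in` temperature), while the set of summer months (June to September) is an assumption, since the notebook does not define it. The example rows are invented.

```python
from datetime import datetime
from typing import List, Tuple

SUMMER_MONTHS = {6, 7, 8, 9}  # assumption: months treated as "summer" for CU2


def free_cooling_possible(ambient_c: float, in_c: float) -> bool:
    """Free cooling can only help when the outdoor air is colder than the inlet water."""
    return ambient_c < in_c


def summer_only(rows: List[Tuple[datetime, float]]) -> List[Tuple[datetime, float]]:
    """Keep only the (timestamp, value) rows that fall in the summer months (CU2)."""
    return [(ts, value) for ts, value in rows if ts.month in SUMMER_MONTHS]


rows = [(datetime(2019, 1, 15, 3, 0), 1.1e6), (datetime(2019, 7, 15, 15, 0), 1.7e6)]
print(free_cooling_possible(8.0, 18.0), free_cooling_possible(25.0, 18.0))
print(summer_only(rows))
```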


The data can be visualized in the following dashboard:

http://grafana.srv.cesga.es/d/000000016/dcim?orgId=1


## Results:
Processing the first 50 nodes took 1437.94 seconds (about 24 minutes).

First step: convert the table from its original format, in which each cell holds a time series as a dictionary (or as a list with two dictionaries when there are two series, e.g. compressors 1 and 2 of chiller 1, and likewise for chiller 2), into tabular time series.
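This first step can be sketched in plain Python as flattening each cell into long-format records. When a cell holds a list of two dictionaries, this sketch names the series with a 1-based suffix (`_1`, `_2`); that naming, the `cell_to_records` helper and the example values are assumptions for illustration.

```python
from typing import Dict, List, Tuple, Union

Series = Dict[str, float]
Cell = Union[None, Series, List[Series]]
Record = Tuple[str, str, str, float]  # (node, series name, timestamp, value)


def cell_to_records(node: str, column: str, cell: Cell) -> List[Record]:
    """Flatten one cell (a dict, a list of dicts, or empty) into tabular records."""
    if not cell:
        return []
    series_list = cell if isinstance(cell, list) else [cell]
    records: List[Record] = []
    for position, series in enumerate(series_list, start=1):
        # A single dict keeps the column name; a list gets one suffixed series per element
        name = column if len(series_list) == 1 else "{}_{}".format(column, position)
        for timestamp, value in series.items():
            records.append((node, name, timestamp, float(value)))
    return sorted(records)


cell = [{"2018-01-01 00:00:44": 1.2e6}, {"2018-01-01 00:00:44": 0.9e6}]
for record in cell_to_records("1", "Compressor1", cell):
    print(record)
```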

# 1- Obtaining the total average node electrical consumption (Watts)
## Average consumption of each node (Watts) over consecutive, non-overlapping 30-minute windows from the start to the end of the data, then summed into a single time series of the total average consumption.
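`F.window("time", "30 minutes")` without a slide duration produces tumbling (non-overlapping) windows `[start, end)`, and the code below labels each window by its end. A plain-Python sketch of that bucketing follows, assuming naive timestamps and windows aligned to 1970-01-01 00:00, which for 30-minute windows should match Spark's default alignment. The helper names and readings are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, Tuple

WINDOW = timedelta(minutes=30)
EPOCH = datetime(1970, 1, 1)


def window_bounds(ts: datetime) -> Tuple[datetime, datetime]:
    """Return the tumbling 30-minute window [start, end) that contains ts."""
    start = ts - (ts - EPOCH) % WINDOW
    return start, start + WINDOW


def average_per_window(readings: Iterable[Tuple[datetime, float]]) -> Dict[datetime, float]:
    """Average the readings of one node per window, keyed by the window end."""
    sums: Dict[datetime, float] = defaultdict(float)
    counts: Dict[datetime, int] = defaultdict(int)
    for ts, watts in readings:
        end = window_bounds(ts)[1]
        sums[end] += watts
        counts[end] += 1
    return {end: sums[end] / counts[end] for end in sorted(sums)}


# 12:33:04 falls in [12:30, 13:00), as in the windowed sample output further down
print(window_bounds(datetime(2018, 1, 1, 12, 33, 4)))
```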

In [5]:
# Two columns: Time in seconds 86.5683023929596

In [1]:
## Imports
from pyspark.sql import SparkSession
from pyspark import SparkContext
import os
import copy
import time
import statsmodels
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Spark dependencies
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel, SparkConf
from pyspark.ml.feature import StandardScaler, VectorAssembler, PCA
from pyspark.mllib.linalg import SparseVector, DenseVector, VectorUDT
from pyspark.ml.classification import LogisticRegression

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col, expr, udf, sequence

# Other configs
pd.options.display.float_format = "{:.2f}".format

# Useful directory variables
src_path = os.getcwd()
root_path = os.path.dirname(src_path)
data_path = root_path + "/datasets"
visualization_path = root_path + "/data_visualization"

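# Reuse the notebook's Spark session (getOrCreate also creates one if none exists)
spark = SparkSession.builder.getOrCreate()
# Start counting time
start_t = time.time()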
# Reading the original file
df = spark.read.parquet(
    "output_final.parquet"
)  # Functional programming. Reading the raw data file with the Structured API
# df.printSchema()
df.createOrReplaceTempView("df")

# Generating a dataFrame with the times (every 30 minutes from start_timestamp to end_timestamp)
dates = pd.date_range(
    start=datetime(2018, 1, 1, 0, 0, 0),
    end=datetime(2021, 6, 30, 0, 0, 0),
    freq="30min",
)
datetimes = [date.to_pydatetime() for date in dates]
time_df = (
    spark.createDataFrame(datetimes, TimestampType())
    .withColumnRenamed("value", "time")
    .sort(F.asc("time"))
)

# Collect the names of all the nodes into a list
node_list = (
    spark.sql("SELECT node from df").rdd.flatMap(lambda x: x).collect()
)  # Getting the list with all the node names


# Build the total consumption series node by node.
# Two time-indexed Spark DataFrames start out identical:
# time_df: never modified; it is the reference 30-minute grid, so every slot exists
#   and a node without readings in a slot gets a null after the left join
# consumption_df: left-joined with each node's series in every iteration; it holds
#   the running total of the average consumption
consumption_df = spark.createDataFrame(time_df.rdd, time_df.schema)
consumption_df = consumption_df.withColumn("total_average_power_consumption_W", lit(0))
for node in node_list[:50]:  # first batch: nodes 1 to 50
    sql_query_node_consumption = """
                    SELECT 
                        EXPLODE(power) as (time, node_{}_power_consumption) 
                    FROM df
                    WHERE 
                        node LIKE "{}"
                """.format(
        node, node
    )
    node_consumption = spark.sql(sql_query_node_consumption)
    # Parse the dictionary keys (Spark pattern letters: mm = minutes, ss = seconds;
    # MM would be the month and SS a fraction of a second)
    node_consumption = node_consumption.withColumn(
        "time", F.to_timestamp(node_consumption.time, "yyyy-MM-dd HH:mm:ss")
    )
    # Assign every reading to its tumbling 30-minute window
    # (readings that share an exact timestamp are averaged together)
    node_consumption = node_consumption.groupBy(
        "time", F.window("time", "30 minutes")
    ).agg(
        avg("node_{}_power_consumption".format(node)).alias(
            "node_{}_power_consumption".format(node)
        ),
    )
    node_consumption = node_consumption.select(
        "time", "window.*", "node_{}_power_consumption".format(node)
    ).sort(F.asc("time"))
    # Label each reading with the end of its window...
    node_consumption = node_consumption.select(
        col("end").alias("time"), col("node_{}_power_consumption".format(node))
    )
    # ...and average all the readings that fall into the same window
    node_consumption = node_consumption.groupBy("time").agg(
        avg("node_{}_power_consumption".format(node)).alias(
            "node_{}_average_power_consumption".format(node)
        )
    )
    node_consumption = node_consumption.select(
        "time", "node_{}_average_power_consumption".format(node)
    ).sort(F.asc("time"))
    # Align with the 30-minute grid; slots without readings count as 0 W
    consumption_df = consumption_df.join(node_consumption, ["time"], how="left").sort(
        F.asc("time")
    )
    consumption_df = consumption_df.fillna(0, subset=["node_{}_average_power_consumption".format(node)])
    # Add this node to the running total and drop its column to keep the DataFrame narrow
    consumption_df = consumption_df.withColumn("total_average_power_consumption_W", col("total_average_power_consumption_W")+col("node_{}_average_power_consumption".format(node)))
    consumption_df = consumption_df.drop("node_{}_average_power_consumption".format(node))
# consumption_df.cache()
consumption_df.write.parquet("consumption_total_average_30min_W_1_to_50")
end_t = time.time()
print("Time in seconds " + str(end_t - start_t))


Time in seconds 1437.9353668689728


In [5]:
a=spark.read.parquet("consumption_total_average_30min_W_1_to_50").sort(F.asc("time"))
a.write.parquet("consumption_total_average_30min_W_1_to_50_repartition")

DataFrame[time: timestamp, total_average_power_consumption_W: double]

In [2]:
## Imports
from pyspark.sql import SparkSession
from pyspark import SparkContext
import os
import copy
import time
import statsmodels
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Spark dependencies
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel, SparkConf
from pyspark.ml.feature import StandardScaler, VectorAssembler, PCA
from pyspark.mllib.linalg import SparseVector, DenseVector, VectorUDT
from pyspark.ml.classification import LogisticRegression

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col, expr, udf, sequence

# Other configs
pd.options.display.float_format = "{:.2f}".format

# Useful directory variables
src_path = os.getcwd()
root_path = os.path.dirname(src_path)
data_path = root_path + "/datasets"
visualization_path = root_path + "/data_visualization"

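# Reuse the notebook's Spark session (getOrCreate also creates one if none exists)
spark = SparkSession.builder.getOrCreate()
# Start counting time
start_t = time.time()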
# Reading the original file
df = spark.read.parquet(
    "output_final.parquet"
)  # Functional programming. Reading the raw data file with the Structured API
# df.printSchema()
df.createOrReplaceTempView("df")

# Generating a dataFrame with the times (every 30 minutes from start_timestamp to end_timestamp)
dates = pd.date_range(
    start=datetime(2018, 1, 1, 0, 0, 0),
    end=datetime(2021, 6, 30, 0, 0, 0),
    freq="30min",
)
datetimes = [date.to_pydatetime() for date in dates]
time_df = (
    spark.createDataFrame(datetimes, TimestampType())
    .withColumnRenamed("value", "time")
    .sort(F.asc("time"))
)

# Collect the names of all the nodes into a list
node_list = (
    spark.sql("SELECT node from df").rdd.flatMap(lambda x: x).collect()
)  # Getting the list with all the node names


# Build the total consumption series node by node.
# Two time-indexed Spark DataFrames start out identical:
# time_df: never modified; it is the reference 30-minute grid, so every slot exists
#   and a node without readings in a slot gets a null after the left join
# consumption_df: left-joined with each node's series in every iteration; it holds
#   the running total of the average consumption
consumption_df = spark.createDataFrame(time_df.rdd, time_df.schema)
consumption_df = consumption_df.withColumn("total_average_power_consumption_W", lit(0))
for node in node_list[50:80]:  # second batch: nodes 51 to 80
    sql_query_node_consumption = """
                    SELECT 
                        EXPLODE(power) as (time, node_{}_power_consumption) 
                    FROM df
                    WHERE 
                        node LIKE "{}"
                """.format(
        node, node
    )
    node_consumption = spark.sql(sql_query_node_consumption)
    # Parse the dictionary keys (Spark pattern letters: mm = minutes, ss = seconds;
    # MM would be the month and SS a fraction of a second)
    node_consumption = node_consumption.withColumn(
        "time", F.to_timestamp(node_consumption.time, "yyyy-MM-dd HH:mm:ss")
    )
    # Assign every reading to its tumbling 30-minute window
    # (readings that share an exact timestamp are averaged together)
    node_consumption = node_consumption.groupBy(
        "time", F.window("time", "30 minutes")
    ).agg(
        avg("node_{}_power_consumption".format(node)).alias(
            "node_{}_power_consumption".format(node)
        ),
    )
    node_consumption = node_consumption.select(
        "time", "window.*", "node_{}_power_consumption".format(node)
    ).sort(F.asc("time"))
    # Label each reading with the end of its window...
    node_consumption = node_consumption.select(
        col("end").alias("time"), col("node_{}_power_consumption".format(node))
    )
    # ...and average all the readings that fall into the same window
    node_consumption = node_consumption.groupBy("time").agg(
        avg("node_{}_power_consumption".format(node)).alias(
            "node_{}_average_power_consumption".format(node)
        )
    )
    node_consumption = node_consumption.select(
        "time", "node_{}_average_power_consumption".format(node)
    ).sort(F.asc("time"))
    # Align with the 30-minute grid; slots without readings count as 0 W
    consumption_df = consumption_df.join(node_consumption, ["time"], how="left").sort(
        F.asc("time")
    )
    consumption_df = consumption_df.fillna(0, subset=["node_{}_average_power_consumption".format(node)])
    # Add this node to the running total and drop its column to keep the DataFrame narrow
    consumption_df = consumption_df.withColumn("total_average_power_consumption_W", col("total_average_power_consumption_W")+col("node_{}_average_power_consumption".format(node)))
    consumption_df = consumption_df.drop("node_{}_average_power_consumption".format(node))
# consumption_df.cache()

consumption_df.repartition(1).write.parquet("consumption_total_average_30min_W_51_to_80")
end_t = time.time()
print("Time in seconds " + str(end_t - start_t))




Time in seconds 2422.4834904670715


We start by preparing the time series of the electrical consumption of the cluster nodes, in Watts. To do so, we explode every time series associated with a node (all those in a row whose value in the "node" column is not "1", "2" or "basement"). We then group them every 30 minutes (taking the mean) and finally add them up to obtain the total average consumption in Watts for each 30-minute interval. Note that we cannot sum first and group afterwards, because each time series has readings at different timestamps (they differ on the order of seconds), as shown for c6601 and c7102 in the eDA.ipynb notebook.
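The reason for averaging per window before summing can be shown with a toy example (readings invented for illustration): because the two nodes never report at exactly the same second, summing by exact timestamp adds nothing up, whereas averaging each node per 30-minute window and then summing gives one total per window. The `window_end` helper is hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

Readings = List[Tuple[datetime, float]]

# Invented readings (W) of two nodes sampled a few seconds apart
node_a: Readings = [(datetime(2018, 1, 1, 0, 0, 44), 54.0), (datetime(2018, 1, 1, 0, 1, 57), 72.0)]
node_b: Readings = [(datetime(2018, 1, 1, 0, 0, 50), 100.0), (datetime(2018, 1, 1, 0, 2, 3), 120.0)]


def window_end(ts: datetime) -> datetime:
    """End of the 30-minute window (aligned to the hour) that contains ts."""
    start = ts.replace(minute=ts.minute - ts.minute % 30, second=0, microsecond=0)
    return start + timedelta(minutes=30)


# Sum first: the timestamps never coincide, so every "total" is a single node's reading
naive: Dict[datetime, float] = defaultdict(float)
for ts, watts in node_a + node_b:
    naive[ts] += watts

# Average per node and window first, then add the per-node averages together
total: Dict[datetime, float] = defaultdict(float)
for readings in (node_a, node_b):
    per_window: Dict[datetime, List[float]] = defaultdict(list)
    for ts, watts in readings:
        per_window[window_end(ts)].append(watts)
    for end, values in per_window.items():
        total[end] += sum(values) / len(values)

print(len(naive), dict(total))
```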

##### ORIGINAL IDEA

In [11]:
node_consumption_pandas=node_consumption.limit(1000).toPandas()

In [17]:
node_consumption_pandas.head(30)

|   | time | electric_consumption_node_c6601 |
|---|---|---|
| 0 | 2018-01-01 00:00:44 | 54.0 |
| 1 | 2018-01-01 00:01:57 | 54.0 |
| 2 | 2018-01-01 00:03:09 | 72.0 |
| 3 | 2018-01-01 00:04:21 | 54.0 |
| 4 | 2018-01-01 00:05:33 | 54.0 |
| 5 | 2018-01-01 00:06:45 | 54.0 |
| 6 | 2018-01-01 00:07:57 | 54.0 |
| 7 | 2018-01-01 00:09:08 | 54.0 |
| 8 | 2018-01-01 00:10:20 | 54.0 |
| 9 | 2018-01-01 00:11:33 | 90.0 |


In [27]:
import time
fecha_fija = "2018-01-01 00:00:00"
start_time = datetime.strptime(fecha_fija,"%Y-%m-%d %H:%M:%S")
minutes_since_1970_to_start_time = int(time.mktime(start_time.timetuple())/60)#Passing the total seconds to minutes dividing by 60
offset_minutes = minutes_since_1970_to_start_time % 30 #offset minutes
window_30_mins = F.window("time", "30 minutes", startTime = "{} minutes".format(offset_minutes))
pandas=node_consumption.groupBy("time", window_30_mins).agg(F.mean("electric_consumption_node_c6601").alias("power")).limit(20).toPandas()
pandas

|   | time | window | power |
|---|---|---|---|
| 0 | 2018-01-01 12:33:04 | (2018-01-01 12:30:00, 2018-01-01 13:00:00) | 108.0 |
| 1 | 2018-01-01 16:33:32 | (2018-01-01 16:30:00, 2018-01-01 17:00:00) | 108.0 |
| 2 | 2018-01-01 19:55:46 | (2018-01-01 19:30:00, 2018-01-01 20:00:00) | 108.0 |
| 3 | 2018-01-02 02:18:10 | (2018-01-02 02:00:00, 2018-01-02 02:30:00) | 54.0 |
| 4 | 2018-01-02 06:36:48 | (2018-01-02 06:30:00, 2018-01-02 07:00:00) | 54.0 |
| 5 | 2018-01-02 06:53:33 | (2018-01-02 06:30:00, 2018-01-02 07:00:00) | 90.0 |
| 6 | 2018-01-02 07:19:53 | (2018-01-02 07:00:00, 2018-01-02 07:30:00) | 54.0 |
| 7 | 2018-01-02 09:28:53 | (2018-01-02 09:00:00, 2018-01-02 09:30:00) | 54.0 |
| 8 | 2018-01-02 13:27:24 | (2018-01-02 13:00:00, 2018-01-02 13:30:00) | 54.0 |
| 9 | 2018-01-03 00:18:36 | (2018-01-03 00:00:00, 2018-01-03 00:30:00) | 54.0 |


In [26]:
fecha_fija = "2018-01-01 00:00:00"
start_time = datetime.strptime(fecha_fija,"%Y-%m-%d %H:%M:%S")
start_time

datetime.datetime(2018, 1, 1, 0, 0)

## DRAFT

In [16]:
dates.show()  # All the nodes have the same start and end dates

+-------------------+-------------------+
|         start_time|           end_time|
+-------------------+-------------------+
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
|2018-01-01 01:00:00|2021-06-01 02:00:00|
+-------------------+-------------------+

## Original working version


In [None]:
## Imports
import os
import copy
import time
import statsmodels
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Spark dependencies
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel, SparkConf
from pyspark.ml.feature import StandardScaler, VectorAssembler, PCA
from pyspark.mllib.linalg import SparseVector, DenseVector, VectorUDT
from pyspark.ml.classification import LogisticRegression

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col, expr, udf, sequence

# Other configs
pd.options.display.float_format = "{:.2f}".format

# Useful directory variables
src_path = os.getcwd()
root_path = os.path.dirname(src_path)
data_path = root_path + "/datasets"
visualization_path = root_path + "/data_visualization"

#Start counting time
start_t = time.time()
#Reading the original file
df = spark.read.parquet(
    "output_final.parquet"
)  # Functional programming. Reading the raw data file with the Structured API
# df.printSchema()
df.createOrReplaceTempView("df")

#Generating a dataFrame with the times (every 30 minutes from start_timestamp to end_timestamp)
dates = pd.date_range(start=datetime(2018, 1, 1, 0,0,0), end=datetime(2021, 6, 30, 0,0,0), freq="30min")
datetimes = [date.to_pydatetime() for date in dates]
time_df = spark.createDataFrame(datetimes, TimestampType()).withColumnRenamed("value", "time").sort(F.asc("time"))

# Collect the names of all the nodes into a list
node_list = (
    spark.sql("SELECT node from df").rdd.flatMap(lambda x: x).collect()
)  # Getting the list with all the node names
# sql_query_dates = """
#                 SELECT 
#                     start_time,
#                     end_time
#                 FROM df

#             """.format(
#     node_list[0], node_list[0]
# )
sql_query_node_consumption = """
                SELECT 
                    EXPLODE(power) as (time, electric_consumption_node_{}) 
                FROM df
                WHERE 
                    node LIKE "{}"
            """.format(
    node_list[0], node_list[0]
)
node_consumption = spark.sql(sql_query_node_consumption)
# dates = spark.sql(sql_query_dates)
# Name of the value column created by the EXPLODE above (first node in the list)
col_name = "electric_consumption_node_{}".format(node_list[0])
# Parse the dictionary keys (mm = minutes, ss = seconds; MM would be the month)
node_consumption = node_consumption.withColumn(
    "time", F.to_timestamp(node_consumption.time, "yyyy-MM-dd HH:mm:ss")
)
# Assign every reading to its tumbling 30-minute window
node_consumption = node_consumption.groupBy("time", F.window("time", "30 minutes")).agg(
    avg(col_name).alias(col_name),
)
node_consumption = node_consumption.select("time", "window.*", col_name).sort(F.asc("time"))
# Label each reading with its window end and average the readings of each window
node_consumption = node_consumption.select(col("end").alias("time"), col(col_name))
node_consumption = node_consumption.groupBy("time").agg(
    avg(col_name).alias(col_name + "_average")
)
node_consumption = node_consumption.select("time", col_name + "_average").sort(F.asc("time"))
node_consumption.show(40)

end_t = time.time()
print("Time in seconds " + str(end_t - start_t))

In [None]:
# Build one average-consumption column per node (here only the first 3 nodes, as a test)
# for node in node_list[:-3]:  # every row except the last three
# time_df: never modified; it is the reference 30-minute grid, so every slot exists
#   and a node without readings in a slot gets a null after the left join
# consumption_df: starts as a copy of time_df and is left-joined with each node's
#   series in every iteration, gaining one column per node
consumption_df = spark.createDataFrame(time_df.rdd, time_df.schema)
for node in node_list[:3]:
    sql_query_node_consumption = """
                    SELECT 
                        EXPLODE(power) as (time, node_{}_power_consumption) 
                    FROM df
                    WHERE 
                        node LIKE "{}"
                """.format(
        node, node
    )
    node_consumption = spark.sql(sql_query_node_consumption)
    # Parse the dictionary keys (mm = minutes, ss = seconds; MM would be the month)
    node_consumption = node_consumption.withColumn(
        "time", F.to_timestamp(node_consumption.time, "yyyy-MM-dd HH:mm:ss")
    )
    node_consumption = node_consumption.groupBy("time", F.window("time", "30 minutes")).agg(
        avg("node_{}_power_consumption".format(node)).alias("node_{}_power_consumption".format(node)),
    )
    node_consumption = node_consumption.select(
        "time", "window.*", "node_{}_power_consumption".format(node)
    ).sort(F.asc("time"))
    node_consumption = node_consumption.select(
        col("end").alias("time"), col("node_{}_power_consumption".format(node))
    )
    node_consumption = node_consumption.groupBy("time").agg(
        avg("node_{}_power_consumption".format(node)).alias(
            "node_{}_average_power_consumption".format(node)
        )
    )
    node_consumption = node_consumption.select(
        "time", "node_{}_average_power_consumption".format(node)
    ).sort(F.asc("time"))
    consumption_df = consumption_df.join(node_consumption, ["time"], how="left").sort(F.asc("time"))


# For the cluster

In [None]:
## Imports
from pyspark.sql import SparkSession
from pyspark import SparkContext
import os
import copy
import time
import statsmodels
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Spark dependencies
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel, SparkConf
from pyspark.ml.feature import StandardScaler, VectorAssembler, PCA
from pyspark.mllib.linalg import SparseVector, DenseVector, VectorUDT
from pyspark.ml.classification import LogisticRegression

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col, expr, udf, sequence

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName('Generate_20_columns') \
        .getOrCreate()
    sc = spark.sparkContext
    # Other configs
    pd.options.display.float_format = "{:.2f}".format

    # Useful directory variables
    src_path = os.getcwd()
    root_path = os.path.dirname(src_path)
    data_path = root_path + "/datasets"
    visualization_path = root_path + "/data_visualization"

    # Start counting time
    start_t = time.time()
    # Reading the original file
    df = spark.read.parquet(
        "output_final.parquet"
    )  # Functional programming. Reading the raw data file with the Structured API
    # df.printSchema()
    df.createOrReplaceTempView("df")

    # Generating a dataFrame with the times (every 30 minutes from start_timestamp to end_timestamp)
    dates = pd.date_range(
        start=datetime(2018, 1, 1, 0, 0, 0),
        end=datetime(2021, 6, 30, 0, 0, 0),
        freq="30min",
    )
    datetimes = [date.to_pydatetime() for date in dates]
    time_df = (
        spark.createDataFrame(datetimes, TimestampType())
        .withColumnRenamed("value", "time")
        .sort(F.asc("time"))
    )

    # Collect the names of all the nodes into a list
    node_list = (
        spark.sql("SELECT node from df").rdd.flatMap(lambda x: x).collect()
    )  # Getting the list with all the node names


    # Build one average-consumption column per node:
    # time_df: never modified; it is the reference 30-minute grid, so every slot exists
    #   and a node without readings in a slot gets a null after the left join
    # consumption_df: starts as a copy of time_df and is left-joined with each node's
    #   series in every iteration, gaining one column per node
    consumption_df = spark.createDataFrame(time_df.rdd, time_df.schema)
    for node in node_list[:20]:  # first 20 nodes
        sql_query_node_consumption = """
                        SELECT 
                            EXPLODE(power) as (time, node_{}_power_consumption) 
                        FROM df
                        WHERE 
                            node LIKE "{}"
                    """.format(
            node, node
        )
        node_consumption = spark.sql(sql_query_node_consumption)
        # Parse the dictionary keys (mm = minutes, ss = seconds; MM would be the month)
        node_consumption = node_consumption.withColumn(
            "time", F.to_timestamp(node_consumption.time, "yyyy-MM-dd HH:mm:ss")
        )
        node_consumption = node_consumption.groupBy(
            "time", F.window("time", "30 minutes")
        ).agg(
            avg("node_{}_power_consumption".format(node)).alias(
                "node_{}_power_consumption".format(node)
            ),
        )
        node_consumption = node_consumption.select(
            "time", "window.*", "node_{}_power_consumption".format(node)
        ).sort(F.asc("time"))
        node_consumption = node_consumption.select(
            col("end").alias("time"), col("node_{}_power_consumption".format(node))
        )
        node_consumption = node_consumption.groupBy("time").agg(
            avg("node_{}_power_consumption".format(node)).alias(
                "node_{}_average_power_consumption".format(node)
            )
        )
        node_consumption = node_consumption.select(
            "time", "node_{}_average_power_consumption".format(node)
        ).sort(F.asc("time"))
        consumption_df = consumption_df.join(node_consumption, ["time"], how="left").sort(
            F.asc("time")
        )
    consumption_df.cache()
    consumption_df.write.parquet("consumption_per_cluster_node_1to20_average_30min")
    end_t = time.time()
    print("Time in seconds " + str(end_t - start_t))
    spark.stop()


In [None]:
## Imports
from pyspark.sql import SparkSession
from pyspark import SparkContext
import os
import copy
import time
import statsmodels
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Spark dependencies
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel, SparkConf
from pyspark.ml.feature import StandardScaler, VectorAssembler, PCA
from pyspark.mllib.linalg import SparseVector, DenseVector, VectorUDT
from pyspark.ml.classification import LogisticRegression

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col, expr, udf, sequence

# Other configs
pd.options.display.float_format = "{:.2f}".format

# Useful directory variables
src_path = os.getcwd()
root_path = os.path.dirname(src_path)
data_path = root_path + "/datasets"
visualization_path = root_path + "/data_visualization"

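# Reuse the notebook's Spark session (getOrCreate also creates one if none exists)
spark = SparkSession.builder.getOrCreate()
# Start counting time
start_t = time.time()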
# Reading the original file
df = spark.read.parquet(
    "output_final.parquet"
)  # Functional programming. Reading the raw data file with the Structured API
# df.printSchema()
df.createOrReplaceTempView("df")

# Generating a dataFrame with the times (every 30 minutes from start_timestamp to end_timestamp)
dates = pd.date_range(
    start=datetime(2018, 1, 1, 0, 0, 0),
    end=datetime(2021, 6, 30, 0, 0, 0),
    freq="30min",
)
datetimes = [date.to_pydatetime() for date in dates]
time_df = (
    spark.createDataFrame(datetimes, TimestampType())
    .withColumnRenamed("value", "time")
    .sort(F.asc("time"))
)

# Collect the names of all the nodes into a list
node_list = (
    spark.sql("SELECT node from df").rdd.flatMap(lambda x: x).collect()
)  # Getting the list with all the node names


# Build one average-consumption column per node:
# time_df: never modified; it is the reference 30-minute grid, so every slot exists
#   and a node without readings in a slot gets a null after the left join
# consumption_df: starts as a copy of time_df and is left-joined with each node's
#   series in every iteration, gaining one column per node
consumption_df = spark.createDataFrame(time_df.rdd, time_df.schema)
for node in node_list[:30]:  # first 30 nodes
    sql_query_node_consumption = """
                    SELECT 
                        EXPLODE(power) as (time, node_{}_power_consumption) 
                    FROM df
                    WHERE 
                        node LIKE "{}"
                """.format(
        node, node
    )
    node_consumption = spark.sql(sql_query_node_consumption)
    # Parse the dictionary keys (mm = minutes, ss = seconds; MM would be the month)
    node_consumption = node_consumption.withColumn(
        "time", F.to_timestamp(node_consumption.time, "yyyy-MM-dd HH:mm:ss")
    )
    node_consumption = node_consumption.groupBy(
        "time", F.window("time", "30 minutes")
    ).agg(
        avg("node_{}_power_consumption".format(node)).alias(
            "node_{}_power_consumption".format(node)
        ),
    )
    node_consumption = node_consumption.select(
        "time", "window.*", "node_{}_power_consumption".format(node)
    ).sort(F.asc("time"))
    node_consumption = node_consumption.select(
        col("end").alias("time"), col("node_{}_power_consumption".format(node))
    )
    node_consumption = node_consumption.groupBy("time").agg(
        avg("node_{}_power_consumption".format(node)).alias(
            "node_{}_average_power_consumption".format(node)
        )
    )
    node_consumption = node_consumption.select(
        "time", "node_{}_average_power_consumption".format(node)
    ).sort(F.asc("time"))
    consumption_df = consumption_df.join(node_consumption, ["time"], how="left").sort(
        F.asc("time")
    )
consumption_df.cache()
consumption_df.write.parquet("consumption_per_cluster_node_average_30min_final")
end_t = time.time()
print("Time in seconds " + str(end_t - start_t))
