# Leveraging Apache Spark for HVAC System Monitoring in Smart Buildings

**Estimated time: 30 minutes**

## Objectives

After completing this lab, you will be able to:

- Explain Spark's distributed architecture in the context of smart building monitoring
- Simulate real-time sensor data for a building's HVAC (heating, ventilation, and air conditioning) systems
- Run SQL queries to detect critical environmental conditions and compute average readings
- Display the aggregated results on the console for immediate insight into room conditions

## Background

Smart Building Solutions, Inc. specializes in optimizing HVAC (heating, ventilation, and air conditioning) systems to improve comfort and energy efficiency in commercial buildings. By monitoring temperature and humidity levels in different rooms in real time, the company aims to maintain optimal indoor conditions and anticipate potential HVAC problems.

With a continuous stream of sensor data, Smart Building Solutions must process and analyze that data in real time to maintain indoor environmental quality.

## Dataset description

The simulated dataset contains:

- **`room_id`**: Unique identifier for each room (e.g., R001, R002; the simulation in this lab uses the simpler IDs 0 through 9).
- **`temperature`**: Current temperature reading from the sensor (in °C).
- **`humidity`**: Current humidity reading from the sensor (in %).
- **`timestamp`**: Time at which the reading was recorded (generated automatically by Spark).

The data is generated at a rate of **5 rows per second**, simulating several rooms with varying environmental conditions.
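
To make the record layout concrete, here is a minimal plain-Python sketch of one second's worth of simulated rows. It is an illustration only and is not part of the lab code: the `SensorReading` class and the `simulate_reading` function are hypothetical names, and the value ranges are assumed to mirror the ones used by the Spark simulation later in this lab.

```python
import random
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class SensorReading:
    room_id: str
    temperature: float  # degrees Celsius
    humidity: float     # percent
    timestamp: datetime


def simulate_reading(value: int, rng: random.Random) -> SensorReading:
    """Build one record from a row counter (hypothetical helper)."""
    room = value % 10
    # Room 0 is pinned to 15 °C; every other room falls in [20, 45) °C.
    temperature = 15.0 if room == 0 else 20 + rng.random() * 25
    humidity = 40 + rng.random() * 30  # [40, 70) %
    return SensorReading(str(room), temperature, humidity, datetime.now(timezone.utc))


rng = random.Random(42)  # fixed seed so the sketch is repeatable
readings = [simulate_reading(v, rng) for v in range(5)]  # 5 rows = one second of data
for r in readings:
    print(r.room_id, round(r.temperature, 1), round(r.humidity, 1))
```

In the lab itself Spark produces these columns as a streaming DataFrame, so no Python loop is involved.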



## Challenges
Monitoring indoor environmental conditions poses several challenges:

**High data velocity**: Continuous data from multiple sensors can overwhelm traditional systems.

**Need for immediate alerts**: Delays in identifying critical conditions can lead to discomfort or system inefficiencies.

**Need for data aggregation and analysis**: Efficiently aggregating and analyzing real-time data for proactive maintenance and optimization is essential.

## Apache Spark with structured streaming
To address these challenges, Apache Spark is employed for its powerful distributed computing capabilities, enabling real-time data processing and analytics.


In [None]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [5]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# findspark makes the Spark installation importable from Python
# by adding PySpark to sys.path
import findspark
findspark.init()

# SparkSession is the entry point for DataFrame and SQL functionality
from pyspark.sql import SparkSession


### Set up the Spark session:


In [7]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Smart Building HVAC Monitoring") \
    .getOrCreate()


25/03/05 12:59:51 WARN Utils: Your hostname, tegongue-Latitude-5580 resolves to a loopback address: 127.0.1.1; using 192.168.2.226 instead (on interface wlp1s0)
25/03/05 12:59:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/03/05 12:59:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/05 12:59:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/03/05 12:59:53 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/03/05 12:59:53 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/03/05 12:59:53 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.

### Simulate sensor data:

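Use Spark's rate source to generate continuous readings from multiple rooms. The rate source emits rows with a `timestamp` column and an incrementing `value` column; the room ID, temperature, and humidity are derived from them.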


In [9]:
from pyspark.sql.functions import expr, rand, when

# Simulate sensor data with room IDs and readings.
# value % 10 yields ten room IDs ("0" to "9"); temperatures fall in [20, 45) °C
# and humidity in [40, 70) %, except for room 0 (see below).
sensor_data = spark.readStream.format("rate").option("rowsPerSecond", 5).load() \
    .withColumn("room_id", expr("CAST(value % 10 AS STRING)")) \
    .withColumn("temperature", when(expr("value % 10 == 0"), 15)  # Room 0 always reads 15 °C, so it triggers the critical-temperature query
                .otherwise(20 + rand() * 25)) \
    .withColumn("humidity", expr("40 + rand() * 30"))

### Create a temporary SQL view:

Create a temporary SQL view so that SQL queries can be run against the streaming data.


In [11]:
# Create a temporary SQL view for the sensor data
sensor_data.createOrReplaceTempView("sensor_table")


### Define SQL queries for aggregation and analysis:

* **Critical temperature query**: Detect rooms whose temperature is below 18 °C or above 60 °C
* **Average readings query**: Calculate the average temperature and humidity per room over a 1-minute window
* **Attention needed query**: Count, per room, the readings whose humidity is below 45% or above 75%
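
The 1-minute window groups each reading into a fixed, non-overlapping time bucket. The plain-Python sketch below illustrates that idea on three made-up readings; it assumes epoch-aligned tumbling windows (which is how Spark's `window` function behaves when no start offset is given), and the `window_start` helper and sample rows are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def window_start(ts: datetime, width: timedelta = timedelta(minutes=1)) -> datetime:
    """Return the start of the epoch-aligned tumbling window that contains ts."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + ((ts - epoch) // width) * width


# Hypothetical readings: (room_id, temperature, timestamp)
rows = [
    ("1", 22.0, datetime(2025, 3, 5, 13, 7, 10, tzinfo=timezone.utc)),
    ("1", 26.0, datetime(2025, 3, 5, 13, 7, 50, tzinfo=timezone.utc)),
    ("1", 30.0, datetime(2025, 3, 5, 13, 8, 5, tzinfo=timezone.utc)),
]

# Group by (room_id, window start), then average each group.
groups = defaultdict(list)
for room_id, temperature, ts in rows:
    groups[(room_id, window_start(ts))].append(temperature)

averages = {key: sum(vals) / len(vals) for key, vals in groups.items()}
for (room_id, start), avg in sorted(averages.items()):
    print(room_id, start.isoformat(), avg)
```

The first two readings share the 13:07 window and average to 24.0, while the third falls into the 13:08 window on its own.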


In [13]:
# SQL Query to detect rooms with critical temperatures
critical_temperature_query = """
    SELECT 
        room_id, 
        temperature, 
        humidity, 
        timestamp 
    FROM sensor_table 
    WHERE temperature < 18 OR temperature > 60
"""

# SQL Query to calculate average readings over a 1-minute window
average_readings_query = """
    SELECT 
        room_id, 
        AVG(temperature) AS avg_temperature, 
        AVG(humidity) AS avg_humidity, 
        window.start AS window_start 
    FROM sensor_table
    GROUP BY room_id, window(timestamp, '1 minute')
"""

# SQL Query to find rooms that need immediate attention based on humidity
attention_needed_query = """
    SELECT 
        room_id, 
        COUNT(*) AS critical_readings 
    FROM sensor_table 
    WHERE humidity < 45 OR humidity > 75
    GROUP BY room_id
"""
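
As a quick sanity check on the thresholds, the two `WHERE` clauses can be restated as plain-Python predicates. This is only a sketch: the function names and sample values are hypothetical, and the thresholds are copied from the queries above.

```python
def is_critical_temperature(temperature: float) -> bool:
    """Same condition as the WHERE clause of the critical temperature query."""
    return temperature < 18 or temperature > 60


def needs_attention(humidity: float) -> bool:
    """Same condition as the WHERE clause of the attention needed query."""
    return humidity < 45 or humidity > 75


# Hypothetical (temperature, humidity) pairs within the simulated ranges
samples = [(15.0, 42.0), (20.0, 45.0), (44.9, 69.9)]
results = [(is_critical_temperature(t), needs_attention(h)) for t, h in samples]
for (t, h), (critical, attention) in zip(samples, results):
    print(t, h, critical, attention)
```

Because the simulation keeps temperature below 45 °C and humidity below 70%, only the lower bounds can fire in this lab: room 0 (pinned to 15 °C) for temperature, and any reading under 45% for humidity.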


### Execute the SQL queries:

Execute each SQL query to create streaming DataFrames.


In [15]:
# Execute the critical temperature query
critical_temperatures_stream = spark.sql(critical_temperature_query)


# Execute the average readings query
average_readings_stream = spark.sql(average_readings_query)

# Execute the attention needed query
attention_needed_stream = spark.sql(attention_needed_query)






### Output the results to the console:

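Display the results from each query in real time. The non-aggregated query is written in `append` mode, while the two aggregation queries use `complete` mode.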


In [17]:
# Output the results to the console for all queries
critical_query = critical_temperatures_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .queryName("Critical Temperatures") \
    .start()

average_query = average_readings_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Average Readings") \
    .start()

attention_query = attention_needed_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Attention Needed") \
    .start()



25/03/05 13:07:43 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-8b9f1a5f-59a8-46ed-aa76-8bdd463a2459. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/05 13:07:44 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f6dffa3b-b2d7-492e-a897-a02d9d349d3e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/05 13:07:44 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2cdf72b4-8b76-4fa7-85e5-bd0e48b93b69. If it's required to delete it under a
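
To make the difference between the `append` and `complete` output modes concrete, the following plain-Python sketch mimics what each mode would emit over three micro-batches. It is a simplified model rather than Spark code, and the batch contents are made up.

```python
from collections import Counter

# Hypothetical micro-batches: room IDs with a flagged reading in each trigger
micro_batches = [["1", "2"], ["2"], ["3", "2"]]

counts = Counter()
append_output = []
complete_output = []
for batch in micro_batches:
    # append: only the rows that arrived in this trigger are emitted
    append_output.append(list(batch))
    # complete: the running aggregate is updated and the whole table is re-emitted
    counts.update(batch)
    complete_output.append(dict(sorted(counts.items())))

for i, (a, c) in enumerate(zip(append_output, complete_output)):
    print(f"batch {i}: append={a} complete={c}")
```

This is why the console output of the two aggregation queries grows to cover every room seen so far, while the critical temperature query prints only the newly arrived rows.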

### Keep the streams running:

Ensure that the streaming queries continue to run to process incoming data.


In [None]:
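# Keep the streams running.
# Do not call spark.stop() before this point: stopping the session
# also stops every streaming query.
print("******** Streaming: Critical Temperatures, Average Readings, Attention Needed ********")
try:
    # Block until any one of the three queries stops or fails.
    # Interrupt the kernel to end the lab.
    spark.streams.awaitAnyTermination()
except KeyboardInterrupt:
    pass
finally:
    # Stop any queries that are still active, then shut down the session
    for query in spark.streams.active:
        query.stop()
    spark.stop()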


### Conclusion
In this lab, you explored the use of Apache Spark for smart building monitoring, focusing on HVAC (heating, ventilation, and air conditioning) systems. You now understand Spark's distributed architecture, and you know how to simulate real-time temperature and humidity sensor data, run SQL queries to identify critical environmental conditions, and output aggregated results for immediate insight.


## Author(s)

Lakshmi Holla

## Other Contributors
Malika Singla
