<a href="https://colab.research.google.com/github/leabizbille/ProjetIOT/blob/main/thermometre.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
! pip install bleak -q

Préparer l'environnement pour la création d'un outil de communication BLE, capable de scanner des périphériques, de se connecter et d'échanger des données, dans le contexte d'une application IoT (Internet of Things) ou de gestion de périphériques connectés.


In [6]:
# Importation du module argparse pour gérer les arguments de la ligne de commande
import argparse
import asyncio# Importation du module asyncio pour la programmation asynchrone
import json # Importation du module json
import math
import os # Importation du module os pour interagir avec le système d'exploitation
import re # Importation du module re pour utiliser des expressions régulières
import struct  # Importation du module struct pour travailler avec des structures de données binaires
import sys #  système
from datetime import datetime, timedelta  # Importation des classes datetime et timedelta pour la gestion des dates et heures

# Importation des classes et fonctions de la bibliothèque bleak pour le développement BLE
from bleak import AdvertisementData, BleakClient, BleakScanner, BLEDevice

# Les informations renvoyées par le thermometre :


1.   Définition d'une classe Measurement

Définition d'une classe Measurement qui est utilisée pour représenter une
mesure météorologique comprenant la température, l'humidité relative, et plusieurs propriétés calculées associées, comme l'humidité absolue, le point de rosée et la pression de vapeur.



*   La pression de vapeur saturante :
C'est la pression exercée par la vapeur d'eau lorsqu'elle est en équilibre avec sa phase liquide à une température donnée. La formule utilisée, 6.1 * e(z1 * 2.3025851), permet de calculer es en fonction de la température en °C.

*   Pression de vapeur actuelle :
C'est la pression de vapeur actuelle de l'air, calculée en multipliant la pression de vapeur saturante es par l'humidité relative (divisée par 100).

*   L'humidité absolue :
C'est la quantité de vapeur d'eau dans l'air, exprimée en grammes par mètre cube (g/m3). La formule (216.7 * e) / (273.15 + self.temperatureC) est utilisée pour calculer l'humidité absolue.

*   Le point de rosée est la température à laquelle l'air devient saturé et commence à se condenser.
Il est calculé en utilisant la formule int((235 * z3) / (7.45 - z3) * 10) / 10.0 après des calculs intermédiaires de z2 et z3.

*   La pression de vapeur actuelle est arrondie à une décimale pour obtenir la pression de vapeur en hPa.


In [12]:
class Measurement():
    # Constructeur de la classe Measurement pour initialiser les attributs de l'objet
    def __init__(self, timestamp: datetime, temperatureC: float, relHumidity: float) -> None:
        # Attributs de l'instance pour la date/heure, température en °C, et humidité relative en %
        self.timestamp: datetime = timestamp
        self.temperatureC: float = temperatureC
        self.relHumidity: float = relHumidity

        # Calcul de la pression de vapeur saturante (es) en utilisant la température
        z1 = (7.45 * self.temperatureC) / (235 + self.temperatureC)
        es = 6.1 * math.exp(z1 * 2.3025851)  # La formule de pression de vapeur saturante
        e = es * self.relHumidity / 100.0  # Calcul de la pression de vapeur actuelle

        # Calcul de l'humidité absolue en g/m3 (arrondie à 1 décimale)
        self.absHumidity: float = round(
            (216.7 * e) / (273.15 + self.temperatureC) * 10) / 10.0  # Conversion en g/m3

        # Calcul du point de rosée en °C (arrondi à 1 décimale)
        z2 = e / 6.1
        z3 = 0.434292289 * math.log(z2)
        self.dewPointC: float = int((235 * z3) / (7.45 - z3) * 10) / 10.0  # Conversion et arrondi

        # Calcul de la pression de vapeur en hPa (arrondie à 1 décimale)
        self.steamPressure: float = int(e * 10) / 10.0  # Conversion et arrondi

 #a enlever
        #self.temperatureF: float = self.temperatureC * 9.0/5.0 + 32
        #self.dewPointF: float = self.dewPointC * 9.0/5.0 + 32

    def __str__(self) -> str:
    # Retourne une chaîne de caractères formatée qui représente les attributs de l'objet Measurement
        return "\n".join([
            # Affiche le timestamp avec un format lisible (année-mois-jour heure:minute)
            "Timestamp:      %s" % self.timestamp.strftime("%Y-%m-%d %H:%M"),
            # Affiche la température en Celsius avec une précision d'une décimale
            f"Temperature:    {self.temperatureC:.1f} °C",
            f"Dew point:      {self.dewPointC:.1f} °C", # Affiche le point de rosée en Celsius
            "",  # Ligne vide pour la séparation visuelle
            # Affiche la température en Fahrenheit
            #f"Temperature:    {self.temperatureF:.1f} °F",
            # Affiche le point de rosée en Fahrenheit
            #f"Dew point:      {self.dewPointF:.1f} °F",
            #"",  # Ligne vide pour la séparation visuelle
            f"Rel. humidity:  {self.relHumidity:.1f} %", # Affiche l'humidité relative
            f"Abs. humidity:  {self.absHumidity:.1f} g/m³",  # Affiche l'humidité absolue en g/m³
            f"Steam pressure: {self.steamPressure:.1f} mbar" # Affiche la pression de vapeur en mbar
        ])


    def to_dict(self) -> dict:
        return {
            "timestamp": self.timestamp.strftime("%Y-%m-%d %H:%M"),
            "temperatureC": round(self.temperatureC, 1),
            #"temperatureF": round(self.temperatureF, 1),
            "relHumidity": round(self.relHumidity, 1),
            "absHumidity": round(self.absHumidity, 1),
            "dewPointC": round(self.dewPointC, 1),
            #"dewPointF": round(self.dewPointF, 1),
            "steamPressure": round(self.steamPressure, 1)
        }


2. Explication de la classe DeviceInfo :

La classe DeviceInfo est une classe qui représente les informations d'un appareil. Elle contient des attributs relatifs à un appareil spécifique, tels que l'adresse MAC, le nom, le fabricant, le modèle, le matériel et la version du firmware. Cette classe est utile pour organiser et manipuler des informations sur un appareil de manière structurée.

Stocker les détails d'un appareil de manière organisée.
Afficher ces détails de manière lisible pour l'utilisateur.
Convertir l'instance en un format de dictionnaire, ce qui est pratique pour la sérialisation, la sauvegarde dans des bases de données, ou l'envoi de données via une API.

In [13]:
class DeviceInfo():
    # Constructeur de la classe qui initialise les attributs de l'objet
    def __init__(self, macAddress: str, name: str, manufacturer: str, model: str, hardware: str, firmware: str) -> None:
        # Attributs de l'objet, définis avec des types spécifiques (str) pour chaque attribut
        self.macAddress: str = macAddress
        self.name: str = name
        self.manufacturer: str = manufacturer
        self.model: str = model
        self.hardware: str = hardware
        self.firmware: str = firmware

    # Méthode qui retourne une chaîne de caractères représentant l'objet de façon lisible
    def __str__(self) -> str:
        # Retourne une chaîne formatée avec les attributs de l'appareil, chaque attribut sur une nouvelle ligne
        return "\n".join([
            f"MAC-Address:    {self.macAddress}",
            f"Devicename:     {self.name}",
            f"Manufacturer:   {self.manufacturer}",
            f"Model:          {self.model}",
            f"Hardware-Rev.:  {self.hardware}",
            f"Firmware-Rev.:  {self.firmware}"
        ])

    def to_dict(self) -> dict:
        # Retourne un dictionnaire
        return {
            "mac": self.macAddress,
            "name": self.name,
            "manufacturer": self.manufacturer,
            "model": self.model,
            "hardware": self.hardware,
            "firmware": self.firmware
        }


La classe DataControl est conçue pour gérer et suivre la réception de données. Elle permet de contrôler et de surveiller le flux reçus par rapport aux messages attendus.

In [15]:
class DataControl():
    # Constantes de classe représentant les différents états de contrôle des données
    DATA_CONTROL_IDLE = 0        # État initial (aucun processus en cours)
    DATA_CONTROL_WAIT = 1        # État d'attente de messages
    DATA_CONTROL_STARTED = 2     # État lorsque le processus de contrôle a commencé
    DATA_CONTROL_COMPLETE = 3    # État lorsque le processus est terminé (tous les messages reçus)
    DATA_CONTROL_INCOMPLETE = -1 # État lorsque le processus est incomplet (messages manquants)

    # Constructeur de la classe
    def __init__(self, expected_msg: int) -> None:
        # Initialisation de la date et l'heure actuelle de la création de l'objet
        self.timestamp: datetime = datetime.now()
        # Initialisation de l'état à "idle"
        self.status: int = DataControl.DATA_CONTROL_IDLE
        # Nombre de messages attendus passé lors de l'initialisation
        self.expected_msg: int = expected_msg
        # Compteur de messages comptés
        self.counted_msg: int = 0
        # Compteur de messages reçus
        self.received_msg: int = 0
        # Liste pour stocker les objets de type Measurement
        self.measurements: 'list[Measurement]' = list()

    # Méthode pour incrémenter le compteur de messages comptés
    def count(self) -> None:
        self.counted_msg += 1


La classe **GoveeThermometerHygrometer** est conçue pour gérer l'interaction avec un dispositif de type thermomètre et hygromètre Bluetooth de la marque Govee. Elle utilise la bibliothèque bleak pour communiquer avec le dispositif via Bluetooth Low Energy (BLE).

In [17]:
class GoveeThermometerHygrometer():
    # Préfixe des adresses MAC des dispositifs Govee
    MAC_PREFIX = "A4:C1:38:"

    # Dictionnaire des commandes avec UUIDs et séquences de bytes
    _COMMANDS = {
        "NAME": {
            "UUID": "00002a00-0000-1000-8000-00805f9b34fb",
        },
        "FIRMWARE": {
            "UUID": "494e5445-4c4c-495f-524f-434b535f2011",
            "SEQUENCE": [0xaa, 0x0e] + [0] * 17 + [0xa4]
        },
        "HARDWARE": {
            "UUID": "494e5445-4c4c-495f-524f-434b535f2011",
            "SEQUENCE": [0xaa, 0x0d] + [0] * 17 + [0xa7]
        },
        "DATA_CONTROL": {
            "UUID": "494e5445-4c4c-495f-524f-434b535f2012",
            "SEQUENCE": [0x33, 0x01]
        },
        "DATA": {
            "UUID": "494e5445-4c4c-495f-524f-434b535f2013"
        }
    }

    def __init__(self, mac: str) -> None:
        # Initialisation du client BLE avec l'adresse MAC
        self._client = BleakClient(mac)
        self._mac = mac
        self._measurement = None
        self._deviceInfo = None
        self._buffer = dict()
        self._data_control: DataControl = None

    async def connect(self) -> None:
        # Fonction de rappel pour gérer les notifications de l'appareil
        async def notification_handler_device(c, bytes: bytearray) -> None:
            # Traitement des notifications de firmware et de matériel
            if bytes[0] == 0xaa and bytes[1] == 0x0e:
                self._buffer["FIRMWARE"] = bytes[2:9].decode()
            elif bytes[0] == 0xaa and bytes[1] == 0x0d:
                self._buffer["HARDWARE"] = bytes[2:9].decode()

        async def notification_handler_data(c, bytes: bytearray) -> None:
            # Gestion des données de mesure et ajout à la liste de mesures
            if not self._data_control:
                return
            for i in range(6):
                minutes_back = struct.unpack(">H", bytes[0:2])[0]
                if bytes[2 + 3 * i] == 0xff:
                    continue
                timestamp = self._data_control.timestamp - timedelta(minutes=minutes_back - i)
                _ba = bytearray([0])
                _ba.extend(bytes[2 + 3 * i:5 + 3 * i])
                temperatureC, relHumidity = GoveeThermometerHygrometer.decodeMeasurement(bytes=_ba)
                self._data_control.measurements.append(Measurement(timestamp=timestamp, temperatureC=temperatureC, relHumidity=relHumidity))
            self._data_control.count()

        async def notification_handler_data_control(c, bytes: bytearray) -> None:
            # Gestion des notifications de contrôle des données
            if bytes[0] == 0x33 and bytes[1] == 0x01 and self._data_control:
                self._data_control.status = DataControl.DATA_CONTROL_STARTED
            elif bytes[0] == 0xee and bytes[1] == 0x01 and self._data_control:
                self._data_control.received_msg = struct.unpack(">H", bytes[2:4])[0]
                if self._data_control.received_msg == self._data_control.counted_msg:
                    self._data_control.status = DataControl.DATA_CONTROL_COMPLETE
                else:
                    self._data_control.status = DataControl.DATA_CONTROL_INCOMPLETE

        # Connexion au périphérique BLE et démarrage des notifications
        await self._client.connect()
        if self._client.is_connected:
            await self._client.start_notify(self._COMMANDS["FIRMWARE"]["UUID"], callback=notification_handler_device)
            await self._client.start_notify(self._COMMANDS["DATA_CONTROL"]["UUID"], callback=notification_handler_data_control)
            await self._client.start_notify(self._COMMANDS["DATA"]["UUID"], callback=notification_handler_data)
            await asyncio.sleep(.2)

    async def disconnect(self) -> None:
        # Déconnexion de l'appareil BLE
        if self._client.is_connected:
            await self._client.disconnect()

    async def _sendCommand(self, command: dict, params: 'list[int]' = []) -> None:

        _bytearray = bytearray(command["SEQUENCE"])
        if params:
            _bytearray.extend(params)
        if len(_bytearray) < 20:
            _bytearray.extend([0] * (19 - len(_bytearray)))
            _checksum = 0
            for _b in _bytearray:
                _checksum ^= _b

            _bytearray.append(_checksum)

        await self._client.write_gatt_char(command["UUID"], _bytearray, response=True)

    async def requestRecordedData(self, start: int, end: int) -> 'list[Measurement]':

        if not self._client.is_connected:
            self.connect()

        self._data_control = DataControl(
            expected_msg=math.ceil((start - end + 1) / 6))
        await self._sendCommand(command=self._COMMANDS["DATA_CONTROL"], params=[start >> 8, start & 0xff, end >> 8, end & 0xff])

        i = 0
        while i < 600 and (self._data_control.status not in [DataControl.DATA_CONTROL_COMPLETE, DataControl.DATA_CONTROL_INCOMPLETE]):
            await asyncio.sleep(.1)
            i += 1

        measurements = self._data_control.measurements
        self._data_control = None
        return measurements

    async def requestDeviceInfo(self) -> DeviceInfo:

        self._buffer["HARDWARE"] = None
        self._buffer["FIRMWARE"] = None

        if not self._client.is_connected:
            self.connect()

        await self._sendCommand(self._COMMANDS["HARDWARE"])
        await self._sendCommand(self._COMMANDS["FIRMWARE"])

        i = 0
        while i < 10 and (not self._buffer["HARDWARE"] or not self._buffer["FIRMWARE"]):
            await asyncio.sleep(.1)
            i += 1

        _name = await self._client.read_gatt_char(self._COMMANDS["NAME"]["UUID"])
        if not _name or not self._buffer["HARDWARE"] or not self._buffer["FIRMWARE"]:
            return None

        _name = _name.decode()

        return DeviceInfo(macAddress=self._mac, name=_name, manufacturer=_name[:2], model=_name[2:7], hardware=self._buffer["HARDWARE"], firmware=self._buffer["FIRMWARE"])


    @staticmethod
    def decodeMeasurement(bytes) -> 'tuple[float,float]':

        raw = struct.unpack(">I", bytes)[0]
        if raw & 0x800000:
            is_negative = True
            raw = raw ^ 0x800000
        else:
            is_negative = False

        temperatureC = int(raw / 1000) / 10
        if is_negative:
            temperatureC = 0 - temperatureC

        relHumidity = (raw % 1000) / 10

        return temperatureC, relHumidity

    @staticmethod
    async def scan(consumer, duration: int = 20, unique: bool = True, mac_filter: str = None, progress=None):

        found_devices = list()

        def callback(device: BLEDevice, advertising_data: AdvertisementData):

            if unique is False or device.address not in found_devices:
                found_devices.append(device.address)
                if device.name and device.address.upper().startswith(GoveeThermometerHygrometer.MAC_PREFIX):
                    if 0xec88 in advertising_data.manufacturer_data:
                        temperatureC, relHumidity = GoveeThermometerHygrometer.decodeMeasurement(
                            advertising_data.manufacturer_data[0xec88][0:4])
                        battery = advertising_data.manufacturer_data[0xec88][4]
                        measurement = Measurement(timestamp=datetime.now(),
                                                  temperatureC=temperatureC, relHumidity=relHumidity)

                        consumer(device.address, device.name,
                                 battery, measurement)

                elif device.name and progress:
                    progress(len(found_devices))

        async with BleakScanner(callback) as scanner:
            if duration:
                await asyncio.sleep(duration)
            else:
                while True:
                    await asyncio.sleep(1)

 Gestion et d'interrogation pour des dispositifs de type thermomètre/hygromètre Govee H5075 via Bluetooth. Il fournit une interface pour exécuter diverses tâches comme l'exploration de dispositifs, la récupération de mesures ou encore l'affichage des données enregistrées.

* Objectif : Gérer des alias pour les adresses MAC des dispositifs Govee.



In [18]:
class Alias():

    _KNOWN_DEVICES_FILE = ".known_govees"
    def __init__(self) -> None:
        self.aliases: 'dict[str,str]' = dict()
        try:
            filename = os.path.join(os.environ['USERPROFILE'] if os.name == "nt" else os.environ['HOME']
                                    if "HOME" in os.environ else "~", Alias._KNOWN_DEVICES_FILE)
            if os.path.isfile(filename):
                with open(filename, "r") as ins:
                    for line in ins:
                        _m = re.match(
                            "([0-9A-Fa-f:]+) +(.*)$", line)
                        if _m and _m.groups()[0].upper().startswith(GoveeThermometerHygrometer.MAC_PREFIX):
                            self.aliases[_m.groups()[0]] = _m.groups()[1]
        except:
            pass
    def resolve(self, label: str) -> str:
        if label.upper().startswith(GoveeThermometerHygrometer.MAC_PREFIX):
            return label
        else:
            macs = [
                a for a in self.aliases if self.aliases[a].startswith(label)]
            return macs[0] if macs else None

def arg_parse(args: 'list[str]') -> dict:
    parser = argparse.ArgumentParser(prog='govee-h5075.py', description=
        'Shell script in order to request Govee H5075 temperature humidity sensor')
    parser.add_argument(
        '-s', '--scan', help='scan for devices for 20 seconds', action='store_true')
    parser.add_argument('-m', '--measure',
                        help='capture measurements/advertisements from nearby devices', action='store_true')
    parser.add_argument(
        '-i', '--info', help='request device information for given mac or alias', type=str)
    parser.add_argument(
        '-d', '--data', help='request recorded data for given mac or alias', type=str)
    parser.add_argument(
        '--start', help='request recorded data from start time expression, e.g. 480:00 (here max. value 20 days)', type=str, default=None)
    parser.add_argument(
        '--end', help='request recorded data to end time expression, e.g. 480:00 (here max. value 20 days)', type=str, default=None)
    parser.add_argument(
        '-j', '--json', help='print in JSON format', action='store_true')
    return parser.parse_args(args)

def scan():

    def stdout_consumer(address: str, name: str, battery: int, measurement: Measurement) -> None:
        label = (alias.aliases[address]
                 if address in alias.aliases else address) + " " * 21
        print(
            f"{label[:21]} {name}  {measurement.temperatureC:.1f}°C       {measurement.dewPointC:.1f}°C     {measurement.temperatureF:.1f}°F       {measurement.dewPointF:.1f}°F     {measurement.relHumidity:.1f}%          {measurement.absHumidity:.1f} g/m³      {measurement.steamPressure:.1f} mbar       {battery}%", flush=True)
    def progress(found: int) -> None:

        print(' %i bluetooth devices seen' % found, end='\r', file=sys.stderr)

    print("MAC-Address/Alias     Device name   Temperature  Dew point  Temperature  Dew point  Rel. humidity  Abs. humidity  Steam pressure  Battery", flush=True)
    asyncio.run(GoveeThermometerHygrometer.scan(
        consumer=stdout_consumer, progress=progress))


def measure():
    def stdout_consumer(address: str, name: str, battery: int, measurement: Measurement) -> None:

        timestamp = measurement.timestamp.strftime("%Y-%m-%d %H:%M:%S")
        label = (alias.aliases[address]
                 if address in alias.aliases else address) + " " * 21
        print(
            f"{timestamp}   {label[:21]} {name}  {measurement.temperatureC:.1f}°C       {measurement.dewPointC:.1f}°C     {measurement.temperatureF:.1f}°F       {measurement.dewPointF:.1f}°F     {measurement.relHumidity:.1f}%          {measurement.absHumidity:.1f} g/m³      {measurement.steamPressure:.1f} mbar       {battery}%", flush=True)

    print("Timestamp             MAC-Address/Alias     Device name   Temperature  Dew point  Temperature  Dew point  Rel. humidity  Abs. humidity  Steam pressure  Battery", flush=True)
    asyncio.run(GoveeThermometerHygrometer.scan(
        unique=False, duration=0, consumer=stdout_consumer))

async def device_info(label: str, _json: bool = False) -> None:

    try:
        mac = alias.resolve(label=label)
        device = GoveeThermometerHygrometer(mac)
        await device.connect()
        deviceInfo = await device.requestDeviceInfo()
        if _json:
            print(json.dumps(deviceInfo.to_dict(), indent=2))
        else:
            print(str(deviceInfo))

    except Exception as e:
        print(e, file=sys.stderr)

    finally:
        await device.disconnect()

async def recorded_data(label: str, start: str, end: str, _json: bool = False):
    def parseTimeStr(s: str) -> int:
        a = s.split(":")
        return (int(a[0]) * 60 + int(a[1])) if len(a) == 2 else int(a[0])
    try:
        mac = alias.resolve(label=label)
        device = GoveeThermometerHygrometer(mac)
        await device.connect()
        start = min(parseTimeStr(start) if start else 60, 28800)
        end = min(parseTimeStr(end) if end else 0, 28800)
        measurements = await device.requestRecordedData(start=start if start > end else end, end=end if end < start else start)
        if _json:
            print(json.dumps([m.to_dict()
                              for m in measurements], indent=2))
        else:
            print("Timestamp         Temperature  Dew point  Temperature  Dew point  Rel. humidity  Abs. humidity  Steam pressure", flush=True)
            for m in measurements:
                timestamp = m.timestamp.strftime("%Y-%m-%d %H:%M")
                print(f"{timestamp}  {m.temperatureC:.1f}°C       {m.dewPointC:.1f}°C     {m.temperatureF:.1f}°F       {m.dewPointF:.1f}°F     {m.relHumidity:.1f}%          {m.absHumidity:.1f} g/m³      {m.steamPressure:.1f} mbar", flush=True)
    except Exception as e:
        print(e, file=sys.stderr)

    finally:
        await device.disconnect()

if __name__ == '__main__':

    alias = Alias()
    try:

        if len(sys.argv) == 1:
            scan()

        else:
            args = arg_parse(sys.argv[1:])
            if args.scan:
                scan()

            elif args.measure:
                measure()
            elif args.info:
                asyncio.run(device_info(label=args.info, _json=args.json))
            elif args.data:
                asyncio.run(recorded_data(label=args.data, start=args.start, end=args.end, _json=args.json))

    except KeyboardInterrupt:
        pass

usage: govee-h5075.py [-h] [-s] [-m] [-i INFO] [-d DATA] [--start START] [--end END] [-j]
govee-h5075.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-f46f25fb-06e9-4958-96a0-eb68b84f2109.json


SystemExit: 2