In [59]:
import datetime
import enum
import time
import dataclasses
import struct
import math
import binascii

import smbus
import trio
#import bokeh.io
#import bokeh.plotting
#import bokeh as bk
import numpy as np
import RPi.GPIO as GPIO
import httpx

#bk.io.output_notebook()
%autoawait trio

In [3]:
## I2C address select
NH3_ADDRESS                = 0x74
CO_ADDRESS                 = 0x75
O2_ADDRESS                 = 0x76

i2cbus = 1

In [4]:
buzzerpin = 23
ledpin_red = 24
ledpin_green = 25

In [36]:
class CmdCode(enum.Enum):
    read_concentration = 0x86
    read_temp = 0x87
    read_all = 0x88

In [42]:
class SensorType(enum.Enum):
    O2         =  0x05
    CO         =  0x04
    H2S        =  0x03
    NO2        =  0x2C
    O3         =  0x2A
    CL2        =  0x31
    NH3        =  0x02
    H2         =  0x06
    HCL        =  0X2E
    SO2        =  0X2B
    HF         =  0x33
    PH3        =  0x45

In [43]:
@dataclasses.dataclass
class SensorData:
    gas_concentration: float  # ppm
    sensor_type: SensorType
    temperature: float        # degree Celsius

In [76]:
class MultiGasSensor:
    def __init__(self, bus_number: int, i2c_address: int, expected_sensor_type: SensorType | None = None):
        self.i2c_bus = smbus.SMBus(bus_number)
        self.i2c_address = i2c_address
        self.expected_sensor_type = expected_sensor_type

    
    @classmethod
    def calc_check_sum(cls, data:bytes)->int:
        return (~sum(data)+1) & 0xff

    
    def command(self, code: CmdCode, *args:bytes) -> bytes:
        data = bytes([0xFF, 0x01, code.value]) + b"".join(args)
        data += b"\x00"*(8-len(data))
        data += bytes([self.calc_check_sum(data[1:-1])])
        self.i2c_bus.write_i2c_block_data(self.i2c_address, 0, list(data))
        time.sleep(0.1)                                                         # TO-DO: async-sleept machen
        
        result = self.i2c_bus.read_i2c_block_data(self.i2c_address, 0, 9)
        result = bytes(result)
        result_str = binascii.hexlify(result," ",1)
        
        assert result[0] == 0xFF, result_str
        assert result[1] == code.value, result_str
        assert result[8] == self.calc_check_sum(result[1:-2]), f"CRC failure: received 0x{result[8]:02x}, calculated 0x{calc_check_sum(result[1:-1]):02x}, ({result_str})"

        return result[2:-1]

    
    def read_all(self) -> SensorData:
        result = self.command(CmdCode.read_all)

        # '>': big-endian encoded struct (MSB first: most significant byte first);
        # 'H': 2 Bytes unsigned integer ("half long integer")
        # 'B': 1 Byte unsigned integer  ("byte")
        gas_concentration_raw, sensor_type, decimal_places, temperature_raw = struct.unpack(">HBBH", result)
        gas_concentration = gas_concentration_raw * 10**-decimal_places
        sensor_type = SensorType(sensor_type)
   
        Vpd3 = 3*temperature_raw/1024 # Spannung in Volt
        Rth = Vpd3*10000/(3-Vpd3) # Spannung mit Spannnungsteiler vonem 10k-Widerstand
        temperature = 1/(1/(273.15+25)+1/3380.13*(math.log(Rth/10000)))-273.15 # Transfer-Kurve von temperaturfühler mit 10kOhm bei 25°C und alpha-Wert von 3380.13

        if self.expected_sensor_type is not None:
            assert sensor_type == self.expected_sensor_type
        
        return SensorData(
            gas_concentration=gas_concentration,
            sensor_type=sensor_type,
            temperature=temperature,
        )

In [77]:
nice_NH3 = MultiGasSensor(i2cbus, NH3_ADDRESS, SensorType.NH3)
nice_CO = MultiGasSensor(i2cbus, CO_ADDRESS, SensorType.CO)

In [83]:
print(nice_CO.read_all())
print(nice_NH3.read_all())

SensorData(gas_concentration=0, sensor_type=<SensorType.CO: 4>, temperature=28.222990369665126)
SensorData(gas_concentration=0, sensor_type=<SensorType.NH3: 2>, temperature=28.328388829909784)


In [4]:
def setup():
    GPIO.setmode(GPIO.BCM) # use LOGICAL GPIO Numbering
    GPIO.setup(ledpin_red, GPIO.OUT) # set the red ledPin to OUTPUT mode
    GPIO.output(ledpin_red, GPIO.LOW) # make red ledPin output LOW level
    GPIO.setup(ledpin_green, GPIO.OUT) # set the green ledPin to OUTPUT mode
    GPIO.output(ledpin_green, GPIO.HIGH) # make green ledPin output HIGH level
    GPIO.setup(buzzerpin, GPIO.OUT) # set buzzerPin to OUTPUT modea

In [5]:
def aggregate_data(alldata):
    time = np.array([time for time, data in alldata])
    data = np.array([data for time, data in alldata])

    min = data.min()
    max = data.max()
    avg = data.mean()
    start_time, _ = alldata[0]

    aggregation = dict(
        time=start_time,
        min=min,
        max=max,
        avg=avg,
    )
    
    return aggregation

In [6]:
def read_oxygen(i2cbus, address):
    result = i2cbus.read_i2c_block_data(address, OXYGEN_DATA_REGISTER, 3)
    return calibration_factor * (result[0] + result[1] / 10 + result[2] / 100) # Encoding des Sensors - siehe Dokumentation

In [16]:
async def alert():
    while True:
        if low_oxygen:
            GPIO.output(ledpin_green, GPIO.LOW)
            GPIO.output(ledpin_red, GPIO.HIGH)
            GPIO.output(buzzerpin, GPIO.HIGH)
            await trio.sleep(0.5)
            GPIO.output(ledpin_red, GPIO.LOW)
            GPIO.output(buzzerpin, GPIO.LOW)
            await trio.sleep(0.5)
        else:
            GPIO.output(ledpin_green, GPIO.HIGH)
            GPIO.output(ledpin_red, GPIO.LOW)
            GPIO.output(buzzerpin, GPIO.LOW)
            await trio.sleep(0.1)

In [17]:
def fig_setup():
    fig = bk.plotting.figure(x_axis_type="datetime")
    time = np.array([], dtype=np.datetime64)
    ds = bk.models.ColumnDataSource(data=dict(time=time,oxygenlevel=[])) # like empty df
    fig.line(x="time",y="oxygenlevel",source=ds)
    handle = bk.io.show(fig,notebook_handle=True) # um Daten nachzuschieben im Notebook

    return ds, handle

In [18]:
def plot(ds, handle, time, oxygen):
    ds.stream(dict(time=[time],oxygenlevel=[oxygen]),rollover=600) # Neue Daten werden an das Ende der Kolonnen angehängt, ab 600 werden die ältesten Daten herausgeworfen
    bk.io.push_notebook(handle=handle)

In [19]:
async def measurement(*,measurement_interval=0.1, aggregation_interval=10):
    
    global low_oxygen
    ds, handle = fig_setup()
    next_measurement = trio.current_time()+measurement_interval
    next_aggregation = trio.current_time()+aggregation_interval
    
    while True:
        alldata = [] # um alle Daten zu speichern (!Überlegen wegen Memory)   
        
        while trio.current_time()<next_aggregation:      
            try:
                time = np.datetime64(datetime.datetime.now())
                oxygen = read_oxygen(i2cbus, address)
            except Exception as ex:
                print(f'{ex!r} - retry')
                await trio.sleep(0.05)
                continue
    
            alldata.append((time, oxygen))
            
            plot(ds, handle, time, oxygen)
            low_oxygen = oxygen<20
            
            await trio.sleep_until(next_measurement)
            next_measurement += measurement_interval

        next_aggregation += aggregation_interval
        aggregation = aggregate_data(alldata)
        print(aggregation)

In [20]:
low_oxygen = False

In [21]:
setup()

try: 
    async with trio.open_nursery() as nursery:
        nursery.start_soon(alert)
        nursery.start_soon(measurement)
finally:
    GPIO.cleanup()

{'time': numpy.datetime64('2024-03-06T17:19:52.837759'), 'min': 19.358333333333334, 'max': 43.95300000000001, 'avg': 20.869625000000006}


KeyboardInterrupt: 