-
Notifications
You must be signed in to change notification settings - Fork 35
/
co2meter.py
466 lines (391 loc) · 15.6 KB
/
co2meter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
# coding=utf-8
""" Class for reading data from CO2 monitor.
(c) Vladimir Filimonov, 2016-2021
E-mail: vladimir.a.filimonov@gmail.com
"""
try:
import hid
except AttributeError as e:
if 'windll' in e.message:
raise ImportError(('Import failed with an error "AttributeError: %s". '
'Possibly there''s a name conflict. Please check if '
'library "hid" is instlled and if so - uninstall it, '
'keeping only "hidapi".' % str(e)))
else:
raise
import datetime as dt
from contextlib import contextmanager
import threading
import time
import os
plt = None # To be imported on demand only
try:
import pandas as pd
import numpy as np
except ImportError:
pd = np = None
_CO2MON_HID_VENDOR_ID = 0x04d9
_CO2MON_HID_PRODUCT_ID = 0xa052
_CO2MON_MAGIC_WORD = b'Htemp99e'
_CO2MON_MAGIC_TABLE = (0, 0, 0, 0, 0, 0, 0, 0)
_CODE_END_MESSAGE = 0x0D
_CODE_CO2 = 0x50
_CODE_TEMPERATURE = 0x42
_COLORS = {'r': (0.86, 0.37, 0.34),
'g': (0.56, 0.86, 0.34),
'b': 'b'}
CO2_HIGH = 1200
CO2_LOW = 800
#############################################################################
def now():
return dt.datetime.now().replace(microsecond=0)
#############################################################################
def list_to_longint(x):
return sum([val << (i * 8) for i, val in enumerate(x[::-1])])
#############################################################################
def longint_to_list(x):
return [(x >> i) & 0xFF for i in (56, 48, 40, 32, 24, 16, 8, 0)]
#############################################################################
def convert_temperature(val):
""" Convert temperature from Kelvin (unit of 1/16th K) to Celsius
"""
return val * 0.0625 - 273.15
#############################################################################
# Class to operate with CO2 monitor
#############################################################################
class CO2monitor:
def __init__(self, bypass_decrypt=False, interface_path=None):
""" Initialize the CO2monitor object and retrieve basic HID info.
Args:
bypass_decrypt (bool): For certain CO2 meter models packages that
are sent over USB are not encrypted. In this case instance
of CO2monitor will return no data in .read_data().
If this happens, setting bypass_decrypt to True might
solve the issue.
interface_path (bytes): when multiple devices are active, allows
you to choose which one should be used for this CO2monitor instance.
See also:
https://github.com/vfilimonov/co2meter/issues/16
"""
self.bypass_decrypt = bypass_decrypt
self._info = {'vendor_id': _CO2MON_HID_VENDOR_ID,
'product_id': _CO2MON_HID_PRODUCT_ID}
self.init_device(interface_path)
# Number of requests to open connection
self._status = 0
self._magic_word = [((w << 4) & 0xFF) | (w >> 4)
for w in bytearray(_CO2MON_MAGIC_WORD)]
self._magic_table = _CO2MON_MAGIC_TABLE
self._magic_table_int = list_to_longint(_CO2MON_MAGIC_TABLE)
# Initialisation of continuous monitoring
if pd is None:
self._data = []
else:
self._data = pd.DataFrame()
self._keep_monitoring = False
self._interval = 10
# Device info
with self.co2hid():
self._info['manufacturer'] = self._h.get_manufacturer_string()
self._info['product_name'] = self._h.get_product_string()
self._info['serial_no'] = self._h.get_serial_number_string()
def init_device(self, interface_path=None):
"""" Finds a device in the list of available devices and opens one with interface_number or first available
if no interface_number is None
"""
checked_interfaces = []
for interface in hid.enumerate(self._info['vendor_id'], self._info['product_id']):
if interface_path is None or interface['path'] == interface_path:
self._h = hid.device()
self._info['path'] = interface['path']
return
checked_interfaces.append(interface)
raise Exception('Unable to find hid device.', interface_path, checked_interfaces)
#########################################################################
def hid_open(self, send_magic_table=True):
""" Open connection to HID device. If connection is already open,
then only the counter of requests is incremented (so hid_close()
knows how many sub-processes keep the HID handle)
Parameters
----------
send_magic_table : bool
If True then the internal "magic table" will be sent to
the device (it is used for decryption)
"""
if self._status == 0:
# If connection was not opened before
self._h.open_path(self._info['path'])
if send_magic_table:
self._h.send_feature_report(self._magic_table)
self._status += 1
def hid_close(self, force=False):
""" Close connection to HID device. If there were several hid_open()
attempts then the connection will be closed only after respective
number of calls to hid_close() method
Parameters
----------
force : bool
Force-close of connection irrespectively of the counter of
open requests
"""
if force:
self._status = 0
elif self._status > 0:
self._status -= 1
if self._status == 0:
self._h.close()
def hid_read(self):
""" Read 8-byte string from HID device """
msg = self._h.read(8)
return self._decrypt(msg)
@contextmanager
def co2hid(self, send_magic_table=True):
self.hid_open(send_magic_table=send_magic_table)
try:
yield
finally:
self.hid_close()
#########################################################################
@property
def info(self):
""" Device info """
return self._info
@property
def is_alive(self):
""" If the device is still connected """
try:
with self.co2hid(send_magic_table=True):
return True
except:
return False
#########################################################################
def _decrypt(self, message):
""" Decode message received from CO2 monitor.
"""
if self.bypass_decrypt:
return message
# Rearrange message and convert to long int
msg = list_to_longint([message[i] for i in [2, 4, 0, 7, 1, 6, 5, 3]])
# XOR with magic_table
res = msg ^ self._magic_table_int
# Cyclic shift by 3 to the right
res = (res >> 3) | ((res << 61) & 0xFFFFFFFFFFFFFFFF)
# Convert to list
res = longint_to_list(res)
# Subtract and convert to uint8
res = [(r - mw) & 0xFF for r, mw in zip(res, self._magic_word)]
return res
@staticmethod
def decode_message(msg):
""" Decode value from the decrypted message
Parameters
----------
msg : list
Decrypted message retrieved with hid_read() method
Returns
-------
CntR : int
CO2 concentration in ppm
Tamb : float
Temperature in Celsius
"""
# Expected 3 zeros at the end
bad_msg = (msg[5] != 0) or (msg[6] != 0) or (msg[7] != 0)
# End of message should be 0x0D
bad_msg |= msg[4] != _CODE_END_MESSAGE
# Check sum: LSB of sum of first 3 bytes
bad_msg |= (sum(msg[:3]) & 0xFF) != msg[3]
if bad_msg:
return None, None
value = (msg[1] << 8) | msg[2]
if msg[0] == _CODE_CO2: # CO2 concentration in ppm
return int(value), None
elif msg[0] == _CODE_TEMPERATURE: # Temperature in Celsius
return None, convert_temperature(value)
else: # Other codes - so far not decoded
return None, None
def _read_co2_temp(self, max_requests=50):
""" Read one pair of values from the device.
HID device should be open before
"""
co2, temp = None, None
for ii in range(max_requests):
_co2, _temp = self.decode_message(self.hid_read())
if _co2 is not None:
co2 = _co2
if _temp is not None:
temp = _temp
if (co2 is not None) and (temp is not None):
break
return now(), co2, temp
#########################################################################
def read_data_raw(self, max_requests=50):
with self.co2hid(send_magic_table=True):
vals = self._read_co2_temp(max_requests=max_requests)
self._last_data = vals
return vals
def read_data(self, max_requests=50):
""" Listen to values from device and retrieve temperature and CO2.
Parameters
----------
max_requests : int
Effective timeout: number of attempts after which None is returned
Returns
-------
tuple (timestamp, co2, temperature)
or
pandas.DataFrame indexed with timestamp
Results of measurements
"""
if self._keep_monitoring:
if pd is None:
return self._data[-1]
else:
return self._data.iloc[[-1]]
else:
vals = self.read_data_raw(max_requests=max_requests)
# If pandas is available - return pandas.DataFrame
if pd is not None:
vals = pd.DataFrame({'co2': vals[1], 'temp': vals[2]},
index=[vals[0]])
return vals
#########################################################################
def _monitoring(self):
""" Private function for continuous monitoring.
"""
with self.co2hid(send_magic_table=True):
while self._keep_monitoring:
vals = self._read_co2_temp(max_requests=1000)
if pd is None:
self._data.append(vals)
else:
vals = pd.DataFrame({'co2': vals[1], 'temp': vals[2]},
index=[vals[0]])
self._data = pd.concat([self._data, vals])
time.sleep(self._interval)
def start_monitoring(self, interval=5):
""" Start continuous monitoring of the values and collecting them
in the list / pandas.DataFrame.
The monitoring is started in a separate thread, so the current
interpreter session is not blocked.
Parameters
----------
interval : float
Interval in seconds between consecutive data reads
"""
self._interval = interval
if self._keep_monitoring:
# If already started then we should not start a new thread
return
self._keep_monitoring = True
t = threading.Thread(target=self._monitoring)
t.start()
def stop_monitoring(self):
""" Stop continuous monitoring
"""
self._keep_monitoring = False
#########################################################################
@property
def data(self):
""" All data retrieved with continuous monitoring
"""
return self._data
def log_data_to_csv(self, fname):
""" Log data retrieved with continuous monitoring to CSV file. If the
file already exists, then it will be appended.
Note, that the method requires pandas package (so far alternative
is not implemented).
Parameters
----------
fname : string
Filename
"""
if pd is None:
raise NotImplementedError('Logging to CSV is implemented '
'using pandas package only (so far)')
if os.path.isfile(fname):
# Check the last line to get the timestamp of the last record
df = pd.read_csv(fname)
last = pd.Timestamp(df.iloc[-1, 0])
# Append only new data
with open(fname, 'a') as f:
self._data[self._data.index > last].to_csv(f, header=False)
else:
self._data.to_csv(fname)
#############################################################################
def read_csv(fname):
""" Read data from CSV file.
Parameters
----------
fname : string
Filename
"""
if pd is None:
raise NotImplementedError('Reading CSV files is implemented '
'using pandas package only (so far)')
return pd.read_csv(fname, index_col=0, parse_dates=0)
#############################################################################
def plot(data, plot_temp=False, ewma_halflife=30., **kwargs):
""" Plot recorded data
Parameters
----------
data : pandas.DataFrame
Data indexed by timestamps. Should have columns 'co2' and 'temp'
plot_temp : bool
If True temperature will be also plotted
ewma_halflife : float
If specified (not None) data will be smoothed using EWMA
"""
global plt
if plt is None:
import matplotlib.pyplot as _plt
plt = _plt
if pd is None:
raise NotImplementedError('Plotting is implemented so far '
'using pandas package only')
# DataFrames
if (ewma_halflife is not None) and (ewma_halflife > 0):
halflife = pd.Timedelta(ewma_halflife, 's') / np.mean(np.diff(data.index))
co2 = pd.ewma(data.co2, halflife=halflife, min_periods=0)
temp = pd.ewma(data.temp, halflife=2 * halflife, min_periods=0)
else:
co2 = data.co2
temp = data.temp
co2_r = co2.copy()
co2_g = co2.copy()
co2_r[co2_r <= CO2_HIGH] = np.NaN
co2_g[co2_g >= CO2_LOW] = np.NaN
# Plotting
ax = kwargs.pop('ax', plt.gca())
ax.fill_between(co2_r.index, co2_r.values, CO2_HIGH,
alpha=0.5, color=_COLORS['r'])
ax.fill_between(co2_g.index, co2_g.values, CO2_LOW,
alpha=0.5, color=_COLORS['g'])
ax.axhline(CO2_LOW, color=_COLORS['g'], lw=2, ls='--')
ax.axhline(CO2_HIGH, color=_COLORS['r'], lw=2, ls='--')
ax.plot(co2.index, co2.values, lw=2, color='k')
yl = ax.get_ylim()
ax.set_ylim([min(600, yl[0]), max(1400, yl[1])])
ax.set_ylabel('CO2 concentration, ppm')
plt.setp(ax.xaxis.get_majorticklabels(), rotation=0,
horizontalalignment='center')
if plot_temp:
ax2 = ax.twinx()
ax2.plot(temp.index, temp.values, color=_COLORS['b'])
ax2.set_ylabel('Temperature, °C')
yl = ax2.get_ylim()
ax2.set_ylim([min(19, yl[0]), max(23, yl[1])])
ax2.grid('off')
plt.tight_layout()
#############################################################################
# Entry points
#############################################################################
def start_homekit():
from .homekit import start_homekit as start
start()
def start_server():
from .server import start_server as start
start()
def start_server_homekit():
from .server import start_server_homekit as start
start()