-
Notifications
You must be signed in to change notification settings - Fork 0
/
aqi_py3_win.py
159 lines (128 loc) · 4.1 KB
/
aqi_py3_win.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/python -u
# coding=utf-8
# "DATASHEET": http://cl.ly/ekot
# https://gist.github.com/kadamski/92653913a53baf9dd1a8
from __future__ import print_function
import serial, struct, sys, time, json, subprocess, binascii
DEBUG = 1
CMD_MODE = 2
CMD_QUERY_DATA = 4
CMD_DEVICE_ID = 5
CMD_SLEEP = 6
CMD_FIRMWARE = 7
CMD_WORKING_PERIOD = 8
MODE_ACTIVE = 0
MODE_QUERY = 1
PERIOD_CONTINUOUS = 0
JSON_FILE = '/var/www/html/aqi.json'
MQTT_HOST = ''
MQTT_TOPIC = '/weather/particulatematter'
ser = serial.Serial()
ser.port = "/dev/ttyUSB0" # "COM6"
ser.baudrate = 9600
ser.open()
ser.flushInput()
byte, data = 0, ""
def dump(d, prefix=''):
print(prefix + ' '.join(str(x) for x in d))
def construct_command(cmd, data=[]):
assert len(data) <= 12
data += [0,]*(12-len(data))
checksum = (sum(data)+cmd-2)%256
ret = b"\xaa\xb4" + chr(cmd).encode()
ret = ret + b''.join(chr(x).encode() for x in data)
ret = ret + b"\xff\xff" + chr(checksum).encode() + b"\xab"
if DEBUG:
dump(ret, '> ')
return ret #bytes(ret, 'utf-8')
def process_data(d):
r = struct.unpack(u'<HHxxBB', d[2:])
pm25 = r[0]/10.0
pm10 = r[1]/10.0
checksum = sum( (v) for v in d[2:8])%256
print("PM 2.5: {} μg/m^3 PM 10: {} μg/m^3 CRC={}".format(pm25, pm10, "OK" if (checksum==r[2] and r[3]==0xab) else "NOK"))
return [pm25, pm10]
def ov(v):
try:
return ord(v)
except:
return ord(str(v))
def process_version(d):
r = struct.unpack(u'<BBBHBB', d[3:])
checksum = sum((v) for v in d[2:8])%256
print("Y: {}, M: {}, D: {}, ID: {}, CRC={}".format(r[0], r[1], r[2], hex(r[3]), "OK" if (checksum==r[4] and r[5]==0xab) else "NOK"))
def read_response():
byte = 0
while byte != b"\xaa":
byte = ser.read(size=1)
d = ser.read(size=9)
if DEBUG:
dump(d, '< ')
return byte + d
def cmd_set_mode(mode=MODE_QUERY):
ser.write(construct_command(CMD_MODE, [0x1, mode]))
read_response()
def cmd_query_data():
ser.write(construct_command(CMD_QUERY_DATA))
d = read_response()
values = []
if d[1] == 192: # b"\xc0":
values = process_data(d)
return values
def cmd_set_sleep(sleep):
mode = 0 if sleep else 1
ser.write(construct_command(CMD_SLEEP, [0x1, mode]))
read_response()
def cmd_set_working_period(period):
ser.write(construct_command(CMD_WORKING_PERIOD, [0x1, period]))
read_response()
def cmd_firmware_ver():
ser.write(construct_command(CMD_FIRMWARE))
d = read_response()
process_version(d)
def cmd_set_id(id):
id_h = (id>>8) % 256
id_l = id % 256
ser.write(construct_command(CMD_DEVICE_ID, [0]*10+[id_l, id_h]))
read_response()
def pub_mqtt(jsonrow):
cmd = ['mosquitto_pub', '-h', MQTT_HOST, '-t', MQTT_TOPIC, '-s']
print('Publishing using:', cmd)
with subprocess.Popen(cmd, shell=False, bufsize=0, stdin=subprocess.PIPE).stdin as f:
json.dump(jsonrow, f)
def init():
cmd_set_sleep(0)
cmd_set_working_period(PERIOD_CONTINUOUS)
cmd_set_mode(MODE_QUERY)
if __name__ == "__main__":
cmd_set_sleep(0)
cmd_firmware_ver()
cmd_set_working_period(PERIOD_CONTINUOUS)
cmd_set_mode(MODE_QUERY);
while True:
cmd_set_sleep(0)
for t in range(25):
values = cmd_query_data();
if values is not None and len(values) == 2:
print("PM2.5: ", values[0], ", PM10: ", values[1])
time.sleep(2)
# open stored data
try:
with open(JSON_FILE) as json_data:
data = json.load(json_data)
except IOError as e:
data = []
# check if length is more than 100 and delete first element
if len(data) > 100:
data.pop(0)
# append new values
jsonrow = {'pm25': values[0], 'pm10': values[1], 'time': time.strftime("%d.%m.%Y %H:%M:%S")}
data.append(jsonrow)
# save it
with open(JSON_FILE, 'w') as outfile:
json.dump(data, outfile)
if MQTT_HOST != '':
pub_mqtt(jsonrow)
print("Going to sleep for 30secs...")
cmd_set_sleep(1)
time.sleep(30)