-
Notifications
You must be signed in to change notification settings - Fork 60
/
aircat.py
275 lines (234 loc) · 9.57 KB
/
aircat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#!/usr/bin/env python
# encoding: utf-8
import json
import socket
import select
import logging
import time
_LOGGER = logging.getLogger(__name__)


class AirCatData():
    """TCP server that receives the readings pushed by AirCat devices.

    The server listens on port 9000. The most recent JSON reading of every
    device is kept in ``devs``, keyed by the device MAC written as twelve
    upper-case hex digits. Every frame is answered with either a plain
    acknowledgement or a brightness command derived from the state of a
    Home Assistant ``input_select`` entity. A plain HTTP ``GET`` on the same
    port returns ``devs`` as JSON.
    """

    # Trailer that closes every frame.
    _END_MARKER = b'\xff#END#'
    # begin(17) + mac(6) + size(5): the payload cannot start before this.
    _HEADER_SIZE = 28
    # Bytes placed between the echoed header and the JSON body of a reply.
    # NOTE(review): the same bytes are used for reply bodies of different
    # lengths -- confirm against the device protocol whether they encode a
    # length.
    _REPLY_PREFIX = b'\x00\x18\x00\x00\x02'
    _ACK_BODY = b'{"type":5,"status":1}'
    # Seconds after which an unchanged brightness is sent again when
    # brightness_force_update is enabled.
    _FORCE_UPDATE_INTERVAL = 300

    def __init__(self, hass=None, macs=None, brightness_force_update=False):
        """Open the listening socket and initialise the bookkeeping.

        hass: Home Assistant instance used to read ``input_select`` states;
            ``None`` disables brightness control (stand-alone use).
        macs: mapping of device MAC (upper-case hex, no separators) to the
            object id of the ``input_select`` that chooses its brightness.
        brightness_force_update: re-send an unchanged brightness every
            ``_FORCE_UPDATE_INTERVAL`` seconds.

        Raises OSError when the port cannot be bound.
        """
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._socket.settimeout(1)
            self._socket.bind(('', 9000))  # aircat.phicomm.com
            self._socket.listen(5)
        except OSError:
            # Do not leak the descriptor when the port is unavailable.
            self._socket.close()
            raise
        self._rlist = [self._socket]
        self.devs = {}
        self._hass = hass
        self._macs = macs if macs is not None else {}
        self._brightness_force_update = brightness_force_update
        self._last_brightness = {mac: "" for mac in self._macs}
        # time.monotonic() of the last brightness command sent per MAC.
        self._last_brightness_sent = {}

    def shutdown(self):
        """Close the listening socket and every accepted connection."""
        if self._socket is None:
            return
        #_LOGGER.debug("Socket shutdown")
        for sock in list(self._rlist):
            try:
                sock.close()
            except OSError:
                pass
        self._rlist = []
        self._socket = None

    def loop(self):
        """Serve until shutdown() has been called."""
        while self._socket is not None:
            self.update(None)  # None = wait forever

    def update(self, timeout=0):  # 0 = return right now
        """Accept pending connections and handle every readable client.

        timeout is passed to select(): 0 polls, None blocks until a socket
        becomes readable. Errors of a single socket are logged and do not
        interrupt the processing of the others.
        """
        if self._socket is None:
            return
        readable, _, _ = select.select(self._rlist, [], [], timeout)
        for sock in readable:
            try:
                if sock is self._socket:
                    conn, addr = self._socket.accept()
                    _LOGGER.debug('Connected %s', addr)
                    conn.settimeout(1)
                    self._rlist.append(conn)
                else:
                    self.handle(sock)
            except Exception:  # keep serving the remaining sockets
                import traceback
                _LOGGER.error('Exception: %s', traceback.format_exc())

    def handle(self, conn):
        """Read one chunk from *conn* and answer it.

        An empty read means the peer closed the connection, which is then
        purged. A chunk starting with ``GET`` is answered with ``devs`` as
        JSON and the connection is closed. Anything else is parsed as a
        device frame: the JSON payload (if any) is stored in ``devs`` and a
        reply frame is sent back.
        """
        # If connection is closed, recv() will result a timeout exception
        # and receive '' next time, so we can purge connection list
        data = conn.recv(4096)
        if not data:
            _LOGGER.error('Closed %s', conn)
            self._drop(conn)
            return

        if data.startswith(b'GET'):
            _LOGGER.debug('Request from HTTP -->\n%s', data)
            try:
                conn.sendall(b'HTTP/1.0 200 OK\nContent-Type: text/json\n\n' +
                             json.dumps(self.devs, indent=2).encode('utf-8'))
            finally:
                # One-shot request: always release the connection.
                self._drop(conn)
            return

        end = data.rfind(self._END_MARKER)
        # Without the end marker the frame is incomplete; never slice with -1.
        payload = data.rfind(b'{', 0, end) if end != -1 else -1
        if payload == -1:
            payload = end
        if payload < self._HEADER_SIZE:
            _LOGGER.error('Received invalid %s', data)
            return

        mac = ''
        if payload != end:
            # The six bytes in front of the five size bytes are the MAC.
            mac = data[payload - 11:payload - 5].hex().upper()
            try:
                attributes = json.loads(data[payload:end].decode('utf-8'))
            except ValueError:  # covers JSON and UTF-8 decoding errors
                _LOGGER.error('Received invalid JSON: %s', data)
            else:
                self.devs[mac] = attributes
                _LOGGER.debug('Received %s: %s', mac, attributes)

        # The reply echoes begin(17) + mac(6) of the request.
        header = data[payload - 28:payload - 5]
        body = None
        entity = self._macs.get(mac) if mac else None
        if entity:
            body = self._brightness_body(mac, entity)
        if body is None:
            body = self._ACK_BODY
        response = header + self._REPLY_PREFIX + body + self._END_MARKER
        _LOGGER.debug('mac:%s, Response %s', mac, response)
        conn.sendall(response)

    def _drop(self, conn):
        """Forget *conn* and close it."""
        if conn in self._rlist:
            self._rlist.remove(conn)
        conn.close()

    def _brightness_body(self, mac, entity):
        """Return the JSON body of a brightness command, or None for an ack.

        None is returned when Home Assistant or the entity is unavailable,
        or when the selected brightness was already sent and no forced
        re-send is due.
        """
        _LOGGER.debug('mac:%s, name:%s', mac, entity)
        state = None
        if self._hass is not None:
            state = self._hass.states.get("input_select." + entity)
        if state is None:
            _LOGGER.warning('No state for input_select.%s', entity)
            return None

        brightness = state.state
        last = self._last_brightness.get(mac, "")
        _LOGGER.debug('brightness %s: last %s', brightness, last)

        now = time.monotonic()
        sent_at = self._last_brightness_sent.get(mac)
        forced = self._brightness_force_update and (
            sent_at is None or now - sent_at >= self._FORCE_UPDATE_INTERVAL)
        if brightness in last and not forced:
            return None

        self._last_brightness[mac] = brightness
        self._last_brightness_sent[mac] = now
        if "关闭" in brightness:    # option containing "off"
            level = 0
        elif "夜间" in brightness:  # option containing "night"
            level = 25
        else:
            level = 50
        return b'{"brightness":"%b","type":2}' % str(level).encode('utf8')
if __name__ == '__main__':
    # Stand-alone debugging mode: run the TCP server without Home Assistant
    # and print every log record to the console.
    _LOGGER.setLevel(logging.DEBUG)
    _LOGGER.addHandler(logging.StreamHandler())
    # AirCatData takes (hass, macs, brightness_force_update); there is no
    # Home Assistant instance and no configured device in this mode.
    aircat = AirCatData(None, {}, False)
    try:
        aircat.loop()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket however the loop ended.
        aircat.shutdown()
    # Exit before the Home Assistant platform code further down this file
    # is executed.
    raise SystemExit(0)
"""
Support for AirCat air sensor.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.aircat/
"""
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import CONF_NAME, CONF_MAC, CONF_SENSORS, TEMP_CELSIUS
from homeassistant.helpers.entity import Entity
from homeassistant.helpers import config_validation as cv
# Sensor type identifiers. Each one is also the key looked up in the JSON
# reading stored for a device (see AirCatSensor.state).
SENSOR_PM25 = 'value'
SENSOR_HCHO = 'hcho'
SENSOR_TEMPERATURE = 'temperature'
SENSOR_HUMIDITY = 'humidity'
# Configuration key whose value is handed to AirCatData as
# brightness_force_update.
CONF_BFU = "brightness_force_update"
DEFAULT_NAME = 'AirCat'
# Sensors created when the configuration does not list any.
DEFAULT_SENSORS = [SENSOR_PM25, SENSOR_HCHO,
SENSOR_TEMPERATURE, SENSOR_HUMIDITY]
# sensor type -> (name suffix, unit of measurement, icon name that
# AirCatSensor prefixes with 'mdi:').
SENSOR_MAP = {
SENSOR_PM25: ('PM2.5', 'μg/m³', 'blur'),
SENSOR_HCHO: ('HCHO', 'mg/m³', 'biohazard'),
SENSOR_TEMPERATURE: ('Temperature', TEMP_CELSIUS, 'thermometer'),
SENSOR_HUMIDITY: ('Humidity', '%', 'water-percent')
}
# Platform configuration. CONF_MAC is a dict whose keys are device MACs and
# whose values are appended to "input_select." by AirCatData to find the
# entity holding the brightness selection.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_MAC): dict,
vol.Optional(CONF_BFU, default=False): cv.boolean,
vol.Optional(CONF_SENSORS, default=DEFAULT_SENSORS):
vol.All(cv.ensure_list, vol.Length(min=1), [vol.In(SENSOR_MAP)]),
})
AIRCAT_SENSOR_THREAD_MODE = True # True: Thread mode, False: HomeAssistant update/poll mode
def setup_platform(hass, config, add_devices, discovery_info=None):
    """Set up the AirCat sensor.

    Starts the AirCatData server and registers one AirCatSensor per
    configured device and sensor type through add_devices. The first device
    uses the configured name; every further device gets its 1-based position
    appended (name2, name3, ...). When no MAC is configured a single set of
    sensors is created with mac=None, which AirCatSensor resolves to the
    first device that has reported.
    """
    name = config[CONF_NAME]
    # CONF_MAC is optional in PLATFORM_SCHEMA, so it may be absent.
    macs = config.get(CONF_MAC) or {}
    sensors = config[CONF_SENSORS]
    brightness_force_update = config[CONF_BFU]

    aircat = AirCatData(hass, macs, brightness_force_update)
    targets = list(macs) or [None]

    if AIRCAT_SENSOR_THREAD_MODE:
        import threading
        # Daemon thread: the server loop never returns by itself and must
        # not keep the interpreter alive on exit.
        threading.Thread(target=aircat.loop, daemon=True).start()
    else:
        AirCatSensor.times = 0
        AirCatSensor.interval = len(sensors) * len(targets)

    devices = []
    for index, mac in enumerate(targets):
        device_name = name + str(index + 1) if index else name
        for sensor_type in sensors:
            _LOGGER.debug("add sensor for mac:%s", mac)
            devices.append(
                AirCatSensor(aircat, device_name, mac, sensor_type))
    add_devices(devices)
class AirCatSensor(Entity):
    """Implementation of a AirCat sensor.

    Each instance exposes one value (PM2.5, HCHO, temperature or humidity)
    taken from the latest reading that the shared AirCatData object holds
    for one device.
    """

    # Poll-mode bookkeeping shared by all sensors; setup_platform overwrites
    # both when thread mode is disabled.
    times = 0
    interval = 1

    def __init__(self, aircat, name, mac, sensor_type):
        """Initialize the AirCat sensor.

        aircat: shared AirCatData instance.
        name: device name; the sensor's display name is appended to it.
        mac: key of the device in aircat.devs; a falsy value selects the
            first device that has reported.
        sensor_type: one of the keys of SENSOR_MAP.
        """
        sensor_name, unit, icon = SENSOR_MAP[sensor_type]
        self._name = name + ' ' + sensor_name
        self._mac = mac
        self._sensor_type = sensor_type
        self._unit = unit
        self._icon = 'mdi:' + icon
        self._aircat = aircat

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def icon(self):
        """Return the icon of the sensor."""
        return self._icon

    @property
    def unit_of_measurement(self):
        """Return the unit the value is expressed in."""
        return self._unit

    @property
    def available(self):
        """Return if the sensor data are available."""
        return self.attributes is not None

    @property
    def state(self):
        """Return the state of the device.

        PM2.5 is returned as reported, HCHO is divided by 1000 and every
        other value is rounded to one decimal. None is returned while no
        reading exists, when the reading lacks this sensor's key or when
        the value is not numeric.
        """
        attributes = self.attributes
        if attributes is None:
            return None
        # The latest JSON of a device need not contain every sensor key.
        state = attributes.get(self._sensor_type)
        if state is None:
            return None
        if self._sensor_type == SENSOR_PM25:
            return state
        try:
            value = float(state)
        except (TypeError, ValueError):
            _LOGGER.warning("Invalid %s value from %s: %r",
                            self._sensor_type, self._mac, state)
            return None
        if self._sensor_type == SENSOR_HCHO:
            return value / 1000
        return round(value, 1)

    @property
    def device_state_attributes(self):
        """Return the state attributes."""
        return self.attributes if self._sensor_type == SENSOR_PM25 else None

    @property
    def attributes(self):
        """Return the attributes of the device.

        This is the latest reading stored for this sensor's MAC, or for the
        first known device when no MAC was given; None when there is none.
        """
        devs = self._aircat.devs
        if self._mac:
            return devs.get(self._mac)
        return next(iter(devs.values()), None)

    def update(self):
        """Update state.

        Does nothing in thread mode. In poll mode only one call out of
        every `interval` calls (counted across all sensors) polls the
        server, so a round of entity updates triggers a single poll.
        """
        if AIRCAT_SENSOR_THREAD_MODE:
            #_LOGGER.debug("Running in thread mode")
            return
        if AirCatSensor.times % AirCatSensor.interval == 0:
            _LOGGER.debug("Begin update %d: %s %s", AirCatSensor.times,
                          self._mac, self._sensor_type)
            self._aircat.update()
            _LOGGER.debug("Ended update %d: %s %s", AirCatSensor.times,
                          self._mac, self._sensor_type)
        AirCatSensor.times += 1

    def shutdown(self, event):
        """Signal shutdown."""
        #_LOGGER.debug('Shutdown')
        self._aircat.shutdown()