This repository has been archived by the owner on Mar 1, 2023. It is now read-only.
forked from danielztolnai/zendisplay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
luminance_mqtt.py
50 lines (41 loc) · 1.63 KB
/
luminance_mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
"""Get or provide ambient lighting data through mqtt"""
from luminance_sources import LuminanceSource
class LuminanceMQTT(LuminanceSource):
"""Get ambient lighting information from MQTT"""
import paho.mqtt.client as mqtt
PARAMETER_MQTT_HOST = 'host'
PARAMETER_MQTT_TOPIC = 'topic'
def __init__(self, name=None, path=None, host=None):
super().__init__(name, path)
self.luminance = 0
# Create MQTT client
self.client = self.mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(host, 1883, 60)
@classmethod
def detect(cls, parameters):
"""Add given MQTT configuration"""
topic = parameters[cls.PARAMETER_MQTT_TOPIC]
host = parameters[cls.PARAMETER_MQTT_HOST]
if host is None:
return
yield cls(name="mqtt", path=topic, host=host)
def get_luminance(self):
"""Get last luminance data received"""
return self.luminance
def enable(self):
"""Enable the source"""
self.client.loop_start()
def disable(self):
"""Disable the source"""
self.client.loop_stop(force=False)
def on_connect(self, _1, _2, _3, _4):
"""Called when connection is established to the MQTT server"""
self.client.subscribe(self.path)
def on_message(self, _1, _2, msg):
"""Called when message is received from the MQTT server"""
self.luminance = int(msg.payload)
def publish(self, value):
"""Publish given data to the topic of the object"""
self.client.publish(self.path, value)