mqspeak - MQTT bridge
mqspeak is MQTT client which collect data and transforms them into ThingSpeak channel updates or Phant data streams. It is able to handle multiple MQTT connections and independetly update multiple channels.
This is part of my IoT project. You can read more about it on my blog.
Application can be installed with following command:
$ sudo pip3 install mqspeak
mqspeak is configured using configuration file specified with
/etc/mqspeak.conf). This is sample configuration file:
[Brokers] Enabled = temperature-broker humidity-broker door-broker [temperature-broker] Host = temperatureBrokerHostname Port = 1883 User = brokerUser Password = brokerPass Topic = sensors/temperature sensors/something [humidity-broker] Host = humidityBrokerHostname Port = 1883 User = brokerUser Password = brokerPass Topic = sensors/humidity [door-broker] Host = doorBrokerHostname Port = 1883 User = brokerUser Password = brokerPass Topic = # [Channels] Enabled = channel1 channel2 channel3 channel4 [channel1] Id = CHANNELID Key = CHANNELKEY Type = thingspeak UpdateRate = 15 UpdateType = blackout UpdateFields = dht-update [channel2] Id = CHANNELID Key = CHANNELKEY Type = thingspeak UpdateRate = 15 UpdateType = buffered UpdateFields = dht-update [channel3] Id = CHANNELID Key = CHANNELKEY Type = thingspeak UpdateRate = 15 UpdateType = average UpdateFields = dht-update [channel4] Id = CHANNELID Key = CHANNELKEY Type = phant UpdateRate = 15 UpdateType = onchange UpdateFields = door-update [dht-update] field1 = humidity-broker sensors/humidity field2 = temperature-broker sensors/temperature [door-update] state = door-broker sensors/door
Configuration file has two mandatory sections:
[Channels], each with
Enabled option. These options contains space separated broker and channel
Broker section has to define one mandatory
[Topic] option, which is space separated
list of MQTT topic subscriptions. Full list of possible options in broker section:
Host- Broker IP address or hostname (default 127.0.0.1).
Port- Broker port (default 1883).
Topic- Space separated list of topic subscriptions. Mandatory option.
Each channel section has to define
Id- Channel ID. This field is mandatory for Phant channels.
Key- Channel API write key. Mandatory option.
Type- Specify channel type. Mandatory option. Following types are supported:
UpdateRate- Channel update interval in seconds. Currently, ThinkSpeak allows interval 15 seconds or greater. Mandatory option.
WaitInterval- Maximum interval to wait for remaining data to arrive. When set to zero, wait forever (default). See Update waiting for more details.
UpdateType- Channel update type. Possible values are
onchange. Mandatory option.
UpdateRateinterval is expired, any incoming data are ignored. First data received after interval expiration are sent to ThingSpeak.
buffered- Incoming data are buffered during
UpdateRateinterval. After this interval expires, most recent values are immediately sent.
average- Similar to
bufferedbut mqspeak calculates average value of these data. Any data which cannot be converted into real numbers are ignored. Channel is immediately updated after
UpdateRateinterval is expired.
onchange- Data are marked with timestamp and stored in queue. Each item is sent after
UpdateRateinterval expires. Not implemented yet.
UpdateFields- Specify section which defines updates for this channel. Mandatory option.
When channel update consists of data from multiple sensors, it may happen that one sensor die. By default channel never will be updated until data from all sensors arrives. Inactive sensor causes channel update will be stalled.
When update waiting enabled, mqspeak will wait defined amount of seconds and then sends out even incomplete channel update.
Waiting scenario can be divided into following cases:
UpdateRatecondition is met but there are no data. Wait mechanism is not activated until some data arrives. After it received first part of channel update, mqspeak will wait defined time and tries collect remaining data. After
WaitIntervalexpires, data will be send.
- Partial data arrives before
UpdateRatecondition is met. Waiting is delayed until
UpdateRatecondition is met. After it expires and there are still not all required data, waiting is triggered. After
WaitIntervalexpires, data will be send.
- All required data are collected before
UpdateRatecondition is met. There is no need to activate update waiting. Simply send data.
UpdateFields section consists of any number of options. Each option key specifies field name. Its value must be space separated name of broker section and topic.
For ThinkSpeak channel, only option keys
Field8 are valid.
- mqspeak runs in foreground only. - Yes, there is no double fork combo to run mqspeak in background. I use systemd init and I prefer to run all services as simple systemd units, which runs in foreground. Sorry about that.
- It uses python3. Is python 2.x supported? - No, I don't plan to support python 2.x.