-
Notifications
You must be signed in to change notification settings - Fork 14
/
mqtt.go
112 lines (98 loc) · 3.7 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/*
* skogul, mqtt-receiver
*
* Copyright (c) 2019-2020 Telenor Norge AS
* Author(s):
* - Kristian Lyngstøl <kly@kly.no>
* - Håkon Solbjørg <hakon.solbjorg@telenor.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*/
package receiver
import (
"fmt"
"time"
"github.com/telenornms/skogul"
skmqtt "github.com/telenornms/skogul/internal/mqtt"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// mqttLog is the logger shared by everything in the MQTT receiver. It is
// obtained from skogul.Logger with the labels "receiver" and "mqtt".
var mqttLog = skogul.Logger("receiver", "mqtt")
/*
MQTT connects to an MQTT broker and listens for messages on one or more
topics. Every received message is parsed, annotated with the topic it
arrived on, and passed on through the configured Handler.
*/
type MQTT struct {
	Broker        string             `doc:"Address of broker to connect to." example:"[::1]:8888"`
	Topics        []string           `doc:"List of topics to subscribe to"`
	Handler       *skogul.HandlerRef `doc:"Handler used to parse, transform and send data."`
	Password      string             `doc:"Password for authenticating."`
	Username      string             `doc:"Username for authenticating to the broker."`
	ClientID      string             `doc:"Custom client id to use (default: random)"`
	RenewClientID bool               `doc:"Renew the client ID on reconnects ([MQTT-3.1.4-2] @ https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc384800405)"`

	// DisplayMQTTLogs is copied to the MQTTLogs field of the internal
	// client when the receiver is started.
	DisplayMQTTLogs bool

	// mc is the internal MQTT client used for initialisation,
	// subscriptions and connecting to the broker.
	mc skmqtt.MQTT
}
// appendTopic records the MQTT topic a message arrived on in the metadata
// of every metric in the container, under the key "_mqtt_topic". Metrics
// without a metadata map get a new one.
func appendTopic(container *skogul.Container, topic string) {
	for _, m := range container.Metrics {
		if m.Metadata != nil {
			m.Metadata["_mqtt_topic"] = topic
			continue
		}
		m.Metadata = map[string]interface{}{"_mqtt_topic": topic}
	}
}
// receiver is the callback registered for every subscribed topic. It
// parses the message payload into a container, tags each metric with the
// topic the message arrived on, and then runs the container through the
// handler's transformers and sender. Failures are logged and the message
// is dropped; nothing is returned to the caller.
func (handler *MQTT) receiver(msg mqtt.Message) {
	ref := handler.Handler

	container, err := ref.H.Parse(msg.Payload())
	if err != nil {
		mqttLog.WithError(err).Error("Failed to parse payload from MQTT message")
		return
	}

	appendTopic(container, msg.Topic())

	if sendErr := ref.H.TransformAndSend(container); sendErr != nil {
		mqttLog.WithError(sendErr).Error("Error during transform or send container")
	}
}
// Start configures the internal MQTT client from the receiver's settings,
// registers a subscription for every configured topic, connects to the
// broker and then blocks forever.
func (handler *MQTT) Start() error {
	client := &handler.mc
	client.MQTTLogs = handler.DisplayMQTTLogs
	client.RenewClientID = handler.RenewClientID
	client.Init(handler.Broker, handler.Username, handler.Password, handler.ClientID)

	for i := range handler.Topics {
		client.Subscribe(handler.Topics[i], handler.receiver)
	}

	mqttLog.WithField("address", handler.Broker).Debug("Starting MQTT receiver")
	client.Connect()

	// Setting up the subscriptions and connecting does not block, but
	// Start() is required to never return. Draining a ticker whose channel
	// is never closed keeps this goroutine parked indefinitely. This should
	// probably be replaced by something more sensible.
	ticker := time.NewTicker(10 * time.Second)
	for range ticker.C {
	}

	// Only here to satisfy the compiler; the loop above never ends.
	return fmt.Errorf("unreachable")
}
// Verify makes sure required configuration options are set.
//
// It returns a skogul.MissingArgument error when Broker is empty, when no
// topics are listed (a nil or an empty list), or when no Handler is
// configured. Setting both RenewClientID and ClientID is allowed, but a
// warning is logged because the configured ID will not be kept.
func (handler *MQTT) Verify() error {
	if handler.Broker == "" {
		return skogul.MissingArgument("Broker")
	}
	// Check the length rather than nil: an explicitly empty list would
	// otherwise pass and yield a receiver that subscribes to nothing.
	if len(handler.Topics) == 0 {
		return skogul.MissingArgument("Topics")
	}
	// The message callback dereferences Handler unconditionally, so a
	// missing handler must be rejected here instead of panicking later.
	if handler.Handler == nil {
		return skogul.MissingArgument("Handler")
	}
	if handler.RenewClientID && handler.ClientID != "" {
		mqttLog.Warning("RenewClientID AND ClientID is set - ClientID will change!")
	}
	return nil
}