-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
mqtt_publisher.go
205 lines (184 loc) · 6.11 KB
/
mqtt_publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package mqttpub
import (
"crypto/tls"
"fmt"
"strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
goVersion "github.com/jnovack/go-version"
"github.com/namsral/flag"
"github.com/rs/zerolog/log"
)
var (
// Client is an interface to the MQTT client
Client mqtt.Client
// Flag variables
endpoint = flag.String("mqtt_endpoint", "tcp://mosquitto:1883", "mosquitto message broker endpoint")
clientid = flag.String("mqtt_clientid", "random", "mqtt client id")
username = flag.String("mqtt_username", "", "username for mqtt authentication")
password = flag.String("mqtt_password", "", "password for mqtt authentication")
certFile = flag.String("mqtt_certfile", "", "certificate (in pem format) for mqtt authentication")
keyFile = flag.String("mqtt_keyfile", "", "private key (in pem format) for mqtt authentication")
)
func init() {
log.Logger = log.With().Str("component", "mqttpub").Logger()
}
// Noop is for testing without re-writing
func Noop() mqtt.Client {
return mqtt.NewClient(mqtt.NewClientOptions())
}
// Connect does nothing apparently
func Connect() mqtt.Client {
opts := mqtt.NewClientOptions()
opts.SetCleanSession(true)
opts.AddBroker(*endpoint)
// set client id if it is not random
if *clientid != "random" {
opts.SetClientID(*clientid)
} else {
opts.SetClientID(fmt.Sprintf("%s_%v", goVersion.Application, time.Now().Unix()))
}
// if you have a username you'll need a password with it
if *username != "" {
opts.SetUsername(*username)
if *password != "" {
opts.SetPassword(*password)
}
}
// if you have a client certificate you want a key aswell
if *certFile != "" && *keyFile != "" {
keyPair, err := tls.LoadX509KeyPair(*certFile, *keyFile)
if err != nil {
log.Err(err).Msg("Failed to load certificate/keypair")
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{keyPair},
InsecureSkipVerify: true,
ClientAuth: tls.NoClientCert,
}
opts.SetTLSConfig(tlsConfig)
if !strings.HasPrefix(*endpoint, "ssl://") &&
!strings.HasPrefix(*endpoint, "tls://") {
log.Warn().Msg("Warning: To use TLS the endpoint URL will have to begin with 'ssl://' or 'tls://'")
}
} else if (*certFile != "" && *keyFile == "") ||
(*certFile == "" && *keyFile != "") {
log.Warn().Msg("Warning: For TLS to work both certificate and private key are needed. Skipping TLS.")
}
opts.OnConnect = func(client mqtt.Client) {
log.Info().Msgf("Connected to %s", *endpoint)
// subscribe on every (re)connect
// may be useful later for "poll now"?
// token := client.Subscribe("$SYS/#", 0, func(_ mqtt.Client, msg mqtt.Message) {
// processUpdate(msg.Topic(), string(msg.Payload()))
// })
// if !token.WaitTimeout(10 * time.Second) {
// log.Error().Msg("Error: Timeout subscribing to topic $SYS/#")
// }
// if err := token.Error(); err != nil {
// log.Error().Msgf("Failed to subscribe to topic $SYS/#: %s", err)
// }
}
opts.OnConnectionLost = func(client mqtt.Client, err error) {
log.Warn().Msgf("Warning: Connection to %s lost: %s", *endpoint, err)
}
client := mqtt.NewClient(opts)
// launch the first connection in another thread so it is no blocking
// and exporter can serve metrics in case of no connection
go mqttConnect(client)
return client
}
// try to connect forever with the MQTT broker
func mqttConnect(client mqtt.Client) {
// try to connect forever
for {
token := client.Connect()
log.Info().Str("endpoint", *endpoint).Msg("Attempting to connect to mosquitto endpoint")
if token.WaitTimeout(5 * time.Second) {
if token.Error() == nil {
break
}
log.Error().Err(token.Error()).Str("endpoint", *endpoint).Msg("Failed to connect to mosquitto endpoint")
} else {
log.Error().Str("endpoint", *endpoint).Msg("Timeout connecting to mosquitto endpoint")
}
time.Sleep(5 * time.Second)
}
}
// process the messages received in $SYS/
/*
func processUpdate(topic, payload string) {
//log.Debugf("Got broker update with topic %s and data %s", topic, payload)
if _, ok := ignoreKeyMetrics[topic]; !ok {
if _, ok := counterKeyMetrics[topic]; ok {
log.Debug().Str("topic", topic).Str("payload", payload).Msg("Processing counter metric")
processCounterMetric(topic, payload)
} else {
log.Debug().Str("topic", topic).Str("payload", payload).Msg("Processing gauge metric")
processGaugeMetric(topic, payload)
}
// restartSecondsSinceLastUpdate()
} else {
log.Debug().Str("topic", topic).Str("payload", payload).Msg("Ignoring metric")
}
}
*/
/*
func processCounterMetric(topic, payload string) {
// if counterMetrics[topic] != nil {
// value := parseValue(payload)
// counterMetrics[topic]. .Set(value)
// } else {
// // create a mosquitto counter pointer
// mCounter := NewMosquittoCounter(prometheus.NewDesc(
// parseForPrometheus(topic),
// topic,
// []string{},
// prometheus.Labels{},
// ))
// // register the metric
// prometheus.MustRegister(mCounter)
// // add the first value
// value := parseValue(payload)
// counterMetrics[topic].Set(value)
// }
}
func processGaugeMetric(topic, payload string) {
if gaugeMetrics[topic] == nil {
gaugeMetrics[topic] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: parseForPrometheus(topic),
Help: topic,
ConstLabels: prometheus.Labels{"broker": *endpoint},
})
// register the metric
prometheus.MustRegister(gaugeMetrics[topic])
// // add the first value
}
value := parseValue(payload)
gaugeMetrics[topic].Set(value)
}
*/
/*
func parseForPrometheus(incoming string) string {
outgoing := strings.Replace(incoming, "$SYS", "mqtt", 1)
outgoing = strings.Replace(outgoing, "/", "_", -1)
outgoing = strings.Replace(outgoing, " ", "_", -1)
outgoing = strings.Replace(outgoing, "-", "_", -1)
outgoing = strings.Replace(outgoing, ".", "_", -1)
return outgoing
}
func parseValue(payload string) float64 {
// fmt.Printf("Payload %s \n", payload)
var validValue = regexp.MustCompile(`-?\d{1,}[.]\d{1,}|\d{1,}`)
// get the first value of the string
strArray := validValue.FindAllString(payload, 1)
if len(strArray) > 0 {
// parse to float
value, err := strconv.ParseFloat(strArray[0], 64)
if err == nil {
return value
}
}
return 0
}
*/