This repository has been archived by the owner on Dec 14, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 279
/
mqtt.go
103 lines (90 loc) · 2.71 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright © 2017 The Things Network
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.
package handler
import (
"time"
ttnlog "github.com/TheThingsNetwork/go-utils/log"
"github.com/TheThingsNetwork/ttn/core/types"
"github.com/TheThingsNetwork/ttn/mqtt"
)
// Tunables for the handler's MQTT integration.
var (
	// MQTTTimeout is the maximum time to wait for an MQTT publish to
	// complete before it is reported as timed out.
	MQTTTimeout = 2 * time.Second

	// MQTTBufferSize is the capacity of the buffered channels that queue
	// uplink messages and device events for publishing over MQTT.
	MQTTBufferSize = 10
)
// HandleMQTT connects the handler to the given MQTT brokers and starts
// relaying traffic between the handler and MQTT.
//
// It creates the MQTT client with the supplied credentials, connects, and
// subscribes to downlink messages; every received downlink is tagged with its
// AppID and DevID and passed to EnqueueDownlink in its own goroutine. It then
// starts two goroutines that publish everything received on h.mqttUp and
// h.mqttEvent; they run until the respective channel is closed.
//
// An error is returned if connecting or subscribing to downlink fails.
// Failures and timeouts of individual publishes are only logged as warnings.
func (h *handler) HandleMQTT(username, password string, mqttBrokers ...string) error {
	h.mqttClient = mqtt.NewClient(h.Ctx, "ttnhdl", username, password, mqttBrokers...)
	if err := h.mqttClient.Connect(); err != nil {
		return err
	}

	h.mqttUp = make(chan *types.UplinkMessage, MQTTBufferSize)
	h.mqttEvent = make(chan *types.DeviceEvent, MQTTBufferSize)

	token := h.mqttClient.SubscribeDownlink(func(client mqtt.Client, appID string, devID string, msg types.DownlinkMessage) {
		down := &msg
		down.DevID = devID
		down.AppID = appID
		go h.EnqueueDownlink(down)
	})
	token.Wait()
	// Return the subscription error itself. The Connect error is always nil
	// at this point, so returning it would report a failed subscribe as
	// success.
	if err := token.Error(); err != nil {
		return err
	}

	ctx := h.Ctx.WithField("Protocol", "MQTT")

	// awaitPublish waits up to MQTTTimeout for a publish token and logs a
	// warning if the publish failed or did not complete in time. It is run in
	// its own goroutine so a slow publish never stalls the publishing loops.
	awaitPublish := func(ctx ttnlog.Interface, token mqtt.Token, what string) {
		if !token.WaitTimeout(MQTTTimeout) {
			ctx.Warn(what + " publish timeout")
			return
		}
		if err := token.Error(); err != nil {
			ctx.WithError(err).Warn("Could not publish " + what)
		}
	}

	// Publish uplink messages (and their decoded payload fields, if any).
	go func() {
		for up := range h.mqttUp {
			ctx := ctx.WithFields(ttnlog.Fields{
				"DevID": up.DevID,
				"AppID": up.AppID,
			})
			ctx.Debug("Publish Uplink")
			go awaitPublish(ctx, h.mqttClient.PublishUplink(*up), "Uplink")
			if len(up.PayloadFields) > 0 {
				fieldsToken := h.mqttClient.PublishUplinkFields(up.AppID, up.DevID, up.PayloadFields)
				go awaitPublish(ctx, fieldsToken, "Uplink Fields")
			}
		}
	}()

	// Publish events; an event without a DevID is published as an
	// application-level event, otherwise as a device-level event.
	go func() {
		for event := range h.mqttEvent {
			ctx := ctx.WithFields(ttnlog.Fields{
				"DevID": event.DevID,
				"AppID": event.AppID,
				"Event": event.Event,
			})
			ctx.Debug("Publish Event")
			var token mqtt.Token
			if event.DevID == "" {
				token = h.mqttClient.PublishAppEvent(event.AppID, event.Event, event.Data)
			} else {
				token = h.mqttClient.PublishDeviceEvent(event.AppID, event.DevID, event.Event, event.Data)
			}
			go awaitPublish(ctx, token, "Event")
		}
	}()

	return nil
}