-
Notifications
You must be signed in to change notification settings - Fork 152
/
mqtt.go
110 lines (97 loc) · 2.31 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package mqtt
import (
"context"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/dependencies/mqtt"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/values"
)
// Defaults applied by CommonMQTTOpSpec.ReadArgs when the corresponding
// optional argument is not supplied.
const (
	// DefaultConnectMQTTTimeout is the timeout used when no "timeout"
	// argument is given.
	DefaultConnectMQTTTimeout = time.Second

	// DefaultClientID is the MQTT client identifier used when no "clientid"
	// argument is given.
	DefaultClientID = "flux-mqtt"
)
// CommonMQTTOpSpec holds the connection and publishing parameters shared by
// the MQTT operations in this package. It is filled in from Flux call
// arguments by ReadArgs and consumed by publish.
type CommonMQTTOpSpec struct {
	// Broker is the address of the MQTT broker to dial (required argument).
	Broker string `json:"broker"`
	// ClientID is the MQTT client identifier presented to the broker.
	ClientID string `json:"clientid"`

	// Username and Password are optional credentials. ReadArgs only sets
	// them together: a username without a password is rejected.
	Username string `json:"username"`
	Password string `json:"password"`

	// QoS is the MQTT quality-of-service level used when publishing; it is
	// narrowed to a byte at publish time.
	QoS int64 `json:"qos"`
	// Retain is forwarded as the retain flag of the published message.
	Retain bool `json:"retain"`

	// Timeout is handed to the dialer as the connection option timeout.
	Timeout time.Duration `json:"timeout"`
	// NoKeepAlive is neither set by ReadArgs nor forwarded by publish in
	// this file. NOTE(review): presumably consumed by callers elsewhere to
	// disable MQTT keep-alive; confirm before relying on it.
	NoKeepAlive bool `json:"noKeepAlive"`
}
// ReadArgs populates the spec from the arguments of a Flux function call.
//
// Arguments read:
//   - "broker"   (string, required): stored in Broker.
//   - "clientid" (string, optional): stored in ClientID; DefaultClientID is
//     used when the argument is absent.
//   - "username" (string, optional): when present, "password" must also be
//     present, otherwise an Invalid error is returned. A "password" given
//     without a "username" is not read.
//   - "qos"      (int, optional): stored in QoS when it is a valid MQTT QoS
//     level (0, 1 or 2); any other value, or an absent argument, yields 0.
//   - "retain"   (bool, optional): stored in Retain; false when absent.
//   - "timeout"  (duration, optional): stored in Timeout;
//     DefaultConnectMQTTTimeout is used when absent.
//
// NoKeepAlive is not touched by this method.
//
// Any error reported by the argument accessors is returned unchanged.
func (o *CommonMQTTOpSpec) ReadArgs(args flux.Arguments) error {
	// MQTT defines QoS levels 0, 1 and 2 only; the value 3 is reserved and
	// must not be put on the wire.
	const maxQoS = 2

	broker, err := args.GetRequiredString("broker")
	if err != nil {
		return err
	}
	o.Broker = broker

	clientID, ok, err := args.GetString("clientid")
	if err != nil {
		return err
	}
	if !ok {
		clientID = DefaultClientID
	}
	o.ClientID = clientID

	username, ok, err := args.GetString("username")
	if err != nil {
		return err
	}
	if ok {
		// Credentials are only accepted as a pair.
		password, hasPassword, pwErr := args.GetString("password")
		if pwErr != nil {
			return pwErr
		}
		if !hasPassword {
			return errors.New(codes.Invalid, "password required with username")
		}
		o.Username = username
		o.Password = password
	}

	qos, ok, err := args.GetInt("qos")
	if err != nil {
		return err
	}
	// Out-of-range values silently fall back to QoS 0 rather than failing,
	// preserving the existing lenient behaviour.
	if ok && qos >= 0 && qos <= maxQoS {
		o.QoS = qos
	} else {
		o.QoS = 0
	}

	retain, ok, err := args.GetBool("retain")
	if err != nil {
		return err
	}
	o.Retain = ok && retain

	timeout, ok, err := args.GetDuration("timeout")
	if err != nil {
		return err
	}
	if ok {
		o.Timeout = values.Duration(timeout).Duration()
	} else {
		o.Timeout = DefaultConnectMQTTTimeout
	}

	return nil
}
// publish sends a single message to topic on the broker described by spec.
//
// It obtains a dialer from ctx, dials spec.Broker with the client ID,
// credentials and timeout taken from spec, publishes message using
// spec.QoS (narrowed to a byte) and spec.Retain, and closes the client
// before returning. spec.NoKeepAlive is not forwarded to the dialer.
//
// It returns (true, nil) on success and (false, err) when dialing or
// publishing fails.
func publish(ctx context.Context, topic, message string, spec *CommonMQTTOpSpec) (bool, error) {
	opts := mqtt.Options{
		Timeout:  spec.Timeout,
		Password: spec.Password,
		Username: spec.Username,
		ClientID: spec.ClientID,
	}

	conn, err := mqtt.GetDialer(ctx).Dial(ctx, []string{spec.Broker}, opts)
	if err != nil {
		return false, err
	}
	// Best-effort close: the result of Close is deliberately discarded.
	defer func() { _ = conn.Close() }()

	err = conn.Publish(ctx, topic, byte(spec.QoS), spec.Retain, message)
	if err != nil {
		return false, err
	}
	return true, nil
}