-
Notifications
You must be signed in to change notification settings - Fork 3
/
mqtt.go
124 lines (108 loc) · 2.78 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package source
import (
"context"
"errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"github.com/vanti-dev/sc-bos/pkg/auto/export/config"
"github.com/vanti-dev/sc-bos/pkg/gen"
"github.com/vanti-dev/sc-bos/pkg/task"
"github.com/vanti-dev/sc-bos/pkg/util/pull"
)
func NewMqtt(services Services) task.Starter {
r := &mqtt{services: services}
r.Lifecycle = task.NewLifecycle(r.applyConfig)
r.Logger = services.Logger.Named("smart-core")
return r
}
type mqtt struct {
*task.Lifecycle[config.MqttServiceSource]
services Services
}
func (m *mqtt) applyConfig(ctx context.Context, cfg config.MqttServiceSource) error {
clients := m.services.Node
var client gen.MqttServiceClient
err := clients.Client(&client)
if err != nil {
return err
}
sent := allowDuplicates()
if cfg.Duplicates.TrackDuplicates() {
sent = trackDuplicates(cfg.Duplicates.Cmp())
}
grp, ctx := errgroup.WithContext(ctx)
for _, name := range cfg.RpcNames {
name := name // save for go routine usage
puller := &mqttMessagePuller{
client: client,
name: name,
}
changes := make(chan *gen.PullMessagesResponse)
grp.Go(func() error {
defer close(changes)
err := pull.Changes[*gen.PullMessagesResponse](ctx, puller, changes, pull.WithLogger(m.Logger.Named(name)))
if status.Code(err) == codes.Unimplemented {
m.Logger.Debug("read not supported")
return nil
}
return err
})
grp.Go(func() error {
for change := range changes {
if commit, publish := sent.Changed(name, change); publish {
data, err := protojson.MarshalOptions{
EmitUnpopulated: true,
}.Marshal(change)
if err != nil {
return err
}
err = m.services.Publisher.Publish(ctx, name, string(data))
if err != nil {
return err
}
commit()
}
}
return nil
})
}
go func() {
err := grp.Wait()
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return
}
if err != nil {
m.Logger.Warn("source shut down", zap.Error(err))
} else {
m.Logger.Debug("source shut down")
}
}()
return nil
}
type mqttMessagePuller struct {
client gen.MqttServiceClient
name string
}
func (m *mqttMessagePuller) Pull(ctx context.Context, changes chan<- *gen.PullMessagesResponse) error {
stream, err := m.client.PullMessages(ctx, &gen.PullMessagesRequest{Name: m.name})
if err != nil {
return err
}
for {
change, err := stream.Recv()
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case changes <- change:
}
}
}
func (m *mqttMessagePuller) Poll(ctx context.Context, changes chan<- *gen.PullMessagesResponse) error {
return status.Error(codes.Unimplemented, "not supported")
}