forked from brocaar/chirpstack-application-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gcppubsub.go
133 lines (109 loc) · 3.37 KB
/
gcppubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package gcppubsub
import (
"context"
"encoding/json"
"fmt"
"sync"
"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/api/option"
"github.com/brocaar/lora-app-server/internal/handler"
"github.com/brocaar/lorawan"
)
// Config holds the GCP Pub/Sub integration configuration.
//
// The mapstructure tags give the key under which each field is looked up
// when the configuration is decoded.
type Config struct {
// CredentialsFile is the path of a credentials file. When it is empty,
// NewHandler passes no explicit credentials option to the Pub/Sub client.
CredentialsFile string `mapstructure:"credentials_file"`
// ProjectID is the project identifier handed to the Pub/Sub client.
ProjectID string `mapstructure:"project_id"`
// TopicName is the topic that events are published to. NewHandler fails
// when this topic does not exist.
TopicName string `mapstructure:"topic_name"`
}
// Handler implements a Google Cloud Pub/Sub handler.
//
// Instances are created by NewHandler, which populates every field, and are
// released with Close.
type Handler struct {
// NOTE(review): none of the methods visible in this file lock or unlock
// this mutex; confirm whether it is still needed.
sync.RWMutex
// ctx is used for all Pub/Sub calls made by the handler (client setup,
// topic existence check, publishing and waiting for publish results).
ctx context.Context
// cancel cancels ctx; it is invoked by Close.
cancel context.CancelFunc
// client is the Pub/Sub client; it is closed by Close.
client *pubsub.Client
// topic is the topic that publish sends every event to.
topic *pubsub.Topic
}
// NewHandler creates a new Pub/Sub handler.
//
// It sets up a Pub/Sub client for conf.ProjectID (using conf.CredentialsFile
// when it is non-empty) and verifies that the topic conf.TopicName exists.
//
// An error is returned when the client cannot be created, when the existence
// check fails, or when the topic does not exist. In all of these cases the
// resources acquired so far (the cancellable context and, if already created,
// the client) are released before returning, so a failed call leaks nothing.
func NewHandler(conf Config) (handler.Handler, error) {
	ctx, cancel := context.WithCancel(context.Background())
	h := Handler{
		ctx:    ctx,
		cancel: cancel,
	}

	var o []option.ClientOption
	if conf.CredentialsFile != "" {
		o = append(o, option.WithCredentialsFile(conf.CredentialsFile))
	}

	log.Info("handler/gcp_pub_sub: setting up client")
	client, err := pubsub.NewClient(h.ctx, conf.ProjectID, o...)
	if err != nil {
		// No client was created; only the context needs to be released.
		cancel()
		return nil, errors.Wrap(err, "new pubsub client error")
	}
	h.client = client

	// abort releases everything acquired above, in the same order Close
	// uses. The Close error is dropped on purpose: the caller is already
	// getting the more relevant setup error.
	abort := func() {
		cancel()
		_ = client.Close()
	}

	log.WithField("topic", conf.TopicName).Info("handler/gcp_pub_sub: setup topic")
	h.topic = h.client.Topic(conf.TopicName)
	ok, err := h.topic.Exists(h.ctx)
	if err != nil {
		abort()
		return nil, errors.Wrap(err, "topic exists error")
	}
	if !ok {
		abort()
		return nil, fmt.Errorf("topic %s does not exist", conf.TopicName)
	}

	return &h, nil
}
// Close closes the handler and releases its Pub/Sub resources.
//
// It cancels the handler context, stops the topic and then closes the
// client. The error returned by the client's Close, if any, is returned.
func (h *Handler) Close() error {
	log.Info("handler/gcp_pub_sub: closing handler")
	h.cancel()

	// The pubsub package asks for Stop to be called once a topic is no
	// longer needed: it sends (or fails) any messages the topic still
	// holds, so they are dealt with before the client goes away.
	h.topic.Stop()

	return h.client.Close()
}
// SendDataUp publishes the given uplink data payload as an "up" event,
// attributed to the payload's DevEUI. It returns the error reported by
// publish, if any.
func (h *Handler) SendDataUp(pl handler.DataUpPayload) error {
	const event = "up"
	return h.publish(event, pl.DevEUI, pl)
}
// SendJoinNotification publishes the given join notification as a "join"
// event, attributed to the payload's DevEUI. It returns the error reported
// by publish, if any.
func (h *Handler) SendJoinNotification(pl handler.JoinNotification) error {
	const event = "join"
	return h.publish(event, pl.DevEUI, pl)
}
// SendACKNotification publishes the given ack notification as an "ack"
// event, attributed to the payload's DevEUI. It returns the error reported
// by publish, if any.
func (h *Handler) SendACKNotification(pl handler.ACKNotification) error {
	const event = "ack"
	return h.publish(event, pl.DevEUI, pl)
}
// SendErrorNotification publishes the given error notification as an
// "error" event, attributed to the payload's DevEUI. It returns the error
// reported by publish, if any.
func (h *Handler) SendErrorNotification(pl handler.ErrorNotification) error {
	const event = "error"
	return h.publish(event, pl.DevEUI, pl)
}
// SendStatusNotification publishes the given status notification as a
// "status" event, attributed to the payload's DevEUI. It returns the error
// reported by publish, if any.
func (h *Handler) SendStatusNotification(pl handler.StatusNotification) error {
	const event = "status"
	return h.publish(event, pl.DevEUI, pl)
}
// SendLocationNotification publishes the given location notification as a
// "location" event, attributed to the payload's DevEUI. It returns the
// error reported by publish, if any.
func (h *Handler) SendLocationNotification(pl handler.LocationNotification) error {
	const event = "location"
	return h.publish(event, pl.DevEUI, pl)
}
// DataDownChan always returns a nil channel: this handler never creates a
// downlink payload channel.
//
// NOTE(review): presumably callers treat nil as "no downlink support" —
// confirm against the handler.Handler consumers.
func (h *Handler) DataDownChan() chan handler.DataDownPayload {
	// The zero value of a channel type is nil.
	var none chan handler.DataDownPayload
	return none
}
// publish sends v, encoded as JSON, to the handler's topic.
//
// The message carries two attributes: "event" holds the given event name
// and "devEUI" holds the string form of devEUI. The call waits for the
// publish result before returning, and logs the event once it succeeded.
//
// An error is returned when v cannot be marshaled to JSON or when
// retrieving the publish result fails.
func (h *Handler) publish(event string, devEUI lorawan.EUI64, v interface{}) error {
	payload, err := json.Marshal(v)
	if err != nil {
		return errors.Wrap(err, "marshal json error")
	}

	msg := pubsub.Message{
		Data: payload,
		Attributes: map[string]string{
			"event":  event,
			"devEUI": devEUI.String(),
		},
	}

	// Wait for the outcome of the publish so failures surface to the caller.
	_, err = h.topic.Publish(h.ctx, &msg).Get(h.ctx)
	if err != nil {
		return errors.Wrap(err, "get publish result error")
	}

	fields := log.Fields{
		"dev_eui": devEUI,
		"event":   event,
	}
	log.WithFields(fields).Info("handler/gcp_pub_sub: event published")

	return nil
}