forked from micromdm/micromdm
/
webhook.go
133 lines (110 loc) · 3.19 KB
/
webhook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package webhook
import (
"context"
"net/http"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/micromdm/micromdm/mdm"
"github.com/micromdm/micromdm/platform/pubsub"
)
// Event is the webhook payload built by Run for each message it receives and
// handed to postWebhookEvent. The JSON field names are fixed by the struct
// tags below.
type Event struct {
// Topic is serialized as "topic".
// NOTE(review): presumably the pubsub topic the message arrived on — confirm
// against the event constructors.
Topic string `json:"topic"`
// EventID is serialized as "event_id".
EventID string `json:"event_id"`
// CreatedAt is serialized as "created_at".
CreatedAt time.Time `json:"created_at"`
// AcknowledgeEvent is optional; it is omitted from the JSON when nil.
AcknowledgeEvent *AcknowledgeEvent `json:"acknowledge_event,omitempty"`
// CheckinEvent is optional; it is omitted from the JSON when nil.
CheckinEvent *CheckinEvent `json:"checkin_event,omitempty"`
}
// Worker receives events from a pubsub subscriber and posts them to a webhook
// URL. Create one with New and start it with Run.
type Worker struct {
// logger receives a message whenever an event cannot be created or posted.
logger log.Logger
// url is the destination passed to postWebhookEvent for every event.
url string
// client is the HTTP client passed to postWebhookEvent.
client *http.Client
// sub is the subscriber Run uses to subscribe to the MDM topics.
sub pubsub.Subscriber
}
// Option customizes a Worker. Options are passed to New, which applies them
// in order after setting its defaults.
type Option func(*Worker)
// WithLogger returns an Option that sets the logger the Worker uses to report
// events it could not create or post.
//
// A nil logger is ignored: the Worker keeps whatever logger it already has
// (New installs a no-op logger by default) rather than storing nil and
// panicking on the first log call.
func WithLogger(logger log.Logger) Option {
	return func(w *Worker) {
		if logger == nil {
			return
		}
		w.logger = logger
	}
}
// WithHTTPClient returns an Option that sets the HTTP client the Worker uses
// to post webhook events.
//
// A nil client is ignored: the Worker keeps the client it already has (New
// installs a default) rather than storing nil, which would cause a nil-pointer
// panic on the first post.
func WithHTTPClient(client *http.Client) Option {
	return func(w *Worker) {
		if client == nil {
			return
		}
		w.client = client
	}
}
// New creates a Worker that posts webhook events to url for messages received
// from sub.
//
// Before opts are applied the Worker is given a no-op logger and an HTTP
// client with a 30-second timeout. Options are applied in the order given, so
// a later option overrides an earlier one; use WithHTTPClient to supply a
// client with a different timeout or transport.
func New(url string, sub pubsub.Subscriber, opts ...Option) *Worker {
	// http.DefaultClient has no timeout. Run posts each event synchronously,
	// so an endpoint that never responds could otherwise stall delivery of
	// every later event. A zero-value Transport still means
	// http.DefaultTransport, so connection reuse is unchanged.
	const defaultHTTPTimeout = 30 * time.Second

	worker := &Worker{
		url:    url,
		sub:    sub,
		logger: log.NewNopLogger(),
		client: &http.Client{Timeout: defaultHTTPTimeout},
	}
	for _, optFn := range opts {
		optFn(worker)
	}
	return worker
}
// Run subscribes the worker to the MDM connect, authenticate, token update,
// checkout and bootstrap token topics, then converts every message it receives
// into an Event and posts it to the configured webhook URL.
//
// Run blocks until ctx is done and then returns ctx.Err(). It returns earlier
// only when one of the subscriptions cannot be established, in which case the
// error is wrapped with the subscription name and topic. A failure to create
// or post a single event is logged and the loop moves on to the next message.
func (w *Worker) Run(ctx context.Context) error {
	const subscription = "webhook_worker"

	// Subscriptions are set up one at a time; the first failure aborts Run.
	connectCh, err := w.sub.Subscribe(ctx, subscription, mdm.ConnectTopic)
	if err != nil {
		return errors.Wrapf(err, "subscribe %s to %s", subscription, mdm.ConnectTopic)
	}

	authenticateCh, err := w.sub.Subscribe(ctx, subscription, mdm.AuthenticateTopic)
	if err != nil {
		return errors.Wrapf(err, "subscribe %s to %s", subscription, mdm.AuthenticateTopic)
	}

	tokenUpdateCh, err := w.sub.Subscribe(ctx, subscription, mdm.TokenUpdateTopic)
	if err != nil {
		return errors.Wrapf(err, "subscribe %s to %s", subscription, mdm.TokenUpdateTopic)
	}

	checkoutCh, err := w.sub.Subscribe(ctx, subscription, mdm.CheckoutTopic)
	if err != nil {
		return errors.Wrapf(err, "subscribe %s to %s", subscription, mdm.CheckoutTopic)
	}

	getBootstrapTokenCh, err := w.sub.Subscribe(ctx, subscription, mdm.GetBootstrapTokenTopic)
	if err != nil {
		return errors.Wrapf(err, "subscribe %s to %s", subscription, mdm.GetBootstrapTokenTopic)
	}

	setBootstrapTokenCh, err := w.sub.Subscribe(ctx, subscription, mdm.SetBootstrapTokenTopic)
	if err != nil {
		return errors.Wrapf(err, "subscribe %s to %s", subscription, mdm.SetBootstrapTokenTopic)
	}

	for {
		var (
			hook     *Event
			buildErr error
		)

		// Connect messages become acknowledge events; every other topic is
		// converted with checkinEvent.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg := <-connectCh:
			hook, buildErr = acknowledgeEvent(msg.Topic, msg.Message)
		case msg := <-authenticateCh:
			hook, buildErr = checkinEvent(msg.Topic, msg.Message)
		case msg := <-tokenUpdateCh:
			hook, buildErr = checkinEvent(msg.Topic, msg.Message)
		case msg := <-checkoutCh:
			hook, buildErr = checkinEvent(msg.Topic, msg.Message)
		case msg := <-getBootstrapTokenCh:
			hook, buildErr = checkinEvent(msg.Topic, msg.Message)
		case msg := <-setBootstrapTokenCh:
			hook, buildErr = checkinEvent(msg.Topic, msg.Message)
		}

		// Neither failure is fatal: log it and wait for the next message.
		if buildErr != nil {
			level.Info(w.logger).Log(
				"msg", "create webhook event",
				"err", buildErr,
			)
		} else if postErr := postWebhookEvent(ctx, w.client, w.url, hook); postErr != nil {
			level.Info(w.logger).Log(
				"msg", "post webhook event",
				"err", postErr,
			)
		}
	}
}