/
notification2service.go
355 lines (304 loc) · 10.5 KB
/
notification2service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
package c8y
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"github.com/golang-jwt/jwt/v4"
"github.com/reubenmiller/go-c8y/pkg/c8y/notification2"
"github.com/tidwall/gjson"
)
var (
// MinTokenMinutes is the smallest token lifetime, in minutes, that RenewToken
// will request: any smaller (or unset) expiry is raised to this value before a
// new token is created.
MinTokenMinutes int64 = 1
)
// Notification2Service manages tokens and subscriptions for the notification2 interface.
// Its request methods address paths below "notification2/".
type Notification2Service service
// Notification2TokenOptions holds the settings used when requesting a
// notification2 token. CreateToken sends it as the request body.
type Notification2TokenOptions struct {
// The token expiration duration (in minutes)
ExpiresInMinutes int64 `json:"expiresInMinutes,omitempty"`
// The subscriber name which the client wishes to be identified with
Subscriber string `json:"subscriber,omitempty"`
// The subscription name. This value must match the same that was used when the subscription was created
Subscription string `json:"subscription,omitempty"`
// Subscription is shared by multiple consumers
Shared bool `json:"shared,omitempty"`
}
// Notification2Subscription is the notification2 subscription object. It is
// used as the response type of GetSubscription and as the request body of
// CreateSubscription.
type Notification2Subscription struct {
ID string `json:"id,omitempty"`
Self string `json:"self,omitempty"`
Context string `json:"context,omitempty"`
FragmentsToCopy []string `json:"fragmentsToCopy,omitempty"`
Source *Source `json:"source,omitempty"`
Subscription string `json:"subscription,omitempty"`
SubscriptionFilter Notification2SubscriptionFilter `json:"subscriptionFilter,omitempty"`
// Allow access to custom fields (excluded from JSON encoding/decoding)
Item gjson.Result `json:"-"`
}
// Notification2SubscriptionCollectionOptions holds the query parameters
// accepted by GetSubscriptions: optional context and source filters plus the
// embedded pagination options.
type Notification2SubscriptionCollectionOptions struct {
Context string `url:"context,omitempty"`
Source string `url:"source,omitempty"`
PaginationOptions
}
// Notification2SubscriptionDeleteOptions options when deleting a subscription by source.
// DeleteSubscriptionBySource passes it as the request query.
type Notification2SubscriptionDeleteOptions struct {
Context string `url:"context,omitempty"`
Source string `url:"source,omitempty"`
}
// Notification2SubscriptionFilter is the "subscriptionFilter" fragment of a
// Notification2Subscription.
type Notification2SubscriptionFilter struct {
// NOTE(review): presumably the list of API names the subscription applies to — confirm against the platform docs
Apis []string `json:"apis,omitempty"`
// NOTE(review): presumably a filter expression on the notification type — confirm against the platform docs
TypeFilter string `json:"typeFilter,omitempty"`
}
// Notification2Token notification2 token which can be used by client to subscribe to notifications.
// It is the response type of CreateToken.
type Notification2Token struct {
*BaseResponse
// The token string (RenewToken returns this value for newly created tokens)
Token string `json:"token"`
// Allow access to custom fields (excluded from JSON encoding/decoding)
Items []gjson.Result `json:"-"`
}
// Notification2SubscriptionCollection is the response type of GetSubscriptions:
// a list of subscriptions read from the "subscriptions" property.
type Notification2SubscriptionCollection struct {
*BaseResponse
Subscriptions []Notification2Subscription `json:"subscriptions"`
// Allow access to custom fields (excluded from JSON encoding/decoding)
Items []gjson.Result `json:"-"`
}
// UnsubscribeResponse response after unsubscribing a subscriber.
// It is the response type of UnsubscribeSubscriber.
type UnsubscribeResponse struct {
// Value of the "result" property of the response
Result string `json:"result,omitempty"`
}
// GetSubscription retrieves a single notification2 subscription by its ID.
//
// A GET request is sent to "notification2/subscriptions/<ID>" with a
// Notification2Subscription supplied as the response target. The target, the
// response and the error from SendRequest are returned unchanged, so the
// subscription pointer is non-nil even when err is set.
func (s *Notification2Service) GetSubscription(ctx context.Context, ID string) (*Notification2Subscription, *Response, error) {
	subscription := &Notification2Subscription{}
	request := RequestOptions{
		Method:       http.MethodGet,
		Path:         "notification2/subscriptions/" + ID,
		ResponseData: subscription,
	}

	resp, err := s.client.SendRequest(ctx, request)
	return subscription, resp, err
}
// GetSubscriptions retrieves a collection of notification2 subscriptions.
//
// A GET request is sent to "notification2/subscriptions" using opt as the
// query (opt is forwarded as given, including when it is nil). The collection
// target, the response and the error from SendRequest are returned unchanged,
// so the collection pointer is non-nil even when err is set.
func (s *Notification2Service) GetSubscriptions(ctx context.Context, opt *Notification2SubscriptionCollectionOptions) (*Notification2SubscriptionCollection, *Response, error) {
	collection := &Notification2SubscriptionCollection{}
	request := RequestOptions{
		Method:       http.MethodGet,
		Path:         "notification2/subscriptions",
		Query:        opt,
		ResponseData: collection,
	}

	resp, err := s.client.SendRequest(ctx, request)
	return collection, resp, err
}
// CreateToken requests a new notification2 token.
//
// The options are sent as the body of a POST request to "notification2/token".
// The token target, the response and the error from SendRequest are returned
// unchanged, so the token pointer is non-nil even when err is set.
func (s *Notification2Service) CreateToken(ctx context.Context, options Notification2TokenOptions) (*Notification2Token, *Response, error) {
	token := &Notification2Token{}
	request := RequestOptions{
		Method:       "POST",
		Path:         "notification2/token",
		Body:         options,
		ResponseData: token,
	}

	resp, err := s.client.SendRequest(ctx, request)
	return token, resp, err
}
// UnsubscribeSubscriber unsubscribes a notification subscriber using the
// notification token.
//
// A POST request is sent to "notification2/unsubscribe" with the token given
// as the "token" query parameter. The result target, the response and the
// error from SendRequest are returned unchanged.
//
// NOTE(review): the token is appended to the path verbatim (no URL escaping);
// confirm that callers only pass URL-safe tokens or that SendRequest copes
// with reserved characters.
func (s *Notification2Service) UnsubscribeSubscriber(ctx context.Context, token string) (*UnsubscribeResponse, *Response, error) {
	result := &UnsubscribeResponse{}
	request := RequestOptions{
		Method:       "POST",
		Path:         "notification2/unsubscribe?token=" + token,
		ResponseData: result,
	}

	resp, err := s.client.SendRequest(ctx, request)
	return result, resp, err
}
// CreateSubscription creates a new notification2 subscription.
//
// The subscription is sent as the body of a POST request to
// "notification2/subscriptions". The response target, the response and the
// error from SendRequest are returned unchanged.
//
// NOTE(review): the ID parameter is not used by this method, and the response
// is read into an *Event rather than a *Notification2Subscription; both are
// kept as-is because they are part of the existing signature.
func (s *Notification2Service) CreateSubscription(ctx context.Context, ID string, subscription Notification2Subscription) (*Event, *Response, error) {
	created := &Event{}
	request := RequestOptions{
		Method:       http.MethodPost,
		Path:         "notification2/subscriptions",
		Body:         subscription,
		ResponseData: created,
	}

	resp, err := s.client.SendRequest(ctx, request)
	return created, resp, err
}
// DeleteSubscription removes a subscription by its ID.
//
// A DELETE request is sent to "notification2/subscriptions/<ID>"; the response
// and error from SendRequest are returned unchanged.
func (s *Notification2Service) DeleteSubscription(ctx context.Context, ID string) (*Response, error) {
	request := RequestOptions{
		Method: "DELETE",
		Path:   "notification2/subscriptions/" + ID,
	}

	resp, err := s.client.SendRequest(ctx, request)
	return resp, err
}
// DeleteSubscriptionBySource removes subscriptions selected by the given
// options rather than by ID.
//
// A DELETE request is sent to "notification2/subscriptions" with opt (context
// and source) as the query; the response and error from SendRequest are
// returned unchanged.
func (s *Notification2Service) DeleteSubscriptionBySource(ctx context.Context, opt Notification2SubscriptionDeleteOptions) (*Response, error) {
	request := RequestOptions{
		Method: "DELETE",
		Path:   "notification2/subscriptions",
		Query:  opt,
	}

	resp, err := s.client.SendRequest(ctx, request)
	return resp, err
}
// Notification2ClientOptions holds the input for RenewToken and CreateClient.
type Notification2ClientOptions struct {
// Existing token (optional). RenewToken reuses it when it is still valid and
// matches the expected subscription/subscriber, otherwise a new token is created
Token string
// Consumer name handed to the notification2 client subscription by CreateClient
Consumer string
// Expected token settings, also used when a new token has to be created
Options Notification2TokenOptions
// Connection options passed through to the notification2 client by CreateClient
ConnectionOptions notification2.ConnectionOptions
}
// Notification2TokenClaim holds the claims read from a notification2 token:
// the notification2 specific claims plus the embedded registered JWT claims.
type Notification2TokenClaim struct {
// Subscriber name, read from the "sub" claim
Subscriber string `json:"sub,omitempty"`
// Topic claim. Tenant returns the part before the first "/" and
// Subscription the part after the last "/"
Topic string `json:"topic,omitempty"`
// Shared flag as a string; IsShared compares it case-insensitively with "true"
Shared string `json:"shared,omitempty"`
jwt.RegisteredClaims
}
// IsShared reports whether the token's shared claim equals "true", ignoring
// letter case. Any other value, including an empty claim, yields false.
func (c *Notification2TokenClaim) IsShared() bool {
	const enabled = "true"
	return strings.EqualFold(enabled, c.Shared)
}
// Tenant returns the part of the topic claim that precedes the first "/".
// An empty string is returned when the topic contains no "/".
func (c *Notification2TokenClaim) Tenant() string {
	if pos := strings.IndexByte(c.Topic, '/'); pos >= 0 {
		return c.Topic[:pos]
	}
	return ""
}
// Subscription returns the part of the topic claim that follows the last "/".
// An empty string is returned when the topic contains no "/" (or ends in one).
func (c *Notification2TokenClaim) Subscription() string {
	if pos := strings.LastIndexByte(c.Topic, '/'); pos >= 0 {
		return c.Topic[pos+1:]
	}
	return ""
}
// HasExpired reports whether the token should be treated as expired at the
// current time. It returns false only when both the issued-at and the
// expires-at checks of the embedded registered claims pass; both checks are
// made against the same instant with the "required" argument set to true.
func (c *Notification2TokenClaim) HasExpired() bool {
	reference := time.Now()
	if !c.VerifyIssuedAt(reference, true) {
		return true
	}
	return !c.VerifyExpiresAt(reference, true)
}
// ParseToken decodes the claims of a notification2 token without verifying
// its signature.
//
// The token must consist of three "."-separated fields; only the second field
// (the payload) is inspected. JWT payloads are base64url encoded without
// padding, so the payload is decoded with the URL-safe alphabet. Trailing "="
// padding is tolerated, and the standard alphabet is tried as a fallback so
// that input accepted by earlier versions of this method still parses.
//
// It returns the decoded claims, or nil and an error when the token does not
// have three fields, the payload is not valid base64, or the payload is not
// valid JSON for Notification2TokenClaim.
func (s *Notification2Service) ParseToken(tokenString string) (*Notification2TokenClaim, error) {
	parts := strings.Split(tokenString, ".")
	if len(parts) != 3 {
		return nil, fmt.Errorf("invalid token. expected 3 fields")
	}

	// Some encoders emit padding even though the JWT format omits it.
	payload := strings.TrimRight(parts[1], "=")

	raw, err := base64.RawURLEncoding.DecodeString(payload)
	if err != nil {
		// Backwards compatibility: previously only the standard alphabet was accepted.
		legacy, legacyErr := base64.RawStdEncoding.DecodeString(payload)
		if legacyErr != nil {
			return nil, fmt.Errorf("decoding token payload: %w", err)
		}
		raw = legacy
	}

	claim := &Notification2TokenClaim{}
	if err := json.Unmarshal(raw, claim); err != nil {
		// Do not hand back a partially populated claim together with an error.
		return nil, fmt.Errorf("parsing token claims: %w", err)
	}
	return claim, nil
}
// RenewToken returns a token usable for the given options, reusing opt.Token
// when possible and creating a new token otherwise.
//
// When opt.Token is set it is parsed without signature verification and its
// claims are validated. The token is returned unchanged when it is valid and
// its subscription and subscriber match the non-empty values in opt.Options.
// Otherwise a new token is requested via CreateToken using:
//   - subscription and subscriber from opt.Options, falling back to the
//     values found in the existing token when the option is empty,
//   - the shared flag of the existing token (opt.Options.Shared is only used
//     when no token was given),
//   - opt.Options.ExpiresInMinutes, falling back to the lifetime of the
//     existing token (when its claims match), and never less than
//     MinTokenMinutes.
//
// It returns the token string, or an empty string and the error from
// CreateToken.
func (s *Notification2Service) RenewToken(ctx context.Context, opt Notification2ClientOptions) (string, error) {
	subscription := opt.Options.Subscription
	subscriber := opt.Options.Subscriber
	expiresInMinutes := opt.Options.ExpiresInMinutes
	shared := opt.Options.Shared

	if opt.Token != "" {
		isValid := true
		claimMatch := true // default to true in case the user does not provide any expected information

		claims := Notification2TokenClaim{}
		parser := jwt.NewParser()
		token, _, err := parser.ParseUnverified(opt.Token, &claims)
		if err != nil {
			Logger.Infof("Token is invalid. %s", err)
			isValid = false
		} else if validErr := token.Claims.Valid(); validErr != nil {
			Logger.Infof("Token is invalid. %s", validErr)
			isValid = false
		}

		// On a parse error the returned token can be nil (malformed input) or
		// lack a signing method, so it must not be dereferenced blindly.
		alg := ""
		if token != nil && token.Method != nil {
			alg = token.Method.Alg()
		}
		Logger.Infof("Existing token: alg=%s, valid=%v, expired=%v, issuedAt: %v, expiresAt: %v, subscription=%s, subscriber=%s, shared=%v, tenant=%s", alg, claims.Valid() == nil, claims.HasExpired(), claims.IssuedAt, claims.ExpiresAt, claims.Subscription(), claims.Subscriber, claims.IsShared(), claims.Tenant())

		if opt.Options.Subscription != "" {
			if claims.Subscription() != opt.Options.Subscription {
				claimMatch = false
			}
		} else {
			subscription = claims.Subscription()
		}

		if opt.Options.Subscriber != "" {
			if claims.Subscriber != opt.Options.Subscriber {
				claimMatch = false
			}
		} else {
			subscriber = claims.Subscriber
		}

		// NOTE(review): the token's shared flag always replaces
		// opt.Options.Shared here, even if the token could not be parsed.
		shared = claims.IsShared()

		if claimMatch && expiresInMinutes == 0 {
			// Reuse the lifetime of the existing token. The exp/iat claims are
			// expressed in seconds, so convert the difference to whole minutes.
			if claims.ExpiresAt != nil && claims.IssuedAt != nil {
				expiresInMinutes = (claims.ExpiresAt.Unix() - claims.IssuedAt.Unix()) / 60
			}
		}

		if isValid && claimMatch {
			Logger.Infof("Using existing valid token")
			return opt.Token, nil
		}
		Logger.Infof("Token does not match claim. Invalid information will be ignored in the token")
	}

	if expiresInMinutes < MinTokenMinutes {
		expiresInMinutes = MinTokenMinutes
	}

	Logger.Infof("Creating new token")
	updatedToken, _, err := s.CreateToken(ctx, Notification2TokenOptions{
		ExpiresInMinutes: expiresInMinutes,
		Subscription:     subscription,
		Subscriber:       subscriber,
		Shared:           shared,
	})
	if err != nil {
		return "", err
	}
	return updatedToken.Token, nil
}
// CreateClient creates a notification2 client for receiving notifications.
//
// The token in opt is first checked (and replaced if necessary) via
// RenewToken; an error from that step is returned with a nil client. The
// client is then created for the service's base URL with the resulting token,
// opt.Consumer and opt.ConnectionOptions. The client's token renewal callback
// calls RenewToken with only the current token (and the ctx given here), so
// renewed tokens take their subscription and subscriber from that token.
//
// # Example
//
//	notifClient, err := client.Notification2.CreateClient(context.Background(), c8y.Notification2ClientOptions{
//		Token:    os.Getenv("NOTIFICATION2_TOKEN"),
//		Consumer: *consumer,
//		Options: c8y.Notification2TokenOptions{
//			ExpiresInMinutes: 2,
//			Subscription:     *subscription,
//			Subscriber:       *subscriber,
//		},
//	})
//
//	if err != nil {
//		panic(err)
//	}
//
//	messagesCh := make(chan notification2.Message)
//	notifClient.Register("*", messagesCh)
//	signalCh := make(chan os.Signal, 1)
//	signal.Notify(signalCh, os.Interrupt)
//
//	for {
//		select {
//		case msg := <-messagesCh:
//			log.Printf("Received message. %s", msg.Payload)
//			notifClient.SendMessageAck(msg.Identifier)
//
//		case <-signalCh:
//			// Enable ctrl-c to stop
//			notifClient.Close()
//			return
//		}
//	}
func (s *Notification2Service) CreateClient(ctx context.Context, opt Notification2ClientOptions) (*notification2.Notification2Client, error) {
	// Validate the token against the expected subscription before connecting
	initialToken, err := s.RenewToken(ctx, opt)
	if err != nil {
		return nil, err
	}

	renew := func(current string) (string, error) {
		return s.RenewToken(ctx, Notification2ClientOptions{
			Token: current,
		})
	}
	subscription := notification2.Subscription{
		TokenRenewal: renew,
		Consumer:     opt.Consumer,
		Token:        initialToken,
	}

	notificationClient := notification2.NewNotification2Client(s.client.BaseURL.String(), nil, subscription, opt.ConnectionOptions)
	return notificationClient, nil
}