-
Notifications
You must be signed in to change notification settings - Fork 588
/
ttl.go
97 lines (85 loc) · 3.16 KB
/
ttl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package broker
import (
"context"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
cetypes "github.com/cloudevents/sdk-go/v2/types"
"go.uber.org/zap"
)
const (
// TTLAttribute is the name of the CloudEvents extension attribute used to store the
// Broker's TTL (number of times a single event can reply through a Broker continuously). All
// interactions with the attribute should be done through the GetTTL and SetTTL functions.
TTLAttribute = "knativebrokerttl"
)
// GetTTL finds the TTL in the EventContext using a case insensitive comparison
// for the key. The second return param, is the case preserved key that matched.
// Depending on the encoding/transport, the extension case could be changed.
func GetTTL(ctx cloudevents.EventContext) (int32, error) {
ttl, err := ctx.GetExtension(TTLAttribute)
if err != nil {
return 0, err
}
return cetypes.ToInteger(ttl)
}
// SetTTL sets the TTL into the EventContext. ttl should be a positive integer.
func SetTTL(ctx cloudevents.EventContext, ttl int32) error {
return ctx.SetExtension(TTLAttribute, ttl)
}
// DeleteTTL removes the TTL CE extension attribute
func DeleteTTL(ctx cloudevents.EventContext) error {
return ctx.SetExtension(TTLAttribute, nil)
}
// TTLDefaulter returns a cloudevents event defaulter that will manage the TTL
// for events with the following rules:
// If TTL is not found, it will set it to the default passed in.
// If TTL is <= 0, it will remain 0.
// If TTL is > 1, it will be reduced by one.
func TTLDefaulter(logger *zap.Logger, defaultTTL int32) client.EventDefaulter {
return func(ctx context.Context, event cloudevents.Event) cloudevents.Event {
// Get the current or default TTL from the event.
var ttl int32
if ttlraw, err := event.Context.GetExtension(TTLAttribute); err != nil {
logger.Debug("TTL not found in outbound event, defaulting.",
zap.String("event.id", event.ID()),
zap.Int32(TTLAttribute, defaultTTL),
zap.Error(err),
)
ttl = defaultTTL
} else if ttl, err = cetypes.ToInteger(ttlraw); err != nil {
logger.Warn("Failed to convert existing TTL into integer, defaulting.",
zap.String("event.id", event.ID()),
zap.Any(TTLAttribute, ttlraw),
zap.Error(err),
)
ttl = defaultTTL
} else {
// Decrement TTL.
ttl = ttl - 1
if ttl < 0 {
ttl = 0
}
}
// Overwrite the TTL into the event.
if err := event.Context.SetExtension(TTLAttribute, ttl); err != nil {
logger.Error("Failed to set TTL on outbound event.",
zap.String("event.id", event.ID()),
zap.Int32(TTLAttribute, ttl),
zap.Error(err),
)
}
return event
}
}