-
Notifications
You must be signed in to change notification settings - Fork 0
/
responses.go
79 lines (67 loc) · 1.93 KB
/
responses.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package benthosx
import (
"context"
"sync"
"github.com/Jeffail/benthos/lib/types"
)
type ctxKeyType struct{}
var ctxKey = &ctxKeyType{}
// NewResponseContext injects a ResponseMap for tracking serverless responses.
func NewResponseContext(ctx context.Context) context.Context {
if v := ctx.Value(ctxKey); v != nil {
// ResponseMap already installed. Keep the existing one.
// This prevents components from accidentally replacing a
// populated map with an empty one.
return ctx
}
return context.WithValue(ctx, ctxKey, &ResponseMap{})
}
// ResponseFromContext fetches the ResponseMap. If one is not installed
// then an empty map is returned.
func ResponseFromContext(ctx context.Context) *ResponseMap {
v := ctx.Value(ctxKey)
if v == nil {
return &ResponseMap{}
}
return v.(*ResponseMap)
}
// ResponseMap is a thread-safe map for storing serverless responses.
// Each key of the map points to a slice of messages.
type ResponseMap struct {
m sync.Map
}
// Delete an entire key from the map.
func (r *ResponseMap) Delete(key string) {
r.m.Delete(key)
}
// Load a response set. Returns false if the key is not found.
func (r *ResponseMap) Load(key string) ([]types.Message, bool) {
v, ok := r.m.Load(key)
if !ok {
return nil, false
}
return v.([]types.Message), true
}
// Append a response to a key.
func (r *ResponseMap) Append(key string, response types.Message) {
v, ok := r.m.Load(key)
if !ok {
v = make([]types.Message, 0, 1)
}
r.m.Store(key, append(v.([]types.Message), response))
}
// Range over the values in the map.
func (r *ResponseMap) Range(f func(key string, response []types.Message) bool) {
r.m.Range(func(iKey interface{}, iValue interface{}) bool {
return f(iKey.(string), iValue.([]types.Message))
})
}
// Len returns the number of keys in the map.
func (r *ResponseMap) Len() int {
var length int
r.m.Range(func(_ interface{}, _ interface{}) bool {
length = length + 1
return true
})
return length
}