-
Notifications
You must be signed in to change notification settings - Fork 0
/
output.go
148 lines (126 loc) · 3.38 KB
/
output.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package benthosx
import (
"sync"
"time"
"github.com/Jeffail/benthos/lib/log"
"github.com/Jeffail/benthos/lib/message"
"github.com/Jeffail/benthos/lib/metrics"
"github.com/Jeffail/benthos/lib/output"
"github.com/Jeffail/benthos/lib/response"
"github.com/Jeffail/benthos/lib/types"
)
const (
// TypeServerless selects the serverless response option.
TypeServerless = "serverless_response"
)
func init() {
output.RegisterPlugin(
"serverless_response",
func() interface{} {
return NewServerlessResponseConfig()
},
func(iconf interface{}, mgr types.Manager, logger log.Modular, stats metrics.Type) (types.Output, error) {
if iconf == nil {
iconf = NewServerlessResponseConfig()
}
return NewServerlessResponse(iconf.(ServerlessResponseConfig), mgr, logger, stats)
},
)
output.DocumentPlugin(
"serverless_response",
`
This plugin enables serverless instances of Benthos to return the processed
message value from the function.`,
nil, // No need to sanitise the config.
)
}
// ServerlessResponseConfig contains configuration fields for the
// ServerlessResponse output.
type ServerlessResponseConfig struct {
Name string
}
// NewServerlessResponseConfig returns a ServerlessResponseConfig with
// default values.
func NewServerlessResponseConfig() ServerlessResponseConfig {
return ServerlessResponseConfig{}
}
// ServerlessResponse captures the final message value and writes it to a
// store where it can be retrieved by the serverless function.
type ServerlessResponse struct {
transactionsChan <-chan types.Transaction
mgr types.Manager
log log.Modular
stats metrics.Type
closeOnce sync.Once
closeChan chan struct{}
closedChan chan struct{}
name string
}
// NewServerlessResponse creates a new plugin output type.
func NewServerlessResponse(
conf ServerlessResponseConfig,
mgr types.Manager,
log log.Modular,
stats metrics.Type,
) (output.Type, error) {
e := &ServerlessResponse{
mgr: mgr,
log: log,
stats: stats,
name: conf.Name,
closeChan: make(chan struct{}),
closedChan: make(chan struct{}),
}
return e, nil
}
//------------------------------------------------------------------------------
func (e *ServerlessResponse) loop() {
defer func() {
close(e.closedChan)
}()
for {
var tran types.Transaction
var open bool
select {
case tran, open = <-e.transactionsChan:
if !open {
return
}
case <-e.closeChan:
return
}
if tran.Payload.Len() > 0 {
ResponseFromContext(message.GetContext(tran.Payload.Get(0))).Append(e.name, tran.Payload)
}
select {
case tran.ResponseChan <- response.NewAck():
case <-e.closeChan:
return
}
}
}
// Connected returns true if this output is currently connected to its target.
func (e *ServerlessResponse) Connected() bool {
return true // We're always connected
}
// Consume starts this output consuming from a transaction channel.
func (e *ServerlessResponse) Consume(tChan <-chan types.Transaction) error {
e.transactionsChan = tChan
go e.loop()
return nil
}
// CloseAsync shuts down the output and stops processing requests.
func (e *ServerlessResponse) CloseAsync() {
e.closeOnce.Do(func() {
close(e.closeChan)
})
}
// WaitForClose blocks until the output has closed down.
func (e *ServerlessResponse) WaitForClose(timeout time.Duration) error {
select {
case <-e.closedChan:
case <-time.After(timeout):
return types.ErrTimeout
}
return nil
}