/
adapter.go
244 lines (202 loc) · 7.52 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package http
import (
"context"
"fmt"
"net/http"
"net/url"
cloudevents "github.com/cloudevents/sdk-go"
cloudeventshttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"github.com/pkg/errors"
"go.opencensus.io/trace"
"go.uber.org/zap"
"knative.dev/eventing/pkg/adapter"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/logging"
"knative.dev/pkg/source"
pkgtracing "knative.dev/pkg/tracing"
"github.com/kyma-project/kyma/components/event-sources/apis/sources"
)
// Compile-time check that *envConfig implements adapter.EnvConfigAccessor.
var _ adapter.EnvConfigAccessor = (*envConfig)(nil)

// envConfig is the environment based configuration of the HTTP adapter.
// It embeds the generic adapter.EnvConfig and adds the settings that are
// specific to this adapter.
type envConfig struct {
	adapter.EnvConfig

	// EventSource is the value that is set as source on incoming events
	// before they are forwarded (env: EVENT_SOURCE, required).
	EventSource string `envconfig:"EVENT_SOURCE" required:"true"`

	// Port is the port to access the event-source (env: PORT, default 8080).
	Port int `envconfig:"PORT" required:"true" default:"8080"`
}

// GetSource returns the configured event source.
func (c *envConfig) GetSource() string { return c.EventSource }

// GetPort returns the configured port of the event-source.
func (c *envConfig) GetPort() int { return c.Port }
// httpAdapter receives cloud events over HTTP and forwards them to a sink.
type httpAdapter struct {
	// ceClient receives the incoming events and sends them to the sink.
	ceClient cloudevents.Client
	// statsReporter is used to report the count of events sent to the sink.
	statsReporter source.StatsReporter
	// accessor provides the adapter configuration (source, port, namespace, sink URI).
	accessor AdapterEnvConfigAccessor
	// adapterContext is the context handed to the cloud events receiver in Start.
	adapterContext context.Context
	// logger is the logger taken from the context passed to NewAdapter.
	logger *zap.Logger
}
// AdapterEnvConfigAccessor extends adapter.EnvConfigAccessor with the
// settings that are specific to the HTTP adapter.
type AdapterEnvConfigAccessor interface {
	adapter.EnvConfigAccessor

	// GetSource returns the source that is set on forwarded events.
	GetSource() string
	// GetPort returns the port the adapter listens on.
	GetPort() int
}
// Connection arguments used when creating the cloud events client,
// see NewCloudEventsClient.
const (
	// defaultMaxIdleConnections is passed as ConnectionArgs.MaxIdleConns.
	defaultMaxIdleConnections = 1000
	// defaultMaxIdleConnectionsPerHost is passed as ConnectionArgs.MaxIdleConnsPerHost.
	defaultMaxIdleConnectionsPerHost = 1000
)
// resourceGroup is the resource group used in the report arguments of the
// stats reporter: the "http." prefix followed by the sources API group name.
const resourceGroup = "http." + sources.GroupName
// HTTP paths served by the adapter.
const (
	// endpointCE is the path on which cloud events are received.
	endpointCE = "/"
	// endpointReadiness is the path answered by the readiness middleware.
	endpointReadiness = "/healthz"
)
// Error messages of the adapter.
const (
	// ErrorResponseCEVersionUnsupported is the error message for events
	// with a cloudevents spec version that is not accepted.
	ErrorResponseCEVersionUnsupported = "unsupported cloudevents version"
	// ErrorResponseSendToSinkFailed is the error message for events that
	// could not be forwarded to the sink.
	ErrorResponseSendToSinkFailed = "unable to forward event to sink"
)
// NewEnvConfig returns an empty envConfig as adapter.EnvConfigAccessor.
func NewEnvConfig() adapter.EnvConfigAccessor {
	return new(envConfig)
}
// NewAdapter creates the HTTP adapter from the processed environment
// configuration, the cloud events client and the stats reporter.
//
// It panics if processed does not implement AdapterEnvConfigAccessor.
// The logger of the adapter is taken from ctx.
func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client, reporter source.StatsReporter) adapter.Adapter {
	accessor, isAccessor := processed.(AdapterEnvConfigAccessor)
	if !isAccessor {
		panic(fmt.Sprintf("cannot create adapter, expecting a *envconfig, but got a %T", processed))
	}

	h := new(httpAdapter)
	h.adapterContext = ctx
	h.ceClient = ceClient
	h.statsReporter = reporter
	h.accessor = accessor
	h.logger = logging.FromContext(ctx).Desugar()
	return h
}
// NewCloudEventsClient creates a new client for receiving and sending cloud events.
//
// The underlying HTTP transport uses binary encoding, listens on the given
// port and on the cloud events endpoint path, and is wrapped by the tracing
// and the readiness middleware. An error is returned if either the transport
// or the client cannot be created.
func NewCloudEventsClient(port int) (cloudevents.Client, error) {
	transportOptions := []cloudeventshttp.Option{
		cloudevents.WithBinaryEncoding(),
		cloudevents.WithMiddleware(pkgtracing.HTTPSpanMiddleware),
		cloudevents.WithPort(port),
		cloudevents.WithPath(endpointCE),
		cloudevents.WithMiddleware(WithReadinessMiddleware),
	}

	transport, err := cloudevents.NewHTTPTransport(transportOptions...)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create transport")
	}

	client, err := kncloudevents.NewDefaultClientGivenHttpTransport(
		transport,
		&kncloudevents.ConnectionArgs{
			MaxIdleConns:        defaultMaxIdleConnections,
			MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
		},
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create client")
	}

	return client, nil
}
// Start is the entrypoint for the adapter and is called by sharedmain coming from pkg/adapter.
// It blocks while the cloud events receiver is running and returns a wrapped
// error if the receiver fails. The stop channel argument is not used.
func (h *httpAdapter) Start(_ <-chan struct{}) error {
	address := fmt.Sprintf("%d%s", h.accessor.GetPort(), endpointCE)
	h.logger.Info("listening on", zap.String("address", address))

	// Graceful shutdown: `StartReceiver` blocks until `ctx.Done()` of the
	// context it was given unblocks. The channel returned by
	// `h.adapterContext.Done()` is closed as soon as a stop signal is received,
	// see https://github.com/knative/pkg/blob/master/signals/signal.go#L37
	err := h.ceClient.StartReceiver(h.adapterContext, h.serveHTTP)
	if err != nil {
		return errors.Wrap(err, "error occurred while serving")
	}

	h.logger.Info("adapter stopped")
	return nil
}
// readinessMiddleware is an http.Handler that answers readiness probes and
// delegates every other request to the wrapped handler.
type readinessMiddleware struct {
	// handler is the next handler in the chain.
	handler http.Handler
}

// WithReadinessMiddleware wraps next with the readiness middleware.
func WithReadinessMiddleware(next http.Handler) http.Handler {
	middleware := readinessMiddleware{handler: next}
	return middleware
}

// ServeHTTP implements a readiness probe: requests to the readiness endpoint
// are answered with 200, everything else is passed to the wrapped handler.
func (m readinessMiddleware) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	switch req.URL.Path {
	case endpointReadiness:
		w.WriteHeader(http.StatusOK)
	default:
		m.handler.ServeHTTP(w, req)
	}
}
// serveHTTP handles incoming events,
// enriches them with the source
// and sends them to the sink.
//
// The outcome is written to resp:
//   - 400 if the spec version is not supported or the event is invalid
//     (an error is returned as well)
//   - 500 if the configured sink URI cannot be parsed, if the sink response
//     carries no status code, or if the sink answered with a non-2XX code
//   - 502 if sending the event to the sink failed
//   - 200 (together with the response event of the sink) on a 2XX answer
//
// Errors caused by the sink or by the sink configuration are only logged and
// never returned, since they might contain sensitive information.
//
// NOTE: EventResponse reason is empty since it is ignored: https://github.com/cloudevents/sdk-go/blob/master/pkg/cloudevents/transport/http/transport.go#L496
// NOTE: instead the error is used as error message for request response
func (h *httpAdapter) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
	logger := h.logger

	logger.Debug("got event", zap.Any("event_context", event.Context))

	tctx := cloudevents.HTTPTransportContextFrom(ctx)

	if !isSupportedCloudEvent(event) {
		resp.Error(http.StatusBadRequest, "")
		return errors.New(ErrorResponseCEVersionUnsupported)
	}

	// validate event conforms to cloudevents specification
	if err := event.Validate(); err != nil {
		resp.Error(http.StatusBadRequest, "")
		return err
	}

	// enrich the event with the application source
	// the application source is injected into this adapter from the http source controller
	event.SetSource(h.accessor.GetSource())

	reportArgs := &source.ReportArgs{
		Namespace:     h.accessor.GetNamespace(),
		EventSource:   event.Source(),
		EventType:     event.Type(),
		Name:          "http_adapter",
		ResourceGroup: resourceGroup,
	}

	sinkURI := h.accessor.GetSinkURI()
	logger.Debug("sending event", zap.Any("sink", sinkURI))

	// Shamelessly copied from https://github.com/knative/eventing/blob/5631d771968bbf00e64988a0e4217c2915ee778e/pkg/broker/ingress/ingress_handler.go#L116
	// Due to an issue in utils.ContextFrom, we don't retain the original trace context from ctx, so
	// bring it in manually.
	uri, err := url.Parse(sinkURI)
	if err != nil {
		// An unparsable sink URI is a configuration problem of the adapter,
		// not a problem of the request. The parse error embeds the sink URI,
		// so it is logged but not returned to the user.
		logger.Error("failed to parse sink URI", zap.Error(err), zap.Any("sink", sinkURI))
		resp.Error(http.StatusInternalServerError, "")
		return nil
	}
	sendingCTX := utils.ContextFrom(tctx, uri)
	trc := trace.FromContext(ctx)
	sendingCTX = trace.NewContext(sendingCTX, trc)

	rctx, revt, err := h.ceClient.Send(sendingCTX, event)
	if err != nil {
		logger.Error("failed to send cloudevent to sink", zap.Error(err), zap.Any("sink", sinkURI))
		resp.Error(http.StatusBadGateway, "")
		// do not show this error to user, might contain sensitive information
		return nil
	}

	rtctx := cloudevents.HTTPTransportContextFrom(rctx)
	// a status code of 0 means the response context carries no sink status
	if rtctx.StatusCode == 0 {
		resp.RespondWith(http.StatusInternalServerError, revt)
		return nil
	}

	// report a sent event, a reporting failure must not fail the request
	if err := h.statsReporter.ReportEventCount(reportArgs, rtctx.StatusCode); err != nil {
		logger.Warn("cannot report event count", zap.Error(err))
	}

	if is2XXStatusCode(rtctx.StatusCode) {
		resp.RespondWith(http.StatusOK, revt)
		return nil
	}

	logger.Debug("Got unexpected response from sink", zap.Any("response_context", rctx), zap.Any("response_event", revt), zap.Int("http_status", rtctx.StatusCode))
	resp.Error(http.StatusInternalServerError, "")
	return nil
}
// is2XXStatusCode reports whether statusCode lies in the 2XX range,
// i.e. 200 <= statusCode < 300.
func is2XXStatusCode(statusCode int) bool {
	if statusCode < http.StatusOK {
		return false
	}
	return statusCode < http.StatusMultipleChoices
}
// isSupportedCloudEvent determines if an incoming cloud event is accepted.
// Events with spec version 0.1, 0.2 or 0.3 are rejected, every other
// spec version is accepted.
func isSupportedCloudEvent(event cloudevents.Event) bool {
	switch event.SpecVersion() {
	case cloudevents.VersionV01, cloudevents.VersionV02, cloudevents.VersionV03:
		return false
	default:
		return true
	}
}