Skip to content

Commit

Permalink
Add channel metrics support (#4259)
Browse files Browse the repository at this point in the history
* add channel ingress metrics

* more updates, fix tests

* fix imports

* add header

* fix code issues

* more fixes

* fix minor issue

* fix fmt

* fixes

* revert benchmark script

* revert ce-Time

* remove error from return values

* dedup error logic

* fix lint

* fixes
  • Loading branch information
skonto committed Oct 20, 2020
1 parent b391b9c commit 5f66209
Show file tree
Hide file tree
Showing 20 changed files with 542 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name

- name: CONTAINER_NAME
value: dispatcher
ports:
- containerPort: 9090
name: metrics
100 changes: 81 additions & 19 deletions pkg/channel/fanout/fanout_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,22 @@ type MessageHandler struct {
// rather than a member variable.
timeout time.Duration

logger *zap.Logger
reporter channel.StatsReporter
logger *zap.Logger
}

// NewMessageHandler creates a new fanout.MessageHandler.
func NewMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDispatcher, config Config) (*MessageHandler, error) {
func NewMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDispatcher, config Config, reporter channel.StatsReporter) (*MessageHandler, error) {
handler := &MessageHandler{
logger: logger,
config: config,
dispatcher: messageDispatcher,
timeout: defaultTimeout,
reporter: reporter,
}
// The receiver function needs to point back at the handler itself, so set it up after
// initialization.
receiver, err := channel.NewMessageReceiver(createMessageReceiverFunction(handler), logger)
receiver, err := channel.NewMessageReceiver(createMessageReceiverFunction(handler), logger, reporter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -120,87 +122,142 @@ func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscript

func createMessageReceiverFunction(f *MessageHandler) func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error {
if f.config.AsyncHandler {
return func(ctx context.Context, _ channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error {
return func(ctx context.Context, ref channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error {
if len(f.config.Subscriptions) == 0 {
// Nothing to do here, finish the message and return
_ = message.Finish(nil)
return nil
}

parentSpan := trace.FromContext(ctx)
te := kncloudevents.TypeExtractorTransformer("")
transformers = append(transformers, &te)
// Message buffering here is done before starting the dispatch goroutine
// Because the message could be closed before the buffering happens
bufferedMessage, err := buffering.CopyMessage(ctx, message, transformers...)
if err != nil {
return err
}

reportArgs := channel.ReportArgs{}
reportArgs.EventType = string(te)
reportArgs.Ns = ref.Namespace

// We don't need the original message anymore
_ = message.Finish(nil)
go func(m binding.Message, h nethttp.Header, s *trace.Span) {
go func(m binding.Message, h nethttp.Header, s *trace.Span, r *channel.StatsReporter, args *channel.ReportArgs) {
// Run async dispatch with background context.
ctx = trace.NewContext(context.Background(), s)
// Any returned error is already logged in f.dispatch().
_ = f.dispatch(ctx, m, h)
}(bufferedMessage, additionalHeaders, parentSpan)
dispatchResultForFanout := f.dispatch(ctx, m, h)
_ = parseFanoutResultAndReportMetrics(dispatchResultForFanout, *r, *args)
}(bufferedMessage, additionalHeaders, parentSpan, &f.reporter, &reportArgs)
return nil
}
}
return func(ctx context.Context, _ channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error {
return func(ctx context.Context, ref channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error {
if len(f.config.Subscriptions) == 0 {
// Nothing to do here, finish the message and return
_ = message.Finish(nil)
return nil
}

te := kncloudevents.TypeExtractorTransformer("")
transformers = append(transformers, &te)
// We buffer the message to send it several times
bufferedMessage, err := buffering.CopyMessage(ctx, message, transformers...)
if err != nil {
return err
}
// We don't need the original message anymore
_ = message.Finish(nil)
return f.dispatch(ctx, bufferedMessage, additionalHeaders)

reportArgs := channel.ReportArgs{}
reportArgs.EventType = string(te)
reportArgs.Ns = ref.Namespace
dispatchResultForFanout := f.dispatch(ctx, bufferedMessage, additionalHeaders)
return parseFanoutResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs)
}
}

func (f *MessageHandler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) {
f.receiver.ServeHTTP(response, request)
}

func parseFanoutResultAndReportMetrics(result dispatchResult, reporter channel.StatsReporter, reportArgs channel.ReportArgs) error {
if result.info != nil && result.info.Time > channel.NoDuration {
if result.info.ResponseCode > channel.NoResponse {
_ = reporter.ReportEventDispatchTime(&reportArgs, result.info.ResponseCode, result.info.Time)
} else {
_ = reporter.ReportEventDispatchTime(&reportArgs, nethttp.StatusInternalServerError, result.info.Time)
}
}
err := result.err
if err != nil {
channel.ReportEventCountMetricsForDispatchError(err, reporter, &reportArgs)
} else {
if result.info != nil {
_ = reporter.ReportEventCount(&reportArgs, result.info.ResponseCode)
}
}
return err
}

// dispatch takes the event, fans it out to each subscription in f.config. If all the fanned out
// events return successfully, then return nil. Else, return an error.
func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.Message, additionalHeaders nethttp.Header) error {
func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.Message, additionalHeaders nethttp.Header) dispatchResult {
subs := len(f.config.Subscriptions)

// Bind the lifecycle of the buffered message to the number of subs
bufferedMessage = buffering.WithAcksBeforeFinish(bufferedMessage, subs)

errorCh := make(chan error, subs)
errorCh := make(chan dispatchResult, subs)
for _, sub := range f.config.Subscriptions {
go func(s Subscription) {
errorCh <- f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s)
dispatchedResultPerSub, err := f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s)
errorCh <- dispatchResult{err: err, info: dispatchedResultPerSub}
}(sub)
}

var totalDispatchTimeForFanout time.Duration = channel.NoDuration
dispatchResultForFanout := dispatchResult{
info: &channel.DispatchExecutionInfo{
Time: channel.NoDuration,
ResponseCode: channel.NoResponse,
},
}
for range f.config.Subscriptions {
select {
case err := <-errorCh:
if err != nil {
f.logger.Error("Fanout had an error", zap.Error(err))
return err
case dispatchResult := <-errorCh:
if dispatchResult.info != nil {
if dispatchResult.info.Time > channel.NoDuration {
if totalDispatchTimeForFanout > channel.NoDuration {
totalDispatchTimeForFanout += dispatchResult.info.Time
} else {
totalDispatchTimeForFanout = dispatchResult.info.Time
}
}
dispatchResultForFanout.info.Time = totalDispatchTimeForFanout
dispatchResultForFanout.info.ResponseCode = dispatchResult.info.ResponseCode
}
if dispatchResult.err != nil {
f.logger.Error("Fanout had an error", zap.Error(dispatchResult.err))
dispatchResultForFanout.err = dispatchResult.err
return dispatchResultForFanout
}
case <-time.After(f.timeout):
f.logger.Error("Fanout timed out")
return errors.New("fanout timed out")
dispatchResultForFanout.err = errors.New("fanout timed out")
return dispatchResultForFanout
}
}
// All Subscriptions returned err = nil.
return nil
return dispatchResultForFanout
}

// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
// the `sink` portions of the subscription.
func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) error {
func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) (*channel.DispatchExecutionInfo, error) {
return f.dispatcher.DispatchMessageWithRetries(
ctx,
message,
Expand All @@ -211,3 +268,8 @@ func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.
sub.RetryConfig,
)
}

type dispatchResult struct {
err error
info *channel.DispatchExecutionInfo
}
4 changes: 3 additions & 1 deletion pkg/channel/fanout/fanout_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {

func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.UnbufferedMessageReceiverFunc, timeout time.Duration, inSubs []Subscription, subscriberHandler func(http.ResponseWriter, *http.Request), subscriberReqs int, replierHandler func(http.ResponseWriter, *http.Request), replierReqs int, expectedStatus int) {
var subscriberServerWg *sync.WaitGroup
reporter := channel.NewStatsReporter("testcontainer", "testpod")
if subscriberReqs != 0 {
subscriberServerWg = &sync.WaitGroup{}
subscriberServerWg.Add(subscriberReqs)
Expand Down Expand Up @@ -257,13 +258,14 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb
Subscriptions: subs,
AsyncHandler: async,
},
reporter,
)
if err != nil {
t.Fatal("NewHandler failed =", err)
}

if receiverFunc != nil {
receiver, err := channel.NewMessageReceiver(receiverFunc, logger)
receiver, err := channel.NewMessageReceiver(receiverFunc, logger, reporter)
if err != nil {
t.Fatal("NewEventReceiver failed =", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/channel/history_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func TestMessageHistory(t *testing.T) {
len: 0,
},
{
append: []string{"name.ns.service.local"},
expected: "name.ns.service.local",
append: []string{"name.Ns.service.local"},
expected: "name.Ns.service.local",
len: 1,
},
{
Expand Down
Loading

0 comments on commit 5f66209

Please sign in to comment.