Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker ingress and Dispatcher now uses the protocol binding instead of json to process events #751

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .codecov.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
ignore:
- "**/zz_generated*.go" # Ignore generated files.
- "pkg/client"
- "pkg/dispatcher" #temporary until wabbit is gone
- "third_party"
- "vendor"
- "test"
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ run:
skip-dirs:
- pkg/client
- pkg/internal/thirdparty
- pkg/dispatcher

linters:
enable:
Expand Down
28 changes: 14 additions & 14 deletions cmd/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package main

import (
"encoding/json"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -186,23 +185,24 @@ func (env *envConfig) ServeHTTP(writer http.ResponseWriter, request *http.Reques
}

func (env *envConfig) send(event *cloudevents.Event, span *trace.Span) (int, time.Duration, error) {
bytes, err := json.Marshal(event)
if err != nil {
return http.StatusBadRequest, noDuration, fmt.Errorf("failed to marshal event, %w", err)
}

tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext())
headers := amqp.Table{
"type": event.Type(),
"source": event.Source(),
"subject": event.Subject(),
"traceparent": tp,
"tracestate": ts,
"content-type": event.DataContentType(),
"specversion": event.SpecVersion(),
"time": cloudevents.Timestamp{Time: event.Time().UTC()}.String(),
"type": event.Type(),
"source": event.Source(),
"subject": event.Subject(),
"id": event.ID(),
"dataschema": event.DataSchema(),
"traceparent": tp,
"tracestate": ts,
}

for key, val := range event.Extensions() {
headers[key] = val
}

start := time.Now()
dc, err := env.channel.PublishWithDeferredConfirm(
env.ExchangeName,
Expand All @@ -211,11 +211,11 @@ func (env *envConfig) send(event *cloudevents.Event, span *trace.Span) (int, tim
false, // immediate
amqp.Publishing{
Headers: headers,
ContentType: "application/json",
Body: bytes,
MessageId: event.ID(),
ContentType: event.DataContentType(),
Body: event.Data(),
DeliveryMode: amqp.Persistent,
})

if err != nil {
return http.StatusInternalServerError, noDuration, fmt.Errorf("failed to publish message: %w", err)
}
Expand Down
54 changes: 17 additions & 37 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ import (
"github.com/NeowayLabs/wabbit/amqp"
"github.com/NeowayLabs/wabbit/amqptest"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol/http"

"go.uber.org/zap"

"knative.dev/eventing-rabbitmq/pkg/rabbit"
"knative.dev/eventing/pkg/adapter/v2"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/kncloudevents"
Expand Down Expand Up @@ -256,58 +253,41 @@ func (a *Adapter) PollForMessages(channel *wabbit.Channel,

func (a *Adapter) processMessages(wg *sync.WaitGroup, queue <-chan wabbit.Delivery) {
defer wg.Done()

for msg := range queue {
a.logger.Info("Received: ", zap.Any("value", string(msg.Body())))

a.logger.Info("Received: ", zap.String("MessageId", msg.MessageId()))
if err := a.postMessage(msg); err == nil {
a.logger.Info("Successfully sent event to sink")
err = msg.Ack(false)
if err != nil {
a.logger.Error("Sending Ack failed with Delivery Tag")
a.logger.Error("sending Ack failed with Delivery Tag")
}
} else {
a.logger.Error("Sending event to sink failed: ", zap.Error(err))
a.logger.Error("sending event to sink failed: ", zap.Error(err))
err = msg.Nack(false, false)
if err != nil {
a.logger.Error("Sending Nack failed with Delivery Tag")
a.logger.Error("sending Nack failed with Delivery Tag")
}
}
}
}

func (a *Adapter) postMessage(msg wabbit.Delivery) error {
a.logger.Info("url ->" + a.httpMessageSender.Target)
a.logger.Info("target: " + a.httpMessageSender.Target)
req, err := a.httpMessageSender.NewCloudEventRequest(a.context)
if err != nil {
return err
}

var msgBinding binding.Message = NewMessageFromDelivery(msg)

defer func() {
err := msgBinding.Finish(nil)
if err != nil {
a.logger.Error("Something went wrong while trying to finalizing the message", zap.Error(err))
}
}()

// if the msg is a cloudevent send it as it is to http
if msgBinding.ReadEncoding() == binding.EncodingUnknown {
// if the rabbitmq msg is not a cloudevent transform it into one
event := cloudevents.NewEvent()
err = convertToCloudEvent(&event, msg, a)
if err != nil {
a.logger.Error("Error converting RabbitMQ msg to CloudEvent", zap.Error(err))
return err
}

msgBinding = binding.ToMessage(&event)
}

err = http.WriteRequest(a.context, msgBinding, req)
err = rabbit.ConvertMessageToHTTPRequest(
a.context,
a.config.Name,
a.config.Namespace,
a.config.QueueConfig.Name,
msg,
req,
a.logger)
if err != nil {
a.logger.Error(fmt.Sprintf("Error writting event to http, encoding: %s", msgBinding.ReadEncoding()), zap.Error(err))
a.logger.Error("error writing event to http", zap.Error(err))
return err
}

Expand All @@ -323,12 +303,12 @@ func (a *Adapter) postMessage(msg wabbit.Delivery) error {
})

if err != nil {
a.logger.Error("Error while sending the message", zap.Error(err))
a.logger.Error("error while sending the message", zap.Error(err))
return err
}

if res.StatusCode/100 != 2 {
a.logger.Error("Unexpected status code", zap.Int("status code", res.StatusCode))
a.logger.Error("unexpected status code", zap.Int("status code", res.StatusCode))
return fmt.Errorf("%d %s", res.StatusCode, nethttp.StatusText(res.StatusCode))
}

Expand Down
33 changes: 19 additions & 14 deletions pkg/adapter/adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2020 The Knative Authors
Copyright 2022 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,18 +67,17 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
reqBody: `{"test":"test"}`,
withMsgId: true,
reqHeaders: http.Header{
"Ce-Specversion": []string{"1.0"},
"Ce-Source": []string{"example/source.uri"},
"Ce-Testheader": []string{"testHeader"},
"Specversion": []string{"1.0"},
"Source": []string{"example/source.uri"},
"Testheader": []string{"testHeader"},
},
data: map[string]interface{}{
"test": "test",
},
headers: wabbit.Option{
"ce-specversion": "1.0",
"ce-source": "example/source.uri",
"ce-testheader": "testHeader",
"ignore": "ignore",
"specversion": "1.0",
"source": "example/source.uri",
"testheader": "testHeader",
},
isCe: true,
},
Expand Down Expand Up @@ -167,8 +166,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
if err != nil {
t.Errorf("Error unmarshaling wanted request body %s %s", tc.reqBody, err)
}

err = json.Unmarshal([]byte(tc.reqBody), &gotBody)
err = json.Unmarshal(h.body, &gotBody)
if err != nil {
t.Errorf("Error unmarshaling got request body %s %s", h.body, err)
}
Expand All @@ -180,19 +178,26 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
if tc.isCe {
ceHeaders := http.Header{}
for key, value := range h.header {
if strings.HasPrefix(key, "Ce-") {
ceHeaders[key] = value
}
ceHeaders[strings.TrimPrefix(key, "Ce-")] = value
}

if len(ceHeaders) > 0 && len(ceHeaders) != len(tc.reqHeaders) && !reflect.DeepEqual(ceHeaders, tc.reqHeaders) {
if !compareHeaders(tc.reqHeaders, ceHeaders, t) {
t.Errorf("Expected request headers '%s', but got '%s' %s", tc.reqHeaders, ceHeaders, err)
}
}
})
}
}

func compareHeaders(expected, received http.Header, t *testing.T) bool {
for key, val := range expected {
if val2, ok := received[key]; !ok || val[0] != val2[0] {
return false
}
}
return true
}

func TestAdapter_CreateConn(t *testing.T) {
fakeServer := server.NewServer("amqp://localhost:5672/%2f")
err := fakeServer.Start()
Expand Down
135 changes: 0 additions & 135 deletions pkg/adapter/message_test.go

This file was deleted.

Loading