diff --git a/pkg/rabbit/message.go b/pkg/rabbit/message.go index fff7ea8934..afe326fe8f 100644 --- a/pkg/rabbit/message.go +++ b/pkg/rabbit/message.go @@ -27,7 +27,6 @@ import ( sourcesv1alpha1 "knative.dev/eventing-rabbitmq/pkg/apis/sources/v1alpha1" "github.com/NeowayLabs/wabbit" - cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/format" @@ -41,6 +40,8 @@ const ( prefix = "ce-" contentTypeHeader = "content-type" specversionHeader = "specversion" + traceparent = "traceparent" + tracestate = "tracestate" ) var specs = spec.WithPrefix(prefix) @@ -72,12 +73,10 @@ func ConvertMessageToHTTPRequest( logger.Error("Something went wrong while trying to finalizing the message", zap.Error(err)) } }() - // if the msg is a cloudevent send it as it is to http if msgBinding.ReadEncoding() != binding.EncodingUnknown { return http.WriteRequest(cloudevents.WithEncodingBinary(ctx), msgBinding, req) } - // if the rabbitmq msg is not a cloudevent transform it into one event := cloudevents.NewEvent() err := ConvertToCloudEvent(&event, msg, namespace, sourceName, queueName) @@ -85,7 +84,6 @@ func ConvertMessageToHTTPRequest( logger.Error("Error converting RabbitMQ msg to CloudEvent", zap.Error(err)) return err } - return http.WriteRequest(ctx, binding.ToMessage(&event), req) } @@ -94,14 +92,15 @@ func ConvertMessageToHTTPRequest( func NewMessageFromDelivery(sourceName, namespace, queueName string, msg wabbit.Delivery) *Message { headers := make(map[string][]byte, len(msg.Headers())) for key, val := range msg.Headers() { + if key == traceparent || key == tracestate { + continue + } k := strings.ToLower(key) - headers[k] = []byte(fmt.Sprint(val)) + headers[strings.TrimPrefix(k, prefix)] = []byte(fmt.Sprint(val)) } - if _, ok := headers["source"]; !ok { headers["source"] = []byte(sourcesv1alpha1.RabbitmqEventSource(namespace, sourceName, queueName)) } - return NewMessage(msg.Body(), msg.ContentType(), headers) } @@ -147,19 +146,25 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) if m.version == nil { return binding.ErrNotBinary } - + var contentTypeSet bool for k, v := range m.Headers { if k == contentTypeHeader { + contentTypeSet = true err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(v)) // avoid converting any RabbitMQ related headers to the CloudEvent } else if !strings.HasPrefix(k, "x-") { attr := m.version.Attribute(prefix + k) - if attr != nil { - err = encoder.SetAttribute(attr, string(v)) - } else { - err = encoder.SetExtension(k, string(v)) + if err == nil { + if attr != nil { + err = encoder.SetAttribute(attr, string(v)) + } else { + err = encoder.SetExtension(k, string(v)) + } } } + if !contentTypeSet && err == nil { + err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), m.ContentType) + } if err != nil { return } @@ -175,7 +180,6 @@ func (m *Message) ReadStructured(ctx context.Context, encoder binding.Structured if m.format != nil { return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.Value)) } - return binding.ErrNotStructured } @@ -184,7 +188,6 @@ func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) { if attr != nil { return attr, string(m.Headers[attr.PrefixedName()]) } - return nil, nil } diff --git a/test/e2e/cmd/rabbitproducer/main.go b/test/e2e/cmd/rabbitproducer/main.go index 1fe3bd13b3..684966f458 100644 --- a/test/e2e/cmd/rabbitproducer/main.go +++ b/test/e2e/cmd/rabbitproducer/main.go @@ -85,7 +85,7 @@ func main() { contentType = "application/cloudevents+json" headers = amqp.Table{} body = fmt.Sprintf(`{ - "id": %d, + "id": "%d", "type": "knative.producer.e2etest", "source": "example/source.uri", "data": "Hello, CEWorld!", @@ -95,7 +95,7 @@ func main() { case 2: contentType = "text/plain" headers = amqp.Table{} - body = fmt.Sprintf(`{ "id": %d, "message": "Hello, World!" }`, i) + body = fmt.Sprintf(`{ "id": "%d", "message": "Hello, World!" }`, i) } err = ch.Publish(