Skip to content

Commit

Permalink
Added content type safeguard in case the header is not present on the…
Browse files Browse the repository at this point in the history
… rabbitmq message + fixed malformed json in e2e test producer

fixing e2e and conformance tests to match expected output

removed unnecesary cast
  • Loading branch information
gabo1208 committed May 11, 2022
1 parent db544fd commit c839f5a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
31 changes: 17 additions & 14 deletions pkg/rabbit/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
sourcesv1alpha1 "knative.dev/eventing-rabbitmq/pkg/apis/sources/v1alpha1"

"github.com/NeowayLabs/wabbit"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
Expand All @@ -41,6 +40,8 @@ const (
prefix = "ce-"
contentTypeHeader = "content-type"
specversionHeader = "specversion"
traceparent = "traceparent"
tracestate = "tracestate"
)

var specs = spec.WithPrefix(prefix)
Expand Down Expand Up @@ -72,20 +73,17 @@ func ConvertMessageToHTTPRequest(
logger.Error("Something went wrong while trying to finalizing the message", zap.Error(err))
}
}()

// if the msg is a cloudevent send it as it is to http
if msgBinding.ReadEncoding() != binding.EncodingUnknown {
return http.WriteRequest(cloudevents.WithEncodingBinary(ctx), msgBinding, req)
}

// if the rabbitmq msg is not a cloudevent transform it into one
event := cloudevents.NewEvent()
err := ConvertToCloudEvent(&event, msg, namespace, sourceName, queueName)
if err != nil {
logger.Error("Error converting RabbitMQ msg to CloudEvent", zap.Error(err))
return err
}

return http.WriteRequest(ctx, binding.ToMessage(&event), req)
}

Expand All @@ -94,14 +92,15 @@ func ConvertMessageToHTTPRequest(
func NewMessageFromDelivery(sourceName, namespace, queueName string, msg wabbit.Delivery) *Message {
headers := make(map[string][]byte, len(msg.Headers()))
for key, val := range msg.Headers() {
if key == traceparent || key == tracestate {
continue
}
k := strings.ToLower(key)
headers[k] = []byte(fmt.Sprint(val))
headers[strings.TrimPrefix(k, prefix)] = []byte(fmt.Sprint(val))
}

if _, ok := headers["source"]; !ok {
headers["source"] = []byte(sourcesv1alpha1.RabbitmqEventSource(namespace, sourceName, queueName))
}

return NewMessage(msg.Body(), msg.ContentType(), headers)
}

Expand Down Expand Up @@ -147,19 +146,25 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter)
if m.version == nil {
return binding.ErrNotBinary
}

var contentTypeSet bool
for k, v := range m.Headers {
if k == contentTypeHeader {
contentTypeSet = true
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(v))
// avoid converting any RabbitMQ related headers to the CloudEvent
} else if !strings.HasPrefix(k, "x-") {
attr := m.version.Attribute(prefix + k)
if attr != nil {
err = encoder.SetAttribute(attr, string(v))
} else {
err = encoder.SetExtension(k, string(v))
if err == nil {
if attr != nil {
err = encoder.SetAttribute(attr, string(v))
} else {
err = encoder.SetExtension(k, string(v))
}
}
}
if !contentTypeSet && err == nil {
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), m.ContentType)
}
if err != nil {
return
}
Expand All @@ -175,7 +180,6 @@ func (m *Message) ReadStructured(ctx context.Context, encoder binding.Structured
if m.format != nil {
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.Value))
}

return binding.ErrNotStructured
}

Expand All @@ -184,7 +188,6 @@ func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) {
if attr != nil {
return attr, string(m.Headers[attr.PrefixedName()])
}

return nil, nil
}

Expand Down
4 changes: 2 additions & 2 deletions test/e2e/cmd/rabbitproducer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func main() {
contentType = "application/cloudevents+json"
headers = amqp.Table{}
body = fmt.Sprintf(`{
"id": %d,
"id": "%d",
"type": "knative.producer.e2etest",
"source": "example/source.uri",
"data": "Hello, CEWorld!",
Expand All @@ -95,7 +95,7 @@ func main() {
case 2:
contentType = "text/plain"
headers = amqp.Table{}
body = fmt.Sprintf(`{ "id": %d, "message": "Hello, World!" }`, i)
body = fmt.Sprintf(`{ "id": "%d", "message": "Hello, World!" }`, i)
}

err = ch.Publish(
Expand Down

0 comments on commit c839f5a

Please sign in to comment.