Skip to content

Commit

Permalink
fixed filters, improved msg readbinary method and fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gabo1208 committed May 10, 2022
1 parent e37d199 commit 779cd76
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 28 deletions.
20 changes: 10 additions & 10 deletions cmd/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,16 @@ func (env *envConfig) ServeHTTP(writer http.ResponseWriter, request *http.Reques
func (env *envConfig) send(event *cloudevents.Event, span *trace.Span) (int, error) {
tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext())
headers := amqp.Table{
"content-type": event.DataContentType(),
"ce-specversion": event.SpecVersion(),
"ce-time": cloudevents.Timestamp{Time: event.Time().UTC()}.String(),
"ce-type": event.Type(),
"ce-source": event.Source(),
"ce-subject": event.Subject(),
"ce-id": event.ID(),
"ce-schemaurl": event.DataSchema(),
"traceparent": tp,
"tracestate": ts,
"content-type": event.DataContentType(),
"specversion": event.SpecVersion(),
"time": cloudevents.Timestamp{Time: event.Time().UTC()}.String(),
"type": event.Type(),
"source": event.Source(),
"subject": event.Subject(),
"id": event.ID(),
"schemaurl": event.DataSchema(),
"traceparent": tp,
"tracestate": ts,
}

for key, val := range event.Extensions() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (a *Adapter) processMessages(wg *sync.WaitGroup, queue <-chan wabbit.Delive
}

func (a *Adapter) postMessage(msg wabbit.Delivery) error {
a.logger.Info("url ->" + a.httpMessageSender.Target)
a.logger.Info("target: " + a.httpMessageSender.Target)
req, err := a.httpMessageSender.NewCloudEventRequest(a.context)
if err != nil {
return err
Expand Down
15 changes: 7 additions & 8 deletions pkg/rabbit/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
const (
prefix = "ce-"
contentTypeHeader = "content-type"
specversionHeader = "specversion"
)

var specs = spec.WithPrefix(prefix)
Expand Down Expand Up @@ -91,22 +92,20 @@ func ConvertMessageToHTTPRequest(
// NewMessageFromDelivery returns a binding.Message that holds the provided RabbitMQ Message.
// The returned binding.Message *can* be read several times safely
func NewMessageFromDelivery(sourceName, namespace, queueName string, msg wabbit.Delivery) *Message {
var contentType string
headers := make(map[string][]byte, len(msg.Headers()))
for key, val := range msg.Headers() {
k := strings.ToLower(key)
if k == contentTypeHeader {
contentType = val.(string)
if k == contentTypeHeader || strings.HasPrefix("trace", k) {
continue
}

headers[k] = []byte(fmt.Sprint(val))
}

if _, ok := headers["ce-source"]; !ok {
headers["ce-source"] = []byte(sourcesv1alpha1.RabbitmqEventSource(namespace, sourceName, queueName))
if _, ok := headers["source"]; !ok {
headers["source"] = []byte(sourcesv1alpha1.RabbitmqEventSource(namespace, sourceName, queueName))
}

return NewMessage(msg.Body(), contentType, headers)
return NewMessage(msg.Body(), msg.ContentType(), headers)
}

// NewMessage returns a binding.Message that holds the provided rabbitmq message components.
Expand All @@ -119,7 +118,7 @@ func NewMessage(value []byte, contentType string, headers map[string][]byte) *Me
Headers: headers,
format: ft,
}
} else if v := specs.Version(string(headers[specs.PrefixedSpecVersionName()])); v != nil {
} else if v := specs.Version(string(headers[specversionHeader])); v != nil {
return &Message{
Value: value,
ContentType: contentType,
Expand Down
18 changes: 9 additions & 9 deletions pkg/rabbit/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ func TestProtocol_NewMessage(t *testing.T) {
},
}, {
name: "msg with version",
headers: map[string][]byte{"ce-specversion": []byte("1.0")},
headers: map[string][]byte{"specversion": []byte("1.0")},
contentType: testContentType,
want: &Message{
ContentType: testContentType,
version: specs.Version("1.0"),
Headers: map[string][]byte{"ce-specversion": []byte("1.0")},
Headers: map[string][]byte{"specversion": []byte("1.0")},
},
}, {
name: "msg with format",
Expand Down Expand Up @@ -375,11 +375,12 @@ func TestProtocol_NewMessageFromDelivery(t *testing.T) {
name: "set content type header",
headers: map[string][]byte{"content-type": []byte(testContentType)},
delivery: &origamqp.Delivery{
MessageId: msgId,
Timestamp: msgTime,
Headers: amqp091.Table{"content-type": testContentType},
MessageId: msgId,
Timestamp: msgTime,
ContentType: testContentType,
Headers: amqp091.Table{},
},
want: &Message{Headers: map[string][]byte{"content-type": []byte(testContentType)}, ContentType: testContentType},
want: &Message{Headers: make(map[string][]byte), ContentType: testContentType},
}} {
t.Run(tt.name, func(t *testing.T) {
tt := tt
Expand All @@ -389,9 +390,8 @@ func TestProtocol_NewMessageFromDelivery(t *testing.T) {
Delivery: tt.delivery,
}
got := NewMessageFromDelivery(sourceName, namespace, queueName, m)
//got.Headers["time"] = []byte(time.String())
if _, ok := tt.want.Headers["ce-source"]; !ok {
tt.want.Headers["ce-source"] = []byte(source)
if _, ok := tt.want.Headers["source"]; !ok {
tt.want.Headers["source"] = []byte(source)
}
if !compareMessages(got, tt.want) {
t.Errorf("Unexpected message want:\n%v\ngot:\n%v", tt.want, got)
Expand Down

0 comments on commit 779cd76

Please sign in to comment.