Skip to content

Commit

Permalink
set the correct Type and Content-Type headers on out going messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Baron committed Sep 16, 2019
1 parent 1a33290 commit 6a6ea52
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 10 deletions.
13 changes: 7 additions & 6 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,12 +655,13 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply
}

msg := amqp.Publishing{
Body: buffer,
ReplyTo: replyTo,
MessageId: message.ID,
CorrelationId: message.CorrelationID,
ContentEncoding: b.Serializer.Name(),
Headers: headers,
Type: message.PayloadFQN,
Body: buffer,
ReplyTo: replyTo,
MessageId: message.ID,
CorrelationId: message.CorrelationID,
ContentType: b.Serializer.Name(),
Headers: headers,
}
span.LogFields(message.GetTraceLog()...)

Expand Down
11 changes: 11 additions & 0 deletions gbus/policy/generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package policy

import "github.com/streadway/amqp"

type Generic struct {
Funk func(publishing *amqp.Publishing)
}

func (g *Generic) Apply(publishing *amqp.Publishing) {
g.Funk(publishing)
}
2 changes: 1 addition & 1 deletion gbus/serialization/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewAvroSerializer(schemaRegistryUrls ...string) *Avro {

//Name implements Serializer.Name
func (as *Avro) Name() string {
return "avro"
return "avro/binary"
}

//Encode encodes an object into a byte array
Expand Down
2 changes: 1 addition & 1 deletion gbus/serialization/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewProtoSerializer(logger logrus.FieldLogger) gbus.Serializer {

//Name implements Serializer.Name
func (as *Proto) Name() string {
return "proto"
return "application/x-protobuf"
}

//Encode encodes an object into a byte array
Expand Down
22 changes: 22 additions & 0 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/wework/grabbit/gbus/metrics"
"github.com/wework/grabbit/gbus/policy"

"github.com/opentracing/opentracing-go"
olog "github.com/opentracing/opentracing-go/log"
Expand Down Expand Up @@ -619,6 +620,27 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) {
}, t)
}

func TestTypeAndContentTypeHeadersSet(t *testing.T) {
cmd := Command1{}

bus := createNamedBusForTest(testSvc1)

policy := &policy.Generic{
Funk: func(publishing *amqp.Publishing) {
if publishing.Type != cmd.SchemaName() {
t.Errorf("publishing.Type != cmd.SchemaName()")
}
dfb := bus.(*gbus.DefaultBus)
if publishing.ContentType != dfb.Serializer.Name() {
t.Errorf("expected %s as content-type but actual value was %s", dfb.Serializer.Name(), publishing.ContentType)
}
}}

bus.Start()
defer bus.Shutdown()
bus.Send(context.Background(), testSvc1, gbus.NewBusMessage(cmd), policy)
}

func TestHealthCheck(t *testing.T) {
svc1 := createNamedBusForTest(testSvc1)
err := svc1.Start()
Expand Down
4 changes: 2 additions & 2 deletions tests/protoSerialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func TestProtoSerialization(t *testing.T) {
serializer := serialization.NewProtoSerializer(logger)

name := serializer.Name()
if name != "proto" {
t.Fatalf("incorrect serializer name. expected:proto actual:%s", name)
if name != "application/x-protobuf" {
t.Fatalf("incorrect serializer name. expected:application/x-protobuf actual:%s", name)
}
cmd := getProtoCommand()
schemaName := cmd.SchemaName()
Expand Down

0 comments on commit 6a6ea52

Please sign in to comment.