From 5b77856625ac4f2b85a01699055dd95e2ed9913f Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Mon, 26 Aug 2019 19:01:43 +0300 Subject: [PATCH 01/15] BPINFRA125 - MERGE MASTER INTO BRANCH --- gbus/worker.go | 1 + tests/bus_test.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/gbus/worker.go b/gbus/worker.go index 5bb83f1..2b04791 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -96,6 +96,7 @@ func (worker *worker) consumeMessages() { for msg := range worker.messages { if msg.Body == nil || len(msg.Body) == 0 { + worker.reject(false, msg) continue } worker.processMessage(msg, false) diff --git a/tests/bus_test.go b/tests/bus_test.go index dfd201c..16fb204 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -433,6 +433,43 @@ func TestSendingPanic(t *testing.T) { } } +func TestSendingEmptyBody(t *testing.T) { + rejectedMessages, err := metrics.GetRejectedMessagesValue() + if err != nil { + t.Error("failed to get rejected messages value") + } + + b := createBusForTest() + + err = b.Start() + if err != nil { + t.Errorf("could not start bus for test error: %s", err.Error()) + } + defer b.Shutdown() + + conn, err := amqp.Dial(connStr) + if err != nil { + t.Error("couldnt connect to rabbitmq") + } + + ch, err := conn.Channel() + if err != nil { + t.Error("couldnt open rabbitmq channel for publishing") + } + defer ch.Close() + + cmd := amqp.Publishing{} + err = ch.Publish("", testSvc1, true, false, cmd) + if err != nil { + t.Error("couldnt send message on rabbitmq channel") + } + time.Sleep(1 * time.Second) + count, _ := metrics.GetRejectedMessagesValue() + if count != rejectedMessages+1 { + t.Error("Should have one rejected message") + } +} + func TestHealthCheck(t *testing.T) { svc1 := createNamedBusForTest(testSvc1) err := svc1.Start() From 5a912582a06981e60bd8ae12ebb40eb2bc391c0f Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Mon, 26 Aug 2019 19:02:32 +0300 Subject: [PATCH 02/15] reject messages with empty body --- tests/bus_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index 16fb204..4bec994 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -463,7 +463,7 @@ func TestSendingEmptyBody(t *testing.T) { if err != nil { t.Error("couldnt send message on rabbitmq channel") } - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Millisecond) count, _ := metrics.GetRejectedMessagesValue() if count != rejectedMessages+1 { t.Error("Should have one rejected message") From fc2c60931ef5f3241270bbcaf9075dbf572800a6 Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Tue, 27 Aug 2019 11:42:29 +0300 Subject: [PATCH 03/15] handle message with empty body --- gbus/worker.go | 18 +++++++++------ tests/bus_test.go | 57 ++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/gbus/worker.go b/gbus/worker.go index 2b04791..6e5c55c 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -95,10 +95,6 @@ func (worker *worker) createMessagesChannel(q amqp.Queue, consumerTag string) (< func (worker *worker) consumeMessages() { for msg := range worker.messages { - if msg.Body == nil || len(msg.Body) == 0 { - worker.reject(false, msg) - continue - } worker.processMessage(msg, false) } } @@ -106,9 +102,6 @@ func (worker *worker) consumeMessages() { func (worker *worker) consumeRPC() { for msg := range worker.rpcMessages { - if msg.Body == nil || len(msg.Body) == 0 { - continue - } worker.processMessage(msg, true) } } @@ -318,6 +311,17 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { _ = worker.ack(delivery) return } + + if delivery.Body == nil || len(delivery.Body) == 0 { + worker.log(). + WithFields( + logrus.Fields{"message-name": msgName}). + Warn("body is missing for message. Cannot invoke handlers.") + worker.span.LogFields(slog.String("grabbit", "no body found")) + // if there are handlers registered for this type of message, it's a bug and the message must be rejected. + _ = worker.reject(false, delivery) + return + } /* extract the bus message only after we are sure there are registered handlers since it includes deserializing the amqp payload which we want to avoid if no handlers are found diff --git a/tests/bus_test.go b/tests/bus_test.go index 4bec994..da92080 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -434,13 +434,55 @@ func TestSendingPanic(t *testing.T) { } func TestSendingEmptyBody(t *testing.T) { - rejectedMessages, err := metrics.GetRejectedMessagesValue() + b := createBusForTest() + proceed := make(chan bool) + b.SetGlobalRawMessageHandler(func(tx *sql.Tx, delivery *amqp.Delivery) error { + proceed <- true + return nil + }) + + err := b.Start() if err != nil { - t.Error("failed to get rejected messages value") + t.Errorf("could not start bus for test error: %s", err.Error()) } + defer b.Shutdown() + conn, err := amqp.Dial(connStr) + if err != nil { + t.Error("couldnt connect to rabbitmq") + } + + ch, err := conn.Channel() + if err != nil { + t.Error("couldnt open rabbitmq channel for publishing") + } + defer ch.Close() + + cmd := amqp.Publishing{} + err = ch.Publish("", testSvc1, true, false, cmd) + if err != nil { + t.Error("couldnt send message on rabbitmq channel") + } + + select { + case <-proceed: + fmt.Println("success") + case <-time.After(2 * time.Second): + t.Fatal("timeout, failed to consume message with missing body") + } +} + +func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { b := createBusForTest() + err := b.HandleMessage(&Command1{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { + t.Error("handler invoked for non-grabbit message") + return nil + }) + if err != nil { + t.Errorf("could not register handler for bus %s", err.Error()) + } + err = b.Start() if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) @@ -458,16 +500,21 @@ func TestSendingEmptyBody(t *testing.T) { } defer ch.Close() - cmd := amqp.Publishing{} + headersMap := make(map[string]interface{}) + headersMap["x-msg-name"] = "grabbit.tests.command1" + cmd := amqp.Publishing{Headers: headersMap} err = ch.Publish("", testSvc1, true, false, cmd) if err != nil { t.Error("couldnt send message on rabbitmq channel") } - time.Sleep(1 * time.Millisecond) + + time.Sleep(1 * time.Second) + count, _ := metrics.GetRejectedMessagesValue() - if count != rejectedMessages+1 { + if count != 1 { t.Error("Should have one rejected message") } + } func TestHealthCheck(t *testing.T) { From 49f62044717569188fd7d3c9d5c112cd2a0912d3 Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Tue, 27 Aug 2019 13:01:05 +0300 Subject: [PATCH 04/15] BPINFRA125 - MERGE MASTER INTO BRANCH --- tests/bus_test.go | 54 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index 4d6fe48..9150d68 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -487,7 +487,7 @@ func TestSendingPanic(t *testing.T) { } } -func TestSendingEmptyBody(t *testing.T) { +func TestEmptyBody(t *testing.T) { b := createBusForTest() proceed := make(chan bool) b.SetGlobalRawMessageHandler(func(tx *sql.Tx, delivery *amqp.Delivery) error { @@ -526,10 +526,54 @@ func TestSendingEmptyBody(t *testing.T) { } } +func TestDeadEmptyBody(t *testing.T) { + b := createBusWithConfig(testSvc1, "grabbit-dead", true, true, + gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) + + proceed := make(chan bool) + b.HandleDeadletter(func(tx *sql.Tx, delivery *amqp.Delivery) error { + proceed <- true + return nil + }) + + err := b.Start() + if err != nil { + t.Errorf("could not start bus for test error: %s", err.Error()) + } + defer b.Shutdown() + + conn, err := amqp.Dial(connStr) + if err != nil { + t.Error("couldnt connect to rabbitmq") + } + + ch, err := conn.Channel() + if err != nil { + t.Error("couldnt open rabbitmq channel for publishing") + } + defer ch.Close() + + headersMap := make(map[string]interface{}) + headersMap["x-death"] = make([]interface{}, 0) + cmd := amqp.Publishing{Headers: headersMap} + err = ch.Publish("", testSvc1, true, false, cmd) + if err != nil { + t.Error("couldnt send message on rabbitmq channel") + } + + select { + case <-proceed: + fmt.Println("success") + case <-time.After(2 * time.Second): + t.Fatal("timeout, failed to consume message with missing body") + } +} + func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { - b := createBusForTest() + b := createBusWithConfig(testSvc1, "grabbit-dead", true, true, + gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) - err := b.HandleMessage(&Command1{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { + err := b.HandleMessage(&Command2{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { t.Error("handler invoked for non-grabbit message") return nil }) @@ -555,14 +599,14 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { defer ch.Close() headersMap := make(map[string]interface{}) - headersMap["x-msg-name"] = "grabbit.tests.command1" + headersMap["x-msg-name"] = Command2{}.SchemaName() cmd := amqp.Publishing{Headers: headersMap} err = ch.Publish("", testSvc1, true, false, cmd) if err != nil { t.Error("couldnt send message on rabbitmq channel") } - time.Sleep(1 * time.Second) + time.Sleep(500 * time.Millisecond) count, _ := metrics.GetRejectedMessagesValue() if count != 1 { From d311de4c2ebb567c601a7fa444e3ac792f80dfe4 Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Tue, 27 Aug 2019 16:55:45 +0300 Subject: [PATCH 05/15] BPINFRA125 - MERGE MASTER INTO BRANCH --- tests/bus_test.go | 27 ++++++++++++++++++++++----- tests/testMessages.go | 8 ++++++++ 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index 9150d68..ba3da83 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -499,7 +499,12 @@ func TestEmptyBody(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - defer b.Shutdown() + defer func() { + err := b.Shutdown() + if err != nil { + t.Fatal(err) + } + }() conn, err := amqp.Dial(connStr) if err != nil { @@ -540,7 +545,13 @@ func TestDeadEmptyBody(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - defer b.Shutdown() + + defer func() { + err := b.Shutdown() + if err != nil { + t.Fatal(err) + } + }() conn, err := amqp.Dial(connStr) if err != nil { @@ -573,7 +584,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { b := createBusWithConfig(testSvc1, "grabbit-dead", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) - err := b.HandleMessage(&Command2{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { + err := b.HandleMessage(&Command3{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { t.Error("handler invoked for non-grabbit message") return nil }) @@ -585,7 +596,13 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - defer b.Shutdown() + + defer func() { + err := b.Shutdown() + if err != nil { + t.Fatal(err) + } + }() conn, err := amqp.Dial(connStr) if err != nil { @@ -599,7 +616,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { defer ch.Close() headersMap := make(map[string]interface{}) - headersMap["x-msg-name"] = Command2{}.SchemaName() + headersMap["x-msg-name"] = Command3{}.SchemaName() cmd := amqp.Publishing{Headers: headersMap} err = ch.Publish("", testSvc1, true, false, cmd) if err != nil { diff --git a/tests/testMessages.go b/tests/testMessages.go index 54aa9c0..b331940 100644 --- a/tests/testMessages.go +++ b/tests/testMessages.go @@ -33,6 +33,14 @@ func (Command2) SchemaName() string { return "grabbit.tests.Command2" } +type Command3 struct { + Data string +} + +func (Command3) SchemaName() string { + return "grabbit.tests.Command3" +} + type Reply1 struct { Data string } From f1841886799368ddfa5e6147cca2c297b309ef67 Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Tue, 27 Aug 2019 17:00:02 +0300 Subject: [PATCH 06/15] handle empty body messages --- tests/bus_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/bus_test.go b/tests/bus_test.go index ba3da83..ee0f40d 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -510,6 +510,7 @@ func TestEmptyBody(t *testing.T) { if err != nil { t.Error("couldnt connect to rabbitmq") } + defer conn.Close() ch, err := conn.Channel() if err != nil { From 1c8b194153a2e6365fdc7a47677cf986bf74d8d6 Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Tue, 27 Aug 2019 18:31:38 +0300 Subject: [PATCH 07/15] handle empty body incoming messages --- tests/bus_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index ee0f40d..7ce611a 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -558,7 +558,7 @@ func TestDeadEmptyBody(t *testing.T) { if err != nil { t.Error("couldnt connect to rabbitmq") } - + defer conn.Close() ch, err := conn.Channel() if err != nil { t.Error("couldnt open rabbitmq channel for publishing") @@ -609,6 +609,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { if err != nil { t.Error("couldnt connect to rabbitmq") } + defer conn.Close() ch, err := conn.Channel() if err != nil { From 58a14d7c3b2295171f830a24d8236258c4462265 Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Tue, 27 Aug 2019 19:34:05 +0300 Subject: [PATCH 08/15] handle messages with empty body --- tests/bus_test.go | 31 +++++++++++++++++++------------ tests/consts.go | 2 ++ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index 7ce611a..ef12de6 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -488,7 +488,7 @@ func TestSendingPanic(t *testing.T) { } func TestEmptyBody(t *testing.T) { - b := createBusForTest() + b := createNamedBusForTest(testSvc5) proceed := make(chan bool) b.SetGlobalRawMessageHandler(func(tx *sql.Tx, delivery *amqp.Delivery) error { proceed <- true @@ -519,7 +519,7 @@ func TestEmptyBody(t *testing.T) { defer ch.Close() cmd := amqp.Publishing{} - err = ch.Publish("", testSvc1, true, false, cmd) + err = ch.Publish("", testSvc5, true, false, cmd) if err != nil { t.Error("couldnt send message on rabbitmq channel") } @@ -533,7 +533,7 @@ func TestEmptyBody(t *testing.T) { } func TestDeadEmptyBody(t *testing.T) { - b := createBusWithConfig(testSvc1, "grabbit-dead", true, true, + b := createBusWithConfig(testSvc5, "grabbit-dead", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) proceed := make(chan bool) @@ -568,7 +568,7 @@ func TestDeadEmptyBody(t *testing.T) { headersMap := make(map[string]interface{}) headersMap["x-death"] = make([]interface{}, 0) cmd := amqp.Publishing{Headers: headersMap} - err = ch.Publish("", testSvc1, true, false, cmd) + err = ch.Publish("", testSvc5, true, false, cmd) if err != nil { t.Error("couldnt send message on rabbitmq channel") } @@ -582,9 +582,14 @@ func TestDeadEmptyBody(t *testing.T) { } func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { - b := createBusWithConfig(testSvc1, "grabbit-dead", true, true, + b := createBusWithConfig(testSvc5, "grabbit-dead1", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) + proceed := make(chan bool) + b.HandleDeadletter(func(tx *sql.Tx, delivery *amqp.Delivery) error { + proceed <- true + return nil + }) err := b.HandleMessage(&Command3{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { t.Error("handler invoked for non-grabbit message") return nil @@ -620,18 +625,20 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { headersMap := make(map[string]interface{}) headersMap["x-msg-name"] = Command3{}.SchemaName() cmd := amqp.Publishing{Headers: headersMap} - err = ch.Publish("", testSvc1, true, false, cmd) + err = ch.Publish("", testSvc5, true, false, cmd) if err != nil { t.Error("couldnt send message on rabbitmq channel") } - time.Sleep(500 * time.Millisecond) - - count, _ := metrics.GetRejectedMessagesValue() - if count != 1 { - t.Error("Should have one rejected message") + select { + case <-proceed: + count, _ := metrics.GetRejectedMessagesValue() + if count != 1 { + t.Error("Should have one rejected message") + } + case <-time.After(2 * time.Second): + t.Fatal("timeout, failed to consume message with missing body") } - } func TestHealthCheck(t *testing.T) { diff --git a/tests/consts.go b/tests/consts.go index 964ae02..28d4276 100644 --- a/tests/consts.go +++ b/tests/consts.go @@ -13,6 +13,7 @@ var testSvc1 string var testSvc2 string var testSvc3 string var testSvc4 string +var testSvc5 string func init() { connStr = "amqp://rabbitmq:rabbitmq@localhost" @@ -20,6 +21,7 @@ func init() { testSvc2 = "testSvc2" testSvc3 = "testSvc3" testSvc4 = "test-svc4" + testSvc5 = "test-svc5" } func createBusWithConfig(svcName string, deadletter string, txnl, pos bool, conf gbus.BusConfiguration) gbus.Bus { From ac25b6c68ed854527ba11419aa97ad6bfeab5ac3 Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Tue, 27 Aug 2019 19:36:36 +0300 Subject: [PATCH 09/15] handle messages with empty body --- tests/bus_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index ef12de6..c7fc105 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -582,6 +582,7 @@ func TestDeadEmptyBody(t *testing.T) { } func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { + rejected, _ := metrics.GetRejectedMessagesValue() b := createBusWithConfig(testSvc5, "grabbit-dead1", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) @@ -633,7 +634,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { select { case <-proceed: count, _ := metrics.GetRejectedMessagesValue() - if count != 1 { + if count != rejected+1 { t.Error("Should have one rejected message") } case <-time.After(2 * time.Second): From ba05700d592d4fc1119cb901dd072a0f3ffbca14 Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Thu, 29 Aug 2019 21:05:19 +0300 Subject: [PATCH 10/15] handle empty message body --- tests/bus_test.go | 185 +++++++++++++++++++++------------------------- 1 file changed, 85 insertions(+), 100 deletions(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index eeabfbb..4dbdde9 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -50,6 +50,7 @@ func TestSendCommand(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } + defer assertBusShutdown(b, t) err = b.Send(noopTraceContext(), testSvc1, gbus.NewBusMessage(cmd)) if err != nil { @@ -57,9 +58,7 @@ func TestSendCommand(t *testing.T) { return } - <-proceed - b.Shutdown() - + proceedOrTimeout(2, proceed, nil, t) } func TestReply(t *testing.T) { @@ -84,7 +83,7 @@ func TestReply(t *testing.T) { replyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { _, ok := message.Payload.(*Reply1) if !ok { - t.Errorf("message handler for reply message invoced with wrong message type\r\n%v", message) + t.Errorf("message handler for reply message invoked with wrong message type\r\n%v", message) } proceed <- true @@ -95,13 +94,14 @@ func TestReply(t *testing.T) { svc1.HandleMessage(reply, replyHandler) svc1.Start() - defer svc1.Shutdown() + defer assertBusShutdown(svc1, t) svc2.Start() - defer svc2.Shutdown() + defer assertBusShutdown(svc2, t) svc1.Send(noopTraceContext(), testSvc2, cmdBusMsg) - <-proceed + + proceedOrTimeout(2, proceed, nil, t) } func TestPubSub(t *testing.T) { @@ -116,13 +116,13 @@ func TestPubSub(t *testing.T) { b.HandleEvent("test_exchange", "test_topic", event, eventHandler) b.Start() - defer b.Shutdown() + defer assertBusShutdown(b, t) err := b.Publish(noopTraceContext(), "test_exchange", "test_topic", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - <-proceed + proceedOrTimeout(2, proceed, nil, t) } func TestSubscribingOnTopic(t *testing.T) { @@ -137,12 +137,13 @@ func TestSubscribingOnTopic(t *testing.T) { b.HandleEvent("test_exchange", "a.*.c", nil, eventHandler) b.Start() - defer b.Shutdown() + defer assertBusShutdown(b, t) err := b.Publish(noopTraceContext(), "test_exchange", "a.b.c", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - <-proceed + + proceedOrTimeout(2, proceed, nil, t) } var ( @@ -167,7 +168,7 @@ func TestHandlerRetry(t *testing.T) { bus.HandleMessage(r1, handleRetry) bus.Start() - defer bus.Shutdown() + defer assertBusShutdown(bus, t) bus.Send(noopTraceContext(), testSvc1, cmd) <-handlerRetryProceed @@ -214,10 +215,10 @@ func TestRPC(t *testing.T) { svc1 := createNamedBusForTest(testSvc1) svc1.HandleMessage(c1, handler) svc1.Start() - defer svc1.Shutdown() + defer assertBusShutdown(svc1, t) svc2 := createNamedBusForTest(testSvc2) svc2.Start() - defer svc2.Shutdown() + defer assertBusShutdown(svc1, t) t.Log("Sending RPC") reply, _ = svc2.RPC(noopTraceContext(), testSvc1, cmd, reply, 5*time.Second) t.Log("Tested RPC") @@ -248,14 +249,15 @@ func TestDeadlettering(t *testing.T) { service1.HandleMessage(Command1{}, faultyHandler) deadletterSvc.Start() - defer deadletterSvc.Shutdown() + defer assertBusShutdown(deadletterSvc, t) service1.Start() - defer service1.Shutdown() + defer assertBusShutdown(service1, t) service1.Send(context.Background(), testSvc1, poison) service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) - <-proceed + proceedOrTimeout(2, proceed, nil, t) + count, _ := metrics.GetRejectedMessagesValue() if count != 1 { t.Error("Should have one rejected message") @@ -290,13 +292,12 @@ func TestRawMessageHandling(t *testing.T) { svc1 := createNamedBusForTest(testSvc1) svc1.SetGlobalRawMessageHandler(handler) _ = svc1.Start() + defer assertBusShutdown(svc1, t) cmd1 := gbus.NewBusMessage(Command1{}) _ = svc1.Send(context.Background(), testSvc1, cmd1) - <-proceed - _ = svc1.Shutdown() - + proceedOrTimeout(2, proceed, nil, t) } func TestReturnDeadToQueue(t *testing.T) { @@ -330,18 +331,13 @@ func TestReturnDeadToQueue(t *testing.T) { service1.HandleMessage(Command1{}, faultyHandler) deadletterSvc.Start() - defer deadletterSvc.Shutdown() + defer assertBusShutdown(deadletterSvc, t) service1.Start() - defer service1.Shutdown() + defer assertBusShutdown(service1, t) service1.Send(context.Background(), testSvc1, poison) - select { - case <-proceed: - fmt.Println("success") - case <-time.After(2 * time.Second): - t.Fatal("timeout, failed to resend dead message to queue") - } + proceedOrTimeout(2, proceed, nil, t) } func TestDeadLetterHandlerPanic(t *testing.T) { @@ -382,13 +378,13 @@ func TestDeadLetterHandlerPanic(t *testing.T) { } deadletterSvc.Start() - defer deadletterSvc.Shutdown() + defer assertBusShutdown(deadletterSvc, t) service1.Start() - defer service1.Shutdown() + defer assertBusShutdown(service1, t) service1.Send(context.Background(), testSvc1, poison) - select { - case <-proceed: + + proceedOrTimeout(2, proceed, func() { count, _ := metrics.GetRejectedMessagesValue() //we expect only 1 rejcted meessage from the counter since rejected messages that get //requeued are not reported to the metric so the counter won't be increment when the message @@ -397,10 +393,7 @@ func TestDeadLetterHandlerPanic(t *testing.T) { t.Errorf("Should have 1 rejected messages but was %v", count) } - case <-time.After(2 * time.Second): - t.Fatal("timeout, dlq failed to reject message after handler panicked") - } - + }, t) } func TestRegistrationAfterBusStarts(t *testing.T) { @@ -413,14 +406,15 @@ func TestRegistrationAfterBusStarts(t *testing.T) { return nil } b.Start() - defer b.Shutdown() + defer assertBusShutdown(b, t) b.HandleEvent("test_exchange", "test_topic", event, eventHandler) err := b.Publish(noopTraceContext(), "test_exchange", "test_topic", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - <-proceed + + proceedOrTimeout(2, proceed, nil, t) } @@ -449,24 +443,20 @@ func TestOpenTracingReporting(t *testing.T) { if err != nil { t.Fatal(err) } - defer func() { - err := b.Shutdown() - if err != nil { - t.Fatal(err) - } - }() + defer assertBusShutdown(b, t) err = b.Publish(ctx, "test_exchange", "test_topic", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - <-proceed - time.Sleep(2 * time.Second) - span.Finish() - spans := mockTracer.FinishedSpans() - if len(spans) < 2 { - t.Fatal("didn't send any traces in the code") - } + proceedOrTimeout(2, proceed, func() { + time.Sleep(2 * time.Second) + span.Finish() + spans := mockTracer.FinishedSpans() + if len(spans) < 2 { + t.Fatal("didn't send any traces in the code") + } + }, t) } func TestSendingPanic(t *testing.T) { @@ -476,12 +466,7 @@ func TestSendingPanic(t *testing.T) { if err != nil { t.Fatal(err) } - defer func() { - err := b.Shutdown() - if err != nil { - t.Fatal(err) - } - }() + defer assertBusShutdown(b, t) defer func() { if p := recover(); p != nil { t.Fatal("expected not to have to recover this should be handled in grabbit", p) @@ -505,12 +490,7 @@ func TestEmptyBody(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - defer func() { - err := b.Shutdown() - if err != nil { - t.Fatal(err) - } - }() + defer assertBusShutdown(b, t) conn, err := amqp.Dial(connStr) if err != nil { @@ -530,15 +510,15 @@ func TestEmptyBody(t *testing.T) { t.Error("couldnt send message on rabbitmq channel") } - select { - case <-proceed: - fmt.Println("success") - case <-time.After(2 * time.Second): - t.Fatal("timeout, failed to consume message with missing body") - } + proceedOrTimeout(2, proceed, nil, t) } -func TestDeadEmptyBody(t *testing.T) { +func TestEmptyMessageInvokesDeadHanlder(t *testing.T) { + /* + test call for dead letter handler when a message with nil or len 0 body is consumed. the handler + should handle the message successfully. + */ + b := createBusWithConfig(testSvc5, "grabbit-dead", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) @@ -553,12 +533,7 @@ func TestDeadEmptyBody(t *testing.T) { t.Errorf("could not start bus for test error: %s", err.Error()) } - defer func() { - err := b.Shutdown() - if err != nil { - t.Fatal(err) - } - }() + defer assertBusShutdown(b, t) conn, err := amqp.Dial(connStr) if err != nil { @@ -579,17 +554,17 @@ func TestDeadEmptyBody(t *testing.T) { t.Error("couldnt send message on rabbitmq channel") } - select { - case <-proceed: - fmt.Println("success") - case <-time.After(2 * time.Second): - t.Fatal("timeout, failed to consume message with missing body") - } + proceedOrTimeout(2, proceed, nil, t) } func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { - rejected, _ := metrics.GetRejectedMessagesValue() - b := createBusWithConfig(testSvc5, "grabbit-dead1", true, true, + /* + The global and dead letter handlers can consume message with 0 or nil body but + "normal" handlers cannot. + If a "normal" handler is registered for this type of message, the bus must reject this message. + */ + metrics.ResetRejectedMessagesCounter() + b := createBusWithConfig(testSvc1, "grabbit-dead1", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) proceed := make(chan bool) @@ -597,7 +572,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { proceed <- true return nil }) - err := b.HandleMessage(&Command3{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { + err := b.HandleMessage(&Command1{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { t.Error("handler invoked for non-grabbit message") return nil }) @@ -610,12 +585,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { t.Errorf("could not start bus for test error: %s", err.Error()) } - defer func() { - err := b.Shutdown() - if err != nil { - t.Fatal(err) - } - }() + defer assertBusShutdown(b, t) conn, err := amqp.Dial(connStr) if err != nil { @@ -630,22 +600,19 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { defer ch.Close() headersMap := make(map[string]interface{}) - headersMap["x-msg-name"] = Command3{}.SchemaName() + headersMap["x-msg-name"] = Command1{}.SchemaName() cmd := amqp.Publishing{Headers: headersMap} err = ch.Publish("", testSvc5, true, false, cmd) if err != nil { t.Error("couldnt send message on rabbitmq channel") } - select { - case <-proceed: + proceedOrTimeout(2, proceed, func() { count, _ := metrics.GetRejectedMessagesValue() - if count != rejected+1 { + if count != 1 { t.Error("Should have one rejected message") } - case <-time.After(2 * time.Second): - t.Fatal("timeout, failed to consume message with missing body") - } + }, t) } func TestHealthCheck(t *testing.T) { @@ -654,7 +621,7 @@ func TestHealthCheck(t *testing.T) { if err != nil { t.Error(err.Error()) } - defer svc1.Shutdown() + defer assertBusShutdown(svc1, t) health := svc1.GetHealth() fmt.Printf("%v", health) @@ -669,7 +636,7 @@ func TestSanitizingSvcName(t *testing.T) { if err != nil { t.Error(err.Error()) } - defer svc4.Shutdown() + defer assertBusShutdown(svc4, t) fmt.Println("succeeded sanitizing service name") } @@ -706,3 +673,21 @@ type panicPolicy struct { func (p panicPolicy) Apply(publishing *amqp.Publishing) { panic("vlad") } + +func assertBusShutdown(bus gbus.Bus, t *testing.T) { + err := bus.Shutdown() + if err != nil { + t.Fatal(err) + } +} + +func proceedOrTimeout(timeout time.Duration, p chan bool, onProceed func(), t *testing.T) { + select { + case <-p: + if onProceed != nil { + onProceed() + } + case <-time.After(timeout * time.Second): + t.Fatal("timeout") + } +} From 9957558b477b71c691b8f74bf8d153c8ccbade6e Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Thu, 29 Aug 2019 22:27:37 +0300 Subject: [PATCH 11/15] handle messagw with empty body --- tests/bus_test.go | 158 +++++++++++++++++++++++++--------------------- 1 file changed, 87 insertions(+), 71 deletions(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index 4dbdde9..d2782ab 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -50,15 +50,16 @@ func TestSendCommand(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - defer assertBusShutdown(b, t) + // defer assertBusShutdown(b, t) err = b.Send(noopTraceContext(), testSvc1, gbus.NewBusMessage(cmd)) if err != nil { t.Errorf("could not send message error: %s", err.Error()) return } - - proceedOrTimeout(2, proceed, nil, t) + <-proceed + b.Shutdown() + //proceedOrTimeout(10, proceed, nil, t) } func TestReply(t *testing.T) { @@ -94,14 +95,14 @@ func TestReply(t *testing.T) { svc1.HandleMessage(reply, replyHandler) svc1.Start() - defer assertBusShutdown(svc1, t) + defer svc1.Shutdown() svc2.Start() - defer assertBusShutdown(svc2, t) + defer svc2.Shutdown() svc1.Send(noopTraceContext(), testSvc2, cmdBusMsg) - - proceedOrTimeout(2, proceed, nil, t) + <-proceed + //proceedOrTimeout(2, proceed, nil, t) } func TestPubSub(t *testing.T) { @@ -116,13 +117,13 @@ func TestPubSub(t *testing.T) { b.HandleEvent("test_exchange", "test_topic", event, eventHandler) b.Start() - defer assertBusShutdown(b, t) + defer b.Shutdown() err := b.Publish(noopTraceContext(), "test_exchange", "test_topic", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - - proceedOrTimeout(2, proceed, nil, t) + <-proceed + //proceedOrTimeout(2, proceed, nil, t) } func TestSubscribingOnTopic(t *testing.T) { @@ -137,13 +138,13 @@ func TestSubscribingOnTopic(t *testing.T) { b.HandleEvent("test_exchange", "a.*.c", nil, eventHandler) b.Start() - defer assertBusShutdown(b, t) + defer b.Shutdown() err := b.Publish(noopTraceContext(), "test_exchange", "a.b.c", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - - proceedOrTimeout(2, proceed, nil, t) + <-proceed + //proceedOrTimeout(2, proceed, nil, t) } var ( @@ -168,7 +169,7 @@ func TestHandlerRetry(t *testing.T) { bus.HandleMessage(r1, handleRetry) bus.Start() - defer assertBusShutdown(bus, t) + defer bus.Shutdown() bus.Send(noopTraceContext(), testSvc1, cmd) <-handlerRetryProceed @@ -215,10 +216,10 @@ func TestRPC(t *testing.T) { svc1 := createNamedBusForTest(testSvc1) svc1.HandleMessage(c1, handler) svc1.Start() - defer assertBusShutdown(svc1, t) + defer svc1.Shutdown() svc2 := createNamedBusForTest(testSvc2) svc2.Start() - defer assertBusShutdown(svc1, t) + defer svc2.Shutdown() t.Log("Sending RPC") reply, _ = svc2.RPC(noopTraceContext(), testSvc1, cmd, reply, 5*time.Second) t.Log("Tested RPC") @@ -249,14 +250,15 @@ func TestDeadlettering(t *testing.T) { service1.HandleMessage(Command1{}, faultyHandler) deadletterSvc.Start() - defer assertBusShutdown(deadletterSvc, t) + defer deadletterSvc.Shutdown() service1.Start() - defer assertBusShutdown(service1, t) + defer service1.Shutdown() service1.Send(context.Background(), testSvc1, poison) service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) - proceedOrTimeout(2, proceed, nil, t) + <-proceed + //proceedOrTimeout(2, proceed, nil, t) count, _ := metrics.GetRejectedMessagesValue() if count != 1 { @@ -292,12 +294,13 @@ func TestRawMessageHandling(t *testing.T) { svc1 := createNamedBusForTest(testSvc1) svc1.SetGlobalRawMessageHandler(handler) _ = svc1.Start() - defer assertBusShutdown(svc1, t) cmd1 := gbus.NewBusMessage(Command1{}) _ = svc1.Send(context.Background(), testSvc1, cmd1) - proceedOrTimeout(2, proceed, nil, t) + <-proceed + _ = svc1.Shutdown() + } func TestReturnDeadToQueue(t *testing.T) { @@ -331,13 +334,13 @@ func TestReturnDeadToQueue(t *testing.T) { service1.HandleMessage(Command1{}, faultyHandler) deadletterSvc.Start() - defer assertBusShutdown(deadletterSvc, t) + defer deadletterSvc.Shutdown() service1.Start() - defer assertBusShutdown(service1, t) + defer service1.Shutdown() service1.Send(context.Background(), testSvc1, poison) - - proceedOrTimeout(2, proceed, nil, t) + <-proceed + //proceedOrTimeout(2, proceed, nil, t) } func TestDeadLetterHandlerPanic(t *testing.T) { @@ -378,22 +381,22 @@ func TestDeadLetterHandlerPanic(t *testing.T) { } deadletterSvc.Start() - defer assertBusShutdown(deadletterSvc, t) + defer deadletterSvc.Shutdown() service1.Start() - defer assertBusShutdown(service1, t) + defer service1.Shutdown() service1.Send(context.Background(), testSvc1, poison) - - proceedOrTimeout(2, proceed, func() { - count, _ := metrics.GetRejectedMessagesValue() - //we expect only 1 rejcted meessage from the counter since rejected messages that get - //requeued are not reported to the metric so the counter won't be increment when the message - //in the dlq gets rejected as it is rejected with the requeue option set to true - if count != 1 { - - t.Errorf("Should have 1 rejected messages but was %v", count) - } - }, t) + <-proceed + //proceedOrTimeout(2, proceed, func() { + // count, _ := metrics.GetRejectedMessagesValue() + // //we expect only 1 rejcted meessage from the counter since rejected messages that get + // //requeued are not reported to the metric so the counter won't be increment when the message + // //in the dlq gets rejected as it is rejected with the requeue option set to true + // if count != 1 { + // + // t.Errorf("Should have 1 rejected messages but was %v", count) + // } + //}, t) } func TestRegistrationAfterBusStarts(t *testing.T) { @@ -406,15 +409,15 @@ func TestRegistrationAfterBusStarts(t *testing.T) { return nil } b.Start() - defer assertBusShutdown(b, t) + defer b.Shutdown() b.HandleEvent("test_exchange", "test_topic", event, eventHandler) err := b.Publish(noopTraceContext(), "test_exchange", "test_topic", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - - proceedOrTimeout(2, proceed, nil, t) + <-proceed + // proceedOrTimeout(2, proceed, nil, t) } @@ -443,20 +446,26 @@ func TestOpenTracingReporting(t *testing.T) { if err != nil { t.Fatal(err) } - defer assertBusShutdown(b, t) + defer func() { + err := b.Shutdown() + if err != nil { + t.Fatal(err) + } + }() err = b.Publish(ctx, "test_exchange", "test_topic", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - proceedOrTimeout(2, proceed, func() { - time.Sleep(2 * time.Second) - span.Finish() - spans := mockTracer.FinishedSpans() - if len(spans) < 2 { - t.Fatal("didn't send any traces in the code") - } - }, t) + <-proceed + //proceedOrTimeout(2, proceed, func() { + // time.Sleep(2 * time.Second) + // span.Finish() + // spans := mockTracer.FinishedSpans() + // if len(spans) < 2 { + // t.Fatal("didn't send any traces in the code") + // } + //}, t) } func TestSendingPanic(t *testing.T) { @@ -466,7 +475,12 @@ func TestSendingPanic(t *testing.T) { if err != nil { t.Fatal(err) } - defer assertBusShutdown(b, t) + defer func() { + err := b.Shutdown() + if err != nil { + t.Fatal(err) + } + }() defer func() { if p := recover(); p != nil { t.Fatal("expected not to have to recover this should be handled in grabbit", p) @@ -490,8 +504,8 @@ func TestEmptyBody(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - defer assertBusShutdown(b, t) - + //defer assertBusShutdown(b, t) + defer b.Shutdown() conn, err := amqp.Dial(connStr) if err != nil { t.Error("couldnt connect to rabbitmq") @@ -509,8 +523,8 @@ func TestEmptyBody(t *testing.T) { if err != nil { t.Error("couldnt send message on rabbitmq channel") } - - proceedOrTimeout(2, proceed, nil, t) + <-proceed + //proceedOrTimeout(2, proceed, nil, t) } func TestEmptyMessageInvokesDeadHanlder(t *testing.T) { @@ -532,8 +546,8 @@ func TestEmptyMessageInvokesDeadHanlder(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - - defer assertBusShutdown(b, t) + defer b.Shutdown() + //defer assertBusShutdown(b, t) conn, err := amqp.Dial(connStr) if err != nil { @@ -553,8 +567,8 @@ func TestEmptyMessageInvokesDeadHanlder(t *testing.T) { if err != nil { t.Error("couldnt send message on rabbitmq channel") } - - proceedOrTimeout(2, proceed, nil, t) + <-proceed + //proceedOrTimeout(2, proceed, nil, t) } func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { @@ -564,7 +578,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { If a "normal" handler is registered for this type of message, the bus must reject this message. */ metrics.ResetRejectedMessagesCounter() - b := createBusWithConfig(testSvc1, "grabbit-dead1", true, true, + b := createBusWithConfig(testSvc5, "grabbit-dead1", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) proceed := make(chan bool) @@ -572,7 +586,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { proceed <- true return nil }) - err := b.HandleMessage(&Command1{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { + err := b.HandleMessage(&Command3{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { t.Error("handler invoked for non-grabbit message") return nil }) @@ -585,7 +599,8 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { t.Errorf("could not start bus for test error: %s", err.Error()) } - defer assertBusShutdown(b, t) + //defer assertBusShutdown(b, t) + defer b.Shutdown() conn, err := amqp.Dial(connStr) if err != nil { @@ -600,19 +615,20 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { defer ch.Close() headersMap := make(map[string]interface{}) - headersMap["x-msg-name"] = Command1{}.SchemaName() + headersMap["x-msg-name"] = Command3{}.SchemaName() cmd := amqp.Publishing{Headers: headersMap} err = ch.Publish("", testSvc5, true, false, cmd) if err != nil { t.Error("couldnt send message on rabbitmq channel") } - proceedOrTimeout(2, proceed, func() { - count, _ := metrics.GetRejectedMessagesValue() - if count != 1 { - t.Error("Should have one rejected message") - } - }, t) + <-proceed + //proceedOrTimeout(2, proceed, func() { + // count, _ := metrics.GetRejectedMessagesValue() + // if count != 1 { + // t.Error("Should have one rejected message") + // } + //}, t) } func TestHealthCheck(t *testing.T) { @@ -621,7 +637,7 @@ func TestHealthCheck(t *testing.T) { if err != nil { t.Error(err.Error()) } - defer assertBusShutdown(svc1, t) + defer svc1.Shutdown() health := svc1.GetHealth() fmt.Printf("%v", health) @@ -636,7 +652,7 @@ func TestSanitizingSvcName(t *testing.T) { if err != nil { t.Error(err.Error()) } - defer assertBusShutdown(svc4, t) + defer svc4.Shutdown() fmt.Println("succeeded sanitizing service name") } From 97c44288f0b389d5166ef9f78b364e08f0854a6d Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Thu, 29 Aug 2019 22:53:52 +0300 Subject: [PATCH 12/15] handle message with empty bodhy --- tests/bus_test.go | 140 +++++++++++++++++++--------------------------- 1 file changed, 56 insertions(+), 84 deletions(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index d2782ab..0e399e6 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -50,16 +50,14 @@ func TestSendCommand(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - // defer assertBusShutdown(b, t) + defer assertBusShutdown(b, t) err = b.Send(noopTraceContext(), testSvc1, gbus.NewBusMessage(cmd)) if err != nil { t.Errorf("could not send message error: %s", err.Error()) return } - <-proceed - b.Shutdown() - //proceedOrTimeout(10, proceed, nil, t) + proceedOrTimeout(2, proceed, nil, t) } func TestReply(t *testing.T) { @@ -95,14 +93,13 @@ func TestReply(t *testing.T) { svc1.HandleMessage(reply, replyHandler) svc1.Start() - defer svc1.Shutdown() + defer assertBusShutdown(svc1, t) svc2.Start() - defer svc2.Shutdown() + defer assertBusShutdown(svc2, t) svc1.Send(noopTraceContext(), testSvc2, cmdBusMsg) - <-proceed - //proceedOrTimeout(2, proceed, nil, t) + proceedOrTimeout(2, proceed, nil, t) } func TestPubSub(t *testing.T) { @@ -117,13 +114,12 @@ func TestPubSub(t *testing.T) { b.HandleEvent("test_exchange", "test_topic", event, eventHandler) b.Start() - defer b.Shutdown() + defer assertBusShutdown(b, t) err := b.Publish(noopTraceContext(), "test_exchange", "test_topic", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - <-proceed - //proceedOrTimeout(2, proceed, nil, t) + proceedOrTimeout(2, proceed, nil, t) } func TestSubscribingOnTopic(t *testing.T) { @@ -138,13 +134,12 @@ func TestSubscribingOnTopic(t *testing.T) { b.HandleEvent("test_exchange", "a.*.c", nil, eventHandler) b.Start() - defer b.Shutdown() + defer assertBusShutdown(b, t) err := b.Publish(noopTraceContext(), "test_exchange", "a.b.c", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - <-proceed - //proceedOrTimeout(2, proceed, nil, t) + proceedOrTimeout(2, proceed, nil, t) } var ( @@ -169,7 +164,7 @@ func TestHandlerRetry(t *testing.T) { bus.HandleMessage(r1, handleRetry) bus.Start() - defer bus.Shutdown() + defer assertBusShutdown(bus, t) bus.Send(noopTraceContext(), testSvc1, cmd) <-handlerRetryProceed @@ -216,10 +211,10 @@ func TestRPC(t *testing.T) { svc1 := createNamedBusForTest(testSvc1) svc1.HandleMessage(c1, handler) svc1.Start() - defer svc1.Shutdown() + defer assertBusShutdown(svc1, t) svc2 := createNamedBusForTest(testSvc2) svc2.Start() - defer svc2.Shutdown() + defer assertBusShutdown(svc2, t) t.Log("Sending RPC") reply, _ = svc2.RPC(noopTraceContext(), testSvc1, cmd, reply, 5*time.Second) t.Log("Tested RPC") @@ -250,15 +245,14 @@ func TestDeadlettering(t *testing.T) { service1.HandleMessage(Command1{}, faultyHandler) deadletterSvc.Start() - defer deadletterSvc.Shutdown() + defer assertBusShutdown(deadletterSvc, t) service1.Start() - defer service1.Shutdown() + defer assertBusShutdown(service1, t) service1.Send(context.Background(), testSvc1, poison) service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) - <-proceed - //proceedOrTimeout(2, proceed, nil, t) + proceedOrTimeout(2, proceed, nil, t) count, _ := metrics.GetRejectedMessagesValue() if count != 1 { @@ -294,13 +288,12 @@ func TestRawMessageHandling(t *testing.T) { svc1 := createNamedBusForTest(testSvc1) svc1.SetGlobalRawMessageHandler(handler) _ = svc1.Start() + defer assertBusShutdown(svc1, t) cmd1 := gbus.NewBusMessage(Command1{}) _ = svc1.Send(context.Background(), testSvc1, cmd1) - <-proceed - _ = svc1.Shutdown() - + proceedOrTimeout(2, proceed, nil, t) } func TestReturnDeadToQueue(t *testing.T) { @@ -334,13 +327,12 @@ func TestReturnDeadToQueue(t *testing.T) { service1.HandleMessage(Command1{}, faultyHandler) deadletterSvc.Start() - defer deadletterSvc.Shutdown() + defer assertBusShutdown(deadletterSvc, t) service1.Start() - defer service1.Shutdown() + defer assertBusShutdown(service1, t) service1.Send(context.Background(), testSvc1, poison) - <-proceed - //proceedOrTimeout(2, proceed, nil, t) + proceedOrTimeout(2, proceed, nil, t) } func TestDeadLetterHandlerPanic(t *testing.T) { @@ -381,22 +373,20 @@ func TestDeadLetterHandlerPanic(t *testing.T) { } deadletterSvc.Start() - defer deadletterSvc.Shutdown() + defer assertBusShutdown(deadletterSvc, t) service1.Start() - defer service1.Shutdown() + defer assertBusShutdown(service1, t) service1.Send(context.Background(), testSvc1, poison) - <-proceed - //proceedOrTimeout(2, proceed, func() { - // count, _ := metrics.GetRejectedMessagesValue() - // //we expect only 1 rejcted meessage from the counter since rejected messages that get - // //requeued are not reported to the metric so the counter won't be increment when the message - // //in the dlq gets rejected as it is rejected with the requeue option set to true - // if count != 1 { - // - // t.Errorf("Should have 1 rejected messages but was %v", count) - // } - //}, t) + proceedOrTimeout(2, proceed, func() { + count, _ := metrics.GetRejectedMessagesValue() + //we expect only 1 rejcted meessage from the counter since rejected messages that get + //requeued are not reported to the metric so the counter won't be increment when the message + //in the dlq gets rejected as it is rejected with the requeue option set to true + if count != 1 { + t.Errorf("Should have 1 rejected messages but was %v", count) + } + }, t) } func TestRegistrationAfterBusStarts(t *testing.T) { @@ -409,15 +399,14 @@ func TestRegistrationAfterBusStarts(t *testing.T) { return nil } b.Start() - defer b.Shutdown() + defer assertBusShutdown(b, t) b.HandleEvent("test_exchange", "test_topic", event, eventHandler) err := b.Publish(noopTraceContext(), "test_exchange", "test_topic", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - <-proceed - // proceedOrTimeout(2, proceed, nil, t) + proceedOrTimeout(2, proceed, nil, t) } @@ -446,26 +435,20 @@ func TestOpenTracingReporting(t *testing.T) { if err != nil { t.Fatal(err) } - defer func() { - err := b.Shutdown() - if err != nil { - t.Fatal(err) - } - }() + defer assertBusShutdown(b, t) err = b.Publish(ctx, "test_exchange", "test_topic", gbus.NewBusMessage(event)) if err != nil { t.Fatal(err) } - <-proceed - //proceedOrTimeout(2, proceed, func() { - // time.Sleep(2 * time.Second) - // span.Finish() - // spans := mockTracer.FinishedSpans() - // if len(spans) < 2 { - // t.Fatal("didn't send any traces in the code") - // } - //}, t) + proceedOrTimeout(2, proceed, func() { + time.Sleep(2 * time.Second) + span.Finish() + spans := mockTracer.FinishedSpans() + if len(spans) < 2 { + t.Fatal("didn't send any traces in the code") + } + }, t) } func TestSendingPanic(t *testing.T) { @@ -475,12 +458,7 @@ func TestSendingPanic(t *testing.T) { if err != nil { t.Fatal(err) } - defer func() { - err := b.Shutdown() - if err != nil { - t.Fatal(err) - } - }() + defer assertBusShutdown(b, t) defer func() { if p := recover(); p != nil { t.Fatal("expected not to have to recover this should be handled in grabbit", p) @@ -504,8 +482,7 @@ func TestEmptyBody(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - //defer assertBusShutdown(b, t) - defer b.Shutdown() + defer assertBusShutdown(b, t) conn, err := amqp.Dial(connStr) if err != nil { t.Error("couldnt connect to rabbitmq") @@ -523,8 +500,7 @@ func TestEmptyBody(t *testing.T) { if err != nil { t.Error("couldnt send message on rabbitmq channel") } - <-proceed - //proceedOrTimeout(2, proceed, nil, t) + proceedOrTimeout(2, proceed, nil, t) } func TestEmptyMessageInvokesDeadHanlder(t *testing.T) { @@ -546,7 +522,7 @@ func TestEmptyMessageInvokesDeadHanlder(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - defer b.Shutdown() + defer assertBusShutdown(b, t) //defer assertBusShutdown(b, t) conn, err := amqp.Dial(connStr) @@ -567,8 +543,7 @@ func TestEmptyMessageInvokesDeadHanlder(t *testing.T) { if err != nil { t.Error("couldnt send message on rabbitmq channel") } - <-proceed - //proceedOrTimeout(2, proceed, nil, t) + proceedOrTimeout(2, proceed, nil, t) } func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { @@ -598,9 +573,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { if err != nil { t.Errorf("could not start bus for test error: %s", err.Error()) } - - //defer assertBusShutdown(b, t) - defer b.Shutdown() + defer assertBusShutdown(b, t) conn, err := amqp.Dial(connStr) if err != nil { @@ -622,13 +595,12 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { t.Error("couldnt send message on rabbitmq channel") } - <-proceed - //proceedOrTimeout(2, proceed, func() { - // count, _ := metrics.GetRejectedMessagesValue() - // if count != 1 { - // t.Error("Should have one rejected message") - // } - //}, t) + proceedOrTimeout(2, proceed, func() { + count, _ := metrics.GetRejectedMessagesValue() + if count != 1 { + t.Error("Should have one rejected message") + } + }, t) } func TestHealthCheck(t *testing.T) { @@ -637,7 +609,7 @@ func TestHealthCheck(t *testing.T) { if err != nil { t.Error(err.Error()) } - defer svc1.Shutdown() + defer assertBusShutdown(svc1, t) health := svc1.GetHealth() fmt.Printf("%v", health) @@ -652,7 +624,7 @@ func TestSanitizingSvcName(t *testing.T) { if err != nil { t.Error(err.Error()) } - defer svc4.Shutdown() + defer assertBusShutdown(svc4, t) fmt.Println("succeeded sanitizing service name") } From fba5de1beeb0fee796ed8508c33cb42f5d7836bf Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Thu, 29 Aug 2019 22:55:14 +0300 Subject: [PATCH 13/15] handle message with empty body --- tests/bus_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index 0e399e6..31a836f 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -523,7 +523,6 @@ func TestEmptyMessageInvokesDeadHanlder(t *testing.T) { t.Errorf("could not start bus for test error: %s", err.Error()) } defer assertBusShutdown(b, t) - //defer assertBusShutdown(b, t) conn, err := amqp.Dial(connStr) if err != nil { From 48317a1575612ffe608ab2ef4fee1e266c26ead3 Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Thu, 29 Aug 2019 23:01:57 +0300 Subject: [PATCH 14/15] handle message with empty body --- tests/bus_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index 31a836f..9dde28b 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -552,7 +552,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { If a "normal" handler is registered for this type of message, the bus must reject this message. */ metrics.ResetRejectedMessagesCounter() - b := createBusWithConfig(testSvc5, "grabbit-dead1", true, true, + b := createBusWithConfig(testSvc1, "grabbit-dead1", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) proceed := make(chan bool) @@ -589,7 +589,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { headersMap := make(map[string]interface{}) headersMap["x-msg-name"] = Command3{}.SchemaName() cmd := amqp.Publishing{Headers: headersMap} - err = ch.Publish("", testSvc5, true, false, cmd) + err = ch.Publish("", testSvc1, true, false, cmd) if err != nil { t.Error("couldnt send message on rabbitmq channel") } From 107f4024ea5ea49192f3fb5dda5922ead37407d0 Mon Sep 17 00:00:00 2001 From: "adi.yaskolka" Date: Fri, 30 Aug 2019 16:03:18 +0300 Subject: [PATCH 15/15] handle empty body --- tests/bus_test.go | 12 ++++++------ tests/consts.go | 2 -- tests/testMessages.go | 8 -------- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index 9dde28b..e3da0ea 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -471,7 +471,7 @@ func TestSendingPanic(t *testing.T) { } func TestEmptyBody(t *testing.T) { - b := createNamedBusForTest(testSvc5) + b := createNamedBusForTest(testSvc1) proceed := make(chan bool) b.SetGlobalRawMessageHandler(func(tx *sql.Tx, delivery *amqp.Delivery) error { proceed <- true @@ -496,7 +496,7 @@ func TestEmptyBody(t *testing.T) { defer ch.Close() cmd := amqp.Publishing{} - err = ch.Publish("", testSvc5, true, false, cmd) + err = ch.Publish("", testSvc1, true, false, cmd) if err != nil { t.Error("couldnt send message on rabbitmq channel") } @@ -509,7 +509,7 @@ func TestEmptyMessageInvokesDeadHanlder(t *testing.T) { should handle the message successfully. */ - b := createBusWithConfig(testSvc5, "grabbit-dead", true, true, + b := createBusWithConfig(testSvc1, "grabbit-dead", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) proceed := make(chan bool) @@ -538,7 +538,7 @@ func TestEmptyMessageInvokesDeadHanlder(t *testing.T) { headersMap := make(map[string]interface{}) headersMap["x-death"] = make([]interface{}, 0) cmd := amqp.Publishing{Headers: headersMap} - err = ch.Publish("", testSvc5, true, false, cmd) + err = ch.Publish("", testSvc1, true, false, cmd) if err != nil { t.Error("couldnt send message on rabbitmq channel") } @@ -560,7 +560,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { proceed <- true return nil }) - err := b.HandleMessage(&Command3{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { + err := b.HandleMessage(Command1{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { t.Error("handler invoked for non-grabbit message") return nil }) @@ -587,7 +587,7 @@ func TestFailHandlerInvokeOfMessageWithEmptyBody(t *testing.T) { defer ch.Close() headersMap := make(map[string]interface{}) - headersMap["x-msg-name"] = Command3{}.SchemaName() + headersMap["x-msg-name"] = Command1{}.SchemaName() cmd := amqp.Publishing{Headers: headersMap} err = ch.Publish("", testSvc1, true, false, cmd) if err != nil { diff --git a/tests/consts.go b/tests/consts.go index 28d4276..964ae02 100644 --- a/tests/consts.go +++ b/tests/consts.go @@ -13,7 +13,6 @@ var testSvc1 string var testSvc2 string var testSvc3 string var testSvc4 string -var testSvc5 string func init() { connStr = "amqp://rabbitmq:rabbitmq@localhost" @@ -21,7 +20,6 @@ func init() { testSvc2 = "testSvc2" testSvc3 = "testSvc3" testSvc4 = "test-svc4" - testSvc5 = "test-svc5" } func createBusWithConfig(svcName string, deadletter string, txnl, pos bool, conf gbus.BusConfiguration) gbus.Bus { diff --git a/tests/testMessages.go b/tests/testMessages.go index b331940..54aa9c0 100644 --- a/tests/testMessages.go +++ b/tests/testMessages.go @@ -33,14 +33,6 @@ func (Command2) SchemaName() string { return "grabbit.tests.Command2" } -type Command3 struct { - Data string -} - -func (Command3) SchemaName() string { - return "grabbit.tests.Command3" -} - type Reply1 struct { Data string }