-
Notifications
You must be signed in to change notification settings - Fork 3
/
dead_letter_receives_event_test.go
107 lines (88 loc) · 2.86 KB
/
dead_letter_receives_event_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package tests
import (
"context"
"fmt"
"testing"
"time"
"github.com/google/uuid"
"github.com/pmorelli92/bunnify/bunnify"
"go.uber.org/goleak"
)
// TestDeadLetterReceivesEvent publishes one event to a queue whose handler
// always returns an error and checks that a second consumer, attached to the
// queue configured as dead-letter queue, receives that same event.
//
// The received event must carry the published event ID, correlation ID,
// timestamp and payload, and its delivery info must still name the original
// exchange, queue and routing key. The test finishes by checking that no
// goroutines are left running once the connection is closed.
func TestDeadLetterReceivesEvent(t *testing.T) {
	// Setup

	// Upper bound on how long the event may take to be rejected, dead-lettered
	// and consumed again. The test continues as soon as the event arrives, so
	// this only costs time when the event never shows up.
	const deliveryTimeout = 5 * time.Second

	// Random names keep this run isolated from queues and exchanges created
	// by other tests.
	queueName := uuid.NewString()
	deadLetterQueueName := uuid.NewString()
	exchangeName := uuid.NewString()
	routingKey := "order.orderCreated"

	type orderCreated struct {
		ID string `json:"id"`
	}

	publishedOrderCreated := orderCreated{
		ID: uuid.NewString(),
	}

	publishedEvent := bunnify.NewPublishableEvent(
		publishedOrderCreated,
		bunnify.WithEventID("custom-event-id"),
		bunnify.WithCorrelationID("custom-correlation-id"),
	)

	// Handler of the main queue: it fails on purpose so the event ends up in
	// the dead-letter queue.
	eventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[orderCreated]) error {
		return fmt.Errorf("error, this event will go to dead-letter")
	}

	// The dead-letter handler hands the event to the test through a channel
	// instead of writing to a shared variable. The channel receive below gives
	// the test a proper happens-before edge with the handler.
	deadEvents := make(chan bunnify.ConsumableEvent[orderCreated], 1)
	deadEventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[orderCreated]) error {
		select {
		case deadEvents <- event:
		default:
			// Only the first delivery is needed; never block the consumer if
			// the handler happens to be invoked again.
		}
		return nil
	}

	// Exercise
	connection := bunnify.NewConnection()
	if err := connection.Start(); err != nil {
		t.Fatal(err)
	}

	consumer := connection.NewConsumer(
		queueName,
		bunnify.WithQoS(2, 0),
		bunnify.WithBindingToExchange(exchangeName),
		bunnify.WithHandler(routingKey, eventHandler),
		bunnify.WithDeadLetterQueue(deadLetterQueueName))
	if err := consumer.Consume(); err != nil {
		t.Fatal(err)
	}

	deadLetterConsumer := connection.NewConsumer(
		deadLetterQueueName,
		bunnify.WithHandler(routingKey, deadEventHandler))
	if err := deadLetterConsumer.Consume(); err != nil {
		t.Fatal(err)
	}

	publisher := connection.NewPublisher()
	err := publisher.Publish(context.TODO(), exchangeName, routingKey, publishedEvent)
	if err != nil {
		t.Fatal(err)
	}

	// Wait for the dead-lettered event rather than sleeping a fixed amount.
	var deadEvent bunnify.ConsumableEvent[orderCreated]
	received := true
	select {
	case deadEvent = <-deadEvents:
	case <-time.After(deliveryTimeout):
		received = false
	}

	// Close the connection before reporting a timeout so a failing run does
	// not leave it open.
	if err := connection.Close(); err != nil {
		t.Fatal(err)
	}
	if !received {
		t.Fatalf("no event reached the dead-letter consumer within %s", deliveryTimeout)
	}

	// Assert
	if publishedEvent.ID != deadEvent.ID {
		t.Fatalf("expected event ID %s, got %s", publishedEvent.ID, deadEvent.ID)
	}
	if publishedEvent.CorrelationID != deadEvent.CorrelationID {
		t.Fatalf("expected correlation ID %s, got %s", publishedEvent.CorrelationID, deadEvent.CorrelationID)
	}
	if !publishedEvent.Timestamp.Equal(deadEvent.Timestamp) {
		t.Fatalf("expected timestamp %s, got %s", publishedEvent.Timestamp, deadEvent.Timestamp)
	}
	if publishedOrderCreated.ID != deadEvent.Payload.ID {
		t.Fatalf("expected order created ID %s, got %s", publishedOrderCreated.ID, deadEvent.Payload.ID)
	}
	// Delivery info is compared against the original exchange, queue and
	// routing key, not against the dead-letter queue.
	if exchangeName != deadEvent.DeliveryInfo.Exchange {
		t.Fatalf("expected exchange %s, got %s", exchangeName, deadEvent.DeliveryInfo.Exchange)
	}
	if queueName != deadEvent.DeliveryInfo.Queue {
		t.Fatalf("expected queue %s, got %s", queueName, deadEvent.DeliveryInfo.Queue)
	}
	if routingKey != deadEvent.DeliveryInfo.RoutingKey {
		t.Fatalf("expected routing key %s, got %s", routingKey, deadEvent.DeliveryInfo.RoutingKey)
	}

	goleak.VerifyNone(t)
}