-
Notifications
You must be signed in to change notification settings - Fork 33
/
publisher.go
132 lines (104 loc) · 3.58 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package rabbitmq
import (
"context"
"github.com/ahmetb/go-linq/v3"
"github.com/iancoleman/strcase"
jsoniter "github.com/json-iterator/go"
"github.com/labstack/echo/v4"
"github.com/meysamhadeli/shop-golang-microservices/internal/pkg/logger"
"github.com/meysamhadeli/shop-golang-microservices/internal/pkg/otel"
uuid "github.com/satori/go.uuid"
"github.com/streadway/amqp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"reflect"
"time"
)
// IPublisher is the publishing contract implemented by Publisher in this file.
// The go:generate directive below invokes mockery for this interface.
//
//go:generate mockery --name IPublisher
type IPublisher interface {
// PublishMessage publishes msg and returns an error if any step fails.
PublishMessage(msg interface{}) error
// IsPublished reports whether a message of msg's type has been published.
IsPublished(msg interface{}) bool
}
// publishedMessages accumulates the snake_case exchange name of every message
// that PublishMessage has sent successfully; IsPublished searches it.
// NOTE(review): this is package-level state shared by all Publisher values and
// no lock guards it in this file — confirm callers do not publish concurrently.
var publishedMessages []string
// Publisher publishes messages to RabbitMQ exchanges over an existing AMQP
// connection and records a tracing span for each publish.
type Publisher struct {
// cfg supplies the exchange kind (cfg.Kind) used when declaring exchanges.
cfg *RabbitMQConfig
// conn is the AMQP connection from which a channel is opened per publish.
conn *amqp.Connection
// log receives error and info messages emitted while publishing.
log logger.ILogger
// jaegerTracer starts the span that wraps each publish.
jaegerTracer trace.Tracer
// ctx is the parent context for spans and the source of the correlation ID.
ctx context.Context
}
// PublishMessage serializes msg to JSON and publishes it to a durable exchange
// named after the snake_case form of msg's type name, using that same name as
// the routing key.
//
// msg may be a struct value or a pointer (of any depth) to one; pointers are
// dereferenced so both forms map to the same exchange. msg must not be a nil
// interface: it has no type from which a name could be derived.
//
// A span named after the message type is started from the publisher's context,
// and the headers produced by otel.InjectAMQPHeaders for that span's context
// are attached to the message. If the context carries a string value under
// echo.HeaderXCorrelationID it is sent as the AMQP correlation ID; otherwise
// the correlation ID is empty. After a successful publish the exchange name is
// appended to publishedMessages.
//
// It returns the error from marshalling msg, opening the channel, declaring
// the exchange, or publishing. A failure to serialize the headers for the span
// attributes is only logged, because the message has already been sent.
func (p Publisher) PublishMessage(msg interface{}) error {
	data, err := jsoniter.Marshal(msg)
	if err != nil {
		p.log.Error("Error in marshalling message to publish message")
		return err
	}

	// Unwrap pointers so that both T and *T resolve to T's name; calling Elem
	// on a non-pointer type would panic.
	msgType := reflect.TypeOf(msg)
	for msgType.Kind() == reflect.Ptr {
		msgType = msgType.Elem()
	}
	typeName := msgType.Name()
	snakeTypeName := strcase.ToSnake(typeName)

	ctx, span := p.jaegerTracer.Start(p.ctx, typeName)
	defer span.End()

	// Inject the span context into the headers sent with the message.
	headers := otel.InjectAMQPHeaders(ctx)

	channel, err := p.conn.Channel()
	if err != nil {
		p.log.Error("Error in opening channel to publish message")
		return err
	}
	defer channel.Close()

	err = channel.ExchangeDeclare(
		snakeTypeName, // name
		p.cfg.Kind,    // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	if err != nil {
		p.log.Error("Error in declaring exchange to publish message")
		return err
	}

	// The comma-ok form yields "" both when the value is absent and when it is
	// not a string, instead of panicking on an unexpected type.
	correlationID, _ := ctx.Value(echo.HeaderXCorrelationID).(string)

	publishingMsg := amqp.Publishing{
		Body:          data,
		ContentType:   "application/json",
		DeliveryMode:  amqp.Persistent,
		MessageId:     uuid.NewV4().String(),
		Timestamp:     time.Now(),
		CorrelationId: correlationID,
		Headers:       headers,
	}

	err = channel.Publish(snakeTypeName, snakeTypeName, false, false, publishingMsg)
	if err != nil {
		p.log.Error("Error in publishing message")
		return err
	}

	publishedMessages = append(publishedMessages, snakeTypeName)
	p.log.Infof("Published message: %s", publishingMsg.Body)

	attrs := []attribute.KeyValue{
		attribute.Key("message-id").String(publishingMsg.MessageId),
		attribute.Key("correlation-id").String(publishingMsg.CorrelationId),
		attribute.Key("exchange").String(snakeTypeName),
		attribute.Key("kind").String(p.cfg.Kind),
		attribute.Key("content-type").String("application/json"),
		attribute.Key("timestamp").String(publishingMsg.Timestamp.String()),
		attribute.Key("body").String(string(publishingMsg.Body)),
	}

	// The headers are serialized only to annotate the span. The message is
	// already on the wire, so a failure here must not be reported to the
	// caller as a failed publish.
	h, err := jsoniter.Marshal(headers)
	if err != nil {
		p.log.Error("Error in marshalling headers to publish message")
	} else {
		attrs = append(attrs, attribute.Key("headers").String(string(h)))
	}
	span.SetAttributes(attrs...)

	return nil
}
// IsPublished reports whether the snake_case name of msg's type is present in
// publishedMessages, i.e. whether a message of that type has been published
// successfully in this process.
//
// msg may be a value or a pointer (of any depth); pointers are dereferenced
// first because a pointer type has an empty Name, which would never match the
// element-type name recorded at publish time. A nil msg has no type and
// yields false.
func (p Publisher) IsPublished(msg interface{}) bool {
	msgType := reflect.TypeOf(msg)
	if msgType == nil {
		return false
	}
	for msgType.Kind() == reflect.Ptr {
		msgType = msgType.Elem()
	}

	snakeTypeName := strcase.ToSnake(msgType.Name())
	return linq.From(publishedMessages).Contains(snakeTypeName)
}
// NewPublisher builds a Publisher from the given context, RabbitMQ
// configuration, AMQP connection, logger and tracer, and returns it as an
// IPublisher. It only stores its arguments.
func NewPublisher(ctx context.Context, cfg *RabbitMQConfig, conn *amqp.Connection, log logger.ILogger, jaegerTracer trace.Tracer) IPublisher {
	publisher := new(Publisher)
	publisher.cfg = cfg
	publisher.conn = conn
	publisher.log = log
	publisher.jaegerTracer = jaegerTracer
	publisher.ctx = ctx
	return publisher
}