-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub_gateway.go
48 lines (41 loc) · 1017 Bytes
/
pubsub_gateway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package pubsub
import (
"context"
"fmt"
"github.com/rabbitmq/amqp091-go"
"github.com/slntopp/core-chatting/cc"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
// PubGateway publishes event to the queue of every gateway listed in gateways.
//
// For each gateway a durable queue named "cc.gateway.<gateway>" is declared and
// the protobuf-encoded event is published to it through the default exchange,
// using the queue name as the routing key. The method returns nothing: failures
// are only logged, and the first failure stops processing of the remaining
// gateways.
func (s *PubSub) PubGateway(ctx context.Context, event *cc.Event, gateways []string) {
	log := s.log.Named("Pub-Gateway")

	if len(gateways) == 0 {
		return
	}

	// The payload is identical for every gateway, so encode it once up front
	// instead of re-marshalling on each iteration.
	body, err := proto.Marshal(event)
	if err != nil {
		log.Error("Failed to marshal event", zap.Error(err))
		return
	}

	for _, gateway := range gateways {
		queueName := fmt.Sprintf("cc.gateway.%s", gateway)

		// Arguments after the name: durable=true, autoDelete=false,
		// exclusive=false, noWait=false, no extra arguments.
		queue, err := s.ch.QueueDeclare(queueName, true, false, false, false, nil)
		if err != nil {
			log.Error("Failed to declare queue", zap.String("queue", queueName), zap.Error(err))
			return
		}

		log.Debug("Publish event", zap.Any("event", event))

		err = s.ch.PublishWithContext(
			ctx,
			"",         // default exchange: routes directly by queue name
			queue.Name, // routing key
			false,      // mandatory
			false,      // immediate
			amqp091.Publishing{
				// NOTE(review): the body is protobuf-encoded binary, yet the
				// content type says "text/plain". Kept as-is because consumers
				// may depend on it — confirm before changing.
				ContentType: "text/plain",
				Body:        body,
			},
		)
		if err != nil {
			log.Error("Failed to publish event", zap.String("queue", queueName), zap.Error(err))
			return
		}
	}
}