/
Publisher.go
64 lines (56 loc) · 1.53 KB
/
Publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package transport
import (
"context"
"fmt"
"github.com/goccy/go-json"
models "github.com/kattana-io/models/pkg/storage"
"github.com/segmentio/kafka-go"
"go.uber.org/zap"
)
type Publisher struct {
log *zap.Logger
w *kafka.Writer
address []string
}
func NewPublisher(topic string, address []string, log *zap.Logger) *Publisher {
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: address,
Topic: topic,
Balancer: &kafka.LeastBytes{},
Async: true,
})
return &Publisher{
log: log,
w: w,
address: address,
}
}
func (p *Publisher) PublishBlock(ctx context.Context, block []byte) {
if err := p.w.WriteMessages(ctx, kafka.Message{Value: block}); err != nil {
p.log.Fatal(fmt.Sprintf("failed to write messages: %s", err.Error()))
}
}
func (p *Publisher) Close() {
if err := p.w.Close(); err != nil {
p.log.Fatal(fmt.Sprintf("failed to close writer: %s", err.Error()))
}
}
// PublishFailedBlock Create a temporary failed publisher and return block to sender
func (p *Publisher) PublishFailedBlock(ctx context.Context, block models.Block) bool {
failedBlocksWriter := kafka.NewWriter(kafka.WriterConfig{
Brokers: p.address,
Topic: "failed_blocks",
Balancer: &kafka.LeastBytes{},
Async: true,
})
Value, err := json.Marshal(block)
if err != nil {
p.log.Error(err.Error())
return false
}
if err := failedBlocksWriter.WriteMessages(ctx, kafka.Message{Value: Value}); err != nil {
p.log.Fatal(fmt.Sprintf("failed to write messages: %s", err.Error()))
return false
}
return true
}