/
manager.go
79 lines (63 loc) · 2.32 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package offsets
import (
"fmt"
"github.com/Shopify/sarama"
"github.com/tryfix/log"
)
// Manager looks up and validates offsets of topic partitions.
type Manager interface {
// OffsetValid reports whether offset is considered valid for the given
// topic partition. A non-nil err means the check could not be performed.
OffsetValid(topic string, partition int32, offset int64) (isValid bool, err error)
// GetOffsetLatest returns the latest (newest) offset of the given topic
// partition.
GetOffsetLatest(topic string, partition int32) (offset int64, err error)
// GetOffsetOldest returns the oldest offset of the given topic partition.
GetOffsetOldest(topic string, partition int32) (offset int64, err error)
// Close shuts the manager down and returns any error raised while doing so.
Close() error
}
// Config carries the settings NewManager uses to build a Manager.
type Config struct {
// Config is the sarama configuration passed to sarama.NewClient.
Config *sarama.Config
// BootstrapServers is the address list passed to sarama.NewClient.
BootstrapServers []string
// Logger is the parent logger; NewManager derives an `offset-manager`
// prefixed logger from it.
Logger log.Logger
}
// manager is the sarama-backed value NewManager returns as a Manager.
type manager struct {
// client is the sarama client used for offset lookups and closed by Close.
client sarama.Client
}
// NewManager builds a Manager backed by a sarama client created from
// config.BootstrapServers and config.Config.
//
// A client creation failure is not returned to the caller; it is reported
// through the Fatal method of an `offset-manager` prefixed logger derived from
// config.Logger.
func NewManager(config *Config) Manager {
	logger := config.Logger.NewLog(log.Prefixed(`offset-manager`))

	client, err := sarama.NewClient(config.BootstrapServers, config.Config)
	if err != nil {
		logger.Fatal(fmt.Sprintf(`cannot initiate builder due to [%+v]`, err))
	}

	return &manager{client: client}
}
// OffsetValid reports whether offset passes the manager's range validation for
// the given topic partition. The error from validate is returned as is.
func (m *manager) OffsetValid(topic string, partition int32, offset int64) (isValid bool, err error) {
	// The second result of validate (the range's lower bound) is not needed here.
	ok, _, validateErr := m.validate(topic, partition, offset)
	return ok, validateErr
}
// GetOffsetLatest returns the offset the sarama client reports for
// sarama.OffsetNewest on the given topic partition.
//
// On failure it returns a zero offset and an error wrapping the client error.
func (m *manager) GetOffsetLatest(topic string, partition int32) (offset int64, err error) {
	offset, err = m.client.GetOffset(topic, partition, sarama.OffsetNewest)
	if err != nil {
		return 0, fmt.Errorf(`cannot get latest offset for %s-%d due to %w`, topic, partition, err)
	}
	return offset, nil
}
// GetOffsetOldest returns the offset the sarama client reports for
// sarama.OffsetOldest on the given topic partition.
//
// On failure it returns a zero offset and an error wrapping the client error.
func (m *manager) GetOffsetOldest(topic string, partition int32) (offset int64, err error) {
	offset, err = m.client.GetOffset(topic, partition, sarama.OffsetOldest)
	if err != nil {
		return 0, fmt.Errorf(`cannot get oldest offset for %s-%d due to %w`, topic, partition, err)
	}
	return offset, nil
}
// Close closes the underlying sarama client and returns its error, if any.
func (m *manager) Close() error {
	closeErr := m.client.Close()
	return closeErr
}
// offsetValid reports whether offset falls inside the half-open range
// [bkStart, bkEnd): bkStart itself is accepted, bkEnd is not.
func offsetValid(offset, bkStart, bkEnd int64) bool {
	if offset < bkStart {
		return false
	}
	return offset < bkEnd
}
// validate checks offset against the offset range currently available for the
// given topic partition.
//
// It returns:
//   - isValid: whether offset lies in [oldest offset, newest offset);
//   - valid: the oldest offset, i.e. the lower bound of that range;
//   - err: a wrapped error if either bound could not be fetched, in which case
//     isValid is false and valid is 0.
func (m *manager) validate(topic string, partition int32, offset int64) (isValid bool, valid int64, err error) {
	// The lower bound must be the oldest offset. Fetching the newest offset for
	// both bounds makes the range empty, so every offset would be rejected.
	startOffset, err := m.GetOffsetOldest(topic, partition)
	if err != nil {
		return false, 0, fmt.Errorf(`offset validate failed for %s-%d due to %w`, topic, partition, err)
	}

	endOffset, err := m.client.GetOffset(topic, partition, sarama.OffsetNewest)
	if err != nil {
		return false, 0, fmt.Errorf(`offset validate failed for %s-%d due to %w`, topic, partition, err)
	}

	return offsetValid(offset, startOffset, endOffset), startOffset, nil
}