This repository has been archived by the owner on Mar 22, 2023. It is now read-only.
/
jobProducer.go
121 lines (103 loc) · 2.33 KB
/
jobProducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package routines
import (
"sync"
"time"
"github.com/Shopify/sarama"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/ovh/metronome/src/metronome/kafka"
"github.com/ovh/metronome/src/metronome/models"
)
// JobProducer handle the internal states of the producer.
type JobProducer struct {
producer sarama.AsyncProducer
wg sync.WaitGroup
stopSig chan struct{}
offsets map[int32]int64
offsetsMutex sync.RWMutex
}
// NewJobProducer return a new job producer.
// Read jobs to send from jobs channel.
func NewJobProducer(jobs <-chan []models.Job) (*JobProducer, error) {
config := kafka.NewConfig()
config.ClientID = "metronome-scheduler"
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Timeout = 1 * time.Second
config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Frequency = 300 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 3
brokers := viper.GetStringSlice("kafka.brokers")
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
return nil, err
}
jp := &JobProducer{
producer: producer,
stopSig: make(chan struct{}),
offsets: make(map[int32]int64),
}
go func() {
for {
select {
case js, ok := <-jobs:
if !ok {
return
}
for _, j := range js {
producer.Input() <- j.ToKafka()
}
case <-jp.stopSig:
return
}
}
}()
// Success handling
jp.wg.Add(1)
go func() {
for {
select {
case msg, ok := <-producer.Successes():
if !ok {
jp.wg.Done()
return
}
jp.offsetsMutex.Lock()
jp.offsets[msg.Partition] = msg.Offset
jp.offsetsMutex.Unlock()
log.Debugf("Msg send: %v", msg)
}
}
}()
// Failure handling
jp.wg.Add(1)
go func() {
for {
select {
case err, ok := <-producer.Errors():
if !ok {
jp.wg.Done()
return
}
log.Errorf("Failed to send message: %v", err)
}
}
}()
return jp, nil
}
// Close the job producer
func (jp *JobProducer) Close() {
jp.stopSig <- struct{}{}
jp.producer.AsyncClose()
jp.wg.Wait()
}
// Indexes return the current write indexes by partition
func (jp *JobProducer) Indexes() map[int32]int64 {
res := make(map[int32]int64)
jp.offsetsMutex.RLock()
defer jp.offsetsMutex.RUnlock()
for k, v := range jp.offsets {
res[k] = v
}
return res
}