-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
57 lines (48 loc) · 1.16 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package nsq
import (
"github.com/nsqio/go-nsq"
"sync"
"time"
)
// ProducerPool holds one nsq.Producer per nsqd address and the index of the
// producer that Publish will use next.
type ProducerPool struct {
nsqdAddresses []string // addresses the pool was built from
conf *nsq.Config // config handed to nsq.NewProducer for every producer
producers []*nsq.Producer // one producer per entry in nsqdAddresses
sync.Mutex // mutex guarding the fields below
index int // index of the producer currently in use
}
// newProducerPool creates one nsq.Producer per address in nsqdAddresses, all
// sharing conf, and pings each producer before adding it to the pool.
//
// If creating or pinging any producer fails, the failing producer and every
// producer accepted before it are stopped, and the error is returned with a
// nil pool, so a failed call leaves no producers running.
func newProducerPool(nsqdAddresses []string, conf *nsq.Config) (*ProducerPool, error) {
	producers := make([]*nsq.Producer, 0, len(nsqdAddresses))

	// stopAll releases the producers accepted so far; used on failure paths
	// so earlier producers are not leaked when a later address fails.
	stopAll := func() {
		for _, created := range producers {
			created.Stop()
		}
	}

	for _, addr := range nsqdAddresses {
		producer, err := nsq.NewProducer(addr, conf)
		if err != nil {
			stopAll()
			return nil, err
		}
		if err := producer.Ping(); err != nil {
			// The failing producer has not been appended yet, so stop it
			// explicitly in addition to the ones already accepted.
			producer.Stop()
			stopAll()
			return nil, err
		}
		producers = append(producers, producer)
	}

	return &ProducerPool{
		nsqdAddresses: nsqdAddresses,
		conf:          conf,
		producers:     producers,
	}, nil
}
// emptyPoolError is the error Publish returns when the pool has no producers.
type emptyPoolError struct{}

// Error implements the error interface.
func (emptyPoolError) Error() string { return "producer pool has no producers" }

// Publish sends message to topic using the next producer in the pool and then
// advances the pool's index, wrapping around at the end of the producer list.
//
// When delay is greater than zero the message is sent with DeferredPublish;
// otherwise it is sent with Publish. The error from the chosen producer is
// returned as-is. If the pool holds no producers, an error is returned instead
// of indexing into the empty list.
func (p *ProducerPool) Publish(topic string, delay time.Duration, message []byte) error {
	p.Lock()
	if len(p.producers) == 0 {
		p.Unlock()
		return emptyPoolError{}
	}
	producer := p.producers[p.index]
	p.index = (p.index + 1) % len(p.producers)
	// Release the lock before publishing so only the index update is serialized.
	p.Unlock()

	if delay > 0 {
		return producer.DeferredPublish(topic, delay, message)
	}
	return producer.Publish(topic, message)
}
// Stop calls Stop on every producer in the pool, in the order they are stored.
func (p *ProducerPool) Stop() {
	for i := 0; i < len(p.producers); i++ {
		p.producers[i].Stop()
	}
}