-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathproducer.go
98 lines (77 loc) · 1.87 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package pubsub
import (
"context"
"errors"
"fmt"
"sync"
"time"
ps "cloud.google.com/go/pubsub"
"github.com/pcelvng/task/bus/info"
)
// Producer publishes messages to Google Cloud Pub/Sub topics and keeps a
// per-topic count of the messages it has successfully sent.
type Producer struct {
// client is the pubsub client used to look up, create and publish to
// topics; it is closed by Stop.
client *ps.Client
// ctx is cancelled (via cncl) when Stop is called. Send checks it to
// refuse work after shutdown and derives its per-call timeout from it.
ctx context.Context
cncl context.CancelFunc
// info holds the bus name and the per-topic sent counters.
info info.Producer
// mux guards updates to info.Sent made by Send.
mux sync.Mutex
}
// NewProducer builds a pubsub Producer from the receiver's options.
//
// The returned producer carries a cancellable context (used for clean
// shutdown) and an empty per-topic sent counter. If the underlying pubsub
// client cannot be created, a nil producer and the client error are returned.
func (o *Option) NewProducer() (p *Producer, err error) {
	// shutdown context: cancelled when the producer is stopped
	ctx, cancel := context.WithCancel(context.Background())

	producer := &Producer{
		ctx:  ctx,
		cncl: cancel,
		info: info.Producer{Bus: "pubsub", Sent: make(map[string]int)},
	}

	if producer.client, err = o.newClient(); err != nil {
		return nil, err
	}
	return producer, nil
}
// Send publishes a single message to the named topic, creating the topic
// first if it does not exist yet.
//
// The topic is configured to publish one message at a time
// (CountThreshold = 1) with a 100 millisecond delay threshold. The existence
// check, topic creation, publish and wait-for-result all share one 5 second
// timeout derived from the producer's shutdown context. On success the
// per-topic sent counter is incremented under the producer mutex.
//
// An error is returned when the producer has already been stopped, or when
// the existence check, the topic creation or the publish itself fails.
func (p *Producer) Send(topic string, msg []byte) (err error) {
	// should not attempt to send if producer already stopped.
	if p.ctx.Err() != nil {
		errMsg := fmt.Sprintf("unable to send '%v'; producer already stopped", string(msg))
		return errors.New(errMsg)
	}

	// Always release the timeout's resources when Send returns; otherwise
	// the timer stays registered until the deadline or producer shutdown.
	ctx, cancel := context.WithTimeout(p.ctx, 5*time.Second)
	defer cancel()

	t := p.client.Topic(topic)
	ok, err := t.Exists(ctx)
	if err != nil {
		return err
	}
	if !ok {
		t, err = p.client.CreateTopic(ctx, topic)
		if err != nil {
			return err
		}
	}

	// Configure and schedule Stop on the handle that is actually published
	// on. Doing this before the create step would leave a newly created
	// topic with default settings and without a matching Stop.
	t.PublishSettings.CountThreshold = 1
	t.PublishSettings.DelayThreshold = 100 * time.Millisecond
	defer t.Stop()

	// publish message to pubsub and wait for the server's acknowledgement
	res := t.Publish(ctx, &ps.Message{Data: msg})
	if _, err = res.Get(ctx); err != nil {
		return err
	}

	p.mux.Lock()
	p.info.Sent[topic]++
	p.mux.Unlock()

	return nil
}
// Stop shuts the producer down by cancelling its context and closing the
// pubsub client. After Stop, Send refuses to publish.
//
// Calling Stop on an already stopped producer is a no-op that returns nil.
// Otherwise the error reported by closing the client, if any, is returned.
func (p *Producer) Stop() (err error) {
	if p.ctx.Err() != nil {
		return nil
	}

	p.cncl()

	// Surface the close error instead of silently dropping it.
	return p.client.Close()
}
// Info returns a snapshot of the producer's stats: the bus name and the
// number of messages successfully sent per topic.
//
// The snapshot is taken under the producer mutex and the Sent map is copied,
// so the caller can read it while Send keeps updating the live counters.
func (p *Producer) Info() (i info.Producer) {
	p.mux.Lock()
	defer p.mux.Unlock()

	i = p.info
	// Copy the map: returning the shared map would let callers race with
	// the increments performed by Send.
	i.Sent = make(map[string]int, len(p.info.Sent))
	for topic, n := range p.info.Sent {
		i.Sent[topic] = n
	}
	return i
}