-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
150 lines (136 loc) · 3.82 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package pulsar
import (
"context"
"errors"
// "sync"
"time"
"github.com/skirrund/gcloud/logger"
"github.com/apache/pulsar-client-go/pulsar"
)
// producers holds the producers created so far, keyed by topic name.
// It is read and filled by getProducer.
var producers = map[string]pulsar.Producer{}
// getProducer returns the producer cached for topic, creating and caching one
// on first use.
//
// The whole lookup runs while holding pc.mt. Reading the producers map without
// the lock while another goroutine writes it under the lock is a data race on
// a Go map and can abort the process, so there is deliberately no unlocked
// fast path. A failed creation is not stored, so the next call for the same
// topic tries again.
//
// NOTE(review): producers is package-level while the mutex belongs to pc; the
// locking only serializes callers that share the same PulsarClient — confirm
// that a single client instance is used.
//
// It returns a nil producer together with the error from createProducer when
// creation fails.
func (pc *PulsarClient) getProducer(topic string) (pulsar.Producer, error) {
	pc.mt.Lock()
	defer pc.mt.Unlock()

	if p, ok := producers[topic]; ok && p != nil {
		logger.Info("[pulsar]load producer fromcache1:", topic, ",", true)
		return p, nil
	}

	p, err := createProducer(topic)
	if err != nil {
		// Do not cache a failed creation; let a later call retry.
		return nil, err
	}
	producers[topic] = p
	return p, nil
}
// createProducer asks the package-level pulsarClient for a new producer on
// topic. The producer is named via getAppName and uses a JSON schema whose
// definition is `"string"`. The outcome is logged, and the producer and error
// returned by CreateProducer are handed back unchanged.
func createProducer(topic string) (pulsar.Producer, error) {
	logger.Info("[pulsar]start create pulsar.Producer:", topic)

	opts := pulsar.ProducerOptions{
		Topic:  topic,
		Name:   getAppName(pulsarClient.appName),
		Schema: pulsar.NewJSONSchema(`"string"`, nil),
	}

	p, err := pulsarClient.client.CreateProducer(opts)
	if err != nil {
		logger.Error("[pulsar]error create pulsar.Producer:", err)
		return p, err
	}
	logger.Info("[pulsar]finished create pulsar.Producer:", topic)
	return p, err
}
// createMsg builds a ProducerMessage carrying msg as its Value and
// deliverAfter as its DeliverAfter delay.
func createMsg(msg string, deliverAfter time.Duration) *pulsar.ProducerMessage {
	return &pulsar.ProducerMessage{
		Value:        msg,
		DeliverAfter: deliverAfter,
	}
}
// createMsgDeliverAt builds a ProducerMessage carrying msg as its Value and
// deliverAt as its DeliverAt time.
func createMsgDeliverAt(msg string, deliverAt time.Time) *pulsar.ProducerMessage {
	return &pulsar.ProducerMessage{
		Value:     msg,
		DeliverAt: deliverAt,
	}
}
// doSend publishes msg to topic synchronously, with deliverAfter set as the
// message's DeliverAfter delay.
//
// It returns an error when topic is empty, when no producer can be obtained,
// when Send fails, or when Send reports success but yields a nil message ID.
func (pc *PulsarClient) doSend(topic string, msg string, deliverAfter time.Duration) error {
	if topic == "" {
		return errors.New("[pulsar] topic is empty")
	}
	logger.Info("[pulsar] send msg =>topic:" + topic + ":" + msg)

	p, err := pc.getProducer(topic)
	if err != nil {
		return err
	}

	id, err := p.Send(context.Background(), createMsg(msg, deliverAfter))
	switch {
	case err != nil:
		logger.Error("[pulsar]发送消息失败: ", err)
		return err
	case id == nil:
		return errors.New("[pulsar]发送消息失败[messageId为空]:" + topic)
	}
	return nil
}
// doSendAsync hands msg to the producer for topic via SendAsync, with
// deliverAfter set as the message's DeliverAfter delay.
//
// An empty topic is logged and returned as an error, and a failure to obtain
// the producer is returned as-is. The result of the asynchronous send is only
// logged by the callback; it is never reported to the caller, which gets nil
// once the message has been handed over.
func (pc *PulsarClient) doSendAsync(topic string, msg string, deliverAfter time.Duration) error {
	if topic == "" {
		emptyErr := errors.New("[pulsar] topic is empty")
		logger.Error(emptyErr.Error())
		return emptyErr
	}

	producer, err := pc.getProducer(topic)
	if err != nil {
		return err
	}

	// onDone logs the outcome of the asynchronous send.
	onDone := func(id pulsar.MessageID, _ *pulsar.ProducerMessage, sendErr error) {
		if sendErr == nil {
			logger.Info("[pulsar] doSendAsync finish:", id)
			return
		}
		logger.Error("[pulsar]发送doSendAsync消息失败:", sendErr)
	}
	producer.SendAsync(context.Background(), createMsg(msg, deliverAfter), onDone)
	return nil
}
// doSendDelayAt publishes msg to topic synchronously, with deliverAt set as
// the message's DeliverAt time.
//
// It returns an error when topic is empty, when no producer can be obtained,
// when Send fails, or when Send yields a nil message ID. On success the
// message text is logged.
func (pc *PulsarClient) doSendDelayAt(topic string, msg string, deliverAt time.Time) error {
	if topic == "" {
		return errors.New("[pulsar] topic is empty")
	}

	producer, err := pc.getProducer(topic)
	if err != nil {
		return err
	}

	id, err := producer.Send(context.Background(), createMsgDeliverAt(msg, deliverAt))
	switch {
	case err != nil:
		return err
	case id == nil:
		return errors.New("[pulsar]发送消息失败[messageId为空]:" + topic)
	}

	logger.Info("[pulsar] doSendDelayAt finish: ", msg)
	return nil
}
// doSendDelayAtAsync hands msg to the producer for topic via SendAsync, with
// deliverAt set as the message's DeliverAt time.
//
// It returns an error when topic is empty or when no producer can be
// obtained. The result of the asynchronous send is only logged by the
// callback; it is never reported to the caller, which gets nil once the
// message has been handed over.
func (pc *PulsarClient) doSendDelayAtAsync(topic string, msg string, deliverAt time.Time) error {
	if len(topic) == 0 {
		// The message must describe the failing condition: the topic IS empty.
		return errors.New("[pulsar] topic is empty")
	}
	message := createMsgDeliverAt(msg, deliverAt)
	p, err := pc.getProducer(topic)
	if err != nil {
		return err
	}
	p.SendAsync(context.Background(), message, func(msgId pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
		if err != nil {
			logger.Error("[pulsar]发送doSendDelayAtAsync消息失败:", err)
			return
		}
		logger.Info("[pulsar] doSendDelayAtAsync finish:", msg.Value)
	})
	return nil
}