-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
114 lines (102 loc) · 3.5 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package producer
import (
"context"
"errors"
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/pulsar-sigs/pulsar-client/pkg/pulsar-client/types"
"github.com/spf13/cobra"
)
// produceMessage connects to the broker at opt.BrokerUrl, creates a producer on
// opt.Topic and asynchronously publishes opt.MessageNum messages whose payload
// is opt.Message and whose key is opt.MessageKey.
//
// Authentication is configured only when both opt.AuthType and opt.AuthParams
// are non-empty. When opt.ProduceTime is positive, the loop sleeps that many
// milliseconds before each send. When opt.Readness is set,
// types.RunReadnessAPI is started in a background goroutine that lives until
// the process exits.
//
// Any setup failure (authentication, client or producer creation) terminates
// the process through log.Fatal*. Per-message send results are only logged.
func produceMessage(opt *types.ProducerMessageOption) {
	var auth pulsar.Authentication
	if opt.AuthType != "" && opt.AuthParams != "" {
		a, err := pulsar.NewAuthentication(opt.AuthType, opt.AuthParams)
		if err != nil {
			log.Fatalf("New auth failed: %v", err)
		}
		auth = a
	}

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               opt.BrokerUrl,
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
		Authentication:    auth,
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:                   opt.Topic,
		DisableBatching:         true,
		BatchingMaxPublishDelay: time.Second,
	})
	if err != nil {
		// log.Fatalln exits without running deferred calls, so release the
		// client explicitly before terminating.
		client.Close()
		log.Fatalln("create.producer.failed", err)
	}
	// Deferred calls run in reverse order: the producer is closed before the
	// client that owns it.
	defer producer.Close()

	if opt.Readness {
		go types.RunReadnessAPI()
	}

	log.Println("will produce message total ", opt.MessageNum)
	for i := 0; i < int(opt.MessageNum); i++ {
		if opt.ProduceTime > 0 {
			time.Sleep(time.Millisecond * time.Duration(opt.ProduceTime))
		}
		producer.SendAsync(context.TODO(), &pulsar.ProducerMessage{
			Payload: []byte(opt.Message),
			// Use the key carried by the options rather than the package
			// global so that the function honours what its caller passed in.
			Key:       opt.MessageKey,
			EventTime: time.Now(),
		}, func(mid pulsar.MessageID, msg *pulsar.ProducerMessage, e error) {
			if e != nil {
				log.Println("producer.send.message.failed!", e)
				return
			}
			log.Printf("producer.send.message.success! %s %d-%d-%d", opt.Topic, mid.LedgerID(), mid.EntryID(), mid.PartitionIdx())
		})
	}

	// Wait for every outstanding asynchronous send to be acknowledged instead
	// of relying solely on the fixed grace period below.
	if err := producer.Flush(); err != nil {
		log.Println("producer.flush.failed!", err)
	}

	log.Println("exit after 3 second")
	time.Sleep(time.Second * 3)
}
// NewProducerCommand builds the "producer" cobra sub-command.
//
// The command binds its persistent flags to the package-level variables in
// the types package, rejects an empty broker URL or topic (printing the help
// text and returning an error), logs the effective settings and then calls
// produceMessage with an option struct assembled from those variables.
func NewProducerCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:     "producer",
		Short:   "producer",
		Example: "",
		RunE: func(cmd *cobra.Command, args []string) error {
			if types.BrokerUrl == "" {
				// Help output is best-effort; the validation error below is
				// what the caller needs to see.
				_ = cmd.Help()
				return errors.New("brokerUrl is empty")
			}
			if types.Topic == "" {
				_ = cmd.Help()
				return errors.New("topic is empty")
			}

			log.Println("broker:", types.BrokerUrl)
			log.Println("topic:", types.Topic)
			log.Println("subscriptionName:", types.SubscriptionName)

			produceMessage(&types.ProducerMessageOption{
				Topic:       types.Topic,
				BrokerUrl:   types.BrokerUrl,
				MessageNum:  types.MessageNum,
				ProduceTime: types.ProduceTime,
				Readness:    types.Readness,
				AuthType:    types.AuthType,
				AuthParams:  types.AuthParams,
				Message:     types.Message,
				MessageKey:  types.MessageKey,
			})
			return nil
		},
	}

	flags := cmd.PersistentFlags()
	flags.StringVar(&types.BrokerUrl, "broker", "pulsar://localhost:6650", "pulsar broker url")
	flags.StringVar(&types.AuthType, "auth-type", "", "auth type")
	flags.StringVar(&types.AuthParams, "auth-params", "", "auth-params")
	flags.StringVar(&types.Topic, "topic", "", "pulsar topic")
	flags.Int64Var(&types.MessageNum, "message-num", 10000, "produce message num")
	flags.StringVar(&types.Message, "message", "hello", "produce message")
	flags.StringVar(&types.MessageKey, "message-key", "", "message topic key")
	// The flag used to be registered with an embedded space ("message key"),
	// which is awkward to pass from a shell. Keep the old spelling as a
	// deprecated alias bound to the same variable so existing invocations
	// continue to work.
	flags.StringVar(&types.MessageKey, "message key", "", "message topic key")
	// MarkDeprecated only fails for an unknown flag or an empty message;
	// neither can happen here, so the error is intentionally discarded.
	_ = flags.MarkDeprecated("message key", "use --message-key instead")
	flags.Int64Var(&types.ProduceTime, "produce-time", 0, "produce time for one message,0(millisecond) by default.")
	flags.BoolVar(&types.Readness, "readness", false, "start readness api endpoint, false by default.")
	return cmd
}