-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
52 lines (46 loc) · 1.21 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main
import (
"context"
"log"
"time"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
)
// keys lists the message keys that main cycles through when publishing.
var keys = []string{
	"project_id:project-1",
	"project_id:project-2",
	"project_id:project-3",
}
// main connects to the Pulsar broker at pulsar://localhost:6650, creates a
// producer named "producer01" on persistent://public/default/partitioned-topic
// and synchronously publishes five keyed messages, cycling through keys.
func main() {
	// Operation and connection timeouts are both set to 30 seconds.
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client. error: %+v", err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "persistent://public/default/partitioned-topic",
		Name:  "producer01",
	})
	if err != nil {
		// log.Fatalf terminates the process via os.Exit, which does not run
		// deferred calls, so the client must be closed explicitly here.
		client.Close()
		log.Fatalf("Could not instantiate a producer. error: %+v\n", err)
	}
	defer producer.Close()

	for i := 0; i < 5; i++ {
		// Index modulo len(keys) instead of a literal 3 so the rotation
		// follows the slice if entries are added or removed (keys must be
		// non-empty).
		key := keys[i%len(keys)]
		msg := fmt.Sprintf("msg: msg-%d key: %s", i, key)

		// The returned message ID is not needed; only the error is checked.
		// NOTE(review): the key is presumably what routes a message to a
		// partition of this topic — confirm against the topic configuration.
		_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
			Payload: []byte(msg),
			Key:     key,
		})
		if err != nil {
			// Best effort: a failed send is logged and the loop moves on to
			// the next message.
			log.Printf("Failed to publish message. error: %+v\n", err)
		}
	}
}