-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
106 lines (89 loc) · 2.12 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main
import (
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/wshops/wpc/v2/wpcl"
"github.com/wshops/wpc/v2/wpcm"
"github.com/wshops/zlog"
"os"
"os/signal"
"syscall"
)
// Connection settings handed to wpc.New in main. Both are empty strings in
// this file.
const (
	// pulsarUrl is the Pulsar service URL passed as the first argument to wpc.New.
	pulsarUrl = ""
	// pulsarToken is the token given to pulsar.NewAuthenticationToken.
	pulsarToken = ""
)
// main is a small end-to-end exercise of the wpc client: it creates the
// client, registers a producer and a consumer under ("render", "testmsg"),
// starts the consumer, publishes numbered test messages from a background
// goroutine, and then blocks until the shutdown hook receives a signal.
func main() {
	zlog.New(zlog.LevelDev)
	wpc.New(pulsarUrl, "wpc_test", zlog.Log(), &pulsar.ClientOptions{
		TLSAllowInsecureConnection: true,
		TLSValidateHostname:        false,
		Authentication:             pulsar.NewAuthenticationToken(pulsarToken),
		Logger:                     wpcl.NewBlackHoleLogger(zlog.Log()),
	})
	// Teardown lives in exactly one place so that wpc.Close runs once on every
	// exit path (early return below or signal-triggered shutdown) instead of
	// once in the hook callback and again in a defer.
	defer func() {
		wpc.Close()
		// Best-effort flush: the process is exiting, so a Sync error is
		// deliberately ignored.
		_ = zlog.Log().Sync()
	}()

	wpc.RegisterProducer("render", "testmsg")
	wpc.RegisterConsumer("render", "testmsg", &TestRenderHandler{})
	if err := wpc.GetConsumer("render", "testmsg").Start(); err != nil {
		zlog.Log().Error(err)
		return
	}

	// Publisher loop: sends "test message GG<n>" until a publish fails.
	go func() {
		for counter := 0; ; counter++ {
			err := wpc.GetProducer("render", "testmsg").PublishOne(&wpcm.Message{
				Payload: []byte(fmt.Sprintf("test message GG%d", counter)),
			})
			if err != nil {
				// Report the failure instead of exiting silently, and do not
				// claim the message was sent.
				zlog.Log().Error(fmt.Errorf("publishing test message GG%d: %w", counter, err))
				return
			}
			zlog.Log().Infof("[SENT MSG] GG%d", counter)
		}
	}()

	// Block until the hook's signal arrives; the deferred teardown above then
	// runs as main returns.
	NewShutdownHook().Close()
}
// TestRenderHandler is the stateless message handler that main registers for
// the ("render", "testmsg") consumer.
type TestRenderHandler struct{}
// HandleMessage logs the payload of the received message as text and always
// returns nil.
// NOTE(review): a nil *wpcm.RetryMessage presumably tells wpc that no retry is
// wanted — confirm against the wpc documentation.
func (t *TestRenderHandler) HandleMessage(message *wpcm.Message) *wpcm.RetryMessage {
	payload := string(message.Payload)
	zlog.Log().Infof("[NEW MSG] Message: %s", payload)
	return nil
}
// Compile-time assertion that *hook satisfies the Hook interface.
var _ Hook = (*hook)(nil)
// Hook is a graceful-shutdown hook. Instances created by NewShutdownHook are
// subscribed to SIGINT and SIGTERM by default.
type Hook interface {
// WithSignals subscribes the hook to additional signals. The implementation
// in this file returns the same hook, so calls can be chained.
WithSignals(signals ...syscall.Signal) Hook
// Close takes the shutdown handlers to run. The implementation in this file
// blocks until a subscribed signal arrives and then calls funcs in order.
Close(funcs ...func())
}
// hook is the Hook implementation in this file; it waits on an os/signal
// notification channel.
type hook struct {
// ctx receives the signals registered via WithSignals. Despite its name it
// is a signal channel, not a context.Context.
ctx chan os.Signal
}
// NewShutdownHook creates a Hook whose signal channel has a buffer of one and
// which is already subscribed to SIGINT and SIGTERM.
func NewShutdownHook() Hook {
	h := new(hook)
	h.ctx = make(chan os.Signal, 1)
	return h.WithSignals(syscall.SIGINT, syscall.SIGTERM)
}
// WithSignals registers each of the given signals for delivery on the hook's
// channel and returns the receiver so that calls can be chained. Calling it
// with no signals registers nothing.
func (h *hook) WithSignals(signals ...syscall.Signal) Hook {
	for i := range signals {
		signal.Notify(h.ctx, signals[i])
	}
	return h
}
// Close blocks until a signal is received on the hook's channel, stops further
// signal delivery to that channel, and then calls each function in funcs in
// the order given.
func (h *hook) Close(funcs ...func()) {
	// Waiting on a single channel needs only a plain receive, not a select.
	<-h.ctx
	signal.Stop(h.ctx)
	for _, f := range funcs {
		f()
	}
}