/
nats.go
83 lines (72 loc) · 1.63 KB
/
nats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package bund
import (
"os"
"fmt"
"github.com/nats-io/nats.go"
"github.com/vulogov/nrbund/internal/conf"
"github.com/vulogov/nrbund/internal/signal"
"github.com/pieterclaerhout/go-log"
)
// Package-level state shared by the NATS helpers in this file.
var (
	// Nats is the connection stored by InitNatsAgent and used by every
	// send/receive helper below.
	Nats *nats.Conn

	// QueueName is the subject used by NatsSend and NatsRecv; it is
	// computed in InitNatsAgent.
	QueueName string

	// SysQueueName is the subject used by NatsSendSys and NatsRecvSys; it
	// is computed in InitNatsAgent.
	SysQueueName string

	// HadSync is set to true by SysQueueHandler when the first SYNC
	// message is handled.
	HadSync bool
)
// SysQueueHandler handles a message delivered on the system queue.
//
// The payload is decoded with UnMarshal. When decoding yields nil the packet
// is reported as invalid and dropped. A STOP message is logged. The first
// SYNC message sets HadSync and is logged; later SYNC messages leave the
// state untouched.
func SysQueueHandler(m *nats.Msg) {
	msg := UnMarshal(m.Data)
	if msg == nil {
		log.Error("Invalid packet received")
		// Without a decoded packet none of the checks below can be
		// evaluated, and reading msg.PktId would dereference nil.
		return
	}
	if IfSTOP(msg) {
		log.Infof("STOP(%v) message received", msg.PktId)
	}
	if IfSYNC(msg) && !HadSync {
		HadSync = true
		log.Infof("SYNC(%v) message triggered SYNC state for %v", msg.PktId, ApplicationId)
	}
}
// InitNatsAgent connects to the NATS server named by conf.Gnats, stores the
// connection in Nats, computes QueueName and SysQueueName from conf.Id and
// conf.Name, and subscribes SysQueueHandler to the system queue.
//
// The value of conf.Timeout is used for the reconnect wait, the ping interval
// and the connection timeout. If the connection attempt fails, the error is
// logged, signal.ExitRequest is called and the process exits with status 10.
func InitNatsAgent() {
	log.Debugf("Connecting to NATS")

	timeout := *conf.Timeout
	options := []nats.Option{
		nats.Name(ApplicationId),
		nats.ReconnectWait(timeout),
		nats.PingInterval(timeout),
		nats.Timeout(timeout),
	}

	conn, connErr := nats.Connect(*conf.Gnats, options...)
	// Store the result unconditionally, matching a direct assignment to Nats.
	Nats = conn
	if connErr != nil {
		log.Errorf("[ NATS ] %v", connErr)
		signal.ExitRequest()
		os.Exit(10)
	}

	QueueName = fmt.Sprintf("%s:%s", *conf.Id, *conf.Name)
	SysQueueName = fmt.Sprintf("%s:%s:sys", *conf.Id, *conf.Name)

	log.Debugf("Queue name: %v", QueueName)
	log.Debugf("SysQueue name: %v", SysQueueName)

	NatsRecvSys(SysQueueHandler)
}
// NatsSend publishes data on the application queue (QueueName).
//
// Nothing is sent when DoContinue is false. The function has no error
// result, so a publish failure is logged instead of being dropped silently.
func NatsSend(data []byte) {
	if !DoContinue {
		return
	}
	if err := Nats.Publish(QueueName, data); err != nil {
		log.Errorf("[ NATS ] publish to %v: %v", QueueName, err)
	}
}
// NatsSendSys publishes data on the system queue (SysQueueName).
//
// Nothing is sent when DoContinue is false. The function has no error
// result, so a publish failure is logged instead of being dropped silently.
func NatsSendSys(data []byte) {
	if !DoContinue {
		return
	}
	if err := Nats.Publish(SysQueueName, data); err != nil {
		log.Errorf("[ NATS ] publish to %v: %v", SysQueueName, err)
	}
}
// NatsRecv registers fun as the handler for messages on QueueName, joining
// the queue group named by conf.Id.
//
// The function has no error result, so a failed subscription is logged;
// otherwise the handler would silently never be invoked.
func NatsRecv(fun nats.MsgHandler) {
	if _, err := Nats.QueueSubscribe(QueueName, *conf.Id, fun); err != nil {
		log.Errorf("[ NATS ] queue subscribe to %v: %v", QueueName, err)
	}
}
// NatsRecvSys registers fun as the handler for messages on SysQueueName.
//
// The function has no error result, so a failed subscription is logged;
// otherwise the handler would silently never be invoked.
func NatsRecvSys(fun nats.MsgHandler) {
	if _, err := Nats.Subscribe(SysQueueName, fun); err != nil {
		log.Errorf("[ NATS ] subscribe to %v: %v", SysQueueName, err)
	}
}
// CloseNatsAgent flushes the NATS connection held in Nats.
//
// When no connection has been stored in Nats the function returns after
// logging. A flush failure is logged rather than ignored.
//
// NOTE(review): despite the log text, this only flushes; the connection is
// neither drained nor closed here. Confirm whether that is intended.
func CloseNatsAgent() {
	log.Debugf("Terminating and draining NATS session")
	if Nats == nil {
		// No connection is stored, so there is nothing to flush.
		return
	}
	if err := Nats.Flush(); err != nil {
		log.Errorf("[ NATS ] flush: %v", err)
	}
}
// init sets HadSync to false when the package is initialised, so the first
// SYNC message seen by SysQueueHandler flips it to true.
func init() {
HadSync = false
}