forked from minio/minio
/
notify-nats.go
196 lines (176 loc) · 5.28 KB
/
notify-nats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
	"io/ioutil"
	"net"
	"net/url"

	"github.com/Sirupsen/logrus"
	"github.com/nats-io/go-nats-streaming"
	"github.com/nats-io/nats"
)
// natsNotifyStreaming contains specific options related to connection
// to a NATS streaming server. These options are only consulted when
// Enable is true.
type natsNotifyStreaming struct {
// Enable selects the streaming client: when true, dialNATS connects with
// stan.Connect and Fire publishes through the streaming connection.
Enable bool `json:"enable"`
// ClusterID is passed to stan.Connect as the cluster identifier.
ClusterID string `json:"clusterID"`
// ClientID is passed to stan.Connect as the client identifier. When it is
// empty, dialNATS substitutes the value returned by mustGetUUID().
ClientID string `json:"clientID"`
// Async makes Fire publish with PublishAsync instead of Publish.
Async bool `json:"async"`
// MaxPubAcksInflight, when greater than zero, is forwarded to the
// streaming client through the stan.MaxPubAcksInflight option.
MaxPubAcksInflight int `json:"maxPubAcksInflight"`
}
// natsNotify - holds the configuration of a NATS notification target.
// All fields represent NATS configuration details; the logrus hook itself
// is implemented by natsIOConn, which carries a copy of this struct.
type natsNotify struct {
// Enable turns the target on. When false, Validate skips all checks and
// dialNATS returns errNotifyNotEnabled.
Enable bool `json:"enable"`
// Address is the server address; Validate requires it to be parseable by
// net.SplitHostPort.
Address string `json:"address"`
// Subject is the subject Fire publishes every entry to.
Subject string `json:"subject"`
// Username and Password are the credentials used by dialNATS: embedded in
// the connection URL in streaming mode, set on the client options otherwise.
Username string `json:"username"`
Password string `json:"password"`
// Token is set on the client options in non-streaming mode. The streaming
// branch of dialNATS does not read it.
Token string `json:"token"`
// Secure selects the "tls" URL scheme in streaming mode and sets the
// Secure client option in non-streaming mode.
Secure bool `json:"secure"`
// PingInterval is part of the persisted configuration.
// NOTE(review): no code visible in this file reads it, and its unit is not
// stated here - confirm against callers before relying on it.
PingInterval int64 `json:"pingInterval"`
// Streaming holds the NATS streaming specific options.
Streaming natsNotifyStreaming `json:"streaming"`
}
// Validate checks the NATS target configuration. A disabled target is
// always considered valid; an enabled one must carry an Address that
// net.SplitHostPort accepts, and the parse error is returned otherwise.
func (n *natsNotify) Validate() error {
	if n.Enable {
		_, _, err := net.SplitHostPort(n.Address)
		return err
	}
	return nil
}
// natsIOConn abstracts connection to any type of NATS server.
// dialNATS fills in exactly one of the two connection fields depending on
// params.Streaming.Enable, and Fire/closeNATS pick the matching one.
type natsIOConn struct {
// params is the configuration the connection was dialed with.
params natsNotify
// natsConn is the plain NATS connection, set when streaming is disabled.
natsConn *nats.Conn
// stanConn is the NATS streaming connection, set when streaming is enabled.
stanConn stan.Conn
}
// dialNATS - dials and returns an natsIOConn instance,
// for sending notifications. Returns errNotifyNotEnabled if the NATS
// target is not enabled.
//
// When natsL.Streaming.Enable is set the connection is made with the NATS
// streaming client and stored in stanConn; otherwise a plain NATS
// connection is made and stored in natsConn. testDial appends "-test" to
// the streaming client ID so that a probe connection does not collide with
// a client ID that is already registered.
//
// On any failure the zero-value natsIOConn is returned together with the
// error.
func dialNATS(natsL natsNotify, testDial bool) (nioc natsIOConn, e error) {
	if !natsL.Enable {
		return nioc, errNotifyNotEnabled
	}

	// Construct natsIOConn which holds all NATS connection information
	conn := natsIOConn{params: natsL}

	if natsL.Streaming.Enable {
		// Construct scheme to differentiate between clear and TLS connections
		scheme := "nats"
		if natsL.Secure {
			scheme = "tls"
		}

		// Construct address URL. Going through net/url percent-encodes the
		// credentials, so a username or password containing characters such
		// as '@', ':', '/' or '?' no longer yields a malformed URL. For
		// credentials without reserved characters the result is identical to
		// plain concatenation.
		addressURL := (&url.URL{
			Scheme: scheme,
			User:   url.UserPassword(natsL.Username, natsL.Password),
			Host:   natsL.Address,
		}).String()

		// Fetch the user-supplied client ID and provide a random one if not provided
		clientID := natsL.Streaming.ClientID
		if clientID == "" {
			clientID = mustGetUUID()
		}

		// Add test suffix to clientID to avoid clientID already registered error
		if testDial {
			clientID += "-test"
		}

		connOpts := []stan.Option{
			stan.NatsURL(addressURL),
		}

		// Setup MaxPubAcksInflight parameter
		if natsL.Streaming.MaxPubAcksInflight > 0 {
			connOpts = append(connOpts,
				stan.MaxPubAcksInflight(natsL.Streaming.MaxPubAcksInflight))
		}

		// Do the real connection to the NATS server
		sc, err := stan.Connect(natsL.Streaming.ClusterID, clientID, connOpts...)
		if err != nil {
			return nioc, err
		}

		// Save the created connection
		conn.stanConn = sc
		return conn, nil
	}

	// Configure and connect to NATS server
	natsC := nats.DefaultOptions
	natsC.Url = "nats://" + natsL.Address
	natsC.User = natsL.Username
	natsC.Password = natsL.Password
	natsC.Token = natsL.Token
	natsC.Secure = natsL.Secure

	// Do the real connection
	nc, err := natsC.Connect()
	if err != nil {
		return nioc, err
	}

	// Save the created connection
	conn.natsConn = nc
	return conn, nil
}
// closeNATS - close the underlying NATS connection.
//
// The connection to close is chosen by conn.params.Streaming.Enable, the
// same flag dialNATS uses to decide which one to open. A connection field
// that was never populated (for example the zero-value natsIOConn that
// dialNATS returns on error) is skipped instead of being dereferenced.
func closeNATS(conn natsIOConn) {
	if conn.params.Streaming.Enable {
		if conn.stanConn != nil {
			// closeNATS has no error return, so a failure to close the
			// streaming connection is deliberately discarded.
			_ = conn.stanConn.Close()
		}
		return
	}
	if conn.natsConn != nil {
		conn.natsConn.Close()
	}
}
// newNATSNotify returns a logrus logger whose entries are delivered to the
// NATS target configured under accountID. The logger writes nothing to the
// console; its only output is the NATS hook, with entries formatted as JSON.
// Any error from dialing the target is returned as is.
func newNATSNotify(accountID string) (*logrus.Logger, error) {
	// Look up the target configuration and dial it (a real dial, not a test one).
	hook, err := dialNATS(globalServerConfig.Notify.GetNATSByID(accountID), false)
	if err != nil {
		return nil, err
	}

	logger := logrus.New()
	// Entries must only reach NATS, never the console.
	logger.Out = ioutil.Discard
	// Entries are serialized as JSON.
	logger.Formatter = new(logrus.JSONFormatter)
	// Route entries through the NATS connection.
	logger.Hooks.Add(hook)

	return logger, nil
}
// Fire publishes a log entry to the configured subject on the message
// broker. The serialized entry is sent over the streaming connection when
// streaming is enabled (asynchronously if the Async option is set, without
// an ack handler), and over the plain NATS connection otherwise. The
// serialization or publish error, if any, is returned.
func (n natsIOConn) Fire(entry *logrus.Entry) error {
	payload, err := entry.Reader()
	if err != nil {
		return err
	}

	subject := n.params.Subject
	data := payload.Bytes()

	switch streaming := n.params.Streaming; {
	case !streaming.Enable:
		// Plain NATS publish.
		return n.natsConn.Publish(subject, data)
	case streaming.Async:
		// Streaming, fire-and-forget: only the submission error is reported.
		_, err = n.stanConn.PublishAsync(subject, data, nil)
		return err
	default:
		// Streaming, synchronous publish.
		return n.stanConn.Publish(subject, data)
	}
}
// Levels reports the log levels this hook fires on: info level only.
func (n natsIOConn) Levels() []logrus.Level {
	return []logrus.Level{logrus.InfoLevel}
}