/
tcp.go
165 lines (139 loc) · 3.84 KB
/
tcp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package logrus_ovh
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"os"
"path"
"sync"
"time"
"github.com/eapache/go-resiliency/retrier"
)
// TCPWriter implements io.Writer and is used to send either discrete
// messages to a graylog2 server, or data from a stream-oriented
// interface (like the functions in log).
//
// Instances are expected to be created with NewTCPWriter, which fills in
// the unexported connect function as well as Hostname and Facility.
type TCPWriter struct {
	// mu is held by WriteMessage while the connection is looked up and
	// written to; get requires it to be held by its caller.
	mu sync.Mutex
	// conn is the currently open connection. It is nil until get has
	// dialed successfully, and is reset to nil after a failed send.
	conn net.Conn
	// connect dials a new connection to the logging server. It is set by
	// NewTCPWriter (plain TCP or TLS) and invoked by get.
	connect func() (net.Conn, error)
	// Hostname is copied into the Host field of messages built by Write.
	Hostname string
	// Facility is copied into the Facility field of messages built by Write.
	Facility string
}
// NewTCPWriter returns a new TCP GELF Writer. This writer can be used to send the
// output of the standard Go log functions to a central GELF server by
// passing it to log.SetOutput().
//
// addr is the "host:port" address of the server. If tlsCfg is non-nil the
// writer dials with TLS using that configuration, otherwise it dials plain
// TCP. Either way each dial attempt is bounded by a 5 second timeout.
//
// No connection is opened here: the writer only records how to dial, and the
// connection is established when a message is first written. As a result the
// returned error is currently always nil; it is kept so callers written
// against this signature keep compiling.
//
// Hostname defaults to os.Hostname() (or "localhost" if that fails) and
// Facility defaults to the base name of the running binary; both are
// exported and may be overridden by the caller afterwards.
func NewTCPWriter(addr string, tlsCfg *tls.Config) (*TCPWriter, error) {
	const dialTimeout = 5 * time.Second

	w := new(TCPWriter)

	if tlsCfg != nil {
		w.connect = func() (net.Conn, error) {
			conn, err := tls.DialWithDialer(&net.Dialer{Timeout: dialTimeout}, "tcp", addr, tlsCfg)
			if err != nil {
				// Return an explicit nil so the net.Conn interface does not
				// wrap a typed-nil *tls.Conn.
				return nil, err
			}
			return conn, nil
		}
	} else {
		w.connect = func() (net.Conn, error) {
			return net.DialTimeout("tcp", addr, dialTimeout)
		}
	}

	// Get Hostname if possible, otherwise just set to localhost.
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "localhost"
	}
	w.Hostname = hostname

	// Set facility to binary name.
	w.Facility = path.Base(os.Args[0])

	return w, nil
}
// Write implements io.Writer. It converts p into a GELF message and sends it
// over the current TCP connection via WriteMessage.
//
// Leading and trailing whitespace is stripped from p before the message is
// built. If the remaining text spans several lines, the first line becomes
// the short message and the whole text becomes the full message; otherwise
// the whole text is the short message and the full message is empty. The
// message is sent with level 6 (info) and the file/line reported by
// getCallerIgnoringLogMulti.
//
// On success it returns the length of p as passed in (not the trimmed
// length): io.Writer requires a non-nil error whenever fewer than len(p)
// bytes are reported, and callers such as io.MultiWriter treat a short count
// as io.ErrShortWrite. On failure it returns 0 and the error from
// WriteMessage.
func (w *TCPWriter) Write(p []byte) (int, error) {
	// 1 for the function that called us. This must stay a direct call from
	// Write so the skip count keeps pointing at Write's caller.
	file, line := getCallerIgnoringLogMulti(1)

	// Remember how many bytes the caller handed us before trimming, so the
	// reported count honors the io.Writer contract.
	n := len(p)

	// Remove trailing and leading whitespace.
	text := bytes.TrimSpace(p)

	// If there are newlines in the message, use the first line
	// for the short message and set the full message to the
	// trimmed input. If the input has no newlines, stick the
	// whole thing in Short.
	short := text
	var full []byte
	if i := bytes.IndexByte(text, '\n'); i > 0 {
		short = text[:i]
		full = text
	}

	m := Message{
		Version:  "1.1",
		Host:     w.Hostname,
		Short:    string(short),
		Full:     string(full),
		Time:     float64(time.Now().UnixNano()) / 1e9,
		Level:    6, // info
		Facility: w.Facility,
		File:     file,
		Line:     line,
		Extra:    map[string]interface{}{},
	}

	if err := w.WriteMessage(&m); err != nil {
		return 0, err
	}

	return n, nil
}
// WriteMessage serializes m as JSON, appends a single zero byte and sends the
// result over the writer's TCP connection, obtaining (or re-opening) that
// connection through get.
//
// The writer's mutex is held for the whole lookup-and-send sequence. If
// sending fails, the error is reported on stderr, the connection is closed
// and forgotten so that the next call dials again, and the send error is
// returned. The failed message itself is not retried.
func (w *TCPWriter) WriteMessage(m *Message) error {
	payload, err := json.Marshal(m)
	if err != nil {
		// should never fail
		return err
	}
	// Trailing zero byte after the JSON document (presumably the GELF TCP
	// frame delimiter; see the Graylog GELF documentation).
	payload = append(payload, 0)

	w.mu.Lock()
	defer w.mu.Unlock()

	conn, err := w.get()
	if err != nil {
		return err
	}

	// Keep writing until the whole payload has been handed to the
	// connection or a write reports an error.
	var sendErr error
	for sent := 0; sent < len(payload) && sendErr == nil; {
		var k int
		k, sendErr = conn.Write(payload[sent:])
		sent += k
	}
	if sendErr == nil {
		return nil
	}

	fmt.Fprintln(os.Stderr, "[gelf] error while sending message:", sendErr)
	if closeErr := w.conn.Close(); closeErr != nil {
		fmt.Fprintln(os.Stderr, "[gelf] connection close error:", closeErr)
	}
	// Drop the broken connection so get reconnects on the next call.
	w.conn = nil
	return sendErr
}
// get returns the connection to use for sending, dialing a new one when the
// writer does not currently hold one.
//
// w.connect MUST be set and w.mu must be held by the caller.
//
// When a dial is needed it is driven by a retrier configured with a constant
// back-off of 30 retries spaced one second apart, so a call can block for a
// long time while the server is unreachable (how long each individual
// attempt may take depends on the timeout applied by w.connect). Progress and
// failures are reported on stderr. On success the new connection is stored in
// w.conn and returned; if every attempt fails, w.conn is left nil and the
// error from the retrier is returned.
func (w *TCPWriter) get() (c net.Conn, err error) {
	// Reuse the existing connection when there is one.
	if w.conn != nil {
		return w.conn, nil
	}

	dial := func() error {
		fmt.Fprintln(os.Stderr, "[gelf] connecting to logging server")
		opened, dialErr := w.connect()
		if dialErr != nil {
			fmt.Fprintln(os.Stderr, "[gelf] cannot connect to logging server:", dialErr)
			return dialErr
		}
		fmt.Fprintln(os.Stderr, "[gelf] connection to logging server opened")
		c = opened
		return nil
	}

	// A nil classifier means every dial error is retried.
	backoff := retrier.ConstantBackoff(30, time.Second)
	if err = retrier.New(backoff, nil).Run(dial); err != nil {
		return nil, err
	}

	w.conn = c
	return c, nil
}