-
Notifications
You must be signed in to change notification settings - Fork 58
/
GelfTcpAppender.java
192 lines (158 loc) · 5.71 KB
/
GelfTcpAppender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
/*
* Logback GELF - zero dependencies Logback GELF appender library.
* Copyright (C) 2016 Oliver Siegmar
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
package de.siegmar.logbackgelf;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
/**
 * GELF appender that sends messages over a plain TCP connection.
 *
 * <p>Each message is sent with a trailing 0 byte as frame delimiter. The connection is opened
 * lazily on the first message, re-opened periodically (see {@link #setReconnectInterval(int)})
 * and re-opened after any send error. Failed sends are retried according to
 * {@link #setMaxRetries(int)} and {@link #setRetryDelay(int)}.
 *
 * <p>All access to the output stream and the reconnect timestamp is guarded by an internal lock.
 */
public class GelfTcpAppender extends AbstractGelfAppender {

    private static final int DEFAULT_CONNECT_TIMEOUT = 15_000;
    private static final int DEFAULT_RECONNECT_INTERVAL = 300;
    private static final int DEFAULT_MAX_RETRIES = 2;
    private static final int DEFAULT_RETRY_DELAY = 3_000;
    private static final int SEC_TO_MSEC = 1000;

    /**
     * Guards {@link #outputStream} and {@link #nextReconnect}.
     */
    private final Object lock = new Object();

    /**
     * Maximum time (in milliseconds) to wait for establishing a connection. A value of 0 disables
     * the connect timeout. Default: 15,000 milliseconds.
     */
    private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;

    /**
     * Time interval (in seconds) after which an existing connection is closed and re-opened.
     * A negative value disables automatic reconnects. A value of 0 schedules the reconnect for
     * the moment the connection was opened, so the next message sent after the clock has
     * advanced triggers a reconnect. Default: 300 seconds.
     */
    private int reconnectInterval = DEFAULT_RECONNECT_INTERVAL;

    /**
     * Number of retries. A value of 0 disables retry attempts. Default: 2.
     */
    private int maxRetries = DEFAULT_MAX_RETRIES;

    /**
     * Time (in milliseconds) between retry attempts. Ignored if maxRetries is 0.
     * Values of 0 or less disable the delay. Default: 3,000 milliseconds.
     */
    private int retryDelay = DEFAULT_RETRY_DELAY;

    /**
     * Output stream of the currently used socket; {@code null} until the first connect.
     */
    private OutputStream outputStream;

    /**
     * Timestamp (milliseconds since epoch) scheduled for the next reconnect.
     * A value of 0 forces a reconnect before the next send.
     */
    private long nextReconnect;

    /**
     * @return the connect timeout in milliseconds.
     */
    public int getConnectTimeout() {
        return connectTimeout;
    }

    /**
     * @param connectTimeout the connect timeout in milliseconds; 0 disables the timeout.
     */
    public void setConnectTimeout(final int connectTimeout) {
        this.connectTimeout = connectTimeout;
    }

    /**
     * @return the reconnect interval in seconds.
     */
    public int getReconnectInterval() {
        return reconnectInterval;
    }

    /**
     * @param reconnectInterval the reconnect interval in seconds; a negative value disables
     *                          automatic reconnects.
     */
    public void setReconnectInterval(final int reconnectInterval) {
        this.reconnectInterval = reconnectInterval;
    }

    /**
     * @return the number of retries performed after a failed send.
     */
    public int getMaxRetries() {
        return maxRetries;
    }

    /**
     * @param maxRetries the number of retries after a failed send; 0 disables retries.
     */
    public void setMaxRetries(final int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * @return the delay between retry attempts in milliseconds.
     */
    public int getRetryDelay() {
        return retryDelay;
    }

    /**
     * @param retryDelay the delay between retry attempts in milliseconds.
     */
    public void setRetryDelay(final int retryDelay) {
        this.retryDelay = retryDelay;
    }

    /**
     * Sends the message (plus a trailing 0 byte) and retries on failure.
     *
     * <p>At most {@code maxRetries + 1} attempts are made. Retrying stops early if the appender
     * is no longer started or the thread is interrupted while waiting between attempts. Send
     * errors are reported via {@code addError} and are not propagated to the caller.
     *
     * @param messageToSend the encoded message without the trailing 0 byte.
     */
    @Override
    protected void appendMessage(final byte[] messageToSend) throws IOException {
        // GELF via TCP requires 0 termination - copyOf pads the extra byte with 0
        final byte[] tcpMessage = Arrays.copyOf(messageToSend, messageToSend.length + 1);

        int openRetries = maxRetries;
        do {
            if (sendMessage(tcpMessage)) {
                // Message was sent successfully - we're done with it
                break;
            }

            // Only wait if another attempt will follow
            if (retryDelay > 0 && openRetries > 0) {
                try {
                    Thread.sleep(retryDelay);
                } catch (final InterruptedException e) {
                    // Restore the interrupt flag for the caller and give up on this message
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } while (openRetries-- > 0 && isStarted());
    }

    /**
     * Send message to socket's output stream.
     *
     * <p>(Re-)connects first if the scheduled reconnect time has passed. On an I/O error the
     * error is reported and a reconnect is forced for the next attempt.
     *
     * @param messageToSend message to send.
     *
     * @return {@code true} if message was sent successfully, {@code false} otherwise.
     */
    private boolean sendMessage(final byte[] messageToSend) {
        synchronized (lock) {
            try {
                if (System.currentTimeMillis() > nextReconnect) {
                    connect();
                }

                outputStream.write(messageToSend);

                return true;
            } catch (final IOException e) {
                addError(String.format("Error sending message via tcp://%s:%s",
                    getGraylogHost(), getGraylogPort()), e);

                // force reconnect in next loop cycle
                nextReconnect = 0;
            }
        }

        return false;
    }

    /**
     * Opens a new connection (and closes the old one - if existent).
     *
     * <p>Also schedules the next reconnect according to {@code reconnectInterval}.
     *
     * @throws IOException if the connection failed.
     */
    private void connect() throws IOException {
        closeOut();

        final Socket socket = getSocket();
        outputStream = socket.getOutputStream();

        // Widen to long before multiplying: int arithmetic would overflow for large intervals
        nextReconnect = reconnectInterval < 0
            ? Long.MAX_VALUE
            : System.currentTimeMillis() + ((long) reconnectInterval * SEC_TO_MSEC);
    }

    /**
     * Creates a socket connected to the configured Graylog host and port.
     *
     * <p>The input side of the socket is shut down after connecting. If connecting or shutting
     * down the input fails, the socket is closed before the exception is propagated.
     *
     * @return the connected socket.
     *
     * @throws IOException if the connection could not be established.
     */
    protected Socket getSocket() throws IOException {
        final Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(getGraylogHost(), getGraylogPort()),
                connectTimeout);
            socket.shutdownInput();
        } catch (final IOException | RuntimeException e) {
            // Don't leak the half-initialized socket; keep the original failure as the cause
            try {
                socket.close();
            } catch (final IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return socket;
    }

    /**
     * Closes the current output stream (if any). Close errors are reported, not thrown.
     */
    private void closeOut() {
        if (outputStream != null) {
            try {
                outputStream.close();
            } catch (final IOException e) {
                addError("Can't close stream", e);
            }
        }
    }

    /**
     * Closes the current connection (if any) while holding the send lock.
     */
    @Override
    protected void close() throws IOException {
        synchronized (lock) {
            closeOut();
        }
    }

}