/*
* Copyright 2009 Mustard Grain, Inc., 2009-2010 LinkedIn, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.server.niosocket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import voldemort.server.protocol.RequestHandlerFactory;

/**
* SelectorManager handles the non-blocking polling of IO events using the
* Selector/SelectionKey APIs from NIO.
* <p/>
* This is probably not the way to write NIO code, but it's much faster than the
* documented way. All the documentation on NIO suggested that a single Selector
* be used for all open sockets and then individual IO requests for selected
* keys be stuck in a thread pool and executed asynchronously. This seems
 * logical and works fine. However, it was very slow for two reasons.
* <p>
 * First, the thread processing the event calls interestOps() on the
 * SelectionKey to update which types of events it's interested in. In fact, it
 * does this twice: first, before any processing occurs, it disables all events
 * (so that the same channel isn't selected concurrently, similar to disabling
 * interrupts), and again after processing completes, to re-enable interest in
 * events. Understandably, interestOps() has some internal state that it needs
 * to update, and so the thread must grab a lock on the Selector to make those
 * internal interest state modifications. With hundreds or thousands of
 * threads, this lock is very heavily contended, as borne out by profiling and
 * empirical testing.
* <p/>
* The second reason the thread pool approach was slow was that after calling
* interestOps() to re-enable events, the threads in the thread pool had to
* invoke the Selector API's wakeup() method or else the state change would go
* unnoticed (it's similar to notifyAll for basic thread synchronization). This
* causes the select() method to return immediately and process whatever
* requests are immediately available. However, with so many threads in play,
 * this led to near-constant spinning of the select()/wakeup() cycle.
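 * <p/>
 * For illustration only, the documented thread-pool pattern described above
 * would look roughly like the following sketch (the executor and handle()
 * method are hypothetical, not code from this project):
 *
 * <pre>{@code
 * // Selector thread: hand each ready key off to a worker pool.
 * while(!Thread.currentThread().isInterrupted()) {
 *     selector.select();
 *     Iterator<SelectionKey> it = selector.selectedKeys().iterator();
 *     while(it.hasNext()) {
 *         final SelectionKey key = it.next();
 *         it.remove();
 *         key.interestOps(0); // disable events; grabs the Selector's lock
 *         executor.execute(new Runnable() {
 *             public void run() {
 *                 handle(key); // perform the IO for this channel
 *                 key.interestOps(SelectionKey.OP_READ); // re-enable; locks again
 *                 key.selector().wakeup(); // force select() to see the change
 *             }
 *         });
 *     }
 * }
 * }</pre>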
* <p>
 * Astonishingly, it was found to be about 25% faster to simply execute all IO
* synchronously/serially as it eliminated the context switching, lock
* contention, etc. However, we actually have N simultaneous SelectorManager
* instances in play, which are round-robin-ed by the caller (NioSocketService).
* <p>
 * In terms of the number of SelectorManager instances to run in parallel, the
 * configuration defaults to the number of available CPUs (counting each core
 * of a multi-core processor as a CPU). This helps to balance out the load a
 * little and offsets the serial nature of processing.
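 * <p/>
 * As a rough sketch (the counter and serverSocketChannel names are
 * hypothetical, not necessarily how NioSocketService actually does it), the
 * caller's round-robin distribution might look like:
 *
 * <pre>{@code
 * SelectorManager[] managers = new SelectorManager[Runtime.getRuntime().availableProcessors()];
 * AtomicInteger counter = new AtomicInteger();
 * ...
 * // For each newly-accepted connection, hand it to the next manager in line.
 * SocketChannel socketChannel = serverSocketChannel.accept();
 * managers[counter.getAndIncrement() % managers.length].accept(socketChannel);
 * }</pre>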
* <p>
* Of course, potential problems exist.
* <p>
 * First of all, I still find it hard to believe that processing these requests
 * serially is faster than in parallel. There may be something about my
 * environment that is causing inaccurate reporting. At some point, with enough
 * requests, I would imagine this will start to slow down.
* <p/>
* Another potential problem is that a given SelectorManager could become
* overloaded. As new socket channels are established, they're distributed to a
* SelectorManager in a round-robin fashion. However, there's no re-balancing
* logic in case a disproportionate number of clients on one SelectorManager
* disconnect.
* <p/>
* For instance, let's say we have two SelectorManager instances and four
* connections. Connection 1 goes to SelectorManager A, connection 2 to
* SelectorManager B, 3 to A, and 4 to B. However, later on let's say that both
* connections 1 and 3 disconnect. This leaves SelectorManager B with two
* connections and SelectorManager A with none. There's no provision to
* re-balance the remaining requests evenly.
*
* @author Kirk True
*/
public class SelectorManager implements Runnable {

    private final InetSocketAddress endpoint;
    private final Selector selector;
    private final Queue<SocketChannel> socketChannelQueue;
    private final RequestHandlerFactory requestHandlerFactory;
    private final int socketBufferSize;

    private volatile boolean isClosed;

    private final Logger logger = Logger.getLogger(getClass());

    public SelectorManager(InetSocketAddress endpoint,
                           RequestHandlerFactory requestHandlerFactory,
                           int socketBufferSize) throws IOException {
        this.endpoint = endpoint;
        this.selector = Selector.open();
        this.socketChannelQueue = new ConcurrentLinkedQueue<SocketChannel>();
        this.requestHandlerFactory = requestHandlerFactory;
        this.socketBufferSize = socketBufferSize;
        isClosed = false;
    }

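    /**
     * Queues the newly-accepted SocketChannel for registration with this
     * manager's Selector and wakes the selector so the run() loop picks the
     * channel up promptly.
     */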
    public void accept(SocketChannel socketChannel) {
        if(isClosed)
            throw new IllegalStateException("Cannot accept more channels, selector manager closed");

        socketChannelQueue.add(socketChannel);
        selector.wakeup();
    }

    public void close() {
        if(isClosed)
            return;

        isClosed = true;

        try {
            for(SelectionKey sk: selector.keys()) {
                try {
                    if(logger.isTraceEnabled())
                        logger.trace("Closing SelectionKey's channel " + sk + " for " + endpoint);

                    sk.channel().close();
                } catch(Exception e) {
                    if(logger.isEnabledFor(Level.WARN))
                        logger.warn(e.getMessage(), e);
                }

                try {
                    if(logger.isTraceEnabled())
                        logger.trace("Cancelling SelectionKey " + sk + " for " + endpoint);

                    sk.cancel();
                } catch(Exception e) {
                    if(logger.isEnabledFor(Level.WARN))
                        logger.warn(e.getMessage(), e);
                }
            }
        } catch(Exception e) {
            if(logger.isEnabledFor(Level.WARN))
                logger.warn(e.getMessage(), e);
        }

        try {
            selector.close();
        } catch(Exception e) {
            if(logger.isEnabledFor(Level.WARN))
                logger.warn(e.getMessage(), e);
        }
    }

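    /**
     * Main loop: registers any channels queued by accept(), blocks in
     * select(), and runs the AsyncRequestHandler attached to each
     * readable/writable key directly on this thread (see the class-level
     * comments for why IO is processed serially).
     */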
    public void run() {
        try {
            while(true) {
                if(isClosed) {
                    if(logger.isInfoEnabled())
                        logger.info("Closed, exiting for " + endpoint);

                    break;
                }

                processSockets();

                try {
                    int selected = selector.select();

                    if(isClosed) {
                        if(logger.isInfoEnabled())
                            logger.info("Closed, exiting for " + endpoint);

                        break;
                    }

                    if(selected > 0) {
                        Iterator<SelectionKey> i = selector.selectedKeys().iterator();

                        while(i.hasNext()) {
                            SelectionKey selectionKey = i.next();
                            i.remove();

                            if(selectionKey.isReadable() || selectionKey.isWritable()) {
                                Runnable worker = (Runnable) selectionKey.attachment();
                                worker.run();
                            }
                        }
                    }
                } catch(ClosedSelectorException e) {
                    if(logger.isDebugEnabled())
                        logger.debug("Selector is closed, exiting for " + endpoint);

                    break;
                } catch(Throwable t) {
                    if(logger.isEnabledFor(Level.ERROR))
                        logger.error(t.getMessage(), t);
                }
            }
        } catch(Throwable t) {
            if(logger.isEnabledFor(Level.ERROR))
                logger.error(t.getMessage(), t);
        } finally {
            try {
                close();
            } catch(Exception e) {
                if(logger.isEnabledFor(Level.ERROR))
                    logger.error(e.getMessage(), e);
            }
        }
    }

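    /**
     * Drains the queue of newly-accepted channels, configures each socket
     * (TCP no-delay, address reuse, buffer sizes, non-blocking mode), and
     * registers it with this manager's Selector for OP_READ with an
     * AsyncRequestHandler as the attachment.
     */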
    private void processSockets() {
        try {
            SocketChannel socketChannel = null;

            while((socketChannel = socketChannelQueue.poll()) != null) {
                if(isClosed) {
                    if(logger.isInfoEnabled())
                        logger.info("Closed, exiting for " + endpoint);

                    break;
                }

                try {
                    if(logger.isDebugEnabled())
                        logger.debug("Registering connection from "
                                     + socketChannel.socket().getPort());

                    socketChannel.socket().setTcpNoDelay(true);
                    socketChannel.socket().setReuseAddress(true);
                    socketChannel.socket().setSendBufferSize(socketBufferSize);

                    if(socketChannel.socket().getReceiveBufferSize() != this.socketBufferSize)
                        if(logger.isDebugEnabled())
                            logger.debug("Requested socket receive buffer size was "
                                         + this.socketBufferSize + " bytes but actual size is "
                                         + socketChannel.socket().getReceiveBufferSize()
                                         + " bytes.");

                    if(socketChannel.socket().getSendBufferSize() != this.socketBufferSize)
                        if(logger.isDebugEnabled())
                            logger.debug("Requested socket send buffer size was "
                                         + this.socketBufferSize + " bytes but actual size is "
                                         + socketChannel.socket().getSendBufferSize() + " bytes.");

                    socketChannel.configureBlocking(false);
                    AsyncRequestHandler attachment = new AsyncRequestHandler(selector,
                                                                             socketChannel,
                                                                             requestHandlerFactory,
                                                                             socketBufferSize);

                    if(!isClosed)
                        socketChannel.register(selector, SelectionKey.OP_READ, attachment);
                } catch(ClosedSelectorException e) {
                    if(logger.isDebugEnabled())
                        logger.debug("Selector is closed, exiting");

                    close();
                    break;
                } catch(Exception e) {
                    if(logger.isEnabledFor(Level.ERROR))
                        logger.error(e.getMessage(), e);
                }
            }
        } catch(Exception e) {
            if(logger.isEnabledFor(Level.ERROR))
                logger.error(e.getMessage(), e);
        }
    }
}