-
Notifications
You must be signed in to change notification settings - Fork 558
/
NettyUDPClient.java
342 lines (323 loc) · 11.9 KB
/
NettyUDPClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
package org.menacheri.jetclient;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.menacheri.jetclient.app.Session;
import org.menacheri.jetclient.event.Events;
import org.menacheri.jetclient.event.Event;
import org.menacheri.jetclient.handlers.netty.UDPUpstreamHandler;
/**
* This client class is used for UDP communication with a remote jetserver. Same
* client instance can be used to create multiple UDP "connections" to same
* jetserver. For connecting with multiple jetserver's use multiple instances of
* this class.
*
* @author Abraham Menacherry
*
*/
public class NettyUDPClient
{
/**
* The remote server address to which this client should connect.
*/
private final InetSocketAddress serverAddress;
/**
* The worker executor which will provide threads to Netty
* {@link ChannelFactory} for decoding encoding done on the
* {@link ChannelPipeline}.
*/
private final ExecutorService worker;
private final ConnectionlessBootstrap udpBootstrap;
/**
* The instance of {@link NioDatagramChannelFactory} created by constructor,
* or the one passed in to constructor.
*/
private final DatagramChannelFactory channelFactory;
/**
* For UDP there can only be one pipelineFactory per
* {@link ConnectionlessBootstrap}. This factory is hence part of the client
* class.
*/
private final ChannelPipelineFactory pipelineFactory;
/**
* This map is used to store the local address to which a session has bound
* itself using the {@link DatagramChannel#bind(java.net.SocketAddress)}
* method. When an incoming UDP packet is recieved the
* {@link UDPUpstreamHandler} will resolve which session to pass the event,
* using this map.
*/
public static final Map<InetSocketAddress, Session> CLIENTS = new HashMap<InetSocketAddress, Session>();
/**
* Creates an instance of a Netty UDP client which can then be used to
* connect to a remote jet-server. This constructor delegates to
* {@link #NettyUDPClient(InetSocketAddress, ChannelPipelineFactory)}
* constructor after creating a {@link InetSocketAddress} instance based on
* the host and port number passed in.
*
* @param jetserverHost
* The host name of the remote server on which jetserver is
* running.
* @param port
* The port to connect to, on the remote server.
* @param pipelineFactory
* The pipeline factory to be used while creating a Netty
* {@link Channel}
* @throws UnknownHostException
* @throws Exception
*/
public NettyUDPClient(String jetserverHost, int port,
final ChannelPipelineFactory pipelineFactory)
throws UnknownHostException, Exception
{
this(new InetSocketAddress(jetserverHost, port), pipelineFactory);
}
public NettyUDPClient(final InetSocketAddress serverAddress,
final ChannelPipelineFactory pipelineFactory)
throws UnknownHostException, Exception
{
this(serverAddress, pipelineFactory, null, Executors
.newCachedThreadPool());
}
/**
* Creates a new instance of the {@link NettyUDPClient}.
*
* @param serverAddress
* The remote servers address. This address will be used when any
* of the default write/connect methods are used.
* @param pipelineFactory
* The Netty factory used for creating a pipeline. For UDP, this
* pipeline factory should not have any stateful i.e non
* share-able handlers in it. Since Netty only has one channel
* for <b>ALL</b> UPD traffic.
* @param channelFactory
* <b>Can be provided as null</b>. If so, it will by default use
* {@link NioDatagramChannelFactory}. If not null, then the
* provided factory is set.
* @param worker
* The executor used for creating worker threads. Can be null if
* channelFactory parameter is <b>Not</b> null.
* @throws UnknownHostException
*/
public NettyUDPClient(final InetSocketAddress serverAddress,
final ChannelPipelineFactory pipelineFactory,
final DatagramChannelFactory channelFactory,
final ExecutorService worker) throws UnknownHostException,
Exception
{
this.worker = worker;
this.serverAddress = serverAddress;
if (channelFactory == null)
{
this.channelFactory = new NioDatagramChannelFactory(worker);
}
else
{
this.channelFactory = channelFactory;
}
this.udpBootstrap = new ConnectionlessBootstrap(this.channelFactory);
udpBootstrap.setOption("broadcast", "true");
this.pipelineFactory = pipelineFactory;
// The pipeline factory should not be set on the udpBootstrap since it
// invalidates the getPipeline.
udpBootstrap.setPipeline(pipelineFactory.getPipeline());
Runtime.getRuntime().addShutdownHook(new Thread()
{
public void run()
{
udpBootstrap.releaseExternalResources();
}
});
}
/**
* Creates a new datagram channel instance using the {@link #udpBootstrap}
* by binding to local host. This method delegates to
* {@link #createDatagramChannel(String)} internally, by passing the
* localhost's host name to it.
*
* @return The newly created instance of the datagram channel.
* @throws UnknownHostException
*/
public DatagramChannel createDatagramChannel() throws UnknownHostException
{
return createDatagramChannel(InetAddress.getLocalHost()
.getHostAddress());
}
/**
* Creates a new datagram channel instance using the {@link #udpBootstrap}
* by binding to local host.
*
* @param localhostName
* The host machine (for e.g. 'localhost') to which it needs to
* bind to. This is <b>Not</b> the remote jet-server hostname.
* @return The newly created instance of the datagram channel.
* @throws UnknownHostException
*/
public DatagramChannel createDatagramChannel(String localhostName)
throws UnknownHostException
{
DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap
.bind(new InetSocketAddress(localhostName, 0));
return datagramChannel;
}
/**
* This method will connect the datagram channel with the server and send
* the {@link Events#CONNECT} message to server. This method will use
* {@link #serverAddress} by default when sending the
* {@link Events#CONNECT} message. <b>Note</b> Even if this connect
* message does not reach server, the first UDP message that the server
* receives from this particular DatagramChannels local address will be
* converted by server and used as {@link Events#CONNECT}.
*
* @param session
* The session for which the datagram channel is being created.
* @param datagramChannel
* The channel on which the message is to be sent to remote
* server.
* @return Returns a ChannelFuture which can be used to check the success of
* this operation. <b>NOTE</b> Success in case of UDP means message
* is sent to server. It does not mean that the server has received
* it.
* @throws UnknownHostException
*/
public ChannelFuture connect(Session session,
DatagramChannel datagramChannel) throws UnknownHostException,
InterruptedException
{
return connect(session, datagramChannel, this.serverAddress, 5,
TimeUnit.SECONDS);
}
/**
* This method will connect the datagram channel with the server and send
* the {@link Events#CONNECT} message to server.
*
* @param session
* The session for which the datagram channel is being created.
* @param datagramChannel
* The channel on which the message is to be sent to remote
* server.
* @param serverAddress
* The remote address of the server to which to send this
* message.
* @param timeout
* Amount of time to wait for the connection to happen.
* <b>NOTE</b> Since this is UDP there is actually no "real"
* connection.
* @return Returns a ChannelFuture which can be used to check the success of
* this operation. <b>NOTE</b> Success in case of UDP means message
* is sent to server. It does not mean that the server has received
* it.
* @throws UnknownHostException
*/
public ChannelFuture connect(Session session,
DatagramChannel datagramChannel, InetSocketAddress serverAddress,
int timeout, TimeUnit unit) throws UnknownHostException,
InterruptedException
{
if (null == datagramChannel)
{
throw new NullPointerException(
"DatagramChannel passed to connect method cannot be null");
}
if (!datagramChannel.isBound())
{
throw new IllegalStateException("DatagramChannel: "
+ datagramChannel
+ " Passed to connect method is not bound");
}
Event event = Events.event(null, Events.CONNECT);
ChannelFuture future = datagramChannel.write(event, serverAddress);
future.addListener(new ChannelFutureListener()
{
@Override
public void operationComplete(ChannelFuture future)
throws Exception
{
if (!future.isSuccess())
{
throw new RuntimeException(future.getCause());
}
}
});
CLIENTS.put(datagramChannel.getLocalAddress(), session);
return future;
}
/**
* Utility method used to send a message to the server. Users can also use
* datagramChannel.write(message, serverAddress) directly. This method
* delegates to {@link #write(DatagramChannel, Object, InetSocketAddress)}
* by passing in the InetSocketAddress stored in the class variable
* {@link #serverAddress}
*
* @param datagramChannel
* The channel on which the message is to be sent to remote
* server.
* @param message
* The message to be sent. <b>NOTE</b> The message should be a
* valid and encode-able by the encoders in the ChannelPipeline
* of this server.
* @return Returns a ChannelFuture which can be used to check the success of
* this operation. <b>NOTE</b> Success in case of UDP means message
* is sent to server. It does not mean that the server has received
* it.
*/
public ChannelFuture write(DatagramChannel datagramChannel, Object message)
{
return write(datagramChannel, message, serverAddress);
}
/**
* Utility method used to send a message to the server. Users can also use
* datagramChannel.write(message, serverAddress) directly.
*
* @param datagramChannel
* The channel on which the message is to be sent to remote
* server.
* @param message
* The message to be sent. <b>NOTE</b> The message should be a
* valid and encode-able by the encoders in the ChannelPipeline
* of this server.
* @return Returns a ChannelFuture which can be used to check the success of
* this operation. <b>NOTE</b> Success in case of UDP means message
* is sent to server. It does not mean that the server has received
* it.
*/
public static ChannelFuture write(DatagramChannel datagramChannel, Object message,
InetSocketAddress serverAddress)
{
return datagramChannel.write(message, serverAddress);
}
public InetSocketAddress getServerAddress()
{
return serverAddress;
}
public ExecutorService getWorker()
{
return worker;
}
public ConnectionlessBootstrap getUdpBootstrap()
{
return udpBootstrap;
}
public DatagramChannelFactory getChannelFactory()
{
return channelFactory;
}
public ChannelPipelineFactory getPipelineFactory()
{
return pipelineFactory;
}
}