-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
TcpNioClientConnectionFactory.java
162 lines (144 loc) · 4.77 KB
/
TcpNioClientConnectionFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/*
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.ip.tcp.connection;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A client connection factory that creates {@link TcpNioConnection}s.
* @author Gary Russell
* @since 2.0
*
*/
public class TcpNioClientConnectionFactory extends
AbstractClientConnectionFactory {
private boolean usingDirectBuffers;
private Selector selector;
private Map<SocketChannel, TcpNioConnection> connections = new ConcurrentHashMap<SocketChannel, TcpNioConnection>();
private BlockingQueue<SocketChannel> newChannels = new LinkedBlockingQueue<SocketChannel>();
/**
* Creates a TcpNioClientConnectionFactory for connections to the host and port.
* @param host the host
* @param port the port
*/
public TcpNioClientConnectionFactory(String host, int port) {
super(host, port);
}
/**
* @return
* @throws Exception
* @throws IOException
* @throws SocketException
*/
protected TcpConnection getOrMakeConnection() throws Exception {
int n = 0;
while (this.selector == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (n++ > 600) {
throw new Exception("Factory failed to start");
}
}
TcpConnection theConnection = this.getTheConnection();
if (theConnection != null && theConnection.isOpen()) {
return theConnection;
}
if (logger.isDebugEnabled()) {
logger.debug("Opening new socket channel connection to " + this.getHost() + ":" + this.getPort());
}
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(this.getHost(), this.getPort()));
setSocketAttributes(socketChannel.socket());
TcpNioConnection connection = new TcpNioConnection(socketChannel, false, this.isLookupHost());
connection.setUsingDirectBuffers(this.usingDirectBuffers);
connection.setTaskExecutor(this.getTaskExecutor());
TcpConnection wrappedConnection = wrapConnection(connection);
initializeConnection(wrappedConnection, socketChannel.socket());
socketChannel.configureBlocking(false);
if (this.getSoTimeout() > 0) {
connection.setLastRead(System.currentTimeMillis());
}
this.connections.put(socketChannel, connection);
newChannels.add(socketChannel);
selector.wakeup();
return wrappedConnection;
}
/**
* When set to true, connections created by this factory attempt
* to use direct buffers where possible.
* @param usingDirectBuffers
* @see ByteBuffer
*/
public void setUsingDirectBuffers(boolean usingDirectBuffers) {
this.usingDirectBuffers = usingDirectBuffers;
}
public void close() {
if (this.selector != null) {
this.selector.wakeup();
}
}
public void run() {
if (logger.isDebugEnabled()) {
logger.debug("Read selector running for connections to " + this.getHost() + ":" + this.getPort());
}
try {
this.selector = Selector.open();
while (this.isActive()) {
SocketChannel newChannel;
int soTimeout = this.getSoTimeout();
int selectionCount = selector.select(soTimeout < 0 ? 0 : soTimeout);
while ((newChannel = newChannels.poll()) != null) {
newChannel.register(this.selector, SelectionKey.OP_READ, connections.get(newChannel));
}
this.processNioSelections(selectionCount, selector, null, this.connections);
}
} catch (Exception e) {
logger.error("Exception in read selector thread", e);
this.setActive(false);
}
if (logger.isDebugEnabled()) {
logger.debug("Read selector exiting for connections to " + this.getHost() + ":" + this.getPort());
}
}
/**
* @return the usingDirectBuffers
*/
protected boolean isUsingDirectBuffers() {
return usingDirectBuffers;
}
/**
* @return the connections
*/
protected Map<SocketChannel, TcpNioConnection> getConnections() {
return connections;
}
/**
* @return the newChannels
*/
protected BlockingQueue<SocketChannel> getNewChannels() {
return newChannels;
}
}