39
39
import org .springframework .integration .ip .tcp .connection .AbstractConnectionFactory ;
40
40
import org .springframework .integration .ip .tcp .connection .TcpConnection ;
41
41
import org .springframework .integration .ip .tcp .connection .TcpConnectionFailedCorrelationEvent ;
42
- import org .springframework .integration .ip .tcp .connection .TcpConnectionSupport ;
43
42
import org .springframework .integration .ip .tcp .connection .TcpListener ;
44
43
import org .springframework .integration .ip .tcp .connection .TcpNioConnectionSupport ;
45
44
import org .springframework .integration .ip .tcp .connection .TcpSender ;
@@ -127,21 +126,6 @@ protected void doInit() {
127
126
}
128
127
Assert .state (!this .closeStreamAfterSend || this .isSingleUse ,
129
128
"Single use connection needed with closeStreamAfterSend" );
130
- if (isAsync ()) {
131
- try {
132
- TcpConnectionSupport connection = this .connectionFactory .getConnection ();
133
- if (connection instanceof TcpNioConnectionSupport ) {
134
- setAsync (false );
135
- this .logger .warn ("Async replies are not supported with NIO; see the reference manual" );
136
- }
137
- if (this .isSingleUse ) {
138
- connection .close ();
139
- }
140
- }
141
- catch (Exception e ) {
142
- this .logger .error ("Could not check if async is supported" , e );
143
- }
144
- }
145
129
}
146
130
147
131
/**
@@ -167,6 +151,7 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
167
151
try {
168
152
haveSemaphore = acquireSemaphoreIfNeeded (requestMessage );
169
153
connection = this .connectionFactory .getConnection ();
154
+ checkAsync (connection , async );
170
155
Long remoteTimeout = getRemoteTimeout (requestMessage );
171
156
AsyncReply reply = new AsyncReply (remoteTimeout , connection , haveSemaphore , requestMessage , async );
172
157
connectionId = connection .getConnectionId ();
@@ -203,6 +188,13 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
203
188
}
204
189
}
205
190
191
+ private void checkAsync (TcpConnection connection , boolean async ) {
192
+ if (async && connection instanceof TcpNioConnectionSupport ) {
193
+ setAsync (false );
194
+ this .logger .warn ("Async replies are not supported with NIO; see the reference manual" );
195
+ }
196
+ }
197
+
206
198
private boolean acquireSemaphoreIfNeeded (Message <?> requestMessage ) throws InterruptedException {
207
199
if (!this .isSingleUse ) {
208
200
logger .debug ("trying semaphore" );
0 commit comments