Skip to content

Commit a851b8f

Browse files
pj892031achmelo
andauthored
fix: recovering after TCP/IP stack was restarted (#2421)
* fix Signed-off-by: Pavel Jareš <pavel.jares@broadcom.com> * attempt to fix pipeline Signed-off-by: Pavel Jareš <pavel.jares@broadcom.com> * small refactor Signed-off-by: Pavel Jareš <pavel.jares@broadcom.com> * improve Code Coverage Signed-off-by: Pavel Jareš <pavel.jares@broadcom.com> Co-authored-by: achmelo <37397715+achmelo@users.noreply.github.com>
1 parent d2c0bdc commit a851b8f

File tree

3 files changed

+615
-1
lines changed

3 files changed

+615
-1
lines changed

apiml-tomcat-common/build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ dependencies {
1111
compile libraries.tomcat_embed_websocket
1212
compile libraries.logback_classic
1313
implementation libraries.jackson_core
14-
implementation libraries.jackson_databind
14+
implementation libraries.jackson_databind
15+
16+
compileOnly libraries.lombok
17+
annotationProcessor libraries.lombok
1518

1619
testCompile libraries.mockito_core
1720
testCompile libraries.mockito_jupiter
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
/*
2+
* This program and the accompanying materials are made available under the terms of the
3+
* Eclipse Public License v2.0 which accompanies this distribution, and is available at
4+
* https://www.eclipse.org/legal/epl-v20.html
5+
*
6+
* SPDX-License-Identifier: EPL-2.0
7+
*
8+
* Copyright Contributors to the Zowe Project.
9+
*/
10+
package org.zowe.apiml.product.web;
11+
12+
import lombok.experimental.Delegate;
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.apache.catalina.LifecycleState;
15+
import org.apache.catalina.connector.Connector;
16+
import org.apache.coyote.AbstractProtocol;
17+
import org.apache.coyote.ProtocolHandler;
18+
import org.apache.tomcat.util.net.AbstractEndpoint;
19+
import org.apache.tomcat.util.net.NioEndpoint;
20+
import org.springframework.beans.factory.annotation.Value;
21+
import org.springframework.boot.web.embedded.tomcat.TomcatConnectorCustomizer;
22+
import org.springframework.context.annotation.Bean;
23+
import org.springframework.context.annotation.Configuration;
24+
25+
import javax.annotation.PreDestroy;
26+
import java.io.IOException;
27+
import java.lang.invoke.MethodHandle;
28+
import java.lang.invoke.MethodHandles;
29+
import java.lang.reflect.Field;
30+
import java.lang.reflect.Method;
31+
import java.net.SocketAddress;
32+
import java.nio.channels.*;
33+
import java.nio.channels.spi.AbstractSelectableChannel;
34+
import java.nio.channels.spi.SelectorProvider;
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicInteger;
37+
38+
/**
39+
* This class extends embedded Tomcat to handle a case with restarting TCP/IP stack. Tomcat itself is not able
40+
* recovery from this situation. It tries accepting new request, and it fails until the service is restarted.
41+
*
42+
* This handler detects this issue and try to rebind the port. It can also wait till the TCP/IP stack is available again.
43+
*
44+
* This bean is related only to z/OS.
45+
*/
46+
@Slf4j
47+
@Configuration
48+
public class TomcatAcceptFixConfig {
49+
50+
@Value("${server.tomcat.retryRebindTimeoutSecs:10}")
51+
int retryRebindTimeoutSecs;
52+
53+
private static final Field ENDPOINT_FIELD;
54+
private static final Field NIO_SOCKET_FIELD;
55+
56+
private static final MethodHandle IMPL_CLOSE_SELECTABGLE_CHANNEL_HANLE; // NOSONAR
57+
private static final MethodHandle IMPL_CONFIGURE_BLOCKING; // NOSONAR
58+
59+
/**
60+
* To mitigate parallel treatment of socket rebinding
61+
*/
62+
private static final AtomicBoolean running = new AtomicBoolean(true);
63+
64+
static {
65+
try {
66+
ENDPOINT_FIELD = AbstractProtocol.class.getDeclaredField("endpoint");
67+
NIO_SOCKET_FIELD = NioEndpoint.class.getDeclaredField("serverSock");
68+
69+
Method implCloseSelectableChannel = AbstractSelectableChannel.class.getDeclaredMethod("implCloseSelectableChannel");
70+
implCloseSelectableChannel.setAccessible(true); // NOSONAR
71+
IMPL_CLOSE_SELECTABGLE_CHANNEL_HANLE = MethodHandles.lookup().unreflect(implCloseSelectableChannel);
72+
73+
Method implConfigureBlocking = AbstractSelectableChannel.class.getDeclaredMethod("implConfigureBlocking", boolean.class);
74+
implConfigureBlocking.setAccessible(true); // NOSONAR
75+
IMPL_CONFIGURE_BLOCKING = MethodHandles.lookup().unreflect(implConfigureBlocking);
76+
} catch (NoSuchFieldException | NoSuchMethodException | SecurityException | IllegalAccessException e) {
77+
throw new IllegalStateException("Unknown structure of protocols", e);
78+
}
79+
ENDPOINT_FIELD.setAccessible(true); // NOSONAR
80+
NIO_SOCKET_FIELD.setAccessible(true); // NOSONAR
81+
}
82+
83+
/**
84+
* Update Protocol class of Tomcat. For supported protocols (Nio) it will replace endpoint implementation
85+
* with the wrapper class providing the fix - handling of restart of TCP/IP stack.
86+
*
87+
* @param abstractProtocol instance to update
88+
* @param rebindHandler call-back (to next update after TCP/IP restart)
89+
*/
90+
private void update(AbstractProtocol<?> abstractProtocol, Runnable rebindHandler) {
91+
try {
92+
AbstractEndpoint<?, ?> abstractEndpoint = (AbstractEndpoint<Object, Object>) ENDPOINT_FIELD.get(abstractProtocol);
93+
94+
if (abstractEndpoint instanceof NioEndpoint) {
95+
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) NIO_SOCKET_FIELD.get(abstractEndpoint);
96+
serverSocketChannel = new FixedServerSocketChannel(serverSocketChannel, abstractEndpoint, rebindHandler);
97+
NIO_SOCKET_FIELD.set(abstractEndpoint, serverSocketChannel); // NOSONAR
98+
} else {
99+
log.warn("Unsupported protocol: {}", abstractEndpoint.getClass().getName());
100+
}
101+
} catch (Exception e) {
102+
log.warn("Cannot update connector to handle errors on socket accepting", e);
103+
}
104+
}
105+
106+
/**
107+
* Update protocol inside the protocol. This method is on the highest level to allow updating each required part.
108+
*
109+
* @param connector connector to update
110+
*
111+
* Note: If protocol of connector is not supported no change happened.
112+
*/
113+
private void update(Connector connector) {
114+
ProtocolHandler protocolHandler = connector.getProtocolHandler();
115+
if (protocolHandler instanceof AbstractProtocol) {
116+
update((AbstractProtocol<?>) protocolHandler, () -> update(connector));
117+
} else {
118+
log.warn("Unsupported protocol handler: {}", protocolHandler.getClass().getName());
119+
}
120+
}
121+
122+
/**
123+
* @return customizer class for Embedded Tomcat to extend connectors. It will restart server socket in the case
124+
* of TCP/IP stack restart
125+
*/
126+
@Bean
127+
public TomcatConnectorCustomizer tomcatAcceptorFix() {
128+
return connector -> connector.addLifecycleListener(event -> {
129+
if (event.getLifecycle().getState() == LifecycleState.STARTED) {
130+
update(connector);
131+
}
132+
});
133+
}
134+
135+
/**
136+
* Detection of service stopping. It will unblock threads to successful shutdown.
137+
*/
138+
@PreDestroy
139+
public void stopping() {
140+
running.set(false);
141+
}
142+
143+
/**
144+
* Socket implementation wrapper to handle rebinding on TCP Stack restart
145+
*/
146+
class FixedServerSocketChannel extends ServerSocketChannel {
147+
148+
/**
149+
* Wrapper server socket inside
150+
*/
151+
@Delegate(excludes = Overridden.class)
152+
private final ServerSocketChannel socket;
153+
154+
/**
155+
* The endpoint instance used by Server socket
156+
*/
157+
private final AbstractEndpoint<?, ?> abstractEndpoint;
158+
159+
/**
160+
* Define the generation of binding, each rebinding the value is increased
161+
*/
162+
private final AtomicInteger state = new AtomicInteger();
163+
164+
/**
165+
* Handler to call after successful rebind of server socket
166+
*/
167+
private final Runnable rebindHandler;
168+
169+
FixedServerSocketChannel(ServerSocketChannel socket, AbstractEndpoint<?, ?> abstractEndpoint, Runnable rebindHandler) {
170+
super(socket.provider());
171+
this.socket = socket;
172+
this.abstractEndpoint = abstractEndpoint;
173+
this.rebindHandler = rebindHandler;
174+
}
175+
176+
@Override
177+
protected void implCloseSelectableChannel() throws IOException {
178+
try {
179+
IMPL_CLOSE_SELECTABGLE_CHANNEL_HANLE.invoke(socket);
180+
} catch (IOException | RuntimeException e) {
181+
throw e;
182+
} catch (Throwable t) {
183+
throw new IllegalStateException(t);
184+
}
185+
}
186+
187+
@Override
188+
protected void implConfigureBlocking(boolean block) throws IOException {
189+
try {
190+
IMPL_CONFIGURE_BLOCKING.invoke(socket, block);
191+
} catch (IOException | RuntimeException e) {
192+
throw e;
193+
} catch (Throwable t) {
194+
throw new IllegalStateException(t);
195+
}
196+
}
197+
198+
/**
199+
* Method tries to bind port. In case it is not available it waits until TCP/IP stack is restarted
200+
*
201+
* @throws IOException bind is not possible at the moment for an unspecific error
202+
* @throws InterruptedException if thread was interrupted. It cannot wait to next attempt of binding.
203+
*/
204+
private void bindWithWait() throws IOException, InterruptedException {
205+
while (true) {
206+
try {
207+
abstractEndpoint.bind();
208+
break;
209+
} catch (Throwable t) {
210+
log.debug("Cannot rebind socket", t);
211+
if (!running.get()) {
212+
throw new IOException("Application is stopping during the attempt to rebind the socket", t);
213+
}
214+
// delay between attempt to rebinding to avoid overloading
215+
Thread.sleep(retryRebindTimeoutSecs);
216+
}
217+
}
218+
}
219+
220+
/**
221+
* Rebind the server socket. The action could be done just by one thread. Other treats are waiting to be finish
222+
* by first one.
223+
*
224+
* @param stateBefore id of socket binding
225+
* @throws IOException unexpected exception during rebinding (socket cannot be closed or the connector cannot
226+
* be re-updated)
227+
*/
228+
private synchronized void rebind(int stateBefore) throws IOException {
229+
if (state.compareAndSet(stateBefore, stateBefore + 1)) {
230+
try {
231+
// socket must be closed before new binding
232+
socket.close();
233+
234+
// till TCP/IP stack is running again try to bind the port
235+
bindWithWait();
236+
237+
// in case of successfull update the connector instance, it is not handled by this wrapper anymore
238+
rebindHandler.run();
239+
} catch (InterruptedException e) {
240+
Thread.currentThread().interrupt();
241+
} catch (Exception e) {
242+
throw new IOException("Cannot rebind the port", e);
243+
}
244+
}
245+
}
246+
247+
public SocketChannel accept() throws IOException {
248+
// obtain current state of rebinding to detection parallel actions
249+
final int stateBefore = state.get();
250+
try {
251+
return socket.accept();
252+
} catch (IOException ioe) {
253+
if (ioe.getMessage().contains("EDC5122I")) {
254+
// the fix solve just one issue about stopped TCP/IP stack
255+
log.debug("The TCP/IP stack was probably restarted. The socket of Tomcat will rebind.");
256+
rebind(stateBefore);
257+
return socket.accept();
258+
} else {
259+
throw ioe;
260+
}
261+
}
262+
}
263+
264+
}
265+
266+
/**
267+
* The list of final methods, which cannot be delegated. See {@link FixedServerSocketChannel#socket}
268+
*/
269+
private interface Overridden {
270+
271+
SocketChannel accept() throws IOException;
272+
int validOps();
273+
ServerSocketChannel bind(SocketAddress local) throws IOException;
274+
SelectorProvider provider();
275+
boolean isRegistered();
276+
SelectionKey keyFor(Selector sel);
277+
SelectionKey register(Selector sel, int ops, Object att);
278+
void implCloseChannel() throws IOException;
279+
boolean isBlocking();
280+
Object blockingLock();
281+
SelectableChannel configureBlocking(boolean block) throws IOException;
282+
SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
283+
void close() throws IOException;
284+
boolean isOpen();
285+
286+
}
287+
288+
}

0 commit comments

Comments
 (0)