Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UNDERTOW-1818] AbstractServletInputStreamTestCase.runTestParallel fails with bytes out of order #1384

Merged
merged 1 commit into from Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions core/src/main/java/io/undertow/server/Connectors.java
Expand Up @@ -144,8 +144,12 @@ public static void ungetRequestBytes(final HttpServerExchange exchange, PooledBy
System.arraycopy(buffers, 0, newArray, 0, buffers.length);
} else {
newArray = new PooledByteBuffer[existing.length + buffers.length];
System.arraycopy(existing, 0, newArray, 0, existing.length);
System.arraycopy(buffers, 0, newArray, existing.length, buffers.length);
// If there are previous buffers we are re-buffering data so although
// counterintuitive first put the new data and then the existing buffers.
// Example: there are buffered data with buffers A,B and A is retrieved
// but returned, it should be A,B again and not B,A
System.arraycopy(buffers, 0, newArray, 0, buffers.length);
System.arraycopy(existing, 0, newArray, buffers.length, existing.length);
}
exchange.putAttachment(HttpServerExchange.BUFFERED_REQUEST_DATA, newArray); //todo: force some kind of wakeup?
exchange.addExchangeCompleteListener(BufferedRequestDataCleanupListener.INSTANCE);
Expand Down
18 changes: 12 additions & 6 deletions core/src/test/java/io/undertow/testutils/DefaultServer.java
Expand Up @@ -448,10 +448,11 @@ public static boolean startServer() {
proxyAcceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(proxyOpenListener));
proxyServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), proxyAcceptListener, serverOptions);
loadBalancingProxyClient = new LoadBalancingProxyClient(GSSAPIAuthenticationMechanism.EXCLUSIVITY_CHECKER)
.setMaxQueueSize(20)
.addHost(new URI("ajp", null, getHostAddress(DEFAULT), getHostPort(DEFAULT) + PROXY_OFFSET, "/", null, null));
ProxyHandler proxyHandler = ProxyHandler.builder()
.setProxyClient(loadBalancingProxyClient)
.setMaxRequestTime(120000)
.setMaxRequestTime(60000)
.setNext(HANDLE_404)
.setReuseXForwarded(true)
.build();
Expand All @@ -470,10 +471,11 @@ public static boolean startServer() {
proxyAcceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(proxyOpenListener));
proxyServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), proxyAcceptListener, serverOptions);
loadBalancingProxyClient = new LoadBalancingProxyClient(GSSAPIAuthenticationMechanism.EXCLUSIVITY_CHECKER)
.setMaxQueueSize(20)
.addHost(new URI("h2", null, getHostAddress(DEFAULT), getHostPort(DEFAULT) + PROXY_OFFSET, "/", null, null), null, new UndertowXnioSsl(xnio, OptionMap.EMPTY, SSL_BUFFER_POOL, clientContext), OptionMap.create(UndertowOptions.ENABLE_HTTP2, true));
ProxyHandler proxyHandler = ProxyHandler.builder()
.setProxyClient(loadBalancingProxyClient)
.setMaxRequestTime(120000)
.setMaxRequestTime(60000)
.setNext(HANDLE_404)
.setReuseXForwarded(true)
.build();
Expand All @@ -491,10 +493,11 @@ public static boolean startServer() {
proxyAcceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(proxyOpenListener));
proxyServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), proxyAcceptListener, serverOptions);
loadBalancingProxyClient = new LoadBalancingProxyClient(GSSAPIAuthenticationMechanism.EXCLUSIVITY_CHECKER)
.setMaxQueueSize(20)
.addHost(new URI(h2cUpgrade ? "http" : "h2c-prior", null, getHostAddress(DEFAULT), getHostPort(DEFAULT) + PROXY_OFFSET, "/", null, null), null, null, OptionMap.create(UndertowOptions.ENABLE_HTTP2, true));
ProxyHandler proxyHandler = ProxyHandler.builder()
.setProxyClient(loadBalancingProxyClient)
.setMaxRequestTime(30000)
.setMaxRequestTime(60000)
.setNext(HANDLE_404)
.setReuseXForwarded(true)
.build();
Expand All @@ -514,10 +517,11 @@ public static boolean startServer() {
proxyAcceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(proxyOpenListener));
proxyServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), proxyAcceptListener, serverOptions);
loadBalancingProxyClient = new LoadBalancingProxyClient(GSSAPIAuthenticationMechanism.EXCLUSIVITY_CHECKER)
.setMaxQueueSize(20)
.addHost(new URI("https", null, getHostAddress(DEFAULT), getHostPort(DEFAULT) + PROXY_OFFSET, "/", null, null), clientSsl);
ProxyHandler proxyHandler = ProxyHandler.builder()
.setProxyClient(loadBalancingProxyClient)
.setMaxRequestTime(30000)
.setMaxRequestTime(60000)
.setNext(HANDLE_404)
.setReuseXForwarded(true)
.build();
Expand All @@ -541,10 +545,11 @@ public static boolean startServer() {
proxyAcceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(proxyOpenListener));
proxyServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), proxyAcceptListener, serverOptions);
loadBalancingProxyClient = new LoadBalancingProxyClient(GSSAPIAuthenticationMechanism.EXCLUSIVITY_CHECKER)
.setMaxQueueSize(20)
.addHost(new URI("http", null, getHostAddress(DEFAULT), getHostPort(DEFAULT) + PROXY_OFFSET, "/", null, null));
ProxyHandler proxyHandler = ProxyHandler.builder()
.setProxyClient(loadBalancingProxyClient)
.setMaxRequestTime(30000)
.setMaxRequestTime(60000)
.setNext(HANDLE_404)
.setReuseXForwarded(true)
.build();
Expand Down Expand Up @@ -891,7 +896,8 @@ public static void stopSSLServer() throws IOException {
if (proxyOpenListener != null) {
proxyOpenListener.closeConnections();
shuttingDown = true;
} else if (openListener != null) {
}
if (openListener != null) {
openListener.closeConnections();
shuttingDown = true;
}
Expand Down
Expand Up @@ -59,6 +59,7 @@ public abstract class AbstractServletInputStreamTestCase {
public static final String BLOCKING_SERVLET = "blockingInput";
public static final String ASYNC_SERVLET = "asyncInput";
public static final String ASYNC_EAGER_SERVLET = "asyncEagerInput";
private static final int CONCURRENCY = Math.min(20, 4 * Runtime.getRuntime().availableProcessors());

@Test
public void testBlockingServletInputStream() {
Expand Down Expand Up @@ -117,7 +118,7 @@ public void testAsyncServletInputStreamInParallel() throws Exception {
builder.append(HELLO_WORLD);
}
String message = builder.toString();
runTestParallel(20, message, ASYNC_SERVLET, false, false);
runTestParallel(CONCURRENCY, message, ASYNC_SERVLET, false, false);
}

@Test
Expand All @@ -127,7 +128,7 @@ public void testAsyncServletInputStreamInParallelOffIoThread() throws Exception
builder.append(HELLO_WORLD);
}
String message = builder.toString();
runTestParallel(20, message, ASYNC_SERVLET, false, true);
runTestParallel(CONCURRENCY, message, ASYNC_SERVLET, false, true);
}

@Test
Expand Down
Expand Up @@ -42,6 +42,7 @@ protected void doPost(final HttpServletRequest req, final HttpServletResponse re
final int preamble = Math.max(0, req.getIntHeader("preamble"));
final boolean offIoThread = req.getHeader("offIoThread") != null;
final AsyncContext context = req.startAsync();
context.setTimeout(60000);

final ServletOutputStream outputStream = resp.getOutputStream();
ServletInputStream inputStream = req.getInputStream();
Expand Down
Expand Up @@ -45,6 +45,7 @@ protected void doGet(final HttpServletRequest req, final HttpServletResponse res
final AtomicInteger count = new AtomicInteger();

final AsyncContext context = req.startAsync();
context.setTimeout(60000);
final ServletOutputStream outputStream = resp.getOutputStream();
if(preable) {
for(int i = 0; i < reps; ++i) {
Expand Down
Expand Up @@ -26,9 +26,7 @@
import io.undertow.servlet.api.ServletInfo;
import io.undertow.servlet.test.util.DeploymentUtils;
import io.undertow.testutils.DefaultServer;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
Expand All @@ -52,16 +50,4 @@ public void handleDeployment(DeploymentInfo deploymentInfo, ServletContext servl
.addMapping("/" + ASYNC_SERVLET)
.setAsyncSupported(true));
}

@Test
public void testAsyncServletInputStreamInParallel() throws Exception {
Assume.assumeFalse(DefaultServer.isH2upgrade()); // FIXME UNDERTOW-1818 bytes out of order
super.testAsyncServletInputStreamInParallel();
}

@Test
public void testAsyncServletInputStreamInParallelOffIoThread() throws Exception {
Assume.assumeFalse(DefaultServer.isH2upgrade()); // FIXME UNDERTOW-1818 bytes out of order
super.testAsyncServletInputStreamInParallelOffIoThread();
}
}
Expand Up @@ -67,7 +67,7 @@ public void testAsyncServletInputStreamEagerIsReady() {
public void testAsyncServletInputStreamInParallelOffIoThread() {
}

@Override @Test @Ignore ("UNDERTOW-1927 503 result received sporadically UNDERTOW-1818 bytes out of order") // FIXME
@Override @Test @Ignore ("UNDERTOW-1927 503 result received sporadically") // FIXME
public void testAsyncServletInputStreamInParallel() {
}
}