Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -1178,6 +1178,13 @@ public static BodySubscriber<Path> ofFile(Path file) {
* the underlying HTTP connection to be closed and prevent it
* from being reused for subsequent operations.
*
* @implNote The {@code read} method of the {@code InputStream}
* returned by the default implementation of this method will
* throw an {@code IOException} with the {@linkplain Thread#isInterrupted()
* thread interrupt status set} if the thread is interrupted
* while blocking on read. In that case, the request will also be
* cancelled and the {@code InputStream} will be closed.
*
* @return a body subscriber that streams the response body as an
* {@link InputStream}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,12 @@ private ByteBuffer current() throws IOException {
if (debug.on()) debug.log("Next Buffer");
currentBuffer = currentListItr.next();
} catch (InterruptedException ex) {
// continue
try {
close();
} catch (IOException ignored) {
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will swallow the interrupt status, you' need to do Thread.currentThread().interrupt() before throwing the IOException.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have specific opinion here and this is more of a question - should we be doing:

} catch (InterruptedException ex) {
    final IOException toThrow = new IOException(ex);
    try {
        close();
    } catch (IOException ignored) {
        toThrow.addSuppressed(ignored);
    }
    Thread.currentThread().interrupt();
    throw toThrow;    
}

i.e. should we be adding any failure to close() as a suppressed exception to the IOException that we throw?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure whether that would be helpful or confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a good point, I don't really have a strong opinion one way or the other on it but would be curious if anyone else does?

Copy link
Member

@jaikiran jaikiran Dec 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave it the way you have it now.

Thread.currentThread().interrupt();
throw new IOException(ex);
}
}
assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining();
Expand Down
176 changes: 176 additions & 0 deletions test/jdk/java/net/httpclient/HttpResponseInputStreamInterruptTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

/**
* @test
* @bug 8294047
* @library /test/lib
* @run junit HttpResponseInputStreamInterruptTest
*/

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import jdk.test.lib.net.URIBuilder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CountDownLatch;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class HttpResponseInputStreamInterruptTest {

HttpServer server;
int port;
private final CountDownLatch interruptReadyLatch = new CountDownLatch(2);
private final CountDownLatch interruptDoneLatch = new CountDownLatch(1);
static final String FIRST_MESSAGE = "Should be received";
static final String SECOND_MESSAGE = "Shouldn't be received";

@BeforeAll
void before() throws Exception {
InetAddress loopback = InetAddress.getLoopbackAddress();
InetSocketAddress addr = new InetSocketAddress(loopback, 0);
server = HttpServer.create(addr, 0);
port = server.getAddress().getPort();
Handler handler = new Handler(interruptReadyLatch, interruptDoneLatch);
server.createContext("/HttpResponseInputStreamInterruptTest/", handler);
server.start();
}

@AfterAll
void after() throws Exception {
server.stop(0);
}

@Test
public void test() throws Exception {
// create client and interrupter threads
Thread clientThread = createClientThread(interruptReadyLatch, port);
Thread interrupterThread = new Thread(() -> {
try {
// wait until the clientThread is just about to read the second message sent by the server
// then interrupt the thread to cause an error to be thrown
interruptReadyLatch.await();
clientThread.interrupt();
interruptDoneLatch.countDown();
} catch (InterruptedException e) {
System.out.println("interrupterThread failed");
throw new RuntimeException(e);
}
});

// Start the threads then wait until clientThread completes
clientThread.start();
interrupterThread.start();
clientThread.join();
}

static class Handler implements HttpHandler {

CountDownLatch interruptReadyLatch;
CountDownLatch interruptDoneLatch;
Comment on lines +107 to +108
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be final

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this change was missed and is pending?


public Handler(CountDownLatch interruptReadyLatch, CountDownLatch interruptDoneLatch) {
this.interruptReadyLatch = interruptReadyLatch;
this.interruptDoneLatch = interruptDoneLatch;
}

@Override
public void handle(HttpExchange exchange) throws IOException {
try (OutputStream os = exchange.getResponseBody()) {
byte[] workingResponse = FIRST_MESSAGE.getBytes();
byte[] errorResponse = SECOND_MESSAGE.getBytes();
exchange.sendResponseHeaders(200, workingResponse.length + errorResponse.length);

// write and flush the first message which is expected to be received successfully
os.write(workingResponse);
os.flush();

// await the interrupt threads completion, then write the second message
interruptReadyLatch.countDown();
interruptDoneLatch.await();
os.write(errorResponse);
} catch (InterruptedException e) {
System.out.println("interruptDoneLatch await failed");
throw new RuntimeException(e);
}
}
}

static Thread createClientThread(CountDownLatch interruptReadyLatch, int port) {
return new Thread(() -> {
try {
HttpClient client = HttpClient
.newBuilder()
.proxy(HttpClient.Builder.NO_PROXY)
.build();

URI uri = URIBuilder.newBuilder()
.scheme("http")
.loopback()
.port(port)
.path("/HttpResponseInputStreamInterruptTest/")
.build();

HttpRequest request = HttpRequest
.newBuilder(uri)
.GET()
.build();

// Send a httpRequest and assert the first response is received as expected
HttpResponse<InputStream> response = client.send(request, HttpResponse.BodyHandlers.ofInputStream());
String firstOutput = new String(response.body().readNBytes(FIRST_MESSAGE.getBytes().length));
assertEquals(firstOutput, FIRST_MESSAGE);

// countdown on latch, and assert that an IOException is throw due to the interrupt
// and assert that the cause is a InterruptedException
interruptReadyLatch.countDown();
var thrown = assertThrows(IOException.class, () -> response.body().readAllBytes(), "expected IOException");
var cause = thrown.getCause();
assertTrue(cause instanceof InterruptedException, cause + " is not an InterruptedException");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - and of course the test should verify that the thread interrupted status is set too, since we specified that :-)

var thread = Thread.currentThread();
assertTrue(thread.isInterrupted(), "Thread " + thread + " is not interrupted");

var thread = Thread.currentThread();
assertTrue(thread.isInterrupted(), "Thread " + thread + " is not interrupted");
} catch (Throwable t) {
t.printStackTrace();
fail();
}
});
}
}