Skip to content
Permalink
Browse files
8276779: (ch) InputStream returned by Channels.newInputStream should …
…have fast path for SelectableChannels

Reviewed-by: lancea, alanb
  • Loading branch information
mkarg authored and Alan Bateman committed Dec 4, 2021
1 parent 02ee337 commit 9642629d15d9631d8da9a3abdabc40323f3c774b
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 11 deletions.
@@ -69,9 +69,12 @@ public final class Channels {
/**
* Constructs a stream that reads bytes from the given channel.
*
* <p> The {@code read} methods of the resulting stream will throw an
* {@link IllegalBlockingModeException} if invoked while the underlying
* channel is in non-blocking mode. The stream will not be buffered, and
* <p> The {@code read} and {@code transferTo} methods of the resulting stream
* will throw an {@link IllegalBlockingModeException} if invoked while the
* underlying channel is in non-blocking mode. The {@code transferTo} method
* will also throw an {@code IllegalBlockingModeException} if invoked to
* transfer bytes to an output stream that writes to an underlying channel in
* non-blocking mode. The stream will not be buffered, and
* it will not support the {@link InputStream#mark mark} or {@link
* InputStream#reset reset} methods. The stream will be safe for access by
* multiple concurrent threads. Closing the stream will in turn cause the
@@ -34,6 +34,7 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Objects;
import jdk.internal.util.ArraysSupport;
@@ -238,15 +239,28 @@ public long transferTo(OutputStream out) throws IOException {
Objects.requireNonNull(out, "out");

if (out instanceof ChannelOutputStream cos
&& ch instanceof FileChannel fc
&& cos.channel() instanceof FileChannel dst) {
return transfer(fc, dst);
&& ch instanceof FileChannel fc) {
WritableByteChannel wbc = cos.channel();

if (wbc instanceof FileChannel dst) {
return transfer(fc, dst);
}

if (wbc instanceof SelectableChannel sc) {
synchronized (sc.blockingLock()) {
if (!sc.isBlocking())
throw new IllegalBlockingModeException();
return transfer(fc, wbc);
}
}

return transfer(fc, wbc);
}

return super.transferTo(out);
}

private static long transfer(FileChannel src, FileChannel dst) throws IOException {
private static long transfer(FileChannel src, WritableByteChannel dst) throws IOException {
long initialPos = src.position();
long pos = initialPos;
try {
@@ -28,11 +28,19 @@
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -70,6 +78,8 @@ public class TransferTo {

private static final Random RND = RandomFactory.getRandom();

private static final Path CWD = Path.of(".");

/*
* Provides test scenarios, i. e. combinations of input and output streams to be tested.
*/
@@ -79,6 +89,12 @@ public static Object[][] streamCombinations() throws Exception {
// tests FileChannel.transferTo(FileChannel) optimized case
{ fileChannelInput(), fileChannelOutput() },

// tests FileChannel.transferTo(SelectableChannelOutput) optimized case
{ fileChannelInput(), selectableChannelOutput() },

// tests FileChannel.transferTo(WritableChannelOutput) optimized case
{ fileChannelInput(), writableByteChannelOutput() },

// tests InputStream.transferTo(OutputStream) default case
{ readableByteChannelInput(), defaultOutput() }
};
@@ -138,10 +154,10 @@ public void testStreamContents(InputStreamProvider inputStreamProvider,
*/
@Test
public void testMoreThanTwoGB() throws IOException {
Path sourceFile = Files.createTempFile("test2GBSource", null);
Path sourceFile = Files.createTempFile(CWD, "test2GBSource", null);
try {
// preparing two temporary files which will be compared at the end of the test
Path targetFile = Files.createTempFile("test2GBtarget", null);
Path targetFile = Files.createTempFile(CWD, "test2GBtarget", null);
try {
// writing 3 GB of random bytes into source file
for (int i = 0; i < NUM_WRITES; i++)
@@ -169,6 +185,37 @@ public void testMoreThanTwoGB() throws IOException {
}
}

/*
* Special test whether selectable channel based transfer throws blocking mode exception.
*/
@Test
public void testIllegalBlockingMode() throws IOException {
Pipe pipe = Pipe.open();
try {
// testing arbitrary input (here: empty file) to non-blocking selectable output
try (FileChannel fc = FileChannel.open(Files.createTempFile(CWD, "testIllegalBlockingMode", null));
InputStream is = Channels.newInputStream(fc);
SelectableChannel sc = pipe.sink().configureBlocking(false);
OutputStream os = Channels.newOutputStream((WritableByteChannel) sc)) {

// IllegalBlockingMode must be thrown when trying to perform a transfer
assertThrows(IllegalBlockingModeException.class, () -> is.transferTo(os));
}

// testing non-blocking selectable input to arbitrary output (here: byte array)
try (SelectableChannel sc = pipe.source().configureBlocking(false);
InputStream is = Channels.newInputStream((ReadableByteChannel) sc);
OutputStream os = new ByteArrayOutputStream()) {

// IllegalBlockingMode must be thrown when trying to perform a transfer
assertThrows(IllegalBlockingModeException.class, () -> is.transferTo(os));
}
} finally {
pipe.source().close();
pipe.sink().close();
}
}

/*
* Asserts that the transferred content is correct, i. e. compares the actually transferred bytes
* to the expected assumption. The position of the input and output stream before the transfer is
@@ -242,7 +289,7 @@ private static InputStreamProvider fileChannelInput() {
return new InputStreamProvider() {
@Override
public InputStream input(byte... bytes) throws Exception {
Path path = Files.createTempFile(null, null);
Path path = Files.createTempFile(CWD, "fileChannelInput", null);
Files.write(path, bytes);
FileChannel fileChannel = FileChannel.open(path);
return Channels.newInputStream(fileChannel);
@@ -268,7 +315,7 @@ public InputStream input(byte... bytes) throws Exception {
private static OutputStreamProvider fileChannelOutput() {
return new OutputStreamProvider() {
public OutputStream output(Consumer<Supplier<byte[]>> spy) throws Exception {
Path path = Files.createTempFile(null, null);
Path path = Files.createTempFile(CWD, "fileChannelOutput", null);
FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.WRITE);
spy.accept(() -> {
try {
@@ -282,4 +329,39 @@ public OutputStream output(Consumer<Supplier<byte[]>> spy) throws Exception {
};
}

private static OutputStreamProvider selectableChannelOutput() throws IOException {
return new OutputStreamProvider() {
public OutputStream output(Consumer<Supplier<byte[]>> spy) throws Exception {
Pipe pipe = Pipe.open();
Future<byte[]> bytes = CompletableFuture.supplyAsync(() -> {
try {
InputStream is = Channels.newInputStream(pipe.source());
return is.readAllBytes();
} catch (IOException e) {
throw new AssertionError("Exception while asserting content", e);
}
});
final OutputStream os = Channels.newOutputStream(pipe.sink());
spy.accept(() -> {
try {
os.close();
return bytes.get();
} catch (IOException | InterruptedException | ExecutionException e) {
throw new AssertionError("Exception while asserting content", e);
}
});
return os;
}
};
}

private static OutputStreamProvider writableByteChannelOutput() {
return new OutputStreamProvider() {
public OutputStream output(Consumer<Supplier<byte[]>> spy) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
spy.accept(outputStream::toByteArray);
return Channels.newOutputStream(Channels.newChannel(outputStream));
}
};
}
}

1 comment on commit 9642629

@openjdk-notifier
Copy link

@openjdk-notifier openjdk-notifier bot commented on 9642629 Dec 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.