Skip to content
Permalink
Browse files

DatagramChannel adaptor receive should park

  • Loading branch information
AlanBateman committed Nov 11, 2019
1 parent d84f670 commit 0fd9645ce8ab25f60f72f7dfa0a9713a2576a736
@@ -561,6 +561,7 @@ private SocketAddress trustedBlockingReceive(ByteBuffer dst)
try {
SocketAddress remote = beginRead(true, false);
boolean connected = (remote != null);
lockedConfigureNonBlockingIfNeeded();
n = receive(dst, connected);
while (n == IOStatus.UNAVAILABLE && isOpen()) {
park(Net.POLLIN);
@@ -608,7 +609,6 @@ private SocketAddress trustedBlockingReceive(ByteBuffer dst, long nanos)
// restore socket to blocking mode (if channel is open)
tryLockedConfigureBlocking(true);
}

} finally {
endRead(true, completed);
assert IOStatus.check(n);
@@ -29,8 +29,10 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
@@ -76,7 +78,7 @@ public void testSocketChannelReadWrite1() throws Exception {
/**
* Lightweight thread blocks in SocketChannel read.
*/
public void testSocketChannelReadWrite2() throws Exception {
public void testSocketChannelRead() throws Exception {
TestHelper.runInLightWeightThread(() -> {
try (var connection = new Connection()) {
SocketChannel sc1 = connection.channel1();
@@ -98,7 +100,7 @@ public void testSocketChannelReadWrite2() throws Exception {
/**
* Lightweight thread blocks in SocketChannel write.
*/
public void testSocketChannelReadWrite3() throws Exception {
public void testSocketChannelWrite() throws Exception {
TestHelper.runInLightWeightThread(() -> {
try (var connection = new Connection()) {
SocketChannel sc1 = connection.channel1();
@@ -149,7 +151,6 @@ public void testSocketChannelReadInterrupt() throws Exception {
}
});
}


/**
* SocketChannel close while lightweight thread blocked in write.
@@ -190,6 +191,45 @@ public void testSocketChannelWriteInterrupt() throws Exception {
}
});
}

/**
* Lightweight thread blocks in SocketChannel read.
*/
private void testSocketAdaptorRead(int timeout) throws Exception {
TestHelper.runInLightWeightThread(() -> {
try (var connection = new Connection()) {
SocketChannel sc1 = connection.channel1();
SocketChannel sc2 = connection.channel2();

// schedule write
ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
ScheduledWriter.schedule(sc1, bb, DELAY);

// read should block
if (timeout > 0)
sc2.socket().setSoTimeout(timeout);

byte[] array = new byte[100];
int n = sc2.socket().getInputStream().read(array);
assertTrue(n > 0);
assertTrue(array[0] == 'X');
}
});
}

/**
* Lightweight thread blocks in SocketChannel adaptor read.
*/
public void testSocketAdaptorRead1() throws Exception {
testSocketAdaptorRead(0);
}

/**
* Lightweight thread blocks in SocketChannel adaptor read with timeout.
*/
public void testSocketAdaptorRead2() throws Exception {
testSocketAdaptorRead(60_000);
}

/**
* ServerSocketChannel accept, no blocking.
@@ -259,12 +299,43 @@ public void testServerSocketChannelAcceptInterrupt() throws Exception {
}
});
}


void testSocketChannelAdaptorAccept(int timeout) throws Exception {
TestHelper.runInLightWeightThread(() -> {
try (var ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
var sc1 = SocketChannel.open();
ScheduledConnector.schedule(sc1, ssc.getLocalAddress(), DELAY);

if (timeout > 0)
ssc.socket().setSoTimeout(timeout);

// accept will block
Socket s = ssc.socket().accept();
sc1.close();
s.close();
}
});
}

/**
* Lightweight thread blocks in ServerSocketChannel adaptor accept.
*/
public void testSocketChannelAdaptorAccept1() throws Exception {
testSocketChannelAdaptorAccept(0);
}

/**
* Lightweight thread blocks in ServerSocketChannel adaptor accept with timeout.
*/
public void testSocketChannelAdaptorAccept2() throws Exception {
testSocketChannelAdaptorAccept(60_000);
}

/**
* DatagramChannel receive/send, no blocking.
*/
public void testDatagramhannelSendReceive1() throws Exception {
public void testDatagramChannelSendReceive1() throws Exception {
TestHelper.runInLightWeightThread(() -> {
try (DatagramChannel dc1 = DatagramChannel.open();
DatagramChannel dc2 = DatagramChannel.open()) {
@@ -288,7 +359,7 @@ public void testDatagramhannelSendReceive1() throws Exception {
/**
* Lightweight thread blocks in DatagramChannel receive.
*/
public void testDatagramhannelSendReceive2() throws Exception {
public void testDatagramChannelSendReceive2() throws Exception {
TestHelper.runInLightWeightThread(() -> {
try (DatagramChannel dc1 = DatagramChannel.open();
DatagramChannel dc2 = DatagramChannel.open()) {
@@ -311,7 +382,7 @@ public void testDatagramhannelSendReceive2() throws Exception {
/**
* DatagramChannel close while lightweight thread blocked in receive.
*/
public void testDatagramhannelReceiveAsyncClose() throws Exception {
public void testDatagramChannelReceiveAsyncClose() throws Exception {
TestHelper.runInLightWeightThread(() -> {
try (DatagramChannel dc = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLocalHost();
@@ -328,7 +399,7 @@ public void testDatagramhannelReceiveAsyncClose() throws Exception {
/**
* Lightweight thread interrupted while blocked in DatagramChannel receive.
*/
public void testDatagramhannelReceiveInterrupt() throws Exception {
public void testDatagramChannelReceiveInterrupt() throws Exception {
TestHelper.runInLightWeightThread(() -> {
try (DatagramChannel dc = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLocalHost();
@@ -342,6 +413,43 @@ public void testDatagramhannelReceiveInterrupt() throws Exception {
});
}

void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
TestHelper.runInLightWeightThread(() -> {
try (DatagramChannel dc1 = DatagramChannel.open();
DatagramChannel dc2 = DatagramChannel.open()) {

InetAddress lh = InetAddress.getLocalHost();
dc2.bind(new InetSocketAddress(lh, 0));

// schedule send
ByteBuffer bb = ByteBuffer.wrap("XXX".getBytes("UTF-8"));
ScheduledSender.schedule(dc1, bb, dc2.getLocalAddress(), DELAY);

// receive should block
byte[] array = new byte[100];
DatagramPacket p = new DatagramPacket(array, 0, array.length);
if (timeout > 0)
dc2.socket().setSoTimeout(timeout);
dc2.socket().receive(p);
assertTrue(p.getLength() == 3 && array[0] == 'X');
}
});
}

/**
* Lightweight thread blocks in DatagramSocket adaptor receive
*/
public void testDatagramSocketAdaptorReceive1() throws Exception {
testDatagramSocketAdaptorReceive(0);
}

/**
* Lightweight thread blocks in DatagramSocket adaptor receive with timeout
*/
public void testDatagramSocketAdaptorReceive2() throws Exception {
testDatagramSocketAdaptorReceive(60_1000);
}

/**
* Pipe read/write, no blocking.
*/
@@ -441,7 +549,6 @@ public void testPipeReadInterrupt() throws Exception {
}
});
}


/**
* Pipe.SinkChannel close while lightweight thread blocked in write.
@@ -482,7 +589,6 @@ public void testPipeWriteInterrupt() throws Exception {
}
});
}


// -- supporting classes --

@@ -559,7 +665,6 @@ static void schedule(Thread thread, long delay) {
new Thread(new ScheduledInterrupter(thread, delay)).start();
}
}


/**
* Establish a connection to a socket address after a delay
@@ -678,4 +783,4 @@ static void schedule(DatagramChannel dc, ByteBuffer buf,
}
}

}
}

0 comments on commit 0fd9645

Please sign in to comment.
You can’t perform that action at this time.