Skip to content

Commit ea26ff1

Browse files
c-clearydfuch
authored andcommitted
8247614: java/nio/channels/DatagramChannel/Connect.java timed out
Reviewed-by: dfuchs, alanb
1 parent 38574d5 commit ea26ff1

File tree

1 file changed

+75
-69
lines changed

1 file changed

+75
-69
lines changed
Lines changed: 75 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2001, 2020, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -24,15 +24,18 @@
2424
/* @test
2525
* @bug 4313882 7183800
2626
* @summary Test DatagramChannel's send and receive methods
27-
* @author Mike McCloskey
2827
*/
2928

3029
import java.io.*;
3130
import java.net.*;
3231
import java.nio.*;
3332
import java.nio.channels.*;
3433
import java.nio.charset.*;
35-
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
36+
import java.util.concurrent.CompletableFuture;
37+
import java.util.concurrent.CompletionException;
38+
import java.util.stream.Stream;
3639

3740
public class Connect {
3841

@@ -43,124 +46,127 @@ public static void main(String[] args) throws Exception {
4346
}
4447

4548
static void test() throws Exception {
46-
Reactor r = new Reactor();
47-
Actor a = new Actor(r.port());
48-
invoke(a, r);
49+
ExecutorService threadPool = Executors.newCachedThreadPool();
50+
try (Reactor r = new Reactor();
51+
Actor a = new Actor(r.getSocketAddress())
52+
) {
53+
invoke(threadPool, a, r);
54+
} finally {
55+
threadPool.shutdown();
56+
}
4957
}
5058

51-
static void invoke(Sprintable reader, Sprintable writer) throws Exception {
52-
53-
Thread writerThread = new Thread(writer);
54-
writerThread.start();
55-
56-
Thread readerThread = new Thread(reader);
57-
readerThread.start();
58-
59-
writerThread.join();
60-
readerThread.join();
61-
62-
reader.throwException();
63-
writer.throwException();
59+
static void invoke(ExecutorService e, Runnable reader, Runnable writer) throws CompletionException {
60+
CompletableFuture<Void> f1 = CompletableFuture.runAsync(writer, e);
61+
CompletableFuture<Void> f2 = CompletableFuture.runAsync(reader, e);
62+
wait(f1, f2);
6463
}
6564

66-
public interface Sprintable extends Runnable {
67-
public void throwException() throws Exception;
68-
}
6965

70-
public static class Actor implements Sprintable {
71-
final int port;
72-
Exception e = null;
66+
// This method waits for either one of the given futures to complete exceptionally
67+
// or for all of the given futures to complete successfully.
68+
private static void wait(CompletableFuture<?>... futures) throws CompletionException {
69+
CompletableFuture<?> future = CompletableFuture.allOf(futures);
70+
Stream.of(futures)
71+
.forEach(f -> f.exceptionally(ex -> {
72+
future.completeExceptionally(ex);
73+
return null;
74+
}));
75+
future.join();
76+
}
7377

74-
Actor(int port) {
75-
this.port = port;
76-
}
78+
public static class Actor implements AutoCloseable, Runnable {
79+
final SocketAddress socketAddress;
80+
final DatagramChannel dc;
7781

78-
public void throwException() throws Exception {
79-
if (e != null)
80-
throw e;
82+
Actor(SocketAddress socketAddress) throws IOException {
83+
this.socketAddress = socketAddress;
84+
dc = DatagramChannel.open();
8185
}
8286

8387
public void run() {
8488
try {
85-
DatagramChannel dc = DatagramChannel.open();
86-
87-
// Send a message
8889
ByteBuffer bb = ByteBuffer.allocateDirect(256);
8990
bb.put("hello".getBytes());
9091
bb.flip();
91-
InetAddress address = InetAddress.getLocalHost();
92-
if (address.isLoopbackAddress()) {
93-
address = InetAddress.getLoopbackAddress();
94-
}
95-
InetSocketAddress isa = new InetSocketAddress(address, port);
96-
dc.connect(isa);
92+
dc.connect(socketAddress);
93+
94+
// Send a message
95+
log.println("Actor attempting to write to Reactor at " + socketAddress.toString());
9796
dc.write(bb);
9897

9998
// Try to send to some other address
100-
address = InetAddress.getLocalHost();
101-
InetSocketAddress bogus = new InetSocketAddress(address, 3333);
10299
try {
103-
dc.send(bb, bogus);
104-
throw new RuntimeException("Allowed bogus send while connected");
100+
int port = dc.socket().getLocalPort();
101+
InetAddress loopback = InetAddress.getLoopbackAddress();
102+
InetSocketAddress otherAddress = new InetSocketAddress(loopback, (port == 3333 ? 3332 : 3333));
103+
log.println("Testing if Actor throws AlreadyConnectedException" + otherAddress.toString());
104+
dc.send(bb, otherAddress);
105+
throw new RuntimeException("Actor allowed send to other address while already connected");
105106
} catch (AlreadyConnectedException ace) {
106107
// Correct behavior
107108
}
108109

109110
// Read a reply
110111
bb.flip();
112+
log.println("Actor waiting to read");
111113
dc.read(bb);
112114
bb.flip();
113-
CharBuffer cb = Charset.forName("US-ASCII").
114-
newDecoder().decode(bb);
115-
log.println("From Reactor: "+isa+ " said " +cb);
116-
117-
// Clean up
118-
dc.disconnect();
119-
dc.close();
115+
CharBuffer cb = StandardCharsets.US_ASCII.
116+
newDecoder().decode(bb);
117+
log.println("Actor received from Reactor at " + socketAddress + ": " + cb);
120118
} catch (Exception ex) {
121-
e = ex;
119+
log.println("Actor threw exception: " + ex);
120+
throw new RuntimeException(ex);
121+
} finally {
122+
log.println("Actor finished");
122123
}
123124
}
125+
126+
@Override
127+
public void close() throws IOException {
128+
dc.close();
129+
}
124130
}
125131

126-
public static class Reactor implements Sprintable {
132+
public static class Reactor implements AutoCloseable, Runnable {
127133
final DatagramChannel dc;
128-
Exception e = null;
129134

130135
Reactor() throws IOException {
131136
dc = DatagramChannel.open().bind(new InetSocketAddress(0));
132137
}
133138

134-
int port() {
135-
return dc.socket().getLocalPort();
136-
}
137-
138-
public void throwException() throws Exception {
139-
if (e != null)
140-
throw e;
139+
SocketAddress getSocketAddress() throws IOException {
140+
return dc.getLocalAddress();
141141
}
142142

143143
public void run() {
144144
try {
145145
// Listen for a message
146146
ByteBuffer bb = ByteBuffer.allocateDirect(100);
147+
log.println("Reactor waiting to receive");
147148
SocketAddress sa = dc.receive(bb);
148149
bb.flip();
149-
CharBuffer cb = Charset.forName("US-ASCII").
150-
newDecoder().decode(bb);
151-
log.println("From Actor: "+sa+ " said " +cb);
150+
CharBuffer cb = StandardCharsets.US_ASCII.
151+
newDecoder().decode(bb);
152+
log.println("Reactor received from Actor at" + sa + ": " + cb);
152153

153154
// Reply to sender
154155
dc.connect(sa);
155156
bb.flip();
157+
log.println("Reactor attempting to write: " + dc.getRemoteAddress().toString());
156158
dc.write(bb);
157-
158-
// Clean up
159-
dc.disconnect();
160-
dc.close();
161159
} catch (Exception ex) {
162-
e = ex;
160+
log.println("Reactor threw exception: " + ex);
161+
throw new RuntimeException(ex);
162+
} finally {
163+
log.println("Reactor finished");
163164
}
164165
}
166+
167+
@Override
168+
public void close() throws IOException {
169+
dc.close();
170+
}
165171
}
166172
}

0 commit comments

Comments
 (0)