Skip to content

Commit

Permalink
8247614: java/nio/channels/DatagramChannel/Connect.java timed out
Browse files Browse the repository at this point in the history
Backport-of: ea26ff1
  • Loading branch information
GoeLin committed Apr 1, 2022
1 parent f2b92d0 commit 4a045a7
Showing 1 changed file with 75 additions and 69 deletions.
144 changes: 75 additions & 69 deletions test/jdk/java/nio/channels/DatagramChannel/Connect.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2001, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand All @@ -24,15 +24,18 @@
/* @test
* @bug 4313882 7183800
* @summary Test DatagramChannel's send and receive methods
* @author Mike McCloskey
*/

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Stream;

public class Connect {

Expand All @@ -43,124 +46,127 @@ public static void main(String[] args) throws Exception {
}

static void test() throws Exception {
Reactor r = new Reactor();
Actor a = new Actor(r.port());
invoke(a, r);
ExecutorService threadPool = Executors.newCachedThreadPool();
try (Reactor r = new Reactor();
Actor a = new Actor(r.getSocketAddress())
) {
invoke(threadPool, a, r);
} finally {
threadPool.shutdown();
}
}

static void invoke(Sprintable reader, Sprintable writer) throws Exception {

Thread writerThread = new Thread(writer);
writerThread.start();

Thread readerThread = new Thread(reader);
readerThread.start();

writerThread.join();
readerThread.join();

reader.throwException();
writer.throwException();
static void invoke(ExecutorService e, Runnable reader, Runnable writer) throws CompletionException {
CompletableFuture<Void> f1 = CompletableFuture.runAsync(writer, e);
CompletableFuture<Void> f2 = CompletableFuture.runAsync(reader, e);
wait(f1, f2);
}

public interface Sprintable extends Runnable {
public void throwException() throws Exception;
}

public static class Actor implements Sprintable {
final int port;
Exception e = null;
// This method waits for either one of the given futures to complete exceptionally
// or for all of the given futures to complete successfully.
private static void wait(CompletableFuture<?>... futures) throws CompletionException {
CompletableFuture<?> future = CompletableFuture.allOf(futures);
Stream.of(futures)
.forEach(f -> f.exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
}));
future.join();
}

Actor(int port) {
this.port = port;
}
public static class Actor implements AutoCloseable, Runnable {
final SocketAddress socketAddress;
final DatagramChannel dc;

public void throwException() throws Exception {
if (e != null)
throw e;
Actor(SocketAddress socketAddress) throws IOException {
this.socketAddress = socketAddress;
dc = DatagramChannel.open();
}

public void run() {
try {
DatagramChannel dc = DatagramChannel.open();

// Send a message
ByteBuffer bb = ByteBuffer.allocateDirect(256);
bb.put("hello".getBytes());
bb.flip();
InetAddress address = InetAddress.getLocalHost();
if (address.isLoopbackAddress()) {
address = InetAddress.getLoopbackAddress();
}
InetSocketAddress isa = new InetSocketAddress(address, port);
dc.connect(isa);
dc.connect(socketAddress);

// Send a message
log.println("Actor attempting to write to Reactor at " + socketAddress.toString());
dc.write(bb);

// Try to send to some other address
address = InetAddress.getLocalHost();
InetSocketAddress bogus = new InetSocketAddress(address, 3333);
try {
dc.send(bb, bogus);
throw new RuntimeException("Allowed bogus send while connected");
int port = dc.socket().getLocalPort();
InetAddress loopback = InetAddress.getLoopbackAddress();
InetSocketAddress otherAddress = new InetSocketAddress(loopback, (port == 3333 ? 3332 : 3333));
log.println("Testing if Actor throws AlreadyConnectedException" + otherAddress.toString());
dc.send(bb, otherAddress);
throw new RuntimeException("Actor allowed send to other address while already connected");
} catch (AlreadyConnectedException ace) {
// Correct behavior
}

// Read a reply
bb.flip();
log.println("Actor waiting to read");
dc.read(bb);
bb.flip();
CharBuffer cb = Charset.forName("US-ASCII").
newDecoder().decode(bb);
log.println("From Reactor: "+isa+ " said " +cb);

// Clean up
dc.disconnect();
dc.close();
CharBuffer cb = StandardCharsets.US_ASCII.
newDecoder().decode(bb);
log.println("Actor received from Reactor at " + socketAddress + ": " + cb);
} catch (Exception ex) {
e = ex;
log.println("Actor threw exception: " + ex);
throw new RuntimeException(ex);
} finally {
log.println("Actor finished");
}
}

@Override
public void close() throws IOException {
dc.close();
}
}

public static class Reactor implements Sprintable {
public static class Reactor implements AutoCloseable, Runnable {
final DatagramChannel dc;
Exception e = null;

Reactor() throws IOException {
dc = DatagramChannel.open().bind(new InetSocketAddress(0));
}

int port() {
return dc.socket().getLocalPort();
}

public void throwException() throws Exception {
if (e != null)
throw e;
SocketAddress getSocketAddress() throws IOException {
return dc.getLocalAddress();
}

public void run() {
try {
// Listen for a message
ByteBuffer bb = ByteBuffer.allocateDirect(100);
log.println("Reactor waiting to receive");
SocketAddress sa = dc.receive(bb);
bb.flip();
CharBuffer cb = Charset.forName("US-ASCII").
newDecoder().decode(bb);
log.println("From Actor: "+sa+ " said " +cb);
CharBuffer cb = StandardCharsets.US_ASCII.
newDecoder().decode(bb);
log.println("Reactor received from Actor at" + sa + ": " + cb);

// Reply to sender
dc.connect(sa);
bb.flip();
log.println("Reactor attempting to write: " + dc.getRemoteAddress().toString());
dc.write(bb);

// Clean up
dc.disconnect();
dc.close();
} catch (Exception ex) {
e = ex;
log.println("Reactor threw exception: " + ex);
throw new RuntimeException(ex);
} finally {
log.println("Reactor finished");
}
}

@Override
public void close() throws IOException {
dc.close();
}
}
}

1 comment on commit 4a045a7

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.