Skip to content
Permalink
Browse files
8247614: java/nio/channels/DatagramChannel/Connect.java timed out
Reviewed-by: dfuchs, alanb
  • Loading branch information
c-cleary authored and dfuch committed Oct 29, 2020
1 parent 38574d5 commit ea26ff1142b6b7413f01d83375ff3be8a3ab3e74
Showing 1 changed file with 75 additions and 69 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2001, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -24,15 +24,18 @@
/* @test
* @bug 4313882 7183800
* @summary Test DatagramChannel's send and receive methods
* @author Mike McCloskey
*/

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Stream;

public class Connect {

@@ -43,124 +46,127 @@ public static void main(String[] args) throws Exception {
}

static void test() throws Exception {
Reactor r = new Reactor();
Actor a = new Actor(r.port());
invoke(a, r);
ExecutorService threadPool = Executors.newCachedThreadPool();
try (Reactor r = new Reactor();
Actor a = new Actor(r.getSocketAddress())
) {
invoke(threadPool, a, r);
} finally {
threadPool.shutdown();
}
}

static void invoke(Sprintable reader, Sprintable writer) throws Exception {

Thread writerThread = new Thread(writer);
writerThread.start();

Thread readerThread = new Thread(reader);
readerThread.start();

writerThread.join();
readerThread.join();

reader.throwException();
writer.throwException();
static void invoke(ExecutorService e, Runnable reader, Runnable writer) throws CompletionException {
CompletableFuture<Void> f1 = CompletableFuture.runAsync(writer, e);
CompletableFuture<Void> f2 = CompletableFuture.runAsync(reader, e);
wait(f1, f2);
}

public interface Sprintable extends Runnable {
public void throwException() throws Exception;
}

public static class Actor implements Sprintable {
final int port;
Exception e = null;
// This method waits for either one of the given futures to complete exceptionally
// or for all of the given futures to complete successfully.
private static void wait(CompletableFuture<?>... futures) throws CompletionException {
CompletableFuture<?> future = CompletableFuture.allOf(futures);
Stream.of(futures)
.forEach(f -> f.exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
}));
future.join();
}

Actor(int port) {
this.port = port;
}
public static class Actor implements AutoCloseable, Runnable {
final SocketAddress socketAddress;
final DatagramChannel dc;

public void throwException() throws Exception {
if (e != null)
throw e;
Actor(SocketAddress socketAddress) throws IOException {
this.socketAddress = socketAddress;
dc = DatagramChannel.open();
}

public void run() {
try {
DatagramChannel dc = DatagramChannel.open();

// Send a message
ByteBuffer bb = ByteBuffer.allocateDirect(256);
bb.put("hello".getBytes());
bb.flip();
InetAddress address = InetAddress.getLocalHost();
if (address.isLoopbackAddress()) {
address = InetAddress.getLoopbackAddress();
}
InetSocketAddress isa = new InetSocketAddress(address, port);
dc.connect(isa);
dc.connect(socketAddress);

// Send a message
log.println("Actor attempting to write to Reactor at " + socketAddress.toString());
dc.write(bb);

// Try to send to some other address
address = InetAddress.getLocalHost();
InetSocketAddress bogus = new InetSocketAddress(address, 3333);
try {
dc.send(bb, bogus);
throw new RuntimeException("Allowed bogus send while connected");
int port = dc.socket().getLocalPort();
InetAddress loopback = InetAddress.getLoopbackAddress();
InetSocketAddress otherAddress = new InetSocketAddress(loopback, (port == 3333 ? 3332 : 3333));
log.println("Testing if Actor throws AlreadyConnectedException" + otherAddress.toString());
dc.send(bb, otherAddress);
throw new RuntimeException("Actor allowed send to other address while already connected");
} catch (AlreadyConnectedException ace) {
// Correct behavior
}

// Read a reply
bb.flip();
log.println("Actor waiting to read");
dc.read(bb);
bb.flip();
CharBuffer cb = Charset.forName("US-ASCII").
newDecoder().decode(bb);
log.println("From Reactor: "+isa+ " said " +cb);

// Clean up
dc.disconnect();
dc.close();
CharBuffer cb = StandardCharsets.US_ASCII.
newDecoder().decode(bb);
log.println("Actor received from Reactor at " + socketAddress + ": " + cb);
} catch (Exception ex) {
e = ex;
log.println("Actor threw exception: " + ex);
throw new RuntimeException(ex);
} finally {
log.println("Actor finished");
}
}

@Override
public void close() throws IOException {
dc.close();
}
}

public static class Reactor implements Sprintable {
public static class Reactor implements AutoCloseable, Runnable {
final DatagramChannel dc;
Exception e = null;

Reactor() throws IOException {
dc = DatagramChannel.open().bind(new InetSocketAddress(0));
}

int port() {
return dc.socket().getLocalPort();
}

public void throwException() throws Exception {
if (e != null)
throw e;
SocketAddress getSocketAddress() throws IOException {
return dc.getLocalAddress();
}

public void run() {
try {
// Listen for a message
ByteBuffer bb = ByteBuffer.allocateDirect(100);
log.println("Reactor waiting to receive");
SocketAddress sa = dc.receive(bb);
bb.flip();
CharBuffer cb = Charset.forName("US-ASCII").
newDecoder().decode(bb);
log.println("From Actor: "+sa+ " said " +cb);
CharBuffer cb = StandardCharsets.US_ASCII.
newDecoder().decode(bb);
log.println("Reactor received from Actor at" + sa + ": " + cb);

// Reply to sender
dc.connect(sa);
bb.flip();
log.println("Reactor attempting to write: " + dc.getRemoteAddress().toString());
dc.write(bb);

// Clean up
dc.disconnect();
dc.close();
} catch (Exception ex) {
e = ex;
log.println("Reactor threw exception: " + ex);
throw new RuntimeException(ex);
} finally {
log.println("Reactor finished");
}
}

@Override
public void close() throws IOException {
dc.close();
}
}
}

3 comments on commit ea26ff1

@bridgekeeper
Copy link

@bridgekeeper bridgekeeper bot commented on ea26ff1 Oct 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GoeLin
Copy link
Member

@GoeLin GoeLin commented on ea26ff1 Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/backport jdk11u-dev

@openjdk
Copy link

@openjdk openjdk bot commented on ea26ff1 Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GoeLin the backport was successfully created on the branch GoeLin-backport-ea26ff11 in my personal fork of openjdk/jdk11u-dev. To create a pull request with this backport targeting openjdk/jdk11u-dev:master, just click the following link:

➡️ Create pull request

The title of the pull request is automatically filled in correctly and below you find a suggestion for the pull request body:

Hi all,

This pull request contains a backport of commit ea26ff11 from the openjdk/jdk repository.

The commit being backported was authored by Conor Cleary on 29 Oct 2020 and was reviewed by Daniel Fuchs and Alan Bateman.

Thanks!

If you need to update the source branch of the pull then run the following commands in a local clone of your personal fork of openjdk/jdk11u-dev:

$ git fetch https://github.com/openjdk-bots/jdk11u-dev GoeLin-backport-ea26ff11:GoeLin-backport-ea26ff11
$ git checkout GoeLin-backport-ea26ff11
# make changes
$ git add paths/to/changed/files
$ git commit --message 'Describe additional changes made'
$ git push https://github.com/openjdk-bots/jdk11u-dev GoeLin-backport-ea26ff11

Please sign in to comment.