Skip to content

Commit

Permalink
8310978: JFR events SocketReadEvent/SocketWriteEvent for Socket adapt…
Browse files Browse the repository at this point in the history
…or ops

Reviewed-by: dfuchs, alanb
  • Loading branch information
Tim Prinzing authored and Alan Bateman committed Oct 30, 2023
1 parent 988e1df commit 1183b22
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,23 @@ public static long timestamp() {
* timestamp and the given start time. If the duration is meets
* or exceeds the configured value (determined by calling the generated method
* {@link #shouldCommit(long)}), an event will be emitted by calling
* {@link #emit(long, long, long, SocketAddress, long)}
*
* @param start the start time
* @param nbytes how many bytes were transferred
* @param remote the address of the remote socket
* @param timeout maximum time to wait
*/
public static void offer(long start, long nbytes, SocketAddress remote, long timeout) {
long duration = timestamp() - start;
if (shouldCommit(duration)) {
emit(start, duration, nbytes, remote, timeout);
}
}

/**
* Helper method to perform a common task of getting event data ready and
* then emitting the event by calling
* {@link #commit(long, long, String, String, int, long, long, boolean)}.
*
* @param start the start time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,22 @@ public static long timestamp() {
* timestamp and the given start time. If the duration is meets
* or exceeds the configured value (determined by calling the generated method
* {@link #shouldCommit(long)}), an event will be emitted by calling
* {@link #emit(long, long, long, SocketAddress)}.
*
* @param start the start time
* @param bytesWritten how many bytes were sent
* @param remote the address of the remote socket being written to
*/
public static void offer(long start, long bytesWritten, SocketAddress remote) {
long duration = timestamp() - start;
if (shouldCommit(duration)) {
emit(start, duration, bytesWritten, remote);
}
}

/**
* Helper method to perform a common task of getting event data ready and
* then emitting the event by calling
* {@link #commit(long, long, String, String, int, long)}.
*
* @param start the start time
Expand Down
20 changes: 4 additions & 16 deletions src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,7 @@ public int read(ByteBuffer buf) throws IOException {
}
long start = SocketReadEvent.timestamp();
int nbytes = implRead(buf);
long duration = SocketReadEvent.timestamp() - start;
if (SocketReadEvent.shouldCommit(duration)) {
SocketReadEvent.emit(start, duration, nbytes, remoteAddress(), 0);
}
SocketReadEvent.offer(start, nbytes, remoteAddress(), 0);
return nbytes;
}

Expand All @@ -511,10 +508,7 @@ public long read(ByteBuffer[] dsts, int offset, int length)
}
long start = SocketReadEvent.timestamp();
long nbytes = implRead(dsts, offset, length);
long duration = SocketReadEvent.timestamp() - start;
if (SocketReadEvent.shouldCommit(duration)) {
SocketReadEvent.emit(start, duration, nbytes, remoteAddress(), 0);
}
SocketReadEvent.offer(start, nbytes, remoteAddress(), 0);
return nbytes;
}

Expand Down Expand Up @@ -625,10 +619,7 @@ public int write(ByteBuffer buf) throws IOException {
}
long start = SocketWriteEvent.timestamp();
int nbytes = implWrite(buf);
long duration = SocketWriteEvent.timestamp() - start;
if (SocketWriteEvent.shouldCommit(duration)) {
SocketWriteEvent.emit(start, duration, nbytes, remoteAddress());
}
SocketWriteEvent.offer(start, nbytes, remoteAddress());
return nbytes;
}

Expand All @@ -641,10 +632,7 @@ public long write(ByteBuffer[] srcs, int offset, int length)
}
long start = SocketWriteEvent.timestamp();
long nbytes = implWrite(srcs, offset, length);
long duration = SocketWriteEvent.timestamp() - start;
if (SocketWriteEvent.shouldCommit(duration)) {
SocketWriteEvent.emit(start, duration, nbytes, remoteAddress());
}
SocketWriteEvent.offer(start, nbytes, remoteAddress());
return nbytes;
}

Expand Down
20 changes: 16 additions & 4 deletions src/java.base/share/classes/sun/nio/ch/SocketInputStream.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand All @@ -24,6 +24,8 @@
*/
package sun.nio.ch;

import jdk.internal.event.SocketReadEvent;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.IntSupplier;
Expand Down Expand Up @@ -60,9 +62,7 @@ public int read() throws IOException {
return (n > 0) ? (a[0] & 0xff) : -1;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int timeout = timeoutSupplier.getAsInt();
private int implRead(byte[] b, int off, int len, int timeout) throws IOException {
if (timeout > 0) {
long nanos = MILLISECONDS.toNanos(timeout);
return sc.blockingRead(b, off, len, nanos);
Expand All @@ -71,6 +71,18 @@ public int read(byte[] b, int off, int len) throws IOException {
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int timeout = timeoutSupplier.getAsInt();
if (!SocketReadEvent.enabled()) {
return implRead(b, off, len, timeout);
}
long start = SocketReadEvent.timestamp();
int n = implRead(b, off, len, timeout);
SocketReadEvent.offer(start, n, sc.remoteAddress(), timeout);
return n;
}

@Override
public int available() throws IOException {
return sc.available();
Expand Down
10 changes: 9 additions & 1 deletion src/java.base/share/classes/sun/nio/ch/SocketOutputStream.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand All @@ -24,6 +24,8 @@
*/
package sun.nio.ch;

import jdk.internal.event.SocketWriteEvent;

import java.io.IOException;
import java.io.OutputStream;

Expand Down Expand Up @@ -55,7 +57,13 @@ public void write(int b) throws IOException {

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (!SocketWriteEvent.enabled()) {
sc.blockingWriteFully(b, off, len);
return;
}
long start = SocketWriteEvent.timestamp();
sc.blockingWriteFully(b, off, len);
SocketWriteEvent.offer(start, len, sc.remoteAddress());
}

@Override
Expand Down
127 changes: 127 additions & 0 deletions test/jdk/jdk/jfr/event/io/TestSocketAdapterEvents.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

package jdk.jfr.event.io;

import static jdk.test.lib.Asserts.assertEquals;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import jdk.jfr.Recording;
import jdk.jfr.consumer.RecordedEvent;
import jdk.test.lib.jfr.Events;
import jdk.test.lib.thread.TestThread;
import jdk.test.lib.thread.XRun;

/**
* @test
* @bug 8310978
* @summary test socket read/write events on socket adaptors
* @key jfr
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.event.io.TestSocketAdapterEvents
*/
public class TestSocketAdapterEvents {
private static final int writeInt = 'A';
private static final byte[] writeBuf = { 'B', 'C', 'D', 'E' };

private List<IOEvent> expectedEvents = new ArrayList<>();

private synchronized void addExpectedEvent(IOEvent event) {
expectedEvents.add(event);
}

public static void main(String[] args) throws Throwable {
new TestSocketAdapterEvents().test();
}

public void test() throws Throwable {
try (Recording recording = new Recording()) {
try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
recording.enable(IOEvent.EVENT_SOCKET_READ).withThreshold(Duration.ofMillis(0));
recording.enable(IOEvent.EVENT_SOCKET_WRITE).withThreshold(Duration.ofMillis(0));
recording.start();

InetAddress lb = InetAddress.getLoopbackAddress();
ssc.bind(new InetSocketAddress(lb, 0));

TestThread readerThread = new TestThread(new XRun() {
@Override
public void xrun() throws IOException {
byte[] bs = new byte[4];
try (SocketChannel sc = ssc.accept(); Socket s = sc.socket();
InputStream is = s.getInputStream()) {

int readInt = is.read();
assertEquals(readInt, writeInt, "Wrong readInt");
addExpectedEvent(IOEvent.createSocketReadEvent(1, s));

int bytesRead = is.read(bs, 0, 3);
assertEquals(bytesRead, 3, "Wrong bytesRead partial buffer");
addExpectedEvent(IOEvent.createSocketReadEvent(bytesRead, s));

bytesRead = is.read(bs);
assertEquals(bytesRead, writeBuf.length, "Wrong bytesRead full buffer");
addExpectedEvent(IOEvent.createSocketReadEvent(bytesRead, s));

// Try to read more, but writer have closed. Should
// get EOF.
readInt = is.read();
assertEquals(readInt, -1, "Wrong readInt at EOF");
addExpectedEvent(IOEvent.createSocketReadEvent(-1, s));
}
}
});
readerThread.start();

try (SocketChannel sc = SocketChannel.open(ssc.getLocalAddress());
Socket s = sc.socket(); OutputStream os = s.getOutputStream()) {

os.write(writeInt);
addExpectedEvent(IOEvent.createSocketWriteEvent(1, s));
os.write(writeBuf, 0, 3);
addExpectedEvent(IOEvent.createSocketWriteEvent(3, s));
os.write(writeBuf);
addExpectedEvent(IOEvent.createSocketWriteEvent(writeBuf.length, s));
}

readerThread.joinAndThrow();
recording.stop();
List<RecordedEvent> events = Events.fromRecording(recording);
IOHelper.verifyEquals(events, expectedEvents);
}
}
}
}
15 changes: 9 additions & 6 deletions test/jdk/jdk/jfr/event/io/TestSocketChannelEvents.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand All @@ -26,6 +26,8 @@
import static jdk.test.lib.Asserts.assertEquals;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
Expand All @@ -41,6 +43,7 @@

/**
* @test
* @summary test socket read/write events on SocketChannel
* @key jfr
* @requires vm.hasJFR
* @library /test/lib /test/jdk
Expand All @@ -62,20 +65,20 @@ public static void main(String[] args) throws Throwable {

public void test() throws Throwable {
try (Recording recording = new Recording()) {
try (ServerSocketChannel ss = ServerSocketChannel.open()) {
try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
recording.enable(IOEvent.EVENT_SOCKET_READ).withThreshold(Duration.ofMillis(0));
recording.enable(IOEvent.EVENT_SOCKET_WRITE).withThreshold(Duration.ofMillis(0));
recording.start();

ss.socket().setReuseAddress(true);
ss.socket().bind(null);
InetAddress lb = InetAddress.getLoopbackAddress();
ssc.bind(new InetSocketAddress(lb, 0));

TestThread readerThread = new TestThread(new XRun() {
@Override
public void xrun() throws IOException {
ByteBuffer bufA = ByteBuffer.allocate(bufSizeA);
ByteBuffer bufB = ByteBuffer.allocate(bufSizeB);
try (SocketChannel sc = ss.accept()) {
try (SocketChannel sc = ssc.accept()) {
int readSize = sc.read(bufA);
assertEquals(readSize, bufSizeA, "Wrong readSize bufA");
addExpectedEvent(IOEvent.createSocketReadEvent(bufSizeA, sc.socket()));
Expand All @@ -98,7 +101,7 @@ public void xrun() throws IOException {
});
readerThread.start();

try (SocketChannel sc = SocketChannel.open(ss.socket().getLocalSocketAddress())) {
try (SocketChannel sc = SocketChannel.open(ssc.getLocalAddress())) {
ByteBuffer bufA = ByteBuffer.allocateDirect(bufSizeA);
ByteBuffer bufB = ByteBuffer.allocateDirect(bufSizeB);
for (int i = 0; i < bufSizeA; ++i) {
Expand Down
9 changes: 6 additions & 3 deletions test/jdk/jdk/jfr/event/io/TestSocketEvents.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -28,6 +28,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.time.Duration;
Expand All @@ -42,6 +44,7 @@

/**
* @test
* @summary test socket read/write events on Socket
* @key jfr
* @requires vm.hasJFR
* @library /test/lib /test/jdk
Expand Down Expand Up @@ -69,8 +72,8 @@ private void test() throws Throwable {
recording.enable(IOEvent.EVENT_SOCKET_WRITE).withThreshold(Duration.ofMillis(0));
recording.start();

ss.setReuseAddress(true);
ss.bind(null);
InetAddress lb = InetAddress.getLoopbackAddress();
ss.bind(new InetSocketAddress(lb, 0));

TestThread readerThread = new TestThread(new XRun() {
@Override
Expand Down

1 comment on commit 1183b22

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.