Skip to content

Commit

Permalink
8295350: JFR: Add stop methods for recording streams
Browse files Browse the repository at this point in the history
Reviewed-by: mgronlun
  • Loading branch information
egahlin committed Dec 1, 2022
1 parent 9430f3e commit eec24aa
Show file tree
Hide file tree
Showing 12 changed files with 643 additions and 5 deletions.
38 changes: 38 additions & 0 deletions src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import jdk.jfr.internal.SecuritySupport;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.EventDirectoryStream;
import jdk.jfr.internal.management.StreamBarrier;

/**
* A recording stream produces events from the current JVM (Java Virtual
Expand Down Expand Up @@ -380,6 +381,43 @@ public void startAsync() {
directoryStream.startAsync(startNanos);
}

/**
* Stops the recording stream.
* <p>
* Stops a started stream and waits until all events in the recording have
* been consumed.
* <p>
* Invoking this method in an action, for example in the
* {@link #onEvent(Consumer)} method, could block the stream indefinitely.
* To stop the stream abruptly, use the {@link #close} method.
* <p>
* The following code snippet illustrates how this method can be used in
* conjunction with the {@link #startAsync()} method to monitor what happens
* during a test method:
* <p>
* {@snippet class="Snippets" region="RecordingStreamStop"}
*
* @return {@code true} if recording is stopped, {@code false} otherwise
*
* @throws IllegalStateException if the recording is not started or is already stopped
*
* @since 20
*/
public boolean stop() {
boolean stopped = false;
try {
try (StreamBarrier sb = directoryStream.activateStreamBarrier()) {
stopped = recording.stop();
directoryStream.setCloseOnComplete(false);
sb.setStreamEnd(recording.getStopTime().toEpochMilli());
}
directoryStream.awaitTermination();
} catch (InterruptedException | IOException e) {
// OK, return
}
return stopped;
}

/**
* Writes recording data to a file.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import jdk.jfr.consumer.EventStream;
import jdk.jfr.consumer.RecordingFile;
Expand Down Expand Up @@ -166,4 +167,26 @@ void RecordingStreamStartAsync() throws Exception {
}
// @end
}

void RecordingStreamStop() throws Exception {
// @start region="RecordingStreamStop"
AtomicBoolean socketUse = new AtomicBoolean();
try (var r = new RecordingStream()) {
r.setMaxSize(Long.MAX_VALUE);
r.enable("jdk.SocketWrite").withoutThreshold();
r.enable("jdk.SocketRead").withoutThreshold();
r.onEvent(event -> socketUse.set(true));
r.startAsync();
testFoo();
r.stop();
if (socketUse.get()) {
r.dump(Path.of("socket-events.jfr"));
throw new AssertionError("testFoo() should not use network");
}
}
// @end
}

void testFoo() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ public abstract class AbstractEventStream implements EventStream {
private final StreamConfiguration streamConfiguration = new StreamConfiguration();
private final List<Configuration> configurations;
private final ParserState parserState = new ParserState();
private volatile boolean closeOnComplete = true;
private Dispatcher dispatcher;
private boolean daemon = false;


AbstractEventStream(@SuppressWarnings("removal") AccessControlContext acc, List<Configuration> configurations) throws IOException {
this.accessControllerContext = Objects.requireNonNull(acc);
this.configurations = configurations;
Expand Down Expand Up @@ -107,6 +109,13 @@ public final void setDaemon(boolean daemon) {
this.daemon = daemon;
}

// When set to false, it becomes the callers responsibility
// to invoke close() and clean up resources. By default,
// the resource is cleaned up when the process() call has finished.
public final void setCloseOnComplete(boolean closeOnComplete) {
this.closeOnComplete = closeOnComplete;
}

@Override
public final void setStartTime(Instant startTime) {
Objects.requireNonNull(startTime, "startTime");
Expand Down Expand Up @@ -258,7 +267,9 @@ private void execute() {
} finally {
Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended.");
try {
close();
if (closeOnComplete) {
close();
}
} finally {
terminated.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import jdk.jfr.internal.PlatformRecording;
import jdk.jfr.internal.SecuritySupport;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.management.StreamBarrier;

/**
* Implementation of an {@code EventStream}} that operates against a directory
Expand All @@ -54,11 +55,11 @@ public final class EventDirectoryStream extends AbstractEventStream {
private final RepositoryFiles repositoryFiles;
private final FileAccess fileAccess;
private final PlatformRecording recording;
private final StreamBarrier barrier = new StreamBarrier();
private ChunkParser currentParser;
private long currentChunkStartNanos;
private RecordedEvent[] sortedCache;
private int threadExclusionLevel = 0;

private volatile Consumer<Long> onCompleteHandler;

public EventDirectoryStream(
Expand Down Expand Up @@ -150,7 +151,6 @@ protected void processRecursionSafe() throws IOException {
long segmentStart = currentParser.getStartNanos() + currentParser.getChunkDuration();
long filterStart = validStartTime ? disp.startNanos : segmentStart;
long filterEnd = disp.endTime != null ? disp.endNanos : Long.MAX_VALUE;

while (!isClosed()) {
onMetadata(currentParser);
while (!isClosed() && !currentParser.isChunkFinished()) {
Expand All @@ -166,8 +166,14 @@ protected void processRecursionSafe() throws IOException {
processUnordered(disp);
}
currentParser.resetCache();
if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
close();
barrier.check(); // block if recording is being stopped
long endNanos = currentParser.getStartNanos() + currentParser.getChunkDuration();
// same conversion as in RecordingInfo
long endMillis = Instant.ofEpochSecond(0, endNanos).toEpochMilli();
if (barrier.getStreamEnd() <= endMillis) {
return;
}
if (endNanos > filterEnd) {
return;
}
}
Expand Down Expand Up @@ -205,6 +211,7 @@ protected void processRecursionSafe() throws IOException {
}
}


private boolean isLastChunk() {
if (!isRecording()) {
return false;
Expand Down Expand Up @@ -259,4 +266,9 @@ private boolean processUnordered(Dispatcher c) throws IOException {
c.dispatch(e);
}
}

public StreamBarrier activateStreamBarrier() {
barrier.activate();
return barrier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import jdk.jfr.internal.SecuritySupport.SafePath;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.WriteableUserPath;
import jdk.jfr.internal.consumer.AbstractEventStream;
import jdk.jfr.internal.consumer.EventDirectoryStream;
import jdk.jfr.internal.consumer.FileAccess;
import jdk.jfr.internal.instrument.JDKEvents;
Expand Down Expand Up @@ -178,4 +179,18 @@ public static EventStream newEventDirectoryStream(
false
);
}

// An EventStream is passive, so a stop() method doesn't fit well in the API.
// RemoteRecordingStream::stop() implementation need to prevent stream
// from being closed, so this method is needed
public static void setCloseOnComplete(EventStream stream, boolean closeOnComplete) {
AbstractEventStream aes = (AbstractEventStream) stream;
aes.setCloseOnComplete(closeOnComplete);
}

// Internal method needed to block parser
public static StreamBarrier activateStreamBarrier(EventStream stream) {
EventDirectoryStream aes = (EventDirectoryStream) stream;
return aes.activateStreamBarrier();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.jfr.internal.management;

import java.io.Closeable;
import java.io.IOException;

/**
* Purpose of this class is to provide a synchronization point when stopping a
* recording. Without it, a race can happen where a stream advances beyond the
* last chunk of the recording.
*
* Code that is processing the stream calls check() and Unless the recording is
* in the process of being stopped, it will just return. On the other hand, if
* the recording is stopping, the thread waits and when it wakes up an end
* position should have been set (last chunk position) beyond which the stream
* processing should not continue.
*/
public final class StreamBarrier implements Closeable {

private boolean activated = false;
private long end = Long.MAX_VALUE;

// Blocks thread until barrier is deactivated
public synchronized void check() {
while (activated) {
try {
this.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public synchronized void setStreamEnd(long timestamp) {
end = timestamp;
}

public synchronized long getStreamEnd() {
return end;
}

public synchronized void activate() {
activated = true;
}

@Override
public synchronized void close() throws IOException {
activated = false;
this.notifyAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import jdk.jfr.internal.management.ChunkFilename;
import jdk.jfr.internal.management.ManagementSupport;
import jdk.jfr.internal.management.StreamBarrier;

final class DiskRepository implements Closeable {

Expand Down Expand Up @@ -126,6 +127,7 @@ public State next() {
private final ByteBuffer buffer = ByteBuffer.allocate(256);
private final Path directory;
private final ChunkFilename chunkFilename;
private final StreamBarrier barrier = new StreamBarrier();

private RandomAccessFile raf;
private RandomAccessFile previousRAF;
Expand Down Expand Up @@ -153,6 +155,7 @@ public DiskRepository(Path path, boolean deleteDirectory) throws IOException {
}

public synchronized void write(byte[] bytes) throws IOException {
barrier.check();
index = 0;
lastFlush = 0;
currentByteArray = bytes;
Expand Down Expand Up @@ -345,6 +348,10 @@ private void writeCheckpointHeader() throws IOException {
long endTimeNanos = currentChunk.startTimeNanos + durationNanos;
currentChunk.endTimeNanos = endTimeNanos;
currentChunk.endTime = ManagementSupport.epochNanosToInstant(endTimeNanos);
if (currentChunk.endTime.toEpochMilli() == barrier.getStreamEnd()) {
// Recording has been stopped, need to complete last chunk
completePrevious(currentChunk);
}
}
raf.seek(position);
}
Expand Down Expand Up @@ -512,4 +519,9 @@ public synchronized FileDump newDump(long endTime) {
}
return fd;
}

public StreamBarrier activateStreamBarrier() {
barrier.activate();
return barrier;
}
}

1 comment on commit eec24aa

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.