Skip to content
Permalink
Browse files

8234888: EventStream::close doesn't abort streaming thread

Reviewed-by: mgronlun, mseledtsov
  • Loading branch information
Erik Gahlin
Erik Gahlin committed Nov 28, 2019
1 parent 09d0150 commit f0f09579349a3618769afbf9a66fcc0844e8b5c1
@@ -48,7 +48,7 @@
* an event stream.
*/
abstract class AbstractEventStream implements EventStream {
private final static AtomicLong counter = new AtomicLong(1);
private final static AtomicLong counter = new AtomicLong(0);

private final Object terminated = new Object();
private final Runnable flushOperation = () -> dispatcher().runFlushActions();
@@ -106,6 +106,7 @@ private boolean is(int flags) {

private Runnable flushOperation;
private ParserConfiguration configuration;
private volatile boolean closed;

public ChunkParser(RecordingInput input) throws IOException {
this(input, new ParserConfiguration());
@@ -284,6 +285,9 @@ private boolean awaitUpdatedHeader(long absoluteChunkEnd, long filterEnd) throws
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
}
while (true) {
if (closed) {
return true;
}
if (chunkHeader.getLastNanos() > filterEnd) {
return true;
}
@@ -455,4 +459,9 @@ public boolean isFinalChunk() {
return chunkHeader.isFinalChunk();
}

public void close() {
this.closed = true;
Utils.notifyFlush();
}

}
@@ -69,6 +69,9 @@ public void close() {
setClosed(true);
dispatcher().runCloseActions();
repositoryFiles.close();
if (currentParser != null) {
currentParser.close();
}
}

@Override
@@ -25,11 +25,15 @@

package jdk.jfr.api.consumer.recordingstream;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jdk.jfr.Event;
import jdk.jfr.Recording;
import jdk.jfr.consumer.EventStream;
import jdk.jfr.consumer.RecordingStream;

/**
@@ -51,6 +55,7 @@ public static void main(String... args) throws Exception {
testCloseTwice();
testCloseStreaming();
testCloseMySelf();
testCloseNoEvents();
}

private static void testCloseMySelf() throws Exception {
@@ -122,6 +127,26 @@ private static void testCloseTwice() {
log("Leaving testCloseTwice()");
}

private static void testCloseNoEvents() throws Exception {
try (Recording r = new Recording()) {
r.start();
CountDownLatch finished = new CountDownLatch(2);
AtomicReference<Thread> streamingThread = new AtomicReference<>();
try (EventStream es = EventStream.openRepository()) {
es.setStartTime(Instant.EPOCH);
es.onFlush( () -> {
streamingThread.set(Thread.currentThread());
finished.countDown();;
});
es.startAsync();
finished.await();
} // <- EventStream::close should terminate thread
while (streamingThread.get().isAlive()) {
Thread.sleep(10);
}
}
}

private static void log(String msg) {
System.out.println(msg);
}

0 comments on commit f0f0957

Please sign in to comment.
You can’t perform that action at this time.