Skip to content
Permalink
Browse files

8257906: JFR: RecordingStream leaks memory

Reviewed-by: mgronlun
  • Loading branch information
egahlin committed Dec 16, 2020
1 parent 0c8cc2c commit 3c6648501589bf36945340cb1e82c833ebd7485d
@@ -45,7 +45,6 @@
import jdk.jfr.internal.SecuritySupport;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.consumer.EventDirectoryStream;
import jdk.jfr.internal.consumer.JdkJfrConsumer;

/**
* A recording stream produces events from the current JVM (Java Virtual
@@ -68,9 +67,27 @@
*/
public final class RecordingStream implements AutoCloseable, EventStream {

final static class ChunkConsumer implements Consumer<Long> {

private final Recording recording;

ChunkConsumer(Recording recording) {
this.recording = recording;
}

@Override
public void accept(Long endNanos) {
Instant t = Utils.epochNanosToInstant(endNanos);
PlatformRecording p = PrivateAccess.getInstance().getPlatformRecording(recording);
p.removeBefore(t);
}
}

private final Recording recording;
private final Instant creationTime;
private final EventDirectoryStream directoryStream;
private long maxSize;
private Duration maxAge;

/**
* Creates an event stream for the current JVM (Java Virtual Machine).
@@ -247,7 +264,11 @@ public EventSettings disable(Class<? extends Event> eventClass) {
* state
*/
public void setMaxAge(Duration maxAge) {
recording.setMaxAge(maxAge);
synchronized (directoryStream) {
recording.setMaxAge(maxAge);
this.maxAge = maxAge;
updateOnCompleteHandler();
}
}

/**
@@ -270,7 +291,11 @@ public void setMaxAge(Duration maxAge) {
* @throws IllegalStateException if the recording is in {@code CLOSED} state
*/
public void setMaxSize(long maxSize) {
recording.setMaxSize(maxSize);
synchronized (directoryStream) {
recording.setMaxSize(maxSize);
this.maxSize = maxSize;
updateOnCompleteHandler();
}
}

@Override
@@ -320,6 +345,7 @@ public void onError(Consumer<Throwable> action) {

@Override
public void close() {
directoryStream.setChunkCompleteHandler(null);
recording.close();
directoryStream.close();
}
@@ -333,6 +359,7 @@ public boolean remove(Object action) {
public void start() {
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
long startNanos = pr.start();
updateOnCompleteHandler();
directoryStream.start(startNanos);
}

@@ -363,6 +390,7 @@ public void start() {
public void startAsync() {
PlatformRecording pr = PrivateAccess.getInstance().getPlatformRecording(recording);
long startNanos = pr.start();
updateOnCompleteHandler();
directoryStream.startAsync(startNanos);
}

@@ -380,4 +408,13 @@ public void awaitTermination() throws InterruptedException {
public void onMetadata(Consumer<MetadataEvent> action) {
directoryStream.onMetadata(action);
}

private void updateOnCompleteHandler() {
if (maxAge != null || maxSize != 0) {
// User has set a chunk removal policy
directoryStream.setChunkCompleteHandler(null);
} else {
directoryStream.setChunkCompleteHandler(new ChunkConsumer(recording));
}
}
}
@@ -488,4 +488,16 @@ public void setStaleMetadata(boolean stale) {
public boolean hasStaleMetadata() {
return staleMetadata;
}

public void resetCache() {
LongMap<Parser> ps = this.parsers;
if (ps != null) {
ps.forEach(p -> {
if (p instanceof EventParser) {
EventParser ep = (EventParser) p;
ep.resetCache();
}
});
}
}
}
@@ -159,6 +159,7 @@ protected void processRecursionSafe() throws IOException {
} else {
processUnordered(disp);
}
currentParser.resetCache();
if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) {
close();
return;
@@ -102,6 +102,7 @@ protected void process() throws IOException {
} else {
processUnordered(disp);
}
currentParser.resetCache();
if (isClosed() || currentParser.isLastChunk()) {
return;
}

1 comment on commit 3c66485

@openjdk-notifier

This comment has been minimized.

Copy link

@openjdk-notifier openjdk-notifier bot commented on 3c66485 Dec 16, 2020

Please sign in to comment.