Skip to content
This repository has been archived by the owner on Sep 2, 2022. It is now read-only.
/ jdk16 Public archive

Commit

Permalink
8253505: JFR: onFlush invoked out of order with a sorted event stream
Browse files Browse the repository at this point in the history
Reviewed-by: mgronlun
  • Loading branch information
egahlin committed Jan 14, 2021
1 parent 0148adf commit 4307fa6
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 50 deletions.
6 changes: 6 additions & 0 deletions src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ public RecordedEvent readEvent() throws IOException {
isLastEventInChunk = false;
RecordedEvent event = nextEvent;
nextEvent = chunkParser.readEvent();
while (nextEvent == ChunkParser.FLUSH_MARKER) {
nextEvent = chunkParser.readEvent();
}
if (nextEvent == null) {
isLastEventInChunk = true;
findNext();
Expand Down Expand Up @@ -251,6 +254,9 @@ private void findNext() throws IOException {
return;
}
nextEvent = chunkParser.readEvent();
while (nextEvent == ChunkParser.FLUSH_MARKER) {
nextEvent = chunkParser.readEvent();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ protected final Runnable getFlushOperation() {
return flushOperation;
}


final protected void onFlush() {
Runnable r = getFlushOperation();
if (r != null) {
r.run();
}
}

private void startInternal(long startNanos) {
synchronized (streamConfiguration) {
if (streamConfiguration.started) {
Expand Down Expand Up @@ -291,7 +299,7 @@ public void onMetadata(Consumer<MetadataEvent> action) {
streamConfiguration.addMetadataAction(action);
}

protected final void emitMetadataEvent(ChunkParser parser) {
protected final void onMetadata(ChunkParser parser) {
if (parser.hasStaleMetadata()) {
if (dispatcher.hasMetadataHandler()) {
List<EventType> ce = parser.getEventTypes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private boolean is(int flags) {
return (mask & flags) != 0;
}
}
public final static RecordedEvent FLUSH_MARKER = JdkJfrConsumer.instance().newRecordedEvent(null, null, 0L, 0L);

private static final long CONSTANT_POOL_TYPE_ID = 1;
private static final String CHUNKHEADER = "jdk.types.ChunkHeader";
Expand All @@ -104,7 +105,6 @@ private boolean is(int flags) {
private LongMap<Parser> parsers;
private boolean chunkFinished;

private Runnable flushOperation;
private ParserConfiguration configuration;
private volatile boolean closed;
private MetadataDescriptor previousMetadata;
Expand Down Expand Up @@ -194,6 +194,9 @@ void updateConfiguration(ParserConfiguration configuration, boolean resetEventCa
RecordedEvent readStreamingEvent() throws IOException {
long absoluteChunkEnd = chunkHeader.getEnd();
RecordedEvent event = readEvent();
if (event == ChunkParser.FLUSH_MARKER) {
return null;
}
if (event != null) {
return event;
}
Expand Down Expand Up @@ -253,8 +256,9 @@ public RecordedEvent readEvent() throws IOException {
// Not accepted by filter
} else {
if (typeId == 1) { // checkpoint event
if (flushOperation != null) {
parseCheckpoint();
if (CheckPointType.FLUSH.is(parseCheckpointType())) {
input.position(pos + size);
return FLUSH_MARKER;
}
} else {
if (typeId != 0) { // Not metadata event
Expand All @@ -267,16 +271,11 @@ public RecordedEvent readEvent() throws IOException {
return null;
}

private void parseCheckpoint() throws IOException {
// Content has been parsed previously. This
// is to trigger flush
private byte parseCheckpointType() throws IOException {
input.readLong(); // timestamp
input.readLong(); // duration
input.readLong(); // delta
byte typeFlags = input.readByte();
if (CheckPointType.FLUSH.is(typeFlags)) {
flushOperation.run();
}
return input.readByte();
}

private boolean awaitUpdatedHeader(long absoluteChunkEnd, long filterEnd) throws IOException {
Expand Down Expand Up @@ -451,10 +450,6 @@ public boolean isChunkFinished() {
return chunkFinished;
}

public void setFlushOperation(Runnable flushOperation) {
this.flushOperation = flushOperation;
}

public long getChunkDuration() {
return chunkHeader.getDurationNanos();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@

final class Dispatcher {

public final static RecordedEvent FLUSH_MARKER = JdkJfrConsumer.instance().newRecordedEvent(null, null, 0L, 0L);

final static class EventDispatcher {
private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,14 @@ protected void processRecursionSafe() throws IOException {
long filterEnd = disp.endTime != null ? disp.endNanos : Long.MAX_VALUE;

while (!isClosed()) {
emitMetadataEvent(currentParser);
onMetadata(currentParser);
while (!isClosed() && !currentParser.isChunkFinished()) {
disp = dispatcher();
if (disp != lastDisp) {
ParserConfiguration pc = disp.parserConfiguration;
pc.filterStart = filterStart;
pc.filterEnd = filterEnd;
currentParser.updateConfiguration(pc, true);
currentParser.setFlushOperation(getFlushOperation());
lastDisp = disp;
}
if (disp.parserConfiguration.isOrdered()) {
Expand Down Expand Up @@ -221,9 +220,10 @@ private void processOrdered(Dispatcher c) throws IOException {
}
sortedCache[index++] = e;
}
emitMetadataEvent(currentParser);
onMetadata(currentParser);
// no events found
if (index == 0 && currentParser.isChunkFinished()) {
onFlush();
return;
}
// at least 2 events, sort them
Expand All @@ -233,18 +233,19 @@ private void processOrdered(Dispatcher c) throws IOException {
for (int i = 0; i < index; i++) {
c.dispatch(sortedCache[i]);
}
onFlush();
return;
}

private boolean processUnordered(Dispatcher c) throws IOException {
while (true) {
RecordedEvent e = currentParser.readStreamingEvent();
if (e == null) {
emitMetadataEvent(currentParser);
onFlush();
return true;
} else {
c.dispatch(e);
}
onMetadata(currentParser);
c.dispatch(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessControlContext;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -87,7 +88,7 @@ protected void process() throws IOException {

currentParser = new ChunkParser(input, disp.parserConfiguration);
while (!isClosed()) {
emitMetadataEvent(currentParser);
onMetadata(currentParser);
if (currentParser.getStartNanos() > end) {
close();
return;
Expand All @@ -96,7 +97,6 @@ protected void process() throws IOException {
disp.parserConfiguration.filterStart = start;
disp.parserConfiguration.filterEnd = end;
currentParser.updateConfiguration(disp.parserConfiguration, true);
currentParser.setFlushOperation(getFlushOperation());
if (disp.parserConfiguration.isOrdered()) {
processOrdered(disp);
} else {
Expand All @@ -116,46 +116,42 @@ private void processOrdered(Dispatcher c) throws IOException {
}
RecordedEvent event;
int index = 0;
while (true) {
event = currentParser.readEvent();
if (event == Dispatcher.FLUSH_MARKER) {
emitMetadataEvent(currentParser);
dispatchOrdered(c, index);
index = 0;
continue;
}

if (event == null) {
emitMetadataEvent(currentParser);
dispatchOrdered(c, index);
return;
while (!currentParser.isChunkFinished()) {
while ((event = currentParser.readStreamingEvent()) != null) {
if (index == cacheSorted.length) {
RecordedEvent[] tmp = cacheSorted;
cacheSorted = new RecordedEvent[2 * tmp.length];
System.arraycopy(tmp, 0, cacheSorted, 0, tmp.length);
}
cacheSorted[index++] = event;
}
if (index == cacheSorted.length) {
RecordedEvent[] tmp = cacheSorted;
cacheSorted = new RecordedEvent[2 * tmp.length];
System.arraycopy(tmp, 0, cacheSorted, 0, tmp.length);
}
cacheSorted[index++] = event;
dispatchOrdered(c, index);
index = 0;
}
}

private void dispatchOrdered(Dispatcher c, int index) {
onMetadata(currentParser);
Arrays.sort(cacheSorted, 0, index, EVENT_COMPARATOR);
for (int i = 0; i < index; i++) {
c.dispatch(cacheSorted[i]);
}
onFlush();
}

private void processUnordered(Dispatcher c) throws IOException {
onMetadata(currentParser);
while (!isClosed()) {
RecordedEvent event = currentParser.readEvent();
RecordedEvent event = currentParser.readStreamingEvent();
if (event == null) {
emitMetadataEvent(currentParser);
return;
}
if (event != Dispatcher.FLUSH_MARKER) {
c.dispatch(event);
onFlush();
if (currentParser.isChunkFinished()) {
return;
}
continue;
}
onMetadata(currentParser);
c.dispatch(event);
}
}
}

1 comment on commit 4307fa6

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.