Skip to content

Commit

Permalink
8297338: JFR: RemoteRecordingStream doesn't respect setMaxAge and set…
Browse files Browse the repository at this point in the history
…MaxSize

Reviewed-by: mgronlun
  • Loading branch information
egahlin committed Nov 23, 2022
1 parent 8df3bc4 commit 2afb4c3
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -429,7 +429,7 @@ private void trimToAge(Instant oldest) {
}
int count = 0;
while (chunks.size() > 1) {
DiskChunk oldestChunk = chunks.getLast();
DiskChunk oldestChunk = chunks.peekLast();
if (oldestChunk.endTime.isAfter(oldest)) {
return;
}
Expand All @@ -440,14 +440,14 @@ private void trimToAge(Instant oldest) {
}

private void removeOldestChunk() {
DiskChunk chunk = chunks.poll();
DiskChunk chunk = chunks.pollLast();
chunk.release();
size -= chunk.size;
}

public synchronized void onChunkComplete(long endTimeNanos) {
while (!chunks.isEmpty()) {
DiskChunk oldestChunk = chunks.peek();
DiskChunk oldestChunk = chunks.peekLast();
if (oldestChunk.startTimeNanos < endTimeNanos) {
removeOldestChunk();
} else {
Expand All @@ -460,7 +460,7 @@ private void addChunk(DiskChunk chunk) {
if (maxAge != null) {
trimToAge(chunk.endTime.minus(maxAge));
}
chunks.push(chunk);
chunks.addFirst(chunk);
size += chunk.size;
trimToSize();

Expand Down Expand Up @@ -500,9 +500,13 @@ public synchronized void complete() {

public synchronized FileDump newDump(long endTime) {
FileDump fd = new FileDump(endTime);
for (DiskChunk dc : chunks) {
// replay history by iterating from oldest to most recent
Iterator<DiskChunk> it = chunks.descendingIterator();
while (it.hasNext()) {
DiskChunk dc = it.next();
fd.add(dc);
}

if (!fd.isComplete()) {
fileDumps.add(fd);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -29,12 +29,12 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Deque;

import jdk.management.jfr.DiskRepository.DiskChunk;

final class FileDump {
private final Queue<DiskChunk> chunks = new ArrayDeque<>();
private final Deque<DiskChunk> chunks = new ArrayDeque<>();
private final long stopTimeMillis;
private boolean complete;

Expand All @@ -47,7 +47,7 @@ public synchronized void add(DiskChunk dc) {
return;
}
dc.acquire();
chunks.add(dc);
chunks.addFirst(dc);
long endMillis = dc.endTimeNanos / 1_000_000;
if (endMillis >= stopTimeMillis) {
setComplete();
Expand Down Expand Up @@ -75,7 +75,7 @@ private DiskChunk oldestChunk() throws InterruptedException {
while (true) {
synchronized (this) {
if (!chunks.isEmpty()) {
return chunks.poll();
return chunks.pollLast();
}
if (complete) {
return null;
Expand All @@ -86,14 +86,19 @@ private DiskChunk oldestChunk() throws InterruptedException {
}

public void write(Path path) throws IOException, InterruptedException {
DiskChunk chunk = null;
try (FileChannel out = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
DiskChunk chunk = null;
while ((chunk = oldestChunk()) != null) {
try (FileChannel in = FileChannel.open(chunk.path(), StandardOpenOption.READ)) {
in.transferTo(0, in.size(), out);
}
chunk.release();
chunk = null;
}
} finally {
if (chunk != null) {
chunk.release();
}
close();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -156,7 +156,10 @@ public void accept(Long endNanos) {
volatile Instant startTime;
volatile Instant endTime;
volatile boolean closed;
private boolean started; // always guarded by lock
// always guarded by lock
private boolean started;
private Duration maxAge;
private long maxSize;

/**
* Creates an event stream that operates against a {@link MBeanServerConnection}
Expand Down Expand Up @@ -415,7 +418,11 @@ public EventSettings enable(String name) {
*/
public void setMaxAge(Duration maxAge) {
Objects.requireNonNull(maxAge);
repository.setMaxAge(maxAge);
synchronized (lock) {
repository.setMaxAge(maxAge);
this.maxAge = maxAge;
updateOnCompleteHandler();
}
}

/**
Expand All @@ -441,7 +448,11 @@ public void setMaxSize(long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException("Max size of recording can't be negative");
}
repository.setMaxSize(maxSize);
synchronized (lock) {
repository.setMaxSize(maxSize);
this.maxSize = maxSize;
updateOnCompleteHandler();
}
}

@Override
Expand Down Expand Up @@ -645,6 +656,15 @@ private static Path makeTempDirectory() throws IOException {
return Files.createTempDirectory("jfr-streaming");
}

private void updateOnCompleteHandler() {
if (maxAge != null || maxSize != 0) {
// User has set a chunk removal policy
ManagementSupport.setOnChunkCompleteHandler(stream, null);
} else {
ManagementSupport.setOnChunkCompleteHandler(stream, new ChunkConsumer(repository));
}
}

private void startDownload() {
String name = "JFR: Download Thread " + creationTime;
Thread downLoadThread = new DownLoadThread(this, name);
Expand Down
130 changes: 130 additions & 0 deletions test/jdk/jdk/jfr/jmx/streaming/TestDumpOrder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

package jdk.jfr.jmx.streaming;

import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.MBeanServerConnection;

import jdk.jfr.Event;
import jdk.jfr.StackTrace;
import jdk.jfr.Recording;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingFile;
import jdk.management.jfr.RemoteRecordingStream;

/**
* @test
* @key jfr
* @summary Tests that chunks arrive in the same order they were committed
* @requires vm.hasJFR
* @library /test/lib /test/jdk
* @run main/othervm jdk.jfr.jmx.streaming.TestDumpOrder
*/
public class TestDumpOrder {

private static final MBeanServerConnection CONNECTION = ManagementFactory.getPlatformMBeanServer();

@StackTrace(false)
static class Ant extends Event {
long id;
}

public static void main(String... args) throws Exception {
// Set up the test so half of the events have been consumed
// when the dump occurs.
AtomicLong eventCount = new AtomicLong();
CountDownLatch halfLatch = new CountDownLatch(1);
CountDownLatch dumpLatch = new CountDownLatch(1);
Path directory = Path.of("chunks");
Files.createDirectory(directory);
try (var rs = new RemoteRecordingStream(CONNECTION, directory)) {
rs.setMaxSize(100_000_000); // keep all data
rs.onEvent(event -> {
try {
eventCount.incrementAndGet();
if (eventCount.get() == 10) {
halfLatch.countDown();
dumpLatch.await();
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
});
rs.startAsync();
long counter = 0;
for (int i = 0; i < 10; i++) {
try (Recording r = new Recording()) {
r.start();
Ant a = new Ant();
a.id = counter++;
a.commit();
Ant b = new Ant();
b.id = counter++;
b.commit();
}
if (counter == 10) {
halfLatch.await();
}
}
Path file = Path.of("events.jfr");
// Wait for most (but not all) chunk files to be downloaded
// before invoking dump()
awaitChunkFiles(directory);
// To stress the implementation, release consumer thread
// during the dump
dumpLatch.countDown();
rs.dump(file);
List<RecordedEvent> events = RecordingFile.readAllEvents(file);
if (events.isEmpty()) {
throw new AssertionError("No events found");
}
// Print events for debugging purposes
events.forEach(System.out::println);
long expected = 0;
for (var event : events) {
long value = event.getLong("id");
if (value != expected) {
throw new Exception("Expected " + expected + ", got " + value);
}
expected++;
}
if (expected != counter) {
throw new Exception("Not all events found");
}
}
}

private static void awaitChunkFiles(Path directory) throws Exception {
while (Files.list(directory).count() < 7) {
Thread.sleep(10);
}
}
}

1 comment on commit 2afb4c3

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.