Skip to content

Commit

Permalink
[Java] Introduce clustered service specific SnapshotDurationTracker (#…
Browse files Browse the repository at this point in the history
…1575)

* [Java] Introduce clustered service specific SnapshotDurationTracker

* [Java] Fix assertion in ClusterTest::shouldTrackSnapshotDuration

* [Java] Fix assertion in ClusterTest::shouldTrackSnapshotDuration
  • Loading branch information
eliquinox committed Mar 13, 2024
1 parent 2bff60f commit d66565a
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 26 deletions.
12 changes: 12 additions & 0 deletions aeron-client/src/main/java/io/aeron/AeronCounters.java
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,18 @@ public final class AeronCounters
*/
public static final int CLUSTER_TOTAL_SNAPSHOT_DURATION_THRESHOLD_EXCEEDED_TYPE_ID = 235;

/**
* The type id of the {@link Counter} used for keeping track of the maximum snapshot duration
* for a given clustered service.
*/
public static final int CLUSTERED_SERVICE_MAX_SNAPSHOT_DURATION_TYPE_ID = 236;

/**
* The type id of the {@link Counter} used for keeping track of the count snapshot duration
* has exceeded the threshold for a given clustered service.
*/
public static final int CLUSTERED_SERVICE_SNAPSHOT_DURATION_THRESHOLD_EXCEEDED_TYPE_ID = 237;

private AeronCounters()
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ final class ClusteredServiceAgent extends ClusteredServiceAgentRhsPadding implem
private final Collection<ClientSession> unmodifiableClientSessions = Collections.unmodifiableCollection(sessions);
private final BoundedLogAdapter logAdapter;
private final DutyCycleTracker dutyCycleTracker;
private final SnapshotDurationTracker snapshotDurationTracker;
private final String subscriptionAlias;
private final int standbySnapshotFlags;

Expand All @@ -160,6 +161,7 @@ final class ClusteredServiceAgent extends ClusteredServiceAgentRhsPadding implem
epochClock = ctx.epochClock();
nanoClock = ctx.nanoClock();
dutyCycleTracker = ctx.dutyCycleTracker();
snapshotDurationTracker = ctx.snapshotDurationTracker();
subscriptionAlias = "log-sc-" + ctx.serviceId();

final String channel = ctx.controlChannel();
Expand Down Expand Up @@ -951,7 +953,11 @@ private long onTakeSnapshot(final long logPosition, final long leadershipTermId)
snapshotState(publication, logPosition, leadershipTermId);
checkForClockTick(nanoClock.nanoTime());
archive.checkForErrorResponse();

snapshotDurationTracker.onSnapshotBegin(nanoClock.nanoTime());
service.onTakeSnapshot(publication);
snapshotDurationTracker.onSnapshotEnd(nanoClock.nanoTime());

awaitRecordingComplete(recordingId, publication.position(), counters, counterId, archive);

return recordingId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,16 @@ public static final class Configuration
*/
public static final long CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000);

/**
* Property name for threshold value, which is used for tracking snapshot duration breaches.
*/
public static final String SNAPSHOT_DURATION_THRESHOLD_PROP_NAME = "aeron.cluster.service.snapshot.threshold";

/**
* Default threshold value, which is used for tracking snapshot duration breaches.
*/
public static final long SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000);

/**
* Counter type id for the cluster node role.
*/
Expand Down Expand Up @@ -559,6 +569,17 @@ public static long cycleThresholdNs()
return getDurationInNanos(CYCLE_THRESHOLD_PROP_NAME, CYCLE_THRESHOLD_DEFAULT_NS);
}

/**
* Get threshold value, which is used for monitoring snapshot duration breaches of its predefined
* threshold.
*
* @return threshold value in nanoseconds.
*/
public static long snapshotDurationThresholdNs()
{
return getDurationInNanos(SNAPSHOT_DURATION_THRESHOLD_PROP_NAME, SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS);
}

/**
* Get the configuration value to determine if this node should take standby snapshots be enabled.
*
Expand Down Expand Up @@ -656,6 +677,7 @@ public static final class Context implements Cloneable
private boolean isRespondingService = Configuration.isRespondingService();
private int logFragmentLimit = Configuration.logFragmentLimit();
private long cycleThresholdNs = Configuration.cycleThresholdNs();
private long snapshotDurationThresholdNs = Configuration.snapshotDurationThresholdNs();
private boolean standbySnapshotEnabled = Configuration.standbySnapshotEnabled();

private CountDownLatch abortLatch;
Expand All @@ -675,6 +697,7 @@ public static final class Context implements Cloneable
private String aeronDirectoryName = CommonContext.getAeronDirectoryName();
private Aeron aeron;
private DutyCycleTracker dutyCycleTracker;
private SnapshotDurationTracker snapshotDurationTracker;
private AppVersionValidator appVersionValidator;
private boolean ownsAeronClient;

Expand Down Expand Up @@ -853,6 +876,29 @@ public void conclude()
cycleThresholdNs);
}

if (null == snapshotDurationTracker)
{
snapshotDurationTracker = new SnapshotDurationTracker(
ClusterCounters.allocateServiceCounter(
aeron,
tempBuffer,
"Clustered service max snapshot duration in ns",
AeronCounters.CLUSTERED_SERVICE_MAX_SNAPSHOT_DURATION_TYPE_ID,
clusterId,
serviceId
),
ClusterCounters.allocateServiceCounter(
aeron,
tempBuffer,
"Clustered service max snapshot duration exceeded count: threshold=" +
snapshotDurationThresholdNs,
AeronCounters.CLUSTERED_SERVICE_SNAPSHOT_DURATION_THRESHOLD_EXCEEDED_TYPE_ID,
clusterId,
serviceId
),
snapshotDurationThresholdNs);
}

if (null == archiveContext)
{
archiveContext = new AeronArchive.Context()
Expand Down Expand Up @@ -1767,6 +1813,52 @@ public DutyCycleTracker dutyCycleTracker()
return dutyCycleTracker;
}

/**
* Set a threshold for snapshot duration which when exceeded will result in a counter increment.
*
* @param thresholdNs value in nanoseconds
* @return this for fluent API.
* @see Configuration#SNAPSHOT_DURATION_THRESHOLD_PROP_NAME
* @see Configuration#SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS
*/
public Context snapshotDurationThresholdNs(final long thresholdNs)
{
this.snapshotDurationThresholdNs = thresholdNs;
return this;
}

/**
* Threshold for snapshot duration which when exceeded will result in a counter increment.
*
* @return threshold value in nanoseconds.
*/
public long snapshotDurationThresholdNs()
{
return snapshotDurationThresholdNs;
}

/**
* Set snapshot duration tracker used for monitoring snapshot duration.
*
* @param snapshotDurationTracker snapshot duration tracker.
* @return this for fluent API.
*/
public Context snapshotDurationTracker(final SnapshotDurationTracker snapshotDurationTracker)
{
this.snapshotDurationTracker = snapshotDurationTracker;
return this;
}

/**
* Get snapshot duration tracker used for monitoring snapshot duration.
*
* @return snapshot duration tracker
*/
public SnapshotDurationTracker snapshotDurationTracker()
{
return snapshotDurationTracker;
}

/**
* Delete the cluster container directory.
*/
Expand Down
57 changes: 32 additions & 25 deletions aeron-system-tests/src/test/java/io/aeron/cluster/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,15 @@
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.client.ControlledEgressListener;
import io.aeron.cluster.client.EgressListener;
import io.aeron.cluster.codecs.AdminRequestEncoder;
import io.aeron.cluster.codecs.AdminRequestType;
import io.aeron.cluster.codecs.AdminResponseCode;
import io.aeron.cluster.codecs.AdminResponseEncoder;
import io.aeron.cluster.codecs.MessageHeaderDecoder;
import io.aeron.cluster.codecs.MessageHeaderEncoder;
import io.aeron.cluster.codecs.SessionMessageHeaderDecoder;
import io.aeron.cluster.codecs.*;
import io.aeron.cluster.service.ClientSession;
import io.aeron.cluster.service.SnapshotDurationTracker;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.security.AuthorisationService;
import io.aeron.test.EventLogExtension;
import io.aeron.test.InterruptAfter;
import io.aeron.test.InterruptingTestCallback;
import io.aeron.test.SlowTest;
import io.aeron.test.SystemTestWatcher;
import io.aeron.test.Tests;
import io.aeron.test.*;
import io.aeron.test.cluster.ClusterTests;
import io.aeron.test.cluster.TestCluster;
import io.aeron.test.cluster.TestNode;
Expand All @@ -54,7 +43,6 @@
import org.agrona.collections.MutableBoolean;
import org.agrona.collections.MutableInteger;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -1929,36 +1917,49 @@ void shouldTakeASnapshotAfterReceivingAdminRequestOfTypeSnapshot()
@InterruptAfter(20)
void shouldTrackSnapshotDuration()
{
final long service1SleepDuration = TimeUnit.MILLISECONDS.toNanos(101);
final long service2SleepDuration = TimeUnit.MILLISECONDS.toNanos(201);
final long totalSleepDuration = service1SleepDuration + service2SleepDuration;

cluster = aCluster()
.withServiceSupplier(
(i) -> new TestNode.TestService[]
{
{
new TestNode.SleepOnSnapshotTestService()
.sleepNsOnTakeSnapshot(TimeUnit.MILLISECONDS.toNanos(101)).index(i)
})
.sleepNsOnTakeSnapshot(service1SleepDuration).index(i),
new TestNode.SleepOnSnapshotTestService()
.sleepNsOnTakeSnapshot(service2SleepDuration).index(i)
})
.withStaticNodes(3)
.withAuthorisationServiceSupplier(() -> AuthorisationService.ALLOW_ALL)
.start();
systemTestWatcher.cluster(cluster);

final TestNode leader = cluster.awaitLeader();

final SnapshotDurationTracker snapshotDurationTracker = leader.consensusModule().context()
final SnapshotDurationTracker totalSnapshotDurationTracker = leader.consensusModule().context()
.totalSnapshotDurationTracker();

final AtomicCounter snapshotDurationThresholdExceededCount = snapshotDurationTracker
.snapshotDurationThresholdExceededCount();
final SnapshotDurationTracker service1SnapshotDurationTracker = leader.container(0).context()
.snapshotDurationTracker();

final AtomicCounter maxSnapshotDuration = snapshotDurationTracker.maxSnapshotDuration();
final SnapshotDurationTracker service2SnapshotDurationTracker = leader.container(1).context()
.snapshotDurationTracker();

final long requestCorrelationId = System.nanoTime();
final MutableBoolean hasResponse = injectAdminResponseEgressListener(
requestCorrelationId, AdminRequestType.SNAPSHOT, AdminResponseCode.OK, EMPTY_MSG);

final AeronCluster client = cluster.connectClient();

assertEquals(0L, snapshotDurationThresholdExceededCount.get());
assertEquals(0, maxSnapshotDuration.get());
assertEquals(0L, totalSnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertEquals(0, totalSnapshotDurationTracker.maxSnapshotDuration().get());

assertEquals(0L, service1SnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertEquals(0, service1SnapshotDurationTracker.maxSnapshotDuration().get());

assertEquals(0L, service2SnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertEquals(0, service2SnapshotDurationTracker.maxSnapshotDuration().get());

while (!client.sendAdminRequestToTakeASnapshot(requestCorrelationId))
{
Expand All @@ -1974,8 +1975,14 @@ void shouldTrackSnapshotDuration()
cluster.awaitSnapshotCount(1);
cluster.awaitNeutralControlToggle(leader);

assertEquals(1L, snapshotDurationThresholdExceededCount.get());
assertTrue(maxSnapshotDuration.get() > 0);
assertEquals(1L, totalSnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertTrue(totalSnapshotDurationTracker.maxSnapshotDuration().get() >= totalSleepDuration);

assertEquals(1L, service1SnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertTrue(service1SnapshotDurationTracker.maxSnapshotDuration().get() >= service1SleepDuration);

assertEquals(1L, service2SnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertTrue(service2SnapshotDurationTracker.maxSnapshotDuration().get() >= service2SleepDuration);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public final class TestNode implements AutoCloseable
.clusterDir(context.consensusModuleContext.clusterDir())
.markFileDir(context.consensusModuleContext.markFileDir())
.clusteredService(services[i])
.snapshotDurationThresholdNs(TimeUnit.MILLISECONDS.toNanos(100))
.serviceId(i);
containers[i] = ClusteredServiceContainer.launch(ctx);
}
Expand Down Expand Up @@ -171,7 +172,12 @@ public ClusteredServiceContainer container()
throw new IllegalStateException("container count expected=1 actual=" + containers.length);
}

return containers[0];
return container(0);
}

public ClusteredServiceContainer container(final int index)
{
return containers[index];
}

public TestService service()
Expand Down

0 comments on commit d66565a

Please sign in to comment.