Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] Introduce clustered service specific SnapshotDurationTracker #1575

Merged
merged 3 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions aeron-client/src/main/java/io/aeron/AeronCounters.java
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,18 @@ public final class AeronCounters
*/
public static final int CLUSTER_TOTAL_SNAPSHOT_DURATION_THRESHOLD_EXCEEDED_TYPE_ID = 235;

/**
* The type id of the {@link Counter} used for keeping track of the maximum snapshot duration
* for a given clustered service.
*/
public static final int CLUSTERED_SERVICE_MAX_SNAPSHOT_DURATION_TYPE_ID = 236;

/**
* The type id of the {@link Counter} used for keeping track of the number of times the snapshot
* duration has exceeded the threshold for a given clustered service.
*/
public static final int CLUSTERED_SERVICE_SNAPSHOT_DURATION_THRESHOLD_EXCEEDED_TYPE_ID = 237;

/**
* Private so that this class cannot be instantiated from outside.
*/
private AeronCounters()
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ final class ClusteredServiceAgent extends ClusteredServiceAgentRhsPadding implem
private final Collection<ClientSession> unmodifiableClientSessions = Collections.unmodifiableCollection(sessions);
private final BoundedLogAdapter logAdapter;
private final DutyCycleTracker dutyCycleTracker;
private final SnapshotDurationTracker snapshotDurationTracker;
private final String subscriptionAlias;
private final int standbySnapshotFlags;

Expand All @@ -136,6 +137,7 @@ final class ClusteredServiceAgent extends ClusteredServiceAgentRhsPadding implem
epochClock = ctx.epochClock();
nanoClock = ctx.nanoClock();
dutyCycleTracker = ctx.dutyCycleTracker();
snapshotDurationTracker = ctx.snapshotDurationTracker();
subscriptionAlias = "log-sc-" + ctx.serviceId();

final String channel = ctx.controlChannel();
Expand Down Expand Up @@ -923,7 +925,11 @@ private long onTakeSnapshot(final long logPosition, final long leadershipTermId)
snapshotState(publication, logPosition, leadershipTermId);
checkForClockTick(nanoClock.nanoTime());
archive.checkForErrorResponse();

snapshotDurationTracker.onSnapshotBegin(nanoClock.nanoTime());
service.onTakeSnapshot(publication);
snapshotDurationTracker.onSnapshotEnd(nanoClock.nanoTime());

awaitRecordingComplete(recordingId, publication.position(), counters, counterId, archive);

return recordingId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,16 @@ public static final class Configuration
*/
public static final long CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000);

/**
* Property name for threshold value, which is used for tracking snapshot duration breaches.
*/
public static final String SNAPSHOT_DURATION_THRESHOLD_PROP_NAME = "aeron.cluster.service.snapshot.threshold";

/**
* Default threshold value, which is used for tracking snapshot duration breaches.
*/
public static final long SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000);

/**
* Counter type id for the cluster node role.
*/
Expand Down Expand Up @@ -559,6 +569,17 @@ public static long cycleThresholdNs()
return getDurationInNanos(CYCLE_THRESHOLD_PROP_NAME, CYCLE_THRESHOLD_DEFAULT_NS);
}

/**
 * Get the threshold, in nanoseconds, used for tracking snapshot duration breaches.
 * <p>
 * The value is resolved through {@code getDurationInNanos} using
 * {@link #SNAPSHOT_DURATION_THRESHOLD_PROP_NAME}, with
 * {@link #SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS} supplied as the default.
 *
 * @return threshold value in nanoseconds.
 */
public static long snapshotDurationThresholdNs()
{
    final long thresholdNs = getDurationInNanos(
        SNAPSHOT_DURATION_THRESHOLD_PROP_NAME,
        SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS);

    return thresholdNs;
}

/**
* Get the configuration value to determine if taking standby snapshots should be enabled on this node.
*
Expand Down Expand Up @@ -656,6 +677,7 @@ public static final class Context implements Cloneable
private boolean isRespondingService = Configuration.isRespondingService();
private int logFragmentLimit = Configuration.logFragmentLimit();
private long cycleThresholdNs = Configuration.cycleThresholdNs();
private long snapshotDurationThresholdNs = Configuration.snapshotDurationThresholdNs();
private boolean standbySnapshotEnabled = Configuration.standbySnapshotEnabled();

private CountDownLatch abortLatch;
Expand All @@ -675,6 +697,7 @@ public static final class Context implements Cloneable
private String aeronDirectoryName = CommonContext.getAeronDirectoryName();
private Aeron aeron;
private DutyCycleTracker dutyCycleTracker;
private SnapshotDurationTracker snapshotDurationTracker;
private AppVersionValidator appVersionValidator;
private boolean ownsAeronClient;

Expand Down Expand Up @@ -853,6 +876,29 @@ public void conclude()
cycleThresholdNs);
}

if (null == snapshotDurationTracker)
{
snapshotDurationTracker = new SnapshotDurationTracker(
ClusterCounters.allocateServiceCounter(
aeron,
tempBuffer,
"Clustered service max snapshot duration in ns",
AeronCounters.CLUSTERED_SERVICE_MAX_SNAPSHOT_DURATION_TYPE_ID,
clusterId,
serviceId
),
ClusterCounters.allocateServiceCounter(
aeron,
tempBuffer,
"Clustered service max snapshot duration exceeded count: threshold=" +
snapshotDurationThresholdNs,
AeronCounters.CLUSTERED_SERVICE_SNAPSHOT_DURATION_THRESHOLD_EXCEEDED_TYPE_ID,
clusterId,
serviceId
),
snapshotDurationThresholdNs);
}

if (null == archiveContext)
{
archiveContext = new AeronArchive.Context()
Expand Down Expand Up @@ -1767,6 +1813,52 @@ public DutyCycleTracker dutyCycleTracker()
return dutyCycleTracker;
}

/**
 * Configure the snapshot duration threshold. A snapshot that takes longer than this value results in a
 * counter increment.
 *
 * @param thresholdNs threshold value in nanoseconds.
 * @return this for a fluent API.
 * @see Configuration#SNAPSHOT_DURATION_THRESHOLD_PROP_NAME
 * @see Configuration#SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS
 */
public Context snapshotDurationThresholdNs(final long thresholdNs)
{
    // The parameter name differs from the field, so the unqualified name resolves to the field.
    snapshotDurationThresholdNs = thresholdNs;

    return this;
}

/**
 * The snapshot duration threshold. A snapshot that takes longer than this value results in a counter
 * increment.
 *
 * @return threshold value in nanoseconds.
 */
public long snapshotDurationThresholdNs()
{
    return this.snapshotDurationThresholdNs;
}

/**
* Set the snapshot duration tracker used for monitoring snapshot duration.
* <p>
* If no tracker has been set (the field is {@code null}) when {@code conclude()} runs, a default
* tracker is created there from two newly allocated service counters and the configured
* {@code snapshotDurationThresholdNs}.
*
* @param snapshotDurationTracker snapshot duration tracker.
* @return this for fluent API.
*/
public Context snapshotDurationTracker(final SnapshotDurationTracker snapshotDurationTracker)
{
this.snapshotDurationTracker = snapshotDurationTracker;
return this;
}

/**
 * The snapshot duration tracker used for monitoring snapshot duration.
 *
 * @return the snapshot duration tracker held by this context.
 */
public SnapshotDurationTracker snapshotDurationTracker()
{
    return this.snapshotDurationTracker;
}

/**
* Delete the cluster container directory.
*/
Expand Down
57 changes: 32 additions & 25 deletions aeron-system-tests/src/test/java/io/aeron/cluster/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,7 @@
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.client.ControlledEgressListener;
import io.aeron.cluster.client.EgressListener;
import io.aeron.cluster.codecs.AdminRequestEncoder;
import io.aeron.cluster.codecs.AdminRequestType;
import io.aeron.cluster.codecs.AdminResponseCode;
import io.aeron.cluster.codecs.AdminResponseEncoder;
import io.aeron.cluster.codecs.MessageHeaderDecoder;
import io.aeron.cluster.codecs.MessageHeaderEncoder;
import io.aeron.cluster.codecs.SessionMessageHeaderDecoder;
import io.aeron.cluster.codecs.*;
import io.aeron.cluster.service.ClientSession;
import io.aeron.cluster.service.SnapshotDurationTracker;
import io.aeron.logbuffer.BufferClaim;
Expand All @@ -38,12 +32,7 @@
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.security.AuthorisationService;
import io.aeron.test.EventLogExtension;
import io.aeron.test.InterruptAfter;
import io.aeron.test.InterruptingTestCallback;
import io.aeron.test.SlowTest;
import io.aeron.test.SystemTestWatcher;
import io.aeron.test.Tests;
import io.aeron.test.*;
import io.aeron.test.cluster.ClusterTests;
import io.aeron.test.cluster.TestCluster;
import io.aeron.test.cluster.TestNode;
Expand All @@ -55,7 +44,6 @@
import org.agrona.collections.MutableBoolean;
import org.agrona.collections.MutableInteger;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -1930,36 +1918,49 @@ void shouldTakeASnapshotAfterReceivingAdminRequestOfTypeSnapshot()
@InterruptAfter(20)
void shouldTrackSnapshotDuration()
{
final long service1SleepDuration = TimeUnit.MILLISECONDS.toNanos(101);
final long service2SleepDuration = TimeUnit.MILLISECONDS.toNanos(201);
final long totalSleepDuration = service1SleepDuration + service2SleepDuration;

cluster = aCluster()
.withServiceSupplier(
(i) -> new TestNode.TestService[]
{
{
new TestNode.SleepOnSnapshotTestService()
.sleepNsOnTakeSnapshot(TimeUnit.MILLISECONDS.toNanos(101)).index(i)
})
.sleepNsOnTakeSnapshot(service1SleepDuration).index(i),
new TestNode.SleepOnSnapshotTestService()
.sleepNsOnTakeSnapshot(service2SleepDuration).index(i)
})
.withStaticNodes(3)
.withAuthorisationServiceSupplier(() -> AuthorisationService.ALLOW_ALL)
.start();
systemTestWatcher.cluster(cluster);

final TestNode leader = cluster.awaitLeader();

final SnapshotDurationTracker snapshotDurationTracker = leader.consensusModule().context()
final SnapshotDurationTracker totalSnapshotDurationTracker = leader.consensusModule().context()
.totalSnapshotDurationTracker();

final AtomicCounter snapshotDurationThresholdExceededCount = snapshotDurationTracker
.snapshotDurationThresholdExceededCount();
final SnapshotDurationTracker service1SnapshotDurationTracker = leader.container(0).context()
.snapshotDurationTracker();

final AtomicCounter maxSnapshotDuration = snapshotDurationTracker.maxSnapshotDuration();
final SnapshotDurationTracker service2SnapshotDurationTracker = leader.container(1).context()
.snapshotDurationTracker();

final long requestCorrelationId = System.nanoTime();
final MutableBoolean hasResponse = injectAdminResponseEgressListener(
requestCorrelationId, AdminRequestType.SNAPSHOT, AdminResponseCode.OK, EMPTY_MSG);

final AeronCluster client = cluster.connectClient();

assertEquals(0L, snapshotDurationThresholdExceededCount.get());
assertEquals(0, maxSnapshotDuration.get());
assertEquals(0L, totalSnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertEquals(0, totalSnapshotDurationTracker.maxSnapshotDuration().get());

assertEquals(0L, service1SnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertEquals(0, service1SnapshotDurationTracker.maxSnapshotDuration().get());

assertEquals(0L, service2SnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertEquals(0, service2SnapshotDurationTracker.maxSnapshotDuration().get());

while (!client.sendAdminRequestToTakeASnapshot(requestCorrelationId))
{
Expand All @@ -1975,8 +1976,14 @@ void shouldTrackSnapshotDuration()
cluster.awaitSnapshotCount(1);
cluster.awaitNeutralControlToggle(leader);

assertEquals(1L, snapshotDurationThresholdExceededCount.get());
assertTrue(maxSnapshotDuration.get() > 0);
assertEquals(1L, totalSnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertTrue(totalSnapshotDurationTracker.maxSnapshotDuration().get() >= totalSleepDuration);

assertEquals(1L, service1SnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertTrue(service1SnapshotDurationTracker.maxSnapshotDuration().get() >= service1SleepDuration);

assertEquals(1L, service2SnapshotDurationTracker.snapshotDurationThresholdExceededCount().get());
assertTrue(service2SnapshotDurationTracker.maxSnapshotDuration().get() >= service2SleepDuration);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public final class TestNode implements AutoCloseable
.clusterDir(context.consensusModuleContext.clusterDir())
.markFileDir(context.consensusModuleContext.markFileDir())
.clusteredService(services[i])
.snapshotDurationThresholdNs(TimeUnit.MILLISECONDS.toNanos(100))
.serviceId(i);
containers[i] = ClusteredServiceContainer.launch(ctx);
}
Expand Down Expand Up @@ -171,7 +172,12 @@ public ClusteredServiceContainer container()
throw new IllegalStateException("container count expected=1 actual=" + containers.length);
}

return containers[0];
return container(0);
}

/**
 * Get the clustered service container stored at the given index of this node's containers array.
 *
 * @param index position of the container in the containers array.
 * @return the container at {@code index}.
 */
public ClusteredServiceContainer container(final int index)
{
    final ClusteredServiceContainer selected = containers[index];

    return selected;
}

public TestService service()
Expand Down