Skip to content

Commit

Permalink
Refactor CheckPointThreshold code
Browse files Browse the repository at this point in the history
This centralises and simplifies the logic around check point threshold
configuration, such that it is easier to extend and test in the future.
  • Loading branch information
chrisvest committed Nov 29, 2017
1 parent 71ed837 commit c258c32
Show file tree
Hide file tree
Showing 10 changed files with 329 additions and 375 deletions.
Expand Up @@ -118,12 +118,9 @@
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointScheduler;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointThreshold;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointThresholds;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointerImpl;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CountCommittedTransactionThreshold;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.impl.transaction.log.checkpoint.TimeCheckPointThreshold;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
Expand Down Expand Up @@ -645,20 +642,13 @@ private NeoStoreTransactionLogModule buildTransactionLogs(
final LogicalTransactionStore logicalTransactionStore =
new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader );

int txThreshold = config.get( GraphDatabaseSettings.check_point_interval_tx );
final CountCommittedTransactionThreshold countCommittedTransactionThreshold =
new CountCommittedTransactionThreshold( txThreshold );

long timeMillisThreshold = config.get( GraphDatabaseSettings.check_point_interval_time ).toMillis();
TimeCheckPointThreshold timeCheckPointThreshold = new TimeCheckPointThreshold( timeMillisThreshold, clock );

CheckPointThreshold threshold =
CheckPointThresholds.or( countCommittedTransactionThreshold, timeCheckPointThreshold );
CheckPointThreshold threshold = CheckPointThreshold.createThreshold( config, clock );

final CheckPointerImpl checkPointer = new CheckPointerImpl(
transactionIdStore, threshold, storageEngine, logPruning, appender, databaseHealth, logProvider,
tracers.checkPointTracer, ioLimiter, storeCopyCheckPointMutex );

long timeMillisThreshold = config.get( GraphDatabaseSettings.check_point_interval_time ).toMillis();
long recurringPeriod = Math.min( timeMillisThreshold, TimeUnit.SECONDS.toMillis( 10 ) );
CheckPointScheduler checkPointScheduler = new CheckPointScheduler( checkPointer, ioLimiter, scheduler,
recurringPeriod, databaseHealth );
Expand Down
Expand Up @@ -22,29 +22,28 @@
import java.util.function.Consumer;

/**
* Abstract class that implement common logic for making the consumer to consume the {@link #description()} of this
* Abstract class that implement common logic for making the consumer to consume the description of this
* threshold if {@link #thresholdReached(long)} is true.
*/
abstract class AbstractCheckPointThreshold implements CheckPointThreshold
{
private final String description;

public AbstractCheckPointThreshold( String description )
{
this.description = description;
}

@Override
public final boolean isCheckPointingNeeded( long lastCommittedTransactionId, Consumer<String> consumer )
{
boolean result = thresholdReached( lastCommittedTransactionId );
try
if ( thresholdReached( lastCommittedTransactionId ) )
{
return result;
}
finally
{
if ( result )
{
consumer.accept( description() );
}
consumer.accept( description );
return true;
}
return false;
}

protected abstract boolean thresholdReached( long lastCommittedTransactionId );

protected abstract String description();
}
Expand Up @@ -19,8 +19,13 @@
*/
package org.neo4j.kernel.impl.transaction.log.checkpoint;

import java.time.Clock;
import java.util.function.Consumer;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;


/**
* A check point threshold provides information if a check point is required or not.
*/
Expand Down Expand Up @@ -51,4 +56,55 @@ public interface CheckPointThreshold
* @param transactionId the latest transaction committed id used by the check point
*/
void checkPointHappened( long transactionId );

static CheckPointThreshold createThreshold( Config config, Clock clock )
{

int txThreshold = config.get( GraphDatabaseSettings.check_point_interval_tx );
final CountCommittedTransactionThreshold countCommittedTransactionThreshold =
new CountCommittedTransactionThreshold( txThreshold );

long timeMillisThreshold = config.get( GraphDatabaseSettings.check_point_interval_time ).toMillis();
TimeCheckPointThreshold timeCheckPointThreshold = new TimeCheckPointThreshold( timeMillisThreshold, clock );

return or( countCommittedTransactionThreshold, timeCheckPointThreshold );
}

static CheckPointThreshold or( final CheckPointThreshold... thresholds )
{
return new CheckPointThreshold()
{
@Override
public void initialize( long transactionId )
{
for ( CheckPointThreshold threshold : thresholds )
{
threshold.initialize( transactionId );
}
}

@Override
public boolean isCheckPointingNeeded( long transactionId, Consumer<String> consumer )
{
for ( CheckPointThreshold threshold : thresholds )
{
if ( threshold.isCheckPointingNeeded( transactionId, consumer ) )
{
return true;
}
}

return false;
}

@Override
public void checkPointHappened( long transactionId )
{
for ( CheckPointThreshold threshold : thresholds )
{
threshold.checkPointHappened( transactionId );
}
}
};
}
}

This file was deleted.

Expand Up @@ -19,14 +19,15 @@
*/
package org.neo4j.kernel.impl.transaction.log.checkpoint;

public class CountCommittedTransactionThreshold extends AbstractCheckPointThreshold
class CountCommittedTransactionThreshold extends AbstractCheckPointThreshold
{
private final int notificationThreshold;

private volatile long nextTransactionIdTarget;

public CountCommittedTransactionThreshold( int notificationThreshold )
{
super( "tx count threshold" );
this.notificationThreshold = notificationThreshold;
}

Expand All @@ -42,12 +43,6 @@ protected boolean thresholdReached( long lastCommittedTransactionId )
return lastCommittedTransactionId >= nextTransactionIdTarget;
}

@Override
protected String description()
{
return "tx count threshold";
}

@Override
public void checkPointHappened( long transactionId )
{
Expand Down
Expand Up @@ -22,7 +22,7 @@
import java.time.Clock;
import java.util.concurrent.ThreadLocalRandom;

public class TimeCheckPointThreshold extends AbstractCheckPointThreshold
class TimeCheckPointThreshold extends AbstractCheckPointThreshold
{
private volatile long lastCheckPointedTransactionId;
private volatile long nextCheckPointTime;
Expand All @@ -32,6 +32,7 @@ public class TimeCheckPointThreshold extends AbstractCheckPointThreshold

public TimeCheckPointThreshold( long thresholdMillis, Clock clock )
{
super( "time threshold" );
this.timeMillisThreshold = thresholdMillis;
this.clock = clock;
// The random start offset means database in a cluster will not all check-point at the same time.
Expand All @@ -53,12 +54,6 @@ protected boolean thresholdReached( long lastCommittedTransactionId )
clock.millis() >= nextCheckPointTime;
}

@Override
protected String description()
{
return "time threshold";
}

@Override
public void checkPointHappened( long transactionId )
{
Expand Down
Expand Up @@ -61,12 +61,11 @@ public void shouldNotCallConsumerProvidingTheDescriptionWhenThresholdIsFalse() t
private static class TheAbstractCheckPointThreshold extends AbstractCheckPointThreshold
{
private final boolean reached;
private final String description;

TheAbstractCheckPointThreshold( boolean reached, String description )
{
super( description );
this.reached = reached;
this.description = description;
}

@Override
Expand All @@ -86,11 +85,5 @@ protected boolean thresholdReached( long lastCommittedTransactionId )
{
return reached;
}

@Override
protected String description()
{
return description;
}
}
}

0 comments on commit c258c32

Please sign in to comment.