Skip to content

Commit

Permalink
Add the concept of check-point threshold policies.
Browse files Browse the repository at this point in the history
These are service loaded based on the 'dbms.checkpoint' setting, and
used as factories of the concrete CheckPointThreshold instances.

This allows us to easily add more varied kinds of check point thresholds
in the future.
  • Loading branch information
chrisvest committed Nov 29, 2017
1 parent 19d9e64 commit d423c75
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 30 deletions.
Expand Up @@ -341,6 +341,12 @@ public class GraphDatabaseSettings implements LoadableConfig
@Description( "Maximum number of history files for the debug log." ) @Description( "Maximum number of history files for the debug log." )
public static final Setting<Integer> store_internal_log_max_archives = setting("dbms.logs.debug.rotation.keep_number", INTEGER, "7", min(1) ); public static final Setting<Integer> store_internal_log_max_archives = setting("dbms.logs.debug.rotation.keep_number", INTEGER, "7", min(1) );


@Description( "Configures the general policy for when check-points should occur. The default policy is to " +
"check-point 'periodically', as specified by the 'dbms.checkpoint.interval.tx' and " +
"'dbms.checkpoint.interval.time' settings. This policy is alternatively known as 'periodic' and " +
"'interval'." )
public static final Setting<String> check_point_policy = setting( "dbms.checkpoint", STRING, "periodically" );

@Description( "Configures the transaction interval between check-points. The database will not check-point more " + @Description( "Configures the transaction interval between check-points. The database will not check-point more " +
"often than this (unless check pointing is triggered by a different event), but might check-point " + "often than this (unless check pointing is triggered by a different event), but might check-point " +
"less often than this interval, if performing a check-point takes longer time than the configured " + "less often than this interval, if performing a check-point takes longer time than the configured " +
Expand Down
Expand Up @@ -642,7 +642,7 @@ private NeoStoreTransactionLogModule buildTransactionLogs(
final LogicalTransactionStore logicalTransactionStore = final LogicalTransactionStore logicalTransactionStore =
new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader ); new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader );


CheckPointThreshold threshold = CheckPointThreshold.createThreshold( config, clock ); CheckPointThreshold threshold = CheckPointThreshold.createThreshold( config, clock, logProvider );


final CheckPointerImpl checkPointer = new CheckPointerImpl( final CheckPointerImpl checkPointer = new CheckPointerImpl(
transactionIdStore, threshold, storageEngine, logPruning, appender, databaseHealth, logProvider, transactionIdStore, threshold, storageEngine, logPruning, appender, databaseHealth, logProvider,
Expand Down
Expand Up @@ -20,12 +20,15 @@
package org.neo4j.kernel.impl.transaction.log.checkpoint; package org.neo4j.kernel.impl.transaction.log.checkpoint;


import java.time.Clock; import java.time.Clock;
import java.util.NoSuchElementException;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.LongStream; import java.util.stream.LongStream;
import java.util.stream.Stream; import java.util.stream.Stream;


import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;

import static org.neo4j.graphdb.factory.GraphDatabaseSettings.check_point_policy;




/** /**
Expand Down Expand Up @@ -72,19 +75,30 @@ public interface CheckPointThreshold
*/ */
LongStream checkFrequencyMillis(); LongStream checkFrequencyMillis();


static CheckPointThreshold createThreshold( Config config, Clock clock ) /**
* Create and configure a {@link CheckPointThreshold} based on the given configurations.
*/
static CheckPointThreshold createThreshold( Config config, Clock clock, LogProvider logProvider )
{ {

String policyName = config.get( check_point_policy );
int txThreshold = config.get( GraphDatabaseSettings.check_point_interval_tx ); CheckPointThresholdPolicy policy;
final CountCommittedTransactionThreshold countCommittedTransactionThreshold = try
new CountCommittedTransactionThreshold( txThreshold ); {

policy = CheckPointThresholdPolicy.loadPolicy( policyName );
long timeMillisThreshold = config.get( GraphDatabaseSettings.check_point_interval_time ).toMillis(); }
TimeCheckPointThreshold timeCheckPointThreshold = new TimeCheckPointThreshold( timeMillisThreshold, clock ); catch ( NoSuchElementException e )

{
return or( countCommittedTransactionThreshold, timeCheckPointThreshold ); logProvider.getLog( CheckPointThreshold.class ).warn(
"Could not load check point policy '" + check_point_policy.name() + "=" + policyName + "'. " +
"Using default policy instead.", e );
policy = new PeriodicThresholdPolicy();
}
return policy.createThreshold( config, clock, logProvider );
} }


/**
* Create a new {@link CheckPointThreshold} which will trigger if any of the given thresholds triggers.
*/
static CheckPointThreshold or( final CheckPointThreshold... thresholds ) static CheckPointThreshold or( final CheckPointThreshold... thresholds )
{ {
return new CheckPointThreshold() return new CheckPointThreshold()
Expand Down
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.checkpoint;

import java.time.Clock;
import java.util.NoSuchElementException;

import org.neo4j.helpers.Service;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;

/**
* The {@link CheckPointThresholdPolicy} specifies the overall <em>type</em> of threshold that should be used for
* deciding when to check point.
*
* The is determined by the {@link org.neo4j.graphdb.factory.GraphDatabaseSettings#check_point_policy} setting, and
* based on this, the concrete policies are loaded and used to
* {@link CheckPointThreshold#createThreshold(Config, Clock, org.neo4j.logging.LogProvider) create} the final and fully configured check point
* thresholds.
*/
public abstract class CheckPointThresholdPolicy extends Service
{
/**
* Create a new instance of a service implementation identified with the
* specified key(s).
*
* @param key the main key for identifying this service implementation
* @param altKeys alternative spellings of the identifier of this service
*/
protected CheckPointThresholdPolicy( String key, String... altKeys )
{
super( key, altKeys );
}

/**
* Load the {@link CheckPointThresholdPolicy} by the given name.
*
* @throws NoSuchElementException if the policy was not found.
*/
public static CheckPointThresholdPolicy loadPolicy( String policyName ) throws NoSuchElementException
{
return Service.load( CheckPointThresholdPolicy.class, policyName );
}

/**
* Create a {@link CheckPointThreshold} instance based on this policy and the given configurations.
*/
public abstract CheckPointThreshold createThreshold( Config config, Clock clock,
LogProvider logProvider );
}
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.checkpoint;

import java.time.Clock;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Service;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;

import static org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointThreshold.or;

/**
* The {@code periodic} check point threshold policy uses the {@link GraphDatabaseSettings#check_point_interval_time}
* and {@link GraphDatabaseSettings#check_point_interval_tx} to decide when check points processes should be started.
*/
@Service.Implementation( CheckPointThresholdPolicy.class )
public class PeriodicThresholdPolicy extends CheckPointThresholdPolicy
{
public PeriodicThresholdPolicy()
{
super( "periodically", "periodic", "interval" );
}

@Override
public CheckPointThreshold createThreshold( Config config, Clock clock, LogProvider logProvider )
{
int txThreshold = config.get( GraphDatabaseSettings.check_point_interval_tx );
final CountCommittedTransactionThreshold countCommittedTransactionThreshold =
new CountCommittedTransactionThreshold( txThreshold );

long timeMillisThreshold = config.get( GraphDatabaseSettings.check_point_interval_time ).toMillis();
TimeCheckPointThreshold timeCheckPointThreshold = new TimeCheckPointThreshold( timeMillisThreshold, clock );

return or( countCommittedTransactionThreshold, timeCheckPointThreshold );
}
}
@@ -0,0 +1 @@
org.neo4j.kernel.impl.transaction.log.checkpoint.PeriodicThresholdPolicy
Expand Up @@ -29,6 +29,8 @@


import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.time.Clocks; import org.neo4j.time.Clocks;
import org.neo4j.time.FakeClock; import org.neo4j.time.FakeClock;


Expand All @@ -40,12 +42,12 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointThreshold.createThreshold;


public class CheckPointThresholdTest public class CheckPointThresholdTest
{ {
private FakeClock clock;
private Config config; private Config config;
private FakeClock clock;
private LogProvider logProvider;
private Integer intervalTx; private Integer intervalTx;
private Duration intervalTime; private Duration intervalTime;
private Consumer<String> notTriggered; private Consumer<String> notTriggered;
Expand All @@ -55,8 +57,9 @@ public class CheckPointThresholdTest
@Before @Before
public void setUp() public void setUp()
{ {
clock = Clocks.fakeClock();
config = Config.empty(); config = Config.empty();
clock = Clocks.fakeClock();
logProvider = NullLogProvider.getInstance();
intervalTx = config.get( GraphDatabaseSettings.check_point_interval_tx ); intervalTx = config.get( GraphDatabaseSettings.check_point_interval_tx );
intervalTime = config.get( GraphDatabaseSettings.check_point_interval_time ); intervalTime = config.get( GraphDatabaseSettings.check_point_interval_time );
triggerConsumer = new LinkedBlockingQueue<>(); triggerConsumer = new LinkedBlockingQueue<>();
Expand All @@ -74,6 +77,11 @@ private void withIntervalTx( int count )
config.augment( stringMap( GraphDatabaseSettings.check_point_interval_tx.name(), String.valueOf( count ) ) ); config.augment( stringMap( GraphDatabaseSettings.check_point_interval_tx.name(), String.valueOf( count ) ) );
} }


private CheckPointThreshold createThreshold()
{
return CheckPointThreshold.createThreshold( config, clock, logProvider );
}

private void verifyTriggered( String reason ) private void verifyTriggered( String reason )
{ {
assertThat( triggerConsumer.poll(), containsString( reason ) ); assertThat( triggerConsumer.poll(), containsString( reason ) );
Expand All @@ -87,7 +95,7 @@ private void verifyNoMoreTriggers()
@Test @Test
public void mustCreateThresholdThatTriggersAfterTransactionCount() throws Exception public void mustCreateThresholdThatTriggersAfterTransactionCount() throws Exception
{ {
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 1 ); // Initialise at transaction id offset by 1. threshold.initialize( 1 ); // Initialise at transaction id offset by 1.


// False because we're not yet at threshold. // False because we're not yet at threshold.
Expand All @@ -103,7 +111,7 @@ public void mustCreateThresholdThatTriggersAfterTransactionCount() throws Except
@Test @Test
public void mustCreateThresholdThatTriggersAfterTime() throws Exception public void mustCreateThresholdThatTriggersAfterTime() throws Exception
{ {
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 1 ); threshold.initialize( 1 );
// Skip the initial wait period. // Skip the initial wait period.
clock.forward( intervalTime.toMillis(), MILLISECONDS ); clock.forward( intervalTime.toMillis(), MILLISECONDS );
Expand All @@ -122,7 +130,7 @@ public void mustCreateThresholdThatTriggersAfterTime() throws Exception
public void mustNotTriggerBeforeTimeWithTooFewCommittedTransactions() throws Throwable public void mustNotTriggerBeforeTimeWithTooFewCommittedTransactions() throws Throwable
{ {
withIntervalTime( "100ms" ); withIntervalTime( "100ms" );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 2 ); threshold.initialize( 2 );


clock.forward( 50, MILLISECONDS ); clock.forward( 50, MILLISECONDS );
Expand All @@ -133,7 +141,7 @@ public void mustNotTriggerBeforeTimeWithTooFewCommittedTransactions() throws Thr
public void mustTriggerWhenTimeThresholdIsReachedAndThereAreCommittedTransactions() throws Throwable public void mustTriggerWhenTimeThresholdIsReachedAndThereAreCommittedTransactions() throws Throwable
{ {
withIntervalTime( "100ms" ); withIntervalTime( "100ms" );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 2 ); threshold.initialize( 2 );


clock.forward( 199, MILLISECONDS ); clock.forward( 199, MILLISECONDS );
Expand All @@ -147,7 +155,7 @@ public void mustTriggerWhenTimeThresholdIsReachedAndThereAreCommittedTransaction
public void mustNotTriggerWhenTimeThresholdIsReachedAndThereAreNoCommittedTransactions() throws Throwable public void mustNotTriggerWhenTimeThresholdIsReachedAndThereAreNoCommittedTransactions() throws Throwable
{ {
withIntervalTime( "100ms" ); withIntervalTime( "100ms" );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 42 ); threshold.initialize( 42 );


clock.forward( 199, MILLISECONDS ); clock.forward( 199, MILLISECONDS );
Expand All @@ -160,7 +168,7 @@ public void mustNotTriggerWhenTimeThresholdIsReachedAndThereAreNoCommittedTransa
public void mustNotTriggerPastTimeThresholdSinceLastCheckpointWithNoNewTransactions() public void mustNotTriggerPastTimeThresholdSinceLastCheckpointWithNoNewTransactions()
{ {
withIntervalTime( "100ms" ); withIntervalTime( "100ms" );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 2 ); threshold.initialize( 2 );


clock.forward( 199, MILLISECONDS ); clock.forward( 199, MILLISECONDS );
Expand All @@ -175,7 +183,7 @@ public void mustNotTriggerPastTimeThresholdSinceLastCheckpointWithNoNewTransacti
public void mustTriggerPastTimeThresholdSinceLastCheckpointWithNewTransactions() public void mustTriggerPastTimeThresholdSinceLastCheckpointWithNewTransactions()
{ {
withIntervalTime( "100ms" ); withIntervalTime( "100ms" );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 2 ); threshold.initialize( 2 );


clock.forward( 199, MILLISECONDS ); clock.forward( 199, MILLISECONDS );
Expand All @@ -191,7 +199,7 @@ public void mustTriggerPastTimeThresholdSinceLastCheckpointWithNewTransactions()
public void mustNotTriggerOnTransactionCountWhenThereAreNoNewTransactions() throws Throwable public void mustNotTriggerOnTransactionCountWhenThereAreNoNewTransactions() throws Throwable
{ {
withIntervalTx( 2 ); withIntervalTx( 2 );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 2 ); threshold.initialize( 2 );


assertFalse( threshold.isCheckPointingNeeded( 2, notTriggered ) ); assertFalse( threshold.isCheckPointingNeeded( 2, notTriggered ) );
Expand All @@ -201,7 +209,7 @@ public void mustNotTriggerOnTransactionCountWhenThereAreNoNewTransactions() thro
public void mustNotTriggerOnTransactionCountWhenCountIsBellowThreshold() throws Throwable public void mustNotTriggerOnTransactionCountWhenCountIsBellowThreshold() throws Throwable
{ {
withIntervalTx( 2 ); withIntervalTx( 2 );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 2 ); threshold.initialize( 2 );


assertFalse( threshold.isCheckPointingNeeded( 3, notTriggered ) ); assertFalse( threshold.isCheckPointingNeeded( 3, notTriggered ) );
Expand All @@ -211,7 +219,7 @@ public void mustNotTriggerOnTransactionCountWhenCountIsBellowThreshold() throws
public void mustTriggerOnTransactionCountWhenCountIsAtThreshold() throws Throwable public void mustTriggerOnTransactionCountWhenCountIsAtThreshold() throws Throwable
{ {
withIntervalTx( 2 ); withIntervalTx( 2 );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 2 ); threshold.initialize( 2 );


assertTrue( threshold.isCheckPointingNeeded( 4, triggered ) ); assertTrue( threshold.isCheckPointingNeeded( 4, triggered ) );
Expand All @@ -223,7 +231,7 @@ public void mustTriggerOnTransactionCountWhenCountIsAtThreshold() throws Throwab
public void mustNotTriggerOnTransactionCountAtThresholdIfCheckPointAlreadyHappened() throws Throwable public void mustNotTriggerOnTransactionCountAtThresholdIfCheckPointAlreadyHappened() throws Throwable
{ {
withIntervalTx( 2 ); withIntervalTx( 2 );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 2 ); threshold.initialize( 2 );


threshold.checkPointHappened( 4 ); threshold.checkPointHappened( 4 );
Expand All @@ -234,7 +242,7 @@ public void mustNotTriggerOnTransactionCountAtThresholdIfCheckPointAlreadyHappen
public void mustNotTriggerWhenTransactionCountIsWithinThresholdSinceLastTrigger() throws Exception public void mustNotTriggerWhenTransactionCountIsWithinThresholdSinceLastTrigger() throws Exception
{ {
withIntervalTx( 2 ); withIntervalTx( 2 );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 2 ); threshold.initialize( 2 );


threshold.checkPointHappened( 4 ); threshold.checkPointHappened( 4 );
Expand All @@ -245,7 +253,7 @@ public void mustNotTriggerWhenTransactionCountIsWithinThresholdSinceLastTrigger(
public void mustTriggerOnTransactionCountWhenCountIsAtThresholdSinceLastCheckPoint() throws Throwable public void mustTriggerOnTransactionCountWhenCountIsAtThresholdSinceLastCheckPoint() throws Throwable
{ {
withIntervalTx( 2 ); withIntervalTx( 2 );
CheckPointThreshold threshold = createThreshold( config, clock ); CheckPointThreshold threshold = createThreshold();
threshold.initialize( 2 ); threshold.initialize( 2 );


threshold.checkPointHappened( 4 ); threshold.checkPointHappened( 4 );
Expand All @@ -259,9 +267,9 @@ public void mustTriggerOnTransactionCountWhenCountIsAtThresholdSinceLastCheckPoi
public void timeBasedThresholdMustSuggestSchedulingFrequency() throws Exception public void timeBasedThresholdMustSuggestSchedulingFrequency() throws Exception
{ {
long defaultInterval = intervalTime.toMillis(); long defaultInterval = intervalTime.toMillis();
assertThat( createThreshold( config, clock ).checkFrequencyMillis().min().getAsLong(), is( defaultInterval ) ); assertThat( createThreshold().checkFrequencyMillis().min().getAsLong(), is( defaultInterval ) );


withIntervalTime( "100ms" ); withIntervalTime( "100ms" );
assertThat( createThreshold( config, clock ).checkFrequencyMillis().min().getAsLong(), is( 100L ) ); assertThat( createThreshold().checkFrequencyMillis().min().getAsLong(), is( 100L ) );
} }
} }

0 comments on commit d423c75

Please sign in to comment.