Skip to content

Commit

Permalink
Add rudementary volumetric check-point triggering.
Browse files Browse the repository at this point in the history
Currently it only *starts* a check-point process as soon as it looks
like there are logs that could be pruned.

In the future it would be interesting to have it use statistics to try
to predict the frequency at which log prunings happen, and adjust the
check-point frequency to match.
  • Loading branch information
chrisvest committed Dec 1, 2017
1 parent e65851b commit 57f43d1
Show file tree
Hide file tree
Showing 17 changed files with 316 additions and 84 deletions.
Expand Up @@ -344,8 +344,13 @@ public class GraphDatabaseSettings implements LoadableConfig
@Description( "Configures the general policy for when check-points should occur. The default policy is to " +
"check-point 'periodically', as specified by the 'dbms.checkpoint.interval.tx' and " +
"'dbms.checkpoint.interval.time' settings. " +
"An alternative policy is to check-point 'continuously', which will ignore those settings and run " +
"the check-point process all the time." )
"There are two alternative policies: " +
"The first is to check-point 'continuously', which will ignore those settings and run the " +
"check-point process all the time. " +
"The second (Enterprise only) alternative policy is the 'volumetric' policy, which makes a " +
"best-effort at check-pointing often enough so that the database doesn't get too far behind on " +
"deleting old transaction logs in accordance with the 'dbms.tx_log.rotation.retention_policy' " +
"setting." )
public static final Setting<String> check_point_policy = setting( "dbms.checkpoint", STRING, "periodically" );

@Description( "Configures the transaction interval between check-points. The database will not check-point more " +
Expand Down
Expand Up @@ -642,7 +642,7 @@ private NeoStoreTransactionLogModule buildTransactionLogs(
final LogicalTransactionStore logicalTransactionStore =
new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader );

CheckPointThreshold threshold = CheckPointThreshold.createThreshold( config, clock, logProvider );
CheckPointThreshold threshold = CheckPointThreshold.createThreshold( config, clock, logPruning, logProvider );

final CheckPointerImpl checkPointer = new CheckPointerImpl(
transactionIdStore, threshold, storageEngine, logPruning, appender, databaseHealth, logProvider,
Expand Down
Expand Up @@ -26,7 +26,7 @@
* Abstract class that implement common logic for making the consumer to consume the description of this
* threshold if {@link #thresholdReached(long)} is true.
*/
abstract class AbstractCheckPointThreshold implements CheckPointThreshold
public abstract class AbstractCheckPointThreshold implements CheckPointThreshold
{
private final String description;

Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.stream.Stream;

import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruning;
import org.neo4j.logging.LogProvider;

import static org.neo4j.graphdb.factory.GraphDatabaseSettings.check_point_policy;
Expand Down Expand Up @@ -78,7 +79,8 @@ public interface CheckPointThreshold
/**
* Create and configure a {@link CheckPointThreshold} based on the given configurations.
*/
static CheckPointThreshold createThreshold( Config config, Clock clock, LogProvider logProvider )
static CheckPointThreshold createThreshold(
Config config, Clock clock, LogPruning logPruning, LogProvider logProvider )
{
String policyName = config.get( check_point_policy );
CheckPointThresholdPolicy policy;
Expand All @@ -93,7 +95,7 @@ static CheckPointThreshold createThreshold( Config config, Clock clock, LogProvi
"Using default policy instead.", e );
policy = new PeriodicThresholdPolicy();
}
return policy.createThreshold( config, clock, logProvider );
return policy.createThreshold( config, clock, logPruning, logProvider );
}

/**
Expand Down
Expand Up @@ -24,6 +24,7 @@

import org.neo4j.helpers.Service;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruning;
import org.neo4j.logging.LogProvider;

/**
Expand All @@ -32,8 +33,8 @@
*
* The is determined by the {@link org.neo4j.graphdb.factory.GraphDatabaseSettings#check_point_policy} setting, and
* based on this, the concrete policies are loaded and used to
* {@link CheckPointThreshold#createThreshold(Config, Clock, org.neo4j.logging.LogProvider) create} the final and fully configured check point
* thresholds.
* {@link CheckPointThreshold#createThreshold(Config, Clock, LogPruning, LogProvider) create} the final and fully
* configured check point thresholds.
*/
public abstract class CheckPointThresholdPolicy extends Service
{
Expand Down Expand Up @@ -62,6 +63,6 @@ public static CheckPointThresholdPolicy loadPolicy( String policyName ) throws N
/**
* Create a {@link CheckPointThreshold} instance based on this policy and the given configurations.
*/
public abstract CheckPointThreshold createThreshold( Config config, Clock clock,
LogProvider logProvider );
public abstract CheckPointThreshold createThreshold(
Config config, Clock clock, LogPruning logPruning, LogProvider logProvider );
}
Expand Up @@ -23,7 +23,7 @@

class ContinuousCheckPointThreshold extends AbstractCheckPointThreshold
{
public ContinuousCheckPointThreshold()
ContinuousCheckPointThreshold()
{
super( "continuous threshold" );
}
Expand Down
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.helpers.Service;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruning;
import org.neo4j.logging.LogProvider;

@Service.Implementation( CheckPointThresholdPolicy.class )
Expand All @@ -34,7 +35,8 @@ public ContinuousThresholdPolicy()
}

@Override
public CheckPointThreshold createThreshold( Config config, Clock clock, LogProvider logProvider )
public CheckPointThreshold createThreshold(
Config config, Clock clock, LogPruning logPruning, LogProvider logProvider )
{
return new ContinuousCheckPointThreshold();
}
Expand Down
Expand Up @@ -25,7 +25,7 @@ class CountCommittedTransactionThreshold extends AbstractCheckPointThreshold

private volatile long nextTransactionIdTarget;

public CountCommittedTransactionThreshold( int notificationThreshold )
CountCommittedTransactionThreshold( int notificationThreshold )
{
super( "tx count threshold" );
this.notificationThreshold = notificationThreshold;
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Service;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruning;
import org.neo4j.logging.LogProvider;

import static org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointThreshold.or;
Expand All @@ -41,7 +42,8 @@ public PeriodicThresholdPolicy()
}

@Override
public CheckPointThreshold createThreshold( Config config, Clock clock, LogProvider logProvider )
public CheckPointThreshold createThreshold(
Config config, Clock clock, LogPruning logPruning, LogProvider logProvider )
{
int txThreshold = config.get( GraphDatabaseSettings.check_point_interval_tx );
final CountCommittedTransactionThreshold countCommittedTransactionThreshold =
Expand Down
Expand Up @@ -31,7 +31,7 @@ class TimeCheckPointThreshold extends AbstractCheckPointThreshold
private final long timeMillisThreshold;
private final Clock clock;

public TimeCheckPointThreshold( long thresholdMillis, Clock clock )
TimeCheckPointThreshold( long thresholdMillis, Clock clock )
{
super( "time threshold" );
this.timeMillisThreshold = thresholdMillis;
Expand Down
Expand Up @@ -37,4 +37,18 @@ public interface LogPruning
* Otherwise {@code false} if we are pretty sure that we don't need to prune any logs right now.
*/
boolean mightHaveLogsToPrune();

LogPruning NO_PRUNING = new LogPruning()
{
@Override
public void pruneLogs( long currentVersion )
{
}

@Override
public boolean mightHaveLogsToPrune()
{
return false;
}
};
}
Expand Up @@ -19,84 +19,16 @@
*/
package org.neo4j.kernel.impl.transaction.log.checkpoint;

import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.time.Clocks;
import org.neo4j.time.FakeClock;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.helpers.collection.MapUtil.stringMap;

public class CheckPointThresholdTest
public class CheckPointThresholdTest extends CheckPointThresholdTestSupport
{
private Config config;
private FakeClock clock;
private LogProvider logProvider;
private Integer intervalTx;
private Duration intervalTime;
private Consumer<String> notTriggered;
private BlockingQueue<String> triggerConsumer;
private Consumer<String> triggered;

@Before
public void setUp()
{
config = Config.empty();
clock = Clocks.fakeClock();
logProvider = NullLogProvider.getInstance();
intervalTx = config.get( GraphDatabaseSettings.check_point_interval_tx );
intervalTime = config.get( GraphDatabaseSettings.check_point_interval_time );
triggerConsumer = new LinkedBlockingQueue<>();
triggered = triggerConsumer::offer;
notTriggered = s -> fail( "Should not have triggered: " + s );
}

private void withPolicy( String policy )
{
config.augment( stringMap( GraphDatabaseSettings.check_point_policy.name(), policy ) );
}

private void withIntervalTime( String time )
{
config.augment( stringMap( GraphDatabaseSettings.check_point_interval_time.name(), time ) );
}

private void withIntervalTx( int count )
{
config.augment( stringMap( GraphDatabaseSettings.check_point_interval_tx.name(), String.valueOf( count ) ) );
}

private CheckPointThreshold createThreshold()
{
return CheckPointThreshold.createThreshold( config, clock, logProvider );
}

private void verifyTriggered( String reason )
{
assertThat( triggerConsumer.poll(), containsString( reason ) );
}

private void verifyNoMoreTriggers()
{
assertTrue( triggerConsumer.isEmpty() );
}

@Test
public void mustCreateThresholdThatTriggersAfterTransactionCount() throws Exception
{
Expand Down
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.checkpoint;

import org.junit.Before;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruning;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.time.Clocks;
import org.neo4j.time.FakeClock;

import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.helpers.collection.MapUtil.stringMap;

public class CheckPointThresholdTestSupport
{
protected Config config;
protected FakeClock clock;
protected LogPruning logPruning;
protected LogProvider logProvider;
protected Integer intervalTx;
protected Duration intervalTime;
protected Consumer<String> notTriggered;
protected BlockingQueue<String> triggerConsumer;
protected Consumer<String> triggered;

@Before
public void setUp()
{
config = Config.empty();
clock = Clocks.fakeClock();
logPruning = LogPruning.NO_PRUNING;
logProvider = NullLogProvider.getInstance();
intervalTx = config.get( GraphDatabaseSettings.check_point_interval_tx );
intervalTime = config.get( GraphDatabaseSettings.check_point_interval_time );
triggerConsumer = new LinkedBlockingQueue<>();
triggered = triggerConsumer::offer;
notTriggered = s -> fail( "Should not have triggered: " + s );
}

protected void withPolicy( String policy )
{
config.augment( stringMap( GraphDatabaseSettings.check_point_policy.name(), policy ) );
}

protected void withIntervalTime( String time )
{
config.augment( stringMap( GraphDatabaseSettings.check_point_interval_time.name(), time ) );
}

protected void withIntervalTx( int count )
{
config.augment( stringMap( GraphDatabaseSettings.check_point_interval_tx.name(), String.valueOf( count ) ) );
}

protected CheckPointThreshold createThreshold()
{
return CheckPointThreshold.createThreshold( config, clock, logPruning, logProvider );
}

protected void verifyTriggered( String reason )
{
assertThat( triggerConsumer.poll(), containsString( reason ) );
}

protected void verifyNoMoreTriggers()
{
assertTrue( triggerConsumer.isEmpty() );
}
}

0 comments on commit 57f43d1

Please sign in to comment.