Skip to content

Commit

Permalink
Move check pointing in its own thread
Browse files Browse the repository at this point in the history
Check pointing can be triggered based on time and number of
transactions, default values are 5 minutes and 100_000 txs
respectively.

Introduce CheckPointTracer in order to trace check point events, which
are now disconnected from LogAppendEvents.
  • Loading branch information
davidegrohmann committed Jun 22, 2015
1 parent 001de30 commit 6d43d73
Show file tree
Hide file tree
Showing 29 changed files with 1,098 additions and 156 deletions.
16 changes: 16 additions & 0 deletions community/function/src/main/java/org/neo4j/function/Consumers.java
Expand Up @@ -51,4 +51,20 @@ public static <T> Consumer<T> noop()
{
return (Consumer<T>) NOOP;
}


public static LongConsumer seq( final LongConsumer... consumers )
{
return new LongConsumer()
{
@Override
public void accept( long input )
{
for ( LongConsumer consumer : consumers )
{
consumer.accept( input );
}
}
};
}
}
Expand Up @@ -142,8 +142,26 @@ public abstract class GraphDatabaseSettings
@Description( "Maximum number of history files for the internal log." )
public static final Setting<Integer> store_internal_log_max_archives = setting("store.internal_log.max_archives", INTEGER, "7", min(1) );

@Description( "Maximum number of transactions to trigger a check point" )
public static final Setting<Integer> store_internal_check_point_max_txs = setting( "store.internal_checkpoint.max_txs", INTEGER, "200000", min(1) );
@Description( "Configures the transaction interval between check-points. The database will not check-point more " +
"often than this (unless check pointing is triggered by a different event), but might check-point " +
"less often than this interval, if performing a check-point takes longer time than the configured " +
"interval. A check-point is a point in the transaction logs, from which recovery would start from. " +
"Longer check-point intervals typically means that recovery will take longer to complete in case " +
"of a crash. On the other hand, a longer check-point interval can also reduce the I/O load that " +
"the database places on the system, as each check-point implies a flushing and forcing of all the " +
"store files. The default is '100000' for a check-point every 100000 transactions." )
public static final Setting<Integer> check_point_interval_tx = setting( "dbms.checkpoint.interval.tx", INTEGER, "100000", min(1) );

@Description( "Configures the time interval between check-points. The database will not check-point more often " +
"than this (unless check pointing is triggered by a different event), but might check-point less " +
"often than this interval, if performing a check-point takes longer time than the configured " +
"interval. A check-point is a point in the transaction logs, from which recovery would start from. " +
"Longer check-point intervals typically means that recovery will take longer to complete in case " +
"of a crash. On the other hand, a longer check-point interval can also reduce the I/O load that " +
"the database places on the system, as each check-point implies a flushing and forcing of all the " +
"store files. The default is '5m' for a check-point every 5 minutes. Other supported units are 's' " +
"for seconds, and 'ms' for milliseconds." )
public static final Setting<Long> check_point_interval_time = setting( "dbms.checkpoint.interval.time", DURATION, "5m" );

// Indexing
@Description("Controls the auto indexing feature for nodes. Setting it to `false` shuts it down, " +
Expand Down
Expand Up @@ -27,16 +27,20 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.function.Consumers;
import org.neo4j.function.LongConsumer;
import org.neo4j.function.Supplier;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.index.IndexImplementation;
import org.neo4j.graphdb.index.IndexProviders;
import org.neo4j.helpers.Clock;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.Provider;
import org.neo4j.helpers.collection.Visitor;
Expand Down Expand Up @@ -117,9 +121,13 @@
import org.neo4j.kernel.impl.transaction.log.ReadableVersionableLogChannel;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointTransactionCountThreshold;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointScheduler;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointThreshold;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointThresholds;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointerImpl;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CountCommittedTransactionThreshold;
import org.neo4j.kernel.impl.transaction.log.checkpoint.TimeCheckPointThreshold;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReaderFactory;
Expand Down Expand Up @@ -470,7 +478,7 @@ public void transactionRecovered( long txId )
indexingModule.indexingService(), cacheModule.schemaCache() );

TransactionLogModule transactionLogModule =
buildTransactionLogs( storeDir, config, logProvider, indexingModule.labelScanStore(),
buildTransactionLogs( storeDir, config, logProvider, scheduler, indexingModule.labelScanStore(),
fs, neoStoreModule.neoStore(), cacheModule.cacheAccess(), indexingModule.indexingService(),
indexProviders.values() );

Expand Down Expand Up @@ -728,6 +736,7 @@ public StoreReadLayer storeLayer()
}

private TransactionLogModule buildTransactionLogs( File storeDir, Config config, LogProvider logProvider,
JobScheduler scheduler,
LabelScanStore labelScanStore,
FileSystemAbstraction fileSystemAbstraction,
NeoStore neoStore, CacheAccessBackDoor cacheAccess,
Expand Down Expand Up @@ -790,19 +799,35 @@ public long getTimestampForVersion( long version ) throws IOException
final LogRotation logRotation = new LogRotationImpl( monitors.newMonitor( LogRotation.Monitor.class ),
logFile, kernelHealth, logProvider );

final CheckPointTransactionCountThreshold transactionCountThreshold = new CheckPointTransactionCountThreshold(
config.get( GraphDatabaseSettings.store_internal_check_point_max_txs ) );
final CheckPointer checkPointing = new CheckPointerImpl( logFile, neoStore, transactionCountThreshold,
storeFlusher, logPruning, kernelHealth, logProvider );
int txThreshold = config.get( GraphDatabaseSettings.check_point_interval_tx );
final CountCommittedTransactionThreshold countCommittedTransactionThreshold =
new CountCommittedTransactionThreshold( txThreshold );

final TransactionAppender appender = new BatchingTransactionAppender( logFile, logRotation, checkPointing,
transactionCountThreshold, transactionMetadataCache, neoStore, legacyIndexTransactionOrdering, kernelHealth );
final LogicalTransactionStore logicalTransactionStore = new PhysicalLogicalTransactionStore( logFile,
transactionMetadataCache );
long timeMillisThreshold = config.get( GraphDatabaseSettings.check_point_interval_time );
TimeCheckPointThreshold timeCheckPointThreshold = new TimeCheckPointThreshold( timeMillisThreshold, Clock.SYSTEM_CLOCK );

CheckPointThreshold threshold =
CheckPointThresholds.or( countCommittedTransactionThreshold, timeCheckPointThreshold );

final CheckPointerImpl checkPointer = new CheckPointerImpl( neoStore, threshold, storeFlusher, logPruning,
kernelHealth, logProvider, tracers.checkPointTracer, logFile );

long recurringPeriod = Math.min( timeMillisThreshold, TimeUnit.SECONDS.toMillis( 10 ) );
CheckPointScheduler checkPointScheduler = new CheckPointScheduler( checkPointer, scheduler, recurringPeriod );

LongConsumer transactionCommittedConsumer =
Consumers.seq( countCommittedTransactionThreshold, timeCheckPointThreshold );

final TransactionAppender appender = new BatchingTransactionAppender( logFile, logRotation,
transactionCommittedConsumer, transactionMetadataCache, neoStore, legacyIndexTransactionOrdering,
kernelHealth );
final LogicalTransactionStore logicalTransactionStore =
new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache );

life.add( logFile );
life.add( appender );
life.add( checkPointing );
life.add( checkPointer );
life.add( checkPointScheduler );

return new TransactionLogModule()
{
Expand Down Expand Up @@ -851,7 +876,7 @@ public LogRotation logRotation()
@Override
public CheckPointer checkPointing()
{
return checkPointing;
return checkPointer;
}

@Override
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.neo4j.helpers.ThisShouldNotHappenError;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriterV1;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
Expand Down Expand Up @@ -82,7 +81,6 @@ public void unpark()
private final TransactionMetadataCache transactionMetadataCache;
private final LogFile logFile;
private final LogRotation logRotation;
private final CheckPointer checkPointer;
private final LongConsumer transactionCommitConsumer;
private final TransactionIdStore transactionIdStore;
private final LogPositionMarker positionMarker = new LogPositionMarker();
Expand All @@ -93,14 +91,13 @@ public void unpark()
private TransactionLogWriter transactionLogWriter;
private IndexCommandDetector indexCommandDetector;

public BatchingTransactionAppender( LogFile logFile, LogRotation logRotation, CheckPointer checkPointer,
public BatchingTransactionAppender( LogFile logFile, LogRotation logRotation,
LongConsumer transactionCommitConsumer, TransactionMetadataCache transactionMetadataCache,
TransactionIdStore transactionIdStore, IdOrderingQueue legacyIndexTransactionOrdering,
KernelHealth kernelHealth )
{
this.logFile = logFile;
this.logRotation = logRotation;
this.checkPointer = checkPointer;
this.transactionCommitConsumer = transactionCommitConsumer;
this.transactionIdStore = transactionIdStore;
this.legacyIndexTransactionOrdering = legacyIndexTransactionOrdering;
Expand All @@ -126,7 +123,6 @@ public long append( TransactionRepresentation transaction, LogAppendEvent logApp
// we generate the next transaction id
boolean logRotated = logRotation.rotateLogIfNeeded( logAppendEvent );
logAppendEvent.setLogRotated( logRotated );
checkPointer.checkPointIfNeeded( logAppendEvent );

TransactionCommitment commit;
try
Expand Down
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.checkpoint;

import java.io.IOException;

import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.neo4j.kernel.impl.util.JobScheduler.Groups.checkPoint;

public class CheckPointScheduler extends LifecycleAdapter
{
private final CheckPointer checkPointer;
private final JobScheduler scheduler;
private final long recurringPeriodMillis;
private final Runnable job = new Runnable()
{
@Override
public void run()
{
try
{
checkPointer.checkPointIfNeeded();
}
catch ( IOException e )
{
// no need to reschedule since the check pointer has raised a kernel panic and a shutdown is expected
throw new UnderlyingStorageException( e );
}

// reschedule only if it is not stopped
if ( !stopped )
{
handle = scheduler.schedule( checkPoint, job, recurringPeriodMillis, MILLISECONDS );
}
}
};

private volatile JobScheduler.JobHandle handle;
private volatile boolean stopped = false;

public CheckPointScheduler( CheckPointer checkPointer, JobScheduler scheduler, long recurringPeriodMillis )
{
this.checkPointer = checkPointer;
this.scheduler = scheduler;
this.recurringPeriodMillis = recurringPeriodMillis;
}

@Override
public void start() throws Throwable
{
handle = scheduler.schedule( checkPoint, job, recurringPeriodMillis, MILLISECONDS );
}

@Override
public void stop() throws Throwable
{
stopped = true;
if ( handle != null )
{
handle.cancel( false );
}
}
}
Expand Up @@ -33,7 +33,7 @@ public boolean isCheckPointingNeeded()
}

@Override
public void checkPointHappened()
public void checkPointHappened( long transactionId )
{

}
Expand All @@ -51,6 +51,8 @@ public void checkPointHappened()
* has been written in the transaction log in order to make sure that the threshold updates its condition.
*
* This is important since we might have multiple thresholds or forced check points.
*
* @param transactionId the latest transaction committed id used by the check point
*/
void checkPointHappened();
void checkPointHappened( long transactionId );
}
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.checkpoint;

public class CheckPointThresholds
{
public static CheckPointThreshold or( final CheckPointThreshold... thresholds )
{
return new CheckPointThreshold()
{
@Override
public boolean isCheckPointingNeeded()
{
for ( CheckPointThreshold threshold : thresholds )
{
if ( threshold.isCheckPointingNeeded() )
{
return true;
}
}

return false;
}

@Override
public void checkPointHappened( long transactionId )
{
for ( CheckPointThreshold threshold : thresholds )
{
threshold.checkPointHappened( transactionId );
}
}
};
}

private CheckPointThresholds()
{
}
}
Expand Up @@ -21,34 +21,20 @@

import java.io.IOException;

import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;

/**
* This interface represent a check pointer which is responsible to write check points in the transaction log.
*/
public interface CheckPointer
{
CheckPointer NO_CHECKPOINT = new CheckPointer()
{
@Override
public void checkPointIfNeeded( LogAppendEvent logAppendEvent )
{
}

@Override
public void forceCheckPoint()
{
}
};

/**
* This method will verify that the conditions for triggering a check point hold and in such a case it will write
* a check point in the transaction log.
*
* @param logAppendEvent the log append event to be used to notify the check pointing
* This method does NOT handle concurrency since there should be only one check point thread running.
*
* @throws IOException if writing the check point fails
*/
void checkPointIfNeeded( LogAppendEvent logAppendEvent ) throws IOException;
void checkPointIfNeeded() throws IOException;

/**
* This method forces the write of a check point in the transaction log.
Expand Down

0 comments on commit 6d43d73

Please sign in to comment.