Slaves pull and apply updates in a more batched fashion
More specifically, it avoids Lucene index refreshing until after each batch.
The main change is making the choice of IndexUpdateMode explicit for the caller,
rather than implicit in IndexingService, and exposing a means of just
flushing changes (in the Lucene case, refreshing the searcher). These two
changes are taken advantage of in TransactionCommittingResponseUnpacker to
defer refreshing Lucene indexes until once per batch (100 transactions by default).
The unpacker also batches label scan store and legacy index updates,
just like recovery does.

All these changes make update pulling 1-2 orders of magnitude faster
than before, which keeps the transaction gap between slaves and master
much smaller and also lets the cluster commit faster where ha.tx_push_factor > 0.
tinwelint committed Sep 21, 2015
1 parent 34eca40 commit d8f990c
Showing 47 changed files with 1,230 additions and 421 deletions.
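
To illustrate the batching pattern described in the commit message, here is a minimal sketch, in the spirit of TransactionCommittingResponseUnpacker, of applying a pulled batch of transactions and refreshing the indexes only once per batch of 100. The names here (IndexFlusher, ReceivedTransaction, BatchedUpdateApplier) are hypothetical stand-ins for illustration, not the actual Neo4j classes touched by this commit.

import java.io.IOException;
import java.util.List;

/**
 * Minimal sketch of the batching idea: apply many transactions, refresh indexes once.
 * IndexFlusher and ReceivedTransaction are hypothetical stand-ins, not Neo4j API.
 */
class BatchedUpdateApplier
{
    interface IndexFlusher
    {
        // Makes already-applied index updates visible to readers,
        // e.g. by refreshing a Lucene searcher; cheaper than forcing to disk.
        void flushAll() throws IOException;
    }

    interface ReceivedTransaction
    {
        // Applies store, schema index and legacy index updates, without refreshing readers.
        void applyToStore() throws IOException;
    }

    private static final int BATCH_SIZE = 100; // default batch size from the commit message

    private final IndexFlusher indexFlusher;

    BatchedUpdateApplier( IndexFlusher indexFlusher )
    {
        this.indexFlusher = indexFlusher;
    }

    void applyPulledTransactions( List<ReceivedTransaction> transactions ) throws IOException
    {
        int appliedSinceFlush = 0;
        for ( ReceivedTransaction tx : transactions )
        {
            tx.applyToStore();
            if ( ++appliedSinceFlush == BATCH_SIZE )
            {
                indexFlusher.flushAll(); // one refresh per 100 transactions instead of one per transaction
                appliedSinceFlush = 0;
            }
        }
        if ( appliedSinceFlush > 0 )
        {
            indexFlusher.flushAll(); // flush the tail of the batch
        }
    }
}

Per-transaction searcher refreshes dominate the cost of applying many small transactions, so amortizing them over a batch is presumably where most of the reported speedup comes from.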
@@ -281,6 +281,12 @@ public void force() throws IOException
throw new UnsupportedOperationException();
}

@Override
public void flush() throws IOException
{
throw new UnsupportedOperationException();
}

@Override
public BoundedIterable<Long> newAllEntriesReader()
{
@@ -66,8 +66,11 @@
import org.neo4j.kernel.impl.api.TransactionHooks;
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.impl.api.UpdateableSchemaState;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.index.OnlineIndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.RecoveryIndexingUpdatesValidator;
import org.neo4j.kernel.impl.api.index.SchemaIndexProviderMap;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
@@ -465,6 +468,9 @@ public void transactionRecovered( long txId )
// Build all modules and their services
try
{
LegacyIndexApplierLookup legacyIndexApplierLookup =
dependencies.satisfyDependency( new LegacyIndexApplierLookup.Direct( legacyIndexProviderLookup ) );

final NeoStoreModule neoStoreModule =
buildNeoStore( storeFactory, labelTokens, relationshipTypeTokens, propertyKeyTokenHolder );
// TODO The only reason this is here is because of the provider-stuff for DiskLayer. Remove when possible:
@@ -484,12 +490,13 @@ public void transactionRecovered( long txId )
TransactionLogModule transactionLogModule =
buildTransactionLogs( config, logging, indexingModule.labelScanStore(),
fs, neoStoreModule.neoStore(), cacheModule.cacheAccess(), indexingModule.indexingService(),
indexProviders.values() );
indexProviders.values(), legacyIndexApplierLookup );

buildRecovery( fs, cacheModule.cacheAccess(), indexingModule.indexingService(),
indexingModule.indexUpdatesValidator(), indexingModule.labelScanStore(), neoStoreModule.neoStore(),
indexingModule.labelScanStore(), neoStoreModule.neoStore(),
monitors.newMonitor( RecoveryVisitor.Monitor.class ), monitors.newMonitor( Recovery.Monitor.class ),
transactionLogModule.logFiles(), transactionLogModule.logRotationControl(), startupStatistics );
transactionLogModule.logFiles(), transactionLogModule.logRotationControl(), startupStatistics,
legacyIndexApplierLookup );

KernelModule kernelModule = buildKernel( indexingModule.integrityValidator(),
transactionLogModule.logicalTransactionStore(), neoStoreModule.neoStore(),
@@ -698,7 +705,8 @@ private IndexingModule buildIndexing( Config config, JobScheduler scheduler, Sch
final IntegrityValidator integrityValidator = new IntegrityValidator( neoStore, indexingService );

final IndexUpdatesValidator indexUpdatesValidator = dependencies.satisfyDependency(
new IndexUpdatesValidator( neoStore, new PropertyLoader( neoStore ), indexingService ) );
new OnlineIndexUpdatesValidator( neoStore, new PropertyLoader( neoStore ), indexingService,
IndexUpdateMode.ONLINE ) );

// TODO Move to constructor
final LabelScanStore labelScanStore = dependencyResolver.resolveDependency( LabelScanStoreProvider.class,
@@ -785,18 +793,20 @@ private TransactionLogModule buildTransactionLogs( Config config, Logging loggin
FileSystemAbstraction fileSystemAbstraction,
NeoStore neoStore, CacheAccessBackDoor cacheAccess,
IndexingService indexingService,
Iterable<IndexImplementation> indexProviders )
Iterable<IndexImplementation> indexProviders,
LegacyIndexApplierLookup legacyIndexApplierLookup )
{
File directory = config.get( GraphDatabaseSettings.store_dir );
TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 1000, 100_000 );
final PhysicalLogFiles logFiles = new PhysicalLogFiles( directory, PhysicalLogFile.DEFAULT_NAME,
fileSystemAbstraction );

IdOrderingQueue legacyIndexTransactionOrdering = new SynchronizedArrayIdOrderingQueue( 20 );
IdOrderingQueue legacyIndexTransactionOrdering =
dependencies.satisfyDependency( new SynchronizedArrayIdOrderingQueue( 20 ) );
final TransactionRepresentationStoreApplier storeApplier = dependencies.satisfyDependency(
new TransactionRepresentationStoreApplier(
indexingService, alwaysCreateNewWriter( labelScanStore ), neoStore,
cacheAccess, lockService, new LegacyIndexApplierLookup.Direct( legacyIndexProviderLookup ),
cacheAccess, lockService, legacyIndexApplierLookup,
indexConfigStore, legacyIndexTransactionOrdering ) );

final PhysicalLogFile logFile = new PhysicalLogFile( fileSystemAbstraction, logFiles,
@@ -903,15 +913,17 @@ public LabelScanWriter instance()
}

private void buildRecovery( final FileSystemAbstraction fileSystemAbstraction, CacheAccessBackDoor cacheAccess,
IndexingService indexingService, IndexUpdatesValidator indexUpdatesValidator, LabelScanStore labelScanStore,
IndexingService indexingService, LabelScanStore labelScanStore,
final NeoStore neoStore, RecoveryVisitor.Monitor recoveryVisitorMonitor, Recovery.Monitor recoveryMonitor,
final PhysicalLogFiles logFiles, final LogRotationControl logRotationControl,
final StartupStatisticsProvider startupStatistics )
final StartupStatisticsProvider startupStatistics,
LegacyIndexApplierLookup legacyIndexApplierLookup )
{
final RecoveryLabelScanWriterProvider labelScanWriters =
new RecoveryLabelScanWriterProvider( labelScanStore, 1000 );
final RecoveryLegacyIndexApplierLookup legacyIndexApplierLookup = new RecoveryLegacyIndexApplierLookup(
new LegacyIndexApplierLookup.Direct( legacyIndexProviderLookup ), 1000 );
final RecoveryLegacyIndexApplierLookup recoveryLegacyIndexApplierLookup = new RecoveryLegacyIndexApplierLookup(
legacyIndexApplierLookup, 1000 );
final RecoveryIndexingUpdatesValidator indexUpdatesValidator = new RecoveryIndexingUpdatesValidator( indexingService );
final TransactionRepresentationStoreApplier storeRecoverer =
new TransactionRepresentationStoreApplier(
indexingService, labelScanWriters, neoStore, cacheAccess, lockService,
@@ -932,7 +944,8 @@ public void forceEverything()
try
{
labelScanWriters.close();
legacyIndexApplierLookup.close();
recoveryLegacyIndexApplierLookup.close();
indexUpdatesValidator.close();
}
catch ( IOException e )
{
@@ -37,7 +37,7 @@
* Provides {@link LabelScanWriter} that takes advantage of the single-threaded context of recovery
* to cache writes and apply in bigger batches, where each batch holds data from many transactions.
*/
class RecoveryLabelScanWriterProvider implements Provider<LabelScanWriter>, Closeable
public class RecoveryLabelScanWriterProvider implements Provider<LabelScanWriter>, Closeable
{
private int callCount;
private final LabelScanStore labelScanStore;
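
As the javadoc above explains, RecoveryLabelScanWriterProvider caches writes in the single-threaded recovery context and applies them in bigger batches. A rough sketch of that idea, assuming a hypothetical BatchWriter/WriterFactory pair rather than the real LabelScanWriter API, could look like this:

import java.io.Closeable;
import java.io.IOException;

/**
 * Sketch of a provider that hands out one cached writer and closes it once per
 * batch of calls, so many transactions share a single bulk write. Illustrative
 * only; the real class wraps LabelScanStore and LabelScanWriter.
 */
class BatchingWriterProvider implements Closeable
{
    interface BatchWriter extends Closeable
    {
        void write( long nodeId, long[] labelIds ) throws IOException;
    }

    interface WriterFactory
    {
        BatchWriter newWriter() throws IOException;
    }

    private final WriterFactory factory;
    private final int batchSize;
    private BatchWriter cachedWriter;
    private int callCount;

    BatchingWriterProvider( WriterFactory factory, int batchSize )
    {
        this.factory = factory;
        this.batchSize = batchSize;
    }

    BatchWriter instance() throws IOException
    {
        if ( callCount > 0 && callCount % batchSize == 0 )
        {
            close(); // end of batch: apply the accumulated writes in one go
        }
        callCount++;
        if ( cachedWriter == null )
        {
            cachedWriter = factory.newWriter();
        }
        return cachedWriter;
    }

    @Override
    public void close() throws IOException
    {
        if ( cachedWriter != null )
        {
            cachedWriter.close();
            cachedWriter = null;
        }
    }
}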
@@ -54,6 +54,8 @@ public interface IndexAccessor extends Closeable
*/
IndexUpdater newUpdater( IndexUpdateMode mode );

void flush() throws IOException;

/**
* Forces this index to disk. Called at certain points from within Neo4j for example when
* rotating the logical log. After completion of this call there cannot be any essential state that
@@ -69,6 +71,7 @@ public interface IndexAccessor extends Closeable
*
* @throws IOException if unable to close index.
*/
@Override
void close() throws IOException;

/**
@@ -99,6 +102,11 @@ public IndexUpdater newUpdater( IndexUpdateMode mode )
return SwallowingIndexUpdater.INSTANCE;
}

@Override
public void flush()
{
}

@Override
public void force()
{
@@ -165,6 +173,12 @@ public IndexUpdater newUpdater( IndexUpdateMode mode )
return delegate.newUpdater( mode );
}

@Override
public void flush() throws IOException
{
delegate.flush();
}

@Override
public void force() throws IOException
{
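
The new flush() on IndexAccessor, per the commit message, separates "make writes visible to readers" (cheap, e.g. refreshing the Lucene searcher) from force() ("write everything durably to disk", e.g. on log rotation). A sketch of what a Lucene-backed implementation of that contract could look like, assuming an externally managed IndexWriter and SearcherManager; this is not the actual LuceneIndexAccessor:

import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.SearcherManager;

/**
 * Illustrative flush-vs-force contract for a Lucene-backed accessor.
 * Not the real LuceneIndexAccessor; just the distinction this commit relies on.
 */
class LuceneBackedAccessorSketch
{
    private final IndexWriter writer;
    private final SearcherManager searcherManager;

    LuceneBackedAccessorSketch( IndexWriter writer, SearcherManager searcherManager )
    {
        this.writer = writer;
        this.searcherManager = searcherManager;
    }

    /** Cheap: reopen the searcher so readers see the latest updates; called once per batch. */
    public void flush() throws IOException
    {
        searcherManager.maybeRefreshBlocking();
    }

    /** Expensive: durably commit the index files, e.g. when the logical log rotates. */
    public void force() throws IOException
    {
        writer.commit();
    }
}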
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;

import java.io.IOException;

import org.neo4j.kernel.RecoveryLabelScanWriterProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.core.CacheAccessBackDoor;
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NeoStore;
import org.neo4j.kernel.impl.util.IdOrderingQueue;

/**
* {@link TransactionRepresentationStoreApplier} that builds services made for batching transactions.
* Transaction data can be cached and applied as one batch when a threshold is reached, so ensuring that transaction
* data is actually written will have to be done by calling {@link #closeBatch()}.
*/
public class BatchingTransactionRepresentationStoreApplier extends TransactionRepresentationStoreApplier
{
private final RecoveryLabelScanWriterProvider labelScanWriterProvider;
private final RecoveryLegacyIndexApplierLookup legacyIndexApplierLookup;

public BatchingTransactionRepresentationStoreApplier( IndexingService indexingService,
LabelScanStore labelScanStore, NeoStore neoStore, CacheAccessBackDoor cacheAccess,
LockService lockService, LegacyIndexApplierLookup legacyIndexProviderLookup,
IndexConfigStore indexConfigStore, IdOrderingQueue legacyIndexTransactionOrdering )
{
this( indexingService, new RecoveryLabelScanWriterProvider( labelScanStore, 1000 ),
neoStore, cacheAccess, lockService,
new RecoveryLegacyIndexApplierLookup( legacyIndexProviderLookup, 1000 ),
indexConfigStore, legacyIndexTransactionOrdering );
}

private BatchingTransactionRepresentationStoreApplier(
IndexingService indexingService,
RecoveryLabelScanWriterProvider labelScanWriterProvider,
NeoStore neoStore,
CacheAccessBackDoor cacheAccess,
LockService lockService,
RecoveryLegacyIndexApplierLookup legacyIndexApplierLookup,
IndexConfigStore indexConfigStore,
IdOrderingQueue legacyIndexTransactionOrdering )
{
super( indexingService, labelScanWriterProvider, neoStore, cacheAccess, lockService, legacyIndexApplierLookup,
indexConfigStore, legacyIndexTransactionOrdering );
this.labelScanWriterProvider = labelScanWriterProvider;
this.legacyIndexApplierLookup = legacyIndexApplierLookup;
}

public void closeBatch() throws IOException
{
labelScanWriterProvider.close();
legacyIndexApplierLookup.close();
indexingService.flushAll();
}
}
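
A hypothetical caller, such as the update-pulling unpacker, would drive the class above by applying a whole batch of transactions and then calling closeBatch() exactly once. The loop below is only a usage sketch; ApplyStep is a stand-in, not the real store-applier signature.

import java.io.IOException;
import java.util.List;

import org.neo4j.kernel.impl.api.BatchingTransactionRepresentationStoreApplier;

/** Usage sketch only: ApplyStep stands in for applying one transaction through the batching applier. */
class BatchDriverSketch
{
    interface ApplyStep
    {
        void apply() throws IOException;
    }

    static void applyBatch( List<ApplyStep> transactions,
            BatchingTransactionRepresentationStoreApplier batchingApplier ) throws IOException
    {
        try
        {
            for ( ApplyStep tx : transactions )
            {
                tx.apply(); // store, schema index and legacy index updates applied, readers not yet refreshed
            }
        }
        finally
        {
            // Once per batch: closes cached label scan writers and legacy index appliers,
            // and flushes the schema indexes (indexingService.flushAll()).
            batchingApplier.closeBatch();
        }
    }
}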
@@ -58,21 +58,20 @@ public TransactionRepresentationCommitProcess( LogicalTransactionStore logicalTr
public long commit( TransactionRepresentation transaction, LockGroup locks, CommitEvent commitEvent,
TransactionApplicationMode mode ) throws TransactionFailureException
{
try ( ValidatedIndexUpdates indexUpdates = validateIndexUpdates( transaction, mode ) )
try ( ValidatedIndexUpdates indexUpdates = validateIndexUpdates( transaction ) )
{
long transactionId = appendToLog( transaction, commitEvent );
applyToStore( transaction, locks, commitEvent, indexUpdates, transactionId, mode );
return transactionId;
}
}

private ValidatedIndexUpdates validateIndexUpdates( TransactionRepresentation transaction,
TransactionApplicationMode mode)
private ValidatedIndexUpdates validateIndexUpdates( TransactionRepresentation transaction )
throws TransactionFailureException
{
try
{
return indexUpdatesValidator.validate( transaction, mode );
return indexUpdatesValidator.validate( transaction );
}
catch ( Throwable e )
{
@@ -21,6 +21,7 @@

import java.io.IOException;

import org.neo4j.concurrent.WorkSync;
import org.neo4j.helpers.Provider;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates;
@@ -35,7 +36,6 @@
import org.neo4j.kernel.impl.transaction.command.IndexTransactionApplier;
import org.neo4j.kernel.impl.transaction.command.NeoCommandHandler;
import org.neo4j.kernel.impl.transaction.command.NeoStoreTransactionApplier;
import org.neo4j.concurrent.WorkSync;
import org.neo4j.kernel.impl.util.IdOrderingQueue;
import org.neo4j.kernel.impl.util.function.Optional;
import org.neo4j.unsafe.batchinsert.LabelScanWriter;
@@ -48,7 +48,7 @@
public class TransactionRepresentationStoreApplier
{
private final NeoStore neoStore;
private final IndexingService indexingService;
protected final IndexingService indexingService;
private final CacheAccessBackDoor cacheAccess;
private final LockService lockService;
private final Provider<LabelScanWriter> labelScanWriters;
@@ -81,6 +81,12 @@ public void force() throws IOException
getDelegate().force();
}

@Override
public void flush() throws IOException
{
getDelegate().flush();
}

@Override
public Future<Void> close() throws IOException
{
@@ -69,6 +69,11 @@ public void force()
{
}

@Override
public void flush()
{
}

@Override
public IndexDescriptor getDescriptor()
{
@@ -136,6 +136,20 @@ public void force() throws IOException
}
}

@Override
public void flush() throws IOException
{
barge( lock.readLock() ); // see javadoc of this method (above) for rationale on why we use barge(...) here
try
{
delegate.flush();
}
finally
{
lock.readLock().unlock();
}
}

/**
* Acquire the {@code ReadLock} in an <i>unfair</i> way, without waiting for queued up writers.
* <p/>
@@ -87,6 +87,8 @@ public interface IndexProxy

void force() throws IOException;

void flush() throws IOException;

/**
* @throws IndexNotFoundKernelException if the index isn't online yet.
*/
@@ -22,5 +22,5 @@
public enum IndexUpdateMode
{
ONLINE,
RECOVERY
BATCHED
}
