Skip to content

Commit

Permalink
Use localDatabase as a source of database dependent components in CC.
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Jun 21, 2018
1 parent 7ec36c4 commit a284516
Show file tree
Hide file tree
Showing 17 changed files with 123 additions and 116 deletions.
Expand Up @@ -39,8 +39,6 @@
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
Expand All @@ -51,24 +49,19 @@ public class RegularCatchupServerHandler implements CatchupServerHandler
private final Monitors monitors;
private final LogProvider logProvider;
private final Supplier<StoreId> storeIdSupplier;
private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
private final Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier;
private final Supplier<NeoStoreDataSource> dataSourceSupplier;
private final BooleanSupplier dataSourceAvailabilitySupplier;
private final FileSystemAbstraction fs;
private final CoreSnapshotService snapshotService;
private final Supplier<CheckPointer> checkPointerSupplier;

public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider, Supplier<StoreId> storeIdSupplier,
Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, FileSystemAbstraction fs,
CoreSnapshotService snapshotService, Supplier<CheckPointer> checkPointerSupplier )
{
this.monitors = monitors;
this.logProvider = logProvider;
this.storeIdSupplier = storeIdSupplier;
this.transactionIdStoreSupplier = transactionIdStoreSupplier;
this.logicalTransactionStoreSupplier = logicalTransactionStoreSupplier;
this.dataSourceSupplier = dataSourceSupplier;
this.dataSourceAvailabilitySupplier = dataSourceAvailabilitySupplier;
this.fs = fs;
Expand All @@ -79,8 +72,8 @@ public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider,
@Override
public ChannelHandler txPullRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
return new TxPullRequestHandler( catchupServerProtocol, storeIdSupplier, dataSourceAvailabilitySupplier, transactionIdStoreSupplier,
logicalTransactionStoreSupplier, monitors, logProvider );
return new TxPullRequestHandler( catchupServerProtocol, storeIdSupplier, dataSourceAvailabilitySupplier, dataSourceSupplier,
monitors, logProvider );
}

@Override
Expand All @@ -92,7 +85,7 @@ public ChannelHandler getStoreIdRequestHandler( CatchupServerProtocol catchupSer
@Override
public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, dataSourceSupplier, new PrepareStoreCopyFilesProvider( fs ) );
return new PrepareStoreCopyRequestHandler( catchupServerProtocol, dataSourceSupplier, new PrepareStoreCopyFilesProvider( fs ) );
}

@Override
Expand Down
Expand Up @@ -42,16 +42,14 @@
public class PrepareStoreCopyRequestHandler extends SimpleChannelInboundHandler<PrepareStoreCopyRequest>
{
private final CatchupServerProtocol protocol;
private final Supplier<CheckPointer> checkPointerSupplier;
private final PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider;
private final Supplier<NeoStoreDataSource> dataSourceSupplier;
private final StoreFileStreamingProtocol streamingProtocol = new StoreFileStreamingProtocol();

public PrepareStoreCopyRequestHandler( CatchupServerProtocol catchupServerProtocol, Supplier<CheckPointer> checkPointerSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider )
public PrepareStoreCopyRequestHandler( CatchupServerProtocol catchupServerProtocol, Supplier<NeoStoreDataSource> dataSourceSupplier,
PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider )
{
this.protocol = catchupServerProtocol;
this.checkPointerSupplier = checkPointerSupplier;
this.prepareStoreCopyFilesProvider = prepareStoreCopyFilesProvider;
this.dataSourceSupplier = dataSourceSupplier;
}
Expand All @@ -71,7 +69,7 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext, Prepar
}
else
{
CheckPointer checkPointer = checkPointerSupplier.get();
CheckPointer checkPointer = neoStoreDataSource.getDependencyResolver().resolveDependency( CheckPointer.class );
closeablesListener.add( tryCheckpointAndAcquireMutex( checkPointer ) );
PrepareStoreCopyFiles prepareStoreCopyFiles =
closeablesListener.add( prepareStoreCopyFilesProvider.prepareStoreCopyFiles( neoStoreDataSource ) );
Expand Down
Expand Up @@ -53,18 +53,16 @@ public abstract class StoreCopyRequestHandler<T extends StoreCopyRequest> extend
{
private final CatchupServerProtocol protocol;
private final Supplier<NeoStoreDataSource> dataSource;
private final Supplier<CheckPointer> checkpointerSupplier;
private final StoreFileStreamingProtocol storeFileStreamingProtocol;

private final FileSystemAbstraction fs;
private final Log log;

StoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, Supplier<CheckPointer> checkpointerSupplier,
StoreFileStreamingProtocol storeFileStreamingProtocol, FileSystemAbstraction fs, LogProvider logProvider )
StoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, StoreFileStreamingProtocol storeFileStreamingProtocol,
FileSystemAbstraction fs, LogProvider logProvider )
{
this.protocol = protocol;
this.dataSource = dataSource;
this.checkpointerSupplier = checkpointerSupplier;
this.storeFileStreamingProtocol = storeFileStreamingProtocol;
this.fs = fs;
this.log = logProvider.getLog( StoreCopyRequestHandler.class );
Expand All @@ -82,7 +80,8 @@ protected void channelRead0( ChannelHandlerContext ctx, T request ) throws Excep
{
responseStatus = StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH;
}
else if ( !isTransactionWithinReach( request.requiredTransactionId(), checkpointerSupplier.get() ) )
else if ( !isTransactionWithinReach( request.requiredTransactionId(),
neoStoreDataSource.getDependencyResolver().resolveDependency( CheckPointer.class ) ) )
{
responseStatus = StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND;
}
Expand Down Expand Up @@ -130,7 +129,7 @@ public static class GetStoreFileRequestHandler extends StoreCopyRequestHandler<G
public GetStoreFileRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, Supplier<CheckPointer> checkpointerSupplier,
StoreFileStreamingProtocol storeFileStreamingProtocol, FileSystemAbstraction fs, LogProvider logProvider )
{
super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, fs, logProvider );
super( protocol, dataSource, storeFileStreamingProtocol, fs, logProvider );
}

@Override
Expand All @@ -151,7 +150,7 @@ public GetIndexSnapshotRequestHandler( CatchupServerProtocol protocol, Supplier<
Supplier<CheckPointer> checkpointerSupplier, StoreFileStreamingProtocol storeFileStreamingProtocol,
FileSystemAbstraction fs, LogProvider logProvider )
{
super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, fs, logProvider );
super( protocol, dataSource, storeFileStreamingProtocol, fs, logProvider );
}

@Override
Expand Down
Expand Up @@ -35,6 +35,8 @@
import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.cursor.IOCursor;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException;
Expand All @@ -61,14 +63,14 @@ public class TxPullRequestHandler extends SimpleChannelInboundHandler<TxPullRequ
private final Log log;

public TxPullRequestHandler( CatchupServerProtocol protocol, Supplier<StoreId> storeIdSupplier,
BooleanSupplier databaseAvailable, Supplier<TransactionIdStore> transactionIdStoreSupplier,
Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier, Monitors monitors, LogProvider logProvider )
BooleanSupplier databaseAvailable, Supplier<NeoStoreDataSource> dataSourceSupplier, Monitors monitors, LogProvider logProvider )
{
this.protocol = protocol;
this.storeIdSupplier = storeIdSupplier;
this.databaseAvailable = databaseAvailable;
this.transactionIdStore = transactionIdStoreSupplier.get();
this.logicalTransactionStore = logicalTransactionStoreSupplier.get();
DependencyResolver dependencies = dataSourceSupplier.get().getDependencyResolver();
this.transactionIdStore = dependencies.resolveDependency( TransactionIdStore.class );
this.logicalTransactionStore = dependencies.resolveDependency( LogicalTransactionStore.class );
this.monitor = monitors.newMonitor( TxPullRequestsMonitor.class );
this.log = logProvider.getLog( getClass() );
}
Expand Down
Expand Up @@ -77,8 +77,6 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.internal.DatabaseHealth;
Expand Down Expand Up @@ -182,8 +180,7 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S
Collection<ModifierSupportedProtocols> supportedModifierProtocols = supportedProtocolCreator.createSupportedModifierProtocols();

CatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors,
logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable,
logProvider, localDatabase::storeId, localDatabase::dataSource, localDatabase::isAvailable,
fileSystem, snapshotService, platformModule.dependencies.provideDependency( CheckPointer.class ) );

catchupServer = new CatchupServerBuilder( catchupServerHandler )
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
Expand Down Expand Up @@ -72,6 +73,7 @@
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.StorageEngine;

import static org.neo4j.causalclustering.core.CausalClusteringSettings.array_block_id_allocation_size;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.id_alloc_state_size;
Expand Down Expand Up @@ -145,18 +147,20 @@ public CoreStateMachinesModule( MemberId myself, PlatformModule platformModule,
dependencies.satisfyDependency( new IdBasedStoreEntityCounters( this.idGeneratorFactory ) );

TokenRegistry relationshipTypeTokenRegistry = new TokenRegistry( "RelationshipType" );
Supplier<StorageEngine> storageEngineSupplier = () -> localDatabase.dataSource().getDependencyResolver().resolveDependency( StorageEngine.class );
ReplicatedRelationshipTypeTokenHolder relationshipTypeTokenHolder =
new ReplicatedRelationshipTypeTokenHolder( relationshipTypeTokenRegistry, replicator,
this.idGeneratorFactory, dependencies );
this.idGeneratorFactory, storageEngineSupplier );

TokenRegistry propertyKeyTokenRegistry = new TokenRegistry( "PropertyKey" );
ReplicatedPropertyKeyTokenHolder propertyKeyTokenHolder =
new ReplicatedPropertyKeyTokenHolder( propertyKeyTokenRegistry, replicator, this.idGeneratorFactory,
dependencies );
storageEngineSupplier );

TokenRegistry labelTokenRegistry = new TokenRegistry( "Label" );
ReplicatedLabelTokenHolder labelTokenHolder =
new ReplicatedLabelTokenHolder( labelTokenRegistry, replicator, this.idGeneratorFactory, dependencies );
new ReplicatedLabelTokenHolder( labelTokenRegistry, replicator, this.idGeneratorFactory,
storageEngineSupplier );

ReplicatedLockTokenStateMachine replicatedLockTokenStateMachine =
new ReplicatedLockTokenStateMachine( lockTokenState );
Expand All @@ -182,7 +186,7 @@ public CoreStateMachinesModule( MemberId myself, PlatformModule platformModule,
lockManager = createLockManager( config, platformModule.clock, logging, replicator, myself, raftMachine,
replicatedLockTokenStateMachine );

RecoverConsensusLogIndex consensusLogIndexRecovery = new RecoverConsensusLogIndex( dependencies, logProvider );
RecoverConsensusLogIndex consensusLogIndexRecovery = new RecoverConsensusLogIndex( localDatabase, logProvider );

coreStateMachines = new CoreStateMachines( replicatedTxStateMachine, labelTokenStateMachine,
relationshipTypeTokenStateMachine, propertyKeyTokenStateMachine, replicatedLockTokenStateMachine,
Expand Down
Expand Up @@ -22,19 +22,21 @@
*/
package org.neo4j.causalclustering.core.state.machines.token;

import java.util.function.Supplier;

import org.neo4j.causalclustering.core.replication.Replicator;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.impl.core.TokenHolder;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.storageengine.api.StorageEngine;

public class ReplicatedLabelTokenHolder extends ReplicatedTokenHolder implements TokenHolder
{
public ReplicatedLabelTokenHolder( TokenRegistry registry, Replicator replicator,
IdGeneratorFactory idGeneratorFactory, Dependencies dependencies )
IdGeneratorFactory idGeneratorFactory, Supplier<StorageEngine> storageEngineSupplier )
{
super( registry, replicator, idGeneratorFactory, IdType.LABEL_TOKEN, dependencies, TokenType.LABEL );
super( registry, replicator, idGeneratorFactory, IdType.LABEL_TOKEN, storageEngineSupplier, TokenType.LABEL );
}

@Override
Expand Down
Expand Up @@ -22,19 +22,21 @@
*/
package org.neo4j.causalclustering.core.state.machines.token;

import java.util.function.Supplier;

import org.neo4j.causalclustering.core.replication.RaftReplicator;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.impl.core.TokenHolder;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.storageengine.api.StorageEngine;

public class ReplicatedPropertyKeyTokenHolder extends ReplicatedTokenHolder implements TokenHolder
{
public ReplicatedPropertyKeyTokenHolder( TokenRegistry registry, RaftReplicator replicator,
IdGeneratorFactory idGeneratorFactory, Dependencies dependencies )
IdGeneratorFactory idGeneratorFactory, Supplier<StorageEngine> storageEngineSupplier )
{
super( registry, replicator, idGeneratorFactory, IdType.PROPERTY_KEY_TOKEN, dependencies, TokenType.PROPERTY );
super( registry, replicator, idGeneratorFactory, IdType.PROPERTY_KEY_TOKEN, storageEngineSupplier, TokenType.PROPERTY );
}

@Override
Expand Down
Expand Up @@ -22,19 +22,22 @@
*/
package org.neo4j.causalclustering.core.state.machines.token;

import java.util.function.Supplier;

import org.neo4j.causalclustering.core.replication.RaftReplicator;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.storageengine.api.StorageEngine;

import static org.neo4j.causalclustering.core.state.machines.token.TokenType.RELATIONSHIP;
import static org.neo4j.kernel.impl.store.id.IdType.RELATIONSHIP_TYPE_TOKEN;

public class ReplicatedRelationshipTypeTokenHolder extends ReplicatedTokenHolder
{
public ReplicatedRelationshipTypeTokenHolder( TokenRegistry registry,
RaftReplicator replicator, IdGeneratorFactory idGeneratorFactory, Dependencies dependencies )
RaftReplicator replicator, IdGeneratorFactory idGeneratorFactory, Supplier<StorageEngine> storageEngineSupplier )
{
super( registry, replicator, idGeneratorFactory, IdType.RELATIONSHIP_TYPE_TOKEN, dependencies,
TokenType.RELATIONSHIP );
super( registry, replicator, idGeneratorFactory, RELATIONSHIP_TYPE_TOKEN, storageEngineSupplier, RELATIONSHIP );
}

@Override
Expand Down

0 comments on commit a284516

Please sign in to comment.