Skip to content

Commit

Permalink
Refactor way how we construct id components over editions.
Browse files Browse the repository at this point in the history
Solve the problem with non clear component initialization sequences and
lost buffered id generator factory got lost.
  • Loading branch information
MishaDemianenko committed Jul 4, 2017
1 parent 7833deb commit bfa217a
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 61 deletions.
Expand Up @@ -435,6 +435,9 @@ public void start() throws IOException

SynchronizedArrayIdOrderingQueue legacyIndexTransactionOrdering = new SynchronizedArrayIdOrderingQueue( 20 );

Supplier<KernelTransactionsSnapshot> transactionsSnapshotSupplier = () -> kernelModule.kernelTransactions().get();
idController.initialize( transactionsSnapshotSupplier );

storageEngine = buildStorageEngine(
propertyKeyTokenHolder, labelTokens, relationshipTypeTokens, legacyIndexProviderLookup,
indexConfigStore, updateableSchemaState, legacyIndexTransactionOrdering );
Expand Down Expand Up @@ -475,9 +478,6 @@ public void start() throws IOException
clock,
propertyAccessor );

Supplier<KernelTransactionsSnapshot> transactionsSnapshotSupplier = () -> kernelModule.kernelTransactions().get();
idController.initialize( transactionsSnapshotSupplier );

kernelModule.satisfyDependencies( dependencies );

// Do these assignments last so that we can ensure no cyclical dependencies exist
Expand Down
Expand Up @@ -108,8 +108,7 @@ public CommunityEditionModule( PlatformModule platformModule )
idTypeConfigurationProvider = createIdTypeConfigurationProvider( config );
eligibleForIdReuse = IdReuseEligibility.ALWAYS;

idGeneratorFactory = dependencies.satisfyDependency( createIdGeneratorFactory( fileSystem, idTypeConfigurationProvider ) );
idController = createIdController( platformModule );
createIdComponents( platformModule, dependencies, createIdGeneratorFactory( fileSystem, idTypeConfigurationProvider ) );

propertyKeyTokenHolder = life.add( dependencies.satisfyDependency( new DelegatingPropertyKeyTokenHolder(
createPropertyKeyCreator( config, dataSourceManager, idGeneratorFactory ) ) ) );
Expand Down
Expand Up @@ -52,6 +52,7 @@
import org.neo4j.kernel.impl.store.id.IdReuseEligibility;
import org.neo4j.kernel.impl.store.id.configuration.IdTypeConfigurationProvider;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.impl.util.DependencySatisfier;
import org.neo4j.kernel.impl.util.watcher.DefaultFileDeletionEventListener;
import org.neo4j.kernel.impl.util.watcher.DefaultFileSystemWatcherService;
Expand Down Expand Up @@ -236,22 +237,32 @@ protected BoltConnectionTracker createSessionTracker()
return BoltConnectionTracker.NOOP;
}

protected IdController createIdController( PlatformModule platformModule )
protected void createIdComponents( PlatformModule platformModule, Dependencies dependencies, IdGeneratorFactory
editionIdGeneratorFactory )
{
return safeIdBuffering ? createBufferedIdController( idGeneratorFactory, platformModule.jobScheduler,
eligibleForIdReuse, idTypeConfigurationProvider ) : createDefaultIdController();
IdGeneratorFactory factory = editionIdGeneratorFactory;
if ( safeIdBuffering )
{
BufferingIdGeneratorFactory bufferingIdGeneratorFactory =
new BufferingIdGeneratorFactory( factory, eligibleForIdReuse, idTypeConfigurationProvider );
idController = createBufferedIdController( bufferingIdGeneratorFactory, platformModule.jobScheduler );
factory = bufferingIdGeneratorFactory;
}
else
{
idController = createDefaultIdController();
}
this.idGeneratorFactory = factory;
dependencies.satisfyDependency( factory );
}

protected BufferedIdController createBufferedIdController( IdGeneratorFactory idGeneratorFactory,
JobScheduler scheduler, IdReuseEligibility eligibleForIdReuse,
IdTypeConfigurationProvider idTypeConfigurationProvider )
private BufferedIdController createBufferedIdController( BufferingIdGeneratorFactory idGeneratorFactory,
JobScheduler scheduler )
{
BufferingIdGeneratorFactory bufferingIdGeneratorFactory =
new BufferingIdGeneratorFactory( idGeneratorFactory, eligibleForIdReuse, idTypeConfigurationProvider );
return new BufferedIdController( bufferingIdGeneratorFactory, scheduler );
return new BufferedIdController( idGeneratorFactory, scheduler );
}

protected DefaultIdController createDefaultIdController()
private DefaultIdController createDefaultIdController()
{
return new DefaultIdController();
}
Expand Down
Expand Up @@ -84,11 +84,8 @@
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.id.BufferedIdController;
import org.neo4j.kernel.impl.store.id.BufferingIdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdReuseEligibility;
import org.neo4j.kernel.impl.store.id.configuration.IdTypeConfigurationProvider;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.util.Dependencies;
Expand All @@ -100,7 +97,6 @@
import org.neo4j.kernel.lifecycle.LifecycleStatus;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.ssl.SslPolicy;
import org.neo4j.udc.UsageData;

Expand All @@ -116,6 +112,7 @@ public class EnterpriseCoreEditionModule extends EditionModule
private final CoreTopologyService topologyService;
private final LogProvider logProvider;
private final Config config;
private CoreStateMachinesModule coreStateMachinesModule;

public enum RaftLogImplementation
{
Expand Down Expand Up @@ -227,13 +224,13 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
ReplicationModule replicationModule = new ReplicationModule( identityModule.myself(), platformModule, config, consensusModule,
loggingOutbound, clusterStateDirectory.get(), fileSystem, logProvider );

CoreStateMachinesModule coreStateMachinesModule = new CoreStateMachinesModule( identityModule.myself(),
coreStateMachinesModule = new CoreStateMachinesModule( identityModule.myself(),
platformModule, clusterStateDirectory.get(), config, replicationModule.getReplicator(),
consensusModule.raftMachine(), dependencies, localDatabase );

this.idTypeConfigurationProvider = coreStateMachinesModule.idTypeConfigurationProvider;

createIdComponents( platformModule, coreStateMachinesModule );
createIdComponents( platformModule, dependencies, coreStateMachinesModule.idGeneratorFactory );

this.labelTokenHolder = coreStateMachinesModule.labelTokenHolder;
this.propertyKeyTokenHolder = coreStateMachinesModule.propertyKeyTokenHolder;
Expand All @@ -254,31 +251,13 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
life.add( coreServerModule.membershipWaiterLifecycle );
}

private void createIdComponents( PlatformModule platformModule, CoreStateMachinesModule coreStateMachinesModule )
{
IdGeneratorFactory factory;
if ( safeIdBuffering )
{
factory = new BufferingIdGeneratorFactory( coreStateMachinesModule.idGeneratorFactory, eligibleForIdReuse,
idTypeConfigurationProvider );
this.idController = createBufferedIdController( factory, platformModule.jobScheduler, eligibleForIdReuse,
idTypeConfigurationProvider );
}
else
{
factory = coreStateMachinesModule.idGeneratorFactory;
this.idController = createDefaultIdController();
}
this.idGeneratorFactory =
new FreeIdFilteredIdGeneratorFactory( factory, coreStateMachinesModule.freeIdCondition );
}

@Override
protected BufferedIdController createBufferedIdController( IdGeneratorFactory idGeneratorFactory,
JobScheduler scheduler, IdReuseEligibility eligibleForIdReuse,
IdTypeConfigurationProvider idTypeConfigurationProvider )
protected void createIdComponents( PlatformModule platformModule, Dependencies dependencies,
IdGeneratorFactory editionIdGeneratorFactory )
{
return new BufferedIdController( (BufferingIdGeneratorFactory) idGeneratorFactory, scheduler );
super.createIdComponents( platformModule, dependencies, editionIdGeneratorFactory );
this.idGeneratorFactory =
new FreeIdFilteredIdGeneratorFactory( this.idGeneratorFactory, coreStateMachinesModule.freeIdCondition );
}

static Predicate<String> fileWatcherFileNameFilter()
Expand Down
Expand Up @@ -363,10 +363,11 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
paxosLife.add( (Lifecycle)clusterEvents );
paxosLife.add( localClusterMemberAvailability );

idGeneratorFactory = dependencies.satisfyDependency( createIdGeneratorFactory(
masterDelegateInvocationHandler, logging.getInternalLogProvider(), requestContextFactory, fs ) );
HaIdGeneratorFactory editionIdGeneratorFactory = (HaIdGeneratorFactory) createIdGeneratorFactory( masterDelegateInvocationHandler,
logging.getInternalLogProvider(), requestContextFactory, fs );
eligibleForIdReuse = new HaIdReuseEligibility( members, platformModule.clock, idReuseSafeZone );
createIdComponents( platformModule, dependencies, editionIdGeneratorFactory );
dependencies.satisfyDependency( new IdBasedStoreEntityCounters( this.idGeneratorFactory ) );
idController = createIdController( platformModule );

// TODO There's a cyclical dependency here that should be fixed
final AtomicReference<HighAvailabilityModeSwitcher> exceptionHandlerRef = new AtomicReference<>();
Expand Down Expand Up @@ -413,12 +414,12 @@ public void elected( String role, InstanceId instanceId, URI electedMember )

SwitchToSlave switchToSlaveInstance = chooseSwitchToSlaveStrategy( platformModule, config, dependencies, logging, monitors,
masterDelegateInvocationHandler, requestContextFactory, clusterMemberAvailability,
masterClientResolver, updatePullerProxy, pullerFactory, slaveServerFactory );
masterClientResolver, updatePullerProxy, pullerFactory, slaveServerFactory, editionIdGeneratorFactory );

final Factory<MasterImpl.SPI> masterSPIFactory =
() -> new DefaultMasterImplSPI( platformModule.graphDatabaseFacade, platformModule.fileSystem,
platformModule.monitors,
labelTokenHolder, propertyKeyTokenHolder, relationshipTypeTokenHolder, idGeneratorFactory,
labelTokenHolder, propertyKeyTokenHolder, relationshipTypeTokenHolder, this.idGeneratorFactory,
platformModule.dependencies.resolveDependency( TransactionCommitProcess.class ),
platformModule.dependencies.resolveDependency( CheckPointer.class ),
platformModule.dependencies.resolveDependency( TransactionIdStore.class ),
Expand Down Expand Up @@ -452,7 +453,7 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
logEntryReader.get() );
};

SwitchToMaster switchToMasterInstance = new SwitchToMaster( logging, (HaIdGeneratorFactory) idGeneratorFactory,
SwitchToMaster switchToMasterInstance = new SwitchToMaster( logging, editionIdGeneratorFactory,
config, dependencies.provideDependency( SlaveFactory.class ),
conversationManagerFactory,
masterFactory,
Expand Down Expand Up @@ -529,8 +530,6 @@ public void elected( String role, InstanceId instanceId, URI electedMember )

coreAPIAvailabilityGuard = new CoreAPIAvailabilityGuard( platformModule.availabilityGuard, transactionStartTimeout );

eligibleForIdReuse = new HaIdReuseEligibility( members, platformModule.clock, idReuseSafeZone );

registerRecovery( platformModule.databaseInfo, dependencies, logging );

UsageData usageData = dependencies.resolveDependency( UsageData.class );
Expand Down Expand Up @@ -564,14 +563,13 @@ private SwitchToSlave chooseSwitchToSlaveStrategy( PlatformModule platformModule
dependencies, LogService logging, Monitors monitors, DelegateInvocationHandler<Master>
masterDelegateInvocationHandler, RequestContextFactory requestContextFactory, ClusterMemberAvailability
clusterMemberAvailability, MasterClientResolver masterClientResolver, UpdatePuller updatePullerProxy,
PullerFactory pullerFactory,
Function<Slave, SlaveServer> slaveServerFactory )
PullerFactory pullerFactory, Function<Slave, SlaveServer> slaveServerFactory, HaIdGeneratorFactory idGeneratorFactory )
{
switch ( config.get( HaSettings.branched_data_copying_strategy ) )
{
case branch_then_copy:
return new SwitchToSlaveBranchThenCopy( platformModule.storeDir, logging,
platformModule.fileSystem, config, dependencies, (HaIdGeneratorFactory) idGeneratorFactory,
platformModule.fileSystem, config, dependencies, idGeneratorFactory,
masterDelegateInvocationHandler, clusterMemberAvailability, requestContextFactory,
pullerFactory,
platformModule.kernelExtensions.listFactories(), masterClientResolver,
Expand All @@ -583,7 +581,7 @@ private SwitchToSlave chooseSwitchToSlaveStrategy( PlatformModule platformModule
monitors, platformModule.transactionMonitor );
case copy_then_branch:
return new SwitchToSlaveCopyThenBranch( platformModule.storeDir, logging,
platformModule.fileSystem, config, dependencies, (HaIdGeneratorFactory) idGeneratorFactory,
platformModule.fileSystem, config, dependencies, idGeneratorFactory,
masterDelegateInvocationHandler, clusterMemberAvailability, requestContextFactory,
pullerFactory,
platformModule.kernelExtensions.listFactories(), masterClientResolver,
Expand Down Expand Up @@ -652,16 +650,14 @@ private IdGeneratorFactory createIdGeneratorFactory(
RequestContextFactory requestContextFactory,
FileSystemAbstraction fs )
{
idGeneratorFactory = new HaIdGeneratorFactory(
masterDelegateInvocationHandler, logging, requestContextFactory, fs, idTypeConfigurationProvider );

HaIdGeneratorFactory idGeneratorFactory = new HaIdGeneratorFactory( masterDelegateInvocationHandler, logging,
requestContextFactory, fs, idTypeConfigurationProvider );
/*
* We don't really switch to master here. We just need to initialize the idGenerator so the initial store
* can be started (if required). In any case, the rest of the database is in pending state, so nothing will
* happen until events start arriving and that will set us to the proper state anyway.
*/
((HaIdGeneratorFactory) idGeneratorFactory).switchToMaster();

idGeneratorFactory.switchToMaster();
return idGeneratorFactory;
}

Expand Down

0 comments on commit bfa217a

Please sign in to comment.