Skip to content

Commit

Permalink
Fixed duplicated CommitProcessSwitchers
Browse files Browse the repository at this point in the history
Duplicated CommitProcessSwitchers could be added to ComponentSwitcherContainer
because HA commit process factory created and added new one after every
NeoStoreDataSource restart. Such restarts happen when slave handles branched data.

This commit makes CommitProcessSwitcher a static component that is added to the
container only once. It also uses DependencyResolver to locate components needed
to create master/slave implementation of the CommitProcess.
  • Loading branch information
lutovich committed Oct 28, 2015
1 parent e4b9486 commit 786be85
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 71 deletions.
Expand Up @@ -1042,7 +1042,7 @@ private KernelModule buildKernel( IntegrityValidator integrityValidator, Transac
SchemaIndexProviderMap schemaIndexProviderMap, ProcedureCache procedureCache )
{
TransactionCommitProcess transactionCommitProcess = commitProcessFactory.create( appender, storeApplier,
integrityValidator, indexUpdatesValidator, config );
indexUpdatesValidator, config );

/*
* This is used by legacy indexes and constraint indexes whenever a transaction is to be spawned
Expand Down
Expand Up @@ -22,10 +22,9 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;

public interface CommitProcessFactory
{
TransactionCommitProcess create( TransactionAppender appender, TransactionRepresentationStoreApplier storeApplier,
IntegrityValidator integrityValidator, IndexUpdatesValidator indexUpdatesValidator, Config config );
TransactionCommitProcess create( TransactionAppender appender, TransactionRepresentationStoreApplier applier,
IndexUpdatesValidator indexUpdatesValidator, Config config );
}
Expand Up @@ -28,19 +28,17 @@
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;

public class CommunityCommitProcessFactory implements CommitProcessFactory
{
@Override
public TransactionCommitProcess create( TransactionAppender appender,
TransactionRepresentationStoreApplier storeApplier, IntegrityValidator integrityValidator,
public TransactionCommitProcess create( TransactionAppender appender, TransactionRepresentationStoreApplier applier,
IndexUpdatesValidator indexUpdatesValidator, Config config )
{
if ( config.get( GraphDatabaseSettings.read_only ) )
{
return new ReadOnlyTransactionCommitProcess();
}
return new TransactionRepresentationCommitProcess( appender, storeApplier, indexUpdatesValidator );
return new TransactionRepresentationCommitProcess( appender, applier, indexUpdatesValidator );
}
}
Expand Up @@ -29,7 +29,6 @@
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;

import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
Expand All @@ -46,8 +45,7 @@ public void createReadOnlyCommitProcess()
Config config = new Config( stringMap( GraphDatabaseSettings.read_only.name(), "true" ) );

TransactionCommitProcess commitProcess = factory.create( mock( TransactionAppender.class ),
mock( TransactionRepresentationStoreApplier.class ), mock( IntegrityValidator.class ),
mock( IndexUpdatesValidator.class ), config );
mock( TransactionRepresentationStoreApplier.class ), mock( IndexUpdatesValidator.class ), config );

assertThat( commitProcess, instanceOf( ReadOnlyTransactionCommitProcess.class ) );
}
Expand All @@ -58,8 +56,8 @@ public void createRegularCommitProcess()
CommunityCommitProcessFactory factory = new CommunityCommitProcessFactory();

TransactionCommitProcess commitProcess = factory.create( mock( TransactionAppender.class ),
mock( TransactionRepresentationStoreApplier.class ), mock( IntegrityValidator.class ),
mock( IndexUpdatesValidator.class ), new Config() );
mock( TransactionRepresentationStoreApplier.class ), mock( IndexUpdatesValidator.class ),
new Config() );

assertThat( commitProcess, instanceOf( TransactionRepresentationCommitProcess.class ) );
}
Expand Down
Expand Up @@ -34,16 +34,16 @@
*/
public class MasterTransactionCommitProcess implements TransactionCommitProcess
{
private final TransactionPropagator pusher;
private final TransactionPropagator txPropagator;
private final IntegrityValidator validator;
private final TransactionCommitProcess inner;

public MasterTransactionCommitProcess( TransactionCommitProcess commitProcess,
TransactionPropagator pusher,
TransactionPropagator txPropagator,
IntegrityValidator validator )
{
this.inner = commitProcess;
this.pusher = pusher;
this.txPropagator = txPropagator;
this.validator = validator;
}

Expand All @@ -55,7 +55,7 @@ public long commit( TransactionRepresentation representation, LockGroup locks, C

long result = inner.commit( representation, locks, commitEvent, mode );

pusher.committed( result, representation.getAuthorId() );
txPropagator.committed( result, representation.getAuthorId() );

return result;
}
Expand Down
Expand Up @@ -19,38 +19,55 @@
*/
package org.neo4j.kernel.ha.cluster.modeswitch;

import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.MasterTransactionCommitProcess;
import org.neo4j.kernel.ha.SlaveTransactionCommitProcess;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.transaction.TransactionPropagator;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;

public class CommitProcessSwitcher extends AbstractComponentSwitcher<TransactionCommitProcess>
{
private final MasterTransactionCommitProcess masterImpl;
private final SlaveTransactionCommitProcess slaveImpl;
private final TransactionPropagator txPropagator;
private final Master master;
private final RequestContextFactory requestContextFactory;
private final DependencyResolver dependencyResolver;

public CommitProcessSwitcher( TransactionPropagator pusher, Master master,
public CommitProcessSwitcher( TransactionPropagator txPropagator, Master master,
DelegateInvocationHandler<TransactionCommitProcess> delegate, RequestContextFactory requestContextFactory,
IntegrityValidator integrityValidator, TransactionCommitProcess innerCommitProcess )
DependencyResolver dependencyResolver )
{
super( delegate );
this.masterImpl = new MasterTransactionCommitProcess( innerCommitProcess, pusher, integrityValidator );
this.slaveImpl = new SlaveTransactionCommitProcess( master, requestContextFactory );
this.txPropagator = txPropagator;
this.master = master;
this.requestContextFactory = requestContextFactory;
this.dependencyResolver = dependencyResolver;
}

@Override
protected TransactionCommitProcess getSlaveImpl()
{
return slaveImpl;
return new SlaveTransactionCommitProcess( master, requestContextFactory );
}

@Override
protected TransactionCommitProcess getMasterImpl()
{
return masterImpl;
TransactionCommitProcess commitProcess = new TransactionRepresentationCommitProcess(
dependencyResolver.resolveDependency( TransactionAppender.class ),
dependencyResolver.resolveDependency( TransactionRepresentationStoreApplier.class ),
dependencyResolver.resolveDependency( IndexUpdatesValidator.class ) );

return new MasterTransactionCommitProcess(
commitProcess,
txPropagator,
dependencyResolver.resolveDependency( IntegrityValidator.class ) );
}
}
Expand Up @@ -22,59 +22,32 @@
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.cluster.modeswitch.CommitProcessSwitcher;
import org.neo4j.kernel.ha.cluster.modeswitch.ComponentSwitcherContainer;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.transaction.TransactionPropagator;
import org.neo4j.kernel.impl.api.CommitProcessFactory;
import org.neo4j.kernel.impl.api.ReadOnlyTransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;

import static java.lang.reflect.Proxy.newProxyInstance;

class HighlyAvailableCommitProcessFactory implements CommitProcessFactory
{
private final ComponentSwitcherContainer componentSwitcherContainer;
private final Master master;
private final TransactionPropagator transactionPropagator;
private final RequestContextFactory requestContextFactory;
private final DelegateInvocationHandler<TransactionCommitProcess> commitProcessDelegate;

private final DelegateInvocationHandler<TransactionCommitProcess> commitProcessDelegate =
new DelegateInvocationHandler<>( TransactionCommitProcess.class );

HighlyAvailableCommitProcessFactory( ComponentSwitcherContainer componentSwitcherContainer, Master master,
TransactionPropagator transactionPropagator, RequestContextFactory requestContextFactory )
HighlyAvailableCommitProcessFactory( DelegateInvocationHandler<TransactionCommitProcess> commitProcessDelegate )
{
this.componentSwitcherContainer = componentSwitcherContainer;
this.master = master;
this.transactionPropagator = transactionPropagator;
this.requestContextFactory = requestContextFactory;
this.commitProcessDelegate = commitProcessDelegate;
}

@Override
public TransactionCommitProcess create( TransactionAppender appender,
TransactionRepresentationStoreApplier storeApplier, IntegrityValidator integrityValidator,
public TransactionCommitProcess create( TransactionAppender appender, TransactionRepresentationStoreApplier applier,
IndexUpdatesValidator indexUpdatesValidator, Config config )
{
if ( config.get( GraphDatabaseSettings.read_only ) )
{
return new ReadOnlyTransactionCommitProcess();
}

TransactionCommitProcess commitProcess = new TransactionRepresentationCommitProcess( appender, storeApplier,
indexUpdatesValidator );

CommitProcessSwitcher commitProcessSwitcher = new CommitProcessSwitcher( transactionPropagator,
master, commitProcessDelegate, requestContextFactory, integrityValidator, commitProcess );

componentSwitcherContainer.add( commitProcessSwitcher );

return (TransactionCommitProcess) newProxyInstance( TransactionCommitProcess.class.getClassLoader(),
new Class[]{TransactionCommitProcess.class}, commitProcessDelegate );
}
Expand Down
Expand Up @@ -91,6 +91,7 @@
import org.neo4j.kernel.ha.cluster.member.ClusterMembers;
import org.neo4j.kernel.ha.cluster.member.HighAvailabilitySlaves;
import org.neo4j.kernel.ha.cluster.member.ObservedClusterMembers;
import org.neo4j.kernel.ha.cluster.modeswitch.CommitProcessSwitcher;
import org.neo4j.kernel.ha.cluster.modeswitch.ComponentSwitcherContainer;
import org.neo4j.kernel.ha.cluster.modeswitch.HighAvailabilityModeSwitcher;
import org.neo4j.kernel.ha.cluster.modeswitch.LabelTokenCreatorSwitcher;
Expand Down Expand Up @@ -578,8 +579,14 @@ private CommitProcessFactory createCommitProcessFactory( Dependencies dependenci
logging.getInternalLog( TransactionPropagator.class ), slaves, new CommitPusher( jobScheduler ) );
paxosLife.add( transactionPropagator );

return new HighlyAvailableCommitProcessFactory( componentSwitcherContainer, master, transactionPropagator,
requestContextFactory );
DelegateInvocationHandler<TransactionCommitProcess> commitProcessDelegate = new DelegateInvocationHandler<>(
TransactionCommitProcess.class );

CommitProcessSwitcher commitProcessSwitcher = new CommitProcessSwitcher( transactionPropagator,
master, commitProcessDelegate, requestContextFactory, dependencies );
componentSwitcherContainer.add( commitProcessSwitcher );

return new HighlyAvailableCommitProcessFactory( commitProcessDelegate );
}

private IdGeneratorFactory createIdGeneratorFactory(
Expand Down
Expand Up @@ -26,16 +26,11 @@
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.cluster.modeswitch.ComponentSwitcherContainer;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.transaction.TransactionPropagator;
import org.neo4j.kernel.impl.api.ReadOnlyTransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
Expand All @@ -49,14 +44,12 @@ public class HighlyAvailableCommitProcessFactoryTest
public void createReadOnlyCommitProcess()
{
HighlyAvailableCommitProcessFactory factory = new HighlyAvailableCommitProcessFactory(
new ComponentSwitcherContainer(), mock( Master.class ), mock( TransactionPropagator.class ),
mock( RequestContextFactory.class ) );
new DelegateInvocationHandler<>( TransactionCommitProcess.class ) );

Config config = new Config( stringMap( GraphDatabaseSettings.read_only.name(), "true" ) );

TransactionCommitProcess commitProcess = factory.create( mock( TransactionAppender.class ),
mock( TransactionRepresentationStoreApplier.class ), mock( IntegrityValidator.class ),
mock( IndexUpdatesValidator.class ), config );
mock( TransactionRepresentationStoreApplier.class ), mock( IndexUpdatesValidator.class ), config );

assertThat( commitProcess, instanceOf( ReadOnlyTransactionCommitProcess.class ) );
}
Expand All @@ -65,12 +58,11 @@ public void createReadOnlyCommitProcess()
public void createRegularCommitProcess()
{
HighlyAvailableCommitProcessFactory factory = new HighlyAvailableCommitProcessFactory(
new ComponentSwitcherContainer(), mock( Master.class ), mock( TransactionPropagator.class ),
mock( RequestContextFactory.class ) );
new DelegateInvocationHandler<>( TransactionCommitProcess.class ) );

TransactionCommitProcess commitProcess = factory.create( mock( TransactionAppender.class ),
mock( TransactionRepresentationStoreApplier.class ), mock( IntegrityValidator.class ),
mock( IndexUpdatesValidator.class ), new Config() );
mock( TransactionRepresentationStoreApplier.class ), mock( IndexUpdatesValidator.class ),
new Config() );

assertThat( commitProcess, not( instanceOf( ReadOnlyTransactionCommitProcess.class ) ) );
assertThat( Proxy.getInvocationHandler( commitProcess ), instanceOf( DelegateInvocationHandler.class ) );
Expand Down

0 comments on commit 786be85

Please sign in to comment.