Skip to content

Commit

Permalink
Faster UniquenessConstraintValidationHAIT
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Dec 16, 2015
1 parent 1522813 commit 8b81436
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 95 deletions.
Expand Up @@ -27,7 +27,9 @@
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
Expand Down Expand Up @@ -83,6 +85,7 @@
import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher; import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher;
import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand Down Expand Up @@ -159,6 +162,8 @@ public String apply( int ignored )
private final Provider clustersProvider; private final Provider clustersProvider;
private final HighlyAvailableGraphDatabaseFactory dbFactory; private final HighlyAvailableGraphDatabaseFactory dbFactory;
private final StoreDirInitializer storeDirInitializer; private final StoreDirInitializer storeDirInitializer;
private final Listener<GraphDatabaseService> initialDatasetCreator;
private final List<Predicate<ManagedCluster>> availabilityChecks;
LifeSupport life; LifeSupport life;


private ClusterManager( Builder builder ) private ClusterManager( Builder builder )
Expand All @@ -168,6 +173,8 @@ private ClusterManager( Builder builder )
this.commonConfig = withDefaults( builder.commonConfig ); this.commonConfig = withDefaults( builder.commonConfig );
this.dbFactory = builder.factory; this.dbFactory = builder.factory;
this.storeDirInitializer = builder.initializer; this.storeDirInitializer = builder.initializer;
this.initialDatasetCreator = builder.initialDatasetCreator;
this.availabilityChecks = builder.availabilityChecks;
} }


private Map<String,IntFunction<String>> withDefaults( Map<String,IntFunction<String>> commonConfig ) private Map<String,IntFunction<String>> withDefaults( Map<String,IntFunction<String>> commonConfig )
Expand Down Expand Up @@ -556,6 +563,17 @@ public void start() throws Throwable
ManagedCluster managedCluster = new ManagedCluster( cluster ); ManagedCluster managedCluster = new ManagedCluster( cluster );
clusterMap.put( cluster.getName(), managedCluster ); clusterMap.put( cluster.getName(), managedCluster );
life.add( managedCluster ); life.add( managedCluster );

for ( Predicate<ManagedCluster> availabilityCheck : availabilityChecks )
{
managedCluster.await( availabilityCheck );
}

if ( initialDatasetCreator != null )
{
initialDatasetCreator.receive( managedCluster.getMaster() );
managedCluster.sync();
}
} }
} }


Expand Down Expand Up @@ -649,6 +667,20 @@ public interface ClusterBuilder<SELF>
* {@link Setting} instance as key as well. * {@link Setting} instance as key as well.
*/ */
SELF withSharedSetting( Setting<?> setting, String value ); SELF withSharedSetting( Setting<?> setting, String value );

/**
* Initial dataset to be created once the cluster is up and running.
*
* @param transactor the {@link Listener} receiving a call to create the dataset on the master.
*/
SELF withInitialDataset( Listener<GraphDatabaseService> transactor );

/**
* Checks that must pass before cluster is considered to be up.
*
* @param checks availability checks that must pass before considering the cluster online.
*/
SELF withAvailabilityChecks( Predicate<ManagedCluster>... checks );
} }


public static class Builder implements ClusterBuilder<Builder> public static class Builder implements ClusterBuilder<Builder>
Expand All @@ -658,6 +690,8 @@ public static class Builder implements ClusterBuilder<Builder>
private final Map<String,IntFunction<String>> commonConfig = new HashMap<>(); private final Map<String,IntFunction<String>> commonConfig = new HashMap<>();
private HighlyAvailableGraphDatabaseFactory factory = new HighlyAvailableGraphDatabaseFactory(); private HighlyAvailableGraphDatabaseFactory factory = new HighlyAvailableGraphDatabaseFactory();
private StoreDirInitializer initializer; private StoreDirInitializer initializer;
private Listener<GraphDatabaseService> initialDatasetCreator;
private List<Predicate<ManagedCluster>> availabilityChecks = Collections.emptyList();


public Builder( File root ) public Builder( File root )
{ {
Expand Down Expand Up @@ -742,6 +776,21 @@ public Builder withSharedSetting( Setting<?> setting, String value )
return withInstanceSetting( setting, constant( value ) ); return withInstanceSetting( setting, constant( value ) );
} }


@Override
public Builder withInitialDataset( Listener<GraphDatabaseService> transactor )
{
this.initialDatasetCreator = transactor;
return this;
}

@Override
@SafeVarargs
public final Builder withAvailabilityChecks( Predicate<ManagedCluster>... checks )
{
this.availabilityChecks = Arrays.asList( checks );
return this;
}

public ClusterManager build() public ClusterManager build()
{ {
return new ClusterManager( this ); return new ClusterManager( this );
Expand Down
Expand Up @@ -42,7 +42,7 @@ public class TestClusterClientPadding
public void setUp() throws Throwable public void setUp() throws Throwable
{ {
cluster = clusterRule.withProvider( clusterWithAdditionalClients( 2, 1 ) ) cluster = clusterRule.withProvider( clusterWithAdditionalClients( 2, 1 ) )
.availabilityChecks( masterAvailable(), masterSeesMembers( 3 ), allSeesAllAsJoined() ) .withAvailabilityChecks( masterAvailable(), masterSeesMembers( 3 ), allSeesAllAsJoined() )
.startCluster(); .startCluster();
} }


Expand Down
Expand Up @@ -19,50 +19,45 @@
*/ */
package org.neo4j.ha; package org.neo4j.ha;


import org.junit.Before; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;


import java.io.File;
import java.util.concurrent.Future; import java.util.concurrent.Future;


import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase; import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase;
import org.neo4j.kernel.impl.ha.ClusterManager; import org.neo4j.kernel.impl.ha.ClusterManager;
import org.neo4j.kernel.lifecycle.LifeRule; import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.test.OtherThreadRule; import org.neo4j.test.OtherThreadRule;
import org.neo4j.test.TargetDirectory; import org.neo4j.test.ha.ClusterRule;
import org.neo4j.test.TestGraphDatabaseFactory;


import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;

import static org.neo4j.graphdb.DynamicLabel.label; import static org.neo4j.graphdb.DynamicLabel.label;
import static org.neo4j.kernel.impl.api.integrationtest.UniquenessConstraintValidationConcurrencyIT.createNode; import static org.neo4j.kernel.impl.api.integrationtest.UniquenessConstraintValidationConcurrencyIT.createNode;
import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable;
import static org.neo4j.test.OtherThreadRule.isWaiting; import static org.neo4j.test.OtherThreadRule.isWaiting;
import static org.neo4j.test.TargetDirectory.testDirForTest;


public class UniquenessConstraintValidationHAIT public class UniquenessConstraintValidationHAIT
{ {
public final @Rule LifeRule life = new LifeRule(); private static final Label LABEL = label( "Label1" );
public final @Rule TargetDirectory.TestDirectory targetDir = private static final String PROPERTY_KEY = "key1";
testDirForTest( UniquenessConstraintValidationHAIT.class );
public final @Rule OtherThreadRule<Void> otherThread = new OtherThreadRule<>();


@Before @Rule
public void startLife() public final OtherThreadRule<Void> otherThread = new OtherThreadRule<>();
{ @ClassRule
life.start(); public static final ClusterRule clusterRule = new ClusterRule( UniquenessConstraintValidationHAIT.class )
} .withInitialDataset( uniquenessConstraint( LABEL, PROPERTY_KEY ) );


@Test @Test
public void shouldAllowCreationOfNonConflictingDataOnSeparateHosts() throws Exception public void shouldAllowCreationOfNonConflictingDataOnSeparateHosts() throws Exception
{ {
// given // given
ClusterManager.ManagedCluster cluster = startClusterSeededWith( ClusterManager.ManagedCluster cluster = clusterRule.startCluster();
databaseWithUniquenessConstraint( "Label1", "key1" ) );


HighlyAvailableGraphDatabase slave1 = cluster.getAnySlave(); HighlyAvailableGraphDatabase slave1 = cluster.getAnySlave();
HighlyAvailableGraphDatabase slave2 = cluster.getAnySlave( /*except:*/slave1 ); HighlyAvailableGraphDatabase slave2 = cluster.getAnySlave( /*except:*/slave1 );
Expand All @@ -72,9 +67,9 @@ public void shouldAllowCreationOfNonConflictingDataOnSeparateHosts() throws Exce


try ( Transaction tx = slave1.beginTx() ) try ( Transaction tx = slave1.beginTx() )
{ {
slave1.createNode( label( "Label1" ) ).setProperty( "key1", "value1" ); slave1.createNode( LABEL ).setProperty( PROPERTY_KEY, "value1" );


created = otherThread.execute( createNode( slave2, "Label1", "key1", "value2" ) ); created = otherThread.execute( createNode( slave2, LABEL.name(), PROPERTY_KEY, "value2" ) );
tx.success(); tx.success();
} }


Expand All @@ -86,8 +81,7 @@ public void shouldAllowCreationOfNonConflictingDataOnSeparateHosts() throws Exce
public void shouldPreventConcurrentCreationOfConflictingDataOnSeparateHosts() throws Exception public void shouldPreventConcurrentCreationOfConflictingDataOnSeparateHosts() throws Exception
{ {
// given // given
ClusterManager.ManagedCluster cluster = startClusterSeededWith( ClusterManager.ManagedCluster cluster = clusterRule.startCluster();
databaseWithUniquenessConstraint( "Label1", "key1" ) );


HighlyAvailableGraphDatabase slave1 = cluster.getAnySlave(); HighlyAvailableGraphDatabase slave1 = cluster.getAnySlave();
HighlyAvailableGraphDatabase slave2 = cluster.getAnySlave( /*except:*/slave1 ); HighlyAvailableGraphDatabase slave2 = cluster.getAnySlave( /*except:*/slave1 );
Expand All @@ -96,9 +90,9 @@ public void shouldPreventConcurrentCreationOfConflictingDataOnSeparateHosts() th
Future<Boolean> created; Future<Boolean> created;
try ( Transaction tx = slave1.beginTx() ) try ( Transaction tx = slave1.beginTx() )
{ {
slave1.createNode( label( "Label1" ) ).setProperty( "key1", "value1" ); slave1.createNode( LABEL ).setProperty( PROPERTY_KEY, "value3" );


created = otherThread.execute( createNode( slave2, "Label1", "key1", "value1" ) ); created = otherThread.execute( createNode( slave2, LABEL.name(), PROPERTY_KEY, "value3" ) );


assertThat( otherThread, isWaiting() ); assertThat( otherThread, isWaiting() );


Expand All @@ -114,8 +108,7 @@ public void shouldPreventConcurrentCreationOfConflictingDataOnSeparateHosts() th
public void shouldPreventConcurrentCreationOfConflictingNonStringPropertyOnMasterAndSlave() throws Exception public void shouldPreventConcurrentCreationOfConflictingNonStringPropertyOnMasterAndSlave() throws Exception
{ {
// given // given
ClusterManager.ManagedCluster cluster = startClusterSeededWith( ClusterManager.ManagedCluster cluster = clusterRule.startCluster();
databaseWithUniquenessConstraint( "Label1", "key1" ) );


HighlyAvailableGraphDatabase master = cluster.getMaster(); HighlyAvailableGraphDatabase master = cluster.getMaster();
HighlyAvailableGraphDatabase slave = cluster.getAnySlave(); HighlyAvailableGraphDatabase slave = cluster.getAnySlave();
Expand All @@ -124,9 +117,9 @@ public void shouldPreventConcurrentCreationOfConflictingNonStringPropertyOnMaste
Future<Boolean> created; Future<Boolean> created;
try ( Transaction tx = master.beginTx() ) try ( Transaction tx = master.beginTx() )
{ {
master.createNode( label( "Label1" ) ).setProperty( "key1", 0x0099CC ); master.createNode( LABEL ).setProperty( PROPERTY_KEY, 0x0099CC );


created = otherThread.execute( createNode( slave, "Label1", "key1", 0x0099CC ) ); created = otherThread.execute( createNode( slave, LABEL.name(), PROPERTY_KEY, 0x0099CC ) );


assertThat( otherThread, isWaiting() ); assertThat( otherThread, isWaiting() );


Expand All @@ -141,8 +134,7 @@ public void shouldPreventConcurrentCreationOfConflictingNonStringPropertyOnMaste
public void shouldAllowOtherHostToCompleteIfFirstHostRollsBackTransaction() throws Exception public void shouldAllowOtherHostToCompleteIfFirstHostRollsBackTransaction() throws Exception
{ {
// given // given
ClusterManager.ManagedCluster cluster = startClusterSeededWith( ClusterManager.ManagedCluster cluster = clusterRule.startCluster();
databaseWithUniquenessConstraint( "Label1", "key1" ) );


HighlyAvailableGraphDatabase slave1 = cluster.getAnySlave(); HighlyAvailableGraphDatabase slave1 = cluster.getAnySlave();
HighlyAvailableGraphDatabase slave2 = cluster.getAnySlave( /*except:*/slave1 ); HighlyAvailableGraphDatabase slave2 = cluster.getAnySlave( /*except:*/slave1 );
Expand All @@ -152,9 +144,9 @@ public void shouldAllowOtherHostToCompleteIfFirstHostRollsBackTransaction() thro


try ( Transaction tx = slave1.beginTx() ) try ( Transaction tx = slave1.beginTx() )
{ {
slave1.createNode( label( "Label1" ) ).setProperty( "key1", "value1" ); slave1.createNode( LABEL ).setProperty( PROPERTY_KEY, "value4" );


created = otherThread.execute( createNode( slave2, "Label1", "key1", "value1" ) ); created = otherThread.execute( createNode( slave2, LABEL.name(), PROPERTY_KEY, "value4" ) );


assertThat( otherThread, isWaiting() ); assertThat( otherThread, isWaiting() );


Expand All @@ -166,32 +158,20 @@ public void shouldAllowOtherHostToCompleteIfFirstHostRollsBackTransaction() thro
assertTrue( "creating data that conflicts only with rolled back data should pass", created.get() ); assertTrue( "creating data that conflicts only with rolled back data should pass", created.get() );
} }


private ClusterManager.ManagedCluster startClusterSeededWith( File seedDir ) private static Listener<GraphDatabaseService> uniquenessConstraint( final Label label, final String propertyKey )
{ {
ClusterManager.ManagedCluster cluster = life return new Listener<GraphDatabaseService>()
.add( new ClusterManager.Builder( targetDir.directory() ).withSeedDir( seedDir ).build() )
.getDefaultCluster();
cluster.await( allSeesAllAsAvailable() );
return cluster;
}

private File databaseWithUniquenessConstraint( String label, String propertyKey )
{
File storeDir = new File( targetDir.directory(), "seed" );
GraphDatabaseService graphDb = new TestGraphDatabaseFactory().newEmbeddedDatabase( storeDir.getAbsolutePath() );
try
{ {
try ( Transaction tx = graphDb.beginTx() ) @Override
public void receive( GraphDatabaseService db )
{ {
graphDb.schema().constraintFor( label( label ) ).assertPropertyIsUnique( propertyKey ).create(); try ( Transaction tx = db.beginTx() )
{
db.schema().constraintFor( label ).assertPropertyIsUnique( propertyKey ).create();


tx.success(); tx.success();
}
} }
} };
finally
{
graphDb.shutdown();
}
return storeDir;
} }
} }
Expand Up @@ -49,7 +49,7 @@ public void setup() throws Exception
{ {
cluster = clusterRule cluster = clusterRule
.withProvider( clusterWithAdditionalClients( 2, 1 ) ) .withProvider( clusterWithAdditionalClients( 2, 1 ) )
.availabilityChecks( masterAvailable(), masterSeesMembers( 3 ), allSeesAllAsJoined() ) .withAvailabilityChecks( masterAvailable(), masterSeesMembers( 3 ), allSeesAllAsJoined() )
.startCluster(); .startCluster();
} }


Expand Down

0 comments on commit 8b81436

Please sign in to comment.