Skip to content

Commit

Permalink
Feedback from PR implemented, code tidier.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimwebber committed Feb 10, 2017
1 parent 1dd1b4a commit f1464b3
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 92 deletions.
Expand Up @@ -37,7 +37,7 @@ public ClusterTopology( CoreTopology coreTopology, ReadReplicaTopology readRepli
public Optional<CatchupServerAddress> find( MemberId upstream ) public Optional<CatchupServerAddress> find( MemberId upstream )
{ {
Optional<CoreAddresses> coreAddresses = coreTopology.find( upstream ); Optional<CoreAddresses> coreAddresses = coreTopology.find( upstream );
Optional<ReadReplicaAddresses> readReplicaAddresses = readReplicaTopology.find( upstream ); Optional<ReadReplicaAddresses> readReplicaAddresses = readReplicaTopology.findAddressFor( upstream );


if ( coreAddresses.isPresent() ) if ( coreAddresses.isPresent() )
{ {
Expand Down
Expand Up @@ -100,14 +100,7 @@ private Difference asDifference( CoreTopology topology, MemberId memberId )


public Optional<MemberId> anyCoreMemberId() public Optional<MemberId> anyCoreMemberId()
{ {
if ( coreMembers.keySet().size() == 0 )
{
return Optional.empty();
}
else
{
return coreMembers.keySet().stream().findAny(); return coreMembers.keySet().stream().findAny();
}
} }


class TopologyDifference class TopologyDifference
Expand Down
Expand Up @@ -41,12 +41,12 @@ public ReadReplicaTopology( Map<MemberId,ReadReplicaAddresses> readReplicaMember
this.readReplicaMembers = readReplicaMembers; this.readReplicaMembers = readReplicaMembers;
} }


public Collection<ReadReplicaAddresses> members() public Collection<ReadReplicaAddresses> addresses()
{ {
return readReplicaMembers.values(); return readReplicaMembers.values();
} }


public Optional<ReadReplicaAddresses> find( MemberId memberId ) Optional<ReadReplicaAddresses> findAddressFor( MemberId memberId )
{ {
return Optional.ofNullable( readReplicaMembers.get( memberId ) ); return Optional.ofNullable( readReplicaMembers.get( memberId ) );
} }
Expand Down
Expand Up @@ -99,7 +99,7 @@ public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] inp
log.debug( "No Address found for " + memberId ); log.debug( "No Address found for " + memberId );
} }
} }
for ( ReadReplicaAddresses readReplicaAddresses : discoveryService.readReplicas().members() ) for ( ReadReplicaAddresses readReplicaAddresses : discoveryService.readReplicas().addresses() )
{ {
endpoints.add( new ReadWriteEndPoint( readReplicaAddresses.connectors(), Role.READ_REPLICA ) ); endpoints.add( new ReadWriteEndPoint( readReplicaAddresses.connectors(), Role.READ_REPLICA ) );
} }
Expand Down
Expand Up @@ -38,7 +38,7 @@ public MemberId( UUID uuid )
{ {
Objects.requireNonNull( uuid ); Objects.requireNonNull( uuid );
this.uuid = uuid; this.uuid = uuid;
shortName = uuid.toString();//.substring( 0, 8 ); shortName = uuid.toString().substring( 0, 8 );
} }


public UUID getUuid() public UUID getUuid()
Expand Down
Expand Up @@ -130,7 +130,7 @@ private List<Endpoint> writeEndpoints()


private List<Endpoint> readEndpoints() private List<Endpoint> readEndpoints()
{ {
List<AdvertisedSocketAddress> readReplicas = discoveryService.readReplicas().members().stream() List<AdvertisedSocketAddress> readReplicas = discoveryService.readReplicas().addresses().stream()
.map( extractBoltAddress() ).collect( toList() ); .map( extractBoltAddress() ).collect( toList() );
boolean addFollowers = readReplicas.isEmpty() || config.get( cluster_allow_reads_on_followers ); boolean addFollowers = readReplicas.isEmpty() || config.get( cluster_allow_reads_on_followers );
Stream<AdvertisedSocketAddress> readCore = addFollowers ? coreReadEndPoints() : Stream.empty(); Stream<AdvertisedSocketAddress> readCore = addFollowers ? coreReadEndPoints() : Stream.empty();
Expand Down
Expand Up @@ -94,7 +94,7 @@ private List<Endpoint> writeEndpoints( CoreTopology cores )


private List<Endpoint> readEndpoints( CoreTopology cores, ReadReplicaTopology readers ) private List<Endpoint> readEndpoints( CoreTopology cores, ReadReplicaTopology readers )
{ {
return concat( readers.members().stream(), cores.addresses().stream() ) return concat( readers.addresses().stream(), cores.addresses().stream() )
.map( extractBoltAddress() ) .map( extractBoltAddress() )
.map( Endpoint::read ) .map( Endpoint::read )
.collect( Collectors.toList() ); .collect( Collectors.toList() );
Expand Down
Expand Up @@ -55,21 +55,13 @@ private static class ModuloCounter
ModuloCounter( int modulo ) ModuloCounter( int modulo )
{ {
// e.g. every 10th means 0-9 // e.g. every 10th means 0-9
this.modulo = modulo -1; this.modulo = modulo - 1;
} }


boolean shouldReturnCoreMemberId() boolean shouldReturnCoreMemberId()
{ {
if ( counter == modulo ) counter = (counter + 1) % modulo;
{ return counter == 0;
counter = 0;
return true;
}
else
{
counter++;
return false;
}
} }
} }
} }
Expand Up @@ -54,7 +54,7 @@ public class UpstreamDatabaseStrategySelector
public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException
{ {
MemberId result = null; MemberId result = null;
for ( UpstreamDatabaseSelectionStrategy strategy : this.strategies ) for ( UpstreamDatabaseSelectionStrategy strategy : strategies )
{ {
try try
{ {
Expand Down
Expand Up @@ -46,7 +46,7 @@
import static org.neo4j.backup.OnlineBackupCommandIT.runBackupToolFromOtherJvmToGetExitCode; import static org.neo4j.backup.OnlineBackupCommandIT.runBackupToolFromOtherJvmToGetExitCode;
import static org.neo4j.causalclustering.backup.BackupCoreIT.backupAddress; import static org.neo4j.causalclustering.backup.BackupCoreIT.backupAddress;
import static org.neo4j.causalclustering.discovery.Cluster.dataMatchesEventually; import static org.neo4j.causalclustering.discovery.Cluster.dataMatchesEventually;
import static org.neo4j.causalclustering.helpers.DataCreator.createNodes; import static org.neo4j.causalclustering.helpers.DataCreator.createEmptyNodes;


public class ClusterSeedingIT public class ClusterSeedingIT
{ {
Expand Down Expand Up @@ -156,7 +156,7 @@ public void shouldSeedNewMemberFromNonEmptyIdleCluster() throws Throwable
cluster = new Cluster( testDir.directory( "cluster-b" ), 3, 0, cluster = new Cluster( testDir.directory( "cluster-b" ), 3, 0,
new SharedDiscoveryService(), emptyMap(), backupParams(), emptyMap(), emptyMap(), Standard.LATEST_NAME ); new SharedDiscoveryService(), emptyMap(), backupParams(), emptyMap(), emptyMap(), Standard.LATEST_NAME );
cluster.start(); cluster.start();
createNodes( cluster, 100 ); createEmptyNodes( cluster, 100 );


// when: creating a backup // when: creating a backup
File backupDir = createBackup( cluster.getCoreMemberById( 0 ).database(), "the-backup" ); File backupDir = createBackup( cluster.getCoreMemberById( 0 ).database(), "the-backup" );
Expand Down
Expand Up @@ -19,12 +19,33 @@
*/ */
package org.neo4j.causalclustering.helpers; package org.neo4j.causalclustering.helpers;


import java.util.function.Supplier;

import org.neo4j.causalclustering.discovery.Cluster; import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.CoreClusterMember; import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.helpers.collection.Pair;


public class DataCreator public class DataCreator
{ {
public static CoreClusterMember createNodes( Cluster cluster, int numberOfNodes ) throws Exception public static CoreClusterMember createLabelledNodesWithProperty( Cluster cluster, int numberOfNodes,
Label label, Supplier<Pair<String,Object>> supplier ) throws Exception
{
CoreClusterMember last = null;
for ( int i = 0; i < numberOfNodes; i++ )
{
last = cluster.coreTx( ( db, tx ) ->
{
Node node = db.createNode( label );
node.setProperty( supplier.get().first(), supplier.get().other() );
tx.success();
} );
}
return last;
}

public static CoreClusterMember createEmptyNodes( Cluster cluster, int numberOfNodes ) throws Exception
{ {
CoreClusterMember last = null; CoreClusterMember last = null;
for ( int i = 0; i < numberOfNodes; i++ ) for ( int i = 0; i < numberOfNodes; i++ )
Expand Down
Expand Up @@ -38,21 +38,22 @@
import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy; import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionStrategy;
import org.neo4j.function.ThrowingSupplier; import org.neo4j.function.ThrowingSupplier;
import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.helpers.Service; import org.neo4j.helpers.Service;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider; import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.causalclustering.ClusterRule; import org.neo4j.test.causalclustering.ClusterRule;


import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.MINUTES;
import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.neo4j.causalclustering.helpers.DataCreator.createLabelledNodesWithProperty;
import static org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT.SpecificReplicaStrategy.upstreamFactory; import static org.neo4j.causalclustering.scenarios.ReadReplicaToReadReplicaCatchupIT.SpecificReplicaStrategy.upstreamFactory;
import static org.neo4j.com.storecopy.StoreUtil.TEMP_COPY_DIRECTORY_NAME; import static org.neo4j.com.storecopy.StoreUtil.TEMP_COPY_DIRECTORY_NAME;
import static org.neo4j.graphdb.Label.label;
import static org.neo4j.helpers.collection.Iterables.count; import static org.neo4j.helpers.collection.Iterables.count;
import static org.neo4j.test.assertion.Assert.assertEventually; import static org.neo4j.test.assertion.Assert.assertEventually;


Expand All @@ -73,27 +74,18 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable


cluster.coreTx( ( db, tx ) -> cluster.coreTx( ( db, tx ) ->
{ {
db.schema().constraintFor( Label.label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create(); db.schema().constraintFor( label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create();
tx.success(); tx.success();
} ); } );


storeSomeDataInCores( cluster, numberOfNodesToCreate ); createLabelledNodesWithProperty( cluster, numberOfNodesToCreate, label( "Foo" ),
() -> Pair.of( "foobar", String.format( "baz_bat%s", UUID.randomUUID() ) ) );


AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false ); ReadReplica firstReadReplica = cluster.addReadReplicaWithIdAndMonitors( 101, new Monitors() );
Monitors monitors = new Monitors();
ReadReplica firstReadReplica = cluster.addReadReplicaWithIdAndMonitors( 101, monitors );


File labelScanStore = LabelScanStoreProvider File labelScanStore = LabelScanStoreProvider
.getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) ); .getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) );


monitors.addMonitorListener( (FileCopyMonitor) file ->
{
if ( file.getParent().contains( labelScanStore.getPath() ) )
{
labelScanStoreCorrectlyPlaced.set( true );
}
} );

firstReadReplica.start(); firstReadReplica.start();


checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate ); checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate );
Expand Down Expand Up @@ -125,28 +117,18 @@ public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() thro


cluster.coreTx( ( db, tx ) -> cluster.coreTx( ( db, tx ) ->
{ {
db.schema().constraintFor( Label.label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create(); db.schema().constraintFor( label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create();
tx.success(); tx.success();
} ); } );


storeSomeDataInCores( cluster, numberOfNodes ); createLabelledNodesWithProperty( cluster, numberOfNodes, label( "Foo" ),

() -> Pair.of( "foobar", String.format( "baz_bat%s", UUID.randomUUID() ) ) );
AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false );
Monitors monitors = new Monitors();


ReadReplica firstReadReplica = ReadReplica firstReadReplica =
cluster.addReadReplicaWithIdAndMonitors( firstReadReplicaLocalMemberId, monitors ); cluster.addReadReplicaWithIdAndMonitors( firstReadReplicaLocalMemberId, new Monitors() );
File labelScanStore = LabelScanStoreProvider File labelScanStore = LabelScanStoreProvider
.getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) ); .getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) );


monitors.addMonitorListener( (FileCopyMonitor) file ->
{
if ( file.getParent().contains( labelScanStore.getPath() ) )
{
labelScanStoreCorrectlyPlaced.set( true );
}
} );

firstReadReplica.start(); firstReadReplica.start();


checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes ); checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes );
Expand All @@ -167,39 +149,14 @@ public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() thro


// when // when
// More transactions into core // More transactions into core
storeSomeDataInCores( cluster, numberOfNodes ); createLabelledNodesWithProperty( cluster, numberOfNodes, label( "Foo" ),
() -> Pair.of( "foobar", String.format( "baz_bat%s", UUID.randomUUID() ) ) );


// then // then
// reached second read replica from cores // reached second read replica from cores
checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes * 2 ); checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes * 2 );
} }


private void storeSomeDataInCores( Cluster cluster, int numberOfNodes ) throws Exception
{
for ( int i = 0; i < numberOfNodes; i++ )
{
cluster.coreTx( ( db, tx ) ->
{
createData( db );
tx.success();
} );
}
}

private void createData( GraphDatabaseService db )
{
Node node = db.createNode( Label.label( "Foo" ) );
node.setProperty( "foobar", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar1", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar2", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar3", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar4", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar5", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar6", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar7", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar8", format( "baz_bat%s", UUID.randomUUID() ) );
}

private void checkDataHasReplicatedToReadReplicas( Cluster cluster, long numberOfNodes ) throws Exception private void checkDataHasReplicatedToReadReplicas( Cluster cluster, long numberOfNodes ) throws Exception
{ {
for ( final ReadReplica server : cluster.readReplicas() ) for ( final ReadReplica server : cluster.readReplicas() )
Expand Down
Expand Up @@ -44,7 +44,7 @@


import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static org.neo4j.causalclustering.discovery.Cluster.dataMatchesEventually; import static org.neo4j.causalclustering.discovery.Cluster.dataMatchesEventually;
import static org.neo4j.causalclustering.helpers.DataCreator.createNodes; import static org.neo4j.causalclustering.helpers.DataCreator.createEmptyNodes;


/** /**
* Recovery scenarios where the transaction log was only partially written. * Recovery scenarios where the transaction log was only partially written.
Expand Down Expand Up @@ -72,14 +72,14 @@ public void setup() throws Exception
public void coreShouldStartAfterPartialTransactionWriteCrash() throws Exception public void coreShouldStartAfterPartialTransactionWriteCrash() throws Exception
{ {
// given: a fully synced cluster with some data // given: a fully synced cluster with some data
dataMatchesEventually( createNodes( cluster, 10 ), cluster.coreMembers() ); dataMatchesEventually( createEmptyNodes( cluster, 10 ), cluster.coreMembers() );


// when: shutting down a core // when: shutting down a core
CoreClusterMember core = cluster.getCoreMemberById( 0 ); CoreClusterMember core = cluster.getCoreMemberById( 0 );
core.shutdown(); core.shutdown();


// and making sure there will be something new to pull // and making sure there will be something new to pull
CoreClusterMember lastWrites = createNodes( cluster, 10 ); CoreClusterMember lastWrites = createEmptyNodes( cluster, 10 );


// and writing a partial tx // and writing a partial tx
writePartialTx( core.storeDir() ); writePartialTx( core.storeDir() );
Expand All @@ -95,14 +95,14 @@ public void coreShouldStartAfterPartialTransactionWriteCrash() throws Exception
public void coreShouldStartWithSeedHavingPartialTransactionWriteCrash() throws Exception public void coreShouldStartWithSeedHavingPartialTransactionWriteCrash() throws Exception
{ {
// given: a fully synced cluster with some data // given: a fully synced cluster with some data
dataMatchesEventually( createNodes( cluster, 10 ), cluster.coreMembers() ); dataMatchesEventually( createEmptyNodes( cluster, 10 ), cluster.coreMembers() );


// when: shutting down a core // when: shutting down a core
CoreClusterMember core = cluster.getCoreMemberById( 0 ); CoreClusterMember core = cluster.getCoreMemberById( 0 );
core.shutdown(); core.shutdown();


// and making sure there will be something new to pull // and making sure there will be something new to pull
CoreClusterMember lastWrites = createNodes( cluster, 10 ); CoreClusterMember lastWrites = createEmptyNodes( cluster, 10 );


// and writing a partial tx // and writing a partial tx
writePartialTx( core.storeDir() ); writePartialTx( core.storeDir() );
Expand All @@ -122,14 +122,14 @@ public void coreShouldStartWithSeedHavingPartialTransactionWriteCrash() throws E
public void readReplicaShouldStartAfterPartialTransactionWriteCrash() throws Exception public void readReplicaShouldStartAfterPartialTransactionWriteCrash() throws Exception
{ {
// given: a fully synced cluster with some data // given: a fully synced cluster with some data
dataMatchesEventually( createNodes( cluster, 10 ), cluster.readReplicas() ); dataMatchesEventually( createEmptyNodes( cluster, 10 ), cluster.readReplicas() );


// when: shutting down a read replica // when: shutting down a read replica
ReadReplica readReplica = cluster.getReadReplicaById( 0 ); ReadReplica readReplica = cluster.getReadReplicaById( 0 );
readReplica.shutdown(); readReplica.shutdown();


// and making sure there will be something new to pull // and making sure there will be something new to pull
CoreClusterMember lastWrites = createNodes( cluster, 10 ); CoreClusterMember lastWrites = createEmptyNodes( cluster, 10 );
dataMatchesEventually( lastWrites, cluster.coreMembers() ); dataMatchesEventually( lastWrites, cluster.coreMembers() );


// and writing a partial tx // and writing a partial tx
Expand Down

0 comments on commit f1464b3

Please sign in to comment.