Skip to content

Commit

Permalink
Added test to demonstrate revert-to-random-core when no
Browse files Browse the repository at this point in the history
other preferred read replicas are available.
  • Loading branch information
jimwebber committed Feb 8, 2017
1 parent a31d861 commit dfc2c00
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 51 deletions.
Expand Up @@ -75,7 +75,6 @@ public <T> T makeBlockingRequest( MemberId upstream, CatchUpRequest request,
Optional<AdvertisedSocketAddress> catchUpAddress =
discoveryService.allServers().find( upstream ).map( CatchupServerAddress::getCatchupServer );


CatchUpChannel channel = pool.acquire( catchUpAddress.orElseThrow(
() -> new CatchUpClientException( "Cannot find the target member socket address" ) ) );

Expand Down
Expand Up @@ -27,7 +27,7 @@

public abstract class UpstreamDatabaseSelectionStrategy extends Service
{
protected ReadReplicaTopologyService readReplicaTopologyService;
ReadReplicaTopologyService readReplicaTopologyService;

public UpstreamDatabaseSelectionStrategy( String key, String... altKeys )
{
Expand Down
Expand Up @@ -61,8 +61,8 @@ public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException
if ( strategy.upstreamDatabase().isPresent() )
{
result = strategy.upstreamDatabase().get();
break;
}
break;
}
catch ( NoSuchElementException ex )
{
Expand Down
Expand Up @@ -163,7 +163,6 @@ public String directURI()
return String.format( "bolt://%s", boltAdvertisedAddress );
}


public File homeDir()
{
return neo4jHome;
Expand Down
Expand Up @@ -69,22 +69,15 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable
{
// given
Cluster cluster = clusterRule.startCluster();
int nodesBeforeReadReplicaStarts = 1;
int numberOfNodesToCreate = 100;

cluster.coreTx( ( db, tx ) ->
{
db.schema().constraintFor( Label.label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create();
tx.success();
} );

for ( int i = 0; i < 100; i++ )
{
cluster.coreTx( ( db, tx ) ->
{
createData( db, nodesBeforeReadReplicaStarts );
tx.success();
} );
}
storeSomeDataInCores( cluster, numberOfNodesToCreate );

AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false );
Monitors monitors = new Monitors();
Expand All @@ -103,22 +96,7 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable

firstReadReplica.start();

for ( final ReadReplica server : cluster.readReplicas() )
{
GraphDatabaseService readReplica = server.database();
try ( Transaction tx = readReplica.beginTx() )
{
ThrowingSupplier<Long,Exception> nodeCount = () -> count( readReplica.getAllNodes() );
assertEventually( "node to appear on read replica", nodeCount, is( 100L ), 1, MINUTES );

for ( Node node : readReplica.getAllNodes() )
{
assertThat( node.getProperty( "foobar" ).toString(), startsWith( "baz_bat" ) );
}

tx.success();
}
}
checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate );

for ( CoreClusterMember coreClusterMember : cluster.coreMembers() )
{
Expand All @@ -134,13 +112,103 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable

// then

checkDataHasReplicatedToReadReplicas( cluster, numberOfNodesToCreate );
}

@Test
public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() throws Throwable
{
// given
Cluster cluster = clusterRule.startCluster();
int numberOfNodes = 1;
int firstReadReplicaLocalMemberId = 101;

cluster.coreTx( ( db, tx ) ->
{
db.schema().constraintFor( Label.label( "Foo" ) ).assertPropertyIsUnique( "foobar" ).create();
tx.success();
} );

storeSomeDataInCores( cluster, numberOfNodes );

AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false );
Monitors monitors = new Monitors();

ReadReplica firstReadReplica =
cluster.addReadReplicaWithIdAndMonitors( firstReadReplicaLocalMemberId, monitors );
File labelScanStore = LabelScanStoreProvider
.getStoreDirectory( new File( firstReadReplica.storeDir(), TEMP_COPY_DIRECTORY_NAME ) );

monitors.addMonitorListener( (FileCopyMonitor) file ->
{
if ( file.getParent().contains( labelScanStore.getPath() ) )
{
labelScanStoreCorrectlyPlaced.set( true );
}
} );

firstReadReplica.start();

checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes );

upstreamFactory.setCurrent( firstReadReplica );

ReadReplica secondReadReplica = cluster.addReadReplicaWithId( 202 );
secondReadReplica.setUpstreamDatabaseSelectionStrategy( "specific" );

secondReadReplica.start();

checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes );

firstReadReplica.shutdown();
upstreamFactory.reset();

cluster.removeReadReplicaWithMemberId( firstReadReplicaLocalMemberId );

// when
// More transactions into core
storeSomeDataInCores( cluster, numberOfNodes );

// then
// reached second read replica from cores
checkDataHasReplicatedToReadReplicas( cluster, numberOfNodes * 2 );
}

private void storeSomeDataInCores( Cluster cluster, int numberOfNodes ) throws Exception
{
for ( int i = 0; i < numberOfNodes; i++ )
{
cluster.coreTx( ( db, tx ) ->
{
createData( db );
tx.success();
} );
}
}

private void createData( GraphDatabaseService db )
{
Node node = db.createNode( Label.label( "Foo" ) );
node.setProperty( "foobar", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar1", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar2", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar3", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar4", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar5", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar6", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar7", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar8", format( "baz_bat%s", UUID.randomUUID() ) );
}

private void checkDataHasReplicatedToReadReplicas( Cluster cluster, long numberOfNodes ) throws Exception
{
for ( final ReadReplica server : cluster.readReplicas() )
{
GraphDatabaseService readReplica = server.database();
try ( Transaction tx = readReplica.beginTx() )
{
ThrowingSupplier<Long,Exception> nodeCount = () -> count( readReplica.getAllNodes() );
assertEventually( "node to appear on read replica", nodeCount, is( 100L ), 1, MINUTES );
assertEventually( "node to appear on read replica", nodeCount, is( numberOfNodes ), 1, MINUTES );

for ( Node node : readReplica.getAllNodes() )
{
Expand All @@ -152,41 +220,29 @@ public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable
}
}

private void createData( GraphDatabaseService db, int amount )
{
for ( int i = 0; i < amount; i++ )
{
Node node = db.createNode( Label.label( "Foo" ) );
node.setProperty( "foobar", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar1", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar2", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar3", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar4", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar5", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar6", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar7", format( "baz_bat%s", UUID.randomUUID() ) );
node.setProperty( "foobar8", format( "baz_bat%s", UUID.randomUUID() ) );
}
}

@Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
public static class SpecificReplicaStrategy extends UpstreamDatabaseSelectionStrategy
{
// This because we need a stable point for config to inject into Service loader loaded classes
static final UpstreamFactory upstreamFactory = new UpstreamFactory();

private ReadReplica upstream;

public SpecificReplicaStrategy()
{
super( "specific" );
this.upstream = upstreamFactory.current();
}

@Override
public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException
{
return upstream.memberId();
ReadReplica current = upstreamFactory.current();
if ( current == null )
{
return Optional.empty();
}
else
{
return current.memberId();
}
}
}

Expand All @@ -203,5 +259,10 @@ public ReadReplica current()
{
return current;
}

void reset()
{
current = null;
}
}
}

0 comments on commit dfc2c00

Please sign in to comment.