Skip to content

Commit

Permalink
Refactor finding db with role.
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Mar 14, 2016
1 parent 067d879 commit a3fba8f
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 29 deletions.
Expand Up @@ -350,33 +350,38 @@ public GraphDatabaseService findAnEdgeServer()
return edgeServers.iterator().next(); return edgeServers.iterator().next();
} }


public CoreGraphDatabase getLeader() public CoreGraphDatabase getDbWithRole( Role role )
{ {
for ( CoreGraphDatabase coreServer : coreServers ) for ( CoreGraphDatabase coreServer : coreServers )
{ {
if ( coreServer.getRole().equals( Role.LEADER ) ) if ( coreServer.getRole().equals( role ) )
{ {
return coreServer; return coreServer;
} }
} }
return null; return null;
} }


public CoreGraphDatabase findLeader( long leaderWaitTimeout ) throws NoLeaderFoundException public CoreGraphDatabase awaitLeader( long timeoutMillis ) throws TimeoutException
{ {
long leaderWaitEndTime = leaderWaitTimeout + System.currentTimeMillis(); return awaitCoreGraphDatabaseWithRole( timeoutMillis, Role.LEADER );
CoreGraphDatabase leader; }
while ( (leader = getLeader()) == null && (System.currentTimeMillis() < leaderWaitEndTime) )
public CoreGraphDatabase awaitCoreGraphDatabaseWithRole( long timeoutMillis, Role role ) throws TimeoutException
{
long endTimeMillis = timeoutMillis + System.currentTimeMillis();

CoreGraphDatabase db;
while ( (db = getDbWithRole( role ) ) == null && (System.currentTimeMillis() < endTimeMillis) )
{ {
LockSupport.parkNanos( MILLISECONDS.toNanos( 1000 ) ); LockSupport.parkNanos( MILLISECONDS.toNanos( 100 ) );
} }


if ( leader == null ) if ( db == null )
{ {
throw new NoLeaderFoundException(); throw new TimeoutException();
} }

return db;
return leader;
} }


public int numberOfCoreServers() public int numberOfCoreServers()
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.junit.Test; import org.junit.Test;


import org.neo4j.coreedge.discovery.Cluster; import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.coreedge.server.core.CoreGraphDatabase;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.test.TargetDirectory; import org.neo4j.test.TargetDirectory;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void shouldBeAbleToAddAndRemoveCoreServersUnderModestLoad() throws Except


ExecutorService executorService = Executors.newSingleThreadExecutor(); ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit( (Runnable) () -> { executorService.submit( (Runnable) () -> {
CoreGraphDatabase leader = cluster.getLeader(); CoreGraphDatabase leader = cluster.getDbWithRole( Role.LEADER );
try ( Transaction tx = leader.beginTx() ) try ( Transaction tx = leader.beginTx() )
{ {
leader.createNode(); leader.createNode();
Expand Down Expand Up @@ -137,4 +138,4 @@ public void shouldBeAbleToRestartTheCluster() throws Exception


assertEquals( 3, cluster.numberOfCoreServers() ); assertEquals( 3, cluster.numberOfCoreServers() );
} }
} }
Expand Up @@ -155,7 +155,7 @@ public Long call() throws Exception


public void createNodes( long nodeCount ) throws Exception public void createNodes( long nodeCount ) throws Exception
{ {
GraphDatabaseService coreServer = cluster.findLeader( 5000 ); GraphDatabaseService coreServer = cluster.awaitLeader( 5000 );
try ( Transaction tx = coreServer.beginTx() ) try ( Transaction tx = coreServer.beginTx() )
{ {
for ( int i = 0; i < nodeCount; i++ ) for ( int i = 0; i < nodeCount; i++ )
Expand Down
Expand Up @@ -83,7 +83,7 @@ public void shouldReplicateTransactionToCoreServers() throws Throwable
cluster = Cluster.start( dbDir, 3, 0 ); cluster = Cluster.start( dbDir, 3, 0 );


// when // when
GraphDatabaseService coreDB = cluster.findLeader( 5000 ); GraphDatabaseService coreDB = cluster.awaitLeader( 5000 );


try ( Transaction tx = coreDB.beginTx() ) try ( Transaction tx = coreDB.beginTx() )
{ {
Expand Down
Expand Up @@ -76,6 +76,7 @@ public void shutdown()
if ( cluster != null ) if ( cluster != null )
{ {
cluster.shutdown(); cluster.shutdown();
cluster = null;
} }
} }


Expand All @@ -87,7 +88,7 @@ public void shouldReplicateTransactionToCoreServers() throws Exception
cluster = Cluster.start( dbDir, 3, 0 ); cluster = Cluster.start( dbDir, 3, 0 );


// when // when
GraphDatabaseService coreDB = cluster.findLeader( 5000 ); GraphDatabaseService coreDB = cluster.awaitLeader( 5000 );


try ( Transaction tx = coreDB.beginTx() ) try ( Transaction tx = coreDB.beginTx() )
{ {
Expand Down Expand Up @@ -134,7 +135,7 @@ public void shouldReplicateTransactionToCoreServersAddedAfterInitialStartUp() th
cluster.addCoreServerWithServerId( 3, 4 ); cluster.addCoreServerWithServerId( 3, 4 );


// when // when
GraphDatabaseService coreDB = cluster.findLeader( 5000 ); GraphDatabaseService coreDB = cluster.awaitLeader( 5000 );


try ( Transaction tx = coreDB.beginTx() ) try ( Transaction tx = coreDB.beginTx() )
{ {
Expand All @@ -145,7 +146,7 @@ public void shouldReplicateTransactionToCoreServersAddedAfterInitialStartUp() th


cluster.addCoreServerWithServerId( 4, 5 ); cluster.addCoreServerWithServerId( 4, 5 );


coreDB = cluster.findLeader( 5000 ); coreDB = cluster.awaitLeader( 5000 );


try ( Transaction tx = coreDB.beginTx() ) try ( Transaction tx = coreDB.beginTx() )
{ {
Expand Down Expand Up @@ -181,7 +182,7 @@ public void shouldReplicateTransactionAfterOneOriginalServerRemovedFromCluster()
{ {
File dbDir = dir.directory(); File dbDir = dir.directory();
cluster = Cluster.start( dbDir, 3, 0 ); cluster = Cluster.start( dbDir, 3, 0 );
CoreGraphDatabase leader = cluster.findLeader( 5000 ); CoreGraphDatabase leader = cluster.awaitLeader( 5000 );
try ( Transaction tx = leader.beginTx() ) try ( Transaction tx = leader.beginTx() )
{ {
Node node = leader.createNode(); Node node = leader.createNode();
Expand All @@ -190,7 +191,7 @@ public void shouldReplicateTransactionAfterOneOriginalServerRemovedFromCluster()
} }


cluster.removeCoreServer( leader ); cluster.removeCoreServer( leader );
final GraphDatabaseService newLeader = cluster.findLeader( 5000 ); final GraphDatabaseService newLeader = cluster.awaitLeader( 5000 );
ThrowingSupplier<Boolean, Exception> creationSuccess = () -> { ThrowingSupplier<Boolean, Exception> creationSuccess = () -> {
try ( Transaction tx = newLeader.beginTx() ) try ( Transaction tx = newLeader.beginTx() )
{ {
Expand Down Expand Up @@ -241,7 +242,7 @@ public void shouldReplicateToCoreServersAddedAfterInitialTransactions() throws E
// when // when
for ( int i = 0; i < 15; i++ ) for ( int i = 0; i < 15; i++ )
{ {
CoreGraphDatabase leader = cluster.findLeader( 5000 ); CoreGraphDatabase leader = cluster.awaitLeader( 5000 );


try ( Transaction tx = leader.beginTx() ) try ( Transaction tx = leader.beginTx() )
{ {
Expand Down
Expand Up @@ -19,15 +19,15 @@
*/ */
package org.neo4j.coreedge.scenarios; package org.neo4j.coreedge.scenarios;


import java.io.File;
import java.util.Set;
import java.util.concurrent.TimeoutException;

import org.junit.After; import org.junit.After;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;


import java.io.File;
import java.util.Set;

import org.neo4j.coreedge.discovery.Cluster; import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.raft.NoLeaderFoundException;
import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.coreedge.server.core.CoreGraphDatabase;
import org.neo4j.coreedge.server.edge.EdgeGraphDatabase; import org.neo4j.coreedge.server.edge.EdgeGraphDatabase;
import org.neo4j.function.ThrowingSupplier; import org.neo4j.function.ThrowingSupplier;
Expand All @@ -43,10 +43,12 @@
import static java.io.File.pathSeparator; import static java.io.File.pathSeparator;
import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;

import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;

import static org.neo4j.helpers.collection.Iterables.count; import static org.neo4j.helpers.collection.Iterables.count;
import static org.neo4j.test.Assert.assertEventually; import static org.neo4j.test.Assert.assertEventually;


Expand Down Expand Up @@ -204,12 +206,12 @@ private File createExistingEdgeStore( String path )
return dir; return dir;
} }


private GraphDatabaseService executeOnLeaderWithRetry( Workload workload ) throws NoLeaderFoundException private GraphDatabaseService executeOnLeaderWithRetry( Workload workload ) throws TimeoutException
{ {
CoreGraphDatabase coreDB; CoreGraphDatabase coreDB;
while ( true ) while ( true )
{ {
coreDB = cluster.findLeader( 5000 ); coreDB = cluster.awaitLeader( 5000 );
try ( Transaction tx = coreDB.beginTx() ) try ( Transaction tx = coreDB.beginTx() )
{ {
workload.doWork( coreDB ); workload.doWork( coreDB );
Expand Down
Expand Up @@ -134,7 +134,7 @@ public void edgeTest() throws Exception
cluster = Cluster.start( dbDir, 2, 1 ); cluster = Cluster.start( dbDir, 2, 1 );


// when // when
final GraphDatabaseService coreDB = cluster.findLeader( 5000 ); final GraphDatabaseService coreDB = cluster.awaitLeader( 5000 );


try ( Transaction tx = coreDB.beginTx() ) try ( Transaction tx = coreDB.beginTx() )
{ {
Expand Down
Expand Up @@ -78,7 +78,7 @@ public void shouldMonitorCoreEdge() throws Exception
cluster = Cluster.start( dbDir, 3, 1 ); cluster = Cluster.start( dbDir, 3, 1 );


// when // when
GraphDatabaseService coreDB = cluster.findLeader( 5000 ); GraphDatabaseService coreDB = cluster.awaitLeader( 5000 );


try ( Transaction tx = coreDB.beginTx() ) try ( Transaction tx = coreDB.beginTx() )
{ {
Expand Down

0 comments on commit a3fba8f

Please sign in to comment.