Skip to content

Commit

Permalink
Cache local topology information and various minor cleanup
Browse files Browse the repository at this point in the history
The constant filtering actually showed up as a small blip in some
specific scenarios, so it is just prudent to get rid of it by caching.
  • Loading branch information
martinfurmanski committed May 16, 2018
1 parent 99b8c8f commit 003eb67
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 73 deletions.

This file was deleted.

Expand Up @@ -36,7 +36,7 @@

public class CoreTopology implements Topology<CoreServerInfo>
{
static CoreTopology EMPTY = new CoreTopology( null, false, emptyMap() );
static final CoreTopology EMPTY = new CoreTopology( null, false, emptyMap() );

private final ClusterId clusterId;
private final boolean canBeBootstrapped;
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;
Expand All @@ -47,7 +48,7 @@
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshGroups;

public class HazelcastClient extends AbstractTopologyService
public class HazelcastClient implements TopologyService, Lifecycle
{
private final Log log;
private final ClientConnectorAddresses connectorAddresses;
Expand All @@ -66,15 +67,17 @@ public class HazelcastClient extends AbstractTopologyService

private JobScheduler.JobHandle keepAliveJob;
private JobScheduler.JobHandle refreshTopologyJob;
private JobScheduler.JobHandle refreshRolesJob;

private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>();
/* cached data updated during each refresh */
private volatile CoreTopology coreTopology = CoreTopology.EMPTY;
private volatile ReadReplicaTopology rrTopology = ReadReplicaTopology.EMPTY;
private volatile Map<MemberId,RoleInfo> coreRoles = emptyMap();
private volatile CoreTopology localCoreTopology = CoreTopology.EMPTY;
private volatile ReadReplicaTopology readReplicaTopology = ReadReplicaTopology.EMPTY;
private volatile ReadReplicaTopology localReadReplicaTopology = ReadReplicaTopology.EMPTY;
private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>();
private volatile Map<MemberId,RoleInfo> coreRoles;

public HazelcastClient( HazelcastConnector connector, JobScheduler scheduler, LogProvider logProvider, Config config, MemberId myself,
TopologyServiceRetryStrategy topologyServiceRetryStrategy )
public HazelcastClient( HazelcastConnector connector, JobScheduler scheduler, LogProvider logProvider,
Config config, MemberId myself )
{
this.hzInstance = new RobustHazelcastWrapper( connector );
this.config = config;
Expand Down Expand Up @@ -117,10 +120,22 @@ public CoreTopology allCoreServers()
return coreTopology;
}

@Override
public CoreTopology localCoreServers()
{
return localCoreTopology;
}

@Override
public ReadReplicaTopology allReadReplicas()
{
return rrTopology;
return readReplicaTopology;
}

@Override
public ReadReplicaTopology localReadReplicas()
{
return localReadReplicaTopology;
}

@Override
Expand All @@ -139,8 +154,14 @@ private Optional<AdvertisedSocketAddress> retrieveSocketAddress( MemberId member
*/
private void refreshTopology() throws HazelcastInstanceNotActiveException
{
coreTopology = hzInstance.apply( hz -> getCoreTopology( hz, config, log ) );
rrTopology = hzInstance.apply( hz -> getReadReplicaTopology( hz, log ) );
CoreTopology newCoreTopology = hzInstance.apply( hz -> getCoreTopology( hz, config, log ) );
coreTopology = newCoreTopology;
localCoreTopology = newCoreTopology.filterTopologyByDb( dbName );

ReadReplicaTopology newReadReplicaTopology = hzInstance.apply( hz -> getReadReplicaTopology( hz, log ) );
readReplicaTopology = newReadReplicaTopology;
localReadReplicaTopology = newReadReplicaTopology.filterTopologyByDb( dbName );

catchupAddressMap = extractCatchupAddressesMap( localCoreServers(), localReadReplicas() );
}

Expand All @@ -149,6 +170,12 @@ private void refreshRoles() throws HazelcastInstanceNotActiveException
coreRoles = hzInstance.apply(hz -> HazelcastClusterTopology.getCoreRoles( hz, allCoreServers().members().keySet() ) );
}

@Override
public void init()
{
// nothing to do
}

@Override
public void start()
{
Expand All @@ -167,6 +194,12 @@ public void stop()
disconnectFromCore();
}

@Override
public void shutdown()
{
// nothing to do
}

private void disconnectFromCore()
{
try
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;
Expand All @@ -70,7 +71,7 @@
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshGroups;

public class HazelcastCoreTopologyService extends AbstractTopologyService implements CoreTopologyService
public class HazelcastCoreTopologyService implements CoreTopologyService, Lifecycle
{
private static final long HAZELCAST_IS_HEALTHY_TIMEOUT_MS = TimeUnit.MINUTES.toMillis( 10 );
private static final int HAZELCAST_MIN_CLUSTER = 2;
Expand All @@ -93,8 +94,12 @@ public class HazelcastCoreTopologyService extends AbstractTopologyService implem
private final AtomicReference<Optional<LeaderInfo>> stepDownInfo = new AtomicReference<>( Optional.empty() );

private volatile HazelcastInstance hazelcastInstance;
private volatile ReadReplicaTopology readReplicaTopology = ReadReplicaTopology.EMPTY;

/* cached data updated during each refresh */
private volatile CoreTopology coreTopology = CoreTopology.EMPTY;
private volatile CoreTopology localCoreTopology = CoreTopology.EMPTY;
private volatile ReadReplicaTopology readReplicaTopology = ReadReplicaTopology.EMPTY;
private volatile ReadReplicaTopology localReadReplicaTopology = ReadReplicaTopology.EMPTY;
private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>();
private volatile Map<MemberId,RoleInfo> coreRoles = Collections.emptyMap();

Expand Down Expand Up @@ -184,6 +189,12 @@ public String localDBName()
return localDBName;
}

@Override
public void init()
{
// nothing to do
}

@Override
public void start()
{
Expand Down Expand Up @@ -247,6 +258,12 @@ public void stop()
}
}

@Override
public void shutdown()
{
// nothing to do
}

private HazelcastInstance createHazelcastInstance()
{
System.setProperty( WAIT_SECONDS_BEFORE_JOIN.getName(), "1" );
Expand Down Expand Up @@ -364,12 +381,24 @@ public CoreTopology allCoreServers()
return coreTopology;
}

@Override
public CoreTopology localCoreServers()
{
return localCoreTopology;
}

@Override
public ReadReplicaTopology allReadReplicas()
{
return readReplicaTopology;
}

@Override
public ReadReplicaTopology localReadReplicas()
{
return localReadReplicaTopology;
}

@Override
public Optional<AdvertisedSocketAddress> findCatchupAddress( MemberId memberId )
{
Expand Down Expand Up @@ -414,7 +443,9 @@ private void refreshCoreTopology() throws InterruptedException

CoreTopology newCoreTopology = getCoreTopology( hazelcastInstance, config, log );
TopologyDifference difference = coreTopology.difference( newCoreTopology );

coreTopology = newCoreTopology;
localCoreTopology = newCoreTopology.filterTopologyByDb( localDBName );

if ( difference.hasChanges() )
{
Expand All @@ -426,15 +457,17 @@ private void refreshCoreTopology() throws InterruptedException
private void refreshReadReplicaTopology() throws InterruptedException
{
waitOnHazelcastInstanceCreation();
ReadReplicaTopology newReadReplicaTopology = getReadReplicaTopology( hazelcastInstance, log );

ReadReplicaTopology newReadReplicaTopology = getReadReplicaTopology( hazelcastInstance, log );
TopologyDifference difference = readReplicaTopology.difference( newReadReplicaTopology );

this.readReplicaTopology = newReadReplicaTopology;
this.localReadReplicaTopology = newReadReplicaTopology.filterTopologyByDb( localDBName );

if ( difference.hasChanges() )
{
log.info( "Read replica topology changed %s", difference );
}

this.readReplicaTopology = newReadReplicaTopology;
}

/*
Expand Down
Expand Up @@ -51,7 +51,7 @@ public TopologyService topologyService( Config config, LogProvider logProvider,
{
configureHazelcast( config, logProvider );
return new HazelcastClient( new HazelcastClientConnector( config, logProvider, hostnameResolver ), jobScheduler,
logProvider, config, myself, topologyServiceRetryStrategy );
logProvider, config, myself );
}

public static void configureHazelcast( Config config, LogProvider logProvider )
Expand Down
Expand Up @@ -138,8 +138,7 @@ private HazelcastClient hzClient( OnDemandJobScheduler jobScheduler, com.hazelca
{
HazelcastConnector connector = mock( HazelcastConnector.class );

HazelcastClient client = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config, myself,
topologyServiceRetryStrategy );
HazelcastClient client = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config, myself );

HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class );
when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance );
Expand Down Expand Up @@ -226,8 +225,7 @@ public void shouldNotReconnectWhileHazelcastRemainsAvailable()
HazelcastConnector connector = mock( HazelcastConnector.class );
OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();

HazelcastClient client = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself,
topologyServiceRetryStrategy );
HazelcastClient client = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself );

HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class );
when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance );
Expand Down Expand Up @@ -276,7 +274,7 @@ public void shouldReturnEmptyTopologyIfUnableToConnectToHazelcast()

OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();

HazelcastClient client = new HazelcastClient( connector, jobScheduler, logProvider, config(), myself, topologyServiceRetryStrategy );
HazelcastClient client = new HazelcastClient( connector, jobScheduler, logProvider, config(), myself );

com.hazelcast.core.Cluster cluster = mock( Cluster.class );
when( hazelcastInstance.getCluster() ).thenReturn( cluster );
Expand Down Expand Up @@ -327,8 +325,7 @@ public void shouldRegisterReadReplicaInTopology()
when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance );

OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself,
topologyServiceRetryStrategy );
HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself );

// when
hazelcastClient.start();
Expand Down Expand Up @@ -372,8 +369,7 @@ public void shouldRemoveReadReplicasOnGracefulShutdown()
when( connector.connectToHazelcast() ).thenReturn( hazelcastInstance );

OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();
HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself,
topologyServiceRetryStrategy );
HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself );

hazelcastClient.start();

Expand Down Expand Up @@ -404,8 +400,7 @@ public void shouldSwallowNPEFromHazelcast()

OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler();

HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself,
topologyServiceRetryStrategy );
HazelcastClient hazelcastClient = new HazelcastClient( connector, jobScheduler, NullLogProvider.getInstance(), config(), myself );

hazelcastClient.start();

Expand Down

0 comments on commit 003eb67

Please sign in to comment.