Skip to content

Commit

Permalink
make hazelcast usage more robust and cache results
Browse files Browse the repository at this point in the history
 * Use standard job scheduler instead of custom timeout service.
 * Use robust wrapper for the job scheduler.
 * Use robust wrapper for hazelcast.
 * Refresh and cache all the topology information periodically.
 * Optimize for the hot paths (e.g. address lookup).
 * Streamline core and replica implementations.
  • Loading branch information
martinfurmanski committed Mar 13, 2017
1 parent 89748b4 commit e9d2faa
Show file tree
Hide file tree
Showing 19 changed files with 241 additions and 338 deletions.
Expand Up @@ -204,11 +204,7 @@ public class CausalClusteringSettings
public static final Setting<Long> read_replica_time_to_live =
setting( "causal_clustering.read_replica_time_to_live", DURATION, "1m", min(60_000L) );

@Description( "Read replica 'call home' frequency" )
public static final Setting<Long> read_replica_refresh_rate =
setting( "causal_clustering.read_replica_refresh_rate", DURATION, "5s", min(5_000L) );

@Description( "How long drivers should cache the data from the `dbms.cluster.routing.getServers()` procedure." )
@Description( "How long drivers should cache the data from the `dbms.cluster.routing.getServers()` procedure." )
public static final Setting<Long> cluster_routing_ttl =
setting( "causal_clustering.cluster_routing_ttl", DURATION, "5m", min(1_000L) );

Expand Down
Expand Up @@ -37,8 +37,6 @@ public interface CoreTopologyService extends TopologyService
*/
boolean setClusterId( ClusterId clusterId );

void refreshCoreTopology();

interface Listener
{
void onCoreTopologyChange( CoreTopology coreTopology );
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.causalclustering.discovery;

import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.util.JobScheduler;
Expand All @@ -31,5 +30,5 @@ CoreTopologyService coreTopologyService( Config config, MemberId myself, JobSche
LogProvider logProvider, LogProvider userLogProvider );

TopologyService readReplicaDiscoveryService( Config config, LogProvider logProvider,
DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, long readReplicaRefreshRate );
JobScheduler jobScheduler );
}
Expand Up @@ -19,126 +19,126 @@
*/
package org.neo4j.causalclustering.discovery;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;

import java.util.function.Function;

import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.helper.RobustJobSchedulerWrapper;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractCatchupAddressesMap;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology;

class HazelcastClient extends LifecycleAdapter implements TopologyService
{
static final RenewableTimeoutService.TimeoutName REFRESH_READ_REPLICA = () -> "Refresh Read Replica";
private final Log log;
private final ClientConnectorAddresses connectorAddresses;
private final HazelcastConnector connector;
private final RobustHazelcastWrapper hzInstance;
private final RobustJobSchedulerWrapper scheduler;
private final Config config;
private final RenewableTimeoutService renewableTimeoutService;
private HazelcastInstance hazelcastInstance;
private RenewableTimeoutService.RenewableTimeout readReplicaRefreshTimer;
private final long readReplicaTimeToLiveTimeout;
private final long readReplicaRefreshRate;

HazelcastClient( HazelcastConnector connector, LogProvider logProvider, Config config,
RenewableTimeoutService renewableTimeoutService, long readReplicaTimeToLiveTimeout, long readReplicaRefreshRate )

private final long timeToLive;
private final long refreshPeriod;

private JobScheduler.JobHandle keepAliveJob;
private JobScheduler.JobHandle refreshTopologyJob;

private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>();
private volatile CoreTopology coreTopology = CoreTopology.EMPTY;
private volatile ReadReplicaTopology rrTopology = ReadReplicaTopology.EMPTY;

HazelcastClient( HazelcastConnector connector, JobScheduler scheduler, LogProvider logProvider, Config config )
{
this.connector = connector;
this.hzInstance = new RobustHazelcastWrapper( connector );
this.config = config;
this.renewableTimeoutService = renewableTimeoutService;
this.readReplicaRefreshRate = readReplicaRefreshRate;
this.log = logProvider.getLog( getClass() );
this.scheduler = new RobustJobSchedulerWrapper( scheduler, log );
this.connectorAddresses = ClientConnectorAddresses.extractFromConfig( config );
this.readReplicaTimeToLiveTimeout = readReplicaTimeToLiveTimeout;
this.timeToLive = config.get( CausalClusteringSettings.read_replica_time_to_live );
this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh );
}

@Override
public CoreTopology coreServers()
{
try
{
return retry( ( hazelcastInstance ) -> HazelcastClusterTopology.getCoreTopology( hazelcastInstance,
config, log ) );
}
catch ( Exception e )
{
log.info( "Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. "
+ "Connection will be reattempted on next polling attempt.", e );
return CoreTopology.EMPTY;
}
return coreTopology;
}

@Override
public void start() throws Throwable
public ReadReplicaTopology readReplicas()
{
readReplicaRefreshTimer = renewableTimeoutService.create( REFRESH_READ_REPLICA, readReplicaRefreshRate, 0, timeout -> {
timeout.renew();
retry( this::addReadReplica );
} );
return rrTopology;
}

private Object addReadReplica( HazelcastInstance hazelcastInstance )
@Override
public Optional<AdvertisedSocketAddress> findCatchupAddress( MemberId memberId )
{
String uuid = hazelcastInstance.getLocalEndpoint().getUuid();
String addresses = connectorAddresses.toString();
return Optional.ofNullable( catchupAddressMap.get( memberId ) );
}

log.debug( "Adding read replica into cluster (%s -> %s)", uuid, addresses );
/**
* Caches the topology so that the lookups are fast.
*/
private void refreshTopology() throws HazelcastInstanceNotActiveException
{
coreTopology = hzInstance.apply( ( hz ) -> getCoreTopology( hz, config, log ) );
rrTopology = hzInstance.apply( ( hz ) -> getReadReplicaTopology( hz, log ) );
catchupAddressMap = extractCatchupAddressesMap( coreTopology, rrTopology );
}

return hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME )
.put( uuid, addresses, readReplicaTimeToLiveTimeout, MILLISECONDS );
@Override
public void start() throws Throwable
{
keepAliveJob = scheduler.scheduleRecurring( "KeepAlive", timeToLive / 3, this::keepReadReplicaAlive );
refreshTopologyJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, this::refreshTopology );
}

@Override
public synchronized void stop() throws Throwable
public void stop() throws Throwable
{
readReplicaRefreshTimer.cancel();
scheduler.cancelAndWaitTermination( keepAliveJob );
scheduler.cancelAndWaitTermination( refreshTopologyJob );
disconnectFromCore();
}

if ( hazelcastInstance != null )
private void disconnectFromCore()
{
try
{
String uuid = hzInstance.apply( hzInstance -> hzInstance.getLocalEndpoint().getUuid() );
hzInstance.apply( hz -> hz.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ).remove( uuid ) );
hzInstance.shutdown();
}
catch ( Throwable e )
{
try
{
String uuid = hazelcastInstance.getLocalEndpoint().getUuid();
hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ).remove( uuid );
hazelcastInstance.shutdown();
}
catch ( Throwable t )
{
// Hazelcast is not able to stop correctly sometimes and throws a bunch of different exceptions
// let's simply log the current problem but go on with our shutdown
log.warn( "Unable to shutdown Hazelcast", t );
}
// Hazelcast is not able to stop correctly sometimes and throws a bunch of different exceptions
// let's simply log the current problem but go on with our shutdown
log.warn( "Unable to shutdown hazelcast cleanly", e );
}
}

private synchronized <T> T retry( Function<HazelcastInstance, T> hazelcastOperation )
private void keepReadReplicaAlive() throws HazelcastInstanceNotActiveException
{
boolean attemptedConnection = false;
HazelcastInstanceNotActiveException exception = null;

while ( !attemptedConnection )
hzInstance.perform( hazelcastInstance ->
{
if ( hazelcastInstance == null )
{
attemptedConnection = true;
hazelcastInstance = connector.connectToHazelcast();
}

try
{
return hazelcastOperation.apply( hazelcastInstance );
}
catch ( HazelcastInstanceNotActiveException e )
{
hazelcastInstance = null;
exception = e;
}
}
throw exception;
String uuid = hazelcastInstance.getLocalEndpoint().getUuid();
String addresses = connectorAddresses.toString();
log.debug( "Adding read replica into cluster (%s -> %s)", uuid, addresses );

// this needs to be last as when we read from it in HazelcastClusterTopology.readReplicas
// we assume that all the other maps have been populated if an entry exists in this one
hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME )
.put( uuid, addresses, timeToLive, MILLISECONDS );
} );
}
}
Expand Up @@ -19,17 +19,18 @@
*/
package org.neo4j.causalclustering.discovery;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicReference;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
Expand All @@ -41,10 +42,9 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;

import static org.neo4j.helpers.SocketAddressFormat.socketAddress;

class HazelcastClusterTopology
public class HazelcastClusterTopology
{
static final String READ_REPLICA_BOLT_ADDRESS_MAP_NAME = "read-replicas"; // hz client uuid string -> boltAddress
// string
Expand Down Expand Up @@ -72,7 +72,7 @@ static ReadReplicaTopology getReadReplicaTopology( HazelcastInstance hazelcastIn
log.info( "Cannot currently bind to distributed discovery service." );
}

return new ReadReplicaTopology( clusterId, readReplicas );
return new ReadReplicaTopology( readReplicas );
}

static CoreTopology getCoreTopology( HazelcastInstance hazelcastInstance, Config config, Log log )
Expand All @@ -98,6 +98,19 @@ static CoreTopology getCoreTopology( HazelcastInstance hazelcastInstance, Config
return new CoreTopology( clusterId, canBeBootstrapped, coreMembers );
}

public static Map<MemberId,AdvertisedSocketAddress> extractCatchupAddressesMap( CoreTopology coreTopology, ReadReplicaTopology rrTopology )
{
Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>();

for ( MemberId memberId : coreTopology.members() )
{
Optional<CoreAddresses> coreAddresses = coreTopology.find( memberId );
coreAddresses.ifPresent( a -> catchupAddressMap.put( memberId, a.getCatchupServer() ) );
}

return catchupAddressMap;
}

private static ClusterId getClusterId( HazelcastInstance hazelcastInstance )
{
IAtomicReference<UUID> uuidReference = hazelcastInstance.getAtomicReference( CLUSTER_UUID );
Expand Down

0 comments on commit e9d2faa

Please sign in to comment.