Skip to content

Commit

Permalink
make hazelcast usage more robust and cache results
Browse files Browse the repository at this point in the history
 * Use standard job scheduler instead of custom timeout service.
 * Use robust wrapper for the job scheduler.
 * Use robust wrapper for hazelcast.
 * Refresh and cache all the topology information periodically.
 * Optimize for the hot paths (e.g. address lookup).
 * Streamline core and replica implementations.
  • Loading branch information
martinfurmanski committed Mar 8, 2017
1 parent bed7e4c commit bd94401
Show file tree
Hide file tree
Showing 23 changed files with 231 additions and 389 deletions.
Expand Up @@ -208,10 +208,6 @@ public class CausalClusteringSettings implements LoadableConfig
public static final Setting<Long> read_replica_time_to_live =
setting( "causal_clustering.read_replica_time_to_live", DURATION, "1m", min( 60_000L ) );

@Description( "Read replica 'call home' frequency" )
public static final Setting<Long> read_replica_refresh_rate =
setting( "causal_clustering.read_replica_refresh_rate", DURATION, "5s", min( 5_000L ) );

@Description( "How long drivers should cache the data from the `dbms.cluster.routing.getServers()` procedure." )
public static final Setting<Long> cluster_routing_ttl =
setting( "causal_clustering.cluster_routing_ttl", DURATION, "5m", min( 1_000L ) );
Expand Down
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.causalclustering.discovery;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -48,21 +46,16 @@ public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map<MemberI
this.coreMembers = new HashMap<>( coreMembers );
}

public Set<MemberId> members()
public Map<MemberId,CoreServerInfo> members()
{
return coreMembers.keySet();
return coreMembers;
}

/**
 * Returns the value of this topology's {@code clusterId} field.
 */
public ClusterId clusterId()
{
return clusterId;
}

public Collection<CoreServerInfo> allMemberInfo()
{
return coreMembers.values();
}

public boolean canBeBootstrapped()
{
return canBeBootstrapped;
Expand Down
Expand Up @@ -39,8 +39,6 @@ public interface CoreTopologyService extends TopologyService
*/
boolean setClusterId( ClusterId clusterId );

void refreshCoreTopology();

interface Listener
{
void onCoreTopologyChange( CoreTopology coreTopology );
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.causalclustering.discovery;

import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.util.JobScheduler;
Expand All @@ -31,6 +30,5 @@ CoreTopologyService coreTopologyService( Config config, MemberId myself, JobSche
LogProvider logProvider, LogProvider userLogProvider );

TopologyService topologyService( Config config, LogProvider logProvider,
DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout,
long readReplicaRefreshRate, MemberId myself );
JobScheduler jobScheduler, MemberId myself );
}
Expand Up @@ -19,17 +19,17 @@
*/
package org.neo4j.causalclustering.discovery;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;

import java.util.HashMap;
import java.util.List;
import java.util.function.Function;
import java.util.Map;
import java.util.Optional;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.causalclustering.helper.RobustJobSchedulerWrapper;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand All @@ -38,159 +38,124 @@
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_MEMBER_ID_MAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.extractCatchupAddressesMap;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshTags;

class HazelcastClient extends LifecycleAdapter implements TopologyService
{
static final RenewableTimeoutService.TimeoutName REFRESH_READ_REPLICA = () -> "Refresh Read Replica";
private final Log log;
private final ClientConnectorAddresses connectorAddresses;
private final HazelcastConnector connector;
private final RobustHazelcastWrapper hzInstance;
private final RobustJobSchedulerWrapper scheduler;
private final Config config;
private final RenewableTimeoutService renewableTimeoutService;
private final AdvertisedSocketAddress transactionSource;
private final List<String> tags;
private HazelcastInstance hazelcastInstance;
private RenewableTimeoutService.RenewableTimeout readReplicaRefreshTimer;
private final long readReplicaTimeToLiveTimeout;
private final long readReplicaRefreshRate;
private MemberId myself;

HazelcastClient( HazelcastConnector connector, LogProvider logProvider, Config config,
RenewableTimeoutService renewableTimeoutService, long readReplicaTimeToLiveTimeout,
long readReplicaRefreshRate, MemberId myself )
private final long timeToLive;
private final long refreshPeriod;
private final MemberId myself;

private JobScheduler.JobHandle keepAliveJob;
private JobScheduler.JobHandle refreshTopologyJob;

private volatile Map<MemberId,AdvertisedSocketAddress> catchupAddressMap = new HashMap<>();
private volatile CoreTopology coreTopology = CoreTopology.EMPTY;
private volatile ReadReplicaTopology rrTopology = ReadReplicaTopology.EMPTY;

HazelcastClient( HazelcastConnector connector, JobScheduler scheduler, LogProvider logProvider, Config config, MemberId myself )
{
this.connector = connector;
this.hzInstance = new RobustHazelcastWrapper( connector );
this.config = config;
this.renewableTimeoutService = renewableTimeoutService;
this.readReplicaRefreshRate = readReplicaRefreshRate;
this.log = logProvider.getLog( getClass() );
this.scheduler = new RobustJobSchedulerWrapper( scheduler, log );
this.connectorAddresses = ClientConnectorAddresses.extractFromConfig( config );
this.transactionSource = config.get( CausalClusteringSettings.transaction_advertised_address );
this.tags = config.get( CausalClusteringSettings.server_tags );
this.readReplicaTimeToLiveTimeout = readReplicaTimeToLiveTimeout;
this.timeToLive = config.get( CausalClusteringSettings.read_replica_time_to_live );
this.refreshPeriod = config.get( CausalClusteringSettings.cluster_topology_refresh );
this.myself = myself;
}

@Override
public CoreTopology coreServers()
{
try
{
return retry( ( hazelcastInstance ) -> getCoreTopology( hazelcastInstance, config, log ) );
}
catch ( Exception e )
{
log.info(
"Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. " +
"Connection will be reattempted on next polling attempt.", e );
return CoreTopology.EMPTY;
}
return coreTopology;
}

@Override
public ReadReplicaTopology readReplicas()
{
try
{
return retry( ( hazelcastInstance ) -> getReadReplicaTopology( hazelcastInstance, log ) );
}
catch ( Exception e )
{
log.info(
"Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. " +
"Connection will be reattempted on next polling attempt.", e );
return ReadReplicaTopology.EMPTY;
}
return rrTopology;
}

@Override
public ClusterTopology allServers()
public Optional<AdvertisedSocketAddress> findCatchupAddress( MemberId memberId )
{
return new ClusterTopology( coreServers(), readReplicas() );
return Optional.ofNullable( catchupAddressMap.get( memberId ) );
}

/**
 * Re-reads the core and read replica topologies through the Hazelcast wrapper and
 * publishes them, together with the catchup address map derived from them, into the
 * cached fields so that lookups do not have to go to Hazelcast.
 *
 * The fields are written in the same order as they are fetched: core topology first,
 * then read replica topology, then the address map built from those two results.
 */
private void refreshTopology() throws HazelcastInstanceNotActiveException
{
    CoreTopology latestCores = hzInstance.apply( ( hz ) -> getCoreTopology( hz, config, log ) );
    coreTopology = latestCores;

    ReadReplicaTopology latestReplicas = hzInstance.apply( ( hz ) -> getReadReplicaTopology( hz, log ) );
    rrTopology = latestReplicas;

    catchupAddressMap = extractCatchupAddressesMap( latestCores, latestReplicas );
}

@Override
public void start() throws Throwable
{
readReplicaRefreshTimer =
renewableTimeoutService.create( REFRESH_READ_REPLICA, readReplicaRefreshRate, 0, timeout ->
{
timeout.renew();
retry( this::addReadReplica );
} );
keepAliveJob = scheduler.scheduleRecurring( "KeepAlive", timeToLive / 3, this::keepReadReplicaAlive );
refreshTopologyJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, this::refreshTopology );
}

private Void addReadReplica( HazelcastInstance hazelcastInstance )
@Override
public void stop() throws Throwable
{
String uuid = hazelcastInstance.getLocalEndpoint().getUuid();
String addresses = connectorAddresses.toString();

log.debug( "Adding read replica into cluster (%s -> %s)", uuid, addresses );

hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME )
.put( uuid, transactionSource.toString(), readReplicaTimeToLiveTimeout, MILLISECONDS );

hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME )
.put( uuid, myself.getUuid().toString(), readReplicaTimeToLiveTimeout, MILLISECONDS );

refreshTags( hazelcastInstance, uuid, tags );

// this needs to be last as when we read from it in HazelcastClusterTopology.readReplicas
// we assume that all the other maps have been populated if an entry exists in this one
hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME )
.put( uuid, addresses, readReplicaTimeToLiveTimeout, MILLISECONDS );

return null; // return value not used.
scheduler.cancelAndWaitTermination( keepAliveJob );
scheduler.cancelAndWaitTermination( refreshTopologyJob );
disconnectFromCore();
}

@Override
public synchronized void stop() throws Throwable
private void disconnectFromCore()
{
if ( hazelcastInstance != null )
try
{
try
{
String uuid = hazelcastInstance.getLocalEndpoint().getUuid();
hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ).remove( uuid );
hazelcastInstance.shutdown();
}
catch ( Throwable t )
{
// Hazelcast is not able to stop correctly sometimes and throws a bunch of different exceptions
// let's simply log the current problem but go on with our shutdown
log.warn( "Unable to shutdown Hazelcast", t );
}
String uuid = hzInstance.apply( hzInstance -> hzInstance.getLocalEndpoint().getUuid() );
hzInstance.apply( hz -> hz.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME ).remove( uuid ) );
hzInstance.shutdown();
}
catch ( Throwable e )
{
// Hazelcast is not able to stop correctly sometimes and throws a bunch of different exceptions
// let's simply log the current problem but go on with our shutdown
log.warn( "Unable to shutdown hazelcast cleanly", e );
}

readReplicaRefreshTimer.cancel();
}

private synchronized <T> T retry( Function<HazelcastInstance,T> hazelcastOperation )
private void keepReadReplicaAlive() throws HazelcastInstanceNotActiveException
{
boolean attemptedConnection = false;
HazelcastInstanceNotActiveException exception = null;

while ( !attemptedConnection )
hzInstance.perform( hazelcastInstance ->
{
if ( hazelcastInstance == null )
{
attemptedConnection = true;
hazelcastInstance = connector.connectToHazelcast();
}

try
{
return hazelcastOperation.apply( hazelcastInstance );
}
catch ( HazelcastInstanceNotActiveException e )
{
hazelcastInstance = null;
exception = e;
}
}
throw exception;
String uuid = hazelcastInstance.getLocalEndpoint().getUuid();
String addresses = connectorAddresses.toString();
log.debug( "Adding read replica into cluster (%s -> %s)", uuid, addresses );

hazelcastInstance.getMap( READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME )
.put( uuid, transactionSource.toString(), timeToLive, MILLISECONDS );

hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME )
.put( uuid, myself.getUuid().toString(), timeToLive, MILLISECONDS );

refreshTags( hazelcastInstance, uuid, tags );

// this needs to be last as when we read from it in HazelcastClusterTopology.readReplicas
// we assume that all the other maps have been populated if an entry exists in this one
hazelcastInstance.getMap( READ_REPLICA_BOLT_ADDRESS_MAP_NAME )
.put( uuid, addresses, timeToLive, MILLISECONDS );
} );
}
}
Expand Up @@ -45,7 +45,7 @@
import static org.neo4j.helpers.SocketAddressFormat.socketAddress;
import static org.neo4j.helpers.collection.Iterables.asSet;

class HazelcastClusterTopology
public class HazelcastClusterTopology
{
// per server attributes
private static final String DISCOVERY_SERVER = "discovery_server"; // not currently used
Expand Down Expand Up @@ -102,6 +102,23 @@ static CoreTopology getCoreTopology( HazelcastInstance hazelcastInstance, Config
return new CoreTopology( clusterId, canBeBootstrapped, coreMembers );
}

/**
 * Builds a lookup from member id to catchup server address covering the members of
 * both topologies. Core members are inserted first and read replicas second, so a
 * read replica entry replaces a core entry stored under the same member id.
 *
 * @param coreTopology the core topology whose members are added first
 * @param rrTopology the read replica topology whose members are added afterwards
 * @return a newly created map from member id to catchup server address
 */
public static Map<MemberId,AdvertisedSocketAddress> extractCatchupAddressesMap( CoreTopology coreTopology, ReadReplicaTopology rrTopology )
{
    Map<MemberId,AdvertisedSocketAddress> addressesById = new HashMap<>();

    coreTopology.members().forEach(
            ( memberId, coreInfo ) -> addressesById.put( memberId, coreInfo.getCatchupServer() ) );
    rrTopology.members().forEach(
            ( memberId, replicaInfo ) -> addressesById.put( memberId, replicaInfo.getCatchupServer() ) );

    return addressesById;
}

private static ClusterId getClusterId( HazelcastInstance hazelcastInstance )
{
IAtomicReference<UUID> uuidReference = hazelcastInstance.getAtomicReference( CLUSTER_UUID );
Expand Down

0 comments on commit bd94401

Please sign in to comment.