Skip to content

Commit

Permalink
Hazelcast client reconnection to core cluster
Browse files Browse the repository at this point in the history
Previously, if the core servers restarted, edge servers would
never connect again and would therefore fall behind, as they
were unable to make pull requests.

EdgeDiscoveryService#currentTopology now tries to reestablish
a connection if one previously failed.
  • Loading branch information
apcj authored and Mark Needham committed Feb 22, 2016
1 parent c1ad817 commit 65023cd
Show file tree
Hide file tree
Showing 25 changed files with 556 additions and 290 deletions.
Expand Up @@ -22,6 +22,7 @@
import java.util.function.Supplier;

import org.neo4j.coreedge.catchup.storecopy.edge.CoreClient;
import org.neo4j.coreedge.discovery.EdgeServerConnectionException;
import org.neo4j.coreedge.server.edge.EdgeToCoreConnectionStrategy;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.JobScheduler;
Expand Down Expand Up @@ -58,11 +59,19 @@ public TxPollingClient( JobScheduler jobScheduler, long pollingInterval,
public void startPolling()
{
coreClient.addTxPullResponseListener( txPullResponseListener );
final TransactionIdStore transactionIdStore = transactionIdStoreSupplier.get();
final TransactionIdStore txIdStore = transactionIdStoreSupplier.get();
jobScheduler.scheduleRecurring( pullUpdates,
() -> coreClient.pollForTransactions(
connectionStrategy.coreServer(),
transactionIdStore.getLastCommittedTransactionId() ), pollingInterval, MILLISECONDS );
() -> {
try
{
coreClient.pollForTransactions( connectionStrategy.coreServer(),
txIdStore.getLastCommittedTransactionId() );
}
catch ( EdgeServerConnectionException e )
{
// Do nothing, we'll poll again shortly.
}
}, pollingInterval, MILLISECONDS );
}

@Override
Expand Down
Expand Up @@ -19,16 +19,43 @@
*/
package org.neo4j.coreedge.discovery;

import java.util.Collections;
import java.util.Set;

import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;

public interface ClusterTopology
{
/**
 * Topology that contains no core servers at all: it reports zero core servers,
 * an empty member set, is never bootstrappable, and cannot name a transaction
 * server.
 */
ClusterTopology EMPTY = new ClusterTopology()
{
    @Override
    public boolean bootstrappable()
    {
        // Nothing to bootstrap from when there are no members.
        return false;
    }

    @Override
    public int getNumberOfCoreServers()
    {
        return 0;
    }

    @Override
    public Set<CoreMember> getMembers()
    {
        // Element type is inferred from the declared return type.
        return Collections.emptySet();
    }

    @Override
    public AdvertisedSocketAddress firstTransactionServer()
    {
        // There is no server to hand out, so fail loudly instead of returning null.
        throw new RuntimeException( "No core server found" );
    }
};

AdvertisedSocketAddress firstTransactionServer();

int getNumberOfEdgeServers();
int getNumberOfCoreServers();

Set<CoreMember> getMembers();
Expand Down
Expand Up @@ -20,13 +20,12 @@
package org.neo4j.coreedge.discovery;


import org.neo4j.coreedge.discovery.CoreDiscoveryService;
import org.neo4j.coreedge.discovery.EdgeDiscoveryService;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;

public interface DiscoveryServiceFactory
{
CoreDiscoveryService coreDiscoveryService( Config config );

EdgeDiscoveryService edgeDiscoveryService( Config config );
EdgeDiscoveryService edgeDiscoveryService( Config config, LogProvider logProvider );
}
Expand Up @@ -19,10 +19,10 @@
*/
package org.neo4j.coreedge.discovery;

public class EdgeServerConnectionException extends Throwable
public class EdgeServerConnectionException extends Exception
{
public EdgeServerConnectionException( IllegalStateException e )
public EdgeServerConnectionException( String message )
{
super( e );
super( message );
}
}
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

import java.util.Collections;
import java.util.Set;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.Member;

import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/**
 * {@link EdgeDiscoveryService} that reads the cluster membership through a
 * {@link HazelcastInstance} obtained from a {@link HazelcastConnector}.
 * <p>
 * The instance is created lazily on the first call to {@link #currentTopology()}
 * and is kept for later calls. When the kept instance reports that it is no
 * longer active it is dropped and a single reconnection is attempted within the
 * same call; if that fails too, an empty topology is returned and the next call
 * tries again.
 */
public class HazelcastClient extends LifecycleAdapter implements EdgeDiscoveryService
{
    private final Log log;
    private final HazelcastConnector connector;
    // null while there is no usable connection to the cluster
    private HazelcastInstance hazelcastInstance;

    /**
     * @param connector   used to (re)create the Hazelcast instance on demand
     * @param logProvider source of the log used to report failed connection attempts
     */
    public HazelcastClient( HazelcastConnector connector, LogProvider logProvider )
    {
        this.connector = connector;
        log = logProvider.getLog( getClass() );
    }

    /**
     * Returns the members currently visible through Hazelcast.
     * <p>
     * At most one connection attempt is made per call, and this method never
     * loops: if the current instance is active but reports no members, the empty
     * result is returned as-is rather than being polled repeatedly.
     *
     * @return a topology built from the visible members; built from an empty
     * set when no connection could be established
     */
    @Override
    public ClusterTopology currentTopology()
    {
        Set<Member> hazelcastMembers = readMembers();

        // A null instance here means we either never connected or just found
        // the previous instance inactive; try once to get a fresh one.
        if ( hazelcastInstance == null && connect() )
        {
            hazelcastMembers = readMembers();
        }

        return new HazelcastClusterTopology( hazelcastMembers );
    }

    /**
     * Tries to obtain a new instance from the connector.
     *
     * @return {@code true} if an instance was obtained
     */
    private boolean connect()
    {
        try
        {
            hazelcastInstance = connector.connectToHazelcast();
            return true;
        }
        catch ( IllegalStateException e )
        {
            // Deliberately terse: callers poll this method, so it may fire repeatedly.
            log.info( "Unable to connect to core cluster" );
            return false;
        }
    }

    /**
     * Reads the member set from the current instance, dropping the instance if
     * it turns out to be inactive.
     *
     * @return the members, or an empty set when there is no active instance
     */
    private Set<Member> readMembers()
    {
        if ( hazelcastInstance == null )
        {
            return Collections.emptySet();
        }

        try
        {
            return hazelcastInstance.getCluster().getMembers();
        }
        catch ( HazelcastInstanceNotActiveException e )
        {
            // Forget the dead instance so that a reconnect can be attempted.
            hazelcastInstance = null;
            return Collections.emptySet();
        }
    }

    /**
     * Shuts down the Hazelcast instance, if one is currently held.
     */
    @Override
    public void stop() throws Throwable
    {
        if ( hazelcastInstance != null )
        {
            hazelcastInstance.shutdown();
        }
    }
}
Expand Up @@ -24,80 +24,32 @@
import com.hazelcast.core.HazelcastInstance;

import org.neo4j.cluster.ClusterSettings;
import org.neo4j.coreedge.server.CoreEdgeClusterSettings;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreEdgeClusterSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

public class HazelcastClientLifecycle extends LifecycleAdapter implements EdgeDiscoveryService
public class HazelcastClientConnector implements HazelcastConnector
{
private Config config;
private HazelcastInstance hazelcastInstance;
private final Config config;

public HazelcastClientLifecycle( Config config )
public HazelcastClientConnector( Config config )
{
this.config = config;
}

@Override
public void start() throws Throwable
{
ClientConfig clientConfig = clientConfig();

try
{
hazelcastInstance = HazelcastClient.newHazelcastClient( clientConfig );
}
catch ( IllegalStateException e )
{
// assume that IllegalStateExceptions only occur on connection failure
throw new EdgeServerConnectionException( e );
}

addToClusterMap();
}

private void addToClusterMap()
{
hazelcastInstance
.getMap( HazelcastClusterTopology.EDGE_SERVERS )
.put( config.get( ClusterSettings.server_id ), 1 );
}

private ClientConfig clientConfig()
public HazelcastInstance connectToHazelcast()
{
ClientConfig clientConfig = new ClientConfig();

clientConfig.getGroupConfig().setName( config.get( ClusterSettings.cluster_name ) );


for ( AdvertisedSocketAddress address : config.get( CoreEdgeClusterSettings.initial_core_cluster_members ) )
{
clientConfig.getNetworkConfig().addAddress( address.toString() );
}
return clientConfig;
}

@Override
public void stop()
{
try
{
hazelcastInstance
.getMap( HazelcastClusterTopology.EDGE_SERVERS )
.remove( config.get( ClusterSettings.server_id ) );
hazelcastInstance.shutdown();
}
catch ( RuntimeException ignored )
{
// this can happen if the edge server is trying to shutdown but
// the core is gone
}
}

@Override
public ClusterTopology currentTopology()
{
return new HazelcastClusterTopology( hazelcastInstance );
return HazelcastClient.newHazelcastClient( clientConfig );
}
}
Expand Up @@ -22,42 +22,41 @@
import java.util.HashSet;
import java.util.Set;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;

import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;

import static org.neo4j.coreedge.discovery.HazelcastServerLifecycle.TRANSACTION_SERVER;
import static org.neo4j.coreedge.discovery.HazelcastServerLifecycle.RAFT_SERVER;
import static org.neo4j.coreedge.discovery.HazelcastServerLifecycle.TRANSACTION_SERVER;

public class HazelcastClusterTopology implements ClusterTopology
{
public static final String EDGE_SERVERS = "edge-servers";
private HazelcastInstance hazelcast;
private final Set<Member> hazelcastMembers;

public HazelcastClusterTopology( HazelcastInstance hazelcast )
public HazelcastClusterTopology( Set<Member> hazelcastMembers )
{
this.hazelcast = hazelcast;
this.hazelcastMembers = hazelcastMembers;
}

@Override
public boolean bootstrappable()
{
Member firstMember = hazelcast.getCluster().getMembers().iterator().next();
Member firstMember = hazelcastMembers.iterator().next();
return firstMember.localMember();
}

@Override
public int getNumberOfCoreServers()
{
return hazelcast.getCluster().getMembers().size();
return hazelcastMembers.size();
}

@Override
public Set<CoreMember> getMembers()
{
return toCoreMembers( hazelcast.getCluster().getMembers() );
return toCoreMembers( hazelcastMembers );
}

private Set<CoreMember> toCoreMembers( Set<Member> members )
Expand All @@ -75,16 +74,10 @@ private Set<CoreMember> toCoreMembers( Set<Member> members )
return coreMembers;
}

@Override
public int getNumberOfEdgeServers()
{
return hazelcast.getMap( EDGE_SERVERS ).size();
}

@Override
public AdvertisedSocketAddress firstTransactionServer()
{
Member member = hazelcast.getCluster().getMembers().iterator().next();
Member member = hazelcastMembers.iterator().next();
return new AdvertisedSocketAddress( member.getStringAttribute( TRANSACTION_SERVER ) );
}
}
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

import com.hazelcast.core.HazelcastInstance;

/**
 * Abstraction over how a {@link HazelcastInstance} is obtained, so that the
 * code using the instance does not create it directly.
 */
public interface HazelcastConnector
{
    /**
     * Obtains a {@link HazelcastInstance}.
     * <p>
     * NOTE(review): HazelcastClient treats an {@link IllegalStateException}
     * thrown from this call as a failed connection attempt; implementations
     * presumably signal connection failure that way - confirm.
     *
     * @return the Hazelcast instance to use
     */
    HazelcastInstance connectToHazelcast();
}
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.coreedge.discovery;

import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;

public class HazelcastDiscoveryServiceFactory implements DiscoveryServiceFactory
{
Expand All @@ -30,8 +31,8 @@ public CoreDiscoveryService coreDiscoveryService( Config config )
}

@Override
public EdgeDiscoveryService edgeDiscoveryService( Config config )
public EdgeDiscoveryService edgeDiscoveryService( Config config, LogProvider logProvider )
{
return new HazelcastClientLifecycle( config );
return new HazelcastClient( new HazelcastClientConnector( config ), logProvider );
}
}

0 comments on commit 65023cd

Please sign in to comment.