Skip to content

Commit

Permalink
core-edge: fix deadlock between shared discovery service and listener
Browse files Browse the repository at this point in the history
Previously the RaftDiscoveryServiceConnector would access the
discovery service when invoked on its topology change callback,
but within a synchronized block while at the same time accessing
the services of the discovery service which might be busy trying
to invoke the callback for some other trigger, leading to a
deadlock.

This is now fixed by the discovery client never explcitly asking
about the topology, but rather only being notified when it changes
and keeping that value.

Also fixes a non-synchronized iteration over the listeners in
the SharedDiscoveryCoreClient.
  • Loading branch information
martinfurmanski committed Aug 31, 2016
1 parent 1f22cd0 commit b1e96d6
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 36 deletions.
Expand Up @@ -31,8 +31,11 @@ void addCoreTopologyListener( CoreTopologyService.Listener listener )
listeners.add( listener );
}

void notifyListeners()
void notifyListeners( ClusterTopology clusterTopology )
{
listeners.forEach( CoreTopologyService.Listener::onCoreTopologyChange );
for ( CoreTopologyService.Listener listener : listeners )
{
listener.onCoreTopologyChange( clusterTopology );
}
}
}
Expand Up @@ -29,6 +29,6 @@ public interface CoreTopologyService extends TopologyService

interface Listener
{
void onCoreTopologyChange();
void onCoreTopologyChange( ClusterTopology clusterTopology );
}
}
Expand Up @@ -69,7 +69,7 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol
public void addCoreTopologyListener( Listener listener )
{
listenerService.addCoreTopologyListener( listener );
listener.onCoreTopologyChange();
listener.onCoreTopologyChange( currentTopology() );
}

@Override
Expand All @@ -82,26 +82,30 @@ public boolean publishClusterId( ClusterId clusterId )
public void memberAdded( MembershipEvent membershipEvent )
{
log.info( "Core member added %s", membershipEvent );
log.info( "Current topology is %s", currentTopology() );
notifyMembershipChange();

ClusterTopology clusterTopology = currentTopology();
log.info( "Current topology is %s", clusterTopology );
notifyMembershipChange( clusterTopology );
}

@Override
public void memberRemoved( MembershipEvent membershipEvent )
{
log.info( "Core member removed %s", membershipEvent );

ClusterTopology clusterTopology = currentTopology();
log.info( "Current topology is %s", clusterTopology );
notifyMembershipChange( clusterTopology );
}

private void notifyMembershipChange()
private void notifyMembershipChange( ClusterTopology clusterTopology )
{
Set<AdvertisedSocketAddress> members = hazelcastInstance.getCluster().getMembers().stream()
.map( member -> new AdvertisedSocketAddress(
String.format( "%s:%d", member.getSocketAddress().getHostName(),
member.getSocketAddress().getPort() ) ) ).collect( Collectors.toSet() );
discoveredMemberRepository.store( members );
listenerService.notifyListeners();
}

@Override
public void memberRemoved( MembershipEvent membershipEvent )
{
log.info( "Core member removed %s", membershipEvent );
log.info( "Current topology is %s", currentTopology() );
notifyMembershipChange();
listenerService.notifyListeners( clusterTopology );
}

@Override
Expand All @@ -115,7 +119,7 @@ public void start()
hazelcastInstance = createHazelcastInstance();
log.info( "Cluster discovery service started" );
membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( this );
notifyMembershipChange();
notifyMembershipChange( currentTopology() );
}

@Override
Expand Down
Expand Up @@ -53,9 +53,9 @@ public void start() throws BootstrapException
}

@Override
public synchronized void onCoreTopologyChange()
public synchronized void onCoreTopologyChange( ClusterTopology clusterTopology )
{
Set<MemberId> targetMembers = discoveryService.currentTopology().coreMembers();
Set<MemberId> targetMembers = clusterTopology.coreMembers();
raftMachine.setTargetMembershipSet( targetMembers );
}
}
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.coreedge.discovery;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

Expand All @@ -38,9 +37,11 @@ class SharedDiscoveryCoreClient extends LifecycleAdapter implements CoreTopology
private final SharedDiscoveryService sharedDiscoveryService;
private final MemberId member;
private final CoreAddresses coreAddresses;
private final Set<Listener> listeners = Collections.synchronizedSet( new LinkedHashSet<>() );
private final Set<Listener> listeners = new LinkedHashSet<>();
private final Log log;

private ClusterTopology clusterTopology;

SharedDiscoveryCoreClient( SharedDiscoveryService sharedDiscoveryService, MemberId member, LogProvider logProvider, Config config )
{
this.sharedDiscoveryService = sharedDiscoveryService;
Expand All @@ -50,10 +51,10 @@ class SharedDiscoveryCoreClient extends LifecycleAdapter implements CoreTopology
}

@Override
public void addCoreTopologyListener( Listener listener )
public synchronized void addCoreTopologyListener( Listener listener )
{
listeners.add( listener );
listener.onCoreTopologyChange();
listener.onCoreTopologyChange( clusterTopology );
}

@Override
Expand All @@ -79,17 +80,19 @@ public void stop()
}

@Override
public ClusterTopology currentTopology()
public synchronized ClusterTopology currentTopology()
{
ClusterTopology topology = sharedDiscoveryService.currentTopology( this );
log.info( "Current topology is %s", topology );
return topology;
return clusterTopology;
}

void onTopologyChange()
synchronized void onTopologyChange( ClusterTopology clusterTopology )
{
log.info( "Notified of topology change" );
listeners.forEach( Listener::onCoreTopologyChange );
this.clusterTopology = clusterTopology;
for ( Listener listener : listeners )
{
listener.onCoreTopologyChange( clusterTopology );
}
}

private static CoreAddresses extractAddresses( Config config )
Expand Down
Expand Up @@ -26,7 +26,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -49,13 +48,15 @@ public class SharedDiscoveryService implements DiscoveryServiceFactory

private final Lock lock = new ReentrantLock();
private final Condition enoughMembers = lock.newCondition();
private AtomicReference<ClusterId> clusterId = new AtomicReference<>();
private ClusterId clusterId;

@Override
public CoreTopologyService coreTopologyService( Config config, MemberId myself,
DiscoveredMemberRepository discoveredMemberRepository, LogProvider logProvider )
{
return new SharedDiscoveryCoreClient( this, myself, logProvider, config );
SharedDiscoveryCoreClient sharedDiscoveryCoreClient = new SharedDiscoveryCoreClient( this, myself, logProvider, config );
sharedDiscoveryCoreClient.onTopologyChange( currentTopology( sharedDiscoveryCoreClient ) );
return sharedDiscoveryCoreClient;
}

@Override
Expand Down Expand Up @@ -86,7 +87,7 @@ ClusterTopology currentTopology( SharedDiscoveryCoreClient client )
try
{
return new ClusterTopology(
clusterId.get(),
clusterId,
coreClients.size() > 0 && coreClients.get( 0 ) == client,
unmodifiableMap( coreMembers ),
unmodifiableSet( edgeAddresses )
Expand All @@ -106,7 +107,7 @@ void registerCoreMember( MemberId memberId, CoreAddresses coreAddresses, SharedD
coreMembers.put( memberId, coreAddresses );
coreClients.add( client );
enoughMembers.signalAll();
coreClients.forEach( SharedDiscoveryCoreClient::onTopologyChange );
notifyCoreClients();
}
finally
{
Expand All @@ -121,20 +122,29 @@ void unRegisterCoreMember( MemberId memberId, SharedDiscoveryCoreClient client )
{
coreMembers.remove( memberId );
coreClients.remove( client );
coreClients.forEach( SharedDiscoveryCoreClient::onTopologyChange );
notifyCoreClients();
}
finally
{
lock.unlock();
}
}

private void notifyCoreClients()
{
for ( SharedDiscoveryCoreClient coreClient : coreClients )
{
coreClient.onTopologyChange( currentTopology( coreClient ) );
}
}

void registerEdgeMember( EdgeAddresses edgeAddresses )
{
lock.lock();
try
{
this.edgeAddresses.add( edgeAddresses );
notifyCoreClients();
}
finally
{
Expand All @@ -148,6 +158,7 @@ void unRegisterEdgeMember( EdgeAddresses edgeAddresses )
try
{
this.edgeAddresses.remove( edgeAddresses );
notifyCoreClients();
}
finally
{
Expand All @@ -157,6 +168,30 @@ void unRegisterEdgeMember( EdgeAddresses edgeAddresses )

boolean casClusterId( ClusterId clusterId )
{
return this.clusterId.compareAndSet( null, clusterId ) || this.clusterId.get().equals( clusterId );
boolean success;
lock.lock();
try
{
if ( this.clusterId == null )
{
success = true;
this.clusterId = clusterId;
}
else
{
success = this.clusterId.equals( clusterId );
}

if ( success )
{
notifyCoreClients();
}
}
finally
{
lock.unlock();
}

return success;
}
}
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.consensus.RaftMachine;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.NullLogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class SharedDiscoveryServiceIT
{
private static final long TIMEOUT_MS = 15_000;
private static final long RUN_TIME_MS = 1000;;

private NullLogProvider logProvider = NullLogProvider.getInstance();

@Test(timeout = TIMEOUT_MS)
public void shouldDiscoverCompleteTargetSetWithoutDeadlocks() throws Exception
{
// given
ExecutorService es = Executors.newCachedThreadPool();

long endTimeMillis = System.currentTimeMillis() + RUN_TIME_MS;
while ( endTimeMillis > System.currentTimeMillis() )
{
Set<MemberId> members = new HashSet<>();
for ( int i = 0; i < 3; i++ )
{
members.add( new MemberId( UUID.randomUUID() ) );
}

SharedDiscoveryService sharedService = new SharedDiscoveryService();

List<Callable<Void>> discoveryJobs = new ArrayList<>();
for ( MemberId member : members )
{
discoveryJobs.add( createDiscoveryJob( member, sharedService, members ) );
}

List<Future<Void>> results = es.invokeAll( discoveryJobs );
for ( Future<Void> result : results )
{
result.get( TIMEOUT_MS, MILLISECONDS );
}
}
}

private Callable<Void> createDiscoveryJob( MemberId member, DiscoveryServiceFactory disoveryServiceFactory, Set<MemberId> expectedTargetSet ) throws ExecutionException, InterruptedException
{
CoreTopologyService topologyService = disoveryServiceFactory.coreTopologyService( config(), member, mock( DiscoveredMemberRepository.class ), logProvider );
return sharedClientStarter( topologyService, expectedTargetSet );
}

private Config config()
{
return new Config( stringMap(
CoreEdgeClusterSettings.raft_advertised_address.name(), "127.0.0.1:7000",
CoreEdgeClusterSettings.transaction_advertised_address.name(), "127.0.0.1:7001",
GraphDatabaseSettings.bolt_advertised_address.name(), "127.0.0.1:7002" ) );
}

private Callable<Void> sharedClientStarter( CoreTopologyService topologyService, Set<MemberId> expectedTargetSet )
{
return () ->
{
try
{
RaftMachine raftMock = mock( RaftMachine.class );
topologyService.start();
topologyService.addCoreTopologyListener( new RaftDiscoveryServiceConnector( topologyService, raftMock ) );

assertEventually( "should discover complete target set", () ->
{
ArgumentCaptor<Set<MemberId>> targetMembers = ArgumentCaptor.forClass( (Class<Set<MemberId>>) expectedTargetSet.getClass() );
verify( raftMock, atLeastOnce() ).setTargetMembershipSet( targetMembers.capture() );
return targetMembers.getValue();
}, equalTo( expectedTargetSet ), TIMEOUT_MS, MILLISECONDS );
}
catch ( Throwable throwable )
{
fail( throwable.getMessage() );
}
return null;
};
}
}

0 comments on commit b1e96d6

Please sign in to comment.