Skip to content

Commit

Permalink
Remove listeners from EdgeTopologyService.
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Sumrall committed Jun 29, 2016
1 parent ea8fd89 commit 0b0e544
Show file tree
Hide file tree
Showing 12 changed files with 29 additions and 69 deletions.
Expand Up @@ -36,7 +36,7 @@
import org.neo4j.coreedge.catchup.tx.edge.TxPullResponse;
import org.neo4j.coreedge.catchup.tx.edge.TxPullResponseListener;
import org.neo4j.coreedge.catchup.tx.edge.TxStreamCompleteListener;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.net.CoreOutbound;
import org.neo4j.coreedge.raft.net.Outbound;
Expand Down Expand Up @@ -66,7 +66,7 @@ public abstract class CoreClient extends LifecycleAdapter implements StoreFileRe
private Outbound<CoreMember, Message> outbound;

public CoreClient( LogProvider logProvider, ChannelInitializer<SocketChannel> channelInitializer, Monitors monitors,
int maxQueueSize, NonBlockingChannels nonBlockingChannels, CoreTopologyService discoveryService )
int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService )
{
senderService =
new SenderService( channelInitializer, logProvider, monitors, maxQueueSize, nonBlockingChannels );
Expand Down
Expand Up @@ -40,7 +40,7 @@
import org.neo4j.coreedge.catchup.tx.edge.TxPullResponseHandler;
import org.neo4j.coreedge.catchup.tx.edge.TxStreamFinishedResponseDecoder;
import org.neo4j.coreedge.catchup.tx.edge.TxStreamFinishedResponseHandler;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.EdgeTopologyService;
import org.neo4j.coreedge.server.IdleChannelReaperHandler;
import org.neo4j.coreedge.server.NonBlockingChannels;
import org.neo4j.coreedge.server.logging.ExceptionLoggingHandler;
Expand All @@ -50,7 +50,7 @@
public class EdgeToCoreClient extends CoreClient
{
public EdgeToCoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors,
int maxQueueSize, NonBlockingChannels nonBlockingChannels, CoreTopologyService discoveryService )
int maxQueueSize, NonBlockingChannels nonBlockingChannels, EdgeTopologyService discoveryService )
{
super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService );
}
Expand Down
Expand Up @@ -19,16 +19,12 @@
*/
package org.neo4j.coreedge.discovery;

import org.neo4j.kernel.lifecycle.Lifecycle;

public interface CoreTopologyService extends Lifecycle
public interface CoreTopologyService extends TopologyService
{
void addMembershipListener( Listener listener );

void removeMembershipListener( Listener listener );

ClusterTopology currentTopology();

interface Listener
{
void onTopologyChange();
Expand Down
Expand Up @@ -21,7 +21,7 @@

import org.neo4j.coreedge.server.AdvertisedSocketAddress;

public interface EdgeTopologyService extends CoreTopologyService
public interface EdgeTopologyService extends TopologyService
{
void registerEdgeServer( AdvertisedSocketAddress address );
}
Expand Up @@ -21,12 +21,6 @@

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.MembershipListener;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -44,43 +38,13 @@ class HazelcastClient extends LifecycleAdapter implements EdgeTopologyService
private final LogProvider logProvider;
private HazelcastInstance hazelcastInstance;

private List<MembershipListener> membershipListeners = new ArrayList<>();
private Map<MembershipListener, String> membershipRegistrationId = new ConcurrentHashMap<>();

HazelcastClient( HazelcastConnector connector, LogProvider logProvider )
{
this.connector = connector;
this.logProvider = logProvider;
log = logProvider.getLog( getClass() );
}

@Override
public void addMembershipListener( Listener listener )
{
MembershipListenerAdapter hazelcastListener = new MembershipListenerAdapter( listener, log );
membershipListeners.add( hazelcastListener );

if ( hazelcastInstance != null )
{
String registrationId = hazelcastInstance.getCluster().addMembershipListener( hazelcastListener );
membershipRegistrationId.put( hazelcastListener, registrationId );
}
listener.onTopologyChange();
}

@Override
public void removeMembershipListener( Listener listener )
{
MembershipListenerAdapter hazelcastListener = new MembershipListenerAdapter( listener, log );
membershipListeners.remove( hazelcastListener );
String registrationId = membershipRegistrationId.remove( hazelcastListener );

if ( hazelcastInstance != null && registrationId != null )
{
hazelcastInstance.getCluster().removeMembershipListener( registrationId );
}
}

@Override
public ClusterTopology currentTopology()
{
Expand Down
Expand Up @@ -67,7 +67,7 @@ public void addMembershipListener( Listener listener )
MembershipListenerAdapter hazelcastListener = new MembershipListenerAdapter( listener, log );
membershipListeners.add( hazelcastListener );

if ( hazelcastInstance != null )
if ( hazelcastInstance != null ) // hazelcast has already started
{
String registrationId = hazelcastInstance.getCluster().addMembershipListener( hazelcastListener );
membershipRegistrationId.put( hazelcastListener, registrationId );
Expand All @@ -94,7 +94,8 @@ public void start()
hazelcastInstance = createHazelcastInstance();
log.info( "Cluster discovery service started" );

for ( MembershipListener membershipListener : membershipListeners )
for ( MembershipListener membershipListener : membershipListeners ) // listeners that were added before
// hazelcast started
{
String registrationId = hazelcastInstance.getCluster().addMembershipListener( membershipListener );
membershipRegistrationId.put( membershipListener, registrationId );
Expand Down
@@ -0,0 +1,8 @@
package org.neo4j.coreedge.discovery;

import org.neo4j.kernel.lifecycle.Lifecycle;

public interface TopologyService extends Lifecycle
{
ClusterTopology currentTopology();
}
Expand Up @@ -22,17 +22,17 @@
import java.util.Collection;

import org.neo4j.coreedge.discovery.CoreAddresses;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;

public class CoreOutbound implements Outbound<CoreMember, Message>
{
private final CoreTopologyService discoveryService;
private final TopologyService discoveryService;
private final Outbound<AdvertisedSocketAddress, Message> outbound;

public CoreOutbound( CoreTopologyService discoveryService, Outbound<AdvertisedSocketAddress, Message> outbound )
public CoreOutbound( TopologyService discoveryService, Outbound<AdvertisedSocketAddress, Message> outbound )
{
this.discoveryService = discoveryService;
this.outbound = outbound;
Expand Down
Expand Up @@ -21,14 +21,14 @@

import org.neo4j.coreedge.discovery.ClusterTopology;
import org.neo4j.coreedge.discovery.CoreServerSelectionException;
import org.neo4j.coreedge.discovery.EdgeTopologyService;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.server.CoreMember;

public class AlwaysChooseFirstServer implements CoreServerSelectionStrategy
{
private final EdgeTopologyService discoveryService;
private final TopologyService discoveryService;

public AlwaysChooseFirstServer( EdgeTopologyService discoveryService)
public AlwaysChooseFirstServer( TopologyService discoveryService)
{
this.discoveryService = discoveryService;
}
Expand Down
Expand Up @@ -24,15 +24,15 @@

import org.neo4j.coreedge.discovery.ClusterTopology;
import org.neo4j.coreedge.discovery.CoreServerSelectionException;
import org.neo4j.coreedge.discovery.EdgeTopologyService;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.server.CoreMember;

public class ConnectToRandomCoreServer implements CoreServerSelectionStrategy
{
private final EdgeTopologyService discoveryService;
private final TopologyService discoveryService;
private final Random random = new Random();

public ConnectToRandomCoreServer( EdgeTopologyService discoveryService )
public ConnectToRandomCoreServer( TopologyService discoveryService )
{
this.discoveryService = discoveryService;
}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFetcher;
import org.neo4j.coreedge.discovery.ClusterTopology;
import org.neo4j.coreedge.discovery.EdgeTopologyService;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.raft.replication.tx.ConstantTimeRetryStrategy;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.edge.AlwaysChooseFirstServer;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void startShouldReplaceLocalStoreWithStoreFromCoreServerAndStartPolling()
LocalDatabase localDatabase = mock( LocalDatabase.class );

CoreMember coreMember = new CoreMember( UUID.randomUUID() );
EdgeTopologyService hazelcastTopology = mock( EdgeTopologyService.class );
TopologyService hazelcastTopology = mock( TopologyService.class );

ClusterTopology clusterTopology = mock( ClusterTopology.class );
when( hazelcastTopology.currentTopology() ).thenReturn( clusterTopology );
Expand Down Expand Up @@ -96,7 +97,7 @@ public void stopShouldStopTheDatabaseAndStopPolling() throws Throwable
LocalDatabase localDatabase = mock( LocalDatabase.class );

CoreMember coreMember = new CoreMember( UUID.randomUUID() );
EdgeTopologyService hazelcastTopology = mock( EdgeTopologyService.class );
TopologyService hazelcastTopology = mock( TopologyService.class );
ClusterTopology clusterTopology = mock( ClusterTopology.class );
when( clusterTopology.coreMembers() ).thenReturn( asSet( coreMember ) );

Expand Down
Expand Up @@ -42,16 +42,6 @@ public void registerEdgeServer( AdvertisedSocketAddress boltAddress )
log.info( "Registered edge server at %s", boltAddress );
}

@Override
public void addMembershipListener( Listener listener )
{
}

@Override
public void removeMembershipListener( Listener listener )
{
}

@Override
public ClusterTopology currentTopology()
{
Expand Down

0 comments on commit 0b0e544

Please sign in to comment.