Skip to content

Commit

Permalink
Introduce test-only SharedDiscoveryService.
Browse files Browse the repository at this point in the history
  • Loading branch information
apcj committed Jun 12, 2016
1 parent d18af75 commit c998c73
Show file tree
Hide file tree
Showing 26 changed files with 363 additions and 68 deletions.
Expand Up @@ -27,7 +27,7 @@


public interface ClusterTopology public interface ClusterTopology
{ {
boolean bootstrappable(); boolean canBeBootstrapped();


Set<CoreMember> coreMembers(); Set<CoreMember> coreMembers();


Expand Down
Expand Up @@ -27,6 +27,6 @@ public interface CoreTopologyService extends ReadOnlyTopologyService


interface Listener interface Listener
{ {
void onTopologyChange( ClusterTopology clusterTopology ); void onTopologyChange();
} }
} }
Expand Up @@ -19,9 +19,9 @@
*/ */
package org.neo4j.coreedge.discovery; package org.neo4j.coreedge.discovery;


import org.neo4j.helpers.HostnamePort; import org.neo4j.coreedge.server.AdvertisedSocketAddress;


public interface EdgeTopologyService extends ReadOnlyTopologyService public interface EdgeTopologyService extends ReadOnlyTopologyService
{ {
void registerEdgeServer( HostnamePort address ); void registerEdgeServer( AdvertisedSocketAddress address );
} }
Expand Up @@ -27,7 +27,6 @@


import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.BoltAddress; import org.neo4j.coreedge.server.BoltAddress;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -108,7 +107,7 @@ public void stop() throws Throwable
} }


@Override @Override
public void registerEdgeServer( HostnamePort address ) public void registerEdgeServer( AdvertisedSocketAddress address )
{ {
hazelcastInstance.getSet( EDGE_SERVERS ).add( address.toString() ); hazelcastInstance.getSet( EDGE_SERVERS ).add( address.toString() );
} }
Expand Down
Expand Up @@ -45,7 +45,7 @@ public class HazelcastClusterTopology implements ClusterTopology
} }


@Override @Override
public boolean bootstrappable() public boolean canBeBootstrapped()
{ {
Member firstMember = coreMembers.iterator().next(); Member firstMember = coreMembers.iterator().next();
return firstMember.localMember(); return firstMember.localMember();
Expand Down
Expand Up @@ -41,7 +41,6 @@
import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.ListenSocketAddress; import org.neo4j.coreedge.server.ListenSocketAddress;
import org.neo4j.coreedge.server.edge.EnterpriseEdgeEditionModule; import org.neo4j.coreedge.server.edge.EnterpriseEdgeEditionModule;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.helpers.Listeners; import org.neo4j.helpers.Listeners;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand Down Expand Up @@ -82,7 +81,7 @@ public void addMembershipListener( Listener listener )
String registrationId = hazelcastInstance.getCluster().addMembershipListener( hazelcastListener ); String registrationId = hazelcastInstance.getCluster().addMembershipListener( hazelcastListener );
membershipRegistrationId.put( hazelcastListener, registrationId ); membershipRegistrationId.put( hazelcastListener, registrationId );
} }
listener.onTopologyChange( currentTopology() ); listener.onTopologyChange();
} }


@Override @Override
Expand Down Expand Up @@ -171,7 +170,7 @@ private HazelcastInstance createHazelcastInstance()
AdvertisedSocketAddress raftAddress = config.get( CoreEdgeClusterSettings.raft_advertised_address ); AdvertisedSocketAddress raftAddress = config.get( CoreEdgeClusterSettings.raft_advertised_address );
memberAttributeConfig.setStringAttribute( RAFT_SERVER, raftAddress.toString() ); memberAttributeConfig.setStringAttribute( RAFT_SERVER, raftAddress.toString() );


HostnamePort boltAddress = EnterpriseEdgeEditionModule.extractBoltAddress( config ); AdvertisedSocketAddress boltAddress = EnterpriseEdgeEditionModule.extractBoltAddress( config );
memberAttributeConfig.setStringAttribute( BOLT_SERVER, boltAddress.toString() ); memberAttributeConfig.setStringAttribute( BOLT_SERVER, boltAddress.toString() );


c.setMemberAttributeConfig( memberAttributeConfig ); c.setMemberAttributeConfig( memberAttributeConfig );
Expand Down Expand Up @@ -211,7 +210,7 @@ public void memberAdded( MembershipEvent membershipEvent )
HazelcastClusterTopology clusterTopology = new HazelcastClusterTopology( HazelcastClusterTopology clusterTopology = new HazelcastClusterTopology(
hazelcastInstance.getCluster().getMembers(), hazelcastInstance.getCluster().getMembers(),
HazelcastClient.edgeMembers(hazelcastInstance) ); HazelcastClient.edgeMembers(hazelcastInstance) );
listener.onTopologyChange( clusterTopology ); listener.onTopologyChange();
} }


@Override @Override
Expand All @@ -220,7 +219,7 @@ public void memberRemoved( MembershipEvent membershipEvent )
HazelcastClusterTopology clusterTopology = new HazelcastClusterTopology( HazelcastClusterTopology clusterTopology = new HazelcastClusterTopology(
hazelcastInstance.getCluster().getMembers(), hazelcastInstance.getCluster().getMembers(),
HazelcastClient.edgeMembers(hazelcastInstance) ); HazelcastClient.edgeMembers(hazelcastInstance) );
listener.onTopologyChange( clusterTopology ); listener.onTopologyChange();
} }


@Override @Override
Expand Down
Expand Up @@ -47,12 +47,12 @@ public void start() throws BootstrapException
ClusterTopology clusterTopology = discoveryService.currentTopology(); ClusterTopology clusterTopology = discoveryService.currentTopology();
Set<CoreMember> initialMembers = clusterTopology.coreMembers(); Set<CoreMember> initialMembers = clusterTopology.coreMembers();


if ( clusterTopology.bootstrappable() ) if ( clusterTopology.canBeBootstrapped() )
{ {
raftInstance.bootstrapWithInitialMembers( new CoreMemberSet( initialMembers ) ); raftInstance.bootstrapWithInitialMembers( new CoreMemberSet( initialMembers ) );
} }


onTopologyChange( clusterTopology ); onTopologyChange();
} }


@Override @Override
Expand All @@ -62,8 +62,8 @@ public void stop()
} }


@Override @Override
public void onTopologyChange( ClusterTopology clusterTopology ) public void onTopologyChange()
{ {
raftInstance.setTargetMembershipSet( clusterTopology.coreMembers() ); raftInstance.setTargetMembershipSet( discoveryService.currentTopology().coreMembers() );
} }
} }
Expand Up @@ -163,6 +163,7 @@
import org.neo4j.udc.UsageData; import org.neo4j.udc.UsageData;


import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;

import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.NEW_THREAD; import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.NEW_THREAD;


/** /**
Expand Down Expand Up @@ -276,7 +277,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
myself = new CoreMember( myself = new CoreMember(
config.get( CoreEdgeClusterSettings.transaction_advertised_address ), config.get( CoreEdgeClusterSettings.transaction_advertised_address ),
config.get( CoreEdgeClusterSettings.raft_advertised_address ), config.get( CoreEdgeClusterSettings.raft_advertised_address ),
new AdvertisedSocketAddress( EnterpriseEdgeEditionModule.extractBoltAddress(config).toString()) EnterpriseEdgeEditionModule.extractBoltAddress( config )
); );


final MessageLogger<AdvertisedSocketAddress> messageLogger; final MessageLogger<AdvertisedSocketAddress> messageLogger;
Expand Down
Expand Up @@ -200,10 +200,9 @@ txPollingClient, platformModule.dataSourceManager, new ConnectToRandomCoreServer
new ExponentialBackoffStrategy( 1, TimeUnit.SECONDS ), logProvider, discoveryService, config ) ); new ExponentialBackoffStrategy( 1, TimeUnit.SECONDS ), logProvider, discoveryService, config ) );
} }


public static HostnamePort extractBoltAddress( Config config ) public static AdvertisedSocketAddress extractBoltAddress( Config config )
{ {
AdvertisedSocketAddress advertisedSocketAddress = config.get( CoreEdgeClusterSettings.bolt_advertised_address ); return config.get( CoreEdgeClusterSettings.bolt_advertised_address );
return new HostnamePort( advertisedSocketAddress.toString() );
} }


private void registerRecovery( final DatabaseInfo databaseInfo, LifeSupport life, private void registerRecovery( final DatabaseInfo databaseInfo, LifeSupport life,
Expand Down
Expand Up @@ -34,6 +34,7 @@
import com.hazelcast.core.Member; import com.hazelcast.core.Member;
import org.junit.Test; import org.junit.Test;


import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.helpers.HostnamePort; import org.neo4j.helpers.HostnamePort;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -220,7 +221,7 @@ public void shouldRegisterEdgeServerInTopology() throws Exception


// when // when
client.currentTopology(); client.currentTopology();
client.registerEdgeServer( new HostnamePort( "localhost:7000" ) ); client.registerEdgeServer( new AdvertisedSocketAddress( "localhost:7000" ) );


// then // then
assertEquals( 1, client.currentTopology().edgeMembers().size() ); assertEquals( 1, client.currentTopology().edgeMembers().size() );
Expand Down
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

import java.util.LinkedHashSet;
import java.util.Set;

import org.neo4j.coreedge.server.BoltAddress;
import org.neo4j.coreedge.server.CoreEdgeClusterSettings;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.edge.EnterpriseEdgeEditionModule;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

class SharedDiscoveryCoreClient extends LifecycleAdapter implements CoreTopologyService
{
private final SharedDiscoveryService sharedDiscoveryService;
private final CoreMember member;
private final BoltAddress boltAddress;
private final Set<Listener> listeners = new LinkedHashSet<>();
private final Log log;

SharedDiscoveryCoreClient( Config config, SharedDiscoveryService sharedDiscoveryService, LogProvider logProvider )
{
this.sharedDiscoveryService = sharedDiscoveryService;
this.member = toCoreMember( config );
this.boltAddress = extractBoltAddress( config );
this.log = logProvider.getLog( getClass() );
}

@Override
public synchronized void addMembershipListener( Listener listener )
{
listeners.add( listener );
}

@Override
public synchronized void removeMembershipListener( Listener listener )
{
listeners.remove( listener );
}

@Override
public void start() throws InterruptedException
{
sharedDiscoveryService.registerCoreServer( member, boltAddress, this );
log.info( "Registered core server %s", member );
sharedDiscoveryService.waitForClusterFormation();
log.info( "Cluster formed" );
}

@Override
public void stop()
{
sharedDiscoveryService.unRegisterCoreServer( member, boltAddress, this );
log.info( "Unregistered core server %s", member );
}

@Override
public ClusterTopology currentTopology()
{
ClusterTopology topology = sharedDiscoveryService.currentTopology( this );
log.info( "Current topology is %s", topology );
return topology;
}

synchronized void onTopologyChange()
{
log.info( "Notified of topology change" );
listeners.forEach( Listener::onTopologyChange );
}

private static CoreMember toCoreMember( Config config )
{
return new CoreMember(
config.get( CoreEdgeClusterSettings.transaction_advertised_address ),
config.get( CoreEdgeClusterSettings.raft_advertised_address ),
EnterpriseEdgeEditionModule.extractBoltAddress( config )
);
}

private static BoltAddress extractBoltAddress( Config config )
{
return new BoltAddress( EnterpriseEdgeEditionModule.extractBoltAddress( config ) );
}
}
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

class SharedDiscoveryEdgeClient extends LifecycleAdapter implements EdgeTopologyService
{
private final SharedDiscoveryService sharedDiscoveryService;
private final Log log;

SharedDiscoveryEdgeClient( SharedDiscoveryService sharedDiscoveryService, LogProvider logProvider )
{
this.sharedDiscoveryService = sharedDiscoveryService;
this.log = logProvider.getLog( getClass() );
}

@Override
public void registerEdgeServer( AdvertisedSocketAddress address )
{
sharedDiscoveryService.registerEdgeServer( address );
log.info( "Registered edge server at %s", address );
}

@Override
public ClusterTopology currentTopology()
{
ClusterTopology topology = sharedDiscoveryService.currentTopology( null );
log.info( "Current topology is %s", topology );
return topology;
}
}

0 comments on commit c998c73

Please sign in to comment.