Skip to content

Commit

Permalink
core-edge: extend dbms.cluster.role with read_replica
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Aug 4, 2016
1 parent 475700e commit 0e80dbb
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 31 deletions.
Expand Up @@ -38,6 +38,7 @@
import org.neo4j.coreedge.core.consensus.ConsensusModule;
import org.neo4j.coreedge.core.consensus.RaftMachine;
import org.neo4j.coreedge.core.consensus.RaftMessages;
import org.neo4j.coreedge.discovery.procedures.CoreRoleProcedure;
import org.neo4j.coreedge.messaging.CoreReplicatedContentMarshal;
import org.neo4j.coreedge.messaging.LoggingOutbound;
import org.neo4j.coreedge.messaging.Outbound;
Expand All @@ -53,7 +54,6 @@
import org.neo4j.coreedge.discovery.procedures.AcquireEndpointsProcedure;
import org.neo4j.coreedge.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.coreedge.discovery.procedures.DiscoverMembersProcedure;
import org.neo4j.coreedge.discovery.procedures.RoleProcedure;
import org.neo4j.coreedge.logging.BetterMessageLogger;
import org.neo4j.coreedge.logging.MessageLogger;
import org.neo4j.coreedge.logging.NullMessageLogger;
Expand Down Expand Up @@ -91,8 +91,6 @@
import org.neo4j.logging.LogProvider;
import org.neo4j.udc.UsageData;

import static org.neo4j.coreedge.discovery.procedures.RoleProcedure.CoreOrEdge.CORE;

/**
* This implementation of {@link org.neo4j.kernel.impl.factory.EditionModule} creates the implementations of services
* that are specific to the Enterprise Core edition that provides a core cluster.
Expand Down Expand Up @@ -120,7 +118,7 @@ public void registerProcedures( Procedures procedures )
procedures.register( new DiscoverMembersProcedure( discoveryService, logProvider ) );
procedures.register( new AcquireEndpointsProcedure( discoveryService, consensusModule.raftMachine(), logProvider ) );
procedures.register( new ClusterOverviewProcedure( discoveryService, consensusModule.raftMachine(), logProvider ) );
procedures.register( new RoleProcedure( CORE ) );
procedures.register( new CoreRoleProcedure( consensusModule.raftMachine()) );
}
catch ( ProcedureException e )
{
Expand Down
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery.procedures;

import org.neo4j.coreedge.core.consensus.RaftMachine;

public class CoreRoleProcedure extends RoleProcedure
{
private final RaftMachine raft;

public CoreRoleProcedure( RaftMachine raft )
{
super();
this.raft = raft;
}

@Override
Role role()
{
return raft.isLeader() ? Role.LEADER : Role.FOLLOWER;
}
}
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery.procedures;

public class EdgeRoleProcedure extends RoleProcedure
{
public EdgeRoleProcedure()
{
super();
}

@Override
Role role()
{
return RoleProcedure.Role.READ_REPLICA;
}
}
Expand Up @@ -27,31 +27,30 @@

import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature;

public class RoleProcedure extends CallableProcedure.BasicProcedure
abstract class RoleProcedure extends CallableProcedure.BasicProcedure
{
public static final String NAME = "role";
private final CoreOrEdge role;
private static final String PROCEDURE_NAME = "role";
private static final String[] PROCEDURE_NAMESPACE = {"dbms", "cluster"};
private static final String OUTPUT_NAME = "role";

public RoleProcedure( CoreOrEdge role )
RoleProcedure()
{
super( procedureSignature( new ProcedureSignature.ProcedureName( new String[]{"dbms", "cluster"}, NAME ) )
.out( "role", Neo4jTypes.NTString ).build() );
this.role = role;
super( procedureSignature( new ProcedureSignature.ProcedureName( PROCEDURE_NAMESPACE, PROCEDURE_NAME ) )
.out( OUTPUT_NAME, Neo4jTypes.NTString ).build() );
}

@Override
public RawIterator<Object[], ProcedureException> apply( Context ctx, Object[] input ) throws ProcedureException
public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] input ) throws ProcedureException
{
return RawIterator.<Object[], ProcedureException>of( new Object[]{name()} );
return RawIterator.<Object[],ProcedureException>of( new Object[]{role().name()} );
}

private String name()
{
return role == null ? "UNKNOWN" : role.name();
}
abstract Role role();

public enum CoreOrEdge
public enum Role
{
CORE, EDGE
LEADER,
FOLLOWER,
READ_REPLICA
}
}
Expand Up @@ -37,14 +37,14 @@
import org.neo4j.coreedge.catchup.tx.TxPullClient;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.discovery.EdgeTopologyService;
import org.neo4j.coreedge.discovery.procedures.EdgeRoleProcedure;
import org.neo4j.coreedge.messaging.routing.ConnectToRandomCoreMember;
import org.neo4j.coreedge.core.consensus.ContinuousJob;
import org.neo4j.coreedge.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.core.state.machines.tx.ExponentialBackoffStrategy;
import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.messaging.NonBlockingChannels;
import org.neo4j.coreedge.discovery.procedures.RoleProcedure;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.HostnamePort;
Expand Down Expand Up @@ -92,7 +92,6 @@

import static java.util.Collections.singletonMap;

import static org.neo4j.coreedge.discovery.procedures.RoleProcedure.CoreOrEdge.EDGE;
import static org.neo4j.kernel.impl.factory.CommunityEditionModule.createLockManager;
import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.NEW_THREAD;

Expand All @@ -107,7 +106,7 @@ public void registerProcedures( Procedures procedures )
{
try
{
procedures.register( new RoleProcedure( EDGE ) );
procedures.register( new EdgeRoleProcedure() );
}
catch ( ProcedureException e )
{
Expand Down
Expand Up @@ -22,52 +22,59 @@
import org.junit.Test;

import org.neo4j.collection.RawIterator;
import org.neo4j.coreedge.core.consensus.RaftMachine;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.api.exceptions.ProcedureException;

import static org.junit.Assert.assertEquals;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.neo4j.helpers.collection.Iterators.asList;

public class RoleProcedureTest
{
@Test
public void shouldReturnCore() throws Exception
public void shouldReturnLeader() throws Exception
{
// given
RoleProcedure proc = new RoleProcedure( RoleProcedure.CoreOrEdge.CORE );
RaftMachine raft = mock( RaftMachine.class );
when( raft.isLeader() ).thenReturn( true );
RoleProcedure proc = new CoreRoleProcedure( raft );

// when
RawIterator<Object[], ProcedureException> result = proc.apply( null, null );

// then
assertEquals( RoleProcedure.CoreOrEdge.CORE.name(), single( result )[0]);
assertEquals( RoleProcedure.Role.LEADER.name(), single( result )[0]);
}

@Test
public void shouldReturnEdge() throws Exception
public void shouldReturnFollower() throws Exception
{
// given
RoleProcedure proc = new RoleProcedure( RoleProcedure.CoreOrEdge.EDGE );
RaftMachine raft = mock( RaftMachine.class );
when( raft.isLeader() ).thenReturn( false );
RoleProcedure proc = new CoreRoleProcedure( raft );

// when
RawIterator<Object[], ProcedureException> result = proc.apply( null, null );

// then
assertEquals( RoleProcedure.CoreOrEdge.EDGE.name(), single( result )[0]);
assertEquals( RoleProcedure.Role.FOLLOWER.name(), single( result )[0]);
}

@Test
public void shouldReturnUnknown() throws Exception
public void shouldReturnReadReplica() throws Exception
{
// given
RoleProcedure proc = new RoleProcedure( null );
RoleProcedure proc = new EdgeRoleProcedure();

// when
RawIterator<Object[], ProcedureException> result = proc.apply( null, null );

// then
assertEquals( "UNKNOWN", single( result )[0]);
assertEquals( RoleProcedure.Role.READ_REPLICA.name(), single( result )[0]);
}

private Object[] single( RawIterator<Object[], ProcedureException> result ) throws ProcedureException
Expand Down

0 comments on commit 0e80dbb

Please sign in to comment.