Skip to content

Commit

Permalink
Procedure to show installed protocols raft connections
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkerr9000 committed Feb 12, 2018
1 parent 6e33ef6 commit 5b42408
Show file tree
Hide file tree
Showing 25 changed files with 751 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.NoSuchElementException;

import org.neo4j.function.ThrowingSupplier;
import org.neo4j.helpers.collection.Iterators;

/**
* Just like {@link Iterator}, but with the addition that {@link #hasNext()} and {@link #next()} can
Expand Down Expand Up @@ -95,19 +96,6 @@ protected T fetchNextOrNull() throws EX
*/
static <T, EX extends Exception> RawIterator<T, EX> wrap( final Iterator<T> iterator )
{
return new RawIterator<T,EX>()
{
@Override
public boolean hasNext() throws EX
{
return iterator.hasNext();
}

@Override
public T next() throws EX
{
return iterator.next();
}
};
return Iterators.asRawIterator( iterator );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,11 @@ public T next() throws EX
};
}

public static <T, EX extends Exception> RawIterator<T, EX> asRawIterator( Stream<T> stream )
{
return asRawIterator( stream.iterator() );
}

public static <FROM, TO> Iterator<TO> flatMap( Function<? super FROM, ? extends Iterator<TO>> function, Iterator<FROM> from )
{
return new CombiningIterator<>( map(function, from) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.PrintWriter;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.neo4j.causalclustering.ReplicationModule;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
Expand All @@ -46,9 +47,10 @@
import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory;
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.causalclustering.discovery.procedures.CoreRoleProcedure;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.discovery.procedures.InstalledProtocolsProcedure;
import org.neo4j.causalclustering.handlers.DuplexPipelineWrapperFactory;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.LoadBalancingPluginLoader;
import org.neo4j.causalclustering.load_balancing.LoadBalancingProcessor;
Expand All @@ -63,15 +65,19 @@
import org.neo4j.causalclustering.messaging.RaftOutbound;
import org.neo4j.causalclustering.messaging.SenderService;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
import org.neo4j.causalclustering.protocol.handshake.ProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.com.storecopy.StoreUtil;
import org.neo4j.function.Predicates;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.SocketAddress;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
Expand Down Expand Up @@ -125,6 +131,8 @@ public class EnterpriseCoreEditionModule extends EditionModule
private final CoreTopologyService topologyService;
protected final LogProvider logProvider;
protected final Config config;
private final Supplier<Stream<Pair<AdvertisedSocketAddress,ProtocolStack>>> clientInstalledProtocols;
private final Supplier<Stream<Pair<SocketAddress,ProtocolStack>>> serverInstalledProtocols;
private CoreStateMachinesModule coreStateMachinesModule;

public enum RaftLogImplementation
Expand Down Expand Up @@ -162,6 +170,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke

procedures.register( new ClusterOverviewProcedure( topologyService, consensusModule.raftMachine(), logProvider ) );
procedures.register( new CoreRoleProcedure( consensusModule.raftMachine() ) );
procedures.register( new InstalledProtocolsProcedure( clientInstalledProtocols, serverInstalledProtocols ) );
procedures.registerComponent( Replicator.class, x -> replicationModule.getReplicator(), true );
procedures.registerProcedure( ReplicationBenchmarkProcedure.class );
}
Expand Down Expand Up @@ -231,6 +240,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
protocolInstallerRepository, config, clientPipelineBuilderFactory );
final SenderService raftSender = new SenderService( channelInitializer, logProvider );
life.add( raftSender );
this.clientInstalledProtocols = raftSender::installedProtocols;

final MessageLogger<MemberId> messageLogger = createMessageLogger( config, life, identityModule.myself() );

Expand Down Expand Up @@ -267,7 +277,9 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
CoreServerModule coreServerModule = new CoreServerModule( identityModule, platformModule, consensusModule, coreStateMachinesModule, clusteringModule,
replicationModule, localDatabase, databaseHealthSupplier, clusterStateDirectory.get(), serverPipelineWrapper, clientPipelineWrapper );

new RaftServerModule( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, serverPipelineBuilderFactory, messageLogger );
serverInstalledProtocols = new RaftServerModule(
platformModule, consensusModule, identityModule, coreServerModule, localDatabase, serverPipelineBuilderFactory, messageLogger
).raftServer()::installedProtocols;

editionInvariants( platformModule, dependencies, config, logging, life );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class RaftServerModule
private final MessageLogger<MemberId> messageLogger;
private final LogProvider logProvider;
private final NettyPipelineBuilderFactory pipelineBuilderFactory;
private final RaftServer raftServer;

RaftServerModule( PlatformModule platformModule, ConsensusModule consensusModule, IdentityModule identityModule, CoreServerModule coreServerModule,
LocalDatabase localDatabase, NettyPipelineBuilderFactory pipelineBuilderFactory, MessageLogger<MemberId> messageLogger )
Expand All @@ -72,10 +73,11 @@ class RaftServerModule

LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> messageHandlerChain = createMessageHandlerChain( coreServerModule );

createRaftServer( coreServerModule, messageHandlerChain );
raftServer = createRaftServer( coreServerModule, messageHandlerChain );
}

private void createRaftServer( CoreServerModule coreServerModule, LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> messageHandlerChain )
private RaftServer createRaftServer( CoreServerModule coreServerModule,
LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> messageHandlerChain )
{
ProtocolRepository protocolRepository = new ProtocolRepository( Protocol.Protocols.values() );

Expand All @@ -97,6 +99,8 @@ private void createRaftServer( CoreServerModule coreServerModule, LifecycleMessa
platformModule.life.add( coreServerModule.createCoreLife( messageHandlerChain ) );
platformModule.life.add( coreServerModule.catchupServer() ); // must start last and stop first, since it handles external requests
platformModule.life.add( coreServerModule.downloadService() );

return raftServer;
}

private LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> createMessageHandlerChain( CoreServerModule coreServerModule )
Expand All @@ -123,4 +127,9 @@ private LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> createM
.compose( monitoringHandler )
.apply( messageApplier );
}

public RaftServer raftServer()
{
return raftServer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
Expand All @@ -29,10 +31,17 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.BindException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.causalclustering.protocol.handshake.ServerHandshakeFinishedEvent;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.SocketAddress;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
Expand All @@ -42,6 +51,8 @@

public class RaftServer extends LifecycleAdapter
{
public static final String RAFT_SERVER = "RaftServer";

private final ChannelInitializer<SocketChannel> channelInitializer;

private final ListenSocketAddress listenAddress;
Expand All @@ -50,6 +61,7 @@ public class RaftServer extends LifecycleAdapter

private EventLoopGroup workerGroup;
private Channel channel;
private ConcurrentMap<SocketAddress,ProtocolStack> installedProtocols = new ConcurrentHashMap<>();

private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" );

Expand Down Expand Up @@ -98,6 +110,27 @@ private void startNettyServer()
.channel( NioServerSocketChannel.class )
.option( ChannelOption.SO_REUSEADDR, true )
.localAddress( listenAddress.socketAddress() )
.handler( new ChannelInboundHandlerAdapter()
{
@Override
public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws Exception
{
if ( evt instanceof ServerHandshakeFinishedEvent.Created )
{
ServerHandshakeFinishedEvent.Created created = (ServerHandshakeFinishedEvent.Created) evt;
installedProtocols.put( created.advertisedSocketAddress, created.protocolStack );
}
else if ( evt instanceof ServerHandshakeFinishedEvent.Closed )
{
ServerHandshakeFinishedEvent.Closed closed = (ServerHandshakeFinishedEvent.Closed) evt;
installedProtocols.remove( closed.advertisedSocketAddress );
}
else
{
super.userEventTriggered( ctx, evt );
}
}
} )
.childHandler( channelInitializer );

try
Expand All @@ -117,4 +150,9 @@ private void startNettyServer()
}
}
}

public Stream<Pair<SocketAddress,ProtocolStack>> installedProtocols()
{
return installedProtocols.entrySet().stream().map( entry -> Pair.of( entry.getKey(), entry.getValue() ) );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery.procedures;

import java.util.Comparator;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.collection.RawIterator;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.SocketAddress;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.proc.Context;
import org.neo4j.kernel.api.proc.Neo4jTypes;
import org.neo4j.kernel.api.proc.ProcedureSignature;
import org.neo4j.kernel.api.proc.QualifiedName;

public class InstalledProtocolsProcedure extends CallableProcedure.BasicProcedure
{
private static final String[] PROCEDURE_NAMESPACE = {"dbms", "cluster"};

public static final String PROCEDURE_NAME = "protocols";

private final Supplier<Stream<Pair<AdvertisedSocketAddress,ProtocolStack>>> clientInstalledProtocols;
private final Supplier<Stream<Pair<SocketAddress,ProtocolStack>>> serverInstalledProtocols;

public InstalledProtocolsProcedure( Supplier<Stream<Pair<AdvertisedSocketAddress,ProtocolStack>>> clientInstalledProtocols,
Supplier<Stream<Pair<SocketAddress,ProtocolStack>>> serverInstalledProtocols )
{
super( ProcedureSignature.procedureSignature( new QualifiedName( PROCEDURE_NAMESPACE, PROCEDURE_NAME ) )
.out( "orientation", Neo4jTypes.NTString )
.out( "remoteAddress", Neo4jTypes.NTString )
.out( "applicationProtocol", Neo4jTypes.NTString )
.out( "version", Neo4jTypes.NTInteger )
.description( "Overview of installed protocols" )
.build() );
this.clientInstalledProtocols = clientInstalledProtocols;
this.serverInstalledProtocols = serverInstalledProtocols;
}

@Override
public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] input ) throws ProcedureException
{
Stream<Object[]> outbound = toOutputRows( clientInstalledProtocols, ProtocolInstaller.Orientation.Client.OUTBOUND );

Stream<Object[]> inbound = toOutputRows( serverInstalledProtocols, ProtocolInstaller.Orientation.Server.INBOUND );

return Iterators.asRawIterator( Stream.concat( outbound, inbound ) );
}

private <T extends SocketAddress> Stream<Object[]> toOutputRows( Supplier<Stream<Pair<T,ProtocolStack>>> installedProtocols, String orientation )
{
return installedProtocols.get()
.sorted( Comparator.comparing( entry -> entry.first().toString() ) )
.map( entry -> new Object[]
{
orientation,
entry.first().toString(),
entry.other().applicationProtocol().identifier(),
(long) entry.other().applicationProtocol().version()
} );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;

/**
* Allows intercepting the writing to a channel.
*/
public interface ChannelInterceptor
{
void write( BiFunction<Channel, Object, Future<Void>> writer, io.netty.channel.Channel channel, Object msg,
CompletableFuture<Void> promise );

Optional<ProtocolStack> installedProtocolStack();
}

0 comments on commit 5b42408

Please sign in to comment.