Skip to content

Commit

Permalink
Log (with tunable frequency) when member has no known address.
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Sumrall committed Jun 29, 2016
1 parent e447fff commit 56767b3
Show file tree
Hide file tree
Showing 21 changed files with 405 additions and 157 deletions.
Expand Up @@ -66,11 +66,12 @@ public abstract class CoreClient extends LifecycleAdapter implements StoreFileRe
private Outbound<CoreMember, Message> outbound;

public CoreClient( LogProvider logProvider, ChannelInitializer<SocketChannel> channelInitializer, Monitors monitors,
int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService )
int maxQueueSize, NonBlockingChannels nonBlockingChannels, TopologyService discoveryService,
long logThresholdMillis )
{
senderService =
new SenderService( channelInitializer, logProvider, monitors, maxQueueSize, nonBlockingChannels );
this.outbound = new CoreOutbound( discoveryService, senderService );
this.outbound = new CoreOutbound( discoveryService, senderService, logProvider, logThresholdMillis );
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
}

Expand Down
Expand Up @@ -53,9 +53,11 @@
public class CoreToCoreClient extends CoreClient
{
public CoreToCoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors,
int maxQueueSize, NonBlockingChannels nonBlockingChannels, CoreTopologyService discoveryService )
int maxQueueSize, NonBlockingChannels nonBlockingChannels, CoreTopologyService discoveryService,
long logThresholdMillis )
{
super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService );
super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService,
logThresholdMillis );
}

public static class ChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel>
Expand Down
Expand Up @@ -50,9 +50,11 @@
public class EdgeToCoreClient extends CoreClient
{
public EdgeToCoreClient( LogProvider logProvider, ChannelInitializer channelInitializer, Monitors monitors,
int maxQueueSize, NonBlockingChannels nonBlockingChannels, EdgeTopologyService discoveryService )
int maxQueueSize, NonBlockingChannels nonBlockingChannels, EdgeTopologyService discoveryService,
long logThresholdMillis )
{
super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService );
super( logProvider, channelInitializer, monitors, maxQueueSize, nonBlockingChannels, discoveryService,
logThresholdMillis );
}

public static class ChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel>
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.coreedge.discovery;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand All @@ -45,7 +46,12 @@ public Set<CoreMember> coreMembers()
return coreMembers.keySet();
}

public Set<EdgeAddresses> edgeMembers()
/**
 * Returns the addresses of the core members in this topology.
 *
 * @return the result of {@code coreMembers.values()}, handed out as-is (no copy is made here)
 */
public Collection<CoreAddresses> coreMemberAddresses()
{
return coreMembers.values();
}

/**
 * Returns the addresses of the edge members in this topology.
 *
 * @return the {@code edgeAddresses} set held by this object, handed out as-is (no copy is made here)
 */
public Set<EdgeAddresses> edgeMemberAddresses()
{
return edgeAddresses;
}
Expand All @@ -55,9 +61,14 @@ boolean canBeBootstrapped()
return canBeBootstrapped;
}

public CoreAddresses coreAddresses(CoreMember coreMember)
public CoreAddresses coreAddresses( CoreMember coreMember ) throws NoKnownAddressesException
{
return coreMembers.get( coreMember );
CoreAddresses coreAddresses = coreMembers.get( coreMember );
if ( coreAddresses == null )
{
throw new NoKnownAddressesException();
}
return coreAddresses;
}

@Override
Expand All @@ -66,7 +77,7 @@ public String toString()
return "TestOnlyClusterTopology{" +
"coreMembers.size()=" + coreMembers.size() +
", bootstrappable=" + canBeBootstrapped() +
", edgeMembers.size()=" + edgeAddresses.size() +
", edgeMemberAddresses.size()=" + edgeAddresses.size() +
'}';
}
}
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

/**
 * Checked exception signalling that no addresses are known for a requested cluster member.
 * <p>
 * The exception always carries a detail message so that it stays informative if it ever
 * surfaces in a log or stack trace instead of being handled by the immediate caller.
 */
public class NoKnownAddressesException extends Exception
{
    /**
     * Creates the exception with a fixed, descriptive detail message.
     */
    public NoKnownAddressesException()
    {
        super( "No known addresses for the requested cluster member" );
    }
}
@@ -1,3 +1,22 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

import org.neo4j.kernel.lifecycle.Lifecycle;
Expand Down
Expand Up @@ -19,46 +19,57 @@
*/
package org.neo4j.coreedge.raft.net;

import java.time.Clock;
import java.util.Collection;

import org.neo4j.coreedge.discovery.CoreAddresses;
import org.neo4j.coreedge.discovery.NoKnownAddressesException;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.logging.LogProvider;

public class CoreOutbound implements Outbound<CoreMember, Message>
{
private final TopologyService discoveryService;
private final Outbound<AdvertisedSocketAddress, Message> outbound;
private UnknownAddressMonitor unknownAddressMonitor;

public CoreOutbound( TopologyService discoveryService, Outbound<AdvertisedSocketAddress, Message> outbound )
public CoreOutbound( TopologyService discoveryService, Outbound<AdvertisedSocketAddress,Message> outbound,
LogProvider logProvider, long logThresholdMillis )
{
this.discoveryService = discoveryService;
this.outbound = outbound;
this.unknownAddressMonitor = new UnknownAddressMonitor(
logProvider.getLog( this.getClass() ), Clock.systemUTC(), logThresholdMillis );
}

@Override
public void send( CoreMember to, Message message )
{
CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to );
if ( coreAddresses != null )
try
{
CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to );
outbound.send( coreAddresses.getCoreServer(), message );
}
// Drop messages for servers that are missing from the cluster topology;
// discovery service thinks that they are offline, so it's not worth trying to send them anything.
catch ( NoKnownAddressesException e )
{
unknownAddressMonitor.logAttemptToSendToMemberWithNoKnownAddress( to );
}
}

@Override
public void send( CoreMember to, Collection<Message> messages )
{
CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to );
if ( coreAddresses != null )
try
{
CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to );
outbound.send( coreAddresses.getCoreServer(), messages );
}
// Drop messages for servers that are missing from the cluster topology;
// discovery service thinks that they are offline, so it's not worth trying to send them anything.
catch ( NoKnownAddressesException e )
{
unknownAddressMonitor.logAttemptToSendToMemberWithNoKnownAddress( to );
}
}
}
Expand Up @@ -19,16 +19,19 @@
*/
package org.neo4j.coreedge.raft.net;

import java.time.Clock;
import java.util.Collection;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.discovery.CoreAddresses;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.NoKnownAddressesException;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.RaftMessages.RaftMessage;
import org.neo4j.coreedge.raft.RaftMessages.StoreIdAwareMessage;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.logging.LogProvider;

import static java.util.stream.Collectors.toList;

Expand All @@ -37,38 +40,45 @@ public class RaftOutbound implements Outbound<CoreMember, RaftMessage>
private final CoreTopologyService discoveryService;
private final Outbound<AdvertisedSocketAddress,Message> outbound;
private final LocalDatabase localDatabase;
private final UnknownAddressMonitor unknownAddressMonitor;

public RaftOutbound( CoreTopologyService discoveryService, Outbound<AdvertisedSocketAddress,Message> outbound,
LocalDatabase localDatabase )
LocalDatabase localDatabase, LogProvider logProvider, long logThresholdMillis )
{
this.discoveryService = discoveryService;
this.outbound = outbound;
this.localDatabase = localDatabase;
this.unknownAddressMonitor = new UnknownAddressMonitor(
logProvider.getLog( this.getClass() ), Clock.systemUTC(), logThresholdMillis );
}

@Override
public void send( CoreMember to, RaftMessage message )
{
CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to );
if ( coreAddresses != null )
try
{
CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to );
outbound.send( coreAddresses.getRaftServer(), decorateWithStoreId( message ) );
}
// Drop messages for servers that are missing from the cluster topology;
// discovery service thinks that they are offline, so it's not worth trying to send them anything.
catch ( NoKnownAddressesException e )
{
unknownAddressMonitor.logAttemptToSendToMemberWithNoKnownAddress( to );
}
}

@Override
public void send( CoreMember to, Collection<RaftMessage> messages )
{
CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to );
if ( coreAddresses != null )
try
{
CoreAddresses coreAddresses = discoveryService.currentTopology().coreAddresses( to );
outbound.send( coreAddresses.getRaftServer(),
messages.stream().map( this::decorateWithStoreId ).collect( toList() ) );
}
// Drop messages for servers that are missing from the cluster topology;
// discovery service thinks that they are offline, so it's not worth trying to send them anything.
catch ( NoKnownAddressesException e )
{
unknownAddressMonitor.logAttemptToSendToMemberWithNoKnownAddress( to );
}
}

private StoreIdAwareMessage decorateWithStoreId( RaftMessage m )
Expand Down
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.net;

import java.time.Clock;
import java.util.HashMap;

import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.logging.Log;

/**
 * Throttles, per member, the info-level log line emitted when a message is dropped because
 * the target member has no known address.
 * <p>
 * The first attempt for a member is always logged. Later attempts for the same member are
 * logged only once more than {@code logThresholdMillis} milliseconds (measured with the
 * supplied {@link Clock}) have passed since that member was last logged.
 * <p>
 * NOTE(review): the per-member timestamps live in a plain {@link HashMap}, which is not
 * synchronized — confirm that callers never invoke this monitor from several threads at once.
 */
public class UnknownAddressMonitor
{
    private final Log log;
    private final Clock clock;
    private final long logThreshold;

    // Time (clock millis) at which the message was last logged, keyed by member.
    private HashMap<CoreMember, Long> lastLoggedAt = new HashMap<>();

    /**
     * @param log                 destination for the throttled message
     * @param clock               source of the current time in milliseconds
     * @param logThresholdMillis  minimum gap, in milliseconds, between two log lines for the same member
     */
    public UnknownAddressMonitor( Log log, Clock clock, long logThresholdMillis )
    {
        this.log = log;
        this.clock = clock;
        this.logThreshold = logThresholdMillis;
    }

    /**
     * Records an attempt to send to {@code to} and logs it unless that member was already
     * logged within the throttle window.
     *
     * @param to the member for which no address was found
     */
    void logAttemptToSendToMemberWithNoKnownAddress( CoreMember to )
    {
        long now = clock.millis();
        Long previous = lastLoggedAt.get( to );

        // Still inside the throttle window for this member: stay silent.
        boolean recentlyLogged = previous != null && (now - previous) <= logThreshold;
        if ( recentlyLogged )
        {
            return;
        }

        log.info( "No address found for member %s, probably because the member has been shut down; " +
                "dropping message.", to );
        lastLoggedAt.put( to, now );
    }
}
Expand Up @@ -33,7 +33,6 @@
import org.neo4j.coreedge.raft.state.follower.FollowerState;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.StoreId;
import org.neo4j.helpers.collection.FilteringIterable;
import org.neo4j.logging.Log;

Expand Down
Expand Up @@ -217,6 +217,10 @@ public String toString()
public static final Setting<String> cluster_name = setting( "core_edge.cluster_name", STRING, "core-cluster",
illegalValueMessage( "must be a valid cluster name", matches( ANY ) ) );

@Description("Throttle limit for logging unknown cluster member address")
public static final Setting<Long> unknown_address_logging_throttle =
setting( "core_edge.unknown_address_logging_throttle", DURATION, "10000ms" );

@Description( "Maximum transaction batch size for edge servers when applying transactions pulled from core servers." )
@Internal
public static Setting<Integer> edge_transaction_applier_batch_size = setting( "core_edge.edge_transaction_applier_batch_size", INTEGER, "16" );
Expand Down

0 comments on commit 56767b3

Please sign in to comment.