Skip to content

Commit

Permalink
Moved to CappedLogger
Browse files Browse the repository at this point in the history
Moved to the existing implementation of CappedLogger in favour
of the proprietary back-off implementation.
  • Loading branch information
jimwebber committed Sep 26, 2016
1 parent ca796fa commit bf7168b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 149 deletions.
Expand Up @@ -212,7 +212,7 @@ private static class Filter
private boolean hasCountLimit;
private int countLimit;
private long timeLimitMillis;
private Clock clock;
private final Clock clock;
private boolean filterDuplicates;

// Atomically updated
Expand Down
Expand Up @@ -24,104 +24,34 @@
import java.util.concurrent.ConcurrentHashMap;

import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.kernel.impl.util.CappedLogger;
import org.neo4j.logging.Log;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class UnknownAddressMonitor
{
private final Log log;
private final Clock clock;
private final long initialTimeoutMs;
private Map<MemberId,PeriodicLogger> loggers = new ConcurrentHashMap<>();
private final long timeLimitMs;
private Map<MemberId,CappedLogger> loggers = new ConcurrentHashMap<>();

public UnknownAddressMonitor( Log log, Clock clock, long initialTimeoutMs )
public UnknownAddressMonitor( Log log, Clock clock, long timeLimitMs )
{
this.log = log;
this.clock = clock;
this.initialTimeoutMs = initialTimeoutMs;
}

public long logAttemptToSendToMemberWithNoKnownAddress( MemberId to )
{
PeriodicLogger logger = loggers.get( to );
if ( logger == null )
{
logger = new PeriodicLogger( clock, log );
loggers.put( to, logger );
}
return logger.attemptLog( to );
}

private static class PeriodicLogger
{
private final Clock clock;
private final Log log;
private long numberOfAttemps;
private final Penalty penalty = new Penalty();

PeriodicLogger( Clock clock, Log log )
{
this.clock = clock;
this.log = log;
}

long attemptLog( MemberId to )
{
numberOfAttemps++;

if ( clock.millis() > penalty.blockedUntil() )
{
penalty.cancel();
}

if ( shouldLog() )
{
log.info( "No address found for member %s, probably because the member has been shut down; " +
"dropped %d message(s) over last %d milliseconds", to, numberOfAttemps, clock.millis() -
penalty.blockedUntil() );

numberOfAttemps = 0;
}

penalty.increase();

return penalty.blockedUntil();
}

private boolean shouldLog()
{
return clock.millis() >= penalty.blockedUntil();
}
this.timeLimitMs = timeLimitMs;
}

private static class Penalty
public void logAttemptToSendToMemberWithNoKnownAddress( MemberId to )
{
private static final long MAX_PENALTY = 60 * 1000;
private long currentPenalty = 0;

void increase()
{
if ( currentPenalty == 0 )
{
currentPenalty = 10_000L;
}
else
{
currentPenalty = currentPenalty * 2;
if ( currentPenalty > MAX_PENALTY )
{
currentPenalty = MAX_PENALTY;
}
}
}

long blockedUntil()
{
return currentPenalty;
}

public void cancel()
CappedLogger cappedLogger = loggers.get( to );
if ( cappedLogger == null )
{
currentPenalty = 0;
cappedLogger = new CappedLogger( log );
cappedLogger.setTimeLimit( timeLimitMs, MILLISECONDS, clock );
loggers.put( to, cappedLogger );
}
cappedLogger.info(String.format("No address found for %s, probably because the member has been shut down.", to) );
}
}
Expand Up @@ -21,16 +21,13 @@

import org.junit.Test;

import java.util.ArrayList;

import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.logging.Log;
import org.neo4j.time.Clocks;
import org.neo4j.time.FakeClock;

import static java.util.concurrent.TimeUnit.HOURS;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
Expand All @@ -46,93 +43,58 @@ public void shouldLogFirstFailure() throws Exception
{
// given
Log log = mock( Log.class );
UnknownAddressMonitor logger = new UnknownAddressMonitor( log, Clocks.fakeClock(), 10000 );
UnknownAddressMonitor logger = new UnknownAddressMonitor( log, testClock(), 100 );

// when
logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) );
MemberId to = member( 0 );
logger.logAttemptToSendToMemberWithNoKnownAddress( to );

// then
verify( log ).info( anyString(), eq( member( 0 ) ), anyLong(), anyLong() );
verify( log ).info( format( "No address found for %s, probably because the member has been shut down.", to ) );
}

private FakeClock testClock()
{
return Clocks.fakeClock( 1_000_000, MILLISECONDS );
}

@Test
public void shouldThrottleLogging() throws Exception
{
// given
Log log = mock( Log.class );
FakeClock clock = Clocks.fakeClock();
UnknownAddressMonitor logger = new UnknownAddressMonitor( log, clock, 10000 );
FakeClock clock = testClock();
UnknownAddressMonitor logger = new UnknownAddressMonitor( log, clock, 1000 );
MemberId to = member( 0 );

// when
logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) );
logger.logAttemptToSendToMemberWithNoKnownAddress( to );
clock.forward( 1, MILLISECONDS );
logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) );
logger.logAttemptToSendToMemberWithNoKnownAddress( to );

// then
verify( log, times( 1 ) ).info( anyString(), eq( member( 0 ) ), anyLong(), anyLong() );
verify( log, times( 1 ) )
.info( format( "No address found for %s, probably because the member has been shut " + "down.", to ) );
}

@Test
public void shouldResumeLoggingAfterQuietPeriod() throws Exception
{
// given
Log log = mock( Log.class );
FakeClock clock = Clocks.fakeClock();
UnknownAddressMonitor logger = new UnknownAddressMonitor( log, clock, 10000 );
FakeClock clock = testClock();
UnknownAddressMonitor logger = new UnknownAddressMonitor( log, clock, 1000 );
MemberId to = member( 0 );

// when
logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) );
logger.logAttemptToSendToMemberWithNoKnownAddress( to );
clock.forward( 20001, MILLISECONDS );
logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) );
logger.logAttemptToSendToMemberWithNoKnownAddress( to );
clock.forward( 80001, MILLISECONDS );
logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) );

// then
verify( log, times( 3 ) ).info( anyString(), eq( member( 0 ) ), anyLong(), anyLong() );
}

@Test
public void shouldIncreaseThrottlingWhenClientFloodsLogUpToOneMinute() throws Exception
{
// given
Log log = mock( Log.class );
FakeClock clock = Clocks.fakeClock();
UnknownAddressMonitor logger = new UnknownAddressMonitor( log, clock, 10000 );

// when
ArrayList<Long> tryAfter = new ArrayList<>();
for ( int i = 0; i < 10; i++ )
{
tryAfter.add( logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) ) );
clock.forward( 1, SECONDS );
}

// then
assertEquals( 10_000L, (long) tryAfter.get( 0 ) );
assertEquals( 20_000L, (long) tryAfter.get( 1 ) );
assertEquals( 40_000L, (long) tryAfter.get( 2 ) );
assertEquals( 60_000L, (long) tryAfter.get( 3 ) );
assertEquals( 60_000L, (long) tryAfter.get( 4 ) );
assertEquals( 60_000L, (long) tryAfter.get( 9 ) );
}

@Test
public void shouldReduceThrottlingWhenClientCallRateDropsOff() throws Exception
{
// given
Log log = mock( Log.class );
FakeClock clock = Clocks.fakeClock();
UnknownAddressMonitor logger = new UnknownAddressMonitor( log, clock, 10000 );

// when
for ( int i = 0; i < 100; i++ ) // aggravate the logger
{
logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) );
}

clock.forward( 1, HOURS );
logger.logAttemptToSendToMemberWithNoKnownAddress( to );

// then
assertEquals( 10_000L, logger.logAttemptToSendToMemberWithNoKnownAddress( member( 0 ) ) );
verify( log, times( 3 ) )
.info( format( "No address found for %s, probably because the member has been shut " + "down.", to ) );
}
}

0 comments on commit bf7168b

Please sign in to comment.