Skip to content

Commit

Permalink
Introduce CappedLogger as a replacement for CappedOperation
Browse files Browse the repository at this point in the history
This fixes a problem where the SlaveUpdatePuller only logs every 10th exception, which doesn't really make any sense.
The TransactionPropagator has also been changed to use the CappedLogger, which fixes similar weird logging behaviour in that code.
  • Loading branch information
chrisvest committed Dec 29, 2015
1 parent ada6fb5 commit cb58e4e
Show file tree
Hide file tree
Showing 8 changed files with 871 additions and 71 deletions.
Expand Up @@ -26,6 +26,7 @@


import org.neo4j.graphdb.config.Setting; import org.neo4j.graphdb.config.Setting;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Clock;
import org.neo4j.helpers.UTF8; import org.neo4j.helpers.UTF8;
import org.neo4j.helpers.collection.Visitor; import org.neo4j.helpers.collection.Visitor;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
Expand All @@ -44,7 +45,7 @@
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.ArrayQueueOutOfOrderSequence; import org.neo4j.kernel.impl.util.ArrayQueueOutOfOrderSequence;
import org.neo4j.kernel.impl.util.Bits; import org.neo4j.kernel.impl.util.Bits;
import org.neo4j.kernel.impl.util.CappedOperation; import org.neo4j.kernel.impl.util.CappedLogger;
import org.neo4j.kernel.impl.util.OutOfOrderSequence; import org.neo4j.kernel.impl.util.OutOfOrderSequence;
import org.neo4j.kernel.impl.util.StringLogger; import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.impl.util.StringLogger.LineLogger; import org.neo4j.kernel.impl.util.StringLogger.LineLogger;
Expand All @@ -53,7 +54,6 @@
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static org.neo4j.io.pagecache.PagedFile.PF_EXCLUSIVE_LOCK; import static org.neo4j.io.pagecache.PagedFile.PF_EXCLUSIVE_LOCK;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_LOCK; import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_LOCK;
import static org.neo4j.kernel.impl.util.CappedOperation.time;


/** /**
* This class contains the references to the "NodeStore,RelationshipStore, * This class contains the references to the "NodeStore,RelationshipStore,
Expand Down Expand Up @@ -151,7 +151,7 @@ public static boolean isStorePresent( FileSystemAbstraction fs, Config config )
private final OutOfOrderSequence lastClosedTx = new ArrayQueueOutOfOrderSequence( -1, 200 ); private final OutOfOrderSequence lastClosedTx = new ArrayQueueOutOfOrderSequence( -1, 200 );


private final int relGrabSize; private final int relGrabSize;
private final CappedOperation<Void> transactionCloseWaitLogger; private final CappedLogger transactionCloseWaitLogger;


public NeoStore( File fileName, Config conf, IdGeneratorFactory idGeneratorFactory, PageCache pageCache, public NeoStore( File fileName, Config conf, IdGeneratorFactory idGeneratorFactory, PageCache pageCache,
FileSystemAbstraction fileSystemAbstraction, final StringLogger stringLogger, FileSystemAbstraction fileSystemAbstraction, final StringLogger stringLogger,
Expand All @@ -171,16 +171,8 @@ public NeoStore( File fileName, Config conf, IdGeneratorFactory idGeneratorFacto
this.relGroupStore = relGroupStore; this.relGroupStore = relGroupStore;
this.counts = counts; this.counts = counts;
this.relGrabSize = conf.get( Configuration.relationship_grab_size ); this.relGrabSize = conf.get( Configuration.relationship_grab_size );
this.transactionCloseWaitLogger = new CappedOperation<Void>( time( 30, SECONDS ) ) this.transactionCloseWaitLogger = new CappedLogger( stringLogger );
{ transactionCloseWaitLogger.setTimeLimit( 30, SECONDS, Clock.SYSTEM_CLOCK );
@Override
protected void triggered( Void event )
{
stringLogger.info( format(
"Waiting for all transactions to close...%n committed: %s%n committing: %s%n closed: %s",
highestCommittedTransaction.get(), lastCommittingTxField, lastClosedTx ) );
}
};
counts.setInitializer( new DataInitializer<CountsAccessor.Updater>() counts.setInitializer( new DataInitializer<CountsAccessor.Updater>()
{ {
@Override @Override
Expand Down Expand Up @@ -1001,8 +993,11 @@ public boolean closedTransactionIdIsOnParWithOpenedTransactionId()
{ {
boolean onPar = lastClosedTx.getHighestGapFreeNumber() == lastCommittingTxField.get(); boolean onPar = lastClosedTx.getHighestGapFreeNumber() == lastCommittingTxField.get();
if ( !onPar ) if ( !onPar )
{ // Trigger some logging here, max logged every 30 secs or so {
transactionCloseWaitLogger.event( null ); // Trigger some logging here, logging at most one message every 30 seconds
transactionCloseWaitLogger.info( format(
"Waiting for all transactions to close...%n committed: %s%n committing: %s%n closed: %s",
highestCommittedTransaction.get(), lastCommittingTxField, lastClosedTx ), null );
} }
return onPar; return onPar;
} }
Expand Down
@@ -0,0 +1,303 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.util;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import org.neo4j.helpers.Clock;

/**
* A CappedLogger will accept log messages, unless they occur "too much", in which case the messages will be ignored
* until some time passes, or the logger is reset.
*
* This class does not extend {@link StringLogger} because that API is a mess, and has been replaced in newer versions
* of Neo4j.
*
* It is also desirable to be aware that log capping is taking place, so we don't mistakenly lose log output due to
* output capping.
*
* By default, the CappedLogger does not filter out any messages. Filtering can be configured at any time using the
* "set" and "unset" methods.
*/
public class CappedLogger
{
private final StringLogger delegate;
// We use the filter indirection so we can atomically update the configuration without locking
private volatile Filter filter;

public CappedLogger( StringLogger delegate )
{
filter = new Filter();
this.delegate = delegate;
}

public void debug( String msg, Throwable cause )
{
if ( filter.accept( msg, cause ) )
{
delegate.debug( msg, cause );
}
}

public void info( String msg, Throwable cause )
{
if ( filter.accept( msg, cause ) )
{
delegate.info( msg, cause );
}
}

public void warn( String msg, Throwable cause )
{
if ( filter.accept( msg, cause ) )
{
delegate.warn( msg, cause );
}
}

public void error( String msg, Throwable cause )
{
if ( filter.accept( msg, cause ) )
{
delegate.error( msg, cause );
}
}

/**
* Reset the filtering state of this CappedLogger. This usually means that something good happened, and that all
* filtering states that grows towards a state where the log messages are filtered out, should calm down and reset.
*
* Specifically, this means that the counter in the count limit should return to zero, and that the time limit and
* duplicate filter should forget about past messages.
*/
public void reset()
{
filter = filter.reset();
}

/**
* Set a limit to the amount of logging that this logger will accept between resets.
* @param limit The number of log messages that the CappedLogger will let through in between resets.
*/
public void setCountLimit( int limit )
{
if ( limit < 1 )
{
throw new IllegalArgumentException( "The count limit must be positive" );
}
filter = filter.setCountLimit( limit );
}

/**
* Unset the count limit, and allow any number of messages through, provided other limits don't apply.
*/
public void unsetCountLimit()
{
filter = filter.unsetCountLimit();
}

/**
* Set a time based limit to the amount of logging that this logger will accept between resets. With a time limit
* of 1 second, for instance, then the logger will log at most one message per second.
* @param time The time amount, must be positive.
* @param unit The time unit.
* @param clock The clock to use for reading the current time when checking this limit.
*/
public void setTimeLimit( long time, TimeUnit unit, Clock clock )
{
if ( time < 1 )
{
throw new IllegalArgumentException( "The time limit must be positive" );
}
if ( unit == null )
{
throw new IllegalArgumentException( "The time unit cannot be null" );
}
if ( clock == null )
{
throw new IllegalArgumentException( "The clock used for time limiting cannot be null" );
}
filter = filter.setTimeLimit( time, unit, clock );
}

/**
* Unset the time limit, and allow any number of messages through, as often as possible, provided other limits
* don't apply.
*/
public void unsetTimeLimit()
{
filter = filter.unsetTimeLimit();
}

/**
* Enable or disable filtering of duplicate messages. This filtering only looks at the previous message, so a
* sequence of identical messages will only have that message logged once, but a sequence of two alternating
* messages will get logged in full.
* @param enabled {@code true} if duplicates should be filtered, {@code false} if they should not.
*/
public void setDuplicateFilterEnabled( boolean enabled )
{
filter = filter.setDuplicateFilterEnabled( enabled );
}

private static class Filter
{
private static final AtomicIntegerFieldUpdater<Filter> CURRENT_COUNT =
AtomicIntegerFieldUpdater.newUpdater( Filter.class, "currentCount" );
private static final AtomicLongFieldUpdater<Filter> LAST_CHECK =
AtomicLongFieldUpdater.newUpdater( Filter.class, "lastCheck" );

// The thread-safety of these normal fields are guarded by the volatile reads and writes to the
// CappedLogger.filter field.
private boolean hasCountLimit;
private int countLimit;
private long timeLimitMillis;
private Clock clock;
private boolean filterDuplicates;

// Atomically updated
private volatile int currentCount;
private volatile long lastCheck;

// Read and updated together; guarded by synchronized(this) in checkDuplicate()
private String lastMessage;
private Throwable lastException;

private Filter()
{
this( false, 0, 0, 0, 0, null, false );
}

private Filter(
boolean hasCountLimit,
int countLimit,
int currentCount,
long timeLimitMillis,
long lastCheck,
Clock clock, boolean filterDuplicates )
{
this.hasCountLimit = hasCountLimit;
this.countLimit = countLimit;
this.currentCount = currentCount;
this.timeLimitMillis = timeLimitMillis;
this.lastCheck = lastCheck;
this.clock = clock;
this.filterDuplicates = filterDuplicates;
}

public Filter setCountLimit( int limit )
{
return new Filter( true, limit, currentCount, timeLimitMillis, lastCheck, clock, filterDuplicates );
}

public boolean accept( String msg, Throwable cause )
{
return (!hasCountLimit || (getAndIncrementCurrentCount() < countLimit))
&& (clock == null || !checkExpiredAndSetLastCheckTime())
&& (!filterDuplicates || checkDuplicate( msg, cause ));
}

public int getAndIncrementCurrentCount()
{
return CURRENT_COUNT.getAndIncrement( this );
}

private boolean checkExpiredAndSetLastCheckTime()
{
long now = clock.currentTimeMillis();
long check = this.lastCheck;
if ( check > now - timeLimitMillis )
{
return true;
}
while ( !LAST_CHECK.compareAndSet( this, check, now ) )
{
check = lastCheck;
if ( check > now )
{
break;
}
}
return false;
}

private synchronized boolean checkDuplicate( String msg, Throwable cause )
{
String last = lastMessage;
Throwable exc = lastException;
if ( stringEqual( last, msg )
&& ( exc == null ? cause == null : sameClass( cause, exc ) && sameMsg( cause, exc ) ) )
{
// Duplicate! Filter it out.
return false;
}
else
{
// Distinct! Update and let it through.
lastMessage = msg;
lastException = cause;
return true;
}
}

private boolean sameMsg( Throwable cause, Throwable exc )
{
return stringEqual( exc.getMessage(), cause.getMessage() );
}

private boolean stringEqual( String a, String b )
{
return a == null ? b == null : a.equals( b );
}

private boolean sameClass( Throwable cause, Throwable exc )
{
return exc.getClass().equals( cause.getClass() );
}

public Filter reset()
{
return new Filter( hasCountLimit, countLimit, 0, timeLimitMillis, 0, clock, filterDuplicates );
}

public Filter unsetCountLimit()
{
return new Filter( false, 0, currentCount, timeLimitMillis, lastCheck, clock, filterDuplicates );
}

public Filter setTimeLimit( long time, TimeUnit unit, Clock clock )
{
return new Filter(
hasCountLimit, countLimit, currentCount, unit.toMillis( time ), lastCheck, clock, filterDuplicates );
}

public Filter unsetTimeLimit()
{
return new Filter( hasCountLimit, countLimit, currentCount, 0, lastCheck, null, filterDuplicates );
}

public Filter setDuplicateFilterEnabled( boolean enabled )
{
return new Filter( hasCountLimit, countLimit, currentCount, timeLimitMillis, lastCheck, clock, enabled );
}
}
}
Expand Up @@ -123,6 +123,11 @@ public static StringLogger wrap( Writer writer )
} }


public static StringLogger wrap( final StringBuffer target ) public static StringLogger wrap( final StringBuffer target )
{
return wrap( target, false );
}

public static StringLogger wrap( final StringBuffer target, boolean enableDebug )
{ {
return new ActualStringLogger( new PrintWriter( new Writer() return new ActualStringLogger( new PrintWriter( new Writer()
{ {
Expand Down Expand Up @@ -188,7 +193,7 @@ public void close() throws IOException
{ {
// do nothing // do nothing
} }
} ), false ); } ), enableDebug );
} }


public static StringLogger tee( final StringLogger logger1, final StringLogger logger2 ) public static StringLogger tee( final StringLogger logger1, final StringLogger logger2 )
Expand Down

0 comments on commit cb58e4e

Please sign in to comment.