Skip to content

Commit

Permalink
Merge pull request #5629 from thobe/2.3-log-returning-heartbeat
Browse files Browse the repository at this point in the history
Log the first heartbeat after a series of missed
  • Loading branch information
tinwelint committed Nov 25, 2015
2 parents aad82f6 + becd524 commit f9e1f7a
Show file tree
Hide file tree
Showing 72 changed files with 2,305 additions and 392 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
*/
package org.neo4j.io.pagecache.impl.muninn;

import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.flag;

final class CursorPool
{
private static boolean disableCursorPooling = Boolean.getBoolean(
"org.neo4j.io.pagecache.impl.muninn.CursorPool.disableCursorPooling" );
private static boolean disableCursorPooling = flag( CursorPool.class, "disableCursorPooling", false );

private final ThreadLocal<MuninnReadPageCursor> readCursorCache = new MuninnReadPageCursorThreadLocal();
private final ThreadLocal<MuninnWritePageCursor> writeCursorCache = new MuninnWritePageCursorThreadLocal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.neo4j.unsafe.impl.internal.dragons.MemoryManager;
import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;

import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.flag;

/**
* The Muninn {@link org.neo4j.io.pagecache.PageCache page cache} implementation.
* <pre>
Expand Down Expand Up @@ -95,7 +97,7 @@
public class MuninnPageCache implements PageCache
{
public static final byte ZERO_BYTE =
(byte) (Boolean.getBoolean( "org.neo4j.io.pagecache.impl.muninn.MuninnPage.brandedZeroByte" )? 0x0F : 0);
(byte) (flag( MuninnPageCache.class, "brandedZeroByte", false ) ? 0x0f : 0);

// Keep this many pages free and ready for use in faulting.
// This will be truncated to be no more than half of the number of pages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import org.neo4j.io.pagecache.PageSwapper;

import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.packageFlag;

/**
* The default PageCacheTracer implementation, that just increments counters.
*/
Expand All @@ -41,8 +43,7 @@ public class DefaultPageCacheTracer implements PageCacheTracer
try
{
// A hidden setting to have pin/unpin monitoring enabled from the start by default.
boolean alwaysEnabled = Boolean.getBoolean(
"org.neo4j.io.pagecache.tracing.tracePinUnpin" );
boolean alwaysEnabled = packageFlag( DefaultPageCacheTracer.class, "tracePinUnpin", false );

MethodType type = MethodType.methodType( PinEvent.class );
MethodHandles.Lookup lookup = MethodHandles.lookup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

import org.neo4j.helpers.collection.PrefetchingIterator;

import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.flag;

/**
* A utility for locating services. This implements the same functionality as <a
* href="http://java.sun.com/javase/6/docs/api/java/util/ServiceLoader.html">
Expand Down Expand Up @@ -117,7 +119,7 @@ public abstract class Service
* Enabling this is useful for debugging why services aren't loaded where you would expect them to.
*/
private static final boolean printServiceLoaderStackTraces =
Boolean.getBoolean( "org.neo4j.helpers.Service.printServiceLoaderStackTraces" );
flag( Service.class, "printServiceLoaderStackTraces", false );

/**
* Designates that a class implements the specified service and should be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;

import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.flag;

class KeyValueWriter implements Closeable
{
private final MetadataCollector metadata;
Expand Down Expand Up @@ -245,7 +247,7 @@ private IllegalStateException illegalState( KeyValueWriter writer, String what )
static abstract class Writer
{
private static final boolean WRITE_TO_PAGE_CACHE =
Boolean.getBoolean( KeyValueWriter.class.getName() + ".WRITE_TO_PAGE_CACHE" );
flag( KeyValueWriter.class, "WRITE_TO_PAGE_CACHE", false );

abstract void write( byte[] data ) throws IOException;

Expand Down
115 changes: 115 additions & 0 deletions community/logging/src/main/java/org/neo4j/logging/async/AsyncLog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.neo4j.logging.async;

import org.neo4j.concurrent.AsyncEventSender;
import org.neo4j.function.Consumer;
import org.neo4j.logging.AbstractLog;
import org.neo4j.logging.Log;
import org.neo4j.logging.Logger;

import static java.util.Objects.requireNonNull;
import static org.neo4j.logging.async.AsyncLogEvent.bulkLogEvent;
import static org.neo4j.logging.async.AsyncLogEvent.logEvent;

public class AsyncLog extends AbstractLog
{
private final Log log;
private final AsyncEventSender<AsyncLogEvent> events;

public AsyncLog( AsyncEventSender<AsyncLogEvent> events, Log log )
{
this.log = requireNonNull( log, "Log" );
this.events = requireNonNull( events, "AsyncEventSender<AsyncLogEvent>" );
}

@Override
public boolean isDebugEnabled()
{
return log.isDebugEnabled();
}

@Override
public Logger debugLogger()
{
return new AsyncLogger( events, log.debugLogger() );
}

@Override
public Logger infoLogger()
{
return new AsyncLogger( events, log.infoLogger() );
}

@Override
public Logger warnLogger()
{
return new AsyncLogger( events, log.warnLogger() );
}

@Override
public Logger errorLogger()
{
return new AsyncLogger( events, log.errorLogger() );
}

@Override
public void bulk( Consumer<Log> consumer )
{
events.send( bulkLogEvent( log, consumer ) );
}

private static class AsyncLogger implements Logger
{
private final Logger logger;
private final AsyncEventSender<AsyncLogEvent> events;

AsyncLogger( AsyncEventSender<AsyncLogEvent> events, Logger logger )
{
this.logger = requireNonNull( logger, "Logger" );
this.events = events;
}

@Override
public void log( String message )
{
events.send( logEvent( logger, message ) );
}

@Override
public void log( String message, Throwable throwable )
{
events.send( logEvent( logger, message, throwable ) );
}

@Override
public void log( String format, Object... arguments )
{
events.send( logEvent( logger, format, arguments ) );
}

@Override
public void bulk( Consumer<Logger> consumer )
{
events.send( bulkLogEvent( logger, consumer ) );
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.neo4j.logging.async;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.TimeZone;

import org.neo4j.concurrent.AsyncEvent;
import org.neo4j.function.Consumer;
import org.neo4j.logging.Log;
import org.neo4j.logging.Logger;

import static java.util.Objects.requireNonNull;

public final class AsyncLogEvent extends AsyncEvent
{
static AsyncLogEvent logEvent( Logger logger, String message )
{
return new AsyncLogEvent( logger, requireNonNull( message, "message" ), null );
}

static AsyncLogEvent logEvent( Logger logger, String message, Throwable throwable )
{
return new AsyncLogEvent( logger, requireNonNull( message, "message" ),
requireNonNull( throwable, "Throwable" ) );
}

static AsyncLogEvent logEvent( Logger logger, String format, Object... arguments )
{
return new AsyncLogEvent( logger, requireNonNull( format, "format" ),
arguments == null ? new Object[0] : arguments );
}

static AsyncLogEvent bulkLogEvent( Log log, final Consumer<Log> consumer )
{
requireNonNull( consumer, "Consumer<Log>" );
return new AsyncLogEvent( log, null, new BulkLogger()
{
@Override
void process( long timestamp, Object target )
{
consumer.accept( (Log) target ); // TODO: include timestamp!
}

@Override
public String toString()
{
return "Log.bulkLog( " + consumer + " )";
}
} );
}

static AsyncLogEvent bulkLogEvent( Logger logger, final Consumer<Logger> consumer )
{
requireNonNull( consumer, "Consumer<Logger>" );
return new AsyncLogEvent( logger, null, new BulkLogger()
{
@Override
void process( long timestamp, Object target )
{
consumer.accept( (Logger) target ); // TODO: include timestamp!
}

@Override
public String toString()
{
return "Logger.bulkLog( " + consumer + " )";
}
} );
}

@SuppressWarnings( "StringEquality" )
public void process()
{
if ( parameter == null )
{
((Logger) target).log( "[AsyncLog @ " + timestamp() + "] " + message );
}
else if ( parameter instanceof Throwable )
{
((Logger) target).log( "[AsyncLog @ " + timestamp() + "] " + message, (Throwable) parameter );
}
else if ( parameter instanceof Object[] )
{
((Logger) target).log( "[AsyncLog @ " + timestamp() + "] " + message, (Object[]) parameter );
}
else if ( parameter instanceof BulkLogger )
{
((BulkLogger) parameter).process( timestamp, target );
}
}

private final long timestamp;
private final Object target;
private final String message;
private final Object parameter;

private AsyncLogEvent( Object target, String message, Object parameter )
{
this.target = target;
this.message = message;
this.parameter = parameter;
this.timestamp = System.currentTimeMillis();
}

@Override
public String toString()
{
if ( parameter == null )
{
return "log( @ " + timestamp() + ": \"" + message + "\" )";
}
if ( parameter instanceof Throwable )
{
return "log( @ " + timestamp() + ": \"" + message + "\", " + parameter + " )";
}
if ( parameter instanceof Object[] )
{
return "log( @ " + timestamp() + ": \"" + message + "\", " +
Arrays.toString( (Object[]) parameter ) + " )";
}
if ( parameter instanceof BulkLogger )
{
return parameter.toString();
}
return super.toString();
}

private String timestamp()
{
return TIMESTAMP.format( timestamp );
}

private static abstract class BulkLogger
{
abstract void process( long timestamp, Object target );
}

private static final ThreadLocalFormat TIMESTAMP = new ThreadLocalFormat();

private static class ThreadLocalFormat extends ThreadLocal<DateFormat>
{
String format( long timestamp )
{
return get().format( new Date( timestamp ) );
}

@Override
protected DateFormat initialValue()
{
SimpleDateFormat format = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss.SSSZ" );
format.setTimeZone( TimeZone.getTimeZone( "UTC" ) );
return format;
}
}
}

0 comments on commit f9e1f7a

Please sign in to comment.