diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/cache/MeasureDoNothing.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/cache/MeasureDoNothing.java index d9f6ca96884f..09ca496eb3bc 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/cache/MeasureDoNothing.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/cache/MeasureDoNothing.java @@ -23,20 +23,69 @@ public class MeasureDoNothing extends Thread { + public interface Monitor + { + default void started() + { + } + + void blocked( long millis ); + + default void stopped() + { + } + } + + public static Monitor loggingMonitor( Log log ) + { + return new Monitor() + { + @Override + public void started() + { + log.debug( "GC Monitor started. " ); + } + + @Override + public void stopped() + { + log.debug( "GC Monitor stopped. " ); + } + + @Override + public void blocked( long millis ) + { + log.warn( String.format( "GC Monitor: Application threads blocked for %dms.", millis ) ); + } + }; + } + + public static class CollectingMonitor implements Monitor + { + private long gcBlockTime; + + @Override + public void blocked( long millis ) + { + gcBlockTime += millis; + } + + public long getGcBlockTime() + { + return gcBlockTime; + } + } + private volatile boolean measure = true; + private final Monitor monitor; private final long TIME_TO_WAIT; private final long NOTIFICATION_THRESHOLD; - private final Log log; - public MeasureDoNothing( String threadName, Log log, long timeToWait, long pauseNotificationThreshold ) + public MeasureDoNothing( String threadName, Monitor monitor, long timeToWait, long pauseNotificationThreshold ) { super( threadName ); - if ( log == null ) - { - throw new IllegalArgumentException( "Null message log" ); - } - this.log = log; + this.monitor = monitor; this.TIME_TO_WAIT = timeToWait; this.NOTIFICATION_THRESHOLD = pauseNotificationThreshold + timeToWait; setDaemon( true ); @@ -45,7 +94,7 @@ public MeasureDoNothing( String threadName, Log log, long timeToWait, long pause @Override public synchronized void run() { - log.debug( "GC Monitor started. " ); + monitor.started(); while ( measure ) { long start = System.nanoTime(); @@ -61,10 +110,10 @@ public synchronized void run() if ( time > NOTIFICATION_THRESHOLD ) { long blockTime = time - TIME_TO_WAIT; - log.warn( String.format( "GC Monitor: Application threads blocked for %dms.", blockTime ) ); + monitor.blocked( blockTime ); } } - log.debug( "GC Monitor stopped. " ); + monitor.stopped(); } public synchronized void stopMeasuring() diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/cache/MonitorGc.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/cache/MonitorGc.java index e072e9723728..04f75fc99e37 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/cache/MonitorGc.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/cache/MonitorGc.java @@ -28,6 +28,7 @@ import static org.neo4j.kernel.configuration.Settings.DURATION; import static org.neo4j.kernel.configuration.Settings.setting; +import static org.neo4j.kernel.impl.cache.MeasureDoNothing.loggingMonitor; public class MonitorGc implements Lifecycle { @@ -57,7 +58,8 @@ public void init() throws Throwable @Override public void start() throws Throwable { - monitorGc = new MeasureDoNothing( "neo4j.PauseMonitor", log, config.get( Configuration.gc_monitor_wait_time ).toMillis(), + monitorGc = new MeasureDoNothing( "neo4j.PauseMonitor", loggingMonitor( log ), + config.get( Configuration.gc_monitor_wait_time ).toMillis(), config.get( Configuration.gc_monitor_threshold ).toMillis() ); monitorGc.start(); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/OnDemandDetailsExecutionMonitor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/OnDemandDetailsExecutionMonitor.java index f61cf5e48abf..e7ce919a7c40 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/OnDemandDetailsExecutionMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/OnDemandDetailsExecutionMonitor.java @@ -27,9 +27,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.LongFunction; import org.neo4j.graphdb.DependencyResolver; +import org.neo4j.helpers.Format; import org.neo4j.helpers.collection.Pair; +import org.neo4j.kernel.impl.cache.MeasureDoNothing; +import org.neo4j.kernel.impl.cache.MeasureDoNothing.CollectingMonitor; +import org.neo4j.kernel.impl.util.OsBeanUtil; import org.neo4j.unsafe.impl.batchimport.stats.DetailLevel; import org.neo4j.unsafe.impl.batchimport.stats.Keys; import org.neo4j.unsafe.impl.batchimport.stats.Stat; @@ -40,6 +45,7 @@ import static java.lang.System.currentTimeMillis; import static org.neo4j.helpers.Format.bytes; +import static org.neo4j.helpers.Format.duration; /** * Sits in the background and collect stats about stages that are executing. @@ -62,6 +68,9 @@ public class OnDemandDetailsExecutionMonitor implements ExecutionMonitor private final List details = new ArrayList<>(); private final PrintStream out; private final Map> actions = new HashMap<>(); + private final CollectingMonitor gcBlockTime = new CollectingMonitor(); + private final MeasureDoNothing gcMonitor; + private StageDetails current; private boolean printDetailsOnDone; @@ -69,6 +78,8 @@ public OnDemandDetailsExecutionMonitor( PrintStream out ) { this.out = out; this.actions.put( "i", Pair.of( "Print more detailed information", this::printDetails ) ); + this.actions.put( "gc", Pair.of( "Trigger GC", this::triggerGC ) ); + this.gcMonitor = new MeasureDoNothing( "Importer GC monitor", gcBlockTime, 100, 200 ); } @Override @@ -77,12 +88,13 @@ public void initialize( DependencyResolver dependencyResolver ) out.println( "Interactive command list (end with ENTER):" ); actions.forEach( ( key, action ) -> out.println( " " + key + ": " + action.first() ) ); out.println(); + gcMonitor.start(); } @Override public void start( StageExecution execution ) { - details.add( current = new StageDetails( execution ) ); + details.add( current = new StageDetails( execution, gcBlockTime ) ); } @Override @@ -98,12 +110,13 @@ public void done( long totalTimeMillis, String additionalInformation ) { printDetails(); } + gcMonitor.stopMeasuring(); } @Override public long nextCheckTime() { - return currentTimeMillis() + 200; + return currentTimeMillis() + 500; } @Override @@ -121,10 +134,21 @@ private void printDetails() stageDetails.print( out ); } + out.println( "Environment information:"); + out.println( " Free physical memory: " + bytes( OsBeanUtil.getFreePhysicalMemory() ) ); + out.println( " Max VM memory: " + bytes( Runtime.getRuntime().maxMemory() ) ); + out.println( " Free VM memory: " + bytes( Runtime.getRuntime().freeMemory() ) ); + out.println( " GC block time: " + duration( gcBlockTime.getGcBlockTime() ) ); + // Make sure that if user is interested in details then also print the entire details set when import is done printDetailsOnDone = true; } + private void triggerGC() + { + System.gc(); + } + private void reactToUserInput() { try @@ -150,12 +174,22 @@ private void reactToUserInput() private static class StageDetails { private final StageExecution execution; + private final long startTime; + private final CollectingMonitor gcBlockTime; + private final long baseGcBlockTime; + private long memoryUsage; private long ioThroughput; + private long totalTimeMillis; + private long stageGcBlockTime; + private long doneBatches; - StageDetails( StageExecution execution ) + StageDetails( StageExecution execution, CollectingMonitor gcBlockTime ) { this.execution = execution; + this.gcBlockTime = gcBlockTime; + this.startTime = currentTimeMillis(); + this.baseGcBlockTime = gcBlockTime.getGcBlockTime(); } void print( PrintStream out ) @@ -164,20 +198,28 @@ void print( PrintStream out ) StringBuilder builder = new StringBuilder(); SpectrumExecutionMonitor.printSpectrum( builder, execution, SpectrumExecutionMonitor.DEFAULT_WIDTH, DetailLevel.NO ); out.println( builder.toString() ); - if ( memoryUsage > 0 ) - { - out.println( "Memory usage: " + bytes( memoryUsage ) ); - } - if ( ioThroughput > 0 ) - { - out.println( "I/O throughput: " + bytes( ioThroughput ) + "/s" ); - } + printValue( out, memoryUsage, "Memory usage", Format::bytes ); + printValue( out, ioThroughput, "I/O throughput", value -> bytes( value ) + "/s" ); + printValue( out, stageGcBlockTime, "GC block time", Format::duration ); + printValue( out, totalTimeMillis, "Duration", Format::duration ); + printValue( out, doneBatches, "Done batches", String::valueOf ); out.println(); } + private static void printValue( PrintStream out, long value, String description, LongFunction toStringConverter ) + { + if ( value > 0 ) + { + out.println( description + ": " + toStringConverter.apply( value ) ); + } + } + void collect() { + totalTimeMillis = currentTimeMillis() - startTime; + stageGcBlockTime = gcBlockTime.getGcBlockTime() - baseGcBlockTime; + long lastDoneBatches = doneBatches; for ( Step step : execution.steps() ) { StepStats stats = step.stats(); @@ -191,7 +233,9 @@ void collect() { ioThroughput = ioStat.asLong(); } + lastDoneBatches = stats.stat( Keys.done_batches ).asLong(); } + doneBatches = lastDoneBatches; } } }