Skip to content

Commit

Permalink
Some more environment information in details printing
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Dec 4, 2017
1 parent 505c044 commit 2858e69
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 22 deletions.
Expand Up @@ -23,20 +23,69 @@


public class MeasureDoNothing extends Thread public class MeasureDoNothing extends Thread
{ {
public interface Monitor
{
default void started()
{
}

void blocked( long millis );

default void stopped()
{
}
}

public static Monitor loggingMonitor( Log log )
{
return new Monitor()
{
@Override
public void started()
{
log.debug( "GC Monitor started. " );
}

@Override
public void stopped()
{
log.debug( "GC Monitor stopped. " );
}

@Override
public void blocked( long millis )
{
log.warn( String.format( "GC Monitor: Application threads blocked for %dms.", millis ) );
}
};
}

public static class CollectingMonitor implements Monitor
{
private long gcBlockTime;

@Override
public void blocked( long millis )
{
gcBlockTime += millis;
}

public long getGcBlockTime()
{
return gcBlockTime;
}
}

private volatile boolean measure = true; private volatile boolean measure = true;


private final Monitor monitor;
private final long TIME_TO_WAIT; private final long TIME_TO_WAIT;
private final long NOTIFICATION_THRESHOLD; private final long NOTIFICATION_THRESHOLD;
private final Log log;


public MeasureDoNothing( String threadName, Log log, long timeToWait, long pauseNotificationThreshold ) public MeasureDoNothing( String threadName, Monitor monitor, long timeToWait, long pauseNotificationThreshold )
{ {
super( threadName ); super( threadName );
if ( log == null ) this.monitor = monitor;
{
throw new IllegalArgumentException( "Null message log" );
}
this.log = log;
this.TIME_TO_WAIT = timeToWait; this.TIME_TO_WAIT = timeToWait;
this.NOTIFICATION_THRESHOLD = pauseNotificationThreshold + timeToWait; this.NOTIFICATION_THRESHOLD = pauseNotificationThreshold + timeToWait;
setDaemon( true ); setDaemon( true );
Expand All @@ -45,7 +94,7 @@ public MeasureDoNothing( String threadName, Log log, long timeToWait, long pause
@Override @Override
public synchronized void run() public synchronized void run()
{ {
log.debug( "GC Monitor started. " ); monitor.started();
while ( measure ) while ( measure )
{ {
long start = System.nanoTime(); long start = System.nanoTime();
Expand All @@ -61,10 +110,10 @@ public synchronized void run()
if ( time > NOTIFICATION_THRESHOLD ) if ( time > NOTIFICATION_THRESHOLD )
{ {
long blockTime = time - TIME_TO_WAIT; long blockTime = time - TIME_TO_WAIT;
log.warn( String.format( "GC Monitor: Application threads blocked for %dms.", blockTime ) ); monitor.blocked( blockTime );
} }
} }
log.debug( "GC Monitor stopped. " ); monitor.stopped();
} }


public synchronized void stopMeasuring() public synchronized void stopMeasuring()
Expand Down
Expand Up @@ -28,6 +28,7 @@


import static org.neo4j.kernel.configuration.Settings.DURATION; import static org.neo4j.kernel.configuration.Settings.DURATION;
import static org.neo4j.kernel.configuration.Settings.setting; import static org.neo4j.kernel.configuration.Settings.setting;
import static org.neo4j.kernel.impl.cache.MeasureDoNothing.loggingMonitor;


public class MonitorGc implements Lifecycle public class MonitorGc implements Lifecycle
{ {
Expand Down Expand Up @@ -57,7 +58,8 @@ public void init() throws Throwable
@Override @Override
public void start() throws Throwable public void start() throws Throwable
{ {
monitorGc = new MeasureDoNothing( "neo4j.PauseMonitor", log, config.get( Configuration.gc_monitor_wait_time ).toMillis(), monitorGc = new MeasureDoNothing( "neo4j.PauseMonitor", loggingMonitor( log ),
config.get( Configuration.gc_monitor_wait_time ).toMillis(),
config.get( Configuration.gc_monitor_threshold ).toMillis() ); config.get( Configuration.gc_monitor_threshold ).toMillis() );
monitorGc.start(); monitorGc.start();
} }
Expand Down
Expand Up @@ -27,9 +27,14 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.LongFunction;


import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.helpers.Format;
import org.neo4j.helpers.collection.Pair; import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.impl.cache.MeasureDoNothing;
import org.neo4j.kernel.impl.cache.MeasureDoNothing.CollectingMonitor;
import org.neo4j.kernel.impl.util.OsBeanUtil;
import org.neo4j.unsafe.impl.batchimport.stats.DetailLevel; import org.neo4j.unsafe.impl.batchimport.stats.DetailLevel;
import org.neo4j.unsafe.impl.batchimport.stats.Keys; import org.neo4j.unsafe.impl.batchimport.stats.Keys;
import org.neo4j.unsafe.impl.batchimport.stats.Stat; import org.neo4j.unsafe.impl.batchimport.stats.Stat;
Expand All @@ -40,6 +45,7 @@
import static java.lang.System.currentTimeMillis; import static java.lang.System.currentTimeMillis;


import static org.neo4j.helpers.Format.bytes; import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.helpers.Format.duration;


/** /**
* Sits in the background and collect stats about stages that are executing. * Sits in the background and collect stats about stages that are executing.
Expand All @@ -62,13 +68,18 @@ public class OnDemandDetailsExecutionMonitor implements ExecutionMonitor
private final List<StageDetails> details = new ArrayList<>(); private final List<StageDetails> details = new ArrayList<>();
private final PrintStream out; private final PrintStream out;
private final Map<String,Pair<String,Runnable>> actions = new HashMap<>(); private final Map<String,Pair<String,Runnable>> actions = new HashMap<>();
private final CollectingMonitor gcBlockTime = new CollectingMonitor();
private final MeasureDoNothing gcMonitor;

private StageDetails current; private StageDetails current;
private boolean printDetailsOnDone; private boolean printDetailsOnDone;


public OnDemandDetailsExecutionMonitor( PrintStream out ) public OnDemandDetailsExecutionMonitor( PrintStream out )
{ {
this.out = out; this.out = out;
this.actions.put( "i", Pair.of( "Print more detailed information", this::printDetails ) ); this.actions.put( "i", Pair.of( "Print more detailed information", this::printDetails ) );
this.actions.put( "gc", Pair.of( "Trigger GC", this::triggerGC ) );
this.gcMonitor = new MeasureDoNothing( "Importer GC monitor", gcBlockTime, 100, 200 );
} }


@Override @Override
Expand All @@ -77,12 +88,13 @@ public void initialize( DependencyResolver dependencyResolver )
out.println( "Interactive command list (end with ENTER):" ); out.println( "Interactive command list (end with ENTER):" );
actions.forEach( ( key, action ) -> out.println( " " + key + ": " + action.first() ) ); actions.forEach( ( key, action ) -> out.println( " " + key + ": " + action.first() ) );
out.println(); out.println();
gcMonitor.start();
} }


@Override @Override
public void start( StageExecution execution ) public void start( StageExecution execution )
{ {
details.add( current = new StageDetails( execution ) ); details.add( current = new StageDetails( execution, gcBlockTime ) );
} }


@Override @Override
Expand All @@ -98,12 +110,13 @@ public void done( long totalTimeMillis, String additionalInformation )
{ {
printDetails(); printDetails();
} }
gcMonitor.stopMeasuring();
} }


@Override @Override
public long nextCheckTime() public long nextCheckTime()
{ {
return currentTimeMillis() + 200; return currentTimeMillis() + 500;
} }


@Override @Override
Expand All @@ -121,10 +134,21 @@ private void printDetails()
stageDetails.print( out ); stageDetails.print( out );
} }


out.println( "Environment information:");
out.println( " Free physical memory: " + bytes( OsBeanUtil.getFreePhysicalMemory() ) );
out.println( " Max VM memory: " + bytes( Runtime.getRuntime().maxMemory() ) );
out.println( " Free VM memory: " + bytes( Runtime.getRuntime().freeMemory() ) );
out.println( " GC block time: " + duration( gcBlockTime.getGcBlockTime() ) );

// Make sure that if user is interested in details then also print the entire details set when import is done // Make sure that if user is interested in details then also print the entire details set when import is done
printDetailsOnDone = true; printDetailsOnDone = true;
} }


private void triggerGC()
{
System.gc();
}

private void reactToUserInput() private void reactToUserInput()
{ {
try try
Expand All @@ -150,12 +174,22 @@ private void reactToUserInput()
private static class StageDetails private static class StageDetails
{ {
private final StageExecution execution; private final StageExecution execution;
private final long startTime;
private final CollectingMonitor gcBlockTime;
private final long baseGcBlockTime;

private long memoryUsage; private long memoryUsage;
private long ioThroughput; private long ioThroughput;
private long totalTimeMillis;
private long stageGcBlockTime;
private long doneBatches;


StageDetails( StageExecution execution ) StageDetails( StageExecution execution, CollectingMonitor gcBlockTime )
{ {
this.execution = execution; this.execution = execution;
this.gcBlockTime = gcBlockTime;
this.startTime = currentTimeMillis();
this.baseGcBlockTime = gcBlockTime.getGcBlockTime();
} }


void print( PrintStream out ) void print( PrintStream out )
Expand All @@ -164,20 +198,28 @@ void print( PrintStream out )
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
SpectrumExecutionMonitor.printSpectrum( builder, execution, SpectrumExecutionMonitor.DEFAULT_WIDTH, DetailLevel.NO ); SpectrumExecutionMonitor.printSpectrum( builder, execution, SpectrumExecutionMonitor.DEFAULT_WIDTH, DetailLevel.NO );
out.println( builder.toString() ); out.println( builder.toString() );
if ( memoryUsage > 0 ) printValue( out, memoryUsage, "Memory usage", Format::bytes );
{ printValue( out, ioThroughput, "I/O throughput", value -> bytes( value ) + "/s" );
out.println( "Memory usage: " + bytes( memoryUsage ) ); printValue( out, stageGcBlockTime, "GC block time", Format::duration );
} printValue( out, totalTimeMillis, "Duration", Format::duration );
if ( ioThroughput > 0 ) printValue( out, doneBatches, "Done batches", String::valueOf );
{
out.println( "I/O throughput: " + bytes( ioThroughput ) + "/s" );
}


out.println(); out.println();
} }


private static void printValue( PrintStream out, long value, String description, LongFunction<String> toStringConverter )
{
if ( value > 0 )
{
out.println( description + ": " + toStringConverter.apply( value ) );
}
}

void collect() void collect()
{ {
totalTimeMillis = currentTimeMillis() - startTime;
stageGcBlockTime = gcBlockTime.getGcBlockTime() - baseGcBlockTime;
long lastDoneBatches = doneBatches;
for ( Step<?> step : execution.steps() ) for ( Step<?> step : execution.steps() )
{ {
StepStats stats = step.stats(); StepStats stats = step.stats();
Expand All @@ -191,7 +233,9 @@ void collect()
{ {
ioThroughput = ioStat.asLong(); ioThroughput = ioStat.asLong();
} }
lastDoneBatches = stats.stat( Keys.done_batches ).asLong();
} }
doneBatches = lastDoneBatches;
} }
} }
} }

0 comments on commit 2858e69

Please sign in to comment.