diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index c449b690fff7d..a8b459555f65a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -159,15 +159,15 @@ public void doImport( Input input ) throws IOException NodeStage nodeStage = new NodeStage( config, writeMonitor, nodes, idMapper, idGenerator, neoStore, inputCache, neoStore.getLabelScanStore(), storeUpdateMonitor, nodeRelationshipCache, memoryUsageStats ); - executeStages( nodeStage ); + executeStage( nodeStage ); if ( idMapper.needsPreparation() ) { - executeStages( new IdMapperPreparationStage( config, idMapper, cachedNodes, + executeStage( new IdMapperPreparationStage( config, idMapper, cachedNodes, badCollector, memoryUsageStats ) ); PrimitiveLongIterator duplicateNodeIds = badCollector.leftOverDuplicateNodesIds(); if ( duplicateNodeIds.hasNext() ) { - executeStages( new DeleteDuplicateNodesStage( config, duplicateNodeIds, neoStore ) ); + executeStage( new DeleteDuplicateNodesStage( config, duplicateNodeIds, neoStore ) ); } } @@ -175,7 +175,7 @@ public void doImport( Input input ) throws IOException CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( withBatchSize( config, config.batchSize()*10 ), relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore ); - executeStages( calculateDenseNodesStage ); + executeStage( calculateDenseNodesStage ); importRelationships( nodeRelationshipCache, storeUpdateMonitor, neoStore, writeMonitor, idMapper, cachedRelationships, inputCache, @@ -196,10 +196,10 @@ public void doImport( Input input ) throws IOException // Stage 6 -- count nodes per label and labels per node nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() ); memoryUsageStats = new MemoryUsageStatsProvider( nodeLabelsCache ); - executeStages( new NodeCountsStage( config, nodeLabelsCache, neoStore.getNodeStore(), + executeStage( new NodeCountsStage( config, nodeLabelsCache, neoStore.getNodeStore(), neoStore.getLabelRepository().getHighId(), countsUpdater, memoryUsageStats ) ); // Stage 7 -- count label-[type]->label - executeStages( new RelationshipCountsStage( config, nodeLabelsCache, relationshipStore, + executeStage( new RelationshipCountsStage( config, nodeLabelsCache, relationshipStore, neoStore.getLabelRepository().getHighId(), neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, AUTO ) ); @@ -282,16 +282,16 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, final RelationshipStage relationshipStage = new RelationshipStage( topic, config, writeMonitor, perType, idMapper, neoStore, nodeRelationshipCache, storeUpdateMonitor, nextRelationshipId ); - executeStages( relationshipStage ); + executeStage( relationshipStage ); // Stage 4a -- set node nextRel fields for dense nodes - executeStages( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), + executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, true/*dense*/, currentTypeId ) ); // Stage 5a -- link relationship chains together for dense nodes nodeRelationshipCache.setForwardScan( false ); - executeStages( new RelationshipLinkbackStage( topic, + executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(), nodeRelationshipCache, nextRelationshipId, @@ -303,24 +303,24 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, String topic = " Sparse"; nodeRelationshipCache.setForwardScan( true ); // Stage 4b -- set node nextRel fields for sparse nodes - executeStages( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), + executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, false/*sparse*/, -1 ) ); // Stage 5b -- link relationship chains together for sparse nodes nodeRelationshipCache.setForwardScan( false ); - executeStages( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(), + executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(), nodeRelationshipCache, 0, nextRelationshipId, false/*sparse*/ ) ); if ( minorityRelationshipTypes.length > 0 ) { // Do some batch insertion style random-access insertions for super small minority types - executeStages( new BatchInsertRelationshipsStage( config, idMapper, + executeStage( new BatchInsertRelationshipsStage( config, idMapper, perTypeIterator.getMinorityRelationships(), neoStore, nextRelationshipId ) ); } } - private void executeStages( Stage... stages ) + private void executeStage( Stage stage ) { - superviseExecution( executionMonitor, config, stages ); + superviseExecution( executionMonitor, config, stage ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseBoundedProgressExecutionMonitor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseBoundedProgressExecutionMonitor.java index 45a88902be874..6ec4649c4e763 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseBoundedProgressExecutionMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseBoundedProgressExecutionMonitor.java @@ -31,7 +31,7 @@ public abstract class CoarseBoundedProgressExecutionMonitor extends ExecutionMonitor.Adapter { private final long totalNumberOfBatches; - private long[] prevDoneBatches; + private long prevDoneBatches; private long totalReportedBatches = 0; public CoarseBoundedProgressExecutionMonitor( long highNodeId, long highRelationshipId, @@ -51,26 +51,24 @@ protected long total() } @Override - public void check( StageExecution[] executions ) + public void check( StageExecution execution ) { - update( executions ); + update( execution ); } @Override - public void start( StageExecution[] executions ) + public void start( StageExecution execution ) { - prevDoneBatches = new long[executions.length]; + prevDoneBatches = 0; } - private void update( StageExecution[] executions ) + private void update( StageExecution execution ) { long diff = 0; - for ( int i = 0; i < executions.length; i++ ) - { - long doneBatches = doneBatches( executions[i] ); - diff += doneBatches - prevDoneBatches[i]; - prevDoneBatches[i] = doneBatches; - } + long doneBatches = doneBatches( execution ); + diff += doneBatches - prevDoneBatches; + prevDoneBatches = doneBatches; + if ( diff > 0 ) { totalReportedBatches += diff; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseUnboundedProgressExecutionMonitor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseUnboundedProgressExecutionMonitor.java index 9985de8655b4e..ca3f6d73ddd6c 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseUnboundedProgressExecutionMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseUnboundedProgressExecutionMonitor.java @@ -44,19 +44,15 @@ public CoarseUnboundedProgressExecutionMonitor( int dotEveryN, PrintStream out ) } @Override - public void start( StageExecution[] executions ) + public void start( StageExecution execution ) { prevN = 0; } @Override - public void check( StageExecution[] executions ) + public void check( StageExecution execution ) { - int n = prevN; - for ( StageExecution execution : executions ) - { - n = max( n, n( execution ) ); - } + int n = max( prevN, n( execution ) ); while ( prevN < n ) { diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DetailedExecutionMonitor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DetailedExecutionMonitor.java index 8936d087bdd07..a8100373ecb74 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DetailedExecutionMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DetailedExecutionMonitor.java @@ -50,34 +50,21 @@ public DetailedExecutionMonitor( PrintStream out, long intervalSeconds ) } @Override - public void start( StageExecution[] executions ) + public void start( StageExecution execution ) { StringBuilder names = new StringBuilder(); - for ( StageExecution execution : executions ) - { - names.append( names.length() > 0 ? ", " : "" ).append( execution.getStageName() ); - } + names.append( names.length() > 0 ? ", " : "" ).append( execution.getStageName() ); out.println( format( "%n>>>>> EXECUTING STAGE(s) %s <<<<<%n", names ) ); } @Override - public void end( StageExecution[] executions, long totalTimeMillis ) + public void end( StageExecution execution, long totalTimeMillis ) { out.println( "Stage total time " + duration( totalTimeMillis ) ); } @Override - public void check( StageExecution[] executions ) - { - boolean first = true; - for ( StageExecution execution : executions ) - { - printStats( execution, first ); - first = false; - } - } - - private void printStats( StageExecution execution, boolean first ) + public void check( StageExecution execution ) { Step bottleNeck = execution.stepsOrderedBy( Keys.avg_processing_time, false ).iterator().next().first(); @@ -86,7 +73,7 @@ private void printStats( StageExecution execution, boolean first ) for ( Step step : execution.steps() ) { StepStats stats = step.stats(); - builder.append( i > 0 ? format( "%n " ) : (first ? "--" : " -") ) + builder.append( i > 0 ? format( "%n " ) : "--" ) .append( stats.toString( DetailLevel.BASIC ) ) .append( step == bottleNeck ? " <== BOTTLE NECK" : "" ); i++; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java index 678c4d45558f2..de5aa8c26c073 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java @@ -57,29 +57,26 @@ public DynamicProcessorAssigner( Configuration config, int availableProcessors ) } @Override - public void start( StageExecution[] executions ) + public void start( StageExecution execution ) { // A new stage begins, any data that we had is irrelevant lastChangedProcessors.clear(); } @Override - public void check( StageExecution[] executions ) + public void check( StageExecution execution ) { - int permits = availableProcessors - countActiveProcessors( executions ); - for ( StageExecution execution : executions ) + int permits = availableProcessors - countActiveProcessors( execution ); + if ( execution.stillExecuting() ) { - if ( execution.stillExecuting() ) + if ( permits > 0 ) { - if ( permits > 0 ) - { - // Be swift at assigning processors to slow steps, i.e. potentially multiple per round - permits -= assignProcessorsToPotentialBottleNeck( execution, permits ); - } - // Be a little more conservative removing processors from too fast steps - if ( permits == 0 && removeProcessorFromPotentialIdleStep( execution ) ) - { - permits++; - } + // Be swift at assigning processors to slow steps, i.e. potentially multiple per round + permits -= assignProcessorsToPotentialBottleNeck( execution, permits ); + } + // Be a little more conservative removing processors from too fast steps + if ( permits == 0 && removeProcessorFromPotentialIdleStep( execution ) ) + { + permits++; } } } @@ -150,24 +147,21 @@ private long batches( Step step ) return step.stats().stat( Keys.done_batches ).asLong(); } - private int countActiveProcessors( StageExecution[] executions ) + private int countActiveProcessors( StageExecution execution ) { float processors = 0; - for ( StageExecution execution : executions ) + if ( execution.stillExecuting() ) { - if ( execution.stillExecuting() ) + long highestAverage = avg( execution.stepsOrderedBy( + Keys.avg_processing_time, false ).iterator().next().first() ); + for ( Step step : execution.steps() ) { - long highestAverage = avg( execution.stepsOrderedBy( - Keys.avg_processing_time, false ).iterator().next().first() ); - for ( Step step : execution.steps() ) - { - // Calculate how active each step is so that a step that is very cheap - // and idles a lot counts for less than 1 processor, so that bottlenecks can - // "steal" some of its processing power. - long avg = avg( step ); - float factor = (float)avg / (float)highestAverage; - processors += factor * step.processors( 0 ); - } + // Calculate how active each step is so that a step that is very cheap + // and idles a lot counts for less than 1 processor, so that bottlenecks can + // "steal" some of its processing power. + long avg = avg( step ); + float factor = (float)avg / (float)highestAverage; + processors += factor * step.processors( 0 ); } } return Math.round( processors ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitor.java index 3581701a7df13..b49b6de552f7d 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitor.java @@ -30,17 +30,17 @@ public interface ExecutionMonitor { /** - * Signals the start of one or more stages, + * Signals the start of a {@link StageExecution}. */ - void start( StageExecution[] executions ); + void start( StageExecution execution ); /** - * Signals the end of the executions previously {@link #start(StageExecution[]) stated} + * Signals the end of the execution previously {@link #start(StageExecution) started}. */ - void end( StageExecution[] executions, long totalTimeMillis ); + void end( StageExecution execution, long totalTimeMillis ); /** - * Signals the end of the import as a whole + * Called after all {@link StageExecution stage executions} have run. */ void done( long totalTimeMillis, String additionalInformation ); @@ -50,9 +50,9 @@ public interface ExecutionMonitor long nextCheckTime(); /** - * Called with currently executing {@link StageExecution} instances so that data from them can be gathered. + * Called periodically while executing a {@link StageExecution}. */ - void check( StageExecution[] executions ); + void check( StageExecution execution ); /** * Base implementation with most methods defaulting to not doing anything. @@ -80,12 +80,12 @@ public long nextCheckTime() } @Override - public void start( StageExecution[] executions ) + public void start( StageExecution execution ) { // Do nothing by default } @Override - public void end( StageExecution[] executions, long totalTimeMillis ) + public void end( StageExecution execution, long totalTimeMillis ) { // Do nothing by default } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitors.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitors.java index 57f0100be396f..bd530d3e8b383 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitors.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionMonitors.java @@ -41,12 +41,12 @@ public static ExecutionMonitor defaultVisible() private static final ExecutionMonitor INVISIBLE = new ExecutionMonitor() { @Override - public void start( StageExecution[] executions ) + public void start( StageExecution execution ) { // Do nothing } @Override - public void end( StageExecution[] executions, long totalTimeMillis ) + public void end( StageExecution execution, long totalTimeMillis ) { // Do nothing } @@ -57,7 +57,7 @@ public long nextCheckTime() } @Override - public void check( StageExecution[] executions ) + public void check( StageExecution execution ) { // Do nothing } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionSupervisor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionSupervisor.java index a8ef5eb6dd554..491b42f5fc676 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionSupervisor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionSupervisor.java @@ -52,19 +52,19 @@ public ExecutionSupervisor( ExecutionMonitor monitor ) * Made synchronized to ensure that only one set of executions take place at any given time * and also to make sure the calling thread goes through a memory barrier (useful both before and after execution). * - * @param executions {@link StageExecution} instances to supervise simultaneously. + * @param execution {@link StageExecution} instances to supervise simultaneously. */ - public synchronized void supervise( StageExecution... executions ) + public synchronized void supervise( StageExecution execution ) { long startTime = currentTimeMillis(); - start( executions ); + start( execution ); - while ( anyStillExecuting( executions ) ) + while ( execution.stillExecuting() ) { - finishAwareSleep( executions ); - monitor.check( executions ); + finishAwareSleep( execution ); + monitor.check( execution ); } - end( executions, currentTimeMillis()-startTime ); + end( execution, currentTimeMillis()-startTime ); } private long currentTimeMillis() @@ -72,34 +72,22 @@ private long currentTimeMillis() return clock.currentTimeMillis(); } - private boolean anyStillExecuting( StageExecution[] executions ) + protected void end( StageExecution execution, long totalTimeMillis ) { - for ( StageExecution execution : executions ) - { - if ( execution.stillExecuting() ) - { - return true; - } - } - return false; - } - - protected void end( StageExecution[] executions, long totalTimeMillis ) - { - monitor.end( executions, totalTimeMillis ); + monitor.end( execution, totalTimeMillis ); } - protected void start( StageExecution[] executions ) + protected void start( StageExecution execution ) { - monitor.start( executions ); + monitor.start( execution ); } - private void finishAwareSleep( StageExecution[] executions ) + private void finishAwareSleep( StageExecution execution ) { long endTime = monitor.nextCheckTime(); while ( currentTimeMillis() < endTime ) { - if ( !anyStillExecuting( executions ) ) + if ( !execution.stillExecuting() ) { break; } @@ -110,10 +98,7 @@ private void finishAwareSleep( StageExecution[] executions ) } catch ( InterruptedException e ) { - for ( StageExecution execution : executions ) - { - execution.panic( e ); - } + execution.panic( e ); break; } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionSupervisors.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionSupervisors.java index a09bfc2e0fc32..cf24b731ef3b3 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionSupervisors.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ExecutionSupervisors.java @@ -32,31 +32,31 @@ public class ExecutionSupervisors { /** * Using an {@link ExecutionMonitors#invisible() invisible} monitor. - * @see #superviseDynamicExecution(ExecutionMonitor, Stage...) + * @see #superviseDynamicExecution(ExecutionMonitor, Stage) */ - public static void superviseDynamicExecution( Stage... stages ) + public static void superviseDynamicExecution( Stage stage ) { - superviseDynamicExecution( ExecutionMonitors.invisible(), stages ); + superviseDynamicExecution( ExecutionMonitors.invisible(), stage ); } /** * With {@link Configuration#DEFAULT}. - * @see #superviseDynamicExecution(ExecutionMonitor, Configuration, Stage...) + * @see #superviseDynamicExecution(ExecutionMonitor, Configuration, Stage) */ - public static void superviseDynamicExecution( ExecutionMonitor monitor, Stage... stages ) + public static void superviseDynamicExecution( ExecutionMonitor monitor, Stage stage ) { - superviseDynamicExecution( monitor, Configuration.DEFAULT, stages ); + superviseDynamicExecution( monitor, Configuration.DEFAULT, stage ); } /** * Supervises an execution with the given monitor AND a {@link DynamicProcessorAssigner} to give * the execution a dynamic and optimal nature. * - * @see #superviseExecution(ExecutionMonitor, Configuration, Stage...) + * @see #superviseExecution(ExecutionMonitor, Configuration, Stage) */ - public static void superviseDynamicExecution( ExecutionMonitor monitor, Configuration config, Stage... stages ) + public static void superviseDynamicExecution( ExecutionMonitor monitor, Configuration config, Stage stage ) { - superviseExecution( withDynamicProcessorAssignment( monitor, config ), config, stages ); + superviseExecution( withDynamicProcessorAssignment( monitor, config ), config, stage ); } /** @@ -65,33 +65,23 @@ public static void superviseDynamicExecution( ExecutionMonitor monitor, Configur * * @param monitor {@link ExecutionMonitor} to get insight into the execution. * @param config {@link Configuration} for the execution. - * @param stages {@link Stage stages} to execute. + * @param stage {@link Stage stages} to execute. */ - public static void superviseExecution( ExecutionMonitor monitor, Configuration config, Stage... stages ) + public static void superviseExecution( ExecutionMonitor monitor, Configuration config, Stage stage ) { ExecutionSupervisor supervisor = new ExecutionSupervisor( Clock.SYSTEM_CLOCK, monitor ); - StageExecution[] executions = new StageExecution[stages.length]; + StageExecution execution = null; try { - for ( int i = 0; i < stages.length; i++ ) - { - executions[i] = stages[i].execute(); - } - supervisor.supervise( executions ); + execution = stage.execute(); + supervisor.supervise( execution ); } finally { - for ( Stage stage : stages ) - { - stage.close(); - } - - for ( StageExecution execution : executions ) + stage.close(); + if ( execution != null ) { - if ( execution != null ) - { - execution.assertHealthy(); - } + execution.assertHealthy(); } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/MultiExecutionMonitor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/MultiExecutionMonitor.java index 160762eb93685..c14b51cc86980 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/MultiExecutionMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/MultiExecutionMonitor.java @@ -45,20 +45,20 @@ public MultiExecutionMonitor( Clock clock, ExecutionMonitor... monitors ) } @Override - public void start( StageExecution[] executions ) + public void start( StageExecution execution ) { for ( ExecutionMonitor monitor : monitors ) { - monitor.start( executions ); + monitor.start( execution ); } } @Override - public void end( StageExecution[] executions, long totalTimeMillis ) + public void end( StageExecution execution, long totalTimeMillis ) { for ( ExecutionMonitor monitor : monitors ) { - monitor.end( executions, totalTimeMillis ); + monitor.end( execution, totalTimeMillis ); } } @@ -96,14 +96,14 @@ private void fillEndTimes() } @Override - public void check( StageExecution[] executions ) + public void check( StageExecution execution ) { long currentTimeMillis = clock.currentTimeMillis(); for ( int i = 0; i < monitors.length; i++ ) { if ( currentTimeMillis >= endTimes[i] ) { - monitors[i].check( executions ); + monitors[i].check( execution ); endTimes[i] = monitors[i].nextCheckTime(); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitor.java index a4b3a86ce58ad..cf5c15e994a55 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitor.java @@ -20,8 +20,6 @@ package org.neo4j.unsafe.impl.batchimport.staging; import java.io.PrintStream; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeUnit; import org.neo4j.helpers.collection.Pair; @@ -57,7 +55,6 @@ public class SpectrumExecutionMonitor extends ExecutionMonitor.Adapter private final PrintStream out; private final int width; - private long tick; public SpectrumExecutionMonitor( long interval, TimeUnit unit, PrintStream out, int width ) { @@ -67,24 +64,15 @@ public SpectrumExecutionMonitor( long interval, TimeUnit unit, PrintStream out, } @Override - public void start( StageExecution[] executions ) + public void start( StageExecution execution ) { - for ( int i = 0; i < executions.length; i++ ) - { - if ( i > 0 ) - { - out.print( ", " ); - } - out.print( executions[i].getStageName() ); - } - out.println(); - tick = 0; + out.println( execution.getStageName() ); } @Override - public void end( StageExecution[] executions, long totalTimeMillis ) + public void end( StageExecution execution, long totalTimeMillis ) { - check( executions ); + check( execution ); out.println(); out.println( "Done in " + duration( totalTimeMillis ) ); } @@ -97,29 +85,11 @@ public void done( long totalTimeMillis, String additionalInformation ) } @Override - public void check( StageExecution[] executions ) + public void check( StageExecution execution ) { - StageExecution execution = rotatedExecution( executions ); - if ( execution != null ) - { - StringBuilder builder = new StringBuilder(); - printSpectrum( builder, execution, width ); - out.print( "\r" + builder ); - } - } - - private StageExecution rotatedExecution( StageExecution[] executions ) - { - List active = new ArrayList<>( executions.length ); - for ( StageExecution execution : executions ) - { - if ( execution.stillExecuting() ) - { - active.add( execution ); - } - } - - return !active.isEmpty() ? active.get( (int) ((tick++)%active.size()) ) : null; + StringBuilder builder = new StringBuilder(); + printSpectrum( builder, execution, width ); + out.print( "\r" + builder ); } private void printSpectrum( StringBuilder builder, StageExecution execution, int width ) diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseBoundedProgressExecutionMonitorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseBoundedProgressExecutionMonitorTest.java index 60881193cdfe0..f5773e12f436d 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseBoundedProgressExecutionMonitorTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/CoarseBoundedProgressExecutionMonitorTest.java @@ -67,8 +67,8 @@ public void progressOnMultipleExecutions() for ( int i = 0; i < 4; i++ ) { - progressExecutionMonitor.start( singleExecution( 0, config ) ); - progressExecutionMonitor.check( singleExecution( total / 4, config ) ); + progressExecutionMonitor.start( execution( 0, config ) ); + progressExecutionMonitor.check( execution( total / 4, config ) ); } progressExecutionMonitor.done( 0, "Completed" ); @@ -77,23 +77,23 @@ public void progressOnMultipleExecutions() private long monitorSingleStageExecution( ProgressExecutionMonitor progressExecutionMonitor, Configuration config ) { - progressExecutionMonitor.start( singleExecution( 0, config ) ); + progressExecutionMonitor.start( execution( 0, config ) ); long total = progressExecutionMonitor.total(); long part = total / 10; for ( int i = 0; i < 9; i++ ) { - progressExecutionMonitor.check( singleExecution( part * (i+1), config ) ); + progressExecutionMonitor.check( execution( part * (i+1), config ) ); assertTrue( progressExecutionMonitor.getProgress() < total ); } progressExecutionMonitor.done( 0, "Test" ); return total; } - private StageExecution[] singleExecution( long doneBatches, Configuration config ) + private StageExecution execution( long doneBatches, Configuration config ) { Step step = ControlledStep.stepWithStats( "Test", 0, done_batches, doneBatches ); StageExecution execution = new StageExecution( "Test", config, Collections.singletonList( step ), 0 ); - return new StageExecution[] {execution}; + return execution; } private Configuration config() diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java index 9c0bdc19a9a27..fa5f74f8895f8 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java @@ -47,7 +47,7 @@ public void shouldAssignAdditionalCPUToBottleNeckStep() throws Exception ControlledStep slowStep = stepWithStats( "slow", 0, avg_processing_time, 10L, done_batches, 10L ); ControlledStep fastStep = stepWithStats( "fast", 0, avg_processing_time, 2L, done_batches, 10L ); - StageExecution[] execution = executionOf( config, slowStep, fastStep ); + StageExecution execution = executionOf( config, slowStep, fastStep ); assigner.start( execution ); // WHEN @@ -72,7 +72,7 @@ public void shouldRemoveCPUsFromWayTooFastStep() throws Exception ControlledStep fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 2L, done_batches, 10L ) .setProcessors( 2 ) ); - StageExecution[] execution = executionOf( config, slowStep, fastStep ); + StageExecution execution = executionOf( config, slowStep, fastStep ); assigner.start( execution ); // WHEN checking @@ -93,7 +93,7 @@ public void shouldRemoveCPUsButNotSoThatTheFastStepBecomesBottleneck() throws Ex ControlledStep fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 7L, done_batches, 10L ) .setProcessors( 3 ) ); - StageExecution[] execution = executionOf( config, slowStep, fastStep ); + StageExecution execution = executionOf( config, slowStep, fastStep ); assigner.start( execution ); // WHEN checking the first time @@ -114,7 +114,7 @@ public void shouldHandleZeroAverage() throws Exception ControlledStep aStep = stepWithStats( "slow", 0, avg_processing_time, 0L, done_batches, 0L ); ControlledStep anotherStep = stepWithStats( "fast", 0, avg_processing_time, 0L, done_batches, 0L ); - StageExecution[] execution = executionOf( config, aStep, anotherStep ); + StageExecution execution = executionOf( config, aStep, anotherStep ); assigner.start( execution ); // WHEN @@ -140,7 +140,7 @@ public void shouldRemoveCPUsFromTooFastStepEvenIfThereIsAWayFaster() throws Exce Step fast = spy( stepWithStats( "fast", 0, avg_processing_time, 100L, done_batches, 20L ) .setProcessors( 3 ) ); Step slow = stepWithStats( "slow", 1, avg_processing_time, 220L, done_batches, 20L ); - StageExecution[] execution = executionOf( config, slow, wayFastest, fast ); + StageExecution execution = executionOf( config, slow, wayFastest, fast ); assigner.start( execution ); // WHEN @@ -162,9 +162,8 @@ public int movingAverageSize() }; } - private StageExecution[] executionOf( Configuration config, Step... steps ) + private StageExecution executionOf( Configuration config, Step... steps ) { - StageExecution execution = new StageExecution( "Test", config, Arrays.asList( steps ), ORDER_SEND_DOWNSTREAM ); - return new StageExecution[] {execution}; + return new StageExecution( "Test", config, Arrays.asList( steps ), ORDER_SEND_DOWNSTREAM ); } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/MultiExecutionMonitorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/MultiExecutionMonitorTest.java index 985316e703615..76438d58d79aa 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/MultiExecutionMonitorTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/MultiExecutionMonitorTest.java @@ -84,7 +84,7 @@ public TestableMonitor( Clock clock, long interval, TimeUnit unit, String name ) } @Override - public void check( StageExecution[] executions ) + public void check( StageExecution execution ) { timesPolled++; } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorAssignmentStrategies.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorAssignmentStrategies.java index f31deaad0a193..caf0ce13d1880 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorAssignmentStrategies.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorAssignmentStrategies.java @@ -44,34 +44,31 @@ public static ExecutionMonitor eagerRandomSaturation( final int availableProcess return new AbstractAssigner( Clock.SYSTEM_CLOCK, 10, TimeUnit.SECONDS ) { @Override - public void start( StageExecution[] executions ) + public void start( StageExecution execution ) { - saturate( availableProcessor, executions ); - registerProcessorCount( executions ); + saturate( availableProcessor, execution ); + registerProcessorCount( execution ); } - private void saturate( final int availableProcessor, StageExecution[] executions ) + private void saturate( final int availableProcessor, StageExecution execution ) { Random random = ThreadLocalRandom.current(); int processors = availableProcessor; for ( int rounds = 0; rounds < availableProcessor && processors > 0; rounds++ ) { - for ( StageExecution execution : executions ) + for ( Step step : execution.steps() ) { - for ( Step step : execution.steps() ) + int before = step.processors( 0 ); + if ( random.nextBoolean() && step.processors( 1 ) > before && --processors == 0 ) { - int before = step.processors( 0 ); - if ( random.nextBoolean() && step.processors( 1 ) > before && --processors == 0 ) - { - return; - } + return; } } } } @Override - public void check( StageExecution[] executions ) + public void check( StageExecution execution ) { // We do everything in start } }; @@ -87,13 +84,13 @@ public static ExecutionMonitor randomSaturationOverTime( final int availableProc private int processors = availableProcessor; @Override - public void check( StageExecution[] executions ) + public void check( StageExecution execution ) { - saturate( executions ); - registerProcessorCount( executions ); + saturate( execution ); + registerProcessorCount( execution ); } - private void saturate( StageExecution[] executions ) + private void saturate( StageExecution execution ) { if ( processors == 0 ) { @@ -102,18 +99,15 @@ private void saturate( StageExecution[] executions ) Random random = ThreadLocalRandom.current(); int maxThisCheck = random.nextInt( processors-1 )+1; - for ( StageExecution execution : executions ) + for ( Step step : execution.steps() ) { - for ( Step step : execution.steps() ) + int before = step.processors( 0 ); + if ( random.nextBoolean() && step.processors( -1 ) < before ) { - int before = step.processors( 0 ); - if ( random.nextBoolean() && step.processors( -1 ) < before ) + processors--; + if ( --maxThisCheck == 0 ) { - processors--; - if ( --maxThisCheck == 0 ) - { - return; - } + return; } } } @@ -130,16 +124,13 @@ protected AbstractAssigner( Clock clock, long time, TimeUnit unit ) super( clock, time, unit ); } - protected void registerProcessorCount( StageExecution[] executions ) + protected void registerProcessorCount( StageExecution execution ) { - for ( StageExecution execution : executions ) + Map byStage = new HashMap<>(); + processors.put( execution.getStageName(), byStage ); + for ( Step step : execution.steps() ) { - Map byStage = new HashMap<>(); - processors.put( execution.getStageName(), byStage ); - for ( Step step : execution.steps() ) - { - byStage.put( step.name(), step.processors( 0 ) ); - } + byStage.put( step.name(), step.processors( 0 ) ); } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitorTest.java deleted file mode 100644 index 3b8bfc15c79e2..0000000000000 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitorTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport.staging; - -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import org.neo4j.unsafe.impl.batchimport.stats.Keys; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -public class SpectrumExecutionMonitorTest -{ - @Test - public void shouldAlternateStagesWithMultiple() throws Exception - { - // GIVEN - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - PrintStream out = new PrintStream( bytes ); - SpectrumExecutionMonitor monitor = new SpectrumExecutionMonitor( 0, MILLISECONDS, new PrintStream( out ), 100 ); - StageExecution[] stages = new StageExecution[] { - stage( "A_1", "A_2" ), - stage( "B_1", "B_2" ), - }; - - { - // WHEN - monitor.check( stages ); - String[] lines = linesOf( out, bytes ); - - // THEN - assertEquals( 1, lines.length ); - assertTrue( lines[0], lines[0].contains( "A_1" ) ); - assertFalse( lines[0], lines[0].contains( "B_1" ) ); - } - - { - // and WHEN - monitor.check( stages ); - String[] lines = linesOf( out, bytes ); - - // THEN - assertEquals( 2, lines.length ); - assertTrue( lines[1], lines[1].contains( "B_1" ) ); - assertFalse( lines[1], lines[1].contains( "A_1" ) ); - } - - { - // and WHEN - monitor.check( stages ); - String[] lines = linesOf( out, bytes ); - - // THEN - assertEquals( 3, lines.length ); - assertTrue( lines[2], lines[0].contains( "A_1" ) ); - assertFalse( lines[2], lines[0].contains( "B_1" ) ); - } - } - - @Test - public void shouldOnlyAlternativeBetweenActiveStages() throws Exception - { - // GIVEN - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - PrintStream out = new PrintStream( bytes ); - SpectrumExecutionMonitor monitor = new SpectrumExecutionMonitor( 0, MILLISECONDS, new PrintStream( out ), 100 ); - StageExecution stageB; - StageExecution[] stages = new StageExecution[] { - stage( "A_1", "A_2" ), - stageB = stage( "B_1", "B_2" ), - stage( "C_1", "C_2" ) - }; - - { - // WHEN - monitor.check( stages ); - String[] lines = linesOf( out, bytes ); - - // THEN - assertEquals( 1, lines.length ); - assertTrue( lines[0], lines[0].contains( "A_1" ) ); - assertFalse( lines[0], lines[0].contains( "B_1" ) ); - } - - complete( stageB ); - - { - // WHEN - monitor.check( stages ); - String[] lines = linesOf( out, bytes ); - - // THEN - assertEquals( 2, lines.length ); - assertTrue( lines[1], lines[1].contains( "C_1" ) ); - assertFalse( lines[1], lines[1].contains( "A_1" ) ); - assertFalse( lines[1], lines[1].contains( "B_1" ) ); - } - - { - // WHEN - monitor.check( stages ); - String[] lines = linesOf( out, bytes ); - - // THEN - assertEquals( 3, lines.length ); - assertTrue( lines[2], lines[2].contains( "A_1" ) ); - assertFalse( lines[2], lines[2].contains( "B_1" ) ); - assertFalse( lines[2], lines[2].contains( "C_1" ) ); - } - } - - private void complete( StageExecution stage ) - { - for ( Step step : stage.steps() ) - { - ((ControlledStep) step).complete(); - } - } - - private String[] linesOf( PrintStream out, ByteArrayOutputStream bytes ) - { - out.flush(); - String string = bytes.toString(); - String[] allLines = string.split( "\r" ); - return Arrays.copyOfRange( allLines, 1, allLines.length ); - } - - private StageExecution stage( String... stepNames ) - { - Collection> pipeline = new ArrayList<>(); - long avg = 10; - for ( String name : stepNames ) - { - pipeline.add( ControlledStep.stepWithStats( name, 1, Keys.avg_processing_time, avg, Keys.done_batches, 1L ) ); - avg += 10; - } - return new StageExecution( "Test", Configuration.DEFAULT, pipeline, 0 ); - } -}