Skip to content

Commit

Permalink
Removed completion logic from aggregated progress monitor since no usage
Browse files Browse the repository at this point in the history
  • Loading branch information
klaren committed Nov 13, 2017
1 parent 6b5fc1a commit 5a2dd98
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 290 deletions.
Expand Up @@ -57,7 +57,7 @@

public class ConsistencyCheckTasks
{
private final ProgressMonitorFactory.MultiPartBuilder progress;
private final ProgressMonitorFactory.MultiPartBuilder multiPartBuilder;
private final StoreProcessor defaultProcessor;
private final StoreAccess nativeStores;
private final Statistics statistics;
Expand All @@ -68,12 +68,12 @@ public class ConsistencyCheckTasks
private final CacheAccess cacheAccess;
private final int numberOfThreads;

ConsistencyCheckTasks( ProgressMonitorFactory.MultiPartBuilder progress,
ConsistencyCheckTasks( ProgressMonitorFactory.MultiPartBuilder multiPartBuilder,
StoreProcessor defaultProcessor, StoreAccess nativeStores, Statistics statistics,
CacheAccess cacheAccess, LabelScanStore labelScanStore,
IndexAccessors indexes, MultiPassStore.Factory multiPass, ConsistencyReporter reporter, int numberOfThreads )
{
this.progress = progress;
this.multiPartBuilder = multiPartBuilder;
this.defaultProcessor = defaultProcessor;
this.nativeStores = nativeStores;
this.statistics = statistics;
Expand Down Expand Up @@ -155,11 +155,11 @@ public List<ConsistencyCheckerTask> createTasksForFullCheck( boolean checkLabelS
new SchemaRecordCheck( new SchemaStorage( nativeStores.getSchemaStore() ), indexes );
tasks.add( new SchemaStoreProcessorTask<>( "SchemaStoreProcessor-check_rules", statistics, numberOfThreads,
nativeStores.getSchemaStore(), nativeStores, "check_rules",
schemaCheck, progress, cacheAccess, defaultProcessor, ROUND_ROBIN ) );
schemaCheck, multiPartBuilder, cacheAccess, defaultProcessor, ROUND_ROBIN ) );
// PASS 3: Obligation verification and semantic rule uniqueness
tasks.add( new SchemaStoreProcessorTask<>( "SchemaStoreProcessor-check_obligations", statistics,
numberOfThreads, nativeStores.getSchemaStore(), nativeStores,
"check_obligations", schemaCheck.forObligationChecking(), progress, cacheAccess, defaultProcessor,
"check_obligations", schemaCheck.forObligationChecking(), multiPartBuilder, cacheAccess, defaultProcessor,
ROUND_ROBIN ) );
if ( checkGraph )
{
Expand Down Expand Up @@ -200,23 +200,23 @@ private <RECORD> RecordScanner<RECORD> recordScanner( String name,
@SuppressWarnings( "rawtypes" ) IterableStore... warmupStores )
{
return stage.isParallel()
? new ParallelRecordScanner<>( name, statistics, numberOfThreads, store, progress, processor,
? new ParallelRecordScanner<>( name, statistics, numberOfThreads, store, multiPartBuilder, processor,
cacheAccess, distribution, warmupStores )
: new SequentialRecordScanner<>( name, statistics, numberOfThreads, store, progress, processor,
: new SequentialRecordScanner<>( name, statistics, numberOfThreads, store, multiPartBuilder, processor,
warmupStores );
}

private <RECORD extends AbstractBaseRecord> StoreProcessorTask<RECORD> create( String name,
RecordStore<RECORD> input, QueueDistribution distribution )
{
return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, progress,
return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, multiPartBuilder,
cacheAccess, defaultProcessor, distribution );
}

private <RECORD extends AbstractBaseRecord> StoreProcessorTask<RECORD> create( String name,
RecordStore<RECORD> input, StoreProcessor processor, QueueDistribution distribution )
{
return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, progress,
return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, multiPartBuilder,
cacheAccess, processor, distribution );
}
}
Expand Up @@ -19,17 +19,16 @@
*/
package org.neo4j.helpers.progress;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;

final class Aggregator
{
private final Map<ProgressListener, ProgressListener.MultiPartProgressListener.State> states =
new ConcurrentHashMap<>();
private final Map<ProgressListener, ProgressListener.MultiPartProgressListener.State> states = new HashMap<>();
private final Indicator indicator;
@SuppressWarnings( "unused"/*accessed through updater*/ )
private volatile long progress;
Expand All @@ -39,7 +38,6 @@ final class Aggregator
private static final AtomicIntegerFieldUpdater<Aggregator> LAST =
AtomicIntegerFieldUpdater.newUpdater( Aggregator.class, "last" );
private long totalCount;
private final Completion completion = new Completion();

Aggregator( Indicator indicator )
{
Expand All @@ -52,15 +50,14 @@ synchronized void add( ProgressListener progress, long totalCount )
this.totalCount += totalCount;
}

synchronized Completion initialize()
synchronized void initialize()
{
indicator.startProcess( totalCount );
if ( states.isEmpty() )
{
indicator.progress( 0, indicator.reportResolution() );
indicator.completeProcess();
}
return completion;
}

void update( long delta )
Expand All @@ -79,30 +76,28 @@ void update( long delta )
}
}

void start( ProgressListener.MultiPartProgressListener part )
synchronized void start( ProgressListener.MultiPartProgressListener part )
{
if ( states.put( part, ProgressListener.MultiPartProgressListener.State.LIVE ) == ProgressListener
.MultiPartProgressListener.State.INIT )
if ( states.put( part, ProgressListener.MultiPartProgressListener.State.LIVE ) == ProgressListener.MultiPartProgressListener.State.INIT )
{
indicator.startPart( part.part, part.totalCount );
}
}

void complete( ProgressListener.MultiPartProgressListener part )
synchronized void complete( ProgressListener.MultiPartProgressListener part )
{
if ( states.remove( part ) != null )
{
indicator.completePart( part.part );
if ( states.isEmpty() )
{
indicator.completeProcess();
completion.complete();
}
}
}

public void signalFailure( String part, Throwable e )
synchronized void signalFailure( Throwable e )
{
completion.signalFailure( part, e );
indicator.failure( e );
}
}

This file was deleted.

Expand Up @@ -184,7 +184,7 @@ public void done()
@Override
public void failed( Throwable e )
{
aggregator.signalFailure( part, e );
aggregator.signalFailure( e );
}

private void update( long progress )
Expand Down
Expand Up @@ -76,7 +76,6 @@ public static class MultiPartBuilder
{
private Aggregator aggregator;
private Set<String> parts = new HashSet<>();
private Completion completion;

private MultiPartBuilder( ProgressMonitorFactory factory, String process )
{
Expand Down Expand Up @@ -118,15 +117,14 @@ private void assertNotBuilt()
}
}

public Completion build()
public void build()
{
if ( aggregator != null )
{
completion = aggregator.initialize();
aggregator.initialize();
}
aggregator = null;
parts = null;
return completion;
}
}
}

0 comments on commit 5a2dd98

Please sign in to comment.