diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ConsistencyCheckTasks.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ConsistencyCheckTasks.java index 4c46308583c22..1674fc5b54e41 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ConsistencyCheckTasks.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ConsistencyCheckTasks.java @@ -57,7 +57,7 @@ public class ConsistencyCheckTasks { - private final ProgressMonitorFactory.MultiPartBuilder progress; + private final ProgressMonitorFactory.MultiPartBuilder multiPartBuilder; private final StoreProcessor defaultProcessor; private final StoreAccess nativeStores; private final Statistics statistics; @@ -68,12 +68,12 @@ public class ConsistencyCheckTasks private final CacheAccess cacheAccess; private final int numberOfThreads; - ConsistencyCheckTasks( ProgressMonitorFactory.MultiPartBuilder progress, + ConsistencyCheckTasks( ProgressMonitorFactory.MultiPartBuilder multiPartBuilder, StoreProcessor defaultProcessor, StoreAccess nativeStores, Statistics statistics, CacheAccess cacheAccess, LabelScanStore labelScanStore, IndexAccessors indexes, MultiPassStore.Factory multiPass, ConsistencyReporter reporter, int numberOfThreads ) { - this.progress = progress; + this.multiPartBuilder = multiPartBuilder; this.defaultProcessor = defaultProcessor; this.nativeStores = nativeStores; this.statistics = statistics; @@ -155,11 +155,11 @@ public List createTasksForFullCheck( boolean checkLabelS new SchemaRecordCheck( new SchemaStorage( nativeStores.getSchemaStore() ), indexes ); tasks.add( new SchemaStoreProcessorTask<>( "SchemaStoreProcessor-check_rules", statistics, numberOfThreads, nativeStores.getSchemaStore(), nativeStores, "check_rules", - schemaCheck, progress, cacheAccess, defaultProcessor, ROUND_ROBIN ) ); + schemaCheck, multiPartBuilder, cacheAccess, defaultProcessor, ROUND_ROBIN ) ); // PASS 3: Obligation verification and semantic rule uniqueness tasks.add( new SchemaStoreProcessorTask<>( "SchemaStoreProcessor-check_obligations", statistics, numberOfThreads, nativeStores.getSchemaStore(), nativeStores, - "check_obligations", schemaCheck.forObligationChecking(), progress, cacheAccess, defaultProcessor, + "check_obligations", schemaCheck.forObligationChecking(), multiPartBuilder, cacheAccess, defaultProcessor, ROUND_ROBIN ) ); if ( checkGraph ) { @@ -200,23 +200,23 @@ private RecordScanner recordScanner( String name, @SuppressWarnings( "rawtypes" ) IterableStore... warmupStores ) { return stage.isParallel() - ? new ParallelRecordScanner<>( name, statistics, numberOfThreads, store, progress, processor, + ? new ParallelRecordScanner<>( name, statistics, numberOfThreads, store, multiPartBuilder, processor, cacheAccess, distribution, warmupStores ) - : new SequentialRecordScanner<>( name, statistics, numberOfThreads, store, progress, processor, + : new SequentialRecordScanner<>( name, statistics, numberOfThreads, store, multiPartBuilder, processor, warmupStores ); } private StoreProcessorTask create( String name, RecordStore input, QueueDistribution distribution ) { - return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, progress, + return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, multiPartBuilder, cacheAccess, defaultProcessor, distribution ); } private StoreProcessorTask create( String name, RecordStore input, StoreProcessor processor, QueueDistribution distribution ) { - return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, progress, + return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, multiPartBuilder, cacheAccess, processor, distribution ); } } diff --git a/community/kernel/src/main/java/org/neo4j/helpers/progress/Aggregator.java b/community/kernel/src/main/java/org/neo4j/helpers/progress/Aggregator.java index c3a39b692f674..e5350decf754c 100644 --- a/community/kernel/src/main/java/org/neo4j/helpers/progress/Aggregator.java +++ b/community/kernel/src/main/java/org/neo4j/helpers/progress/Aggregator.java @@ -19,8 +19,8 @@ */ package org.neo4j.helpers.progress; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -28,8 +28,7 @@ final class Aggregator { - private final Map states = - new ConcurrentHashMap<>(); + private final Map states = new HashMap<>(); private final Indicator indicator; @SuppressWarnings( "unused"/*accessed through updater*/ ) private volatile long progress; @@ -39,7 +38,6 @@ final class Aggregator private static final AtomicIntegerFieldUpdater LAST = AtomicIntegerFieldUpdater.newUpdater( Aggregator.class, "last" ); private long totalCount; - private final Completion completion = new Completion(); Aggregator( Indicator indicator ) { @@ -52,7 +50,7 @@ synchronized void add( ProgressListener progress, long totalCount ) this.totalCount += totalCount; } - synchronized Completion initialize() + synchronized void initialize() { indicator.startProcess( totalCount ); if ( states.isEmpty() ) @@ -60,7 +58,6 @@ synchronized Completion initialize() indicator.progress( 0, indicator.reportResolution() ); indicator.completeProcess(); } - return completion; } void update( long delta ) @@ -79,16 +76,15 @@ void update( long delta ) } } - void start( ProgressListener.MultiPartProgressListener part ) + synchronized void start( ProgressListener.MultiPartProgressListener part ) { - if ( states.put( part, ProgressListener.MultiPartProgressListener.State.LIVE ) == ProgressListener - .MultiPartProgressListener.State.INIT ) + if ( states.put( part, ProgressListener.MultiPartProgressListener.State.LIVE ) == ProgressListener.MultiPartProgressListener.State.INIT ) { indicator.startPart( part.part, part.totalCount ); } } - void complete( ProgressListener.MultiPartProgressListener part ) + synchronized void complete( ProgressListener.MultiPartProgressListener part ) { if ( states.remove( part ) != null ) { @@ -96,13 +92,12 @@ void complete( ProgressListener.MultiPartProgressListener part ) if ( states.isEmpty() ) { indicator.completeProcess(); - completion.complete(); } } } - public void signalFailure( String part, Throwable e ) + synchronized void signalFailure( Throwable e ) { - completion.signalFailure( part, e ); + indicator.failure( e ); } } diff --git a/community/kernel/src/main/java/org/neo4j/helpers/progress/Completion.java b/community/kernel/src/main/java/org/neo4j/helpers/progress/Completion.java deleted file mode 100644 index a6cf2bd361b0a..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/helpers/progress/Completion.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.helpers.progress; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.neo4j.helpers.ProcessFailureException; - -public final class Completion -{ - private volatile Collection callbacks = new ArrayList<>(); - private final List processFailureCauses = new ArrayList<>(); - - @SuppressWarnings( "SynchronizationOnLocalVariableOrMethodParameter" ) - void complete() - { - Collection callbacks = this.callbacks; - if ( callbacks != null ) - { - Runnable[] targets; - synchronized ( callbacks ) - { - targets = callbacks.toArray( new Runnable[callbacks.size()] ); - this.callbacks = null; - } - for ( Runnable target : targets ) - { - try - { - target.run(); - } - catch ( Exception e ) - { - e.printStackTrace(); - } - } - } - } - - void signalFailure( String part, Throwable e ) - { - processFailureCauses.add( new ProcessFailureException.Entry( part, e ) ); - complete(); - } - - @SuppressWarnings( "SynchronizationOnLocalVariableOrMethodParameter" ) - void notify( Runnable callback ) - { - if ( callback == null ) - { - throw new IllegalArgumentException( "callback may not be null" ); - } - Collection callbacks = this.callbacks; - if ( callbacks != null ) - { - synchronized ( callbacks ) - { - if ( this.callbacks == callbacks ) - { // double checked locking - callbacks.add( callback ); - callback = null; // we have not reached completion - } - } - } - if ( callback != null ) - { // we have already reached completion - callback.run(); - } - } - - @SuppressWarnings( "SynchronizationOnLocalVariableOrMethodParameter" ) - public void await( long timeout, TimeUnit unit ) - throws InterruptedException, TimeoutException, ProcessFailureException - { - CountDownLatch latch = null; - Collection callbacks = this.callbacks; - if ( callbacks != null ) - { - synchronized ( callbacks ) - { - if ( this.callbacks == callbacks ) - { // double checked locking - callbacks.add( new CountDown( latch = new CountDownLatch( 1 ) ) ); - } - } - } - if ( latch != null ) - { // await completion - if ( !latch.await( timeout, unit ) ) - { - throw new TimeoutException( - String.format( "Process did not complete within %d %s.", timeout, unit.name() ) ); - } - } - if ( !processFailureCauses.isEmpty() ) - { - throw new ProcessFailureException( processFailureCauses ); - } - } - - private static final class CountDown implements Runnable - { - private final CountDownLatch latch; - - CountDown( CountDownLatch latch ) - { - this.latch = latch; - } - - @Override - public void run() - { - latch.countDown(); - } - } -} diff --git a/community/kernel/src/main/java/org/neo4j/helpers/progress/ProgressListener.java b/community/kernel/src/main/java/org/neo4j/helpers/progress/ProgressListener.java index 3507aecdf5d96..7e6e9e4810789 100644 --- a/community/kernel/src/main/java/org/neo4j/helpers/progress/ProgressListener.java +++ b/community/kernel/src/main/java/org/neo4j/helpers/progress/ProgressListener.java @@ -184,7 +184,7 @@ public void done() @Override public void failed( Throwable e ) { - aggregator.signalFailure( part, e ); + aggregator.signalFailure( e ); } private void update( long progress ) diff --git a/community/kernel/src/main/java/org/neo4j/helpers/progress/ProgressMonitorFactory.java b/community/kernel/src/main/java/org/neo4j/helpers/progress/ProgressMonitorFactory.java index a042fb6234747..91cc8bd70c756 100644 --- a/community/kernel/src/main/java/org/neo4j/helpers/progress/ProgressMonitorFactory.java +++ b/community/kernel/src/main/java/org/neo4j/helpers/progress/ProgressMonitorFactory.java @@ -76,7 +76,6 @@ public static class MultiPartBuilder { private Aggregator aggregator; private Set parts = new HashSet<>(); - private Completion completion; private MultiPartBuilder( ProgressMonitorFactory factory, String process ) { @@ -118,15 +117,14 @@ private void assertNotBuilt() } } - public Completion build() + public void build() { if ( aggregator != null ) { - completion = aggregator.initialize(); + aggregator.initialize(); } aggregator = null; parts = null; - return completion; } } } diff --git a/community/kernel/src/test/java/org/neo4j/helpers/progress/ProgressMonitorTest.java b/community/kernel/src/test/java/org/neo4j/helpers/progress/ProgressMonitorTest.java index 4004b050b1823..925ebfe4aebb9 100644 --- a/community/kernel/src/test/java/org/neo4j/helpers/progress/ProgressMonitorTest.java +++ b/community/kernel/src/test/java/org/neo4j/helpers/progress/ProgressMonitorTest.java @@ -34,27 +34,18 @@ import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.neo4j.helpers.ProcessFailureException; import org.neo4j.test.rule.SuppressOutput; import static java.lang.System.lineSeparator; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class ProgressMonitorTest @@ -318,122 +309,6 @@ public void shouldPrintADotEveryHalfPercentAndFullPercentageEveryTenPercentEvenW writer.toString() ); } - @Test - public void shouldBeAbleToAwaitCompletionOfMultiPartProgress() throws Exception - { - // given - ProgressMonitorFactory.MultiPartBuilder builder = ProgressMonitorFactory.NONE.multipleParts( testName.getMethodName() ); - ProgressListener part1 = builder.progressForPart( "part1", 1 ); - ProgressListener part2 = builder.progressForPart( "part2", 1 ); - final Completion completion = builder.build(); - - // when - final CountDownLatch begin = new CountDownLatch( 1 ); - final CountDownLatch end = new CountDownLatch( 1 ); - new Thread() - { - @Override - public void run() - { - begin.countDown(); - try - { - completion.await( 10, SECONDS ); - } - catch ( Exception e ) - { - return; // do not count down the end latch - } - end.countDown(); - } - }.start(); - Runnable callback = mock( Runnable.class ); - completion.notify( callback ); - assertTrue( begin.await( 10, SECONDS ) ); - - // then - verifyZeroInteractions( callback ); - - // when - try - { - completion.await( 1, TimeUnit.MILLISECONDS ); - fail( "should have thrown exception" ); - } - // then - catch ( TimeoutException expected ) - { - assertEquals( "Process did not complete within 1 MILLISECONDS.", expected.getMessage() ); - } - - // when - part1.done(); - // then - verifyZeroInteractions( callback ); - - // when - part2.done(); - // then - verify( callback ).run(); - completion.await( 0, TimeUnit.NANOSECONDS ); // should not have to wait - assertTrue( end.await( 10, SECONDS ) ); // should have been completed - - // when - callback = mock( Runnable.class ); - completion.notify( callback ); - verify( callback ).run(); - } - - @Test - public void shouldReturnToCompletionWaiterWhenFirstJobFails() throws Exception - { - // given - ProgressMonitorFactory.MultiPartBuilder builder = ProgressMonitorFactory.NONE.multipleParts( testName.getMethodName() ); - ProgressListener part1 = builder.progressForPart( "part1", 1 ); - ProgressListener part2 = builder.progressForPart( "part2", 1 ); - final Completion completion = builder.build(); - - // when - part1.started(); - part2.started(); - part2.failed( new RuntimeException( "failure in one of the jobs" ) ); - - // neither job completes - expected.expect( ProcessFailureException.class ); - expected.expectMessage( "failure in one of the jobs" ); - completion.await( 1, TimeUnit.MILLISECONDS ); - } - - @Test - public void shouldNotAllowNullCompletionCallbacks() throws Exception - { - ProgressMonitorFactory.MultiPartBuilder builder = ProgressMonitorFactory.NONE.multipleParts( testName.getMethodName() ); - Completion completion = builder.build(); - - expected.expect( IllegalArgumentException.class ); - expected.expectMessage( "callback may not be null" ); - completion.notify( null ); - } - - @Test - public void shouldInvokeAllCallbacksEvenWhenOneThrowsException() throws Exception - { - // given - ProgressMonitorFactory.MultiPartBuilder builder = ProgressMonitorFactory.NONE.multipleParts( testName.getMethodName() ); - ProgressListener progressListener = builder.progressForPart( "only part", 1 ); - Completion completion = builder.build(); - Runnable callback = mock( Runnable.class ); - doThrow( new RuntimeException( "on purpose" ) ).doNothing().when( callback ).run(); - completion.notify( callback ); - completion.notify( callback ); - - // when - progressListener.done(); - - // then - verify( callback, times( 2 ) ).run(); - } - @Test public void shouldAllowStartingAPartBeforeCompletionOfMultiPartBuilder() throws Exception {