From 99a922c5047cc28185b72bb27bb5812289d15a1d Mon Sep 17 00:00:00 2001 From: Anton Persson Date: Tue, 16 May 2017 16:46:59 +0200 Subject: [PATCH] RecoveryCleanupWorkCollector lifecycle RecoveryCleanupWorkCollector now extends Lifecycle. Important details: - Register collector to life before any indexes are registered because init will clear eventual state (in case of restart) and registered cleanup job would be lost. - Responsibility of scheduling cleanup jobs is transferred from outside trigger (previously created as part of buildRecovery in NeoStoreDataSource) to collector itself. --- .../neo4j/kernel/impl/util/JobScheduler.java | 0 .../kernel/impl/util/JobSchedulerAdapter.java | 93 +++++++++ .../GroupingRecoveryCleanupWorkCollector.java | 43 ++-- .../gbptree/RecoveryCleanupWorkCollector.java | 25 +-- .../index/internal/gbptree/GBPTreeTest.java | 60 ++++-- ...upingRecoveryCleanupWorkCollectorTest.java | 188 ++++++++++++++++++ .../internal/gbptree/ThrowingRunnable.java | 4 +- .../org/neo4j/kernel/NeoStoreDataSource.java | 25 +-- .../kernel/impl/factory/PlatformModule.java | 2 +- .../test/java/recovery/RecoveryCleanupIT.java | 5 +- 10 files changed, 377 insertions(+), 68 deletions(-) rename community/{kernel => common}/src/main/java/org/neo4j/kernel/impl/util/JobScheduler.java (100%) create mode 100644 community/common/src/main/java/org/neo4j/kernel/impl/util/JobSchedulerAdapter.java create mode 100644 community/index/src/test/java/org/neo4j/index/internal/gbptree/GroupingRecoveryCleanupWorkCollectorTest.java diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/JobScheduler.java b/community/common/src/main/java/org/neo4j/kernel/impl/util/JobScheduler.java similarity index 100% rename from community/kernel/src/main/java/org/neo4j/kernel/impl/util/JobScheduler.java rename to community/common/src/main/java/org/neo4j/kernel/impl/util/JobScheduler.java diff --git a/community/common/src/main/java/org/neo4j/kernel/impl/util/JobSchedulerAdapter.java b/community/common/src/main/java/org/neo4j/kernel/impl/util/JobSchedulerAdapter.java new file mode 100644 index 0000000000000..18c638838faa3 --- /dev/null +++ b/community/common/src/main/java/org/neo4j/kernel/impl/util/JobSchedulerAdapter.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.util; + +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +public class JobSchedulerAdapter implements JobScheduler +{ + @Override + public void init() throws Throwable + { // no-op + } + + @Override + public void start() throws Throwable + { // no-op + } + + @Override + public void stop() throws Throwable + { // no-op + } + + @Override + public void shutdown() throws Throwable + { // no-op + } + + @Override + public Executor executor( Group group ) + { + return null; + } + + @Override + public ThreadFactory threadFactory( Group group ) + { + return null; + } + + @Override + public JobHandle schedule( Group group, Runnable job ) + { + return null; + } + + @Override + public JobHandle schedule( Group group, Runnable job, Map metadata ) + { + return null; + } + + @Override + public JobHandle schedule( Group group, Runnable runnable, long initialDelay, + TimeUnit timeUnit ) + { + return null; + } + + @Override + public JobHandle scheduleRecurring( Group group, Runnable runnable, long period, + TimeUnit timeUnit ) + { + return null; + } + + @Override + public JobHandle scheduleRecurring( Group group, Runnable runnable, long initialDelay, + long period, TimeUnit timeUnit ) + { + return null; + } +} diff --git a/community/index/src/main/java/org/neo4j/index/internal/gbptree/GroupingRecoveryCleanupWorkCollector.java b/community/index/src/main/java/org/neo4j/index/internal/gbptree/GroupingRecoveryCleanupWorkCollector.java index fdbd3c652649d..9f795e2eebf5c 100644 --- a/community/index/src/main/java/org/neo4j/index/internal/gbptree/GroupingRecoveryCleanupWorkCollector.java +++ b/community/index/src/main/java/org/neo4j/index/internal/gbptree/GroupingRecoveryCleanupWorkCollector.java @@ -19,8 +19,12 @@ */ package org.neo4j.index.internal.gbptree; -import java.util.LinkedList; import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.neo4j.kernel.impl.util.JobScheduler; + +import static org.neo4j.kernel.impl.util.JobScheduler.Groups.recoveryCleanup; /** * Collects recovery cleanup work to be performed and {@link #run() runs} them all as one job, @@ -30,29 +34,44 @@ */ public class GroupingRecoveryCleanupWorkCollector implements RecoveryCleanupWorkCollector { - private final Queue jobs = new LinkedList<>(); + private final Queue jobs; + private final JobScheduler jobScheduler; - /** - * Separate add phase from run phase - */ - private volatile boolean started; + public GroupingRecoveryCleanupWorkCollector( JobScheduler jobScheduler ) + { + this.jobScheduler = jobScheduler; + this.jobs = new LinkedBlockingQueue<>(); + } + + @Override + public void init() throws Throwable + { + jobs.clear(); + } @Override - public synchronized void add( CleanupJob job ) + public void add( CleanupJob job ) { - assert !started : "Tried to add cleanup job after started"; jobs.add( job ); } @Override - public synchronized void run() + public void start() throws Throwable { - assert !started : "Tried to start cleanup job more than once"; - started = true; CleanupJob job; while ( (job = jobs.poll()) != null ) { - job.run(); + jobScheduler.schedule( recoveryCleanup, job ); } } + + @Override + public void stop() throws Throwable + { // no-op + } + + @Override + public void shutdown() throws Throwable + { // no-op + } } diff --git a/community/index/src/main/java/org/neo4j/index/internal/gbptree/RecoveryCleanupWorkCollector.java b/community/index/src/main/java/org/neo4j/index/internal/gbptree/RecoveryCleanupWorkCollector.java index fbaa5ea37c036..771d459e3f011 100644 --- a/community/index/src/main/java/org/neo4j/index/internal/gbptree/RecoveryCleanupWorkCollector.java +++ b/community/index/src/main/java/org/neo4j/index/internal/gbptree/RecoveryCleanupWorkCollector.java @@ -19,18 +19,18 @@ */ package org.neo4j.index.internal.gbptree; +import org.neo4j.kernel.lifecycle.Lifecycle; +import org.neo4j.kernel.lifecycle.LifecycleAdapter; + /** * Place to add recovery cleanup work to be done as part of recovery of {@link GBPTree}. *

- * Lifecycle has two phases: Add phase and run phase. - *

- * During add phase, jobs are added, potentially from different - * threads. From system perspective this happens during startup of database as part of {@code life.init()} in - * {@code NeoStoreDataSource} when indexes are started. + * {@link Lifecycle#init()} must prepare implementing class to be reused, probably by cleaning current state. After + * this, implementing class must be ready to receive new jobs through {@link #add(CleanupJob)}. *

- * Run phase is triggered as part of {@code life.start()} in {@code NeoStoreDataSource}. + * Jobs may be processed during {@link #add(CleanupJob) add} or {@link Lifecycle#start() start}. */ -public interface RecoveryCleanupWorkCollector extends Runnable +public interface RecoveryCleanupWorkCollector extends Lifecycle { /** * Adds {@link CleanupJob} to this collector. @@ -43,17 +43,14 @@ public interface RecoveryCleanupWorkCollector extends Runnable * {@link CleanupJob#run() Runs} {@link #add(CleanupJob) added} cleanup jobs right away in the thread * calling {@link #add(CleanupJob)}. */ - RecoveryCleanupWorkCollector IMMEDIATE = new RecoveryCleanupWorkCollector() + RecoveryCleanupWorkCollector IMMEDIATE = new ImmediateRecoveryCleanupWorkCollector(); + + class ImmediateRecoveryCleanupWorkCollector extends LifecycleAdapter implements RecoveryCleanupWorkCollector { @Override public void add( CleanupJob job ) { job.run(); } - - @Override - public void run() - { // no-op - } - }; + } } diff --git a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java index c9a540998f1db..76341bf414776 100644 --- a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java +++ b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java @@ -32,7 +32,9 @@ import java.nio.file.OpenOption; import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -57,6 +59,7 @@ import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCursor; import org.neo4j.io.pagecache.PagedFile; +import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.test.Barrier; import org.neo4j.test.rule.PageCacheRule; import org.neo4j.test.rule.RandomRule; @@ -295,7 +298,7 @@ public void shouldFailWhenTryingToRemapWithPageSizeLargerThanCachePageSize() thr // Good } - try ( GBPTree ignored = index() + try ( GBPTree ignored = index() .withPageCachePageSize( pageCachePageSize / 2 ) .withIndexPageSize( pageCachePageSize ) .build() ) @@ -322,10 +325,10 @@ public void shouldRemapFileIfMappedWithPageSizeLargerThanCreationSize() throws E try ( GBPTree index = index() .withPageCachePageSize( pageSize ) .withIndexPageSize( pageSize / 2 ) - .build() ) + .build() ) { // Insert some data - try ( Writer writer = index.writer() ) + try ( Writer writer = index.writer() ) { MutableLong key = new MutableLong(); MutableLong value = new MutableLong(); @@ -345,7 +348,7 @@ public void shouldRemapFileIfMappedWithPageSizeLargerThanCreationSize() throws E { MutableLong fromInclusive = new MutableLong( 0L ); MutableLong toExclusive = new MutableLong( 200L ); - try ( RawCursor, IOException> seek = index.seek( fromInclusive, toExclusive ) ) + try ( RawCursor,IOException> seek = index.seek( fromInclusive, toExclusive ) ) { int i = 0; while ( seek.next() ) @@ -456,11 +459,11 @@ public void failureDuringInitializeWriterShouldNotFailNextInitialize() throws Ex IOException no = new IOException( "No" ); AtomicBoolean throwOnNextIO = new AtomicBoolean(); PageCache controlledPageCache = pageCacheThatThrowExceptionWhenToldTo( no, throwOnNextIO ); - try ( GBPTree index = index().with( controlledPageCache ).build() ) + try ( GBPTree index = index().with( controlledPageCache ).build() ) { // WHEN assert throwOnNextIO.compareAndSet( false, true ); - try ( Writer ignored = index.writer() ) + try ( Writer ignored = index.writer() ) { fail( "Expected to throw" ); } @@ -470,7 +473,7 @@ public void failureDuringInitializeWriterShouldNotFailNextInitialize() throws Ex } // THEN - try ( Writer writer = index.writer() ) + try ( Writer writer = index.writer() ) { writer.put( new MutableLong( 1 ), new MutableLong( 1 ) ); } @@ -557,7 +560,8 @@ public void shouldReplaceHeaderDataInNextCheckPoint() throws Exception verifyHeaderDataAfterClose( beforeClose ); } - private void verifyHeaderDataAfterClose( BiConsumer,byte[]> beforeClose ) throws IOException + private void verifyHeaderDataAfterClose( BiConsumer,byte[]> beforeClose ) + throws IOException { byte[] expectedHeader = new byte[12]; ThreadLocalRandom.current().nextBytes( expectedHeader ); @@ -744,13 +748,13 @@ public void cleanJobShouldLockOutCheckpoint() throws Exception index.writer().close(); } - RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector(); + RecoveryCleanupWorkCollector cleanupWork = new ControlledRecoveryCleanupWorkCollector(); CleanJobControlledMonitor monitor = new CleanJobControlledMonitor(); try ( GBPTree index = index().with( monitor ).with( cleanupWork ).build() ) { // WHEN // Cleanup not finished - Future cleanup = executor.submit( throwing( cleanupWork::run ) ); + Future cleanup = executor.submit( throwing( cleanupWork::start ) ); monitor.barrier.awaitUninterruptibly(); index.writer().close(); @@ -774,13 +778,13 @@ public void cleanJobShouldLockOutClose() throws Exception index.writer().close(); } - RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector(); + RecoveryCleanupWorkCollector cleanupWork = new ControlledRecoveryCleanupWorkCollector(); CleanJobControlledMonitor monitor = new CleanJobControlledMonitor(); GBPTree index = index().with( monitor ).with( cleanupWork ).build(); // WHEN // Cleanup not finished - Future cleanup = executor.submit( throwing( cleanupWork::run ) ); + Future cleanup = executor.submit( throwing( cleanupWork::start ) ); monitor.barrier.awaitUninterruptibly(); // THEN @@ -802,13 +806,13 @@ public void cleanJobShouldNotLockOutWriter() throws Exception index.writer().close(); } - RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector(); + RecoveryCleanupWorkCollector cleanupWork = new ControlledRecoveryCleanupWorkCollector(); CleanJobControlledMonitor monitor = new CleanJobControlledMonitor(); try ( GBPTree index = index().with( monitor ).with( cleanupWork ).build() ) { // WHEN // Cleanup not finished - Future cleanup = executor.submit( throwing( cleanupWork::run ) ); + Future cleanup = executor.submit( throwing( cleanupWork::start ) ); monitor.barrier.awaitUninterruptibly(); // THEN @@ -830,14 +834,14 @@ public void writerShouldNotLockOutCleanJob() throws Exception index.writer().close(); } - RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector(); + RecoveryCleanupWorkCollector cleanupWork = new ControlledRecoveryCleanupWorkCollector(); try ( GBPTree index = index().with( cleanupWork ).build() ) { // WHEN try ( Writer writer = index.writer() ) { // THEN - Future cleanup = executor.submit( throwing( cleanupWork::run ) ); + Future cleanup = executor.submit( throwing( cleanupWork::start ) ); // Move writer to let cleaner pass writer.put( new MutableLong( 1 ), new MutableLong( 1 ) ); cleanup.get(); @@ -1268,13 +1272,35 @@ public void mustRetryCloseIfFailure() throws Exception AtomicBoolean throwOnNext = new AtomicBoolean(); IOException exception = new IOException( "My failure" ); PageCache pageCache = pageCacheThatThrowExceptionWhenToldTo( exception, throwOnNext ); - try ( GBPTree index = index().with( pageCache ).build() ) + try ( GBPTree ignored = index().with( pageCache ).build() ) { // WHEN throwOnNext.set( true ); } } + private class ControlledRecoveryCleanupWorkCollector extends LifecycleAdapter + implements RecoveryCleanupWorkCollector + { + Queue jobs = new LinkedList<>(); + + @Override + public void start() throws Throwable + { + CleanupJob job; + while ( (job = jobs.poll()) != null ) + { + job.run(); + } + } + + @Override + public void add( CleanupJob job ) + { + jobs.add( job ); + } + } + private PageCache pageCacheThatThrowExceptionWhenToldTo( final IOException e, final AtomicBoolean throwOnNextIO ) { return new DelegatingPageCache( createPageCache( 256 ) ) diff --git a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GroupingRecoveryCleanupWorkCollectorTest.java b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GroupingRecoveryCleanupWorkCollectorTest.java new file mode 100644 index 0000000000000..c66f225c1b00c --- /dev/null +++ b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GroupingRecoveryCleanupWorkCollectorTest.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.index.internal.gbptree; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import org.neo4j.kernel.impl.util.JobSchedulerAdapter; + +import static org.junit.Assert.assertTrue; + +public class GroupingRecoveryCleanupWorkCollectorTest +{ + private final RegisteringJobScheduler jobScheduler = new RegisteringJobScheduler(); + private final GroupingRecoveryCleanupWorkCollector collector = + new GroupingRecoveryCleanupWorkCollector( jobScheduler ); + + @Test + public void mustNotScheduleAnyJobsBeforeStart() throws Throwable + { + // given + List expectedJobs = someJobs(); + + // when + collector.init(); + addAll( expectedJobs ); + + // then + assertTrue( jobScheduler.registeredJobs.isEmpty() ); + } + + @Test + public void mustScheduleAllJobs() throws Throwable + { + // given + List expectedJobs = someJobs(); + + // when + collector.init(); + addAll( expectedJobs ); + collector.start(); + + // then + assertSame( expectedJobs, jobScheduler.registeredJobs ); + } + + @Test + public void mustNotScheduleOldJobsAfterRestart() throws Throwable + { + // given + List someJobs = someJobs(); + + // when + collector.init(); + addAll( someJobs ); + collector.init(); + collector.start(); + + // then + assertTrue( jobScheduler.registeredJobs.isEmpty() ); + } + + @Test + public void mustNotScheduleOldJobsOnMultipleStart() throws Throwable + { + // given + List expectedJobs = someJobs(); + + // when + collector.init(); + addAll( expectedJobs ); + collector.start(); + collector.start(); + + // then + assertSame( expectedJobs, jobScheduler.registeredJobs ); + } + + @Test + public void mustNotScheduleOldJobsOnStartStopStart() throws Throwable + { + // given + List expectedJobs = someJobs(); + + // when + collector.init(); + addAll( expectedJobs ); + collector.start(); + collector.stop(); + collector.start(); + + // then + assertSame( expectedJobs, jobScheduler.registeredJobs ); + } + + private void addAll( Collection jobs ) + { + jobs.forEach( collector::add ); + } + + private void assertSame( List someJobs, List actual ) + { + assertTrue( actual.containsAll( someJobs ) ); + assertTrue( someJobs.containsAll( actual ) ); + } + + private List someJobs() + { + return new ArrayList<>( Arrays.asList( + new DummyJob( "A" ), + new DummyJob( "B" ), + new DummyJob( "C" ) + ) ); + } + + private class RegisteringJobScheduler extends JobSchedulerAdapter + { + List registeredJobs = new LinkedList<>(); + + @Override + public JobHandle schedule( Group group, Runnable job ) + { + registeredJobs.add( job ); + return super.schedule( group, job ); + } + } + + private class DummyJob implements CleanupJob + { + private final String name; + + DummyJob( String name ) + { + this.name = name; + } + + @Override + public String toString() + { + return name; + } + + @Override + public boolean needed() + { + return false; + } + + @Override + public boolean hasFailed() + { + return false; + } + + @Override + public Exception getCause() + { + return null; + } + + @Override + public void run() + { // no-op + } + } +} \ No newline at end of file diff --git a/community/index/src/test/java/org/neo4j/index/internal/gbptree/ThrowingRunnable.java b/community/index/src/test/java/org/neo4j/index/internal/gbptree/ThrowingRunnable.java index d9269d253d73f..886a7b7758f04 100644 --- a/community/index/src/test/java/org/neo4j/index/internal/gbptree/ThrowingRunnable.java +++ b/community/index/src/test/java/org/neo4j/index/internal/gbptree/ThrowingRunnable.java @@ -21,7 +21,7 @@ interface ThrowingRunnable { - void run() throws Exception; + void run() throws Throwable; static Runnable throwing( ThrowingRunnable callable ) { @@ -31,7 +31,7 @@ static Runnable throwing( ThrowingRunnable callable ) { callable.run(); } - catch ( Exception e ) + catch ( Throwable e ) { throw new RuntimeException( e ); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java index 37561adb7a4a8..8f421e71bac21 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java @@ -412,6 +412,8 @@ public void start() throws IOException dependencies = new Dependencies(); life = new LifeSupport(); + life.add( recoveryCleanupWorkCollector ); + schemaIndexProvider = dependencyResolver.resolveDependency( SchemaIndexProvider.class, HighestSelectionStrategy.getInstance() ); @@ -460,8 +462,8 @@ public void start() throws IOException monitors.newMonitor( Recovery.Monitor.class ), monitors.newMonitor( PositionToRecoverFrom.Monitor.class ), transactionLogModule.logFiles(), startupStatistics, - storageEngine, logEntryReader, transactionLogModule.logicalTransactionStore(), - recoveryCleanupWorkCollector, scheduler ); + storageEngine, logEntryReader, transactionLogModule.logicalTransactionStore() + ); // At the time of writing this comes from the storage engine (IndexStoreView) PropertyAccessor propertyAccessor = dependencies.resolveDependency( PropertyAccessor.class ); @@ -692,9 +694,7 @@ private void buildRecovery( final StartupStatisticsProvider startupStatistics, StorageEngine storageEngine, LogEntryReader logEntryReader, - LogicalTransactionStore logicalTransactionStore, - RecoveryCleanupWorkCollector recoveryCleanupWorkCollector, - JobScheduler scheduler ) + LogicalTransactionStore logicalTransactionStore ) { final LatestCheckPointFinder checkPointFinder = new LatestCheckPointFinder( logFiles, fileSystemAbstraction, logEntryReader ); @@ -711,21 +711,6 @@ public void recoveryCompleted( int numberOfRecoveredTransactions ) } } ); life.add( recovery ); - - life.add( cleanupAfterRecovery( scheduler, recoveryCleanupWorkCollector ) ); - } - - private LifecycleAdapter cleanupAfterRecovery( JobScheduler jobScheduler, - final RecoveryCleanupWorkCollector recoveryCleanupWorkCollector ) - { - return new LifecycleAdapter() - { - @Override - public void start() throws Throwable - { - jobScheduler.schedule( JobScheduler.Groups.recoveryCleanup, recoveryCleanupWorkCollector ); - } - }; } private NeoStoreKernelModule buildKernel( TransactionAppender appender, diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/PlatformModule.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/PlatformModule.java index 56997bab92c0a..c587165730ff3 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/PlatformModule.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/PlatformModule.java @@ -153,7 +153,7 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa jobScheduler = life.add( dependencies.satisfyDependency( createJobScheduler() ) ); // Cleanup after recovery, used by GBPTree, added to life in CommunityEditionModule - recoveryCleanupWorkCollector = new GroupingRecoveryCleanupWorkCollector(); + recoveryCleanupWorkCollector = new GroupingRecoveryCleanupWorkCollector( jobScheduler ); dependencies.satisfyDependency( recoveryCleanupWorkCollector ); // Database system information, used by UDC diff --git a/community/neo4j/src/test/java/recovery/RecoveryCleanupIT.java b/community/neo4j/src/test/java/recovery/RecoveryCleanupIT.java index 365904a3caeca..30db19c411b63 100644 --- a/community/neo4j/src/test/java/recovery/RecoveryCleanupIT.java +++ b/community/neo4j/src/test/java/recovery/RecoveryCleanupIT.java @@ -176,7 +176,8 @@ private DatabaseHealth databaseHealth( GraphDatabaseService db ) private CheckPointer checkPointer( GraphDatabaseService db ) { DependencyResolver dependencyResolver = dependencyResolver( db ); - return dependencyResolver.resolveDependency( NeoStoreDataSource.class ).getDependencyResolver().resolveDependency( CheckPointer.class ); + return dependencyResolver.resolveDependency( NeoStoreDataSource.class ).getDependencyResolver() + .resolveDependency( CheckPointer.class ); } private DependencyResolver dependencyResolver( GraphDatabaseService db ) @@ -184,7 +185,7 @@ private DependencyResolver dependencyResolver( GraphDatabaseService db ) return ((GraphDatabaseAPI) db).getDependencyResolver(); } - private class RecoveryBarrierMonitor implements LabelScanStore.Monitor + private class RecoveryBarrierMonitor extends LabelScanStore.Monitor.Adaptor { private final Barrier.Control barrier;