Skip to content

Commit

Permalink
RecoveryCleanupWorkCollector lifecycle
Browse files Browse the repository at this point in the history
RecoveryCleanupWorkCollector now extends Lifecycle.

Important details:
 - Register the collector with the lifecycle before any indexes are registered,
   because init will clear any existing state (in case of restart) and
   already-registered cleanup jobs would otherwise be lost.
 - Responsibility for scheduling cleanup jobs is transferred from an
   outside trigger (previously created as part of buildRecovery in
   NeoStoreDataSource) to the collector itself.
  • Loading branch information
burqen committed May 18, 2017
1 parent ef6552b commit 99a922c
Show file tree
Hide file tree
Showing 10 changed files with 377 additions and 68 deletions.
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.util;

import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * Adapter providing no-op implementations of all {@link JobScheduler} methods.
 * <p>
 * Intended as a convenience base class for implementations (or tests) that only care about a
 * subset of the scheduler operations: lifecycle methods do nothing and scheduling methods
 * return {@code null}.
 * <p>
 * NOTE(review): the scheduling methods return {@code null} rather than a dummy
 * {@link Executor}/{@code JobHandle} — callers of a subclass that does not override them must
 * tolerate {@code null} returns. Confirm against {@code JobScheduler}'s contract.
 */
public class JobSchedulerAdapter implements JobScheduler
{
    // Lifecycle callbacks: intentionally no-ops.

    @Override
    public void init() throws Throwable
    { // no-op
    }

    @Override
    public void start() throws Throwable
    { // no-op
    }

    @Override
    public void stop() throws Throwable
    { // no-op
    }

    @Override
    public void shutdown() throws Throwable
    { // no-op
    }

    // Scheduling operations: no work is ever scheduled; all return null.

    @Override
    public Executor executor( Group group )
    {
        return null;
    }

    @Override
    public ThreadFactory threadFactory( Group group )
    {
        return null;
    }

    @Override
    public JobHandle schedule( Group group, Runnable job )
    {
        return null;
    }

    @Override
    public JobHandle schedule( Group group, Runnable job, Map<String,String> metadata )
    {
        return null;
    }

    @Override
    public JobHandle schedule( Group group, Runnable runnable, long initialDelay,
            TimeUnit timeUnit )
    {
        return null;
    }

    @Override
    public JobHandle scheduleRecurring( Group group, Runnable runnable, long period,
            TimeUnit timeUnit )
    {
        return null;
    }

    @Override
    public JobHandle scheduleRecurring( Group group, Runnable runnable, long initialDelay,
            long period, TimeUnit timeUnit )
    {
        return null;
    }
}
Expand Up @@ -19,8 +19,12 @@
*/
package org.neo4j.index.internal.gbptree;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.neo4j.kernel.impl.util.JobScheduler;

import static org.neo4j.kernel.impl.util.JobScheduler.Groups.recoveryCleanup;

/**
* Collects recovery cleanup work to be performed and {@link #run() runs} them all as one job,
Expand All @@ -30,29 +34,44 @@
*/
public class GroupingRecoveryCleanupWorkCollector implements RecoveryCleanupWorkCollector
{
private final Queue<CleanupJob> jobs = new LinkedList<>();
private final Queue<CleanupJob> jobs;
private final JobScheduler jobScheduler;

/**
* Separate add phase from run phase
*/
private volatile boolean started;
public GroupingRecoveryCleanupWorkCollector( JobScheduler jobScheduler )
{
this.jobScheduler = jobScheduler;
this.jobs = new LinkedBlockingQueue<>();
}

@Override
public void init() throws Throwable
{
jobs.clear();
}

@Override
public synchronized void add( CleanupJob job )
public void add( CleanupJob job )
{
assert !started : "Tried to add cleanup job after started";
jobs.add( job );
}

@Override
public synchronized void run()
public void start() throws Throwable
{
assert !started : "Tried to start cleanup job more than once";
started = true;
CleanupJob job;
while ( (job = jobs.poll()) != null )
{
job.run();
jobScheduler.schedule( recoveryCleanup, job );
}
}

@Override
public void stop() throws Throwable
{ // no-op
}

@Override
public void shutdown() throws Throwable
{ // no-op
}
}
Expand Up @@ -19,18 +19,18 @@
*/
package org.neo4j.index.internal.gbptree;

import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

/**
* Place to add recovery cleanup work to be done as part of recovery of {@link GBPTree}.
* <p>
* Lifecycle has two phases: Add phase and run phase.
* <p>
* During add phase, jobs are added, potentially from different
* threads. From system perspective this happens during startup of database as part of {@code life.init()} in
* {@code NeoStoreDataSource} when indexes are started.
* {@link Lifecycle#init()} must prepare implementing class to be reused, probably by cleaning current state. After
* this, implementing class must be ready to receive new jobs through {@link #add(CleanupJob)}.
* <p>
* Run phase is triggered as part of {@code life.start()} in {@code NeoStoreDataSource}.
* Jobs may be processed during {@link #add(CleanupJob) add} or {@link Lifecycle#start() start}.
*/
public interface RecoveryCleanupWorkCollector extends Runnable
public interface RecoveryCleanupWorkCollector extends Lifecycle
{
/**
* Adds {@link CleanupJob} to this collector.
Expand All @@ -43,17 +43,14 @@ public interface RecoveryCleanupWorkCollector extends Runnable
* {@link CleanupJob#run() Runs} {@link #add(CleanupJob) added} cleanup jobs right away in the thread
* calling {@link #add(CleanupJob)}.
*/
RecoveryCleanupWorkCollector IMMEDIATE = new ImmediateRecoveryCleanupWorkCollector();

// Runs each added job at once, in the adding thread; lifecycle callbacks are
// inherited as no-ops from LifecycleAdapter.
class ImmediateRecoveryCleanupWorkCollector extends LifecycleAdapter implements RecoveryCleanupWorkCollector
{
    @Override
    public void add( CleanupJob job )
    {
        job.run();
    }
}
}
Expand Up @@ -32,7 +32,9 @@
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -57,6 +59,7 @@
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.test.Barrier;
import org.neo4j.test.rule.PageCacheRule;
import org.neo4j.test.rule.RandomRule;
Expand Down Expand Up @@ -295,7 +298,7 @@ public void shouldFailWhenTryingToRemapWithPageSizeLargerThanCachePageSize() thr
// Good
}

try ( GBPTree<MutableLong, MutableLong> ignored = index()
try ( GBPTree<MutableLong,MutableLong> ignored = index()
.withPageCachePageSize( pageCachePageSize / 2 )
.withIndexPageSize( pageCachePageSize )
.build() )
Expand All @@ -322,10 +325,10 @@ public void shouldRemapFileIfMappedWithPageSizeLargerThanCreationSize() throws E
try ( GBPTree<MutableLong,MutableLong> index = index()
.withPageCachePageSize( pageSize )
.withIndexPageSize( pageSize / 2 )
.build() )
.build() )
{
// Insert some data
try ( Writer<MutableLong, MutableLong> writer = index.writer() )
try ( Writer<MutableLong,MutableLong> writer = index.writer() )
{
MutableLong key = new MutableLong();
MutableLong value = new MutableLong();
Expand All @@ -345,7 +348,7 @@ public void shouldRemapFileIfMappedWithPageSizeLargerThanCreationSize() throws E
{
MutableLong fromInclusive = new MutableLong( 0L );
MutableLong toExclusive = new MutableLong( 200L );
try ( RawCursor<Hit<MutableLong,MutableLong>, IOException> seek = index.seek( fromInclusive, toExclusive ) )
try ( RawCursor<Hit<MutableLong,MutableLong>,IOException> seek = index.seek( fromInclusive, toExclusive ) )
{
int i = 0;
while ( seek.next() )
Expand Down Expand Up @@ -456,11 +459,11 @@ public void failureDuringInitializeWriterShouldNotFailNextInitialize() throws Ex
IOException no = new IOException( "No" );
AtomicBoolean throwOnNextIO = new AtomicBoolean();
PageCache controlledPageCache = pageCacheThatThrowExceptionWhenToldTo( no, throwOnNextIO );
try ( GBPTree<MutableLong, MutableLong> index = index().with( controlledPageCache ).build() )
try ( GBPTree<MutableLong,MutableLong> index = index().with( controlledPageCache ).build() )
{
// WHEN
assert throwOnNextIO.compareAndSet( false, true );
try ( Writer<MutableLong, MutableLong> ignored = index.writer() )
try ( Writer<MutableLong,MutableLong> ignored = index.writer() )
{
fail( "Expected to throw" );
}
Expand All @@ -470,7 +473,7 @@ public void failureDuringInitializeWriterShouldNotFailNextInitialize() throws Ex
}

// THEN
try ( Writer<MutableLong, MutableLong> writer = index.writer() )
try ( Writer<MutableLong,MutableLong> writer = index.writer() )
{
writer.put( new MutableLong( 1 ), new MutableLong( 1 ) );
}
Expand Down Expand Up @@ -557,7 +560,8 @@ public void shouldReplaceHeaderDataInNextCheckPoint() throws Exception
verifyHeaderDataAfterClose( beforeClose );
}

private void verifyHeaderDataAfterClose( BiConsumer<GBPTree<MutableLong,MutableLong>,byte[]> beforeClose ) throws IOException
private void verifyHeaderDataAfterClose( BiConsumer<GBPTree<MutableLong,MutableLong>,byte[]> beforeClose )
throws IOException
{
byte[] expectedHeader = new byte[12];
ThreadLocalRandom.current().nextBytes( expectedHeader );
Expand Down Expand Up @@ -744,13 +748,13 @@ public void cleanJobShouldLockOutCheckpoint() throws Exception
index.writer().close();
}

RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector();
RecoveryCleanupWorkCollector cleanupWork = new ControlledRecoveryCleanupWorkCollector();
CleanJobControlledMonitor monitor = new CleanJobControlledMonitor();
try ( GBPTree<MutableLong,MutableLong> index = index().with( monitor ).with( cleanupWork ).build() )
{
// WHEN
// Cleanup not finished
Future<?> cleanup = executor.submit( throwing( cleanupWork::run ) );
Future<?> cleanup = executor.submit( throwing( cleanupWork::start ) );
monitor.barrier.awaitUninterruptibly();
index.writer().close();

Expand All @@ -774,13 +778,13 @@ public void cleanJobShouldLockOutClose() throws Exception
index.writer().close();
}

RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector();
RecoveryCleanupWorkCollector cleanupWork = new ControlledRecoveryCleanupWorkCollector();
CleanJobControlledMonitor monitor = new CleanJobControlledMonitor();
GBPTree<MutableLong,MutableLong> index = index().with( monitor ).with( cleanupWork ).build();

// WHEN
// Cleanup not finished
Future<?> cleanup = executor.submit( throwing( cleanupWork::run ) );
Future<?> cleanup = executor.submit( throwing( cleanupWork::start ) );
monitor.barrier.awaitUninterruptibly();

// THEN
Expand All @@ -802,13 +806,13 @@ public void cleanJobShouldNotLockOutWriter() throws Exception
index.writer().close();
}

RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector();
RecoveryCleanupWorkCollector cleanupWork = new ControlledRecoveryCleanupWorkCollector();
CleanJobControlledMonitor monitor = new CleanJobControlledMonitor();
try ( GBPTree<MutableLong,MutableLong> index = index().with( monitor ).with( cleanupWork ).build() )
{
// WHEN
// Cleanup not finished
Future<?> cleanup = executor.submit( throwing( cleanupWork::run ) );
Future<?> cleanup = executor.submit( throwing( cleanupWork::start ) );
monitor.barrier.awaitUninterruptibly();

// THEN
Expand All @@ -830,14 +834,14 @@ public void writerShouldNotLockOutCleanJob() throws Exception
index.writer().close();
}

RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector();
RecoveryCleanupWorkCollector cleanupWork = new ControlledRecoveryCleanupWorkCollector();
try ( GBPTree<MutableLong,MutableLong> index = index().with( cleanupWork ).build() )
{
// WHEN
try ( Writer<MutableLong,MutableLong> writer = index.writer() )
{
// THEN
Future<?> cleanup = executor.submit( throwing( cleanupWork::run ) );
Future<?> cleanup = executor.submit( throwing( cleanupWork::start ) );
// Move writer to let cleaner pass
writer.put( new MutableLong( 1 ), new MutableLong( 1 ) );
cleanup.get();
Expand Down Expand Up @@ -1268,13 +1272,35 @@ public void mustRetryCloseIfFailure() throws Exception
AtomicBoolean throwOnNext = new AtomicBoolean();
IOException exception = new IOException( "My failure" );
PageCache pageCache = pageCacheThatThrowExceptionWhenToldTo( exception, throwOnNext );
try ( GBPTree<MutableLong, MutableLong> index = index().with( pageCache ).build() )
try ( GBPTree<MutableLong, MutableLong> ignored = index().with( pageCache ).build() )
{
// WHEN
throwOnNext.set( true );
}
}

/**
 * Test collector that queues added jobs and runs them synchronously, in order,
 * when {@link #start()} is called — letting tests control exactly when cleanup happens.
 */
private class ControlledRecoveryCleanupWorkCollector extends LifecycleAdapter
        implements RecoveryCleanupWorkCollector
{
    Queue<CleanupJob> jobs = new LinkedList<>();

    @Override
    public void add( CleanupJob job )
    {
        jobs.add( job );
    }

    @Override
    public void start() throws Throwable
    {
        // Drain and execute every queued job in the calling thread.
        for ( CleanupJob queued = jobs.poll(); queued != null; queued = jobs.poll() )
        {
            queued.run();
        }
    }
}

private PageCache pageCacheThatThrowExceptionWhenToldTo( final IOException e, final AtomicBoolean throwOnNextIO )
{
return new DelegatingPageCache( createPageCache( 256 ) )
Expand Down

0 comments on commit 99a922c

Please sign in to comment.