Skip to content

Commit

Permalink
Make the page cache harness tests spend less time on creating threads.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Jun 14, 2018
1 parent f35bc78 commit 93061e8
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 31 deletions.
Expand Up @@ -20,21 +20,24 @@
package org.neo4j.io.pagecache.randomharness;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

class PlanRunner implements Callable<Void>
{
private final Plan plan;
private final AtomicBoolean stopSignal;

PlanRunner( Plan plan )
PlanRunner( Plan plan, AtomicBoolean stopSignal )
{
this.plan = plan;
this.stopSignal = stopSignal;
}

@Override
public Void call() throws Exception
{
Action action = plan.next();
while ( action != null )
while ( action != null && !stopSignal.get() )
{
try
{
Expand Down
Expand Up @@ -31,9 +31,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.adversaries.RandomAdversary;
import org.neo4j.adversaries.fs.AdversarialFileSystemAbstraction;
Expand All @@ -58,11 +62,21 @@
* records don't end up in the wrong files. The harness can also execute separate preparation and verification steps,
* before and after executing the planned test respectively, and it can integrate with the adversarial file system
* for fault injection, and arbitrary PageCacheTracers.
*
* <p>
* See {@link LinearHistoryPageCacheTracerTest} for an example of how to configure and use the harness.
*/
public class RandomPageCacheTestHarness implements Closeable
{
private static final ThreadFactory THREAD_FACTORY = r ->
{
Thread thread = Executors.defaultThreadFactory().newThread( r );
thread.setDaemon( true );
return thread;
};

private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<>(), THREAD_FACTORY );

private double mischiefRate;
private double failureRate;
private double errorRate;
Expand Down Expand Up @@ -127,7 +141,7 @@ public void disableCommands( Command... commands )
* Set the probability factor of the given command. The default value is given by
* {@link Command#getDefaultProbabilityFactor()}. The effective probability is computed from the relative
* difference in probability factors between all the commands.
*
* <p>
* Setting the probability factor to zero will disable that command.
*/
public void setCommandProbabilityFactor( Command command, double probabilityFactor )
Expand All @@ -139,7 +153,7 @@ public void setCommandProbabilityFactor( Command command, double probabilityFact
/**
* Set to "true" to execute the plans with fault injection from the {@link AdversarialFileSystemAbstraction}, or
* set to "false" to disable this feature.
*
* <p>
* The default is "true".
*/
public void setUseAdversarialIO( boolean useAdversarialIO )
Expand Down Expand Up @@ -201,7 +215,7 @@ public void setConcurrencyLevel( int concurrencyLevel )
* Set the number of files that should be mapped from the start of the plan. If you have set the probability of
* the {@link Command#MapFile} command to zero, then you must have a positive number of initial mapped files.
* Otherwise there will be no files to plan any work for.
*
* <p>
* The default value is 2.
*/
public void setInitialMappedFiles( int initialMappedFiles )
Expand Down Expand Up @@ -241,7 +255,7 @@ public void setCommandCount( int commandCount )
* Set the preparation phase to use. This phase is executed before all the planned commands. It can be used to
* prepare some file contents, or reset some external state, such as the
* {@link LinearTracers}.
*
* <p>
* The preparation phase is executed before each iteration.
*/
public void setPreparation( Phase preparation )
Expand All @@ -252,7 +266,7 @@ public void setPreparation( Phase preparation )
/**
* Set the verification phase to use. This phase is executed after all the planned commands have executed
* completely, and can be used to verify the consistency of the data, or some other invariant.
*
* <p>
* The verification phase is executed after each iteration.
*/
public void setVerification( Phase verification )
Expand All @@ -270,7 +284,7 @@ public void setRecordFormat( RecordFormat recordFormat )

/**
* Set and fix the random seed to the given value. All iterations run through this harness will then use that seed.
*
* <p>
* If the random seed has not been configured, then each iteration will use a new seed.
*/
public void setRandomSeed( long randomSeed )
Expand Down Expand Up @@ -318,9 +332,9 @@ public void describePreviousRun( PrintStream out )

/**
* Run a single iteration with the current harness configuration.
*
* <p>
* This will either complete within the given timeout, or throw an exception.
*
* <p>
* If the run fails, then a description will be printed to System.err.
*/
public void run( long iterationTimeout, TimeUnit unit ) throws Exception
Expand All @@ -330,13 +344,13 @@ public void run( long iterationTimeout, TimeUnit unit ) throws Exception

/**
* Run the given number of iterations with the given harness configuration.
*
* <p>
* If the random seed has been set to a specific value, then all iterations will use that seed. Otherwise each
* iteration will use a new seed.
*
* <p>
* The given timeout applies to the individual iteration, not to their combined run. This is effectively similar
* to calling {@link #run(long, TimeUnit)} the given number of times.
*
* <p>
* The run will stop at the first failure, if any, and print a description of it to System.err.
*/
public void run( int iterations, long iterationTimeout, TimeUnit unit ) throws Exception
Expand Down Expand Up @@ -397,12 +411,12 @@ private void runIteration( long timeout, TimeUnit unit ) throws Exception

plan = plan( cache, files, fileMap );

Callable<Void> planRunner = new PlanRunner( plan );
AtomicBoolean stopSignal = new AtomicBoolean();
Callable<Void> planRunner = new PlanRunner( plan, stopSignal );
Future<Void>[] futures = new Future[concurrencyLevel];
ExecutorService executor = Executors.newFixedThreadPool( concurrencyLevel );
for ( int i = 0; i < concurrencyLevel; i++ )
{
futures[i] = executor.submit( planRunner );
futures[i] = EXECUTOR_SERVICE.submit( planRunner );
}

if ( preparation != null )
Expand Down Expand Up @@ -432,28 +446,45 @@ private void runIteration( long timeout, TimeUnit unit ) throws Exception
}
finally
{
stopSignal.set( true );
adversary.setProbabilityFactor( 0.0 );
for ( Future<Void> future : futures )
try
{
future.cancel( true );
for ( Future<Void> future : futures )
{
future.get( 10, TimeUnit.SECONDS );
}
}
executor.shutdown();
now = System.currentTimeMillis();
executor.awaitTermination( deadlineMillis - now, TimeUnit.MILLISECONDS );
plan.close();
cache.close();

if ( this.fs instanceof EphemeralFileSystemAbstraction )
catch ( InterruptedException | TimeoutException e )
{
this.fs.close();
this.fs = new EphemeralFileSystemAbstraction();
for ( Future<Void> future : futures )
{
future.cancel( true );
}
e.printStackTrace();
}
else

try
{
for ( File file : files )
plan.close();
cache.close();

if ( this.fs instanceof EphemeralFileSystemAbstraction )
{
file.delete();
this.fs.close();
this.fs = new EphemeralFileSystemAbstraction();
}
else
{
for ( File file : files )
{
file.delete();
}
}
}
catch ( IOException e )
{
e.printStackTrace();
}
}
}
Expand Down

0 comments on commit 93061e8

Please sign in to comment.