Skip to content

Commit

Permalink
CrashGenerationCleaner uses fewer threads when possible
Browse files Browse the repository at this point in the history
Use a more dynamic thread count and batch size that adapt to the number of pages
in the index. By doing this we avoid creating a large number of threads when most of them would have no work to do.
The condition is that each thread should have at least one batch to work on.
  • Loading branch information
burqen committed Aug 23, 2018
1 parent 9a922aa commit b146a8a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 6 deletions.
Expand Up @@ -41,12 +41,14 @@
*/ */
class CrashGenerationCleaner class CrashGenerationCleaner
{ {
static final long MIN_BATCH_SIZE = 10;
static final long MAX_BATCH_SIZE = 1000;
private final PagedFile pagedFile; private final PagedFile pagedFile;
private final TreeNode<?,?> treeNode; private final TreeNode<?,?> treeNode;
private final long lowTreeNodeId; private final long lowTreeNodeId;
private final long highTreeNodeId; private final long highTreeNodeId;
private final int availableProcessors; private int threads;
private final long batchSize; private long batchSize;
private final long stableGeneration; private final long stableGeneration;
private final long unstableGeneration; private final long unstableGeneration;
private final Monitor monitor; private final Monitor monitor;
Expand All @@ -59,13 +61,25 @@ class CrashGenerationCleaner
this.treeNode = treeNode; this.treeNode = treeNode;
this.lowTreeNodeId = lowTreeNodeId; this.lowTreeNodeId = lowTreeNodeId;
this.highTreeNodeId = highTreeNodeId; this.highTreeNodeId = highTreeNodeId;
this.availableProcessors = Runtime.getRuntime().availableProcessors();
this.batchSize = // Each processor will get roughly 100 batches each
min( 1000, max( 10, (highTreeNodeId - lowTreeNodeId) / (100 * availableProcessors) ) );
this.stableGeneration = stableGeneration; this.stableGeneration = stableGeneration;
this.unstableGeneration = unstableGeneration; this.unstableGeneration = unstableGeneration;
this.monitor = monitor; this.monitor = monitor;
this.internalMaxKeyCount = treeNode.internalMaxKeyCount(); this.internalMaxKeyCount = treeNode.internalMaxKeyCount();
long pagesToClean = highTreeNodeId - lowTreeNodeId;
this.threads = threads( pagesToClean, Runtime.getRuntime().availableProcessors() );
this.batchSize = batchSize( pagesToClean, threads );
}

/**
 * Decides how many threads to use for cleaning the given number of pages.
 *
 * @param pagesToClean number of pages that are going to be cleaned.
 * @param availableProcessors upper bound for the returned thread count.
 * @return the smaller of {@code availableProcessors} and the number of whole {@link #MIN_BATCH_SIZE}-sized
 * batches that fit in {@code pagesToClean}, where the latter is never counted as less than one.
 */
static int threads( long pagesToClean, long availableProcessors )
{
    // A thread is only worth starting if there is a whole minimum-sized batch for it, but always plan for one thread.
    long threadsWithWork = max( 1, pagesToClean / MIN_BATCH_SIZE );
    // Never go above the number of processors we were told about.
    long threadCount = min( availableProcessors, threadsWithWork );
    return (int) threadCount;
}

/**
 * Decides how many pages each batch of cleaning work should contain.
 *
 * The target is {@code pagesToClean / (100 * threads)}, i.e. roughly 100 batches per thread, using integer
 * division. The result is then clamped to the range [{@link #MIN_BATCH_SIZE}, {@link #MAX_BATCH_SIZE}].
 *
 * @param pagesToClean number of pages that are going to be cleaned.
 * @param threads number of threads sharing the work; must not be zero since it is used as a divisor.
 * @return batch size, no smaller than {@link #MIN_BATCH_SIZE} and no larger than {@link #MAX_BATCH_SIZE}.
 */
static long batchSize( long pagesToClean, int threads )
{
// Batch size at most MAX_BATCH_SIZE, at least MIN_BATCH_SIZE, and aiming to give each thread about 100 batches
return min( MAX_BATCH_SIZE, max( MIN_BATCH_SIZE, pagesToClean / (100 * threads) ) );
} }


// === Methods about the execution and threading === // === Methods about the execution and threading ===
Expand All @@ -77,7 +91,6 @@ public void clean() throws IOException
assert unstableGeneration - stableGeneration > 1 : unexpectedGenerations(); assert unstableGeneration - stableGeneration > 1 : unexpectedGenerations();


long startTime = currentTimeMillis(); long startTime = currentTimeMillis();
int threads = availableProcessors;
ExecutorService executor = Executors.newFixedThreadPool( threads ); ExecutorService executor = Executors.newFixedThreadPool( threads );
AtomicLong nextId = new AtomicLong( lowTreeNodeId ); AtomicLong nextId = new AtomicLong( lowTreeNodeId );
AtomicReference<Throwable> error = new AtomicReference<>(); AtomicReference<Throwable> error = new AtomicReference<>();
Expand Down
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.index.internal.gbptree;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertTrue;

/**
 * Verifies that the thread count and batch size chosen by {@link CrashGenerationCleaner#threads(long, long)} and
 * {@link CrashGenerationCleaner#batchSize(long, int)} never leave a thread without a batch to work on.
 * The test is run once for each simulated number of available processors.
 */
@RunWith( Parameterized.class )
public class CrashGenerationCleanerConfigTest
{
    /**
     * @return the simulated processor counts to run the test with, one per parameter set.
     */
    @Parameterized.Parameters( name = "availableProcessors={0}" )
    public static List<Object[]> parameters()
    {
        return Arrays.asList(
                new Object[]{1},
                new Object[]{8},
                new Object[]{32},
                new Object[]{64},
                new Object[]{128}
        );
    }

    /** Simulated number of available processors, injected by the {@link Parameterized} runner. */
    @Parameterized.Parameter
    public int availableProcessors;

    @Test
    public void everyThreadShouldHaveAtLeastOneBatchToWorkOn()
    {
        long pagesToClean = 1;
        long multiplier = 5;

        int threads = 1;
        long batchSize = 1;
        // Keep growing the number of pages until both thread count and batch size have reached their upper limits,
        // checking the invariant for every size on the way there.
        while ( threads < availableProcessors || batchSize < CrashGenerationCleaner.MAX_BATCH_SIZE )
        {
            threads = CrashGenerationCleaner.threads( pagesToClean, availableProcessors );
            batchSize = CrashGenerationCleaner.batchSize( pagesToClean, threads );
            // Number of batches needed to cover all pages, rounded up. Using (pagesToClean + batchSize) / batchSize
            // would count one batch too many whenever pagesToClean is an exact multiple of batchSize.
            long batches = (pagesToClean + batchSize - 1) / batchSize;
            assertTrue( "at least one batch per thread", batches >= threads );
            pagesToClean *= multiplier;
        }
    }
}

0 comments on commit b146a8a

Please sign in to comment.