Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace ForkJoinPool with fixed thread pool #4552

Merged
merged 3 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -47,9 +48,11 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
Expand Down Expand Up @@ -1902,54 +1905,58 @@ private void indexParallel(String dir, IndexDownArgs args) {
IndexerParallelizer parallelizer = RuntimeEnvironment.getInstance().getIndexerParallelizer();
ObjectPool<Ctags> ctagsPool = parallelizer.getCtagsPool();

Map<Boolean, List<IndexFileWork>> bySuccess = null;
Map<Boolean, List<IndexFileWork>> bySuccess = new HashMap<>();
try (Progress progress = new Progress(LOGGER, String.format("indexing '%s'", dir), worksCount)) {
bySuccess = parallelizer.getForkJoinPool().submit(() ->
args.works.parallelStream().collect(
Collectors.groupingByConcurrent((x) -> {
int tries = 0;
Ctags pctags = null;
boolean ret;
while (true) {
try {
if (alreadyClosedCounter.get() > 0) {
ret = false;
} else {
pctags = ctagsPool.get();
addFile(x.file, x.path, pctags);
successCounter.incrementAndGet();
ret = true;
Set<Callable<IndexFileWork>> callables = args.works.stream().
<Callable<IndexFileWork>>map(x -> () -> {
int tries = 0;
Ctags pctags = null;
while (true) {
try {
if (alreadyClosedCounter.get() > 0) {
x.ret = false;
} else {
pctags = ctagsPool.get();
addFile(x.file, x.path, pctags);
successCounter.incrementAndGet();
x.ret = true;
}
} catch (AlreadyClosedException e) {
alreadyClosedCounter.incrementAndGet();
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
LOGGER.log(Level.SEVERE, errmsg, e);
x.exception = e;
x.ret = false;
} catch (InterruptedException e) {
// Allow one retry if interrupted
if (++tries <= 1) {
continue;
}
LOGGER.log(Level.WARNING, "No retry: ''{0}''", x.file);
x.exception = e;
x.ret = false;
} catch (RuntimeException | IOException e) {
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
LOGGER.log(Level.WARNING, errmsg, e);
x.exception = e;
x.ret = false;
} finally {
if (pctags != null) {
pctags.reset();
ctagsPool.release(pctags);
}
}
} catch (AlreadyClosedException e) {
alreadyClosedCounter.incrementAndGet();
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
LOGGER.log(Level.SEVERE, errmsg, e);
x.exception = e;
ret = false;
} catch (InterruptedException e) {
// Allow one retry if interrupted
if (++tries <= 1) {
continue;
}
LOGGER.log(Level.WARNING, "No retry: ''{0}''", x.file);
x.exception = e;
ret = false;
} catch (RuntimeException | IOException e) {
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
LOGGER.log(Level.WARNING, errmsg, e);
x.exception = e;
ret = false;
} finally {
if (pctags != null) {
pctags.reset();
ctagsPool.release(pctags);
}
}

progress.increment();
return ret;
}
}))).get();
progress.increment();
return x;
vladak marked this conversation as resolved.
Show resolved Hide resolved
}
}).
collect(Collectors.toSet());
List<Future<IndexFileWork>> futures = parallelizer.getIndexWorkExecutor().invokeAll(callables);
for (var future : futures) {
IndexFileWork work = future.get();
bySuccess.computeIfAbsent(work.ret, key -> new ArrayList<>()).add(work);
}
} catch (InterruptedException | ExecutionException e) {
interrupted = true;
int successCount = successCounter.intValue();
Expand All @@ -1961,14 +1968,9 @@ private void indexParallel(String dir, IndexDownArgs args) {

args.curCount = currentCounter.intValue();

// Start with failureCount=worksCount, and then subtract successes.
int failureCount = worksCount;
if (bySuccess != null) {
List<IndexFileWork> successes = bySuccess.getOrDefault(Boolean.TRUE, null);
if (successes != null) {
failureCount -= successes.size();
}
}
int failureCount = worksCount - Optional.ofNullable(bySuccess.get(Boolean.TRUE))
.map(List::size)
.orElse(0);
if (failureCount > 0) {
double pctFailed = 100.0 * failureCount / worksCount;
String exmsg = String.format("%d failures (%.1f%%) while parallel-indexing", failureCount, pctFailed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class IndexFileWork {
final File file;
final String path;
Exception exception;
boolean ret;

IndexFileWork(File file, String path) {
this.file = file;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.opengrok.indexer.analysis.Ctags;
Expand All @@ -38,27 +36,25 @@
import org.opengrok.indexer.util.ObjectFactory;
import org.opengrok.indexer.util.ObjectPool;

import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory;

/**
* Represents a container for executors that enable parallelism for indexing
* across projects and repositories and also within any {@link IndexDatabase}
* instance -- with global limits for all execution.
* <p>A fixed-thread pool is used for parallelism across repositories, and a
* work-stealing {@link ForkJoinPool} is used for parallelism within any
* {@link #lzIndexWorkExecutor} is used for parallelism within any
* {@link IndexDatabase}. Threads in the former pool are customers of the
* latter, and the bulk of work is done in the latter pool. The work-stealing
* {@link ForkJoinPool} makes use of a corresponding fixed pool of {@link Ctags}
* instances.
* <p>Additionally there are pools for executing for history, for renamings in
* latter, and the bulk of work is done in the latter pool.
* The {@link #lzIndexWorkExecutor} makes use of a corresponding fixed pool
* of {@link Ctags} instances.
* <p>Additionally there are pools for executing for history, for renames in
* history, and for watching the {@link Ctags} instances for timing purposes.
*/
public class IndexerParallelizer implements AutoCloseable {

private final RuntimeEnvironment env;
private final int indexingParallelism;

private LazilyInstantiate<ForkJoinPool> lzForkJoinPool;
private LazilyInstantiate<ExecutorService> lzIndexWorkExecutor;
private LazilyInstantiate<ObjectPool<Ctags>> lzCtagsPool;
private LazilyInstantiate<ExecutorService> lzFixedExecutor;
private LazilyInstantiate<ExecutorService> lzHistoryExecutor;
Expand All @@ -82,7 +78,7 @@ public IndexerParallelizer(RuntimeEnvironment env) {
*/
this.indexingParallelism = env.getIndexingParallelism();

createLazyForkJoinPool();
createIndexWorkExecutor();
createLazyCtagsPool();
createLazyFixedExecutor();
createLazyHistoryExecutor();
Expand All @@ -99,10 +95,10 @@ public ExecutorService getFixedExecutor() {
}

/**
* @return the forkJoinPool
* @return the executor used for individual file processing in the 2nd stage of indexing
*/
public ForkJoinPool getForkJoinPool() {
return lzForkJoinPool.get();
public ExecutorService getIndexWorkExecutor() {
    // Lazily instantiated: the fixed thread pool is created on first access
    // and reused afterwards (see the LazilyInstantiate holder).
    return lzIndexWorkExecutor.get();
}

/**
Expand Down Expand Up @@ -166,7 +162,7 @@ public void close() {
* call this method satisfactorily too.
*/
public void bounce() {
bounceForkJoinPool();
bounceIndexWorkExecutor();
bounceFixedExecutor();
bounceCtagsPool();
bounceHistoryExecutor();
Expand All @@ -175,11 +171,11 @@ public void bounce() {
bounceXrefWatcherExecutor();
}

private void bounceForkJoinPool() {
if (lzForkJoinPool.isActive()) {
ForkJoinPool formerForkJoinPool = lzForkJoinPool.get();
createLazyForkJoinPool();
formerForkJoinPool.shutdown();
/**
 * Replaces the index-work executor with a fresh lazy instance and shuts
 * down the previous pool. No-op if the executor was never instantiated.
 */
private void bounceIndexWorkExecutor() {
    if (!lzIndexWorkExecutor.isActive()) {
        return;
    }
    // Grab the live pool before re-arming the lazy holder, then retire it.
    ExecutorService previousExecutor = lzIndexWorkExecutor.get();
    createIndexWorkExecutor();
    previousExecutor.shutdown();
}

Expand Down Expand Up @@ -231,13 +227,10 @@ private void bounceXrefWatcherExecutor() {
}
}

private void createLazyForkJoinPool() {
lzForkJoinPool = LazilyInstantiate.using(() ->
new ForkJoinPool(indexingParallelism, forkJoinPool -> {
ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool);
thread.setName(OpenGrokThreadFactory.PREFIX + "ForkJoinPool-" + thread.getId());
return thread;
}, null, false));
/**
 * Arms the lazy holder for the index-work executor: a fixed thread pool
 * sized by the configured indexing parallelism, using named OpenGrok threads.
 */
private void createIndexWorkExecutor() {
    lzIndexWorkExecutor = LazilyInstantiate.using(() -> {
        return Executors.newFixedThreadPool(indexingParallelism,
                new OpenGrokThreadFactory("index-worker"));
    });
}

private void createLazyCtagsPool() {
Expand All @@ -261,7 +254,7 @@ private void createLazyXrefWatcherExecutor() {
private void createLazyFixedExecutor() {
lzFixedExecutor = LazilyInstantiate.using(() ->
Executors.newFixedThreadPool(indexingParallelism,
new OpenGrokThreadFactory("index-worker")));
new OpenGrokThreadFactory("index-db")));
}

private void createLazyHistoryExecutor() {
Expand Down
Loading