Skip to content

Commit

Permalink
allow to specify which executor to use when warming up
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Apr 29, 2012
1 parent 70268a6 commit b379225
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
Expand Up @@ -28,6 +28,9 @@
public interface IndicesWarmer {

static interface Listener {

String executor();

void warm(ShardId shardId, IndexMetaData indexMetaData, Engine.Searcher search);
}

Expand Down
Expand Up @@ -30,23 +30,28 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
*/
public class InternalIndicesWarmer extends AbstractComponent implements IndicesWarmer {

private final ThreadPool threadPool;

private final ClusterService clusterService;

private final IndicesService indicesService;

private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();

@Inject
public InternalIndicesWarmer(Settings settings, ClusterService clusterService, IndicesService indicesService) {
public InternalIndicesWarmer(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indicesService = indicesService;
}
Expand All @@ -61,8 +66,8 @@ public void removeListener(Listener listener) {
listeners.remove(listener);
}

public void warm(ShardId shardId, Engine.Searcher searcher) {
IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.index().name());
public void warm(final ShardId shardId, final Engine.Searcher searcher) {
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.index().name());
if (indexMetaData == null) {
return;
}
Expand All @@ -82,11 +87,24 @@ public void warm(ShardId shardId, Engine.Searcher searcher) {
}
indexShard.warmerService().onPreWarm();
long time = System.nanoTime();
for (Listener listener : listeners) {
for (final Listener listener : listeners) {
final CountDownLatch latch = new CountDownLatch(1);
threadPool.executor(listener.executor()).execute(new Runnable() {
@Override
public void run() {
try {
listener.warm(shardId, indexMetaData, searcher);
} catch (Exception e) {
logger.warn("[{}][{}] failed to warm [{}]", shardId.index().name(), shardId.id(), listener);
} finally {
latch.countDown();
}
}
});
try {
listener.warm(shardId, indexMetaData, searcher);
} catch (Exception e) {
logger.warn("[{}][{}] failed to warm [{}]", shardId.index().name(), shardId.id(), listener);
latch.await();
} catch (InterruptedException e) {
return;
}
}
long took = System.nanoTime() - time;
Expand Down

0 comments on commit b379225

Please sign in to comment.