Skip to content

Commit

Permalink
Indexing Slow Log
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Dec 3, 2012
1 parent 19315c6 commit f6697d0
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 4 deletions.
6 changes: 5 additions & 1 deletion config/elasticsearch.yml
Expand Up @@ -338,7 +338,6 @@

# Shard level query and fetch threshold logging.

#index.search.slowlog.level: TRACE
#index.search.slowlog.threshold.query.warn: 10s
#index.search.slowlog.threshold.query.info: 5s
#index.search.slowlog.threshold.query.debug: 2s
Expand All @@ -349,6 +348,11 @@
#index.search.slowlog.threshold.fetch.debug: 500ms
#index.search.slowlog.threshold.fetch.trace: 200ms

#index.indexing.slowlog.threshold.index.warn: 10s
#index.indexing.slowlog.threshold.index.info: 5s
#index.indexing.slowlog.threshold.index.debug: 2s
#index.indexing.slowlog.threshold.index.trace: 500ms

################################## GC Logging ################################

#monitor.jvm.gc.ParNew.warn: 1000ms
Expand Down
10 changes: 10 additions & 0 deletions config/logging.yml
Expand Up @@ -16,9 +16,11 @@ logger:
#discovery: TRACE

index.search.slowlog: TRACE, index_search_slow_log_file
index.indexing.slowlog: TRACE, index_indexing_slow_log_file

additivity:
index.search.slowlog: false
index.indexing.slowlog: false

appender:
console:
Expand All @@ -42,3 +44,11 @@ appender:
layout:
type: pattern
conversionPattern: "[%d{ISO8601}][%-5p][%-25c] %m%n"

index_indexing_slow_log_file:
type: dailyRollingFile
file: ${path.logs}/${cluster.name}_index_indexing_slowlog.log
datePattern: "'.'yyyy-MM-dd"
layout:
type: pattern
conversionPattern: "[%d{ISO8601}][%-5p][%-25c] %m%n"
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.indexing;

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;

/**
*/
Expand All @@ -28,5 +29,6 @@ public class ShardIndexingModule extends AbstractModule {
@Override
protected void configure() {
bind(ShardIndexingService.class).asEagerSingleton();
bind(ShardSlowLogIndexingService.class).asEagerSingleton();
}
}
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -39,15 +40,18 @@
*/
public class ShardIndexingService extends AbstractIndexShardComponent {

private final ShardSlowLogIndexingService slowLog;

private final StatsHolder totalStats = new StatsHolder();

private volatile Map<String, StatsHolder> typesStats = ImmutableMap.of();

private CopyOnWriteArrayList<IndexingOperationListener> listeners = null;

@Inject
public ShardIndexingService(ShardId shardId, @IndexSettings Settings indexSettings) {
public ShardIndexingService(ShardId shardId, @IndexSettings Settings indexSettings, ShardSlowLogIndexingService slowLog) {
super(shardId, indexSettings);
this.slowLog = slowLog;
}

/**
Expand Down Expand Up @@ -95,6 +99,8 @@ public synchronized void removeListener(IndexingOperationListener listener) {
}

public Engine.Create preCreate(Engine.Create create) {
totalStats.indexCurrent.inc();
typeStats(create.type()).indexCurrent.inc();
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
create = listener.preCreate(create);
Expand All @@ -118,7 +124,11 @@ public void postCreateUnderLock(Engine.Create create) {
public void postCreate(Engine.Create create) {
long took = create.endTime() - create.startTime();
totalStats.indexMetric.inc(took);
typeStats(create.type()).indexMetric.inc(took);
totalStats.indexCurrent.dec();
StatsHolder typeStats = typeStats(create.type());
typeStats.indexMetric.inc(took);
typeStats.indexCurrent.dec();
slowLog.postCreate(create, took);
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
try {
Expand Down Expand Up @@ -160,6 +170,7 @@ public void postIndex(Engine.Index index) {
StatsHolder typeStats = typeStats(index.type());
typeStats.indexMetric.inc(took);
typeStats.indexCurrent.dec();
slowLog.postIndex(index, took);
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
try {
Expand Down
@@ -0,0 +1,176 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.indexing.slowlog;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
*/
public class ShardSlowLogIndexingService extends AbstractIndexShardComponent {

private boolean reformat;

private long indexWarnThreshold;
private long indexInfoThreshold;
private long indexDebugThreshold;
private long indexTraceThreshold;

private String level;

private final ESLogger indexLogger;
private final ESLogger deleteLogger;

static {
IndexMetaData.addDynamicSettings(
"index.indexing.slowlog.threshold.index.warn",
"index.indexing.slowlog.threshold.index.info",
"index.indexing.slowlog.threshold.index.debug",
"index.indexing.slowlog.threshold.index.trace",
"index.indexing.slowlog.reformat",
"index.indexing.slowlog.level"
);
}

class ApplySettings implements IndexSettingsService.Listener {
@Override
public synchronized void onRefreshSettings(Settings settings) {
long indexWarnThreshold = settings.getAsTime("index.indexing.slowlog.threshold.index.warn", TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexWarnThreshold)).nanos();
if (indexWarnThreshold != ShardSlowLogIndexingService.this.indexWarnThreshold) {
ShardSlowLogIndexingService.this.indexWarnThreshold = indexWarnThreshold;
}
long indexInfoThreshold = settings.getAsTime("index.indexing.slowlog.threshold.index.info", TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexInfoThreshold)).nanos();
if (indexInfoThreshold != ShardSlowLogIndexingService.this.indexInfoThreshold) {
ShardSlowLogIndexingService.this.indexInfoThreshold = indexInfoThreshold;
}
long indexDebugThreshold = settings.getAsTime("index.indexing.slowlog.threshold.index.debug", TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexDebugThreshold)).nanos();
if (indexDebugThreshold != ShardSlowLogIndexingService.this.indexDebugThreshold) {
ShardSlowLogIndexingService.this.indexDebugThreshold = indexDebugThreshold;
}
long indexTraceThreshold = settings.getAsTime("index.indexing.slowlog.threshold.index.trace", TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexTraceThreshold)).nanos();
if (indexTraceThreshold != ShardSlowLogIndexingService.this.indexTraceThreshold) {
ShardSlowLogIndexingService.this.indexTraceThreshold = indexTraceThreshold;
}

String level = settings.get("index.indexing.slowlog.level", ShardSlowLogIndexingService.this.level);
if (!level.equals(ShardSlowLogIndexingService.this.level)) {
ShardSlowLogIndexingService.this.indexLogger.setLevel(level.toUpperCase());
ShardSlowLogIndexingService.this.deleteLogger.setLevel(level.toUpperCase());
ShardSlowLogIndexingService.this.level = level;
}

boolean reformat = settings.getAsBoolean("index.indexing.slowlog.reformat", ShardSlowLogIndexingService.this.reformat);
if (reformat != ShardSlowLogIndexingService.this.reformat) {
ShardSlowLogIndexingService.this.reformat = reformat;
}
}
}

@Inject
public ShardSlowLogIndexingService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService) {
super(shardId, indexSettings);

this.reformat = componentSettings.getAsBoolean("reformat", true);

this.indexWarnThreshold = componentSettings.getAsTime("threshold.index.warn", TimeValue.timeValueNanos(-1)).nanos();
this.indexInfoThreshold = componentSettings.getAsTime("threshold.index.info", TimeValue.timeValueNanos(-1)).nanos();
this.indexDebugThreshold = componentSettings.getAsTime("threshold.index.debug", TimeValue.timeValueNanos(-1)).nanos();
this.indexTraceThreshold = componentSettings.getAsTime("threshold.index.trace", TimeValue.timeValueNanos(-1)).nanos();

this.level = componentSettings.get("level", "TRACE").toUpperCase();

this.indexLogger = Loggers.getLogger(logger, ".index");
this.deleteLogger = Loggers.getLogger(logger, ".delete");

indexLogger.setLevel(level);
deleteLogger.setLevel(level);

indexSettingsService.addListener(new ApplySettings());
}

public void postIndex(Engine.Index index, long tookInNanos) {
postIndexing(index.parsedDoc(), tookInNanos);
}

public void postCreate(Engine.Create create, long tookInNanos) {
postIndexing(create.parsedDoc(), tookInNanos);
}

private void postIndexing(ParsedDocument doc, long tookInNanos) {
if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) {
indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(doc, tookInNanos, reformat));
} else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) {
indexLogger.info("{}", new SlowLogParsedDocumentPrinter(doc, tookInNanos, reformat));
} else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) {
indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(doc, tookInNanos, reformat));
} else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) {
indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(doc, tookInNanos, reformat));
}
}

public static class SlowLogParsedDocumentPrinter {
private final ParsedDocument doc;
private final long tookInNanos;
private final boolean reformat;

public SlowLogParsedDocumentPrinter(ParsedDocument doc, long tookInNanos, boolean reformat) {
this.doc = doc;
this.tookInNanos = tookInNanos;
this.reformat = reformat;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)).append("], took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos)).append("], ");
sb.append("type[").append(doc.type()).append("], ");
sb.append("id[").append(doc.id()).append("], ");
if (doc.routing() == null) {
sb.append("routing[], ");
} else {
sb.append("routing[").append(doc.routing()).append("], ");
}
if (doc.source() != null && doc.source().length() > 0) {
try {
sb.append("source[").append(XContentHelper.convertToJson(doc.source(), reformat)).append("]");
} catch (IOException e) {
sb.append("source[_failed_to_convert_]");
}
} else {
sb.append("source[]");
}
return sb.toString();
}
}
}
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.robin.RobinEngine;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
Expand All @@ -38,7 +39,8 @@
public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {

protected Engine createEngine(Store store, Translog translog) {
return new RobinEngine(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new ShardIndexingService(shardId, EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), EMPTY_SETTINGS);
return new RobinEngine(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index()));
}
}

0 comments on commit f6697d0

Please sign in to comment.