Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulk request logging changes #1

Merged
merged 3 commits into from Oct 23, 2014
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Next

bulk request logging changes

    1) Latency, requestSize logging
  • Loading branch information
srimaruti srimaruti
srimaruti authored and srimaruti committed Oct 23, 2014
commit 316c8129a50ef9d97b85e22c00a4c035e56a27c7
@@ -19,9 +19,13 @@

package org.elasticsearch.action.bulk;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
@@ -40,11 +44,17 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.rest.RestStatus;
@@ -53,10 +63,10 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.net.InetSocketAddress;
import java.util.*;

This comment has been minimized.

@vkroz

vkroz Oct 23, 2014

Our coding convention is not to use wildcard ('*') imports, but to import each class explicitly.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
@@ -73,6 +83,43 @@
private final TransportShardBulkAction shardBulkAction;

private final TransportCreateIndexAction createIndexAction;

/**
 * Maps a socket address to the host name reported by
 * {@link InetSocketAddress#getHostName()}. A value is loaded the first time a
 * key is requested and is evicted 60 minutes after it was last accessed.
 */
private final LoadingCache<InetSocketAddress, String> reverseIPLookupCache = CacheBuilder.newBuilder()
        .expireAfterAccess(60, TimeUnit.MINUTES)
        .build(new CacheLoader<InetSocketAddress, String>() {
            @Override
            public String load(InetSocketAddress socketAddress) throws Exception {
                final String hostName = socketAddress.getHostName();
                return hostName;
            }
        });

/**
 * Mutable holder for the figures recorded about one shard-level bulk request.
 * <p>
 * The natural ordering is by {@link #latencyMillis}, largest value first, so
 * sorting a list of instances yields descending latency order.
 */
class BulkStats implements Comparable<BulkStats> {
    /** Shard the figures belong to. */
    public ShardId shardId;
    /** Routing entry for the shard; not assigned by the constructor. */
    public ShardRouting shardRouting;
    /** Assigned from the constructor argument; the value {@link #compareTo} orders by. */
    public Long latencyMillis;
    /** Item count, as passed to the constructor. */
    public int numItems;
    /** Failure counter; starts at zero. */
    public int failedItems;
    /** Byte counter; starts at zero. */
    public long bytes;

    BulkStats(long latencyMillis, int numItems, ShardId shardId) {
        this.shardId = shardId;
        this.numItems = numItems;
        this.latencyMillis = latencyMillis;
    }

    /**
     * Orders instances by descending {@link #latencyMillis}.
     *
     * @param o the instance to compare against
     * @return a negative value when this instance has the larger latency, zero
     *         when both are equal, a positive value otherwise
     */
    @Override
    public int compareTo(BulkStats o) {
        final Long theirs = o.latencyMillis;
        final Long mine = this.latencyMillis;
        return theirs.compareTo(mine);
    }
}

/**
 * Plain holder of four public identifiers: shard id, bulk id, node id and index.
 * NOTE(review): no reference to this class is visible in this section of the
 * file - confirm it is used somewhere before keeping it.
 */
class SlowShard {

This comment has been minimized.

@vkroz

vkroz Oct 23, 2014

Does this class in use anywhere? I can't see where it is used

public int shardId;
public int bulkId;
public String nodeId;
public String index;
}


@Inject
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService,
@@ -234,12 +281,23 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi

// first, go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = Maps.newHashMap();

final Map<ShardId, ShardRouting> shardRoutingInfo = Maps.newHashMap();
final Map<ShardId, Long> perShardReqBytes = Maps.newHashMap();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
ActionRequest request = bulkRequest.requests.get(i);
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
ShardIterator shardIt = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing());
ShardId shardId = shardIt.shardId();
shardRoutingInfo.put(shardId, shardIt.nextOrNull());
BytesReference buffer = ((IndexRequest) request).source();
Long shardBytes = perShardReqBytes.get(shardId);
if (shardBytes == null) {
perShardReqBytes.put(shardId, new Long(buffer.length()));
} else {
perShardReqBytes.put(shardId, shardBytes + new Long(buffer.length()));
}
buffer = null;
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
list = Lists.newArrayList();
@@ -291,18 +349,27 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
}

final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
final Map<ShardId,BulkStats> shardStats = new ConcurrentHashMap<ShardId, BulkStats>();
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId.index().name(), shardId.id(), bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.replicationType(bulkRequest.replicationType());
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
bulkShardRequest.timeout(bulkRequest.timeout());
shardStats.put(shardId, new BulkStats(System.currentTimeMillis(), requests.size(), shardId));
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
final BulkStats bk = shardStats.get(shardId);
bk.latencyMillis = System.currentTimeMillis() - bk.latencyMillis;
bk.shardRouting = shardRoutingInfo.get(shardId);
bk.bytes = perShardReqBytes.get(shardId);
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
if (bulkItemResponse.isFailed()) {
++bk.failedItems;
}
}
if (counter.decrementAndGet() == 0) {
finishHim();
@@ -335,7 +402,46 @@ public void onFailure(Throwable e) {
}

/**
 * Completes the bulk request: writes one JSON log line per shard with the
 * figures collected in shardStats, then passes the BulkResponse to the listener.
 * The visible caller invokes this when the outstanding-shard counter reaches zero.
 */
private void finishHim() {
// NOTE(review): listener.onResponse is invoked here and again on the last statement
// of this method. This view is a diff rendered without +/- markers, so this is
// presumably the pre-change line - confirm only one call remains in the real file.
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), System.currentTimeMillis() - startTime));
long bulkTimeMillis = System.currentTimeMillis() - startTime;
// Written as the "bulkId" field of every per-shard log line below, which ties the lines
// of one bulk request together.
// NOTE(review): System.nanoTime() is not guaranteed to be unique across requests or nodes.
final long bulkId = System.nanoTime();
// NOTE(review): slowestIndex and slowestShardId are referenced only by the commented-out
// log statement near the end of this method; slowTimeMillis is not read anywhere in it.
String slowestIndex = "";
int slowestShardId = 0;
long slowTimeMillis = 0L;
// Logging is best-effort: any exception is caught and logged below, and the response
// is still delivered to the listener afterwards.
try {
DiscoveryNodes dn = clusterService.state().getNodes();
Collection<BulkStats> statsCollection = shardStats.values();
// Copy into a list so it can be sorted by the natural ordering of BulkStats.
ArrayList<BulkStats> statsList = new ArrayList<TransportBulkAction.BulkStats>(statsCollection);
CollectionUtil.introSort(statsList);
// 1-based position of the shard in the sorted list, written as "slowRank".
int slowRank = 0;
// since ES does not handle arrays well, do separate output of each shard
for (BulkStats stats : statsList) {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("bulkId", bulkId);
builder.field("totalShards", statsList.size());
builder.field("totalBulkReqMillis", bulkTimeMillis);
builder.field("bulkItems", bulkRequest.requests.size());
builder.field("shardId", stats.shardId.getId());
builder.field("index", stats.shardId.getIndex());
builder.field("shardMillis", stats.latencyMillis);
builder.field("slowRank", ++slowRank);
builder.field("events", stats.numItems);
builder.field("bytes", stats.bytes);
builder.field("failedItems", stats.failedItems);
// Node id and host name are only written when routing information was recorded.
if (stats.shardRouting != null) {
builder.field("nodeId", stats.shardRouting.currentNodeId());
// NOTE(review): unchecked cast and unchecked result of dn.get(...). A different
// address type or an unknown node id (TODO confirm dn.get can return null) would
// throw, land in the catch block and skip the remaining shard log lines.
final InetSocketTransportAddress inetAddress =
(InetSocketTransportAddress) dn.get(stats.shardRouting.currentNodeId())
.address();
builder.field("hostname", reverseIPLookupCache.get(inetAddress.address()));
}
builder.endObject();
logger.info(builder.string());
}
} catch (Exception e) {
logger.error("Error generating json shard stats", e);
}
//logger.info("Slowest shard for request id " + bulkId + ",index=" + slowestIndex + ",shardId=" + slowestShardId);
// Deliver the aggregated response with the total time measured at the top of this method.
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), bulkTimeMillis));

This comment has been minimized.

@duttly

duttly Oct 23, 2014

Minor, but you may want to log the BulkResponse right at the start. Makes for easier reading.

This comment has been minimized.

@srimaruti

srimaruti Oct 23, 2014
Author

Here we are logging response latency per shard. So we need to wait for all the responses to comeback for all the sub requests associated with the bulk request

}
});
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.