Skip to content

Commit

Permalink
[Monitoring] Fetch shard data more efficiently (elastic#54028)
Browse files Browse the repository at this point in the history
* For the nodes listing page, do not fetch shard data for indices

* Optimize our shard queries for the index and node listing pages

* This change isn't necessary

* Rename file and function

* Use optimized query for ml jobs and es overview

* Apply to node/index detail page, and more renaming

* Unnecessary change

* Fix tests

* Add basic tests

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
2 people authored and thomasneirynck committed Jan 12, 2020
1 parent b4bb614 commit b5389fb
Show file tree
Hide file tree
Showing 20 changed files with 1,922 additions and 433 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe('getPaginatedNodes', () => {
},
},
};
const shardStats = {
const nodesShardCount = {
nodes: {
1: {
shardCount: 10,
Expand All @@ -78,7 +78,7 @@ describe('getPaginatedNodes', () => {
pagination,
sort,
queryText,
{ clusterStats, shardStats }
{ clusterStats, nodesShardCount }
);
expect(nodes).toEqual({
pageOfNodes: [
Expand All @@ -98,7 +98,7 @@ describe('getPaginatedNodes', () => {
pagination,
{ ...sort, field: 'foo', direction: 'desc' },
queryText,
{ clusterStats, shardStats }
{ clusterStats, nodesShardCount }
);
expect(nodes).toEqual({
pageOfNodes: [
Expand All @@ -118,7 +118,7 @@ describe('getPaginatedNodes', () => {
pagination,
sort,
'tw',
{ clusterStats, shardStats }
{ clusterStats, nodesShardCount }
);
expect(nodes).toEqual({
pageOfNodes: [{ name: 'two', uuid: 2, isOnline: false, shardCount: 5, foo: 12 }],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { LISTING_METRICS_NAMES, LISTING_METRICS_PATHS } from './nodes_listing_me
* @param {Object} shardStats: per-node information about shards
* @return {Array} node info combined with metrics for each node from handle_response
*/
export async function getNodes(req, esIndexPattern, pageOfNodes, clusterStats, shardStats) {
export async function getNodes(req, esIndexPattern, pageOfNodes, clusterStats, nodesShardCount) {
checkParam(esIndexPattern, 'esIndexPattern in getNodes');

const start = moment.utc(req.payload.timeRange.min).valueOf();
Expand Down Expand Up @@ -104,5 +104,9 @@ export async function getNodes(req, esIndexPattern, pageOfNodes, clusterStats, s
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const response = await callWithRequest(req, 'search', params);

return handleResponse(response, clusterStats, shardStats, pageOfNodes, { min, max, bucketSize });
return handleResponse(response, clusterStats, nodesShardCount, pageOfNodes, {
min,
max,
bucketSize,
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export async function getPaginatedNodes(
pagination,
sort,
queryText,
{ clusterStats, shardStats }
{ clusterStats, nodesShardCount }
) {
const config = req.server.config();
const size = config.get('xpack.monitoring.max_bucket_size');
Expand All @@ -45,7 +45,7 @@ export async function getPaginatedNodes(
const clusterState = get(clusterStats, 'cluster_state', { nodes: {} });
for (const node of nodes) {
node.isOnline = !isUndefined(get(clusterState, ['nodes', node.uuid]));
node.shardCount = get(shardStats, `nodes[${node.uuid}].shardCount`, 0);
node.shardCount = get(nodesShardCount, `nodes[${node.uuid}].shardCount`, 0);
}

// `metricSet` defines a list of metrics that are sortable in the UI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@ import { uncovertMetricNames } from '../../convert_metric_names';
* Process the response from the get_nodes query
* @param {Object} response: response data from get_nodes
* @param {Object} clusterStats: cluster stats from cluster state document
* @param {Object} shardStats: per-node information about shards
* @param {Object} nodesShardCount: per-node information about shards
* @param {Object} timeOptions: min, max, and bucketSize needed for date histogram creation
* @return {Array} node info combined with metrics for each node
*/
export function handleResponse(response, clusterStats, shardStats, pageOfNodes, timeOptions = {}) {
export function handleResponse(
response,
clusterStats,
nodesShardCount,
pageOfNodes,
timeOptions = {}
) {
if (!get(response, 'hits.hits')) {
return [];
}

const nodeHits = get(response, 'hits.hits', []);
const nodesInfo = mapNodesInfo(nodeHits, clusterStats, shardStats);
const nodesInfo = mapNodesInfo(nodeHits, clusterStats, nodesShardCount);

/*
* Every node bucket is an object with a field for nodeId and fields for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import { calculateNodeType, getNodeTypeClassLabel } from '../';
/**
* @param {Array} nodeHits: info about each node from the hits in the get_nodes query
* @param {Object} clusterStats: cluster stats from cluster state document
* @param {Object} shardStats: per-node information about shards
* @param {Object} nodesShardCount: per-node information about shards
* @return {Object} summarized info about each node keyed by nodeId
*/
export function mapNodesInfo(nodeHits, clusterStats, shardStats) {
export function mapNodesInfo(nodeHits, clusterStats, nodesShardCount) {
const clusterState = get(clusterStats, 'cluster_state', { nodes: {} });

return nodeHits.reduce((prev, node) => {
Expand All @@ -35,7 +35,7 @@ export function mapNodesInfo(nodeHits, clusterStats, shardStats) {
isOnline,
nodeTypeLabel: nodeTypeLabel,
nodeTypeClass: nodeTypeClass,
shardCount: get(shardStats, `nodes[${sourceNode.uuid}].shardCount`, 0),
shardCount: get(nodesShardCount, `nodes[${sourceNode.uuid}].shardCount`, 0),
},
};
}, {});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { get } from 'lodash';
import { checkParam } from '../../error_missing_required';
import { createQuery } from '../../create_query';
import { ElasticsearchMetric } from '../../metrics';
import { calculateIndicesTotals } from './calculate_shard_stat_indices_totals';

async function getUnassignedShardData(req, esIndexPattern, cluster) {
const config = req.server.config();
const maxBucketSize = config.get('xpack.monitoring.max_bucket_size');
const metric = ElasticsearchMetric.getMetricFields();

const params = {
index: esIndexPattern,
size: 0,
ignoreUnavailable: true,
body: {
sort: { timestamp: { order: 'desc' } },
query: createQuery({
type: 'shards',
clusterUuid: cluster.cluster_uuid,
metric,
filters: [{ term: { state_uuid: get(cluster, 'cluster_state.state_uuid') } }],
}),
aggs: {
indices: {
terms: {
field: 'shard.index',
size: maxBucketSize,
},
aggs: {
state: {
filter: {
terms: {
'shard.state': ['UNASSIGNED', 'INITIALIZING'],
},
},
aggs: {
primary: {
terms: {
field: 'shard.primary',
size: 2,
},
},
},
},
},
},
},
},
};

const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
return await callWithRequest(req, 'search', params);
}

export async function getIndicesUnassignedShardStats(req, esIndexPattern, cluster) {
checkParam(esIndexPattern, 'esIndexPattern in elasticsearch/getShardStats');

const response = await getUnassignedShardData(req, esIndexPattern, cluster);
const indices = get(response, 'aggregations.indices.buckets', []).reduce((accum, bucket) => {
const index = bucket.key;
const states = get(bucket, 'state.primary.buckets', []);
const unassignedReplica = states
.filter(state => state.key_as_string === 'false')
.reduce((total, state) => total + state.doc_count, 0);
const unassignedPrimary = states
.filter(state => state.key_as_string === 'true')
.reduce((total, state) => total + state.doc_count, 0);

let status = 'green';
if (unassignedReplica > 0) {
status = 'yellow';
}
if (unassignedPrimary > 0) {
status = 'red';
}

accum[index] = {
unassigned: { primary: unassignedPrimary, replica: unassignedReplica },
status,
};
return accum;
}, {});

const indicesTotals = calculateIndicesTotals(indices);
return { indices, indicesTotals };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { getIndicesUnassignedShardStats } from './get_indices_unassigned_shard_stats';

describe('getIndicesUnassignedShardStats', () => {
it('should return the unassigned shard stats for indices', async () => {
const indices = {
12345: { status: 'red', unassigned: { primary: 1, replica: 0 } },
6789: { status: 'yellow', unassigned: { primary: 0, replica: 1 } },
absdf82: { status: 'green', unassigned: { primary: 0, replica: 0 } },
};

const req = {
server: {
config: () => ({
get: () => {},
}),
plugins: {
elasticsearch: {
getCluster: () => ({
callWithRequest: () => ({
aggregations: {
indices: {
buckets: Object.keys(indices).map(id => ({
key: id,
state: {
primary: {
buckets:
indices[id].unassigned.primary || indices[id].unassigned.replica
? [
{
key_as_string: indices[id].unassigned.primary
? 'true'
: 'false',
doc_count: 1,
},
]
: [],
},
},
})),
},
},
}),
}),
},
},
},
};
const esIndexPattern = '*';
const cluster = {};
const stats = await getIndicesUnassignedShardStats(req, esIndexPattern, cluster);
expect(stats.indices).toEqual(indices);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { get } from 'lodash';
import { checkParam } from '../../error_missing_required';
import { createQuery } from '../../create_query';
import { ElasticsearchMetric } from '../../metrics';

async function getShardCountPerNode(req, esIndexPattern, cluster) {
const config = req.server.config();
const maxBucketSize = config.get('xpack.monitoring.max_bucket_size');
const metric = ElasticsearchMetric.getMetricFields();

const params = {
index: esIndexPattern,
size: 0,
ignoreUnavailable: true,
body: {
sort: { timestamp: { order: 'desc' } },
query: createQuery({
type: 'shards',
clusterUuid: cluster.cluster_uuid,
metric,
filters: [{ term: { state_uuid: get(cluster, 'cluster_state.state_uuid') } }],
}),
aggs: {
nodes: {
terms: {
field: 'shard.node',
size: maxBucketSize,
},
},
},
},
};

const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
return await callWithRequest(req, 'search', params);
}

export async function getNodesShardCount(req, esIndexPattern, cluster) {
checkParam(esIndexPattern, 'esIndexPattern in elasticsearch/getShardStats');

const response = await getShardCountPerNode(req, esIndexPattern, cluster);
const nodes = get(response, 'aggregations.nodes.buckets', []).reduce((accum, bucket) => {
accum[bucket.key] = { shardCount: bucket.doc_count };
return accum;
}, {});
return { nodes };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { getNodesShardCount } from './get_nodes_shard_count';

describe('getNodeShardCount', () => {
it('should return the shard count per node', async () => {
const nodes = {
12345: { shardCount: 10 },
6789: { shardCount: 1 },
absdf82: { shardCount: 20 },
};

const req = {
server: {
config: () => ({
get: () => {},
}),
plugins: {
elasticsearch: {
getCluster: () => ({
callWithRequest: () => ({
aggregations: {
nodes: {
buckets: Object.keys(nodes).map(id => ({
key: id,
doc_count: nodes[id].shardCount,
})),
},
},
}),
}),
},
},
},
};
const esIndexPattern = '*';
const cluster = {};
const counts = await getNodesShardCount(req, esIndexPattern, cluster);
expect(counts.nodes).toEqual(nodes);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* @param {Object} config - Kibana config service
* @param {Boolean} includeNodes - whether to add the aggs for node shards
*/
export function getShardAggs(config, includeNodes) {
export function getShardAggs(config, includeNodes, includeIndices) {
const maxBucketSize = config.get('xpack.monitoring.max_bucket_size');
const aggSize = 10;
const indicesAgg = {
Expand Down Expand Up @@ -40,7 +40,7 @@ export function getShardAggs(config, includeNodes) {
};

return {
...{ indices: indicesAgg },
...{ indices: includeIndices ? indicesAgg : undefined },
...{ nodes: includeNodes ? nodesAgg : undefined },
};
}
Loading

0 comments on commit b5389fb

Please sign in to comment.