Skip to content

Commit

Permalink
Request level coordinator slow logs
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <davizane@amazon.com>
  • Loading branch information
dzane17 committed Nov 15, 2023
1 parent fb6fe1b commit ccc1cd9
Show file tree
Hide file tree
Showing 31 changed files with 990 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Request level coordinator slow logs ([#10650](https://github.com/opensearch-project/OpenSearch/pull/10650))
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352))
Expand Down
10 changes: 10 additions & 0 deletions distribution/docker/src/docker/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ logger.deprecation.appenderRef.deprecation_rolling.ref = deprecation_rolling
logger.deprecation.appenderRef.header_warning.ref = header_warning
logger.deprecation.additivity = false

appender.search_request_slowlog_json_appender.type = Console
appender.search_request_slowlog_json_appender.name = search_request_slowlog_json_appender
appender.search_request_slowlog_json_appender.layout.type = OpenSearchJsonLayout
appender.search_request_slowlog_json_appender.layout.type_name = search_request_slowlog

logger.search_request_slowlog_logger.name = cluster.search.request.slowlog
logger.search_request_slowlog_logger.level = trace
logger.search_request_slowlog_logger.appenderRef.search_request_slowlog_json_appender.ref = search_request_slowlog_json_appender
logger.search_request_slowlog_logger.additivity = false

appender.index_search_slowlog_rolling.type = Console
appender.index_search_slowlog_rolling.name = index_search_slowlog_rolling
appender.index_search_slowlog_rolling.layout.type = OpenSearchJsonLayout
Expand Down
41 changes: 41 additions & 0 deletions distribution/src/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,47 @@ logger.deprecation.appenderRef.deprecation_rolling_old.ref = deprecation_rolling
logger.deprecation.appenderRef.header_warning.ref = header_warning
logger.deprecation.additivity = false

######## Search Request Slowlog JSON ####################
appender.search_request_slowlog_json_appender.type = RollingFile
appender.search_request_slowlog_json_appender.name = search_request_slowlog_json_appender
appender.search_request_slowlog_json_appender.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs\
.cluster_name}_index_search_slowlog.json
appender.search_request_slowlog_json_appender.filePermissions = rw-r-----
appender.search_request_slowlog_json_appender.layout.type = OpenSearchJsonLayout
appender.search_request_slowlog_json_appender.layout.type_name = search_request_slowlog
appender.search_request_slowlog_json_appender.layout.opensearchmessagefields=message,took,took_millis,phase_took,total_hits,search_type,shards,source,id

appender.search_request_slowlog_json_appender.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs\
.cluster_name}_index_search_slowlog-%i.json.gz
appender.search_request_slowlog_json_appender.policies.type = Policies
appender.search_request_slowlog_json_appender.policies.size.type = SizeBasedTriggeringPolicy
appender.search_request_slowlog_json_appender.policies.size.size = 1GB
appender.search_request_slowlog_json_appender.strategy.type = DefaultRolloverStrategy
appender.search_request_slowlog_json_appender.strategy.max = 4
#################################################
######## Search Request Slowlog Log File - old style pattern ####
appender.search_request_slowlog_log_appender.type = RollingFile
appender.search_request_slowlog_log_appender.name = search_request_slowlog_log_appender
appender.search_request_slowlog_log_appender.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}\
_index_search_slowlog.log
appender.search_request_slowlog_log_appender.filePermissions = rw-r-----
appender.search_request_slowlog_log_appender.layout.type = PatternLayout
appender.search_request_slowlog_log_appender.layout.pattern = [%d{ISO8601}][%-5p][%c{1.}] [%node_name]%marker %m%n

appender.search_request_slowlog_log_appender.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}\
_index_search_slowlog-%i.log.gz
appender.search_request_slowlog_log_appender.policies.type = Policies
appender.search_request_slowlog_log_appender.policies.size.type = SizeBasedTriggeringPolicy
appender.search_request_slowlog_log_appender.policies.size.size = 1GB
appender.search_request_slowlog_log_appender.strategy.type = DefaultRolloverStrategy
appender.search_request_slowlog_log_appender.strategy.max = 4
#################################################
logger.search_request_slowlog_logger.name = cluster.search.request.slowlog
logger.search_request_slowlog_logger.level = trace
logger.search_request_slowlog_logger.appenderRef.search_request_slowlog_json_appender.ref = search_request_slowlog_json_appender
logger.search_request_slowlog_logger.appenderRef.search_request_slowlog_log_appender.ref = search_request_slowlog_log_appender
logger.search_request_slowlog_logger.additivity = false

######## Search slowlog JSON ####################
appender.index_search_slowlog_rolling.type = RollingFile
appender.index_search_slowlog_rolling.name = index_search_slowlog_rolling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -115,13 +115,12 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final int maxConcurrentRequestsPerNode;
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;
private final SearchRequestContext searchRequestContext;

private SearchPhase currentPhase;

private final List<Releasable> releasables = new ArrayList<>();

private Optional<SearchRequestOperationsListener> searchRequestOperationsListener;

AbstractSearchAsyncAction(
String name,
Logger logger,
Expand All @@ -140,7 +139,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
SearchRequestContext searchRequestContext
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -176,7 +175,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener);
this.searchRequestContext = searchRequestContext;
}

@Override
Expand Down Expand Up @@ -427,18 +426,26 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
clusterState.version()
);
}
onPhaseEnd();
onPhaseEnd(searchRequestContext);
executePhase(nextPhase);
}
}

private void onPhaseEnd() {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); });
private void onPhaseEnd(SearchRequestContext searchRequestContext) {
if (getCurrentPhase() != null) {
long tookInNanos = System.nanoTime() - getCurrentPhase().getStartTimeInNanos();
searchRequestContext.updatePhaseTookMap(getCurrentPhase().getName(), TimeUnit.NANOSECONDS.toMillis(tookInNanos));
}
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseEnd(this, searchRequestContext);
}

private void onPhaseStart(SearchPhase phase) {
setCurrentPhase(phase);
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); });
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
}

private void onRequestEnd(SearchRequestContext searchRequestContext) {
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(this, searchRequestContext);
}

private void executePhase(SearchPhase phase) {
Expand Down Expand Up @@ -696,15 +703,18 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
searchContextId = null;
}
}
searchRequestContext.setTotalHits(internalSearchResponse.hits().getTotalHits());
searchRequestContext.setShardStats(results.getNumShards(), successfulOps.get(), skippedOps.get(), failures.length);
onPhaseEnd(searchRequestContext);
onRequestEnd(searchRequestContext);
listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
}
onPhaseEnd();
setCurrentPhase(null);
}

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this));
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
SearchRequestContext searchRequestContext
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -112,7 +112,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters,
searchRequestOperationsListener
searchRequestContext
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
SearchRequestContext searchRequestContext
) {
super(
SearchPhaseName.DFS_PRE_QUERY.getName(),
Expand All @@ -97,7 +97,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestOperationsListener
searchRequestContext
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
SearchRequestContext searchRequestContext
) {
super(
SearchPhaseName.QUERY.getName(),
Expand All @@ -102,7 +102,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestOperationsListener
searchRequestContext
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.TotalHits;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* This class holds request-level context for search queries at the coordinator node
*
* @opensearch.internal
*/
public class SearchRequestContext {
private final SearchRequestOperationsListener searchRequestOperationsListener;
private long absoluteStartNanos;
private final Map<String, Long> phaseTookMap;
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;

public SearchRequestContext() {
this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()));
}

public SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
return searchRequestOperationsListener;
}

void updatePhaseTookMap(String phaseName, Long tookTime) {
this.phaseTookMap.put(phaseName, tookTime);
}

Map<String, Long> phaseTookMap() {
return phaseTookMap;
}

/**
* Override absoluteStartNanos set in constructor.
* For testing only
*/
void setAbsoluteStartNanos(long absoluteStartNanos) {
this.absoluteStartNanos = absoluteStartNanos;
}

/**
* Request start time in nanos
*/
long getAbsoluteStartNanos() {
return absoluteStartNanos;
}

void setTotalHits(TotalHits totalHits) {
this.totalHits = totalHits;
}

TotalHits totalHits() {
return totalHits;
}

void setShardStats(int total, int successful, int skipped, int failed) {
this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL, total);
this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL, successful);
this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED, skipped);
this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED, failed);
}

String formattedShardStats() {
if (shardStats.isEmpty()) {
return "";
} else {
return String.format(
"{%s:%s, %s:%s, %s:%s, %s:%s}",
ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL.toString(),
shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL),
ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL.toString(),
shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL),
ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED.toString(),
shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED),
ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED.toString(),
shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED)
);
}
}
}

enum ShardStatsFieldNames {
SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL("total"),
SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL("successful"),
SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED("skipped"),
SEARCH_REQUEST_SLOWLOG_SHARD_FAILED("failed");

private final String name;

ShardStatsFieldNames(String name) {
this.name = name;
}

@Override
public String toString() {
return this.name;
}
}

0 comments on commit ccc1cd9

Please sign in to comment.