Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request level coordinator slow logs #10650

Merged
merged 1 commit into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Request level coordinator slow logs ([#10650](https://github.com/opensearch-project/OpenSearch/pull/10650))
- [Remote Store] Add repository stats for remote store ([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([#10352](https://github.com/opensearch-project/OpenSearch/pull/10352))
Expand Down
10 changes: 10 additions & 0 deletions distribution/docker/src/docker/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ logger.deprecation.appenderRef.deprecation_rolling.ref = deprecation_rolling
logger.deprecation.appenderRef.header_warning.ref = header_warning
logger.deprecation.additivity = false

# Request-level (coordinator) search slow log for the Docker distribution.
# Events are written to the console as JSON via OpenSearchJsonLayout and are
# tagged with the type name "search_request_slowlog".
appender.search_request_slowlog_json_appender.type = Console
appender.search_request_slowlog_json_appender.name = search_request_slowlog_json_appender
appender.search_request_slowlog_json_appender.layout.type = OpenSearchJsonLayout
appender.search_request_slowlog_json_appender.layout.type_name = search_request_slowlog

# Logger for the "cluster.search.request.slowlog" category. The level is set to
# trace here so this configuration does not filter any slow-log event by level.
# additivity = false stops these events from also being passed to parent loggers.
logger.search_request_slowlog_logger.name = cluster.search.request.slowlog
logger.search_request_slowlog_logger.level = trace
logger.search_request_slowlog_logger.appenderRef.search_request_slowlog_json_appender.ref = search_request_slowlog_json_appender
logger.search_request_slowlog_logger.additivity = false

appender.index_search_slowlog_rolling.type = Console
appender.index_search_slowlog_rolling.name = index_search_slowlog_rolling
appender.index_search_slowlog_rolling.layout.type = OpenSearchJsonLayout
Expand Down
41 changes: 41 additions & 0 deletions distribution/src/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,47 @@ logger.deprecation.appenderRef.deprecation_rolling_old.ref = deprecation_rolling
logger.deprecation.appenderRef.header_warning.ref = header_warning
logger.deprecation.additivity = false

######## Search Request Slowlog JSON ####################
# Rolling JSON file appender for the request-level (coordinator) search slow log.
# NOTE(review): fileName/filePattern below use the "_index_search_slowlog" suffix
# although this appender's type_name is "search_request_slowlog" - confirm that
# reusing the index-level slow log file name is intended.
appender.search_request_slowlog_json_appender.type = RollingFile
appender.search_request_slowlog_json_appender.name = search_request_slowlog_json_appender
appender.search_request_slowlog_json_appender.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs\
.cluster_name}_index_search_slowlog.json
appender.search_request_slowlog_json_appender.filePermissions = rw-r-----
appender.search_request_slowlog_json_appender.layout.type = OpenSearchJsonLayout
appender.search_request_slowlog_json_appender.layout.type_name = search_request_slowlog
# Message fields listed for the JSON layout to emit for each slow-log entry.
appender.search_request_slowlog_json_appender.layout.opensearchmessagefields=message,took,took_millis,phase_took,total_hits,search_type,shards,source,id

# Size-based rollover: a new file is started at 1GB; rolled files are gzip-compressed
# and numbered through %i, with the rollover strategy's max set to 4.
appender.search_request_slowlog_json_appender.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs\
.cluster_name}_index_search_slowlog-%i.json.gz
appender.search_request_slowlog_json_appender.policies.type = Policies
appender.search_request_slowlog_json_appender.policies.size.type = SizeBasedTriggeringPolicy
appender.search_request_slowlog_json_appender.policies.size.size = 1GB
appender.search_request_slowlog_json_appender.strategy.type = DefaultRolloverStrategy
appender.search_request_slowlog_json_appender.strategy.max = 4
#################################################
######## Search Request Slowlog Log File - old style pattern ####
# Plain-text counterpart of the JSON appender above, using a PatternLayout.
# NOTE(review): this also uses the "_index_search_slowlog" file name suffix - confirm.
appender.search_request_slowlog_log_appender.type = RollingFile
appender.search_request_slowlog_log_appender.name = search_request_slowlog_log_appender
appender.search_request_slowlog_log_appender.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}\
_index_search_slowlog.log
appender.search_request_slowlog_log_appender.filePermissions = rw-r-----
appender.search_request_slowlog_log_appender.layout.type = PatternLayout
appender.search_request_slowlog_log_appender.layout.pattern = [%d{ISO8601}][%-5p][%c{1.}] [%node_name]%marker %m%n

# Same size-based rollover settings as the JSON appender (1GB trigger, max = 4).
appender.search_request_slowlog_log_appender.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}\
_index_search_slowlog-%i.log.gz
appender.search_request_slowlog_log_appender.policies.type = Policies
appender.search_request_slowlog_log_appender.policies.size.type = SizeBasedTriggeringPolicy
appender.search_request_slowlog_log_appender.policies.size.size = 1GB
appender.search_request_slowlog_log_appender.strategy.type = DefaultRolloverStrategy
appender.search_request_slowlog_log_appender.strategy.max = 4
#################################################
# Logger for the "cluster.search.request.slowlog" category, routed to both appenders
# defined above. additivity = false stops the events from also reaching parent loggers.
logger.search_request_slowlog_logger.name = cluster.search.request.slowlog
logger.search_request_slowlog_logger.level = trace
logger.search_request_slowlog_logger.appenderRef.search_request_slowlog_json_appender.ref = search_request_slowlog_json_appender
logger.search_request_slowlog_logger.appenderRef.search_request_slowlog_log_appender.ref = search_request_slowlog_log_appender
logger.search_request_slowlog_logger.additivity = false

######## Search slowlog JSON ####################
appender.index_search_slowlog_rolling.type = RollingFile
appender.index_search_slowlog_rolling.name = index_search_slowlog_rolling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -115,13 +115,12 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final int maxConcurrentRequestsPerNode;
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;
private final SearchRequestContext searchRequestContext;

private SearchPhase currentPhase;

private final List<Releasable> releasables = new ArrayList<>();

private Optional<SearchRequestOperationsListener> searchRequestOperationsListener;

AbstractSearchAsyncAction(
String name,
Logger logger,
Expand All @@ -140,7 +139,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
SearchRequestContext searchRequestContext
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -176,7 +175,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener);
this.searchRequestContext = searchRequestContext;
}

@Override
Expand Down Expand Up @@ -427,18 +426,26 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
clusterState.version()
);
}
onPhaseEnd();
onPhaseEnd(searchRequestContext);
executePhase(nextPhase);
}
}

private void onPhaseEnd() {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); });
private void onPhaseEnd(SearchRequestContext searchRequestContext) {
if (getCurrentPhase() != null) {
long tookInNanos = System.nanoTime() - getCurrentPhase().getStartTimeInNanos();
searchRequestContext.updatePhaseTookMap(getCurrentPhase().getName(), TimeUnit.NANOSECONDS.toMillis(tookInNanos));
}
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseEnd(this, searchRequestContext);
}

private void onPhaseStart(SearchPhase phase) {
setCurrentPhase(phase);
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); });
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
}

private void onRequestEnd(SearchRequestContext searchRequestContext) {
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(this, searchRequestContext);
}

private void executePhase(SearchPhase phase) {
Expand Down Expand Up @@ -696,15 +703,18 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
searchContextId = null;
}
}
onPhaseEnd();
searchRequestContext.setTotalHits(internalSearchResponse.hits().getTotalHits());
searchRequestContext.setShardStats(results.getNumShards(), successfulOps.get(), skippedOps.get(), failures.length);
onPhaseEnd(searchRequestContext);
onRequestEnd(searchRequestContext);
listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
}
setCurrentPhase(null);
}

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this));
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
SearchRequestContext searchRequestContext
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -112,7 +112,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters,
searchRequestOperationsListener
searchRequestContext
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
SearchRequestContext searchRequestContext
) {
super(
SearchPhaseName.DFS_PRE_QUERY.getName(),
Expand All @@ -97,7 +97,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestOperationsListener
searchRequestContext
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
SearchRequestContext searchRequestContext
) {
super(
SearchPhaseName.QUERY.getName(),
Expand All @@ -102,7 +102,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestOperationsListener
searchRequestContext
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.TotalHits;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
* This class holds request-level context for search queries at the coordinator node
*
* @opensearch.internal
*/
public class SearchRequestContext {
private final SearchRequestOperationsListener searchRequestOperationsListener;
private long absoluteStartNanos;
private final Map<String, Long> phaseTookMap;
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;

/**
* This constructor is for testing only
*/
public SearchRequestContext() {
this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()));
}
dzane17 marked this conversation as resolved.
Show resolved Hide resolved

public SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
return searchRequestOperationsListener;
}

void updatePhaseTookMap(String phaseName, Long tookTime) {
this.phaseTookMap.put(phaseName, tookTime);
}

Map<String, Long> phaseTookMap() {
return phaseTookMap;
}

/**
* Override absoluteStartNanos set in constructor.
* For testing only
*/
void setAbsoluteStartNanos(long absoluteStartNanos) {
this.absoluteStartNanos = absoluteStartNanos;
}

/**
* Request start time in nanos
*/
long getAbsoluteStartNanos() {
return absoluteStartNanos;
}

void setTotalHits(TotalHits totalHits) {
this.totalHits = totalHits;
}

TotalHits totalHits() {
return totalHits;
}

void setShardStats(int total, int successful, int skipped, int failed) {
this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL, total);
this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL, successful);
this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED, skipped);
this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED, failed);
}

String formattedShardStats() {
if (shardStats.isEmpty()) {
return "";
} else {
return String.format(
dzane17 marked this conversation as resolved.
Show resolved Hide resolved
Locale.ROOT,
"{%s:%s, %s:%s, %s:%s, %s:%s}",
ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL.toString(),
shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL),
ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL.toString(),
shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL),
ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED.toString(),
shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED),
ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED.toString(),
shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED)
);
}
}
}

/**
 * Keys for the shard counters of a search request. Each constant carries a short
 * label that {@link #toString()} returns in place of the constant name.
 */
enum ShardStatsFieldNames {
    SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL("total"),
    SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL("successful"),
    SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED("skipped"),
    SEARCH_REQUEST_SLOWLOG_SHARD_FAILED("failed");

    /** Short label associated with this constant. */
    private final String fieldLabel;

    /**
     * @param fieldLabel label returned by {@link #toString()}
     */
    ShardStatsFieldNames(String fieldLabel) {
        this.fieldLabel = fieldLabel;
    }

    /**
     * @return the short label of this constant (for example {@code "total"})
     */
    @Override
    public String toString() {
        return fieldLabel;
    }
}