Skip to content

Commit

Permalink
Prototype timeout propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
prwhelan committed Mar 29, 2024
1 parent 54ae1e5 commit 241296d
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 5 deletions.
Expand Up @@ -453,7 +453,12 @@ void executeRequest(
}
}
});
Rewriteable.rewriteAndFetch(original, searchService.getRewriteContext(timeProvider::absoluteStartMillis), rewriteListener);

Rewriteable.rewriteAndFetch(
original,
searchService.getRewriteContext(timeProvider::absoluteStartMillis, original.source().timeout()),
rewriteListener
);
}

static void adjustSearchType(SearchRequest searchRequest, boolean singleShard) {
Expand Down
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.IndexAnalyzers;
Expand All @@ -32,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -60,6 +62,43 @@ public class QueryRewriteContext {
protected boolean allowUnmappedFields;
protected boolean mapUnmappedFieldAsString;
protected Predicate<String> allowedFields;
private final TimeValue timeout;

public QueryRewriteContext(
final XContentParserConfiguration parserConfiguration,
final Client client,
final LongSupplier nowInMillis,
final MapperService mapperService,
final MappingLookup mappingLookup,
final Map<String, MappedFieldType> runtimeMappings,
final Predicate<String> allowedFields,
final IndexSettings indexSettings,
final Index fullyQualifiedIndex,
final Predicate<String> indexNameMatcher,
final NamedWriteableRegistry namedWriteableRegistry,
final ValuesSourceRegistry valuesSourceRegistry,
final BooleanSupplier allowExpensiveQueries,
final ScriptCompiler scriptService,
final TimeValue timeout
) {

this.parserConfiguration = parserConfiguration;
this.client = client;
this.nowInMillis = nowInMillis;
this.mapperService = mapperService;
this.mappingLookup = Objects.requireNonNull(mappingLookup);
this.allowUnmappedFields = indexSettings == null || indexSettings.isDefaultAllowUnmappedFields();
this.runtimeMappings = runtimeMappings;
this.allowedFields = allowedFields;
this.indexSettings = indexSettings;
this.fullyQualifiedIndex = fullyQualifiedIndex;
this.indexNameMatcher = indexNameMatcher;
this.writeableRegistry = namedWriteableRegistry;
this.valuesSourceRegistry = valuesSourceRegistry;
this.allowExpensiveQueries = allowExpensiveQueries;
this.scriptService = scriptService;
this.timeout = timeout;
}

public QueryRewriteContext(
final XContentParserConfiguration parserConfiguration,
Expand Down Expand Up @@ -93,6 +132,7 @@ public QueryRewriteContext(
this.valuesSourceRegistry = valuesSourceRegistry;
this.allowExpensiveQueries = allowExpensiveQueries;
this.scriptService = scriptService;
this.timeout = null;
}

public QueryRewriteContext(final XContentParserConfiguration parserConfiguration, final Client client, final LongSupplier nowInMillis) {
Expand All @@ -110,10 +150,31 @@ public QueryRewriteContext(final XContentParserConfiguration parserConfiguration
null,
null,
null,
null,
null
);
}

public QueryRewriteContext(final XContentParserConfiguration parserConfiguration, final Client client, final LongSupplier nowInMillis, final TimeValue timeout) {
this(
parserConfiguration,
client,
nowInMillis,
null,
MappingLookup.EMPTY,
Collections.emptyMap(),
null,
null,
null,
null,
null,
null,
null,
null,
timeout
);
}

/**
* The registry used to build new {@link XContentParser}s. Contains registered named parsers needed to parse the query.
*
Expand Down Expand Up @@ -360,4 +421,8 @@ public Iterable<Map.Entry<String, MappedFieldType>> getAllFields() {
// runtime mappings and non-runtime fields don't overlap, so we can simply concatenate the iterables here
return () -> Iterators.concat(allEntrySet.iterator(), runtimeEntrySet.iterator());
}

public Optional<TimeValue> timeout() {
return Optional.ofNullable(timeout);
}
}
Expand Up @@ -1723,8 +1723,8 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, Set<String
/**
* Returns a new {@link QueryRewriteContext} with the given {@code now} provider
*/
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
return new QueryRewriteContext(parserConfig, client, nowInMillis);
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis, @Nullable TimeValue timeout) {
return new QueryRewriteContext(parserConfig, client, nowInMillis, timeout);
}

public DataRewriteContext getDataRewriteContext(LongSupplier nowInMillis) {
Expand Down
Expand Up @@ -1783,7 +1783,10 @@ private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest re
* Returns a new {@link QueryRewriteContext} with the given {@code now} provider
*/
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
return indicesService.getRewriteContext(nowInMillis);
return indicesService.getRewriteContext(nowInMillis, null);
}
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis, TimeValue timeout) {
return indicesService.getRewriteContext(nowInMillis, timeout);
}

public CoordinatorRewriteContextProvider getCoordinatorRewriteContextProvider(LongSupplier nowInMillis) {
Expand Down
Expand Up @@ -16,11 +16,13 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
Expand All @@ -30,15 +32,18 @@
import org.elasticsearch.xpack.core.ml.inference.results.TextExpansionResults;
import org.elasticsearch.xpack.core.ml.inference.results.WarningInferenceResults;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.TextExpansionConfigUpdate;
import org.elasticsearch.xpack.core.termsenum.action.TermsEnumRequest;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.action.InferModelAction.Request.TIMEOUT;

public class TextExpansionQueryBuilder extends AbstractQueryBuilder<TextExpansionQueryBuilder> {

Expand Down Expand Up @@ -177,7 +182,7 @@ protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) {
List.of(modelText),
TextExpansionConfigUpdate.EMPTY_UPDATE,
false,
InferModelAction.Request.DEFAULT_TIMEOUT_FOR_API
queryRewriteContext.timeout().orElse(InferModelAction.Request.DEFAULT_TIMEOUT_FOR_API)
);
inferRequest.setHighPriority(true);
inferRequest.setPrefixType(TrainedModelPrefixStrings.PrefixType.SEARCH);
Expand Down

0 comments on commit 241296d

Please sign in to comment.