Skip to content

Commit

Permalink
Fetch index settings asynchronously (#135) (#161)
Browse files Browse the repository at this point in the history
In some cases, like using ./gradlew run, I've run into issues where
OpenSearch crashes, complaining that we're making a blocking call on a
listener thread, because we fetch index settings to see if a result
transformer has been configured for the current index.

I'm kind of surprised that we haven't run into this in production
use-cases, but it may just be because assertions are not enabled in
production. Regardless, blocking calls on listener threads are a bad
idea, since that's how you run out of listener threads.

This change makes the index settings call asynchronous and chains the
remaining logic off of that.

Signed-off-by: Michael Froh <froh@amazon.com>
Co-authored-by: Michael Froh <froh@amazon.com>
  • Loading branch information
mingshl and msfroh committed Jul 12, 2023
1 parent 64948c5 commit dc1b183
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
Expand Down Expand Up @@ -89,7 +90,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app

// TODO: Remove originalSearchSource and replace with a deep copy of the SearchRequest object
// once https://github.com/opensearch-project/OpenSearch/issues/869 is implemented
SearchSourceBuilder originalSearchSource = null;
SearchSourceBuilder originalSearchSource;
if (searchRequest.source() != null) {
originalSearchSource = searchRequest.source().shallowCopy();
if (searchRequest.source().fetchSource() != null) {
Expand All @@ -98,6 +99,8 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
originalSearchSource.fetchSource(new FetchSourceContext(fetchSourceContext.fetchSource(),
fetchSourceContext.includes(), fetchSourceContext.excludes()));
}
} else {
originalSearchSource = null;
}

final String[] indices = searchRequest.indices();
Expand All @@ -107,27 +110,29 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
return;
}

List<ResultTransformerConfiguration> resultTransformerConfigurations =
getResultTransformerConfigurations(indices[0], searchRequest);

LinkedHashMap<ResultTransformer, ResultTransformerConfiguration> orderedTransformersAndConfigs = new LinkedHashMap<>();
for (ResultTransformerConfiguration config : resultTransformerConfigurations) {
ResultTransformer resultTransformer = resultTransformerMap.get(config.getTransformerName());
// TODO: Should transformers make a decision based on the original request or the request they receive in the chain
if (resultTransformer.shouldTransform(searchRequest, config)) {
searchRequest = resultTransformer.preprocessRequest(searchRequest, config);
orderedTransformersAndConfigs.put(resultTransformer, config);
ActionListener<List<ResultTransformerConfiguration>> resultTransformerConfigsListener = ActionListener.wrap(rtc -> {
LinkedHashMap<ResultTransformer, ResultTransformerConfiguration> orderedTransformersAndConfigs = new LinkedHashMap<>();
SearchRequest transformedRequest = searchRequest;
for (ResultTransformerConfiguration config : rtc) {
ResultTransformer resultTransformer = resultTransformerMap.get(config.getTransformerName());
// TODO: Should transformers make a decision based on the original request or the request they receive in the chain
if (resultTransformer.shouldTransform(searchRequest, config)) {
transformedRequest = resultTransformer.preprocessRequest(transformedRequest, config);
orderedTransformersAndConfigs.put(resultTransformer, config);
}
}
}

if (!orderedTransformersAndConfigs.isEmpty()) {
final ActionListener<Response> searchResponseListener = createSearchResponseListener(
listener, startTime, orderedTransformersAndConfigs, searchRequest, originalSearchSource);
chain.proceed(task, action, request, searchResponseListener);
return;
}
if (!orderedTransformersAndConfigs.isEmpty()) {
final ActionListener<Response> searchResponseListener = createSearchResponseListener(
listener, startTime, orderedTransformersAndConfigs, transformedRequest, originalSearchSource);
chain.proceed(task, action, request, searchResponseListener);
return;
}
chain.proceed(task, action, request, listener);
}, listener::onFailure);

chain.proceed(task, action, request, listener);
getResultTransformerConfigurations(indices[0], searchRequest, resultTransformerConfigsListener);
}

/**
Expand All @@ -139,16 +144,18 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
* @return ordered and validated list of result transformers, empty list if not specified at
* either request or index level
*/
private List<ResultTransformerConfiguration> getResultTransformerConfigurations(
private void getResultTransformerConfigurations(
final String indexName,
final SearchRequest searchRequest) {
final SearchRequest searchRequest,
ActionListener<List<ResultTransformerConfiguration>> resultTransformerConfigListener) {

List<ResultTransformerConfiguration> configs = new ArrayList<>();

// Request level configuration takes precedence over index level
configs = ConfigurationUtils.getResultTransformersFromRequestConfiguration(searchRequest);
if (!configs.isEmpty()) {
return configs;
resultTransformerConfigListener.onResponse(configs);
return;
}

// Fetch all index settings for this plugin
Expand All @@ -159,10 +166,10 @@ private List<ResultTransformerConfiguration> getResultTransformerConfigurations(
.map(Setting::getKey))
.toArray(String[]::new);

configs = ConfigurationUtils.getResultTransformersFromIndexConfiguration(
openSearchClient.getIndexSettings(indexName, settingNames), resultTransformerMap);

return configs;
ActionListener<Settings> settingsListener = ActionListener.map(resultTransformerConfigListener,
s -> ConfigurationUtils.getResultTransformersFromIndexConfiguration(s, resultTransformerMap));
openSearchClient.getIndexSettings(indexName, settingNames, settingsListener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.opensearch.search.relevance.client;

import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
Expand All @@ -20,13 +21,13 @@ public OpenSearchClient(Client client) {
this.client = client;
}

public Settings getIndexSettings(String indexName, String[] settingNames) {
public void getIndexSettings(String indexName, String[] settingNames, ActionListener<Settings> settingsListener) {
GetSettingsRequest getSettingsRequest = new GetSettingsRequest()
.indices(indexName);
if (settingNames != null && settingNames.length > 0) {
getSettingsRequest.names(settingNames);
}
GetSettingsResponse getSettingsResponse = client.execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
return getSettingsResponse.getIndexToSettings().get(indexName);
ActionListener<GetSettingsResponse> responseListener = ActionListener.map(settingsListener, r -> r.getIndexToSettings().get(indexName));
client.execute(GetSettingsAction.INSTANCE, getSettingsRequest, responseListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.opensearch.search.relevance.configuration;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.search.SearchExtBuilder;
Expand All @@ -33,65 +34,67 @@ public static List<ResultTransformerConfiguration> getResultTransformersFromInde
Map<String, ResultTransformer> resultTransformerMap) {
List<ResultTransformerConfiguration> indexLevelConfigs = new ArrayList<>();

if (settings != null) {
if (settings.getGroups(RESULT_TRANSFORMER_SETTING_PREFIX) != null) {
for (Map.Entry<String, Settings> transformerSettings : settings.getGroups(RESULT_TRANSFORMER_SETTING_PREFIX).entrySet()) {
if (resultTransformerMap.containsKey(transformerSettings.getKey())) {
ResultTransformer transformer = resultTransformerMap.get(transformerSettings.getKey());
indexLevelConfigs.add(transformer.getConfigurationFactory().configure(transformerSettings.getValue()));
}
if (settings != null) {
if (settings.getGroups(RESULT_TRANSFORMER_SETTING_PREFIX) != null) {
for (Map.Entry<String, Settings> transformerSettings : settings.getGroups(RESULT_TRANSFORMER_SETTING_PREFIX).entrySet()) {
if (resultTransformerMap.containsKey(transformerSettings.getKey())) {
ResultTransformer transformer = resultTransformerMap.get(transformerSettings.getKey());
indexLevelConfigs.add(transformer.getConfigurationFactory().configure(transformerSettings.getValue()));
}
}
}
}

return reorderAndValidateConfigs(indexLevelConfigs);
}
return reorderAndValidateConfigs(indexLevelConfigs);
}

/**
* Get result transformer configurations from Search Request
* @param searchRequest input request
* @return ordered and validated list of result transformers, empty list if not specified
*/
public static List<ResultTransformerConfiguration> getResultTransformersFromRequestConfiguration(
final SearchRequest searchRequest) {
/**
* Get result transformer configurations from Search Request
*
* @param searchRequest input request
* @return ordered and validated list of result transformers, empty list if not specified
*/
public static List<ResultTransformerConfiguration> getResultTransformersFromRequestConfiguration(
final SearchRequest searchRequest) {

// Fetch result transformers specified in request
SearchConfigurationExtBuilder requestLevelSearchConfiguration = null;
if (searchRequest.source() != null && searchRequest.source().ext() != null && !searchRequest.source().ext().isEmpty()) {
// Filter ext builders by name
List<SearchExtBuilder> extBuilders = searchRequest.source().ext().stream()
.filter(searchExtBuilder -> SearchConfigurationExtBuilder.NAME.equals(searchExtBuilder.getWriteableName()))
.collect(Collectors.toList());
if (!extBuilders.isEmpty()) {
requestLevelSearchConfiguration = (SearchConfigurationExtBuilder) extBuilders.get(0);
}
}
// Fetch result transformers specified in request
SearchConfigurationExtBuilder requestLevelSearchConfiguration = null;
if (searchRequest.source() != null && searchRequest.source().ext() != null && !searchRequest.source().ext().isEmpty()) {
// Filter ext builders by name
List<SearchExtBuilder> extBuilders = searchRequest.source().ext().stream()
.filter(searchExtBuilder -> SearchConfigurationExtBuilder.NAME.equals(searchExtBuilder.getWriteableName()))
.collect(Collectors.toList());
if (!extBuilders.isEmpty()) {
requestLevelSearchConfiguration = (SearchConfigurationExtBuilder) extBuilders.get(0);
}
}

List<ResultTransformerConfiguration> requestLevelConfigs = new ArrayList<>();
if (requestLevelSearchConfiguration != null) {
requestLevelConfigs = reorderAndValidateConfigs(requestLevelSearchConfiguration.getResultTransformers());
List<ResultTransformerConfiguration> requestLevelConfigs = new ArrayList<>();
if (requestLevelSearchConfiguration != null) {
requestLevelConfigs = reorderAndValidateConfigs(requestLevelSearchConfiguration.getResultTransformers());
}
return requestLevelConfigs;
}
return requestLevelConfigs;
}

/**
* Sort configurations in ascending order of invocation, and validate
* @param configs list of result transformer configurations
* @return ordered and validated list of result transformers
*/
public static List<ResultTransformerConfiguration> reorderAndValidateConfigs(
final List<ResultTransformerConfiguration> configs) throws IllegalArgumentException {
/**
* Sort configurations in ascending order of invocation, and validate
*
* @param configs list of result transformer configurations
* @return ordered and validated list of result transformers
*/
public static List<ResultTransformerConfiguration> reorderAndValidateConfigs(
final List<ResultTransformerConfiguration> configs) throws IllegalArgumentException {

// Sort
configs.sort(Comparator.comparingInt(ResultTransformerConfiguration::getOrder));
// Sort
configs.sort(Comparator.comparingInt(ResultTransformerConfiguration::getOrder));

for (int i = 0; i < configs.size(); ++i) {
if (configs.get(i).getOrder() != (i + 1)) {
throw new IllegalArgumentException("Expected order [" + (i + 1) + "] for transformer [" +
configs.get(i).getTransformerName() + "], but found [" + configs.get(i).getOrder() + "]");
}
}
for (int i = 0; i < configs.size(); ++i) {
if (configs.get(i).getOrder() != (i + 1)) {
throw new IllegalArgumentException("Expected order [" + (i + 1) + "] for transformer [" +
configs.get(i).getTransformerName() + "], but found [" + configs.get(i).getOrder() + "]");
}
}

return configs;
}
return configs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.apache.lucene.search.TotalHits;
import org.mockito.Mockito;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
Expand Down Expand Up @@ -56,7 +55,8 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

public class SearchActionFilterTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -115,7 +115,6 @@ public void testIgnoresSearchRequestOnMultipleIndices() {

private static Client buildMockClient(String indexName, Settings... settings) {
Client client = Mockito.mock(Client.class);
ActionFuture<GetSettingsResponse> mockGetSettingsFuture = Mockito.mock(ActionFuture.class);

Settings.Builder settingsBuilder = Settings.builder();
for (Settings settingsEntry : settings) {
Expand All @@ -124,9 +123,11 @@ private static Client buildMockClient(String indexName, Settings... settings) {
Settings settingsObj = settingsBuilder.build();
Map<String, Settings> indexSettingsMap = Map.of(indexName, settingsObj);
GetSettingsResponse getSettingsResponse = new GetSettingsResponse(indexSettingsMap, Collections.emptyMap());
when(mockGetSettingsFuture.actionGet()).thenReturn(getSettingsResponse);
when(client.execute(eq(GetSettingsAction.INSTANCE), any(GetSettingsRequest.class)))
.thenReturn(mockGetSettingsFuture);
doAnswer(invocation -> {
ActionListener<GetSettingsResponse> responseListener = invocation.getArgument(2);
responseListener.onResponse(getSettingsResponse);
return null;
}).when(client).execute(eq(GetSettingsAction.INSTANCE), any(GetSettingsRequest.class), any(ActionListener.class));
return client;
}

Expand All @@ -145,7 +146,8 @@ public void testOperatesOnSingleIndexWithNoTransformers() {
AtomicBoolean proceedCalled = new AtomicBoolean(false);
ActionFilterChain<SearchRequest, SearchResponse> searchFilterChain =
(task1, action, request, listener) -> proceedCalled.set(true);
searchActionFilter.apply(task, SearchAction.NAME, searchRequest, null, searchFilterChain);
ActionListener<SearchResponse> mockListener = mock(ActionListener.class);
searchActionFilter.apply(task, SearchAction.NAME, searchRequest, mockListener, searchFilterChain);
assertTrue(proceedCalled.get());
}

Expand Down Expand Up @@ -263,7 +265,8 @@ public void testTransformerDoesNotRunWhenNotEnabled() {
AtomicBoolean proceedCalled = new AtomicBoolean(false);
ActionFilterChain<SearchRequest, SearchResponse> searchFilterChain =
(task1, action, request, listener) -> proceedCalled.set(true);
searchActionFilter.apply(task, SearchAction.NAME, searchRequest, null, searchFilterChain);
ActionListener<SearchResponse> mockListener = mock(ActionListener.class);
searchActionFilter.apply(task, SearchAction.NAME, searchRequest, mockListener, searchFilterChain);
assertTrue(proceedCalled.get());
// We should try to check for index-level settings
assertTrue(mockTransformer.getTransformerSettingsWasCalled);
Expand Down
9 changes: 9 additions & 0 deletions src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@

- match:
$body: /^opensearch-search-processor-\d+.\d+.\d+.\d+\n$/

- do:
indices.create:
index: test

- do:
search:
index: test
body: { }

0 comments on commit dc1b183

Please sign in to comment.