diff --git a/pom.xml b/pom.xml index 2eaf9ca473..bb55d53c32 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ 2.6 - 6.7.2 + 7.1.0 2.9.1 2.2.0.BUILD-SNAPSHOT spring.data.elasticsearch diff --git a/src/main/java/org/springframework/data/elasticsearch/client/NodeClientFactoryBean.java b/src/main/java/org/springframework/data/elasticsearch/client/NodeClientFactoryBean.java index 05ee9042c5..11aa10153b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/NodeClientFactoryBean.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/NodeClientFactoryBean.java @@ -58,7 +58,7 @@ public static class TestNode extends Node { public TestNode(Settings preparedSettings, Collection> classpathPlugins) { - super(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), classpathPlugins, false); + super(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null, null, null), classpathPlugins, false); } protected void registerDerivedNodeNameWithLogger(String nodeName) { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DefaultResultMapper.java b/src/main/java/org/springframework/data/elasticsearch/core/DefaultResultMapper.java index ba42b3faba..d422c23754 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DefaultResultMapper.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DefaultResultMapper.java @@ -99,7 +99,7 @@ private static EntityMapper initEntityMapper( @Override public AggregatedPage mapResults(SearchResponse response, Class clazz, Pageable pageable) { - long totalHits = response.getHits().getTotalHits(); + long totalHits = response.getHits().getTotalHits().value; float maxScore = response.getHits().getMaxScore(); List results = new ArrayList<>(); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 591bf111ce..c0a923c5fd 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -29,18 +29,18 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Optional; -import java.util.Set; -import org.apache.http.util.EntityUtils; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -59,18 +59,14 @@ import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.*; +import org.elasticsearch.client.indices.GetMappingsRequest; +import org.elasticsearch.client.indices.GetMappingsResponse; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.DeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -102,7 +98,6 @@ import org.springframework.data.elasticsearch.annotations.Mapping; import org.springframework.data.elasticsearch.annotations.Setting; import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage; -import org.springframework.data.elasticsearch.core.client.support.AliasData; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.facet.FacetRequest; @@ -110,14 +105,11 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; import org.springframework.data.elasticsearch.core.query.*; +import org.springframework.data.elasticsearch.utils.ElasticsearchCloseableIterator; import org.springframework.data.util.CloseableIterator; import org.springframework.util.Assert; import org.springframework.util.StringUtils; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; - /** * ElasticsearchRestTemplate * @@ -142,6 +134,7 @@ * @author Christoph Strobl * @author Lorenzo Spinelli * @author Dmitriy Yakovlev + * @author Simon Schneider */ public class ElasticsearchRestTemplate implements ElasticsearchOperations, EsClient, ApplicationContextAware { @@ -204,7 +197,7 @@ public boolean createIndex(Class clazz) { public boolean createIndex(String indexName) { Assert.notNull(indexName, "No index defined for Query"); try { - return client.indices().create(Requests.createIndexRequest(indexName)).isAcknowledged(); + return client.indices().create(Requests.createIndexRequest(indexName), RequestOptions.DEFAULT).isAcknowledged(); } catch (Exception e) { throw new ElasticsearchException("Failed to create index " + indexName, e); } @@ -250,7 +243,7 @@ public boolean putMapping(String indexName, String type, Object mapping) { request.source((XContentBuilder) mapping); } try { - return client.indices().putMapping(request).isAcknowledged(); + return client.indices().putMapping(request, RequestOptions.DEFAULT).isAcknowledged(); } catch (IOException e) { throw new ElasticsearchException("Failed to put mapping for " + indexName, e); } @@ -259,15 +252,14 @@ public boolean putMapping(String indexName, String type, Object mapping) { @Override public Map getMapping(String indexName, String type) { Assert.notNull(indexName, "No index defined for getMapping()"); - Assert.notNull(type, "No type defined for getMapping()"); Map mappings = null; - RestClient restClient = client.getLowLevelClient(); try { - Response response = restClient.performRequest("GET", "/" + indexName + "/_mapping/" + type); - mappings = convertMappingResponse(EntityUtils.toString(response.getEntity()), type); + GetMappingsResponse response = client.indices().getMapping(new GetMappingsRequest().indices(indexName), RequestOptions.DEFAULT); + mappings = new HashMap(); + mappings.put(type, response.mappings().get(indexName)); } catch (Exception e) { throw new ElasticsearchException( - "Error while getting mapping for indexName : " + indexName + " type : " + type + " ", e); + "Error while getting mapping for indexName : " + indexName + " ", e); } return mappings; } @@ -277,23 +269,6 @@ public Map getMapping(Class clazz) { return getMapping(getPersistentEntityFor(clazz).getIndexName(), getPersistentEntityFor(clazz).getIndexType()); } - private Map convertMappingResponse(String mappingResponse, String type) { - ObjectMapper mapper = new ObjectMapper(); - - try { - Map result = null; - JsonNode node = mapper.readTree(mappingResponse); - - node = node.findValue("mappings").findValue(type); - result = mapper.readValue(mapper.writeValueAsString(node), HashMap.class); - - return result; - } catch (IOException e) { - throw new ElasticsearchException("Could not map alias response : " + mappingResponse, e); - } - - } - @Override public ElasticsearchConverter getElasticsearchConverter() { return elasticsearchConverter; @@ -311,7 +286,7 @@ public T queryForObject(GetQuery query, Class clazz, GetResultMapper mapp query.getId()); GetResponse response; try { - response = client.get(request); + response = client.get(request, RequestOptions.DEFAULT); T entity = mapper.mapResult(response, clazz); return entity; } catch (IOException e) { @@ -375,7 +350,7 @@ private List> doMultiSearch(List queries, List> cl private MultiSearchResponse.Item[] getMultiSearchResult(MultiSearchRequest request) { MultiSearchResponse response; try { - response = client.multiSearch(request); + response = client.multiSearch(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error for search request: " + request.toString(), e); } @@ -438,7 +413,7 @@ public List queryForIds(SearchQuery query) { } SearchResponse response; try { - response = client.search(request); + response = client.search(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error for search request: " + request.toString(), e); } @@ -470,7 +445,7 @@ public Page queryForPage(CriteriaQuery criteriaQuery, Class clazz) { SearchResponse response; try { - response = client.search(request); + response = client.search(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error for search request: " + request.toString(), e); } @@ -488,7 +463,7 @@ public Page queryForPage(StringQuery query, Class clazz, SearchResultM request.source().query((wrapperQuery(query.getSource()))); SearchResponse response; try { - response = client.search(request); + response = client.search(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error for search request: " + request.toString(), e); } @@ -514,61 +489,7 @@ public CloseableIterator stream(SearchQuery query, final Class clazz, private CloseableIterator doStream(final long scrollTimeInMillis, final ScrolledPage page, final Class clazz, final SearchResultMapper mapper) { - return new CloseableIterator() { - - /** As we couldn't retrieve single result with scroll, store current hits. */ - private volatile Iterator currentHits = page.iterator(); - - /** The scroll id. */ - private volatile String scrollId = page.getScrollId(); - - /** If stream is finished (ie: cluster returns no results. */ - private volatile boolean finished = !currentHits.hasNext(); - - @Override - public void close() { - try { - // Clear scroll on cluster only in case of error (cause elasticsearch auto clear scroll when it's done) - if (!finished && scrollId != null && currentHits != null && currentHits.hasNext()) { - clearScroll(scrollId); - } - } finally { - currentHits = null; - scrollId = null; - } - } - - @Override - public boolean hasNext() { - // Test if stream is finished - if (finished) { - return false; - } - // Test if it remains hits - if (currentHits == null || !currentHits.hasNext()) { - // Do a new request - final ScrolledPage scroll = continueScroll(scrollId, scrollTimeInMillis, clazz, mapper); - // Save hits and scroll id - currentHits = scroll.iterator(); - finished = !currentHits.hasNext(); - scrollId = scroll.getScrollId(); - } - return currentHits.hasNext(); - } - - @Override - public T next() { - if (hasNext()) { - return currentHits.next(); - } - throw new NoSuchElementException(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove"); - } - }; + return new ElasticsearchCloseableIterator(this, page, scrollTimeInMillis, clazz, mapper); } @Override @@ -616,7 +537,7 @@ private long doCount(SearchRequest countRequest, QueryBuilder elasticsearchQuery countRequest.source(sourceBuilder); try { - return client.search(countRequest).getHits().getTotalHits(); + return client.search(countRequest, RequestOptions.DEFAULT).getHits().getTotalHits().value; } catch (IOException e) { throw new ElasticsearchException("Error while searching for request: " + countRequest.toString(), e); } @@ -633,11 +554,11 @@ private long doCount(SearchRequest searchRequest, QueryBuilder elasticsearchQuer } SearchResponse response; try { - response = client.search(searchRequest); + response = client.search(searchRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error for search request: " + searchRequest.toString(), e); } - return response.getHits().getTotalHits(); + return response.getHits().getTotalHits().value; } private SearchRequest prepareCount(Query query, Class clazz) { @@ -690,7 +611,7 @@ private MultiGetResponse getMultiResponse(Query searchQuery, Class clazz) request.add(item); } try { - return client.multiGet(request); + return client.multiGet(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error while multiget for request: " + request.toString(), e); } @@ -706,7 +627,7 @@ public String index(IndexQuery query) { String documentId; IndexRequest request = prepareIndex(query); try { - documentId = client.index(request).getId(); + documentId = client.index(request, RequestOptions.DEFAULT).getId(); } catch (IOException e) { throw new ElasticsearchException("Error while index for request: " + request.toString(), e); } @@ -721,7 +642,7 @@ public String index(IndexQuery query) { public UpdateResponse update(UpdateQuery query) { UpdateRequest request = prepareUpdate(query); try { - return client.update(request); + return client.update(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error while update for request: " + request.toString(), e); } @@ -760,7 +681,7 @@ public void bulkIndex(List queries) { bulkRequest.add(prepareIndex(query)); } try { - checkForBulkUpdateFailure(client.bulk(bulkRequest)); + checkForBulkUpdateFailure(client.bulk(bulkRequest, RequestOptions.DEFAULT)); } catch (IOException e) { throw new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e); } @@ -773,7 +694,7 @@ public void bulkUpdate(List queries) { bulkRequest.add(prepareUpdate(query)); } try { - checkForBulkUpdateFailure(client.bulk(bulkRequest)); + checkForBulkUpdateFailure(client.bulk(bulkRequest, RequestOptions.DEFAULT)); } catch (IOException e) { throw new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e); } @@ -803,20 +724,19 @@ public boolean indexExists(String indexName) { GetIndexRequest request = new GetIndexRequest(); request.indices(indexName); try { - return client.indices().exists(request); + return client.indices().exists(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error while for indexExists request: " + request.toString(), e); } } @Override - public boolean typeExists(String index, String type) { - RestClient restClient = client.getLowLevelClient(); + public boolean typeExists(String index, String _) { try { - Response response = restClient.performRequest("HEAD", index + "/_mapping/" + type); - return (response.getStatusLine().getStatusCode() == 200); + GetMappingsResponse response = client.indices().getMapping(new GetMappingsRequest().indices(index), RequestOptions.DEFAULT); + return response.mappings().containsKey(index); } catch (Exception e) { - throw new ElasticsearchException("Error while checking type exists for index: " + index + " type : " + type + " ", + throw new ElasticsearchException("Error while checking type exists for index: " + index + " ", e); } } @@ -832,7 +752,7 @@ public boolean deleteIndex(String indexName) { if (indexExists(indexName)) { DeleteIndexRequest request = new DeleteIndexRequest(indexName); try { - return client.indices().delete(request).isAcknowledged(); + return client.indices().delete(request, RequestOptions.DEFAULT).isAcknowledged(); } catch (IOException e) { throw new ElasticsearchException("Error while deleting index request: " + request.toString(), e); } @@ -844,7 +764,7 @@ public boolean deleteIndex(String indexName) { public String delete(String indexName, String type, String id) { DeleteRequest request = new DeleteRequest(indexName, type, id); try { - return client.delete(request).getId(); + return client.delete(request, RequestOptions.DEFAULT).getId(); } catch (IOException e) { throw new ElasticsearchException("Error while deleting item request: " + request.toString(), e); } @@ -952,7 +872,7 @@ private SearchResponse doScroll(SearchRequest request, CriteriaQuery criteriaQue request.source().version(true); try { - return client.search(request); + return client.search(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error for search request with scroll: " + request.toString(), e); } @@ -981,7 +901,7 @@ private SearchResponse doScroll(SearchRequest request, SearchQuery searchQuery) } try { - return client.search(request); + return client.search(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error for search request with scroll: " + request.toString(), e); } @@ -1010,24 +930,19 @@ public ScrolledPage startScroll(long scrollTimeInMillis, CriteriaQuery cr } public ScrolledPage continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class clazz) { - SearchScrollRequest request = new SearchScrollRequest(scrollId); - request.scroll(TimeValue.timeValueMillis(scrollTimeInMillis)); - SearchResponse response; - try { - response = client.searchScroll(request); - } catch (IOException e) { - throw new ElasticsearchException("Error for search request with scroll: " + request.toString(), e); - } - return resultsMapper.mapResults(response, clazz, Pageable.unpaged()); + return continueScroll(scrollId, scrollTimeInMillis, clazz, null); } public ScrolledPage continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class clazz, SearchResultMapper mapper) { + if (mapper == null) { + mapper = resultsMapper; + } SearchScrollRequest request = new SearchScrollRequest(scrollId); request.scroll(TimeValue.timeValueMillis(scrollTimeInMillis)); SearchResponse response; try { - response = client.searchScroll(request); + response = client.searchScroll(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error for search request with scroll: " + request.toString(), e); } @@ -1040,7 +955,7 @@ public void clearScroll(String scrollId) { request.addScrollId(scrollId); try { // TODO: Something useful with the response. - ClearScrollResponse response = client.clearScroll(request); + ClearScrollResponse response = client.clearScroll(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error for search request with scroll: " + request.toString(), e); } @@ -1092,7 +1007,7 @@ private SearchResponse doSearch(SearchRequest searchRequest, SearchQuery searchQ prepareSearch(searchRequest, searchQuery); try { - return client.search(searchRequest); + return client.search(searchRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Error for search request with scroll: " + searchRequest.toString(), e); } @@ -1184,7 +1099,7 @@ public boolean createIndex(String indexName, Object settings) { request.settings((XContentBuilder) settings); } try { - return client.indices().create(request).isAcknowledged(); + return client.indices().create(request, RequestOptions.DEFAULT).isAcknowledged(); } catch (IOException e) { throw new ElasticsearchException("Error for creating index: " + request.toString(), e); } @@ -1214,39 +1129,23 @@ public Map getSetting(Class clazz) { @Override // TODO change interface to return Settings. public Map getSetting(String indexName) { Assert.notNull(indexName, "No index defined for getSettings"); - ObjectMapper objMapper = new ObjectMapper(); Map settings = null; - RestClient restClient = client.getLowLevelClient(); try { - Response response = restClient.performRequest("GET", "/" + indexName + "/_settings"); - settings = convertSettingResponse(EntityUtils.toString(response.getEntity()), indexName); - + GetSettingsResponse response = client.indices().getSettings(new GetSettingsRequest().indices(indexName), RequestOptions.DEFAULT); + settings = new HashMap(); + Settings indexSettings = response.getIndexToSettings().get(indexName); + if(indexSettings == null){ + return settings; + } + for (String settingName : indexSettings.keySet()){ + settings.put(settingName, indexSettings.get(settingName)); + } } catch (Exception e) { throw new ElasticsearchException("Error while getting settings for indexName : " + indexName, e); } return settings; } - private Map convertSettingResponse(String settingResponse, String indexName) { - ObjectMapper mapper = new ObjectMapper(); - - try { - Settings settings = Settings.fromXContent(XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, settingResponse)); - String prefix = indexName + ".settings."; - // Backwards compatibility. TODO Change to return Settings object. - Map result = new HashMap(); - Set keySet = settings.keySet(); - for (String key : keySet) { - result.put(key.substring(prefix.length()), settings.get(key)); - } - return result; - } catch (IOException e) { - throw new ElasticsearchException("Could not map alias response : " + settingResponse, e); - } - - } - private SearchRequest prepareSearch(Query query, Class clazz) { setPersistentEntityIndexAndType(query, clazz); return prepareSearch(query, Optional.empty()); @@ -1349,9 +1248,10 @@ private IndexRequest prepareIndex(IndexQuery query) { indexRequest.versionType(versionType); } - if (query.getParentId() != null) { + // TODO find alternative + /*if (query.getParentId() != null) { indexRequest.parent(query.getParentId()); - } + }*/ return indexRequest; } catch (IOException e) { @@ -1364,7 +1264,7 @@ public void refresh(String indexName) { Assert.notNull(indexName, "No index defined for refresh()"); try { // TODO: Do something with the response. - client.indices().refresh(refreshRequest(indexName)); + client.indices().refresh(refreshRequest(indexName), RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("failed to refresh index: " + indexName, e); } @@ -1397,7 +1297,7 @@ public Boolean addAlias(AliasQuery query) { IndicesAliasesRequest request = new IndicesAliasesRequest(); request.addAliasAction(aliasAction); try { - return client.indices().updateAliases(request).isAcknowledged(); + return client.indices().updateAliases(request, RequestOptions.DEFAULT).isAcknowledged(); } catch (IOException e) { throw new ElasticsearchException("failed to update aliases with request: " + request, e); } @@ -1411,7 +1311,7 @@ public Boolean removeAlias(AliasQuery query) { AliasActions aliasAction = new AliasActions(AliasActions.Type.REMOVE); request.addAliasAction(aliasAction); try { - return client.indices().updateAliases(request).isAcknowledged(); + return client.indices().updateAliases(request, RequestOptions.DEFAULT).isAcknowledged(); } catch (IOException e) { throw new ElasticsearchException("failed to update aliases with request: " + request, e); } @@ -1419,55 +1319,15 @@ public Boolean removeAlias(AliasQuery query) { @Override public List queryForAlias(String indexName) { - List aliases = null; - RestClient restClient = client.getLowLevelClient(); - Response response; - String aliasResponse; - + GetAliasesResponse response; try { - response = restClient.performRequest("GET", "/" + indexName + "/_alias/*"); - aliasResponse = EntityUtils.toString(response.getEntity()); + response = client.indices().getAlias(new GetAliasesRequest().indices(indexName), RequestOptions.DEFAULT); } catch (Exception e) { throw new ElasticsearchException("Error while getting mapping for indexName : " + indexName, e); } - - return convertAliasResponse(aliasResponse); + return new ArrayList<>(response.getAliases().get(indexName)); } - /** - * It takes two steps to create a List from the elasticsearch http response because the aliases field - * is actually a Map by alias name, but the alias name is on the AliasMetadata. - * - * @param aliasResponse - * @return - */ - List convertAliasResponse(String aliasResponse) { - ObjectMapper mapper = new ObjectMapper(); - - try { - JsonNode node = mapper.readTree(aliasResponse); - - Iterator names = node.fieldNames(); - String name = names.next(); - node = node.findValue("aliases"); - - Map aliasData = mapper.readValue(mapper.writeValueAsString(node), - new TypeReference>() {}); - - Iterable> aliasIter = aliasData.entrySet(); - List aliasMetaDataList = new ArrayList(); - - for (Map.Entry aliasentry : aliasIter) { - AliasData data = aliasentry.getValue(); - aliasMetaDataList.add(AliasMetaData.newAliasMetaDataBuilder(aliasentry.getKey()).filter(data.getFilter()) - .routing(data.getRouting()).searchRouting(data.getSearch_routing()).indexRouting(data.getIndex_routing()) - .build()); - } - return aliasMetaDataList; - } catch (IOException e) { - throw new ElasticsearchException("Could not map alias response : " + aliasResponse, e); - } - } @Override public ElasticsearchPersistentEntity getPersistentEntityFor(Class clazz) { @@ -1597,7 +1457,7 @@ public SearchResponse suggest(SuggestBuilder suggestion, String... indices) { searchRequest.source(sourceBuilder); try { - return client.search(searchRequest); + return client.search(searchRequest, RequestOptions.DEFAULT); } catch (IOException e) { throw new ElasticsearchException("Could not execute search request : " + searchRequest.toString(), e); } @@ -1606,4 +1466,5 @@ public SearchResponse suggest(SuggestBuilder suggestion, String... indices) { public SearchResponse suggest(SuggestBuilder suggestion, Class clazz) { return suggest(suggestion, retrieveIndexNameFromPersistentEntity(clazz)); } + } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index d6bc220ec8..7315af256a 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -98,6 +98,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; import org.springframework.data.elasticsearch.core.query.*; +import org.springframework.data.elasticsearch.utils.ElasticsearchCloseableIterator; import org.springframework.data.util.CloseableIterator; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -447,61 +448,7 @@ public CloseableIterator stream(SearchQuery query, final Class clazz, private CloseableIterator doStream(final long scrollTimeInMillis, final ScrolledPage page, final Class clazz, final SearchResultMapper mapper) { - return new CloseableIterator() { - - /** As we couldn't retrieve single result with scroll, store current hits. */ - private volatile Iterator currentHits = page.iterator(); - - /** The scroll id. */ - private volatile String scrollId = page.getScrollId(); - - /** If stream is finished (ie: cluster returns no results. */ - private volatile boolean finished = !currentHits.hasNext(); - - @Override - public void close() { - try { - // Clear scroll on cluster only in case of error (cause elasticsearch auto clear scroll when it's done) - if (!finished && scrollId != null && currentHits != null && currentHits.hasNext()) { - clearScroll(scrollId); - } - } finally { - currentHits = null; - scrollId = null; - } - } - - @Override - public boolean hasNext() { - // Test if stream is finished - if (finished) { - return false; - } - // Test if it remains hits - if (currentHits == null || !currentHits.hasNext()) { - // Do a new request - final ScrolledPage scroll = continueScroll(scrollId, scrollTimeInMillis, clazz, mapper); - // Save hits and scroll id - currentHits = scroll.iterator(); - finished = !currentHits.hasNext(); - scrollId = scroll.getScrollId(); - } - return currentHits.hasNext(); - } - - @Override - public T next() { - if (hasNext()) { - return currentHits.next(); - } - throw new NoSuchElementException(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove"); - } - }; + return new ElasticsearchCloseableIterator<>(this, page, scrollTimeInMillis, clazz, mapper); } @Override @@ -546,7 +493,7 @@ private long doCount(SearchRequestBuilder countRequestBuilder, QueryBuilder elas if (elasticsearchQuery != null) { countRequestBuilder.setQuery(elasticsearchQuery); } - return countRequestBuilder.execute().actionGet().getHits().getTotalHits(); + return countRequestBuilder.execute().actionGet().getHits().getTotalHits().value; } private long doCount(SearchRequestBuilder searchRequestBuilder, QueryBuilder elasticsearchQuery, @@ -559,7 +506,7 @@ private long doCount(SearchRequestBuilder searchRequestBuilder, QueryBuilder ela if (elasticsearchFilter != null) { searchRequestBuilder.setPostFilter(elasticsearchFilter); } - return searchRequestBuilder.execute().actionGet().getHits().getTotalHits(); + return searchRequestBuilder.execute().actionGet().getHits().getTotalHits().value; } private SearchRequestBuilder prepareCount(Query query, Class clazz) { @@ -1161,9 +1108,10 @@ private IndexRequestBuilder prepareIndex(IndexQuery query) { indexRequestBuilder.setVersionType(versionType); } - if (query.getParentId() != null) { - indexRequestBuilder.setParent(query.getParentId()); - } + // TODO find alternative + //if (query.getParentId() != null) { + // indexRequestBuilder.setParent(query.getParentId()); + //} return indexRequestBuilder; } catch (IOException e) { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/FacetedPageImpl.java b/src/main/java/org/springframework/data/elasticsearch/core/FacetedPageImpl.java index 014254e77b..16df735645 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/FacetedPageImpl.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/FacetedPageImpl.java @@ -24,8 +24,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.range.Range; import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats; -import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.metrics.ExtendedStats; +import org.elasticsearch.search.aggregations.metrics.Sum; import org.joda.time.DateTime; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 8b2ddc884b..459b3fb379 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -167,13 +167,14 @@ private Mono doIndex(Object value, AdaptibleEntity entity, @Nu } } - if (entity.hasParent()) { + // TODO find alternative + /*if (entity.hasParent()) { Object parentId = entity.getParentId(); if (parentId != null) { request.parent(converter.convertId(parentId)); } - } + }*/ request = prepareIndexRequest(value, request); return doIndex(request); diff --git a/src/main/java/org/springframework/data/elasticsearch/utils/ElasticsearchCloseableIterator.java b/src/main/java/org/springframework/data/elasticsearch/utils/ElasticsearchCloseableIterator.java new file mode 100644 index 0000000000..09cbba5f62 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/utils/ElasticsearchCloseableIterator.java @@ -0,0 +1,79 @@ +package org.springframework.data.elasticsearch.utils; + +import org.springframework.data.elasticsearch.core.*; +import org.springframework.data.util.CloseableIterator; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +public class ElasticsearchCloseableIterator implements CloseableIterator { + + private ElasticsearchOperations elasticsearchTemplate; + private final ScrolledPage page; + private final long scrollTimeInMillis; + private final Class clazz; + private final SearchResultMapper mapper; + /** As we couldn't retrieve single result with scroll, store current hits. */ + private volatile Iterator currentHits; + + /** The scroll id. */ + private volatile String scrollId; + + /** If stream is finished (ie: cluster returns no results. */ + private volatile boolean finished; + + public ElasticsearchCloseableIterator(ElasticsearchOperations elasticsearchTemplate, ScrolledPage page, long scrollTimeInMillis, Class clazz, SearchResultMapper mapper) { + this.elasticsearchTemplate = elasticsearchTemplate; + this.page = page; + this.scrollTimeInMillis = scrollTimeInMillis; + this.clazz = clazz; + this.mapper = mapper; + currentHits = page.iterator(); + scrollId = page.getScrollId(); + finished = !currentHits.hasNext(); + } + + @Override + public void close() { + try { + // Clear scroll on cluster only in case of error (cause elasticsearch auto clear scroll when it's done) + if (!finished && scrollId != null && currentHits != null && currentHits.hasNext()) { + elasticsearchTemplate.clearScroll(scrollId); + } + } finally { + currentHits = null; + scrollId = null; + } + } + + @Override + public boolean hasNext() { + // Test if stream is finished + if (finished) { + return false; + } + // Test if it remains hits + if (currentHits == null || !currentHits.hasNext()) { + // Do a new request + final ScrolledPage scroll = elasticsearchTemplate.continueScroll(scrollId, scrollTimeInMillis, clazz, mapper); + // Save hits and scroll id + currentHits = scroll.iterator(); + finished = !currentHits.hasNext(); + scrollId = scroll.getScrollId(); + } + return currentHits.hasNext(); + } + + @Override + public T next() { + if (hasNext()) { + return currentHits.next(); + } + throw new NoSuchElementException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/TestUtils.java b/src/test/java/org/springframework/data/elasticsearch/TestUtils.java index 18e485d337..4f6a764002 100644 --- a/src/test/java/org/springframework/data/elasticsearch/TestUtils.java +++ b/src/test/java/org/springframework/data/elasticsearch/TestUtils.java @@ -94,7 +94,7 @@ public static boolean isEmptyIndex(String indexName) { return 0L == client .search(new SearchRequest(indexName) .source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())), RequestOptions.DEFAULT) - .getHits().getTotalHits(); + .getHits().getTotalHits().value; } } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java index 45b2d019ce..9989e4d388 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java @@ -135,8 +135,6 @@ public void infoShouldReturnClusterInformation() { client.info().as(StepVerifier::create) // .consumeNextWith(it -> { - - assertThat(it.isAvailable()).isTrue(); assertThat(it.getVersion()).isGreaterThanOrEqualTo(Version.CURRENT); }) // .verifyComplete(); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/DefaultResultMapperTests.java b/src/test/java/org/springframework/data/elasticsearch/core/DefaultResultMapperTests.java index 3f6ee48ddd..7f571aff55 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/DefaultResultMapperTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/DefaultResultMapperTests.java @@ -102,7 +102,7 @@ public void shouldMapAggregationsToPage() { // Given SearchHit[] hits = { createCarHit("Ford", "Grat"), createCarHit("BMW", "Arrow") }; SearchHits searchHits = mock(SearchHits.class); - when(searchHits.getTotalHits()).thenReturn(2L); + when(searchHits.getTotalHits().value).thenReturn(2L); when(searchHits.iterator()).thenReturn(new ArrayIterator(hits)); when(response.getHits()).thenReturn(searchHits); @@ -123,7 +123,7 @@ public void shouldMapSearchRequestToPage() { // Given SearchHit[] hits = { createCarHit("Ford", "Grat"), createCarHit("BMW", "Arrow") }; SearchHits searchHits = mock(SearchHits.class); - when(searchHits.getTotalHits()).thenReturn(2L); + when(searchHits.getTotalHits().value).thenReturn(2L); when(searchHits.iterator()).thenReturn(new ArrayIterator(hits)); when(response.getHits()).thenReturn(searchHits); @@ -141,7 +141,7 @@ public void shouldMapPartialSearchRequestToObject() { // Given SearchHit[] hits = { createCarPartialHit("Ford", "Grat"), createCarPartialHit("BMW", "Arrow") }; SearchHits searchHits = mock(SearchHits.class); - when(searchHits.getTotalHits()).thenReturn(2L); + when(searchHits.getTotalHits().value).thenReturn(2L); when(searchHits.iterator()).thenReturn(new ArrayIterator(hits)); when(response.getHits()).thenReturn(searchHits); @@ -234,7 +234,7 @@ public void setsVersionFromSearchResponse() { when(hit2.getVersion()).thenReturn(5678L); SearchHits searchHits = mock(SearchHits.class); - when(searchHits.getTotalHits()).thenReturn(2L); + when(searchHits.getTotalHits().value).thenReturn(2L); when(searchHits.iterator()).thenReturn(Arrays.asList(hit1, hit2).iterator()); SearchResponse searchResponse = mock(SearchResponse.class);