Skip to content

Commit

Permalink
DATAES-31 : Add the ability to index arbitrary JSON strings
Browse files Browse the repository at this point in the history
  • Loading branch information
mohsinh committed Nov 23, 2013
1 parent a833c89 commit 9386129
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 2 deletions.
Expand Up @@ -531,9 +531,16 @@ private IndexRequestBuilder prepareIndex(IndexQuery query) {
String type = isBlank(query.getType()) ? retrieveTypeFromPersistentEntity(query.getObject().getClass())[0]
: query.getType();

IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName, type, query.getId()).setSource(
resultsMapper.getEntityMapper().mapToString(query.getObject()));
IndexRequestBuilder indexRequestBuilder = null;

if(query.getObject() != null) {
indexRequestBuilder = client.prepareIndex(indexName, type, query.getId()).setSource(
resultsMapper.getEntityMapper().mapToString(query.getObject()));
} else if(query.getSource() != null) {
indexRequestBuilder = client.prepareIndex(indexName, type, query.getId()).setSource(query.getSource());
} else {
throw new ElasticsearchException("object or source is null, failed to index the document [id: " + query.getId() + "]");
}
if (query.getVersion() != null) {
indexRequestBuilder.setVersion(query.getVersion());
indexRequestBuilder.setVersionType(EXTERNAL);
Expand All @@ -544,16 +551,19 @@ private IndexRequestBuilder prepareIndex(IndexQuery query) {
}
}

@Override
public void refresh(String indexName, boolean waitForOperation) {
client.admin().indices().refresh(refreshRequest(indexName).force(waitForOperation)).actionGet();
}

@Override
public <T> void refresh(Class<T> clazz, boolean waitForOperation) {
ElasticsearchPersistentEntity persistentEntity = getPersistentEntityFor(clazz);
client.admin().indices()
.refresh(refreshRequest(persistentEntity.getIndexName()).force(waitForOperation)).actionGet();
}

@Override
public Boolean addAlias(AliasQuery query) {
Assert.notNull(query.getIndexName(), "No index defined for Alias");
Assert.notNull(query.getAliasName(), "No alias defined");
Expand All @@ -568,13 +578,15 @@ public Boolean addAlias(AliasQuery query) {
return indicesAliasesRequestBuilder.execute().actionGet().isAcknowledged();
}

@Override
public Boolean removeAlias(AliasQuery query) {
Assert.notNull(query.getIndexName(), "No index defined for Alias");
Assert.notNull(query.getAliasName(), "No alias defined");
return client.admin().indices().prepareAliases().removeAlias(query.getIndexName(), query.getAliasName())
.execute().actionGet().isAcknowledged();
}

@Override
public Set<String> queryForAlias(String indexName) {
ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest()
.filterRoutingTable(true)
Expand Down
Expand Up @@ -29,6 +29,7 @@ public class IndexQuery {
private Long version;
private String indexName;
private String type;
private String source;

public String getId() {
return id;
Expand Down Expand Up @@ -69,4 +70,12 @@ public String getType() {
public void setType(String type) {
this.type = type;
}

public String getSource() {
return source;
}

public void setSource(String source) {
this.source = source;
}
}
Expand Up @@ -31,6 +31,7 @@
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.SampleEntity;
import org.springframework.data.elasticsearch.SampleMappingEntity;
import org.springframework.data.elasticsearch.core.query.*;
Expand Down Expand Up @@ -939,4 +940,71 @@ public void shouldRemoveAlias(){
assertThat(aliases.size(), is(0));
}


@Test
public void shouldIndexDocumentForSpecifiedSource(){

// given
String documentSource = "{\"id\":\"2333343434\",\"type\":null,\"message\":\"some message\",\"rate\":0,\"available\":false,\"highlightedMessage\":null,\"version\":1385208779482}";
IndexQuery indexQuery = new IndexQuery();
indexQuery.setId("2333343434");
indexQuery.setSource(documentSource);
indexQuery.setIndexName("test-index");
indexQuery.setType("test-type");
// when
elasticsearchTemplate.index(indexQuery);
elasticsearchTemplate.refresh(SampleEntity.class, true);
SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(termQuery("id",indexQuery.getId()))
.withIndices("test-index")
.withTypes("test-type")
.build();
// then
Page<SampleEntity> page = elasticsearchTemplate.queryForPage(searchQuery, SampleEntity.class, new SearchResultMapper() {
@Override
public <T> FacetedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
List<SampleEntity> values = new ArrayList<SampleEntity>();
for (SearchHit searchHit : response.getHits()) {
SampleEntity sampleEntity = new SampleEntity();
sampleEntity.setId(searchHit.getId());
sampleEntity.setMessage((String) searchHit.getSource().get("message"));
values.add(sampleEntity);
}
return new FacetedPageImpl<T>((List<T>) values);
}
});
assertThat(page, is(notNullValue()));
assertThat(page.getContent().size(), is(1));
assertThat(page.getContent().get(0).getId(), is(indexQuery.getId()));
}

@Test (expected = ElasticsearchException.class)
public void shouldThrowElasticsearchExceptionWhenNoDocumentSpecified(){
// given
IndexQuery indexQuery = new IndexQuery();
indexQuery.setId("2333343434");
indexQuery.setIndexName("test-index");
indexQuery.setType("test-type");

//when
elasticsearchTemplate.index(indexQuery);
}

@Test
public void shouldReturnIds(){
//given
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
// when
elasticsearchTemplate.bulkIndex(entities);
elasticsearchTemplate.refresh(SampleEntity.class, true);
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(termQuery("message", "message"))
.withIndices("test-index")
.withTypes("test-type")
.withPageable(new PageRequest(0,100))
.build();
// then
List<String> ids = elasticsearchTemplate.queryForIds(searchQuery);
assertThat(ids, is(notNullValue()));
assertThat(ids.size(), is(30));
}
}

0 comments on commit 9386129

Please sign in to comment.