Skip to content

Commit

Permalink
Create Index: Allow to provide index warmers when creating an index, c…
Browse files Browse the repository at this point in the history
…loses elastic#1917.
  • Loading branch information
kimchy committed May 7, 2012
1 parent ca2dc18 commit f0007fd
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 23 deletions.
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -65,6 +66,8 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {

private Map<String, String> mappings = newHashMap();

private Map<String, IndexMetaData.Custom> customs = newHashMap();

private TimeValue timeout = new TimeValue(10, TimeUnit.SECONDS);

CreateIndexRequest() {
Expand Down Expand Up @@ -271,15 +274,28 @@ public CreateIndexRequest source(byte[] source, int offset, int length) {
*/
public CreateIndexRequest source(Map<String, Object> source) {
boolean found = false;
if (source.containsKey("settings")) {
settings((Map<String, Object>) source.get("settings"));
found = true;
}
if (source.containsKey("mappings")) {
found = true;
Map<String, Object> mappings = (Map<String, Object>) source.get("mappings");
for (Map.Entry<String, Object> entry : mappings.entrySet()) {
mapping(entry.getKey(), (Map<String, Object>) entry.getValue());
for (Map.Entry<String, Object> entry : source.entrySet()) {
String name = entry.getKey();
if (name.equals("settings")) {
found = true;
settings((Map<String, Object>) entry.getValue());
} else if (name.equals("mappings")) {
found = true;
Map<String, Object> mappings = (Map<String, Object>) entry.getValue();
for (Map.Entry<String, Object> entry1 : mappings.entrySet()) {
mapping(entry1.getKey(), (Map<String, Object>) entry1.getValue());
}
} else {
// maybe custom?
IndexMetaData.Custom.Factory factory = IndexMetaData.lookupFactory(name);
if (factory != null) {
found = true;
try {
customs.put(name, factory.fromMap((Map<String, Object>) entry.getValue()));
} catch (IOException e) {
throw new ElasticSearchParseException("failed to parse custom metadata for [" + name + "]");
}
}
}
}
if (!found) {
Expand All @@ -293,6 +309,15 @@ Map<String, String> mappings() {
return this.mappings;
}

public CreateIndexRequest custom(IndexMetaData.Custom custom) {
customs.put(custom.type(), custom);
return this;
}

Map<String, IndexMetaData.Custom> customs() {
return this.customs;
}

/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
Expand Down Expand Up @@ -338,6 +363,12 @@ public void readFrom(StreamInput in) throws IOException {
for (int i = 0; i < size; i++) {
mappings.put(in.readUTF(), in.readUTF());
}
int customSize = in.readVInt();
for (int i = 0; i < customSize; i++) {
String type = in.readUTF();
IndexMetaData.Custom customIndexMetaData = IndexMetaData.lookupFactorySafe(type).readFrom(in);
customs.put(type, customIndexMetaData);
}
}

@Override
Expand All @@ -352,5 +383,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(entry.getKey());
out.writeUTF(entry.getValue());
}
out.writeVInt(customs.size());
for (Map.Entry<String, IndexMetaData.Custom> entry : customs.entrySet()) {
out.writeUTF(entry.getKey());
IndexMetaData.lookupFactorySafe(entry.getKey()).writeTo(entry.getValue(), out);
}
}
}
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.support.BaseIndicesRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -159,6 +160,11 @@ public CreateIndexRequestBuilder setSource(Map<String, Object> source) {
return this;
}

public CreateIndexRequestBuilder setCustom(IndexMetaData.Custom custom) {
request.custom(custom);
return this;
}

/**
* Sets the settings and mappings as a single source.
*/
Expand Down
Expand Up @@ -83,19 +83,23 @@ protected CreateIndexResponse masterOperation(CreateIndexRequest request, Cluste
final AtomicReference<CreateIndexResponse> responseRef = new AtomicReference<CreateIndexResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings()).mappings(request.mappings()).timeout(request.timeout()), new MetaDataCreateIndexService.Listener() {
@Override
public void onResponse(MetaDataCreateIndexService.Response response) {
responseRef.set(new CreateIndexResponse(response.acknowledged()));
latch.countDown();
}

@Override
public void onFailure(Throwable t) {
failureRef.set(t);
latch.countDown();
}
});
createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings())
.mappings(request.mappings())
.customs(request.customs())
.timeout(request.timeout()),
new MetaDataCreateIndexService.Listener() {
@Override
public void onResponse(MetaDataCreateIndexService.Response response) {
responseRef.set(new CreateIndexResponse(response.acknowledged()));
latch.countDown();
}

@Override
public void onFailure(Throwable t) {
failureRef.set(t);
latch.countDown();
}
});

try {
latch.await();
Expand Down
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
Expand Down Expand Up @@ -154,7 +155,9 @@ public ClusterState execute(ClusterState currentState) {
mappings.put(entry.getKey(), parseMapping(entry.getValue()));
}

// TODO: request should be able to add custom metadata
for (Map.Entry<String, Custom> entry : request.customs.entrySet()) {
customs.put(entry.getKey(), entry.getValue());
}

// apply templates, merging the mappings into the request mapping if exists
for (IndexTemplateMetaData template : templates) {
Expand Down Expand Up @@ -503,6 +506,9 @@ public static class Request {

Map<String, String> mappings = Maps.newHashMap();

Map<String, IndexMetaData.Custom> customs = newHashMap();


TimeValue timeout = TimeValue.timeValueSeconds(5);

Set<ClusterBlock> blocks = Sets.newHashSet();
Expand Down Expand Up @@ -536,6 +542,11 @@ public Request mappingsCompressed(Map<String, CompressedString> mappings) throws
return this;
}

public Request customs(Map<String, Custom> customs) {
this.customs.putAll(customs);
return this;
}

public Request blocks(Set<ClusterBlock> blocks) {
this.blocks.addAll(blocks);
return this;
Expand Down
Expand Up @@ -111,4 +111,35 @@ public void templateWarmer() {
client.prepareIndex("test", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
client.prepareIndex("test", "type1", "2").setSource("field", "value2").setRefresh(true).execute().actionGet();
}

@Test
public void createIndexWarmer() {
client.admin().indices().prepareDelete().execute().actionGet();

client.admin().indices().prepareCreate("test")
.setSource("{\n" +
" \"settings\" : {\n" +
" \"index.number_of_shards\" : 1\n" +
" },\n" +
" \"warmers\" : {\n" +
" \"warmer_1\" : {\n" +
" \"types\" : [],\n" +
" \"source\" : {\n" +
" \"query\" : {\n" +
" \"match_all\" : {}\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
"}")
.execute().actionGet();

ClusterState clusterState = client.admin().cluster().prepareState().execute().actionGet().state();
IndexWarmersMetaData warmersMetaData = clusterState.metaData().index("test").custom(IndexWarmersMetaData.TYPE);
assertThat(warmersMetaData, Matchers.notNullValue());
assertThat(warmersMetaData.entries().size(), equalTo(1));

client.prepareIndex("test", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
client.prepareIndex("test", "type1", "2").setSource("field", "value2").setRefresh(true).execute().actionGet();
}
}

0 comments on commit f0007fd

Please sign in to comment.