Permalink
Browse files

multi type, refresh, fixes, improvements

  • Loading branch information...
1 parent 1a2f7b8 commit ab9f2864a2680f6aad6d31b9f95bc9a17f3965e5 @youurayy committed Mar 22, 2013
@@ -36,7 +36,7 @@
* @author Peter Karich
*/
public class ReIndexAction extends BaseRestHandler {
-
+
@Inject public ReIndexAction(Settings settings, Client client, RestController controller) {
super(settings, client);
@@ -46,8 +46,12 @@
controller.registerHandler(POST, "/{index}/{type}/_reindex", this);
}
}
-
+
@Override public void handleRequest(RestRequest request, RestChannel channel) {
+ handleRequest(request, channel, null, false);
+ }
+
+ public void handleRequest(RestRequest request, RestChannel channel, String newTypeOverride, boolean internalCall) {
logger.info("ReIndexAction.handleRequest [{}]", request.params());
try {
XContentBuilder builder = restContentBuilder(request);
@@ -56,8 +60,8 @@
if (searchIndexName == null || searchIndexName.isEmpty())
searchIndexName = newIndexName;
- String newType = request.param("type");
- String searchType = request.param("searchType");
+ String newType = newTypeOverride != null ? newTypeOverride : request.param("type");
+ String searchType = newTypeOverride != null ? newTypeOverride : request.param("searchType");
if (searchType == null || searchType.isEmpty())
searchType = newType;
@@ -90,12 +94,19 @@
// + how to combine with existing filter?
logger.info("Finished reindexing of index " + searchIndexName + " into " + newIndexName + ", query " + filter);
- channel.sendResponse(new XContentRestResponse(request, OK, builder));
+
+ if (!internalCall)
+ channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (IOException ex) {
- try {
- channel.sendResponse(new XContentThrowableRestResponse(request, ex));
- } catch (Exception ex2) {
- logger.error("problem while rolling index", ex2);
+ if (!internalCall) {
+ try {
+ channel.sendResponse(new XContentThrowableRestResponse(request, ex));
+ } catch (Exception ex2) {
+ logger.error("problem while rolling index", ex2);
+ }
+ }
+ else {
+ throw new RuntimeException(ex);
}
}
}
@@ -1,11 +1,13 @@
package com.pannous.es.reindex;
import java.io.IOException;
+import java.util.Map;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -20,6 +22,12 @@
import static org.elasticsearch.rest.RestRequest.Method.*;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.StringRestResponse;
+import org.elasticsearch.rest.XContentRestResponse;
+import org.elasticsearch.rest.XContentThrowableRestResponse;
+import static org.elasticsearch.rest.RestRequest.Method.*;
+import static org.elasticsearch.rest.RestStatus.*;
+import static org.elasticsearch.rest.action.support.RestXContentBuilder.*;
+import org.elasticsearch.common.xcontent.XContentBuilder;
/**
* @author Peter Karich
@@ -42,50 +50,85 @@
@Override public void handleRequest(RestRequest request, RestChannel channel) {
logger.info("ReIndexWithCreate.handleRequest [{}]", request.toString());
-
- // required parameters
- String newIndexName = request.param("index");
- if (newIndexName.isEmpty()) {
- channel.sendResponse(new StringRestResponse(RestStatus.EXPECTATION_FAILED, "parameter index missing"));
- return;
- }
- String type = request.param("type", "");
- if (type.isEmpty()) {
- channel.sendResponse(new StringRestResponse(RestStatus.EXPECTATION_FAILED, "parameter type missing"));
- return;
- }
- String searchIndexName = request.param("searchIndex");
- if (searchIndexName.isEmpty()) {
- channel.sendResponse(new StringRestResponse(RestStatus.EXPECTATION_FAILED, "parameter searchIndex missing"));
- return;
- }
- int newShards = request.paramAsInt("newIndexShards", -1);
try {
- createIdenticalIndex(searchIndexName, type, newIndexName, newShards);
- } catch (Exception ex) {
- String str = "Problem while creating index " + newIndexName + " from " + searchIndexName + " " + ex.getMessage();
- logger.error(str, ex);
- channel.sendResponse(new StringRestResponse(RestStatus.INTERNAL_SERVER_ERROR, str));
- return;
- }
+ XContentBuilder builder = restContentBuilder(request);
+ // required parameters
+ String newIndexName = request.param("index");
+ if (newIndexName.isEmpty()) {
+ channel.sendResponse(new StringRestResponse(RestStatus.EXPECTATION_FAILED, "parameter index missing"));
+ return;
+ }
+ String type = request.param("type", "");
+ if (type.isEmpty()) {
+ channel.sendResponse(new StringRestResponse(RestStatus.EXPECTATION_FAILED, "parameter type missing"));
+ return;
+ }
+ String searchIndexName = request.param("searchIndex");
+ if (searchIndexName.isEmpty()) {
+ channel.sendResponse(new StringRestResponse(RestStatus.EXPECTATION_FAILED, "parameter searchIndex missing"));
+ return;
+ }
+ int newShards = request.paramAsInt("newIndexShards", -1);
+ try {
+ if(client.admin().indices().exists(new IndicesExistsRequest(newIndexName)).actionGet().exists()) {
+ logger.info("target index already exists, skip creation: " + newIndexName);
+ }
+ else {
+ createIdenticalIndex(searchIndexName, type, newIndexName, newShards);
+ }
+ } catch (Exception ex) {
+ String str = "Problem while creating index " + newIndexName + " from " + searchIndexName + " " + ex.getMessage();
+ logger.error(str, ex);
+ channel.sendResponse(new StringRestResponse(RestStatus.INTERNAL_SERVER_ERROR, str));
+ return;
+ }
+
+ // TODO: what if queries goes to the old index while we reindexed?
+ // now reindex
+
+ if(type.equals("*")) {
- // TODO: what if queries goes to the old index while we reindexed?
- // now reindex
- reindexAction.handleRequest(request, channel);
+ IndexMetaData indexData = client.admin().cluster().state(new ClusterStateRequest()).
+ actionGet().state().metaData().indices().get(searchIndexName);
+ Settings searchIndexSettings = indexData.settings();
- boolean delete = request.paramAsBoolean("delete", false);
- if (delete) {
- long oldCount = client.count(new CountRequest(searchIndexName)).actionGet().count();
- long newCount = client.count(new CountRequest(newIndexName)).actionGet().count();
- if (oldCount == newCount) {
- logger.info("deleting " + searchIndexName);
- client.admin().indices().delete(new DeleteIndexRequest(searchIndexName)).actionGet();
+ for(Map.Entry<String, MappingMetaData> me : indexData.mappings().entrySet()) {
+ reindexAction.handleRequest(request, channel, me.getKey(), true);
+ }
+ }
+ else {
+ reindexAction.handleRequest(request, channel, type, true);
+ }
+
+ boolean delete = request.paramAsBoolean("delete", false);
+ if (delete) {
+
+ // make sure to refresh the index here
+ // (e.g. the index may be paused or refreshing with a very long interval):
+ logger.info("refreshing " + searchIndexName);
+ client.admin().indices().refresh(new RefreshRequest(newIndexName)).actionGet();
+
+ long oldCount = client.count(new CountRequest(searchIndexName)).actionGet().count();
+ long newCount = client.count(new CountRequest(newIndexName)).actionGet().count();
+ if (oldCount == newCount) {
+ logger.info("deleting " + searchIndexName);
+ client.admin().indices().delete(new DeleteIndexRequest(searchIndexName)).actionGet();
+ }
}
- }
- boolean copyAliases = request.paramAsBoolean("copyAliases", false);
- if (copyAliases)
- copyAliases(request);
+ boolean copyAliases = request.paramAsBoolean("copyAliases", false);
+ if (copyAliases)
+ copyAliases(request);
+
+ channel.sendResponse(new XContentRestResponse(request, OK, builder));
+
+ } catch (Exception ex) { // also catch the RuntimeException thrown by ReIndexAction
+ try {
+ channel.sendResponse(new XContentThrowableRestResponse(request, ex));
+ } catch (Exception ex2) {
+ logger.error("problem while rolling index", ex2);
+ }
+ }
}
/**
@@ -99,10 +142,23 @@ private void createIdenticalIndex(String oldIndex, String type,
ImmutableSettings.Builder settingBuilder = ImmutableSettings.settingsBuilder().put(searchIndexSettings);
if (newIndexShards > 0)
settingBuilder.put("index.number_of_shards", newIndexShards);
- MappingMetaData mappingMeta = indexData.mapping(type);
- CreateIndexRequest createReq = new CreateIndexRequest(newIndex).
+
+ CreateIndexRequest createReq;
+
+ if(type.equals("*")) {
+ createReq = new CreateIndexRequest(newIndex);
+ for(Map.Entry<String, MappingMetaData> me : indexData.mappings().entrySet()) {
+ createReq.mapping(me.getKey(), me.getValue().sourceAsMap());
+ }
+ createReq.settings(settingBuilder.build());
+ }
+ else {
+ MappingMetaData mappingMeta = indexData.mapping(type);
+ createReq = new CreateIndexRequest(newIndex).
mapping(type, mappingMeta.sourceAsMap()).
settings(settingBuilder.build());
+ }
+
client.admin().indices().create(createReq).actionGet();
}
@@ -112,17 +168,25 @@ private void copyAliases(RestRequest request) {
IndexMetaData meta = client.admin().cluster().state(new ClusterStateRequest()).
actionGet().state().metaData().index(searchIndexName);
IndicesAliasesRequest aReq = new IndicesAliasesRequest();
- for (String oldAlias : meta.aliases().keySet()) {
- aReq.addAlias(index, oldAlias);
+ boolean empty = true;
+ if(meta != null && meta.aliases() != null) {
+ for (String oldAlias : meta.aliases().keySet()) {
+ empty = false;
+ aReq.addAlias(index, oldAlias);
+ }
}
boolean aliasIncludeIndex = request.paramAsBoolean("addOldIndexAsAlias", false);
if (aliasIncludeIndex) {
- if (client.admin().indices().exists(new IndicesExistsRequest(searchIndexName)).actionGet().exists())
+ if (client.admin().indices().exists(new IndicesExistsRequest(searchIndexName)).actionGet().exists()) {
logger.warn("Cannot add old index name (" + searchIndexName + ") as alias to index "
+ index + " - as old index still exists");
- else
+ }
+ else {
aReq.addAlias(index, searchIndexName);
+ empty = false;
+ }
}
- client.admin().indices().aliases(aReq).actionGet();
+ if(!empty) //!aReq.aliasActions().isEmpty())
+ client.admin().indices().aliases(aReq).actionGet();
}
}

0 comments on commit ab9f286

Please sign in to comment.