Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

improve refreshing logic to resync mappings on upgrade, reduce the am…

…ount of cluster events processing requires if the even if fired from several nodes / sources
  • Loading branch information...
commit 24f1f0ff964a88b35d5a9614c707905d0d9d5cd5 1 parent ffc7426
@kimchy kimchy authored
View
26 ...lasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java
@@ -26,6 +26,8 @@
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.collect.Lists;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
@@ -44,6 +46,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -64,6 +67,8 @@
private final NodeMappingCreatedAction mappingCreatedAction;
+ private final Map<String, Set<String>> indicesAndTypesToRefresh = Maps.newHashMap();
+
@Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) {
super(settings);
this.clusterService = clusterService;
@@ -75,10 +80,27 @@
* Refreshes mappings if they are not the same between original and parsed version
*/
public void refreshMapping(final String index, final String... types) {
+ synchronized (indicesAndTypesToRefresh) {
+ Set<String> sTypes = indicesAndTypesToRefresh.get(index);
+ if (sTypes == null) {
+ sTypes = Sets.newHashSet();
+ indicesAndTypesToRefresh.put(index, sTypes);
+ }
+ sTypes.addAll(Arrays.asList(types));
+ }
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
boolean createdIndex = false;
try {
+ Set<String> sTypes;
+ synchronized (indicesAndTypesToRefresh) {
+ sTypes = indicesAndTypesToRefresh.remove(index);
+ }
+ // we already processed those types...
+ if (sTypes == null || sTypes.isEmpty()) {
+ return currentState;
+ }
+
// first, check if it really needs to be updated
final IndexMetaData indexMetaData = currentState.metaData().index(index);
if (indexMetaData == null) {
@@ -91,7 +113,7 @@ public void refreshMapping(final String index, final String... types) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
createdIndex = true;
- for (String type : types) {
+ for (String type : sTypes) {
// only add the current relevant mapping (if exists)
if (indexMetaData.mappings().containsKey(type)) {
indexService.mapperService().add(type, indexMetaData.mappings().get(type).source().string());
@@ -100,7 +122,7 @@ public void refreshMapping(final String index, final String... types) {
}
IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData);
List<String> updatedTypes = Lists.newArrayList();
- for (String type : types) {
+ for (String type : sTypes) {
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) {
updatedTypes.add(type);
Please sign in to comment.
Something went wrong with that request. Please try again.