Skip to content

Commit

Permalink
Added index and type checks to MetaDataMappingService.CountDownListener
Browse files Browse the repository at this point in the history
  • Loading branch information
bleskes committed Aug 14, 2013
1 parent bf070d9 commit 309c2c3
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 4 deletions.
Expand Up @@ -491,7 +491,7 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
}
}

countDownListener = new CountDownListener(counter, listener);
countDownListener = new CountDownListener(counter, request.indices, request.mappingType, listener);
mappingCreatedAction.add(countDownListener, request.timeout);

return updatedState;
Expand All @@ -506,7 +506,7 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (countDownListener != null) {
// the master has applied it on its cluster state
countDownListener.onNodeMappingCreated(null);
countDownListener.decrementCounter();
}
}
});
Expand Down Expand Up @@ -588,16 +588,29 @@ private class CountDownListener implements NodeMappingCreatedAction.Listener {
private final AtomicBoolean notified = new AtomicBoolean();
private final AtomicInteger countDown;
private final Listener listener;
private final List<String> indices;
private final String type;

public CountDownListener(int countDown, Listener listener) {
public CountDownListener(int countDown, String[] indices, String type, Listener listener) {
this.indices = Arrays.asList(indices);
this.type = type;
this.countDown = new AtomicInteger(countDown);
this.listener = listener;
}

@Override
public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
// response may be null - see clusterStateProcessed implementation in {@link MetaDataMappingService#putMapping}
if (indices.indexOf(response.index()) < 0) {
return;
}
if (type != null && !type.equals(response.type())) {
return;
}
decrementCounter();

}

public void decrementCounter() {
if (countDown.decrementAndGet() == 0) {
mappingCreatedAction.remove(this);
if (notified.compareAndSet(false, true)) {
Expand Down
Expand Up @@ -5,6 +5,7 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
Expand All @@ -13,9 +14,13 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MergeMappingException;
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.hamcrest.Matchers;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.*;
Expand Down Expand Up @@ -235,6 +240,70 @@ public void updateDefaultMappingSettings() throws Exception {
.endObject().endObject()
), MapperParsingException.class);

}


@Test
public void updateMappingConcurrently() throws Throwable {
// Test that we can concurrently update different indexes and types.
// NOTE: concurrently updating the mapping of the same type and index can still return before all (relevant) nodes are updated.
// The fix for that requires a backward incompatible change (see issues #3508 )
createIndex("test1");
createIndex("test2");

final Throwable[] threadException = new Throwable[1];
final AtomicBoolean stop = new AtomicBoolean(false);
Thread[] threads = new Thread[3];
final CyclicBarrier barrier = new CyclicBarrier(threads.length);
final ArrayList<Client> clientArray = new ArrayList<Client>();
for (Client c : clients()) {
clientArray.add(c);
}

for (int j = 0; j < threads.length; j++) {
threads[j] = new Thread(new Runnable() {
@Override
public void run() {
try {
barrier.await();

for (int i = 0; i < 100; i++) {
if (stop.get()) {
return;
}

Client client1 = clientArray.get(i % clientArray.size());
Client client2 = clientArray.get((i + 1) % clientArray.size());
String indexName = i % 2 == 0 ? "test2" : "test1";
String typeName = Thread.currentThread().getName() + "_" + i;

PutMappingResponse response = client1.admin().indices().preparePutMapping(indexName).setType(typeName).setSource(
JsonXContent.contentBuilder().startObject().startObject(typeName)
.startObject("properties").startObject("f").field("type", "string").endObject().endObject()
.endObject().endObject()
).get();

assertThat(response.isAcknowledged(), equalTo(true));
ClusterStateResponse clusterStateResponse = client2.admin().cluster().prepareState().setLocal(true).get();
Map<String, MappingMetaData> mappings = clusterStateResponse.getState().metaData().index(indexName).getMappings();
assertThat(mappings.keySet(), Matchers.hasItem(typeName));
}
} catch (Throwable t) {
threadException[0] = t;
stop.set(true);
}
}
});

threads[j].setName("t_" + j);
threads[j].start();
}

for (Thread t : threads) t.join();

if (threadException[0] != null) {
throw threadException[0];
}

}
}

0 comments on commit 309c2c3

Please sign in to comment.