Skip to content

Commit

Permalink
Do not send query operations to Lite Members
Browse files Browse the repository at this point in the history
Fixes hazelcast#8849

Also the same fix applied for Replicated Map

(cherry picked from commit af63a0c)
  • Loading branch information
jerrinot committed Sep 22, 2016
1 parent 57290c1 commit ed7b856
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static com.hazelcast.cluster.memberselector.MemberSelectors.DATA_MEMBER_SELECTOR;
import static com.hazelcast.query.PagingPredicateAccessor.getNearestAnchorEntry;
import static com.hazelcast.spi.ExecutionService.QUERY_EXECUTOR;
import static com.hazelcast.spi.properties.GroupProperty.QUERY_PREDICATE_PARALLEL_EVALUATION;
Expand Down Expand Up @@ -533,7 +534,7 @@ protected Future<QueryResult> queryOnLocalMember(String mapName, Predicate predi
}

protected List<Future<QueryResult>> queryOnMembers(String mapName, Predicate predicate, IterationType iterationType) {
Collection<Member> members = clusterService.getMembers();
Collection<Member> members = clusterService.getMembers(DATA_MEMBER_SELECTOR);
List<Future<QueryResult>> futures = new ArrayList<Future<QueryResult>>(members.size());
for (Member member : members) {
Operation operation = new QueryOperation(mapName, predicate, iterationType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.io.IOException;
import java.util.Collection;

import static com.hazelcast.cluster.memberselector.MemberSelectors.DATA_MEMBER_SELECTOR;

/**
* Puts a set of records to the replicated map.
*/
Expand Down Expand Up @@ -73,7 +75,7 @@ public void run() throws Exception {

private void publishReplicationMessage(Data key, Data value, VersionResponsePair response) {
OperationService operationService = getNodeEngine().getOperationService();
Collection<Member> members = getNodeEngine().getClusterService().getMembers();
Collection<Member> members = getNodeEngine().getClusterService().getMembers(DATA_MEMBER_SELECTOR);
for (Member member : members) {
Address address = member.getAddress();
if (address.equals(getNodeEngine().getThisAddress())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MapStoreAdapter;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
import com.hazelcast.nio.serialization.Portable;
import com.hazelcast.nio.serialization.PortableFactory;
import com.hazelcast.query.Predicate;
import com.hazelcast.query.SampleObjects.Employee;
import com.hazelcast.query.SampleObjects.PortableEmployee;
import com.hazelcast.query.SampleObjects.ValueType;
Expand All @@ -40,12 +44,14 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static com.hazelcast.test.TimeConstants.MINUTE;
import static org.junit.Assert.assertEquals;
Expand All @@ -57,6 +63,43 @@ public class QueryAdvancedTest extends HazelcastTestSupport {

private static final long TIMEOUT_MINUTES = 2 * MINUTE;

@Test
public void testQueryOperationAreNotSentToLiteMembers() {
TestHazelcastInstanceFactory nodeFactory = createHazelcastInstanceFactory(2);
HazelcastInstance fullMember = nodeFactory.newHazelcastInstance();
HazelcastInstance liteMember = nodeFactory.newHazelcastInstance(new Config().setLiteMember(true));
assertClusterSizeEventually(2, fullMember);

IMap<Integer, Integer> map = fullMember.getMap(randomMapName());
DeserializationCountingPredicate predicate = new DeserializationCountingPredicate();

//initialize all partitions
for (int i = 0; i < 5000; i++) {
map.put(i, i);
}

map.values(predicate);
assertEquals(0, predicate.serilizationCount());
}

public static class DeserializationCountingPredicate implements Predicate, DataSerializable {
private static final AtomicInteger counter = new AtomicInteger();

public boolean apply(Map.Entry mapEntry) {
return false;
}

public int serilizationCount() {
return counter.get();
}

public void writeData(ObjectDataOutput out) throws IOException { }

public void readData(ObjectDataInput in) throws IOException {
counter.incrementAndGet();
}
}

@Test(timeout = TIMEOUT_MINUTES)
@SuppressWarnings("deprecation")
public void testQueryWithTTL() throws Exception {
Expand Down

0 comments on commit ed7b856

Please sign in to comment.