Skip to content
Browse files

ISPN-2777 ReplicatedConsistentHash.isKeyLocalToNode(node, key) should…

… return false if the node is not a member yet

* Also added some sanity checks to detect illegal usage.
* Fix incomplete transport mocking in PutForExternalReadTest.testExceptionSuppression and resolve some generics related compiler warnings.
  • Loading branch information...
1 parent e59fe88 commit 9e52b8fd61d877a79637a1757e8ecc0e3d3ae992 @anistor anistor committed with Mircea Markus Jan 30, 2013
View
20 core/src/main/java/org/infinispan/distribution/ch/ReplicatedConsistentHash.java
@@ -39,11 +39,13 @@
public class ReplicatedConsistentHash implements ConsistentHash {
private final List<Address> members;
+ private final Set<Address> membersSet;
private static final Set<Integer> theSegment = Collections.singleton(0);
public ReplicatedConsistentHash(List<Address> members) {
- this.members = new ArrayList<Address>(members);
+ this.members = Collections.unmodifiableList(new ArrayList<Address>(members));
+ this.membersSet = Collections.unmodifiableSet(new HashSet<Address>(members));
}
@Override
@@ -73,16 +75,28 @@ public int getSegment(Object key) {
@Override
public List<Address> locateOwnersForSegment(int segmentId) {
+ if (segmentId != 0) {
+ throw new IllegalArgumentException("Unknown segment id : " + segmentId);
+ }
return members;
}
@Override
public Address locatePrimaryOwnerForSegment(int segmentId) {
+ if (segmentId != 0) {
+ throw new IllegalArgumentException("Unknown segment id : " + segmentId);
+ }
return members.get(0);
}
@Override
public Set<Integer> getSegmentsForOwner(Address owner) {
+ if (owner == null) {
+ throw new IllegalArgumentException("owner cannot be null");
+ }
+ if (!membersSet.contains(owner)) {
+ throw new IllegalArgumentException("The node is not a member : " + owner);
+ }
return theSegment;
}
@@ -103,12 +117,12 @@ public Address locatePrimaryOwner(Object key) {
@Override
public Set<Address> locateAllOwners(Collection<Object> keys) {
- return new HashSet<Address>(members);
+ return membersSet;
}
@Override
public boolean isKeyLocalToNode(Address nodeAddress, Object key) {
- return true;
+ return membersSet.contains(nodeAddress);
}
@Override
View
3 core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java
@@ -23,6 +23,7 @@
package org.infinispan.statetransfer;
+import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -270,7 +271,7 @@ public void forwardCommandIfNeeded(TopologyAffectedCommand command, Set<Object>
if (cmdTopologyId < localTopologyId) {
ConsistentHash writeCh = cacheTopology.getWriteConsistentHash();
- Set<Address> newTargets = writeCh.locateAllOwners(affectedKeys);
+ Set<Address> newTargets = new HashSet<Address>(writeCh.locateAllOwners(affectedKeys));
newTargets.remove(rpcManager.getAddress());
// Forwarding to the originator would create a cycle
// TODO This may not be the "real" originator, but one of the original recipients
View
98 core/src/test/java/org/infinispan/api/mvcc/PutForExternalReadTest.java
@@ -28,15 +28,13 @@
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.fail;
import java.lang.reflect.Method;
-import java.util.ArrayList;
import java.util.List;
import javax.transaction.Status;
@@ -46,7 +44,8 @@
import org.infinispan.Cache;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
-import org.infinispan.config.Configuration;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.remoting.rpc.ResponseFilter;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
@@ -67,13 +66,13 @@
@Override
protected void createCacheManagers() throws Throwable {
- Configuration c = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+ ConfigurationBuilder c = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, true);
createClusteredCaches(2, "replSync", c);
}
public void testNoOpWhenKeyPresent() {
- final Cache cache1 = cache(0, "replSync");
- final Cache cache2 = cache(1, "replSync");
+ final Cache<String, String> cache1 = cache(0, "replSync");
+ final Cache<String, String> cache2 = cache(1, "replSync");
cache1.putForExternalRead(key, value);
eventually(new Condition() {
@@ -120,8 +119,8 @@ private ResponseMode anyResponseMode() {
}
public void testTxSuspension() throws Exception {
- final Cache cache1 = cache(0, "replSync");
- final Cache cache2 = cache(1, "replSync");
+ final Cache<String, String> cache1 = cache(0, "replSync");
+ final Cache<String, String> cache2 = cache(1, "replSync");
cache1.put(key + "0", value);
@@ -156,60 +155,43 @@ public boolean isSatisfied() throws Exception {
});
}
-
public void testExceptionSuppression() throws Exception {
- Cache cache1 = cache(0, "replSync");
- Cache cache2 = cache(1, "replSync");
- Transport mockTransport = mock(Transport.class);
- RpcManagerImpl rpcManager = (RpcManagerImpl) TestingUtil.extractComponent(cache1, RpcManager.class);
- Transport originalTransport = TestingUtil.extractComponent(cache1, Transport.class);
- try {
-
- Address mockAddress1 = mock(Address.class);
- Address mockAddress2 = mock(Address.class);
-
- List<Address> memberList = new ArrayList<Address>(2);
- memberList.add(mockAddress1);
- memberList.add(mockAddress2);
+ Cache<String, String> cache1 = cache(0, "replSync");
+ Cache<String, String> cache2 = cache(1, "replSync");
- rpcManager.setTransport(mockTransport);
-
- when(mockTransport.getMembers()).thenReturn(memberList);
-
- when(mockTransport.getViewId()).thenReturn(originalTransport.getViewId());
+ Transport originalTransport = TestingUtil.extractComponent(cache1, Transport.class);
+ Transport mockTransport = spy(originalTransport);
+ doThrow(new RuntimeException("Barf!")).when(mockTransport).invokeRemotely(anyAddresses(),
+ (CacheRpcCommand) anyObject(), anyResponseMode(), anyLong(), anyBoolean(), (ResponseFilter) anyObject());
- when(mockTransport.invokeRemotely(anyAddresses(), (CacheRpcCommand) anyObject(), anyResponseMode(),
- anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
- .thenThrow(new RuntimeException("Barf!"));
+ RpcManagerImpl rpcManager = (RpcManagerImpl) TestingUtil.extractComponent(cache1, RpcManager.class);
+ rpcManager.setTransport(mockTransport);
- try {
- cache1.put(key, value);
- fail("Should have barfed");
- }
- catch (RuntimeException re) {
- }
+ try {
+ cache1.put(key, value);
+ fail("Should have barfed");
+ } catch (RuntimeException re) {
+ }
- // clean up any indeterminate state left over
- try {
- cache1.remove(key);
- fail("Should have barfed");
- }
- catch (RuntimeException re) {
- }
+ // clean up any indeterminate state left over
+ try {
+ cache1.remove(key);
+ fail("Should have barfed");
+ } catch (RuntimeException re) {
+ }
- assertNull("Should have cleaned up", cache1.get(key));
+ assertNull("Should have cleaned up", cache1.get(key));
+ assertNull("Should have cleaned up", cache1.getAdvancedCache().getDataContainer().get(key));
+ assertNull("Should have cleaned up", cache2.get(key));
+ assertNull("Should have cleaned up", cache2.getAdvancedCache().getDataContainer().get(key));
- // should not barf
- cache1.putForExternalRead(key, value);
- }
- finally {
- if (rpcManager != null) rpcManager.setTransport(originalTransport);
- }
+ // should not barf
+ cache1.putForExternalRead(key, value);
}
public void testBasicPropagation() throws Exception {
- Cache cache1 = cache(0, "replSync");
- Cache cache2 = cache(1, "replSync");
+ Cache<String, String> cache1 = cache(0, "replSync");
+ Cache<String, String> cache2 = cache(1, "replSync");
assert !cache1.containsKey(key);
assert !cache2.containsKey(key);
@@ -252,8 +234,8 @@ public void testCacheModeLocalInTx(Method m) throws Exception {
* Tests that suspended transactions do not leak. See JBCACHE-1246.
*/
public void testMemLeakOnSuspendedTransactions() throws Exception {
- Cache cache1 = cache(0, "replSync");
- Cache cache2 = cache(1, "replSync");
+ Cache<String, String> cache1 = cache(0, "replSync");
+ Cache<String, String> cache2 = cache(1, "replSync");
TransactionManager tm1 = TestingUtil.getTransactionManager(cache1);
TransactionManager tm2 = TestingUtil.getTransactionManager(cache2);
ReplListener replListener2 = replListener(cache2);
@@ -324,7 +306,7 @@ public boolean isSatisfied() throws Exception {
}
public void testMultipleIdenticalPutForExternalReadCalls() {
- Cache cache1 = cache(0, "replSync");
+ Cache<String, String> cache1 = cache(0, "replSync");
cache1.putForExternalRead(key, value);
cache1.putForExternalRead(key, value2);
assertEquals(value, cache1.get(key));
@@ -336,8 +318,8 @@ public void testMultipleIdenticalPutForExternalReadCalls() {
* @throws Exception
*/
private void cacheModeLocalTest(boolean transactional, Method m) throws Exception {
- Cache<Object, Object> cache1 = cache(0, "replSync");
- Cache<Object, Object> cache2 = cache(1, "replSync");
+ Cache<String, String> cache1 = cache(0, "replSync");
+ Cache<String, String> cache2 = cache(1, "replSync");
TransactionManager tm1 = TestingUtil.getTransactionManager(cache1);
if (transactional)
tm1.begin();

0 comments on commit 9e52b8f

Please sign in to comment.
Something went wrong with that request. Please try again.