Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

ISPN-1244 Data loss possible during state transfer when numOwners == 1 #447

Closed
wants to merge 1 commit into from

2 participants

@maniksurtani
Collaborator
  • Add test contributed by Alex Henveld and Sanne Grinovero
  • Fix RebalanceTask's state pusher selection algorithm
@maniksurtani maniksurtani ISPN-1244 Data loss possible during state transfer when numOwners == 1
* Add test contributed by Alex Henveld and Sanne Grinovero
* Fix RebalanceTask's state pusher selection algorithm
01bba0c
@danberindei
Collaborator

Integrated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 15, 2011
  1. @maniksurtani

    ISPN-1244 Data loss possible during state transfer when numOwners == 1

    maniksurtani authored
    * Add test contributed by Alex Henveld and Sanne Grinovero
    * Fix RebalanceTask's state pusher selection algorithm
This page is out of date. Refresh to see the latest.
View
42 core/src/main/java/org/infinispan/distribution/RebalanceTask.java
@@ -43,7 +43,12 @@
import org.infinispan.util.concurrent.AggregatingNotifyingFutureImpl;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.infinispan.distribution.ch.ConsistentHashHelper.createConsistentHash;
@@ -184,7 +189,7 @@ private void invalidateKeys(Collection<Address> oldCacheSet, Collection<Address>
private void pushState(ConsistentHash chOld, ConsistentHash chNew, Map<Address, Map<Object, InternalCacheValue>> states) throws InterruptedException, ExecutionException {
NotifyingNotifiableFuture<Object> stateTransferFuture = new AggregatingNotifyingFutureImpl(null, states.size());
-
+ log.debugf("Size of states map %d.", states.size());
for (Map.Entry<Address, Map<Object, InternalCacheValue>> entry : states.entrySet()) {
final Address target = entry.getKey();
Map<Object, InternalCacheValue> state = entry.getValue();
@@ -192,7 +197,7 @@ private void pushState(ConsistentHash chOld, ConsistentHash chNew, Map<Address,
log.tracef("%s pushing to %s keys %s", self, target, state.keySet());
final RehashControlCommand cmd = cf.buildRehashControlCommand(RehashControlCommand.Type.APPLY_STATE, self,
- newViewId, state, chOld, chNew);
+ newViewId, state, chOld, chNew);
rpcManager.invokeRemotelyInFuture(Collections.singleton(target), cmd,
false, stateTransferFuture, configuration.getRehashRpcTimeout());
@@ -206,7 +211,7 @@ private void pushState(ConsistentHash chOld, ConsistentHash chNew, Map<Address,
log.errorTransferringState(e);
throw e;
}
- log.debugf("Node finished pushing data for rehash %d", newViewId);
+ log.debugf("Node finished pushing data for rehash %d.", newViewId);
}
@@ -215,13 +220,13 @@ private void pushState(ConsistentHash chOld, ConsistentHash chNew, Map<Address,
* if K should be pushed to other servers. Adds K to the <code>keysToRemove</code> list if this node is no longer an
* owner for K.
*
- * @param key The key
- * @param value The value; <code>null</code> if the value is not in the data container
- * @param numOwners The number of owners (grabbed from the configuration)
- * @param chOld The old (current) consistent hash
- * @param chNew The new consistent hash
- * @param cacheStore If the value is <code>null</code>, try to load it from this cache store
- * @param states The result hashmap. Keys are servers, values are states (hashmaps) to be pushed to them
+ * @param key The key
+ * @param value The value; <code>null</code> if the value is not in the data container
+ * @param numOwners The number of owners (grabbed from the configuration)
+ * @param chOld The old (current) consistent hash
+ * @param chNew The new consistent hash
+ * @param cacheStore If the value is <code>null</code>, try to load it from this cache store
+ * @param states The result hashmap. Keys are servers, values are states (hashmaps) to be pushed to them
* @param keysToRemove A list that the keys that we need to remove will be added to
*/
protected void rebalance(Object key, InternalCacheEntry value, int numOwners, ConsistentHash chOld, ConsistentHash chNew,
@@ -239,11 +244,16 @@ protected void rebalance(Object key, InternalCacheEntry value, int numOwners, Co
// 3. The pushing server is the last node in the old owner list that's also in the new owner list
// It will only be null if all the old owners left the cluster
Address pushingOwner = null;
- for (int i = oldOwners.size() - 1; i >= 0; i--) {
- Address server = oldOwners.get(i);
- if (newOwners.contains(server)) {
- pushingOwner = server;
- break;
+ if (oldOwners.size() == 1) {
+ // This could happen if numOwners == 1! See ISPN-1244
+ pushingOwner = oldOwners.get(0);
+ } else {
+ for (int i = oldOwners.size() - 1; i >= 0; i--) {
+ Address server = oldOwners.get(i);
+ if (newOwners.contains(server)) {
+ pushingOwner = server;
+ break;
+ }
}
}
View
91 core/src/test/java/org/infinispan/distribution/rehash/DataLossOnJoinOneOwnerTest.java
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.AbstractInfinispanTest;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+/**
+ * Tests data loss suring state transfer when a single data owner is configured.
+ * @author Sanne Grinovero <sanne@infinispan.org> (C) 2011 Red Hat Inc.
+ * @author Alex Heneveld
+ * @author Manik Surtani
+ */
+@Test(groups = "functional", testName = "distribution.rehash.DataLossOnJoinOneOwnerTest")
+public class DataLossOnJoinOneOwnerTest extends AbstractInfinispanTest {
+
+ private static final int WAIT_TIME = 80;
+ private static final String VALUE = DataLossOnJoinOneOwnerTest.class.getName() + "value";
+ private static final String KEY = DataLossOnJoinOneOwnerTest.class.getName() + "key";
+
+ EmbeddedCacheManager cm1;
+ EmbeddedCacheManager cm2;
+
+ /**
+ * It seems that sometimes when a new node joins, existing data is lost.
+ * Can not reproduce with numOwners=2.
+ */
+ public void testDataLossOnJoin() {
+ try {
+ cm1 = newCM();
+ Cache<String, String> c1 = cm1.getCache();
+ c1.put(KEY, VALUE);
+ hasKey(c1);
+ cm2 = newCM();
+ sleep();
+ Cache<String, String> c2 = cm2.getCache();
+ sleep();
+ hasKey(c1);
+ hasKey(c2);
+ }
+ finally {
+ TestingUtil.killCacheManagers(cm1, cm2);
+ }
+ }
+
+ private void sleep() {
+ try {
+ Thread.sleep(WAIT_TIME);
+ } catch (InterruptedException e) {
+ assert false;
+ }
+ }
+
+ private void hasKey(Cache<String, String> cache) {
+ Object object = cache.get(KEY);
+ assert VALUE.equals(object);
+ }
+
+ public EmbeddedCacheManager newCM() {
+ GlobalConfiguration gc = GlobalConfiguration.getClusteredDefault();
+ Configuration c = new Configuration().fluent()
+ .mode(Configuration.CacheMode.DIST_SYNC)
+ .hash().numOwners(1)
+ .clustering().l1().disable()
+ .build();
+ return new DefaultCacheManager(gc, c);
+ }
+
+}
Something went wrong with that request. Please try again.