From 8fd7ea6a17cce2e5784ecef2a58822d4845d13ef Mon Sep 17 00:00:00 2001 From: Zhongjie Wu Date: Wed, 1 May 2013 18:25:18 -0700 Subject: [PATCH] vector clock fix --- src/java/voldemort/versioning/ClockEntry.java | 17 ++ .../InvalidClockEntryException.java | 24 +++ .../voldemort/versioning/VectorClock.java | 189 ++++++++---------- .../voldemort/versioning/VectorClockTest.java | 51 ++++- 4 files changed, 178 insertions(+), 103 deletions(-) create mode 100644 src/java/voldemort/versioning/InvalidClockEntryException.java diff --git a/src/java/voldemort/versioning/ClockEntry.java b/src/java/voldemort/versioning/ClockEntry.java index 27bc0cac19..326cb6d916 100644 --- a/src/java/voldemort/versioning/ClockEntry.java +++ b/src/java/voldemort/versioning/ClockEntry.java @@ -28,6 +28,7 @@ * */ @NotThreadsafe +@Deprecated public final class ClockEntry implements Cloneable, Serializable { private static final long serialVersionUID = 1; @@ -108,11 +109,27 @@ public String toString() { } public void setNodeId(short nodeId) { + if(nodeId < 0) + throw new IllegalArgumentException("Node id " + nodeId + " is not in the range (0, " + + Short.MAX_VALUE + ")."); this.nodeId = nodeId; } public void setVersion(long version) { + if(version < 1) + throw new IllegalArgumentException("Version " + version + " is not in the range (1, " + + Short.MAX_VALUE + ")."); this.version = version; } + public void validate() { + if(nodeId < 0) + throw new InvalidClockEntryException("Node id " + nodeId + " is not in the range (0, " + + Short.MAX_VALUE + ")."); + if(version < 1) + throw new InvalidClockEntryException("Version " + version + " is not in the range (1, " + + Short.MAX_VALUE + ")."); + + } + } diff --git a/src/java/voldemort/versioning/InvalidClockEntryException.java b/src/java/voldemort/versioning/InvalidClockEntryException.java new file mode 100644 index 0000000000..dcbbd29ebf --- /dev/null +++ b/src/java/voldemort/versioning/InvalidClockEntryException.java @@ -0,0 +1,24 @@ +package voldemort.versioning; + +import voldemort.VoldemortException; + +public class InvalidClockEntryException extends VoldemortException { + + private static final long serialVersionUID = 1L; + + public InvalidClockEntryException() { + super(); + } + + public InvalidClockEntryException(String s, Throwable t) { + super(s, t); + } + + public InvalidClockEntryException(String s) { + super(s); + } + + public InvalidClockEntryException(Throwable t) { + super(t); + } +} diff --git a/src/java/voldemort/versioning/VectorClock.java b/src/java/voldemort/versioning/VectorClock.java index 50ab8699ab..82d7b73570 100644 --- a/src/java/voldemort/versioning/VectorClock.java +++ b/src/java/voldemort/versioning/VectorClock.java @@ -18,12 +18,16 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import voldemort.annotations.concurrency.NotThreadsafe; import voldemort.utils.ByteUtils; +import voldemort.utils.Utils; -import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * A vector of the number of writes mastered by each node. The vector is stored @@ -40,8 +44,8 @@ public class VectorClock implements Version, Serializable { private static final int MAX_NUMBER_OF_VERSIONS = Short.MAX_VALUE; - /* A sorted list of live versions ordered from least to greatest */ - private final List versions; + /* A map of versions keyed by nodeId */ + private final Map versionMap; /* * The time of the last update on the server on which the update was @@ -53,21 +57,30 @@ public class VectorClock implements Version, Serializable { * Construct an empty VectorClock */ public VectorClock() { - this(new ArrayList(0), System.currentTimeMillis()); + this(System.currentTimeMillis()); } public VectorClock(long timestamp) { - this(new ArrayList(0), timestamp); + this.versionMap = new HashMap(0); + this.timestamp = timestamp; } /** - * Create a VectorClock with the given version and timestamp + * This function is not safe because it may break the pre-condition that + * clock entries should be sorted by nodeId * - * @param versions The version to prepopulate - * @param timestamp The timestamp to prepopulate */ + @Deprecated public VectorClock(List versions, long timestamp) { - this.versions = versions; + this.versionMap = new HashMap(0); + this.timestamp = timestamp; + for(ClockEntry clockEntry: versions) { + this.versionMap.put(clockEntry.getNodeId(), clockEntry.getVersion()); + } + } + + public VectorClock(Map versionMap, long timestamp) { + this.versionMap = Utils.notNull(versionMap); this.timestamp = timestamp; } @@ -101,12 +114,12 @@ public VectorClock(byte[] bytes, int offset) { throw new IllegalArgumentException("Too few bytes: expected at least " + minimumBytes + " but found only " + bytes.length + "."); - this.versions = new ArrayList(numEntries); + this.versionMap = new HashMap(numEntries); int index = 3 + offset; for(int i = 0; i < numEntries; i++) { short nodeId = ByteUtils.readShort(bytes, index); long version = ByteUtils.readBytes(bytes, index + ByteUtils.SIZE_OF_SHORT, versionSize); - this.versions.add(new ClockEntry(nodeId, version)); + this.versionMap.put(nodeId, version); index += entrySize; } this.timestamp = ByteUtils.readLong(bytes, index); @@ -120,7 +133,7 @@ public byte[] toBytes() { public int toBytes(byte[] buf, int offset) { // write the number of versions - ByteUtils.writeShort(buf, (short) versions.size(), offset); + ByteUtils.writeShort(buf, (short) versionMap.size(), offset); offset += ByteUtils.SIZE_OF_SHORT; // write the size of each version in bytes byte versionSize = ByteUtils.numberOfBytesRequired(getMaxVersion()); @@ -128,9 +141,12 @@ public int toBytes(byte[] buf, int offset) { offset++; int clockEntrySize = ByteUtils.SIZE_OF_SHORT + versionSize; - for(ClockEntry v: versions) { - ByteUtils.writeShort(buf, v.getNodeId(), offset); - ByteUtils.writeBytes(buf, v.getVersion(), offset + ByteUtils.SIZE_OF_SHORT, versionSize); + for(Map.Entry entry: versionMap.entrySet()) { + ByteUtils.writeShort(buf, entry.getKey(), offset); + ByteUtils.writeBytes(buf, + entry.getValue(), + offset + ByteUtils.SIZE_OF_SHORT, + versionSize); offset += clockEntrySize; } ByteUtils.writeLong(buf, this.timestamp, offset); @@ -139,7 +155,7 @@ public int toBytes(byte[] buf, int offset) { public int sizeInBytes() { byte versionSize = ByteUtils.numberOfBytesRequired(getMaxVersion()); - return ByteUtils.SIZE_OF_SHORT + 1 + this.versions.size() + return ByteUtils.SIZE_OF_SHORT + 1 + this.versionMap.size() * (ByteUtils.SIZE_OF_SHORT + versionSize) + ByteUtils.SIZE_OF_LONG; } @@ -155,34 +171,23 @@ public void incrementVersion(int node, long time) { this.timestamp = time; - // stop on the index greater or equal to the node - boolean found = false; - int index = 0; - for(; index < versions.size(); index++) { - if(versions.get(index).getNodeId() == node) { - found = true; - break; - } else if(versions.get(index).getNodeId() > node) { - found = false; - break; - } + Long version = versionMap.get((short) node); + if(version == null) { + version = 1L; + } else { + version = version + 1L; } - if(found) { - versions.set(index, versions.get(index).incremented()); - } else if(index < versions.size() - 1) { - versions.add(index, new ClockEntry((short) node, 1)); - } else { - // we don't already have a version for this, so add it - if(versions.size() > MAX_NUMBER_OF_VERSIONS) - throw new IllegalStateException("Vector clock is full!"); - versions.add(index, new ClockEntry((short) node, 1)); + versionMap.put((short) node, version); + if(versionMap.size() >= MAX_NUMBER_OF_VERSIONS) { + throw new IllegalStateException("Vector clock is full!"); } } /** - * Get new vector clock based on this clock but incremented on index nodeId + * List Get new vector clock based on this clock but incremented on index + * nodeId * * @param nodeId The id of the node to increment * @return A vector clock equal on each element execept that indexed by @@ -196,7 +201,7 @@ public VectorClock incremented(int nodeId, long time) { @Override public VectorClock clone() { - return new VectorClock(Lists.newArrayList(versions), this.timestamp); + return new VectorClock(Maps.newHashMap(versionMap), this.timestamp); } @Override @@ -208,24 +213,27 @@ public boolean equals(Object object) { if(!object.getClass().equals(VectorClock.class)) return false; VectorClock clock = (VectorClock) object; - return versions.equals(clock.versions); + return versionMap.equals(clock.versionMap); } @Override public int hashCode() { - return versions.hashCode(); + return versionMap.hashCode(); } @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append("version("); - if(this.versions.size() > 0) { - for(int i = 0; i < this.versions.size() - 1; i++) { - builder.append(this.versions.get(i)); + int versionsLeft = versionMap.size(); + for(Map.Entry entry: versionMap.entrySet()) { + versionsLeft--; + Short node = entry.getKey(); + Long version = entry.getValue(); + builder.append(node + ":" + version); + if(versionsLeft > 0) { builder.append(", "); } - builder.append(this.versions.get(this.versions.size() - 1)); } builder.append(")"); builder.append(" ts:" + timestamp); @@ -234,41 +242,29 @@ public String toString() { public long getMaxVersion() { long max = -1; - for(ClockEntry entry: versions) - max = Math.max(entry.getVersion(), max); + for(Long version: versionMap.values()) + max = Math.max(version, max); return max; } public VectorClock merge(VectorClock clock) { VectorClock newClock = new VectorClock(); - int i = 0; - int j = 0; - while(i < this.versions.size() && j < clock.versions.size()) { - ClockEntry v1 = this.versions.get(i); - ClockEntry v2 = clock.versions.get(j); - if(v1.getNodeId() == v2.getNodeId()) { - newClock.versions.add(new ClockEntry(v1.getNodeId(), Math.max(v1.getVersion(), - v2.getVersion()))); - i++; - j++; - } else if(v1.getNodeId() < v2.getNodeId()) { - newClock.versions.add(v1.clone()); - i++; + for(Map.Entry entry: this.versionMap.entrySet()) { + newClock.versionMap.put(entry.getKey(), entry.getValue()); + } + for(Map.Entry entry: clock.versionMap.entrySet()) { + Long version = newClock.versionMap.get(entry.getKey()); + if(version == null) { + newClock.versionMap.put(entry.getKey(), entry.getValue()); } else { - newClock.versions.add(v2.clone()); - j++; + newClock.versionMap.put(entry.getKey(), Math.max(version, entry.getValue())); } } - // Okay now there may be leftovers on one or the other list remaining - for(int k = i; k < this.versions.size(); k++) - newClock.versions.add(this.versions.get(k).clone()); - for(int k = j; k < clock.versions.size(); k++) - newClock.versions.add(clock.versions.get(k).clone()); - return newClock; } + @Override public Occurred compare(Version v) { if(!(v instanceof VectorClock)) throw new IllegalArgumentException("Cannot compare Versions of different types."); @@ -277,12 +273,10 @@ public Occurred compare(Version v) { } /** - * Is this Reflexive, AntiSymetic, and Transitive? Compare two VectorClocks, - * the outcomes will be one of the following: -- Clock 1 is BEFORE clock 2 - * if there exists an i such that c1(i) <= c(2) and there does not exist a j - * such that c1(j) > c2(j). -- Clock 1 is CONCURRENT to clock 2 if there - * exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j) -- Clock 1 is - * AFTER clock 2 otherwise + * Compare two VectorClocks, the outcomes will be one of the following: -- + * v1 <= v2 : BEFORE (implies v2 AFTER or BEFORE v1)
+ * v1 > v2 => AFTER (implies v2 BEFORE v1)
+ * v1 <> v2 => CONCURRENTLY (implies v2 CONCURRENTLY to v1) * * @param v1 The first VectorClock * @param v2 The second VectorClock @@ -293,37 +287,26 @@ public static Occurred compare(VectorClock v1, VectorClock v2) { // We do two checks: v1 <= v2 and v2 <= v1 if both are true then boolean v1Bigger = false; boolean v2Bigger = false; - int p1 = 0; - int p2 = 0; - - while(p1 < v1.versions.size() && p2 < v2.versions.size()) { - ClockEntry ver1 = v1.versions.get(p1); - ClockEntry ver2 = v2.versions.get(p2); - if(ver1.getNodeId() == ver2.getNodeId()) { - if(ver1.getVersion() > ver2.getVersion()) - v1Bigger = true; - else if(ver2.getVersion() > ver1.getVersion()) - v2Bigger = true; - p1++; - p2++; - } else if(ver1.getNodeId() > ver2.getNodeId()) { - // since ver1 is bigger that means it is missing a version that - // ver2 has + + Map v1MapCopy = Maps.newHashMap(v1.versionMap); + for(Map.Entry v2Entry: v2.versionMap.entrySet()) { + Long v1Version = v1MapCopy.remove(v2Entry.getKey()); + Long v2Version = v2Entry.getValue(); + if(v1Version == null) { v2Bigger = true; - p2++; - } else { - // this means ver2 is bigger which means it is missing a version - // ver1 has + } else if(v1Version < v2Version) { + v2Bigger = true; + } else if(v1Version > v2Version) { v1Bigger = true; - p1++; } + // two clockEntry (of same nodeId) with the same version + // will not make either version bigger } - - /* Okay, now check for left overs */ - if(p1 < v1.versions.size()) + // if there are clockEntry not touched in v1MapCopy then v1 has bigger + // clocks + if(v1MapCopy.size() > 0) { v1Bigger = true; - else if(p2 < v2.versions.size()) - v2Bigger = true; + } /* * This is the case where they are equal. Consciously return BEFORE, so @@ -347,8 +330,12 @@ public long getTimestamp() { return this.timestamp; } + @Deprecated public List getEntries() { - return this.versions; + List clocks = new ArrayList(versionMap.size()); + for(Map.Entry entry: versionMap.entrySet()) { + clocks.add(new ClockEntry(entry.getKey(), entry.getValue())); + } + return Collections.unmodifiableList(clocks); } - } diff --git a/test/unit/voldemort/versioning/VectorClockTest.java b/test/unit/voldemort/versioning/VectorClockTest.java index c5c53e6e6f..a9373e0630 100644 --- a/test/unit/voldemort/versioning/VectorClockTest.java +++ b/test/unit/voldemort/versioning/VectorClockTest.java @@ -16,8 +16,13 @@ package voldemort.versioning; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static voldemort.TestUtils.getClock; -import junit.framework.TestCase; + +import org.junit.Test; + import voldemort.TestUtils; import com.google.common.collect.Lists; @@ -27,8 +32,10 @@ * * */ -public class VectorClockTest extends TestCase { +@SuppressWarnings("deprecation") +public class VectorClockTest { + @Test public void testEqualsAndHashcode() { VectorClock one = getClock(1, 2); VectorClock other = getClock(1, 2); @@ -36,6 +43,7 @@ public void testEqualsAndHashcode() { assertEquals(one.hashCode(), other.hashCode()); } + @Test public void testComparisons() { assertTrue("The empty clock should not happen before itself.", getClock().compare(getClock()) != Occurred.CONCURRENTLY); @@ -51,6 +59,7 @@ public void testComparisons() { && getClock(1, 2, 2, 3).compare(getClock(2, 2)) == Occurred.AFTER); } + @Test public void testMerge() { // merging two clocks should create a clock contain the element-wise // maximums @@ -74,6 +83,7 @@ public void testMerge() { * See gihub issue #25: Incorrect coersion of version to short before * passing to ClockEntry constructor */ + @Test public void testMergeWithLargeVersion() { VectorClock clock1 = getClock(1); VectorClock clock2 = new VectorClock(Lists.newArrayList(new ClockEntry((short) 1, @@ -83,6 +93,7 @@ public void testMergeWithLargeVersion() { assertEquals(mergedClock.getMaxVersion(), Short.MAX_VALUE + 1); } + @Test public void testSerialization() { assertEquals("The empty clock serializes incorrectly.", getClock(), @@ -93,6 +104,7 @@ public void testSerialization() { new VectorClock(clock.toBytes())); } + @Test public void testSerializationWraps() { VectorClock clock = getClock(1, 1, 2, 3, 3, 6); for(int i = 0; i < 300; i++) @@ -100,6 +112,7 @@ public void testSerializationWraps() { assertEquals("Clock does not serialize to itself.", clock, new VectorClock(clock.toBytes())); } + @Test public void testIncrementOrderDoesntMatter() { // Clocks should have the property that no matter what order the // increment operations are done in the resulting clocks are equal @@ -119,6 +132,7 @@ public void testIncrementOrderDoesntMatter() { } } + @Test public void testIncrementAndSerialize() { int node = 1; VectorClock vc = getClock(node); @@ -132,4 +146,37 @@ public void testIncrementAndSerialize() { assertEquals(increments + 1, vc.getMaxVersion()); } + /** + * A test for comparing vector clocks that nodes of clock entries are not + * sorted In case people insert clock entries without using increment we + * need to test although it has been deprecated + */ + @Test + public void testNodeClockEntryDeprecate() { + VectorClock vc1 = new VectorClock(); + try { + vc1.getEntries().add(new ClockEntry((short) 2, 2)); + fail("Did not throw UnsupportedOperationException"); + } catch(UnsupportedOperationException e) { + + } + } + + @Test + public void testVersion0NotAcceptable() { + try { + ClockEntry clockEntry = new ClockEntry(); + clockEntry.setVersion(0); + fail("Did not throw IllegalArgumentException"); + } catch(IllegalArgumentException e) {} + } + + @Test + public void testNodeLess0NotAcceptable() { + try { + ClockEntry clockEntry = new ClockEntry(); + clockEntry.setNodeId((short) -1); + fail("Did not throw IllegalArgumentException"); + } catch(IllegalArgumentException e) {} + } }