Permalink
Browse files

Several fixes on CompositeRetentionStoreReader.

  • Loading branch information...
1 parent 07d2699 commit c3b8f3aacd4bc1cf7eea22db1f78dd3fb61b1763 @alejandroperez alejandroperez committed Aug 1, 2012
View
@@ -116,6 +116,13 @@
<dependencies>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>13.0-rc2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.3.0</version>
@@ -42,7 +42,7 @@
* @param map - the result map (keys to value events) to fill in
* @return the next position from where new events will be read.
*/
- public Position get(Position pos, Map<K, Event<V>> map) {
+ public final Position get(Position pos, Map<K, Event<V>> map) {
ArrayList<Event<K>> list = new ArrayList<Event<K>>(1000);
Position nextPos = get(pos, list);
@@ -17,17 +17,20 @@
package krati.retention;
import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
import krati.retention.clock.Clock;
/**
- *
+ * A Position composed of a list of individual positions.
+ * @see CompositeRetentionStoreReader
+ * Class invariant: of the internal positions, only the first one can be indexed.
* @author spike(alperez)
*
*/
-public class CompositePosition implements Position {
+public final class CompositePosition implements Position {
private static final long serialVersionUID = 1L;
private final Position[] positions;
@@ -39,19 +42,33 @@ public boolean equals(Object o) {
return Arrays.equals(this.positions, p.positions);
}
+ /**
+ * Checks that no internal position, except maybe the first one are indexed.
+ * @return
+ */
+ private boolean checkClassInvariant() {
+ for (int i = 1; i < positions.length; i++) {
+ if (positions[i].isIndexed())
+ return false;
+ }
+ return true;
+ }
+
@Override
public int hashCode() {
- return positions.hashCode();
+ return Arrays.hashCode(positions);
}
public CompositePosition(Position... positions) {
- //TODO: verify size > 0
+ checkArgument(positions.length > 0);
this.positions = positions;
+ checkArgument(checkClassInvariant());
}
public CompositePosition(List<Position> positions) {
- //TODO: verify size > 0
- this.positions = (Position []) positions.toArray();
+ checkArgument(positions.size() > 0);
+ this.positions = positions.toArray(new Position[0]);
+ checkArgument(checkClassInvariant());
}
@Override
@@ -71,11 +88,8 @@ public int getIndex() {
@Override
public boolean isIndexed() {
- //O solo el primero.
- for (Position p : positions) {
- if (p.isIndexed()) return true;
- }
- return false;
+ //Given the class invariant, this is equivalent to check all of them.
+ return positions[0].isIndexed();
}
public int dimension() {
@@ -87,14 +101,25 @@ public int dimension() {
}
public Position getPosition(int index) {
- //TODO: validate index < dimension
+ checkArgument(index > 0 && index < positions.length);
return positions[index];
}
+ /**
+ * This method returns the component positions.
+ * It's equal to the list/array that was used for construction.
+ * Returns a copy, so modifications to the returned array won't change
+ * this instance of Position.
+ */
public Position[] getPositions() {
- return positions;
+ return positions.clone();
}
+ /**
+ * Returns a Clock, composed of all the clocks of the underlying Positions.
+ * For example, if this CompositePositions contains 2 positions that return clocks
+ * (2:43) and (1:3:1), this method will return a Clock equal to (2:43:1:3:1)
+ */
@Override
public Clock getClock() {
long[] values = new long[dimension()];
@@ -121,7 +146,12 @@ public final String toString() {
return b.toString();
}
+ /**
+ * Parses the output of toString into a new CompositePosition instance.
+ * The behavior is undefined if string is of a different format.
+ */
public static CompositePosition parsePosition(String s) {
+ checkNotNull(s);
String[] parts = s.split("\\|");
SimplePosition[] positions = new SimplePosition[parts.length];
for (int i = 0; i < positions.length; i++) {
@@ -1,31 +1,61 @@
package krati.retention;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
import krati.retention.clock.Clock;
-public class CompositeRetentionStoreReader<K, V> extends AbstractRetentionStoreReader<K, Map<String, V>> {
+/**
+ * A CompositeRetentionStoreReader is a RetentionStoreReader that can be created on top
+ * of individual RetentionStoreReaders. This allows to have individual retentions for
+ * individual stores, and since this class is stateless (in the sense that doesn't save
+ * data) it's possible to create ad-hoc joiners over any combination of data stores.
+ * @author spike(alperez)
+ */
+public class CompositeRetentionStoreReader<K, V> extends
+AbstractRetentionStoreReader<K, Map<String, V>> {
private final ArrayList<RetentionStoreReader<K, V>> stores;
public CompositeRetentionStoreReader(List<RetentionStoreReader<K, V>> stores) {
- //TODO: validate stores.length > 0
+ checkNotNull(stores);
+ checkArgument(stores.size() > 0);
+ checkArgument(storeSourcesUnique(stores));
+
Set<String> sources = new HashSet<String>(stores.size());
for (RetentionStoreReader<K, V> rsr : stores) {
if (sources.contains(rsr.getSource())) {
- throw new IllegalArgumentException("Retention name " + rsr.getSource() + " is duplicated.");
+ throw new IllegalArgumentException("Retention name " + rsr.getSource()
+ + " is duplicated.");
}
sources.add(rsr.getSource());
}
- //TODO: validate uniqueness of getSource's
+
this.stores = new ArrayList<RetentionStoreReader<K, V>>(stores);
}
-
+
+ /**
+ * Argument check helper.
+ *
+ * @return true if all the passed readers have unique source names (as
+ * returned by getSource).
+ */
+ private boolean storeSourcesUnique(List<RetentionStoreReader<K, V>> ss) {
+ Set<String> names = new HashSet<String>();
+ for (RetentionStoreReader<?, ?> s : ss) {
+ names.add(s.getSource());
+ }
+ return names.size() == ss.size();
+ }
+
@Override
public Position getPosition() {
ArrayList<Position> pos = new ArrayList<Position>(stores.size());
@@ -37,29 +67,59 @@ public Position getPosition() {
@Override
public Position getPosition(Clock sinceClock) {
+ checkNotNull(sinceClock);
+ checkArgument(sinceClock.dimension() == 0 || sinceClock.dimension() == getClockDimension());
ArrayList<Position> pos = new ArrayList<Position>(stores.size());
- for (RetentionStoreReader<K, V> store : stores) {
- pos.add(store.getPosition(sinceClock));
+ int index = 0;
+ boolean rewrite = false;
+ if (sinceClock.dimension() == 0) {
+ rewrite = true;
+ } else {
+ for (RetentionStoreReader<K, V> store : stores) {
+ int nextIndex = index + store.getClockDimension();
+ Clock newClock = new Clock(Arrays.copyOfRange(sinceClock.values(), index, nextIndex));
+ Position newPos = store.getPosition(newClock);
+ if (newPos.isIndexed() && index != 0) {
+ rewrite = true;
+ break;
+ }
+ index = nextIndex;
+ pos.add(store.getPosition(newClock));
+ }
+ }
+ // If one store that's not the first falls into bootstrapping mode,
+ // we'll reset the position
+ // as bootstrapping from the first, and to the present for all the rest.
+ if (rewrite) {
+ pos = new ArrayList<Position>(stores.size());
+ pos.add(stores.get(0).getPosition(Clock.ZERO)); // bootstrap from
+ // the first store
+ for (index = 0; index < stores.size(); index++) {
+ pos.add(stores.get(index).getPosition());// current position all
+ // the rest
+ }
}
return new CompositePosition(pos);
}
@Override
public Position get(Position pos, List<Event<K>> list) {
+ checkArgument(pos instanceof CompositePosition, "pos does not subclass CompositePosition.");
CompositePosition cp = (CompositePosition) pos;
- //TODO: assert cp.dimension == stores.size
+ checkArgument(cp.dimension() == stores.size());
+
Position[] pp = cp.getPositions();
- for (int i=0; i < stores.size(); i++) {
+ for (int i = 0; i < stores.size(); i++) {
List<Event<K>> tList = new LinkedList<Event<K>>();
Position np = stores.get(i).get(pp[i], tList);
if (tList.size() > 0) {
list.addAll(tList);
pp[i] = np;
- //TODO: assert pp[i] not equal np, or else we'll never finish.
+ // TODO: assert pp[i] not equal np, or else we'll never finish.
return new CompositePosition(pp);
}
}
- //if we're here, we don't have any updates.
+ // if we're here, we don't have any updates.
return pos;
}
@@ -75,9 +135,18 @@ public String getSource() {
@Override
public Map<String, V> get(K key) throws Exception {
Map<String, V> retVal = new HashMap<String, V>();
- for (RetentionStoreReader<K,V> rsr : stores) {
+ for (RetentionStoreReader<K, V> rsr : stores) {
retVal.put(rsr.getSource(), rsr.get(key));
}
return retVal;
}
+
+ @Override
+ public int getClockDimension() {
+ int dim = 0;
+ for (RetentionStoreReader<K, V> store : stores) {
+ dim += store.getClockDimension();
+ }
+ return dim;
+ }
}
@@ -113,4 +113,9 @@
* @return <code>true</code> if this EventBatch is full. Otherwise, <code>false</code>.
*/
public boolean isFull();
+
+ /**
+ * @return the Clock.dimension() for the type of Clocks that this event Batch handles.
+ */
+ public int getClockDimension();
}
@@ -58,4 +58,14 @@
*/
public Position get(Position pos, List<Event<T>> list);
+ /**
+ * Although the clock class is generic in the number of values, only clocks with exactly
+ * the same number of values (aka clock dimension) are (trivially) comparable. Hence
+ * a RetentionClient can only deal with clocks of a particular and predefined dimension.
+ *
+ * @return dimension of the clocks this RetentionClient accepts and returns.
+ */
+ public abstract int getClockDimension();
+
+
}
@@ -39,7 +39,7 @@
/**
* @return the data source of this RetentionStoreReader.
*/
- public abstract String getSource();
+ public String getSource();
/**
* Convenience method. Composition of get(Position, List<Event>) and get.
@@ -20,6 +20,9 @@
import java.util.Iterator;
import java.util.List;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
import krati.retention.clock.Clock;
import krati.retention.clock.Occurred;
@@ -36,6 +39,7 @@
public final class SimpleEventBatch<T> implements EventBatch<T>, Cloneable {
private static final long serialVersionUID = 1L;
private final long _origin;
+ private final int clockDimension;
private final int _capacity;
private volatile Clock _minClock;
private volatile Clock _maxClock;
@@ -47,10 +51,14 @@ public SimpleEventBatch(long origin, Clock initClock) {
this(origin, initClock, EventBatch.DEFAULT_BATCH_SIZE);
}
+
public SimpleEventBatch(long origin, Clock initClock, int capacity) {
+ checkArgument(capacity > 0);
+ checkArgument(!initClock.equals(Clock.ZERO));
this._origin = origin;
this._minClock = initClock;
this._maxClock = initClock;
+ this.clockDimension = initClock.dimension();
this._capacity = Math.max(EventBatch.MINIMUM_BATCH_SIZE, capacity);
this._events = new ArrayList<Event<T>>(this._capacity);
this._creationTime = System.currentTimeMillis();
@@ -225,4 +233,10 @@ public String toString() {
batch._completionTime = _completionTime;
return batch;
}
+
+
+ @Override
+ public int getClockDimension() {
+ return clockDimension;
+ }
}
@@ -749,4 +749,10 @@ protected boolean mergeEventsToLastBatch() throws IOException {
return false;
}
+
+ @Override
+ public int getClockDimension() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
}
Oops, something went wrong.

0 comments on commit c3b8f3a

Please sign in to comment.