Permalink
Browse files

Fixes to dimension.

Tests still don't pass.
  • Loading branch information...
1 parent 1571d38 commit 41ff8151013755decc3900ea78a4c325eb48d81e @alejandroperez alejandroperez committed Aug 15, 2012
@@ -26,13 +26,15 @@
/**
* A Position composed of a list of individual positions.
* @see CompositeRetentionStoreReader
- * Class invariant: of the internal positions, only the first one can be indexed.
+ * Class invariant: of the internal positions, only the first one can be indexed. Also, dimension is equal
+ * to the sum of all individual clocks, except when the first clock is zero.
* @author spike(alperez)
*
*/
public final class CompositePosition implements Position {
private static final long serialVersionUID = 1L;
private final Position[] positions;
+ private final int dimension;
@Override
public boolean equals(Object o) {
@@ -51,23 +53,35 @@ private boolean checkClassInvariant() {
if (positions[i].isIndexed())
return false;
}
- return true;
+ int count = 0;
+ for (Position p : positions) {
+ count += p.getClock().dimension();
+ }
+ if (positions[0].getClock().equals(Clock.ZERO)) {
+ return (dimension > count);
+ } else {
+ return (dimension == count);
+ }
}
@Override
public int hashCode() {
return Arrays.hashCode(positions);
}
- public CompositePosition(Position... positions) {
+ public CompositePosition(int dimension, Position... positions) {
checkArgument(positions.length > 0);
+ checkArgument(dimension > 0);
+ this.dimension = dimension;
this.positions = positions;
checkArgument(checkClassInvariant());
}
- public CompositePosition(List<Position> positions) {
+ public CompositePosition(int dimension, List<Position> positions) {
checkArgument(positions.size() > 0);
+ checkArgument(dimension > 0);
this.positions = positions.toArray(new Position[0]);
+ this.dimension = dimension;
checkArgument(checkClassInvariant());
}
@@ -93,11 +107,7 @@ public boolean isIndexed() {
}
public int dimension() {
- int d = 0;
- for (Position p : positions){
- d += p.getClock().values().length;
- }
- return d;
+ return dimension;
}
public Position getPosition(int index) {
@@ -136,6 +146,7 @@ public Clock getClock() {
@Override
public final String toString() {
StringBuilder b = new StringBuilder();
+ b.append("dimension= ").append(dimension).append(" | ");
for (int i=0; i < positions.length; i++) {
b.append("id= ").append(positions[i].getId())
.append(", offset= ").append(positions[i].getOffset())
@@ -153,8 +164,9 @@ public final String toString() {
public static CompositePosition parsePosition(String s) {
checkNotNull(s);
String[] parts = s.split("\\|");
+ int dimension = Integer.parseInt(parts[0].split("=")[1].trim());
SimplePosition[] positions = new SimplePosition[parts.length];
- for (int i = 0; i < positions.length; i++) {
+ for (int i = 1; i < positions.length; i++) {
String[] fields = parts[i].split("=|,");
String[] clockParts = fields[7].split(":");
long[] clockNumbers = new long[clockParts.length];
@@ -164,6 +176,6 @@ public static CompositePosition parsePosition(String s) {
Clock clock = new Clock(clockNumbers);
positions[i] = new SimplePosition(Integer.parseInt(fields[1].trim()), Long.parseLong(fields[3].trim()), Integer.parseInt(fields[5].trim()), clock);
}
- return new CompositePosition(positions);
+ return new CompositePosition(dimension, positions);
}
}
@@ -62,7 +62,7 @@ public Position getPosition() {
for (RetentionStoreReader<K, V> store : stores) {
pos.add(store.getPosition());
}
- return new CompositePosition(pos);
+ return new CompositePosition(getClockDimension(), pos);
}
@Override
@@ -99,7 +99,7 @@ public Position getPosition(Clock sinceClock) {
// the rest
}
}
- return new CompositePosition(pos);
+ return new CompositePosition(getClockDimension(), pos);
}
@Override
@@ -116,7 +116,7 @@ public Position get(Position pos, List<Event<K>> list) {
list.addAll(tList);
pp[i] = np;
// TODO: assert pp[i] not equal np, or else we'll never finish.
- return new CompositePosition(pp);
+ return new CompositePosition(getClockDimension(), pp);
}
}
// if we're here, we don't have any updates.
@@ -87,6 +87,11 @@
*/
private Serializer<Clock> _eventClockSerializer; // required
+ /**
+ * The dimension of clocks the retention will handle.
+ */
+ private int _clockSize;
+
/**
* The min retention initial size (1000 event batches).
*/
@@ -145,6 +150,10 @@ public void setBatchSize(int batchSize) {
this._batchSize = Math.max(EventBatch.MINIMUM_BATCH_SIZE, batchSize);
}
+ public void setClockSize(int clockSize) {
+ this._clockSize = clockSize;
+ }
+
public int getBatchSize() {
return _batchSize;
}
@@ -157,6 +166,10 @@ public int getNumSyncBatchs() {
return _numSyncBatchs;
}
+ public int getClockSize() {
+ return _clockSize;
+ }
+
public void setRetentionPolicy(RetentionPolicy retentionPolicy) {
this._retentionPolicy = retentionPolicy;
}
@@ -16,6 +16,8 @@
package krati.retention;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import krati.retention.clock.Clock;
/**
@@ -39,6 +41,8 @@
* @param clock - the event clock
*/
public SimpleEvent(T value, Clock clock) {
+ checkNotNull(value);
+ checkNotNull(clock);
this._value = value;
this._clock = clock;
}
@@ -33,6 +33,9 @@
import org.apache.log4j.Logger;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import krati.core.StoreConfig;
import krati.core.segment.SegmentFactory;
import krati.core.segment.WriteBufferSegmentFactory;
@@ -68,7 +71,7 @@
private final RetentionPolicy _retentionPolicy;
private final RetentionPolicyApply _retentionPolicyApply = new RetentionPolicyApply();
private final ScheduledExecutorService _retentionPolicyExecutor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory());
-
+ private final int clockSize;
/**
* The current batch, to which new events will be added.
*/
@@ -111,7 +114,8 @@ public SimpleRetention(RetentionConfig<T> config) throws Exception {
config.getBatchSize(),
config.getNumSyncBatchs(),
config.getRetentionSegmentFactory(),
- config.getRetentionSegmentFileSizeMB());
+ config.getRetentionSegmentFileSizeMB(),
+ config.getClockSize());
}
/**
@@ -131,10 +135,10 @@ public SimpleRetention(RetentionConfig<T> config) throws Exception {
*/
public SimpleRetention(int id, File homeDir,
RetentionPolicy retentionPolicy,
- EventBatchSerializer<T> batchSerializer, int batchSize) throws Exception {
+ EventBatchSerializer<T> batchSerializer, int batchSize, int clockSize) throws Exception {
this(id, homeDir, 100000,
retentionPolicy, batchSerializer, batchSize,
- new WriteBufferSegmentFactory(), 32 /* storeSegmentFileSizeMB */);
+ new WriteBufferSegmentFactory(), 32 /* storeSegmentFileSizeMB */, clockSize);
}
/**
@@ -157,11 +161,11 @@ public SimpleRetention(int id,
File homeDir, int initialSize,
RetentionPolicy retentionPolicy,
EventBatchSerializer<T> batchSerializer, int batchSize,
- SegmentFactory segmentFactory, int segmentFileSizeMB) throws Exception {
+ SegmentFactory segmentFactory, int segmentFileSizeMB, int clockSize) throws Exception {
this(id, homeDir, initialSize,
retentionPolicy, batchSerializer,
batchSize, 10 /* numSyncBatches */,
- segmentFactory, segmentFileSizeMB);
+ segmentFactory, segmentFileSizeMB, clockSize);
}
/**
@@ -183,12 +187,14 @@ protected SimpleRetention(int id,
RetentionPolicy retentionPolicy,
EventBatchSerializer<T> batchSerializer,
int batchSize, int numSyncBatches,
- SegmentFactory segmentFactory, int segmentFileSizeMB) throws Exception {
+ SegmentFactory segmentFactory, int segmentFileSizeMB, int clockSize) throws Exception {
+ checkArgument(clockSize > 0);
this._id = id;
this._homeDir = homeDir;
this._retentionPolicy = retentionPolicy;
this._eventBatchSerializer = batchSerializer;
this._eventBatchSize = Math.max(EventBatch.MINIMUM_BATCH_SIZE, batchSize);
+ this.clockSize = clockSize;
StoreConfig config = new StoreConfig(homeDir, initialSize);
/********************************************************
@@ -256,6 +262,7 @@ public int compare(EventBatchCursor c1, EventBatchCursor c2) {
}
protected EventBatch<T> nextEventBatch(long offset, Clock initClock) {
+ checkArgument(initClock.equals(Clock.ZERO) || initClock.dimension() == clockSize);
EventBatch<T> b = new SimpleEventBatch<T>(offset, initClock, _eventBatchSize);
_logger.info("Created EventBatch: " + b.getOrigin());
return b;
@@ -393,6 +400,7 @@ public final Position getPosition() {
@Override
public Position getPosition(Clock sinceClock) {
+ checkArgument(sinceClock.equals(Clock.ZERO) || sinceClock.dimension() == clockSize);
long sinceOffset;
Occurred occ = sinceClock.compareTo(getMinClock());
@@ -491,6 +499,7 @@ public Position getPosition(Clock sinceClock) {
*/
@Override
public Position get(Position pos, List<Event<T>> list) {
+ checkArgument(pos.getClock().equals(Clock.ZERO) || pos.getClock().dimension() == clockSize);
EventBatch<T> b;
// Return null if the position is out of retention or in the indexed form.
@@ -547,6 +556,8 @@ public Position get(Position pos, List<Event<T>> list) {
@Override
public synchronized boolean put(Event<T> event) throws Exception {
+ checkNotNull(event);
+ checkArgument(event.getClock().dimension() == clockSize);
if(_batch.isFull()) {
_batch.setCompletionTime(System.currentTimeMillis());
byte[] bytes = _eventBatchSerializer.serialize(_batch);
@@ -752,7 +763,6 @@ protected boolean mergeEventsToLastBatch() throws IOException {
@Override
public int getClockDimension() {
- // TODO Auto-generated method stub
- return 0;
+ return clockSize;
}
}
@@ -85,7 +85,7 @@ public Clock(long... values) {
* <code>Clock.ZERO</code> is returned <code>upon</code> null or zero-length string.
*/
public static Clock parseClock(String str) {
- if(str == null || str.length() == 0) {
+ if(str == null || str.length() == 0 || str.trim().equals("ZERO")) {
return Clock.ZERO;
}
@@ -125,14 +125,18 @@ public static Clock parseClock(byte[] raw) {
*/
@Override
public String toString() {
- StringBuilder b = new StringBuilder();
- if(_values != null && 1 <= _values.length) {
- b.append(_values[0]);
- for(int i = 1; i < _values.length; i++) {
- b.append(':').append(_values[i]);
+ if (this.equals(ZERO)) {
+ return "ZERO";
+ } else {
+ StringBuilder b = new StringBuilder();
+ if(_values != null && 1 <= _values.length) {
+ b.append(_values[0]);
+ for(int i = 1; i < _values.length; i++) {
+ b.append(':').append(_values[i]);
+ }
}
+ return b.toString();
}
- return b.toString();
}
/**
Oops, something went wrong.

0 comments on commit 41ff815

Please sign in to comment.