Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

minor refactoring

  • Loading branch information...
commit ef9ab7d59df229842edc2567fc7ae01cefca5dd4 1 parent 65d809d
Maysam Yabandeh authored
View
48 src/main/java/com/yahoo/omid/client/ColumnFamilyAndQuantifier.java
@@ -16,6 +16,8 @@
package com.yahoo.omid.client;
+import java.util.Arrays;
+
//A wrapper for both column family and the qualifier
//Make it easier to be used in maps and hash maps
public class ColumnFamilyAndQuantifier {
@@ -27,36 +29,24 @@
qualifier = q;
}
@Override
- public boolean equals(Object o) {
- if (o instanceof ColumnFamilyAndQuantifier) {
- ColumnFamilyAndQuantifier other = (ColumnFamilyAndQuantifier) o;
- if (family.length != other.family.length || qualifier.length != other.qualifier.length)
- return false;
- for (int i = 0; i < family.length; i++)
- if (family[i] != other.family[i])
- return false;
- for (int i = 0; i < qualifier.length; i++)
- if (qualifier[i] != other.qualifier[i])
- return false;
- return true;
- }
- return false;
+ public boolean equals(Object o) {
+ if (o instanceof ColumnFamilyAndQuantifier) {
+ ColumnFamilyAndQuantifier other = (ColumnFamilyAndQuantifier) o;
+ if (family.length != other.family.length || qualifier.length != other.qualifier.length)
+ return false;
+ return (Arrays.equals(family, other.family) && Arrays.equals(qualifier, other.qualifier));
}
+ return false;
+ }
@Override
- public int hashCode() {
- if (hash != null)
- return hash;
- int h = 0;
- h = computeHash(h, family);
- h = computeHash(h, qualifier);
- hash = h;
- return h;
- }
- private int computeHash(int hash, byte[] larray) {
- hash += larray.length;
- for (int i = 0; i < larray.length; i++) {
- hash += larray[i];
- }
- return hash;
+ public int hashCode() {
+ if (hash != null)
+ return hash;
+ final int prime = 31;
+ int h = 1;
+ h = prime * h + Arrays.hashCode(family);
+ h = prime * h + Arrays.hashCode(qualifier);
+ hash = h;
+ return h;
}
}
View
46 src/main/java/com/yahoo/omid/client/MinVersionsFilter.java
@@ -74,33 +74,33 @@ private void setIncludedVersions(ColumnFamilyAndQuantifier column, int versions)
}
@Override
- public ReturnCode filterKeyValue(KeyValue v) {
- long version = v.getTimestamp();
- if (version >= endTime)
- return ReturnCode.SKIP;
- ColumnFamilyAndQuantifier column = new ColumnFamilyAndQuantifier(v.getFamily(), v.getQualifier());
- int includedVersions = getIncludedVersions(column);
- if (includedVersions < minVersions || version > startTime) {
- includedVersions++;
- setIncludedVersions(column, includedVersions);
- return ReturnCode.INCLUDE;
- }
- return ReturnCode.NEXT_COL;
+ public ReturnCode filterKeyValue(KeyValue v) {
+ long version = v.getTimestamp();
+ if (version >= endTime)
+ return ReturnCode.SKIP;
+ ColumnFamilyAndQuantifier column = new ColumnFamilyAndQuantifier(v.getFamily(), v.getQualifier());
+ int includedVersions = getIncludedVersions(column);
+ if (includedVersions < minVersions || version > startTime) {
+ includedVersions++;
+ setIncludedVersions(column, includedVersions);
+ return ReturnCode.INCLUDE;
}
+ return ReturnCode.NEXT_COL;
+ }
@Override
- public void readFields(DataInput in) throws IOException {
- this.startTime = in.readLong();
- this.endTime = in.readLong();
- this.minVersions = in.readInt();
- init();
- }
+ public void readFields(DataInput in) throws IOException {
+ this.startTime = in.readLong();
+ this.endTime = in.readLong();
+ this.minVersions = in.readInt();
+ init();
+ }
@Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(this.startTime);
- out.writeLong(this.endTime);
- out.writeInt(this.minVersions);
- }
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(this.startTime);
+ out.writeLong(this.endTime);
+ out.writeInt(this.minVersions);
+ }
}
View
2  src/main/java/com/yahoo/omid/client/MinVersionsSingleColumnFilter.java
@@ -28,6 +28,8 @@
/**
* Filter that sets both minTimestamp and minVersions
* Assumes that there is only one column in the output
+ * This filter is more optimized than MinVersionsFilter if
+ * we know that the get asks for a single column qualifier
* @maysam
*/
public class MinVersionsSingleColumnFilter extends FilterBase {
View
15 src/main/java/com/yahoo/omid/client/SyncCommitCallback.java
@@ -22,7 +22,7 @@
public class SyncCommitCallback extends SyncCallbackBase implements CommitCallback {
private Result result;
private long commitTimestamp;
- private RowKey[] wwRows;
+ private RowKey[] wwRows;//rows with write-write conflict
public Result getResult() {
return result;
@@ -40,12 +40,11 @@ public boolean isElder() {
return wwRows;
}
- synchronized
- public void complete(Result res, long commitTimestamp, RowKey[] wwRows) {
- this.result = res;
- this.commitTimestamp = commitTimestamp;
- this.wwRows = wwRows;
- countDown();
- }
+ synchronized public void complete(Result res, long commitTimestamp, RowKey[] wwRows) {
+ this.result = res;
+ this.commitTimestamp = commitTimestamp;
+ this.wwRows = wwRows;
+ countDown();
+ }
}
View
1  src/main/java/com/yahoo/omid/client/TransactionManager.java
@@ -167,7 +167,6 @@ public void abort(TransactionState transactionState) throws TransactionException
private void reincarnate(final TransactionState transactionState, RowKey[] wwRows)
throws TransactionException {
Statistics.fullReport(Statistics.Tag.REINCARNATION, 1);
- //System.out.println("I am reincarnating haha");
Map<byte[], List<Put>> putBatches = new HashMap<byte[], List<Put>>();
for (final RowKeyFamily rowkey : transactionState.getWrittenRows()) {
//TODO: do it only for wwRows
View
264 src/main/java/com/yahoo/omid/client/TransactionalTable.java
@@ -64,17 +64,6 @@
private static final double alpha = 0.975;
// private static final double betha = 1.25;
- // private static Thread monitor = new ThroughputMonitor();
- // private static boolean started = false;
- // {
- // synchronized(monitor) {
- // if (!started) {
- // started = true;
- // monitor.start();
- // }
- // }
- // }
-
public TransactionalTable(Configuration conf, byte[] tableName) throws IOException {
super(conf, tableName);
}
@@ -124,12 +113,6 @@ public Result get(TransactionState transactionState, final Get get) throws IOExc
}
}
}
- // Result result;
- // Result filteredResult;
- // do {
- // result = super.get(tsget);
- // filteredResult = filter(super.get(tsget), readTimestamp, maxVersions);
- // } while (!result.isEmpty() && filteredResult == null);
getsPerformed++;
Result firstResult = super.get(tsget);
Result result = filter(transactionState, firstResult, readTimestamp, nVersions);
@@ -137,14 +120,6 @@ public Result get(TransactionState transactionState, final Get get) throws IOExc
Statistics.partialReportOver(Statistics.Tag.GET_PER_CLIENT_GET);
Statistics.partialReportOver(Statistics.Tag.ASKTSO);
return result == null ? new Result() : result;
- // Scan scan = new Scan(get);
- // scan.setRetainDeletesInOutput(true);
- // ResultScanner rs = this.getScanner(transactionState, scan);
- // Result r = rs.next();
- // if (r == null) {
- // r = new Result();
- // }
- // return r;
}
/**
@@ -185,7 +160,7 @@ public void delete(TransactionState transactionState, Delete delete) throws IOEx
}
}
if (issueGet) {
- Result result = this.get(deleteG);
+ Result result = this.get(transactionState, deleteG);
for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap().entrySet()) {
byte[] family = entryF.getKey();
for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
@@ -284,9 +259,11 @@ boolean isMoreRecentThan(KeyValueTc other) {
* 1: Normal values for which I have the commit timestamp Tc
* 2: Normal values for which the Tc is lost (Tc < Tmax)
* 3: Values written by failed elders, i.e., (i) elder, (ii) Tc < Tmax, (iii) Tc is retrivable form the failedElders list
- * The normal values could be read in order of Ts (since Ts order and Tc order is the same), but the all the values of elders must be read since Ts and Tc orders are not the same.
+ * The normal values could be read in order of Ts (since Ts order and Tc order is the same), but the all the values of elders
+ * must be read since Ts and Tc orders are not the same.
*/
- private Result filter(TransactionState state, Result unfilteredResult, long startTimestamp, int nMinVersionsAsked) throws IOException {
+ private Result filter(TransactionState state, Result unfilteredResult, long startTimestamp, int nMinVersionsAsked)
+ throws IOException {
ArrayList<KeyValue> filteredList = new ArrayList<KeyValue>();
filter(state, unfilteredResult, startTimestamp, nMinVersionsAsked, filteredList);
if (filteredList.isEmpty())//Some functions (like the scanner) expect null if the results is empty!
@@ -295,7 +272,8 @@ private Result filter(TransactionState state, Result unfilteredResult, long star
}
//add the results to the filteredList, recurse if it is necessary
- private void filter(TransactionState state, Result unfilteredResult, long startTimestamp, int nMinVersionsAsked, ArrayList<KeyValue> filteredList) throws IOException {
+ private void filter(TransactionState state, Result unfilteredResult, long startTimestamp, int nMinVersionsAsked,
+ ArrayList<KeyValue> filteredList) throws IOException {
Statistics.partialReport(Statistics.Tag.GET_PER_CLIENT_GET, 1);
List<KeyValue> kvs = unfilteredResult == null ? null : unfilteredResult.list();
if (unfilteredResult == null || kvs == null) {
@@ -314,7 +292,9 @@ private void filter(TransactionState state, Result unfilteredResult, long startT
int nVersionsRead = 0;
boolean pickedOneForLastColumn = false;
KeyValue lastkv = null;
- //start from the highest Ts and compare their Tc till you reach a one with lost Tc (Ts < Tmax). Then read the rest of the list to make sure that values of failed elders are also read. Then among the normal value and the failedElder with highest Tc, choose one.
+ //start from the highest Ts and compare their Tc till you reach a one with lost Tc (Ts < Tmax).
+ //Then read the rest of the list to make sure that values of failed elders are also read.
+ //Then among the normal value and the failedElder with highest Tc, choose one.
for (KeyValue kv : kvs) {
{//check if the column is switched, if yes process the results of the last column, otherwise keep reading
ColumnFamilyAndQuantifier column = new ColumnFamilyAndQuantifier(kv.getFamily(), kv.getQualifier());
@@ -323,7 +303,9 @@ private void filter(TransactionState state, Result unfilteredResult, long startT
continue;
if (!sameColumn) {//column is switched
if (!pickedOneForLastColumn) //then process the results of the last column
- pickTheRightVersion(filteredList, state, startTimestamp, nVersionsRead, nMinVersionsAsked, lastkv, nextFetchMaxTimestamp, mostRecentValueWithTc, mostRecentKeyValueWithLostTc, mostRecentFailedElder);
+ pickTheRightVersion(filteredList, state, startTimestamp, nVersionsRead, nMinVersionsAsked,
+ lastkv, nextFetchMaxTimestamp, mostRecentValueWithTc,
+ mostRecentKeyValueWithLostTc, mostRecentFailedElder);
//reset column-dependent variables
mostRecentFailedElder.reset();
mostRecentValueWithTc.reset();
@@ -352,7 +334,8 @@ private void filter(TransactionState state, Result unfilteredResult, long startT
continue;//if is is a failedElder, we are done with probing this kv
}
}
- if (mostRecentKeyValueWithLostTc != null) continue;//if it is an elder and we have already seen one with lost Tc, then it was in failedEdler as well.
+ if (mostRecentKeyValueWithLostTc != null) continue;//if it is an elder and we have already seen one
+ //with lost Tc, then it was in failedEdler as well.
long Tc = state.tsoclient.commitTimestamp(Ts, startTimestamp);
if (Tc == -2) continue;//invalid read
if (IsolationLevel.checkForWriteWriteConflicts) {//then everything is in order, and the first version is enough
@@ -362,17 +345,23 @@ private void filter(TransactionState state, Result unfilteredResult, long startT
}
if (Tc == -1) // means valid read with lost Tc
//Case 2: Normal value with lost Tc
- mostRecentKeyValueWithLostTc = kv; //Note: a value with lost Tc could also be a failedElder, so do this check after failedEdler check
+ mostRecentKeyValueWithLostTc = kv; //Note: a value with lost Tc could also be a failedElder,
+ //so do this check after failedEdler check
else
//Case 1: Normal with with Tc
mostRecentValueWithTc.update(kv, Tc); //some kv might be from elders
}
if (!pickedOneForLastColumn)
- pickTheRightVersion(filteredList, state, startTimestamp, nVersionsRead, nMinVersionsAsked, lastkv, nextFetchMaxTimestamp, mostRecentValueWithTc, mostRecentKeyValueWithLostTc, mostRecentFailedElder);
+ pickTheRightVersion(filteredList, state, startTimestamp, nVersionsRead, nMinVersionsAsked,
+ lastkv, nextFetchMaxTimestamp, mostRecentValueWithTc,
+ mostRecentKeyValueWithLostTc, mostRecentFailedElder);
}
//Having processed the versions related to a column, decide which version should be added to the filteredList
- void pickTheRightVersion(ArrayList<KeyValue> filteredList, TransactionState state, long startTimestamp, int nVersionsRead, int nMinVersionsAsked, KeyValue lastkv, long nextFetchMaxTimestamp, KeyValueTc mostRecentValueWithTc, KeyValue mostRecentKeyValueWithLostTc, KeyValueTc mostRecentFailedElder) throws IOException {
+ void pickTheRightVersion(ArrayList<KeyValue> filteredList, TransactionState state, long startTimestamp,
+ int nVersionsRead, int nMinVersionsAsked,
+ KeyValue lastkv, long nextFetchMaxTimestamp, KeyValueTc mostRecentValueWithTc,
+ KeyValue mostRecentKeyValueWithLostTc, KeyValueTc mostRecentFailedElder) throws IOException {
if (mostRecentValueWithTc.isMoreRecentThan(mostRecentFailedElder)) {
addIfItIsNotADelete(mostRecentValueWithTc.kv, filteredList);
return;
@@ -404,154 +393,17 @@ void addIfItIsNotADelete(KeyValue kv, ArrayList<KeyValue> filteredList) {
filteredList.add(kv);
}
- /*
- private Result filter(TransactionState state, Result result, long startTimestamp, int localVersions) throws IOException {
- if (result == null) {
- return null;
- }
- List<KeyValue> kvs = result.list();
- if (kvs == null) {
- return result;
- }
- Map<ByteArray, Map<ByteArray, Integer>> occurrences = new HashMap<ByteArray, Map<ByteArray,Integer>>();
- Map<ByteArray, Map<ByteArray, Long>> minTimestamp = new HashMap<ByteArray, Map<ByteArray,Long>>();
- List<KeyValue> nonDeletes = new ArrayList<KeyValue>();
- List<KeyValue> filtered = new ArrayList<KeyValue>();
- Map<ByteArray, Set<ByteArray>> read = new HashMap<ByteArray, Set<ByteArray>>();
- DeleteTracker tracker = new DeleteTracker();
- for (KeyValue kv : kvs) {
- ByteArray family = new ByteArray(kv.getFamily());
- ByteArray qualifier = new ByteArray(kv.getQualifier());
- Set<ByteArray> readQualifiers = read.get(family);
- if (readQualifiers == null) {
- readQualifiers = new HashSet<ByteArray>();
- read.put(family, readQualifiers);
- } else if (readQualifiers.contains(qualifier)) continue;
-// RowKey rk = new RowKey(kv.getRow(), getTableName());
-if (state.tsoclient.validRead(kv.getTimestamp(), startTimestamp)) {
-if (!tracker.addDeleted(kv))
-nonDeletes.add(kv);
-{
- // Read valid value
- readQualifiers.add(qualifier);
-
- // statistics
- // elementsGotten++;
- Map<ByteArray, Integer> occurrencesCols = occurrences.get(family);
- Integer times = null;
- if (occurrencesCols != null) {
- times = occurrencesCols.get(qualifier);
- }
- if (times != null) {
-// elementsRead += times;
-versionsAvg = times > versionsAvg ? times : alpha * versionsAvg + (1 - alpha) * times;
-// extraVersionsAvg = times > extraVersionsAvg ? times : alpha * extraVersionsAvg + (1 - alpha) * times;
-} else {
- // elementsRead++;
- versionsAvg = alpha * versionsAvg + (1 - alpha);
- // extraVersionsAvg = alpha * extraVersionsAvg + (1 - alpha);
-}
-}
-} else {
-Map<ByteArray, Integer> occurrencesCols = occurrences.get(family);
-Map<ByteArray, Long> minTimestampCols = minTimestamp.get(family);
-if (occurrencesCols == null) {
-occurrencesCols = new HashMap<ByteArray, Integer>();
-minTimestampCols = new HashMap<ByteArray, Long>();
-occurrences.put(family, occurrencesCols);
-minTimestamp.put(family, minTimestampCols);
-}
-Integer times = occurrencesCols.get(qualifier);
-Long timestamp = minTimestampCols.get(qualifier);
-if (times == null) {
-times = 0;
-timestamp = kv.getTimestamp();
-}
-times++;
-timestamp = Math.min(timestamp, kv.getTimestamp());
-if (times == localVersions) {
-// We need to fetch more versions
-Get get = new Get(kv.getRow());
-get.addColumn(kv.getFamily(), kv.getQualifier());
-get.setMaxVersions(localVersions);
-Result r;
-GOTRESULT: do {
- extraGetsPerformed++;
- get.setTimeRange(0, timestamp);
- r = this.get(get);
- List<KeyValue> list = r.list();
- if (list == null) break;
- for (KeyValue t : list) {
- times++;
- timestamp = Math.min(timestamp, t.getTimestamp());
- // rk = new RowKey(kv.getRow(), getTableName());
- if (state.tsoclient.validRead(t.getTimestamp(), startTimestamp)) {
- if (!tracker.addDeleted(t))
- nonDeletes.add(t);
- readQualifiers.add(qualifier);
- elementsGotten++;
- elementsRead += times;
- versionsAvg = times > versionsAvg ? times : alpha * versionsAvg + (1 - alpha) * times;
- extraVersionsAvg = times > extraVersionsAvg ? times : alpha * extraVersionsAvg + (1 - alpha) * times;
- break GOTRESULT;
- }
- }
- } while (r.size() == localVersions);
-} else {
- occurrencesCols.put(qualifier, times);
- minTimestampCols.put(qualifier, timestamp);
-}
-}
- }
-for (KeyValue kv : nonDeletes) {
- if (!tracker.isDeleted(kv)) {
- filtered.add(kv);
- }
-}
-// cacheVersions = (int) versionsAvg;
-if (filtered.isEmpty()) {
- return null;
-}
-return new Result(filtered);
- }
-*/
-
-private class DeleteTracker {
- Map<ByteArray, Long> deletedRows = new HashMap<ByteArray, Long>();
- Map<ByteArray, Long> deletedFamilies = new HashMap<ByteArray, Long>();
- Map<ByteArray, Long> deletedColumns = new HashMap<ByteArray, Long>();
+ protected class ClientScanner extends HTable.ClientScanner {
+ private TransactionState state;
+ private int maxVersions;
- public boolean addDeleted(KeyValue kv) {
- if (kv.getValue().length == 0) {
- deletedColumns.put(new ByteArray(Bytes.add(kv.getFamily(), kv.getQualifier())), kv.getTimestamp());
- return true;
+ ClientScanner(TransactionState state, Scan scan, int maxVersions) {
+ super(scan);
+ this.state = state;
+ this.maxVersions = maxVersions;
}
- return false;
- }
-
- public boolean isDeleted(KeyValue kv) {
- Long timestamp;
- timestamp = deletedRows.get(new ByteArray(kv.getRow()));
- if (timestamp != null && kv.getTimestamp() < timestamp) return true;
- timestamp = deletedFamilies.get(new ByteArray(kv.getFamily()));
- if (timestamp != null && kv.getTimestamp() < timestamp) return true;
- timestamp = deletedColumns.get(new ByteArray(Bytes.add(kv.getFamily(), kv.getQualifier())));
- if (timestamp != null && kv.getTimestamp() < timestamp) return true;
- return false;
- }
-}
-protected class ClientScanner extends HTable.ClientScanner {
- private TransactionState state;
- private int maxVersions;
-
- ClientScanner(TransactionState state, Scan scan, int maxVersions) {
- super(scan);
- this.state = state;
- this.maxVersions = maxVersions;
- }
-
- @Override
+ @Override
public Result next() throws IOException {
Result result;
Result filteredResult;
@@ -565,7 +417,7 @@ public Result next() throws IOException {
return filteredResult;
}
- @Override
+ @Override
public Result[] next(int nbRows) throws IOException {
Result [] results = super.next(nbRows);
for (int i = 0; i < results.length; i++) {
@@ -577,54 +429,6 @@ public Result next() throws IOException {
return results;
}
-}
-
-// public static class ThroughputMonitor extends Thread {
-// private static final Log LOG = LogFactory.getLog(ThroughputMonitor.class);
-//
-// /**
-// * Constructor
-// */
-// public ThroughputMonitor() {
-// }
-//
-// @Override
-// public void run() {
-// try {
-// long oldAskedTSO = TSOClient.askedTSO;
-// long oldElementsGotten = TransactionalTable.elementsGotten;
-// long oldElementsRead = TransactionalTable.elementsRead;
-// long oldExtraGetsPerformed = TransactionalTable.extraGetsPerformed;
-// long oldGetsPerformed = TransactionalTable.getsPerformed;
-// for (;;) {
-// Thread.sleep(10000);
-//
-// long newGetsPerformed = TransactionalTable.getsPerformed;
-// long newElementsGotten = TransactionalTable.elementsGotten;
-// long newElementsRead = TransactionalTable.elementsRead;
-// long newExtraGetsPerformed = TransactionalTable.extraGetsPerformed;
-// long newAskedTSO = TSOClient.askedTSO;
-//
-// System.out.println(String.format("TSO CLIENT: GetsPerformed: %d ElsGotten: %d ElsRead: %d ExtraGets: %d AskedTSO: %d AvgVersions: %f",
-// newGetsPerformed - oldGetsPerformed,
-// newElementsGotten - oldElementsGotten,
-// newElementsRead - oldElementsRead,
-// newExtraGetsPerformed - oldExtraGetsPerformed,
-// newAskedTSO - oldAskedTSO,
-// TransactionalTable.extraVersionsAvg)
-// );
-//
-// oldAskedTSO = newAskedTSO;
-// oldElementsGotten = newElementsGotten;
-// oldElementsRead = newElementsRead;
-// oldExtraGetsPerformed = newExtraGetsPerformed;
-// oldGetsPerformed = newGetsPerformed;
-// }
-// } catch (InterruptedException e) {
-// // Stop monitoring asked
-// return;
-// }
-// }
-// }
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.