readWrite branch merged into the master branch #13

Closed
wants to merge 48 commits into
from

Projects

None yet

3 participants

@maysamyabandeh

The readWrite branch includes features like serializability as well as high-granularity locks to scale with number of cores in the status oracle. This merge makes these features available to the main users on the master branch.

dgomezferro and others added some commits Dec 2, 2011
@dgomezferro dgomezferro Fix mac compilation af84971
@dgomezferro dgomezferro Synchronize access to shared variables 4933cb4
@dgomezferro dgomezferro read write support (poor perf -15%) 00913af
Maysam Yabandeh simplify the filter function to work with only one column\n TODO: gen…
…eralize it
84495a8
Maysam Yabandeh Towards handling elders
Done: maintain elder and failedelder list
TODO: reincarnation, GC, elder broadcast, get using failedelder
fe036d1
Maysam Yabandeh elder for simclient complete
TODO: GC, HBase client
7f72211
Maysam Yabandeh elder for HBase client complete
TODO: GC, wwRows->WAL, send failedElder at connection start, check
elders at TSO query
5e4d3ae
Maysam Yabandeh debug + eldest 44c3d54
Maysam Yabandeh debug eldest in HBase get by adding a filter 7b4a383
Maysam Yabandeh crcimbo: ww insteaof rw 56ef384
Maysam Yabandeh combine rw and ww, switch to rw efd7a53
Maysam Yabandeh fix the min problem in Compactor + set first timestamp to >=1 to read…
… HBase initial values
54f51be
Maysam Yabandeh add a configuration part for isolation conf 312774a
Maysam Yabandeh bug fixed: compactor did not switch to the next row properly -> delet…
…ing everything in some rows
b7cc6e9
Maysam Yabandeh add a bit debugging printf b0e98ec
Maysam Yabandeh update sim clients
remove sleep in client start
39ea774
Maysam Yabandeh Merge branch 'nativelocks' into readWrite
Conflicts:
	pom.xml
	src/main/java/com/yahoo/omid/tso/TSOHandler.java
	src/main/java/com/yahoo/omid/tso/TSOServer.java
a8c7615
Maysam Yabandeh fixing some synchronized points
still needs more work
d245f66
Maysam Yabandeh a few optimizations: e.g., remove lockedSet c3f3090
Maysam Yabandeh enable tests with var number of rows,
fix a performance bug with read-only transactions,
compile nativelib based on the jar file
set max number of test messages from int to long
cf1ce73
Maysam Yabandeh put back the initial connection delay
measure forced flushes
geometric distribution of requests
4affed0
Maysam Yabandeh refactor the filter function 6bc70e3
Maysam Yabandeh extend filter to multi-column gets 19f0a87
Maysam Yabandeh report failedElder at the connection time, add elders Tc to commit ha…
…shmap
763806f
Maysam Yabandeh temporarily fix the problem of ignoring retry b09c36d
Maysam Yabandeh check for deletes in filter 393a527
Maysam Yabandeh small commentation before merging with master e1d3012
Maysam Yabandeh merged with master 39a1128
Maysam Yabandeh fix compile problems caused by merge 9edb62c
Maysam Yabandeh make it work with unit tests 85f2e6e
@dgomezferro dgomezferro was assigned May 18, 2012
@dgomezferro
Contributor

A couple of high level comments, I'll add more specific comments later if I have any.

This is a huge change so I'd want to have at least a couple of unit tests for the different IsolationLevels.

Have you integrated the new Isolation Level with the recovery procedure from BK? Does the same procedure work for both?

A couple of files have been rewritten, maybe it's a good time to go for the consisten indentation in issue #9. Could you use 4 spaces for these files?

Please remove the superfluous comments (e.g. // Added by... ).

@maysamyabandeh

Thanks Daniel. Here are the responses:

  1. Sure, I will add the unit tests.
  2. No, the recovery does not work with the new isolation level. It does not affect the users of the old isolation level though, so it could be added later. It is better to do the merge sooner to avoid a huger merge is future.
  3. Ok, I use 4 space indentation.
  4. I believe "//Added by" comments are helpful. But, you are the one who decides.
@dgomezferro
Contributor
  1. We should open an Issue after merging (or even now). I think we should warn the user in case he uses the new IsolationLevel and the WAL at the same time.
  2. I thought you left the unintentionally, in my opinion we should use git blame for that.
@maysamyabandeh

Good idea. I throw an exception if the recovery is called on the serializable isolation.

@maysamyabandeh

Daniel, the requested changes are applied.

Maysam Yabandeh added some commits May 22, 2012
@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
src/main/java/com/yahoo/omid/Statistics.java
+ long currentTimeMillis = System.currentTimeMillis();
+ if (currentTimeMillis - lastReportTime > reportInterval) {
+ lastReportTime = currentTimeMillis;
+ return false;
+ }
+ return true;
+ }
+ static public void println() {
+ synchronized (histories) {
+ if (skipReport())
+ return;
+ System.out.print("Statistics: ");
+ for (Map.Entry<Tag, History> entry : histories.entrySet()) {
+ Tag tag = entry.getKey();
+ History history = entry.getValue();
+ System.out.print(tag + "Cnt " + history.cnt + " ");
@dgomezferro
dgomezferro May 23, 2012 Contributor

Use logging here, with a low level (TRACE or DEBUG). If it's disabled don't keep statistics even.

@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
.../com/yahoo/omid/client/ColumnFamilyAndQuantifier.java
+//Make it easier to be used in maps and hash maps
+public class ColumnFamilyAndQuantifier {
+ protected byte[] family;
+ protected byte[] qualifier;
+ protected Integer hash = null;
+ ColumnFamilyAndQuantifier(byte[] f, byte[] q) {
+ family = f;
+ qualifier = q;
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ColumnFamilyAndQuantifier) {
+ ColumnFamilyAndQuantifier other = (ColumnFamilyAndQuantifier) o;
+ if (family.length != other.family.length || qualifier.length != other.qualifier.length)
+ return false;
+ for (int i = 0; i < family.length; i++)
@dgomezferro
dgomezferro May 23, 2012 Contributor

You could have used Arrays.equals()

@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
.../com/yahoo/omid/client/ColumnFamilyAndQuantifier.java
+ for (int i = 0; i < family.length; i++)
+ if (family[i] != other.family[i])
+ return false;
+ for (int i = 0; i < qualifier.length; i++)
+ if (qualifier[i] != other.qualifier[i])
+ return false;
+ return true;
+ }
+ return false;
+ }
+ @Override
+ public int hashCode() {
+ if (hash != null)
+ return hash;
+ int h = 0;
+ h = computeHash(h, family);
@dgomezferro
dgomezferro May 23, 2012 Contributor

This is not a good hash function. We don't use it but ...

Here is what eclipse generates, simpler and stronger:

public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + Arrays.hashCode(family);
    result = prime * result + Arrays.hashCode(qualifier);
    return result;
}
@dgomezferro dgomezferro commented on the diff May 23, 2012
...ain/java/com/yahoo/omid/client/MinVersionsFilter.java
+ }
+
+ private void init() {
+ includedVersionsForLastColumn = 0;
+ lastColumn = null;
+ }
+
+ private int getIncludedVersions(ColumnFamilyAndQuantifier column) {
+ if (lastColumn == null || !lastColumn.equals(column)) {
+ lastColumn = column;
+ includedVersionsForLastColumn = 0;
+ }
+ return includedVersionsForLastColumn;
+ }
+
+ private void setIncludedVersions(ColumnFamilyAndQuantifier column, int versions) {
@dgomezferro
dgomezferro May 23, 2012 Contributor

This function is not needed.

@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
.../yahoo/omid/client/MinVersionsSingleColumnFilter.java
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.*;
+
+/**
+ * Filter that sets both minTimestamp and minVersions
+ * Assumes that there is only one column in the output
+ * @maysam
+ */
+public class MinVersionsSingleColumnFilter extends FilterBase {
@dgomezferro
dgomezferro May 23, 2012 Contributor

Are we using this?

@dgomezferro dgomezferro commented on the diff May 23, 2012
...ain/java/com/yahoo/omid/client/MinVersionsFilter.java
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.*;
+
+/**
+ * Filter that sets both minTimestamp and minVersions
+ * It assumes a single row but works with more than one column family/qualifier
+ * It also assumes the following order <column family, column qualifier, timestamp>
+ * @maysam
+ */
+public class MinVersionsFilter extends FilterBase {
@dgomezferro
dgomezferro May 23, 2012 Contributor

Is there a test for this filter?

@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
...in/java/com/yahoo/omid/client/SyncCommitCallback.java
- return result;
- }
-
- public long getCommitTimestamp() {
- return commitTimestamp;
- }
-
- synchronized
- public void complete(Result res, long commitTimestamp) {
- this.result = res;
- this.commitTimestamp = commitTimestamp;
- countDown();
- }
+ private Result result;
+ private long commitTimestamp;
+ private RowKey[] wwRows;
@dgomezferro
dgomezferro May 23, 2012 Contributor

wwRows is not very descriptive.

@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
...in/java/com/yahoo/omid/client/TransactionManager.java
+ throw new TransactionException("Could not abort", e);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("doneAbort " + transactionState.getStartTimestamp());
+ }
+
+ // Make sure its commit timestamp is 0, so the cleanup does the right job
+ transactionState.setCommitTimestamp(0);
+ cleanup(transactionState);
+ }
+
+ private void reincarnate(final TransactionState transactionState, RowKey[] wwRows)
+ throws TransactionException {
+ Statistics.fullReport(Statistics.Tag.REINCARNATION, 1);
+ //System.out.println("I am reincarnating haha");
@dgomezferro
dgomezferro May 23, 2012 Contributor

remove

@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
...in/java/com/yahoo/omid/client/TransactionalTable.java
- issueGet = true;
- break;
- case Delete:
- if (kv.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
- deleteP.add(kv.getFamily(), kv.getQualifier(), startTimestamp, null);
- break;
- } else {
- throw new UnsupportedOperationException("Cannot delete specific versions on Snapshot Isolation.");
- }
+ }
+ // Result result;
+ // Result filteredResult;
+ // do {
+ // result = super.get(tsget);
+ // filteredResult = filter(super.get(tsget), readTimestamp, maxVersions);
+ // } while (!result.isEmpty() && filteredResult == null);
@dgomezferro
dgomezferro May 23, 2012 Contributor

Remove the commented code please.

@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
...in/java/com/yahoo/omid/client/TransactionalTable.java
}
- }
- }
- if (issueGet) {
- Result result = this.get(deleteG);
- for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap().entrySet()) {
- byte[] family = entryF.getKey();
- for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
- byte[] qualifier = entryQ.getKey();
- deleteP.add(family, qualifier, null);
+ }
+ if (issueGet) {
+ Result result = this.get(deleteG);
@dgomezferro
dgomezferro May 23, 2012 Contributor

I think we should use the transactional Get here, so we get the existing qualifiers for a particular snapshot, and it also takes care of adding the get to the readRows if it needs to.

@dgomezferro dgomezferro commented on the diff May 23, 2012
...in/java/com/yahoo/omid/client/TransactionalTable.java
+
+ //a wrapper for KeyValue and the corresponding Tc
+ private class KeyValueTc {
+ KeyValue kv = null;
+ long Tc;
+ void reset() {
+ kv = null;
+ }
+ //update kv if the new one is more recent
+ void update(KeyValue newkv, long newTc) {
+ if (kv == null || Tc < newTc) {
+ kv = newkv;
+ Tc = newTc;
+ }
+ }
+ boolean isMoreRecentThan(long otherTc) {
@dgomezferro
dgomezferro May 23, 2012 Contributor

never used

@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
...in/java/com/yahoo/omid/client/TransactionalTable.java
-
- private class DeleteTracker {
- Map<ByteArray, Long> deletedRows = new HashMap<ByteArray, Long>();
- Map<ByteArray, Long> deletedFamilies = new HashMap<ByteArray, Long>();
- Map<ByteArray, Long> deletedColumns = new HashMap<ByteArray, Long>();
-
- public boolean addDeleted(KeyValue kv) {
- if (kv.getValue().length == 0) {
+if (filtered.isEmpty()) {
+ return null;
+}
+return new Result(filtered);
+ }
+*/
+
+private class DeleteTracker {
@dgomezferro
dgomezferro May 23, 2012 Contributor

never used

@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
...in/java/com/yahoo/omid/client/TransactionalTable.java
+ return;
+ // We need to fetch more versions
+ Get get = new Get(lastkv.getRow());
+ get.addColumn(lastkv.getFamily(), lastkv.getQualifier());
+ get.setMaxVersions(nMinVersionsAsked);
+ get.setTimeRange(0, nextFetchMaxTimestamp);
+ Result unfilteredResult = this.get(get);
+ filter(state, unfilteredResult, startTimestamp, nMinVersionsAsked, filteredList);
+ }
+
+ void addIfItIsNotADelete(KeyValue kv, ArrayList<KeyValue> filteredList) {
+ if (kv.getValue().length != 0)
+ filteredList.add(kv);
+ }
+
+ /*
@dgomezferro
dgomezferro May 23, 2012 Contributor

remove commented code

@dgomezferro dgomezferro commented on an outdated diff May 23, 2012
...in/java/com/yahoo/omid/client/TransactionalTable.java
+ pickedOneForLastColumn = true;
+ continue;
+ }
+ if (Tc == -1) // means valid read with lost Tc
+ //Case 2: Normal value with lost Tc
+ mostRecentKeyValueWithLostTc = kv; //Note: a value with lost Tc could also be a failedElder, so do this check after failedEdler check
+ else
+ //Case 1: Normal with with Tc
+ mostRecentValueWithTc.update(kv, Tc); //some kv might be from elders
+ }
+ if (!pickedOneForLastColumn)
+ pickTheRightVersion(filteredList, state, startTimestamp, nVersionsRead, nMinVersionsAsked, lastkv, nextFetchMaxTimestamp, mostRecentValueWithTc, mostRecentKeyValueWithLostTc, mostRecentFailedElder);
+ }
+
+ //Having processed the versions related to a column, decide which version should be added to the filteredList
+ void pickTheRightVersion(ArrayList<KeyValue> filteredList, TransactionState state, long startTimestamp, int nVersionsRead, int nMinVersionsAsked, KeyValue lastkv, long nextFetchMaxTimestamp, KeyValueTc mostRecentValueWithTc, KeyValue mostRecentKeyValueWithLostTc, KeyValueTc mostRecentFailedElder) throws IOException {
@dgomezferro
dgomezferro May 23, 2012 Contributor

There are lots of lines very wide, please stick to 120 characters if possible.

Maysam Yabandeh added some commits May 23, 2012
@maysamyabandeh

Daniel, you comments are applied.

@dgomezferro dgomezferro commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/client/TSOClient.java
+ largestDeletedTimestamp = 0;
+ connectionTimestamp = 0;
+ hasConnectionTimestamp = false;
+ }
+
+ @Override
+ synchronized
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
+ throws Exception {
+ synchronized(state) {
+ channel = null;
+ state = State.DISCONNECTED;
+ }
+ }
+
+ //In the new implementation, I need direct access to commit timestamp and the logic for deciding the committed version is more complex. Therefero, this function replaces validRead.
@dgomezferro
dgomezferro May 24, 2012 Contributor

Break the line.
Specify what the function returns.

@dgomezferro dgomezferro commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/client/TSOClient.java
+ connectionTimestamp = 0;
+ hasConnectionTimestamp = false;
+ }
+
+ @Override
+ synchronized
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
+ throws Exception {
+ synchronized(state) {
+ channel = null;
+ state = State.DISCONNECTED;
+ }
+ }
+
+ //In the new implementation, I need direct access to commit timestamp and the logic for deciding the committed version is more complex. Therefero, this function replaces validRead.
+ public long commitTimestamp(long transaction, long startTimestamp) throws IOException {
@dgomezferro
dgomezferro May 24, 2012 Contributor

If it replaces validRead() then remove it.
validRead() is been tested at TestReadAlgorithm, adapt it to test this new function.

@dgomezferro dgomezferro commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/client/TSOClient.java
+ }
+
+ @Override
+ synchronized
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
+ throws Exception {
+ synchronized(state) {
+ channel = null;
+ state = State.DISCONNECTED;
+ }
+ }
+
+ //In the new implementation, I need direct access to commit timestamp and the logic for deciding the committed version is more complex. Therefero, this function replaces validRead.
+ public long commitTimestamp(long transaction, long startTimestamp) throws IOException {
+ if (aborted.contains(transaction))
+ return -2;//invalid read
@dgomezferro
dgomezferro May 24, 2012 Contributor

Use symbolic constants instead of -1, -2

Maysam Yabandeh added some commits May 24, 2012
@maysamyabandeh

Daniel, your comments are applied.

@dgomezferro

I don't think this is the right place for this constants.

@dgomezferro

validRead() is not used anywhere, remove it, and adapt the test that was testing this to test the new function (checking the new return status as well)

Maysam Yabandeh minor refactoring 5637cf7
@maysamyabandeh

Daniel, your comments are applied.

@dgomezferro dgomezferro commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/tso/RowKey.java
@@ -52,6 +54,7 @@ public String toString() {
public static RowKey readObject(ChannelBuffer aInputStream)
throws IOException {
+ //int index = aInputStream.readInt();
@dgomezferro
dgomezferro May 24, 2012 Contributor

remove

@dgomezferro dgomezferro commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/tso/RowKey.java
return rk;
}
public void writeObject(DataOutputStream aOutputStream)
throws IOException {
hashCode();
+ //assert(index != -1);
+ //aOutputStream.writeInt(index);
@dgomezferro
dgomezferro May 24, 2012 Contributor

remove

@dgomezferro dgomezferro and 1 other commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/tso/RowKey.java
@@ -111,5 +117,11 @@ public int hashCode() {
// return (31*Arrays.hashCode(tableId)) + Arrays.hashCode(rowId);
return hash;
}
+
+ //assume: index must be assigned before using the RowKey in a collection
+ //This is used to sort the RowKeys
+ public int compareTo(RowKey rk) {
@dgomezferro
dgomezferro May 24, 2012 Contributor

remove

@maysamyabandeh
maysamyabandeh May 25, 2012

I found the comment useful.

@dgomezferro dgomezferro and 1 other commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/tso/RowKey.java
@@ -24,10 +24,12 @@
import org.apache.hadoop.hbase.util.MurmurHash;
import org.jboss.netty.buffer.ChannelBuffer;
-public class RowKey {
+public class RowKey implements Comparable<RowKey> {
@dgomezferro dgomezferro and 1 other commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/tso/RowKey.java
private byte[] rowId;
private byte[] tableId;
private int hash = 0;
+ //these are not to be serialized, just used for lock-based impl
+ public int index = -1;//the index on hashmap, it is used to sort the rowkeys
@dgomezferro
dgomezferro May 24, 2012 Contributor

is this being used? else remove

@dgomezferro dgomezferro commented on an outdated diff May 24, 2012
src/test/java/com/yahoo/omid/tso/TestCommit.java
@@ -37,8 +37,11 @@ public void testCommit() throws Exception {
clientHandler.sendMessage(new CommitRequest(tr1.timestamp));
CommitResponse cr1 = clientHandler.receiveMessage(CommitResponse.class);
assertTrue(cr1.committed);
- assertTrue(cr1.commitTimestamp > tr1.timestamp);
+ //Changed by Maysam Yabandeh
@dgomezferro
dgomezferro May 24, 2012 Contributor

remove

@dgomezferro dgomezferro commented on an outdated diff May 24, 2012
src/test/java/com/yahoo/omid/tso/TestCommitReport.java
@@ -38,7 +38,10 @@ public void testCommitReport() throws Exception {
clientHandler.sendMessage(new CommitRequest(tr1.timestamp));
CommitResponse cr1 = clientHandler.receiveMessage(CommitResponse.class);
assertTrue(cr1.committed);
- assertTrue(cr1.commitTimestamp > tr1.timestamp);
+ //Changed by Maysam Yabandeh
@dgomezferro
dgomezferro May 24, 2012 Contributor

remove

@dgomezferro dgomezferro commented on an outdated diff May 24, 2012
src/test/java/com/yahoo/omid/tso/TestCommitReport.java
@@ -48,7 +51,10 @@ public void testCommitReport() throws Exception {
secondClientHandler.sendMessage(new CommitRequest(tr2.timestamp));
CommitResponse cr2 = secondClientHandler.receiveMessage(CommitResponse.class);
assertTrue(cr2.committed);
- assertTrue(cr2.commitTimestamp > tr2.timestamp);
+ //Changed by Maysam Yabandeh
@dgomezferro
dgomezferro May 24, 2012 Contributor

remove

@dgomezferro dgomezferro and 1 other commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/tso/Elder.java
+ public int compareTo(Elder e) {
+ //be careful not to cast long to int (neg to pos complexity ...)
+ long diff = this.getId() - e.getId();
+ if (diff > 0) return 1;
+ if (diff < 0) return -1;
+ return 0;
+ }
+ public boolean equals(Object o) {
+ if (o instanceof Elder) {
+ return this.getId() == ((Elder)o).getId();
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ return (int)getId();
@dgomezferro
dgomezferro May 24, 2012 Contributor

bad hashcode.

@maysamyabandeh
maysamyabandeh May 25, 2012

good enough for the Elder class. Also, here quick computation matters more.

@dgomezferro
dgomezferro May 25, 2012 Contributor

Not good enough for several reasons:

  • You are using it in a hashSet, so it is really important to have use a good hashCode()
  • Consecutive elders are going to have very close hashcodes
  • Big startTimestamps are going to hash to the same value, which is really really bad
  • A good hashCode is very fast:
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
        return result;
    }
@dgomezferro dgomezferro commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/tso/Elders.java
+import java.util.HashSet;
+import java.util.TreeSet;
+import java.util.PriorityQueue;
+import java.util.Iterator;
+
+public class Elders {
+ //set is efficient for membership checking
+ protected HashSet<Elder> setofelders;
+ // The list of the failed elders: the elders that did not reincarnte in a timely manner
+ protected TreeSet<Elder> failedElders;
+ //the eldest of elders: the elder with min ts
+ protected Elder eldest = null;
+ protected boolean eldestChangedSinceLastProbe = false;
+ //heap is efficient for advancing largestDeletedTimestamp
+ //heap.peek is always valid but the other members might be stale
+ protected PriorityQueue<Elder> heapofelders;
@dgomezferro
dgomezferro May 24, 2012 Contributor

encapsulate (private or package private).
Consisten naming: heapOfElders or eldersHeap or even a more descriptive name

@dgomezferro dgomezferro commented on an outdated diff May 24, 2012
src/main/java/com/yahoo/omid/tso/TSOHandler.java
+ /**
+ * The wrapper for the shared state of TSO
+ */
+ private TSOState sharedState;
+
+ private FlushThread flushThread;
+ private ScheduledExecutorService executor;
+ private ScheduledFuture<?> flushFuture;
+
+ /**
+ * Constructor
+ * @param channelGroup
+ */
+ public TSOHandler(ChannelGroup channelGroup, TSOState state) {
+ //System.out.println("This is rwcimbo with elders - no filter is installed");
+ //System.out.println("This is buggy rwcimbo");
@dgomezferro
dgomezferro May 24, 2012 Contributor

remove

Maysam Yabandeh minor refactoring 6d59b1d
@maysamyabandeh

Daniel, your comments are applied.

@dgomezferro dgomezferro commented on the diff May 25, 2012
src/main/java/com/yahoo/omid/tso/Elder.java
+
+ protected long startTimestamp;
+ protected long commitTimestamp = -1;
+ //i need this constructor only for search purposes, do not use it to store elders
+ public Elder(long id) {
+ this.startTimestamp = id;
+ }
+ public Elder(long id, long commitTimestamp) {
+ this.startTimestamp = id;
+ this.commitTimestamp = commitTimestamp;
+ }
+ public long getId() {
+ return startTimestamp;
+ }
+ public long getCommitTimestamp() {
+ assert(commitTimestamp != -1);//this could happen if it is not set by constructor
@dgomezferro
dgomezferro May 25, 2012 Contributor

is this a debugging assertion or something you would want to enforce at runtime?

@dgomezferro dgomezferro commented on an outdated diff May 25, 2012
.../java/com/yahoo/omid/tso/messages/CommitResponse.java
@@ -47,6 +48,16 @@
public boolean committed = true;
/**
+ * Rows with write-write conflicts
+ */
+ public RowKey[] wwRows;
@dgomezferro
dgomezferro May 25, 2012 Contributor

I still don't like the name. I suggest using something like conflict(ing)Rows

@dgomezferro dgomezferro commented on an outdated diff May 25, 2012
.../java/com/yahoo/omid/tso/messages/CommitResponse.java
@@ -47,6 +48,16 @@
public boolean committed = true;
/**
+ * Rows with write-write conflicts
+ */
+ public RowKey[] wwRows;
+ /**
+ * The actual size of wwRows
+ * (some items might be null when the object is being developped)
+ */
+ public int wwRowsLength = 0;
@dgomezferro
dgomezferro May 25, 2012 Contributor

If you don't know the length when you create the object, use an ArrayList instead. I'd rather not have a separate field for the length of the array.

@dgomezferro
Contributor

Are you going to address the indentation and readability issues in this merge or do you want to do that afterwards?

Maysam Yabandeh minfor refactoring 728da4f
@maysamyabandeh

Daniel, your comments are applied

Maysam Yabandeh minor refactoring 0457efb
@dgomezferro dgomezferro commented on an outdated diff May 25, 2012
src/main/java/com/yahoo/omid/tso/TSOHandler.java
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ synchronized (sharedState.hashmap) {
+ sharedState.processAbort(msg.startTimestamp);
+ }
+ synchronized (sharedMsgBufLock) {
+ queueHalfAbort(msg.startTimestamp);
+ }
+ }
+
+ //for reads just need atomic access, no need to hold the locks
+ //if (IsolationLevel.checkForReadWriteConflicts)
+ //for (RowKey r: msg.readRows)
+ //if (lockedSet.remove(r.index))//unlock only if it's locked
+ //sharedState.hashmap.unlock(r.index);
@dgomezferro
dgomezferro May 25, 2012 Contributor

remove comments

@dgomezferro dgomezferro commented on an outdated diff May 25, 2012
src/main/java/com/yahoo/omid/tso/TSOHandler.java
+ e.printStackTrace();
+ }
+ synchronized (sharedState.hashmap) {
+ sharedState.processAbort(msg.startTimestamp);
+ }
+ synchronized (sharedMsgBufLock) {
+ queueHalfAbort(msg.startTimestamp);
+ }
+ }
+
+ //for reads just need atomic access, no need to hold the locks
+ //if (IsolationLevel.checkForReadWriteConflicts)
+ //for (RowKey r: msg.readRows)
+ //if (lockedSet.remove(r.index))//unlock only if it's locked
+ //sharedState.hashmap.unlock(r.index);
+ int li = -1;
@dgomezferro
dgomezferro May 25, 2012 Contributor

what does li stand for? lock index?

@dgomezferro dgomezferro commented on an outdated diff May 25, 2012
src/main/java/com/yahoo/omid/tso/TSOHandler.java
+ synchronized (sharedState.hashmap) {
+ sharedState.processAbort(msg.startTimestamp);
+ }
+ synchronized (sharedMsgBufLock) {
+ queueHalfAbort(msg.startTimestamp);
+ }
+ }
+
+ //for reads just need atomic access, no need to hold the locks
+ //if (IsolationLevel.checkForReadWriteConflicts)
+ //for (RowKey r: msg.readRows)
+ //if (lockedSet.remove(r.index))//unlock only if it's locked
+ //sharedState.hashmap.unlock(r.index);
+ int li = -1;
+ for (RowKey r: msg.writtenRows)
+ //if (lockedSet.remove(r.index))//unlock only if it's locked
@dgomezferro
dgomezferro May 25, 2012 Contributor

I don't understand this. Unneeded?

@dgomezferro dgomezferro commented on an outdated diff May 25, 2012
src/main/java/com/yahoo/omid/tso/TSOHandler.java
+ sharedState.processAbort(msg.startTimestamp);
+ }
+ synchronized (sharedMsgBufLock) {
+ queueHalfAbort(msg.startTimestamp);
+ }
+ }
+
+ //for reads just need atomic access, no need to hold the locks
+ //if (IsolationLevel.checkForReadWriteConflicts)
+ //for (RowKey r: msg.readRows)
+ //if (lockedSet.remove(r.index))//unlock only if it's locked
+ //sharedState.hashmap.unlock(r.index);
+ int li = -1;
+ for (RowKey r: msg.writtenRows)
+ //if (lockedSet.remove(r.index))//unlock only if it's locked
+ if (li != r.index) {
@dgomezferro
dgomezferro May 25, 2012 Contributor

Why is this needed?

@dgomezferro dgomezferro commented on an outdated diff May 25, 2012
src/main/java/com/yahoo/omid/tso/TSOHandler.java
+ sharedState.nextBatch.add(cam);
+ if (sharedState.baos.size() >= TSOState.BATCH_SIZE) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Going to add record of size " + sharedState.baos.size());
+ }
+ //sharedState.lh.asyncAddEntry(baos.toByteArray(), this, sharedState.nextBatch);
+ sharedState.addRecord(sharedState.baos.toByteArray(), new AddRecordCallback() {
+ @Override
+ public void addRecordComplete(int rc, Object ctx) {
+ if (rc != Code.OK) {
+ LOG.warn("Write failed: " + LoggerException.getMessage(rc));
+ } else {
+ synchronized (callbackLock) {
+ @SuppressWarnings("unchecked")
+ ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
+ for (ChannelandMessage cam : theBatch) {
@dgomezferro
dgomezferro May 25, 2012 Contributor

bad indentation

@dgomezferro dgomezferro commented on an outdated diff May 25, 2012
src/main/java/com/yahoo/omid/tso/TSOHandler.java
+ sharedState.hashmap.unlock(r.index);
+ li = r.index;
+ }
+
+
+ TSOHandler.transferredBytes.incrementAndGet();
+
+ ChannelandMessage cam = new ChannelandMessage(ctx, reply);
+
+ synchronized (sharedState) {
+ sharedState.nextBatch.add(cam);
+ if (sharedState.baos.size() >= TSOState.BATCH_SIZE) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Going to add record of size " + sharedState.baos.size());
+ }
+ //sharedState.lh.asyncAddEntry(baos.toByteArray(), this, sharedState.nextBatch);
@dgomezferro
dgomezferro May 25, 2012 Contributor

remove

@dgomezferro dgomezferro commented on an outdated diff May 25, 2012
src/main/java/com/yahoo/omid/tso/TSOHandler.java
+ Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
+ }
+ }
+ }
+ }
+ }, sharedState.nextBatch);
+ sharedState.nextBatch = new ArrayList<ChannelandMessage>(sharedState.nextBatch.size() + 5);
+ sharedState.baos.reset();
+ }
+ }
+
+ }
+
+ //}
+
+ protected void checkForConflictsIn(RowKey[] rows, CommitRequest msg, CommitResponse reply, boolean isAlreadyLocked) {
@dgomezferro
dgomezferro May 25, 2012 Contributor

change isolation level thorough the patch.

This function should be private boolean hasConflicts(...) and modify the reply at the call site.

Maysam Yabandeh minor refactoring 91a1bcc
@maysamyabandeh

Daniel, your comments are applied.

@dgomezferro dgomezferro commented on the diff May 25, 2012
.../com/yahoo/omid/client/ColumnFamilyAndQuantifier.java
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.client;
+
+import java.util.Arrays;
+
+//A wrapper for both column family and the qualifier
+//Make it easier to be used in maps and hash maps
+public class ColumnFamilyAndQuantifier {
+ protected byte[] family;
+ protected byte[] qualifier;
+ protected Integer hash = null;
@dgomezferro
dgomezferro May 25, 2012 Contributor

make them private

@dgomezferro dgomezferro commented on the diff May 25, 2012
src/main/java/com/yahoo/omid/tso/Elder.java
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+
+package com.yahoo.omid.tso;
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.ArrayList;
+
+public class Elder implements Comparable<Elder> {
+
+ protected long startTimestamp;
+ protected long commitTimestamp = -1;
+ //i need this constructor only for search purposes, do not use it to store elders
@dgomezferro
dgomezferro May 25, 2012 Contributor

make them private

@dgomezferro dgomezferro commented on the diff May 25, 2012
src/main/java/com/yahoo/omid/tso/TSOHandler.java
+ sharedState.sharedMessageBuffer.rollBackTimestamp();
+ }
+ }
+
+ ChannelBuffer cb = ChannelBuffers.buffer(10);
+
+ private boolean finish;
+
+ /**
+ * Handle the CommitRequest message
+ */
+ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
+ CommitResponse reply = new CommitResponse(msg.startTimestamp);
+ DataOutputStream toWAL = sharedState.toWAL;
+ reply.committed = true;
+ //HashSet<Integer> lockedSet = new HashSet();
@dgomezferro
dgomezferro May 25, 2012 Contributor

remove

@dgomezferro dgomezferro commented on the diff May 25, 2012
src/main/java/com/yahoo/omid/tso/TSOHandler.java
+ reply.committed = false;
+ LOG.warn("Aborting transaction after restarting TSO");
+ } else if (msg.startTimestamp < sharedState.largestDeletedTimestamp) {
+ // Too old
+ reply.committed = false;//set as abort
+ LOG.warn("Too old starttimestamp: ST "+ msg.startTimestamp +" MAX " + sharedState.largestDeletedTimestamp);
+ } else if (msg.writtenRows.length > 0) {
+ //1. check the read-write conflicts
+ //for reads just need atomic access, no need to hold the locks
+ //do this check befor locking the write rows, otherwise in case of conflict we will face deadlocks
+ if (IsolationLevel.checkForReadWriteConflicts)
+ reply.committed = checkForConflictsIn(msg.readRows, msg.startTimestamp, reply.committed, false);
+ //always lock writes, since gonna update them anyway
+ int lastIndex = -1;
+ for (RowKey r: msg.writtenRows) {
+ if (lastIndex != r.index) { //lockedSet.add(r.index)) {//do not lock twice
@dgomezferro
dgomezferro May 25, 2012 Contributor

what's the commented lockedSet.add() about? what does the comment apply to, the commented code or the if?

why do you need this check of lastIndex != r.index?

Maysam Yabandeh added some commits May 25, 2012
@francisco-perez-sorrosal francisco-perez-sorrosal deleted the readWrite branch Feb 5, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment