Skip to content
Browse files

minor refactoring

  • Loading branch information...
1 parent 0457efb commit 91a1bcc6ada7954ae0b0e97ef920e65d330d337f Maysam Yabandeh committed May 25, 2012
Showing with 32 additions and 41 deletions.
  1. +32 −41 src/main/java/com/yahoo/omid/tso/TSOHandler.java
View
73 src/main/java/com/yahoo/omid/tso/TSOHandler.java
@@ -282,22 +282,21 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
//for reads just need atomic access, no need to hold the locks
//do this check befor locking the write rows, otherwise in case of conflict we will face deadlocks
if (IsolationLevel.checkForReadWriteConflicts)
- checkForConflictsIn(msg.readRows, msg, reply, false);
+ reply.committed = checkForConflictsIn(msg.readRows, msg.startTimestamp, reply.committed, false);
//always lock writes, since gonna update them anyway
- int li = -1;
+ int lastIndex = -1;
for (RowKey r: msg.writtenRows) {
- if (li != r.index) { //lockedSet.add(r.index)) {//do not lock twice
- li = r.index;
+ if (lastIndex != r.index) { //lockedSet.add(r.index)) {//do not lock twice
+ lastIndex = r.index;
long tmaxForConflictChecking = sharedState.hashmap.lock(r.index);
if (tmaxForConflictChecking > msg.startTimestamp) {
reply.committed = false;
break;
}
}
}
- if (reply.committed != false)
- if (IsolationLevel.checkForWriteWriteConflicts)
- checkForConflictsIn(msg.writtenRows, msg, reply, true);
+ if (IsolationLevel.checkForWriteWriteConflicts)
+ reply.committed = checkForConflictsIn(msg.writtenRows, msg.startTimestamp, reply.committed, true);
}
else reply.committed = true;
@@ -413,16 +412,12 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
}
//for reads just need atomic access, no need to hold the locks
- //if (IsolationLevel.checkForReadWriteConflicts)
- //for (RowKey r: msg.readRows)
- //if (lockedSet.remove(r.index))//unlock only if it's locked
- //sharedState.hashmap.unlock(r.index);
- int li = -1;
+ //do it for writes though
+ int lastIndex = -1;
for (RowKey r: msg.writtenRows)
- //if (lockedSet.remove(r.index))//unlock only if it's locked
- if (li != r.index) {
+ if (lastIndex != r.index) {//account for two puts with the same index
sharedState.hashmap.unlock(r.index);
- li = r.index;
+ lastIndex = r.index;
}
@@ -436,7 +431,6 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
if(LOG.isDebugEnabled()){
LOG.debug("Going to add record of size " + sharedState.baos.size());
}
- //sharedState.lh.asyncAddEntry(baos.toByteArray(), this, sharedState.nextBatch);
sharedState.addRecord(sharedState.baos.toByteArray(), new AddRecordCallback() {
@Override
public void addRecordComplete(int rc, Object ctx) {
@@ -446,9 +440,9 @@ public void addRecordComplete(int rc, Object ctx) {
synchronized (callbackLock) {
@SuppressWarnings("unchecked")
ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
- for (ChannelandMessage cam : theBatch) {
- Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
- }
+ for (ChannelandMessage cam : theBatch) {
+ Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
+ }
}
}
}
@@ -462,34 +456,31 @@ public void addRecordComplete(int rc, Object ctx) {
//}
- protected void checkForConflictsIn(RowKey[] rows, CommitRequest msg, CommitResponse reply, boolean isAlreadyLocked) {
- if (!reply.committed)//already aborted
- return;
+ boolean checkForConflictsIn(RowKey[] rows, long startTimestamp, boolean committed, boolean isAlreadyLocked) {
+ if (!committed)//already aborted
+ return committed;
for (RowKey r: rows) {
long value;
if (isAlreadyLocked)
value = sharedState.hashmap.get(r.getRow(), r.getTable(), r.hashCode());
else//perform an atomic read that acquires the lock and releases it afterwards
- value = sharedState.hashmap.atomicget(r.getRow(), r.getTable(), r.hashCode(), r.index, msg.startTimestamp);
- if (value != 0 && value > msg.startTimestamp) {
- //System.out.println("Abort...............");
- reply.committed = false;//set as abort
- break;
- } else if (value == 0 && sharedState.largestDeletedTimestamp > msg.startTimestamp) {
+ value = sharedState.hashmap.atomicget(r.getRow(), r.getTable(), r.hashCode(), r.index, startTimestamp);
+ if (value != 0 && value > startTimestamp) {
+ return false;//set as abort
+ } else if (value == 0 && sharedState.largestDeletedTimestamp > startTimestamp) {
//then it could have been committed after start timestamp but deleted by recycling
- System.out.println("Old............... " + sharedState.largestDeletedTimestamp + " " + msg.startTimestamp);
- reply.committed = false;//set as abort
- break;
- } else if (value == -1) {//means that tmaxForConflictChecking > msg.startTimestamp
- System.out.println("Old....-1......... " + sharedState.largestDeletedTimestamp + " " + msg.startTimestamp);
- reply.committed = false;//set as abort
- break;
+ LOG.warn("Old............... " + sharedState.largestDeletedTimestamp + " " + startTimestamp);
+ return false;//set as abort
+ } else if (value == -1) {//means that tmaxForConflictChecking > startTimestamp
+ LOG.warn("Old....-1......... " + sharedState.largestDeletedTimestamp + " " + startTimestamp);
+ return false;//set as abort
}
}
+ return true;
}
//check for write-write conflicts
- protected void checkForElders(CommitResponse reply, CommitRequest msg) {
+ void checkForElders(CommitResponse reply, CommitRequest msg) {
for (RowKey r: msg.writtenRows) {
long value;
value = sharedState.hashmap.get(r.getRow(), r.getTable(), r.hashCode());
@@ -502,7 +493,7 @@ protected void checkForElders(CommitResponse reply, CommitRequest msg) {
}
}
- protected void reportEldestIfChanged(CommitResponse reply, CommitRequest msg) {
+ void reportEldestIfChanged(CommitResponse reply, CommitRequest msg) {
//2. add it to elders list
if (reply.rowsWithWriteWriteConflict != null && reply.rowsWithWriteWriteConflict.size() > 0) {
ArrayList<RowKey> rowsWithWriteWriteConflict = new ArrayList<RowKey>(reply.rowsWithWriteWriteConflict);
@@ -527,7 +518,7 @@ protected void reportEldestIfChanged(CommitResponse reply, CommitRequest msg) {
}
//A write-write conflict is detected and the proper action is taken here
- protected void aWWconflictDetected(CommitResponse reply, CommitRequest msg, RowKey wwRow) {
+ void aWWconflictDetected(CommitResponse reply, CommitRequest msg, RowKey wwRow) {
//Since we abort only for read-write conflicts, here we just keep track of elders (transactions with ww conflict) and tell them to reincarnate themselves by reinserting the items with ww conflict
//1. add it to the reply to the lients
if (reply.rowsWithWriteWriteConflict == null)
@@ -579,9 +570,9 @@ public void addRecordComplete(int rc, Object ctx) {
synchronized (callbackLock) {
@SuppressWarnings("unchecked")
ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
- for (ChannelandMessage cam : theBatch) {
- Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
- }
+ for (ChannelandMessage cam : theBatch) {
+ Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
+ }
}
}
}

0 comments on commit 91a1bcc

Please sign in to comment.
Something went wrong with that request. Please try again.