Skip to content
This repository has been archived by the owner on Jul 15, 2019. It is now read-only.

Commit

Permalink
temporarily fix the problem of ignoring retry
Browse files Browse the repository at this point in the history
  • Loading branch information
Maysam Yabandeh committed May 15, 2012
1 parent 763806f commit b09c36d
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 14 deletions.
Expand Up @@ -17,5 +17,5 @@
package com.yahoo.omid.client;

public interface CommitQueryCallback extends Callback {
public void complete(boolean committed);
public void complete(boolean committed, long commitTimestamp, boolean retry);
}
16 changes: 15 additions & 1 deletion src/main/java/com/yahoo/omid/client/SyncCommitQueryCallback.java
Expand Up @@ -19,14 +19,28 @@
public class SyncCommitQueryCallback extends SyncCallbackBase
implements CommitQueryCallback {
private boolean committed = false;
private long commitTimestamp;
private boolean retry = false;

public boolean isAClearAnswer() {
return (retry != true);
}

//valid only if isAClearAnswer returns true
public boolean isCommitted() {
return committed;
}

//valid only if isCommitted return true
public long commitTimestamp() {
return commitTimestamp;
}

synchronized
public void complete(boolean committed) {
public void complete(boolean committed, long commitTimestamp, boolean retry) {
this.committed = committed;
this.commitTimestamp = commitTimestamp;
this.retry = retry;
countDown();
}
}
13 changes: 9 additions & 4 deletions src/main/java/com/yahoo/omid/client/TSOClient.java
Expand Up @@ -517,6 +517,7 @@ public long commitTimestamp(long transaction, long startTimestamp) throws IOExce

if (hasConnectionTimestamp && transaction > connectionTimestamp)
return transaction <= largestDeletedTimestamp ? -1 : -2;
//TODO: it works only if it runs one transaction at a time
if (transaction <= largestDeletedTimestamp)
return -1;//committed but the tc is lost

Expand All @@ -529,10 +530,11 @@ public long commitTimestamp(long transaction, long startTimestamp) throws IOExce
} catch (InterruptedException e) {
throw new IOException("Commit query didn't complete", e);
}
return cb.isCommitted() ? -1 : -2;
//TODO: this is wrong. we should ask tso about Tc and then use that to decide
if (!cb.isAClearAnswer())
//TODO: throw a proper exception
throw new IOException("Either abort or retry the transaction");
return cb.isCommitted() ? cb.commitTimestamp() : -2;
}


public boolean validRead(long transaction, long startTimestamp) throws IOException {
if (transaction == startTimestamp)
Expand All @@ -556,6 +558,9 @@ public boolean validRead(long transaction, long startTimestamp) throws IOExcepti
} catch (InterruptedException e) {
throw new IOException("Commit query didn't complete", e);
}
if (!cb.isAClearAnswer())
//TODO: throw a proper exception
throw new IOException("Either abort or retry the transaction");
return cb.isCommitted();
}

Expand Down Expand Up @@ -607,7 +612,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
return;
}
for (CommitQueryCallback cb : cbs) {
cb.complete(r.committed);
cb.complete(r.committed, r.commitTimestamp, r.retry);
}
} else if (msg instanceof CommittedTransactionReport) {
CommittedTransactionReport ctr = (CommittedTransactionReport) msg;
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/yahoo/omid/client/TransactionManager.java
Expand Up @@ -58,6 +58,9 @@ public TransactionManager(Configuration conf) throws TransactionException, IOExc
tableCache = new HashMap<byte[], HTable>();
}

//a temorary solution to allow only one transaction at a time
boolean aTransactionIsInProgress = false;

/**
* Starts a new transaction.
*
Expand All @@ -68,6 +71,7 @@ public TransactionManager(Configuration conf) throws TransactionException, IOExc
* @throws TransactionException
*/
public TransactionState beginTransaction() throws TransactionException {
assert(aTransactionIsInProgress == false);
SyncCreateCallback cb = new SyncCreateCallback();
try {
tsoclient.getNewTimestamp(cb);
Expand All @@ -79,6 +83,7 @@ public TransactionState beginTransaction() throws TransactionException {
throw new TransactionException("Error retrieving timestamp", cb.getException());
}

aTransactionIsInProgress=true;
return new TransactionState(cb.getStartTimestamp(), tsoclient);
}

Expand All @@ -92,6 +97,7 @@ public TransactionState beginTransaction() throws TransactionException {
*/
public void tryCommit(TransactionState transactionState)
throws CommitUnsuccessfulException, TransactionException {
aTransactionIsInProgress=false;
Statistics.fullReport(Statistics.Tag.COMMIT, 1);
if (LOG.isTraceEnabled()) {
LOG.trace("tryCommit " + transactionState.getStartTimestamp());
Expand Down Expand Up @@ -138,6 +144,7 @@ public void tryCommit(TransactionState transactionState)
* @throws TransactionException
*/
public void abort(TransactionState transactionState) throws TransactionException {
aTransactionIsInProgress=false;
if (LOG.isTraceEnabled()) {
LOG.trace("abort " + transactionState.getStartTimestamp());
}
Expand Down
13 changes: 5 additions & 8 deletions src/main/java/com/yahoo/omid/tso/TSOHandler.java
Expand Up @@ -500,27 +500,24 @@ public void handle(CommitQueryRequest msg, ChannelHandlerContext ctx) {
reply.queryTimestamp = msg.queryTimestamp;
synchronized (sharedState) {
queries++;
//1. check the write-write conflicts
long value;
value = sharedState.hashmap.getCommittedTimestamp(msg.queryTimestamp);
if (value != 0) { //it exists
reply.commitTimestamp = value;
reply.committed = value < msg.startTimestamp;//set as abort
}
else if (sharedState.hashmap.isHalfAborted(msg.queryTimestamp))
else if (sharedState.largestDeletedTimestamp < msg.queryTimestamp)
reply.committed = false;
else if (sharedState.uncommited.isUncommited(msg.queryTimestamp))
else if (sharedState.hashmap.isHalfAborted(msg.queryTimestamp))
reply.committed = false;
else
reply.retry = true;
// else if (sharedState.largestDeletedTimestamp >= msg.queryTimestamp)
// reply.committed = true;
// TODO retry needed? isnt it just fully aborted?
//retry is show that we cannot distinguish two cases
//1. Tc < Tmax
//2. Ts < Tmax && aborted && Cleanedup is sent after we read the value but is received before this query is processed.

ctx.getChannel().write(reply);

// We send the message directly. If after a failure the state is inconsistent we'll detect it

}
}

Expand Down

0 comments on commit b09c36d

Please sign in to comment.