Skip to content
Browse files

change the indentation

  • Loading branch information...
1 parent 85f2e6e commit 64eca47ee71d3ffcf1db83b8c73d0cf560b32e3e Maysam Yabandeh committed May 18, 2012
Showing with 3,340 additions and 3,345 deletions.
  1. +19 −19 src/main/java/com/yahoo/omid/IsolationLevel.java
  2. +5 −5 src/main/java/com/yahoo/omid/OmidConfiguration.java
  3. +75 −75 src/main/java/com/yahoo/omid/Statistics.java
  4. +40 −40 src/main/java/com/yahoo/omid/client/ColumnFamilyAndQuantifier.java
  5. +1 −1 src/main/java/com/yahoo/omid/client/CommitCallback.java
  6. +1 −1 src/main/java/com/yahoo/omid/client/CommitQueryCallback.java
  7. +60 −60 src/main/java/com/yahoo/omid/client/MinVersionsFilter.java
  8. +44 −44 src/main/java/com/yahoo/omid/client/MinVersionsSingleColumnFilter.java
  9. +1 −1 src/main/java/com/yahoo/omid/client/ReincarnationCompleteCallback.java
  10. +27 −27 src/main/java/com/yahoo/omid/client/SyncCommitCallback.java
  11. +5 −5 src/main/java/com/yahoo/omid/client/SyncReincarnationCompleteCallback.java
  12. +636 −638 src/main/java/com/yahoo/omid/client/TSOClient.java
  13. +193 −193 src/main/java/com/yahoo/omid/client/TransactionManager.java
  14. +43 −43 src/main/java/com/yahoo/omid/client/TransactionState.java
  15. +512 −515 src/main/java/com/yahoo/omid/client/TransactionalTable.java
  16. +131 −131 src/main/java/com/yahoo/omid/client/regionserver/Compacter.java
  17. +382 −382 src/main/java/com/yahoo/omid/tso/ClientHandler.java
  18. +148 −148 src/main/java/com/yahoo/omid/tso/CommitHashMap.java
  19. +29 −29 src/main/java/com/yahoo/omid/tso/CompacterHandler.java
  20. +37 −37 src/main/java/com/yahoo/omid/tso/Elder.java
  21. +96 −96 src/main/java/com/yahoo/omid/tso/Elders.java
  22. +595 −595 src/main/java/com/yahoo/omid/tso/TSOHandler.java
  23. +35 −35 src/main/java/com/yahoo/omid/tso/messages/EldestUpdate.java
  24. +40 −40 src/main/java/com/yahoo/omid/tso/messages/FailedElderReport.java
  25. +35 −35 src/main/java/com/yahoo/omid/tso/messages/ReincarnationReport.java
  26. +110 −110 src/main/java/com/yahoo/omid/tso/serialization/TSODecoder.java
  27. +40 −40 src/main/java/com/yahoo/omid/tso/serialization/TSOEncoder.java
View
38 src/main/java/com/yahoo/omid/IsolationLevel.java
@@ -24,26 +24,26 @@
*
*/
public class IsolationLevel {
- static public boolean checkForReadWriteConflicts;
- static public boolean checkForWriteWriteConflicts;
+ static public boolean checkForReadWriteConflicts;
+ static public boolean checkForWriteWriteConflicts;
- static {
- Configuration conf = OmidConfiguration.create();
- if (conf.get("tso.rwcheck") == null || conf.get("tso.wwcheck") == null) {
- System.out.println("ISOLATION ERROR: the isolation level parameters are not set: " + conf.get("tso.rwcheck") + " " + conf.get("tso.wwcheck"));
- System.exit(1);
- }
- checkForReadWriteConflicts = conf.getBoolean("tso.rwcheck", false);
- checkForWriteWriteConflicts = conf.getBoolean("tso.wwcheck", true);
- if (IsolationLevel.checkForReadWriteConflicts)
- System.out.println("ISOLATION: check for read-write conflicts");
- if (IsolationLevel.checkForWriteWriteConflicts)
- System.out.println("ISOLATION: check for write-write conflicts");
- if (!IsolationLevel.checkForReadWriteConflicts && !IsolationLevel.checkForWriteWriteConflicts) {
- System.out.println("ISOLATION ERROR: I do not know which version it is");
- System.exit(1);
- }
- }
+ static {
+ Configuration conf = OmidConfiguration.create();
+ if (conf.get("tso.rwcheck") == null || conf.get("tso.wwcheck") == null) {
+ System.out.println("ISOLATION ERROR: the isolation level parameters are not set: " + conf.get("tso.rwcheck") + " " + conf.get("tso.wwcheck"));
+ System.exit(1);
+ }
+ checkForReadWriteConflicts = conf.getBoolean("tso.rwcheck", false);
+ checkForWriteWriteConflicts = conf.getBoolean("tso.wwcheck", true);
+ if (IsolationLevel.checkForReadWriteConflicts)
+ System.out.println("ISOLATION: check for read-write conflicts");
+ if (IsolationLevel.checkForWriteWriteConflicts)
+ System.out.println("ISOLATION: check for write-write conflicts");
+ if (!IsolationLevel.checkForReadWriteConflicts && !IsolationLevel.checkForWriteWriteConflicts) {
+ System.out.println("ISOLATION ERROR: I do not know which version it is");
+ System.exit(1);
+ }
+ }
}
View
10 src/main/java/com/yahoo/omid/OmidConfiguration.java
@@ -24,10 +24,10 @@
*
*/
public class OmidConfiguration extends Configuration {
- public static Configuration create() {
- Configuration conf = new Configuration();
- conf.addDefaultResource("omid-site.xml");
- return conf;
- }
+ public static Configuration create() {
+ Configuration conf = new Configuration();
+ conf.addDefaultResource("omid-site.xml");
+ return conf;
+ }
}
View
150 src/main/java/com/yahoo/omid/Statistics.java
@@ -27,84 +27,84 @@
*
*/
public class Statistics {
- public enum Tag {
- VSN_PER_HBASE_GET,//number of returned version per hbase get operation
- VSN_PER_CLIENT_GET,//number of returned version per client get operation
- GET_PER_CLIENT_GET,//number of hbase get performed per client get operation
- COMMIT,//number of commits
- REINCARNATION,//number of reincarnations
- ASKTSO,//number of queries sent to tso
- EMPTY_GET,//number of hbase get that return nothing
- dummy
- }
- static class History {
- public int cnt;
- public long total;
- }
- static Map<Tag, History> histories = new EnumMap<Tag, History>(Tag.class);
- static Map<Tag, History> partialChanges = new EnumMap<Tag, History>(Tag.class);
- static protected History getHistory(Tag tag, Map<Tag, History> map) {
- History history = map.get(tag);
- if (history == null) {
- history = new History();
- map.put(tag, history);
- }
- return history;
- }
+ public enum Tag {
+ VSN_PER_HBASE_GET,//number of returned version per hbase get operation
+ VSN_PER_CLIENT_GET,//number of returned version per client get operation
+ GET_PER_CLIENT_GET,//number of hbase get performed per client get operation
+ COMMIT,//number of commits
+ REINCARNATION,//number of reincarnations
+ ASKTSO,//number of queries sent to tso
+ EMPTY_GET,//number of hbase get that return nothing
+ dummy
+ }
+ static class History {
+ public int cnt;
+ public long total;
+ }
+ static Map<Tag, History> histories = new EnumMap<Tag, History>(Tag.class);
+ static Map<Tag, History> partialChanges = new EnumMap<Tag, History>(Tag.class);
+ static protected History getHistory(Tag tag, Map<Tag, History> map) {
+ History history = map.get(tag);
+ if (history == null) {
+ history = new History();
+ map.put(tag, history);
+ }
+ return history;
+ }
- static public void partialReport(Tag tag, int value) {
- synchronized (histories) {
- History tmpHistory = getHistory(tag, partialChanges);
- tmpHistory.total += value;
- }
- }
+ static public void partialReport(Tag tag, int value) {
+ synchronized (histories) {
+ History tmpHistory = getHistory(tag, partialChanges);
+ tmpHistory.total += value;
+ }
+ }
- static public void partialReportOver(Tag tag) {
- synchronized (histories) {
- History tmpHistory = getHistory(tag, partialChanges);
- if (tmpHistory.total == 0)
- return;
- History history = getHistory(tag, histories);
- history.cnt ++;
- history.total += tmpHistory.total;
- tmpHistory.total = 0;
- }
- }
+ static public void partialReportOver(Tag tag) {
+ synchronized (histories) {
+ History tmpHistory = getHistory(tag, partialChanges);
+ if (tmpHistory.total == 0)
+ return;
+ History history = getHistory(tag, histories);
+ history.cnt ++;
+ history.total += tmpHistory.total;
+ tmpHistory.total = 0;
+ }
+ }
- static public void fullReport(Tag tag, int value) {
- synchronized (histories) {
- if (value == 0)
- return;
- History history = getHistory(tag, histories);
- history.cnt ++;
- history.total += value;
- }
- }
+ static public void fullReport(Tag tag, int value) {
+ synchronized (histories) {
+ if (value == 0)
+ return;
+ History history = getHistory(tag, histories);
+ history.cnt ++;
+ history.total += value;
+ }
+ }
- static long lastReportTime = System.currentTimeMillis();
- static final long reportInterval = 2000;
- static public boolean skipReport() {
- long currentTimeMillis = System.currentTimeMillis();
- if (currentTimeMillis - lastReportTime > reportInterval) {
- lastReportTime = currentTimeMillis;
- return false;
- }
- return true;
- }
- static public void println() {
- synchronized (histories) {
- if (skipReport())
- return;
- System.out.print("Statistics: ");
- for (Map.Entry<Tag, History> entry : histories.entrySet()) {
- Tag tag = entry.getKey();
- History history = entry.getValue();
- System.out.print(tag + "Cnt " + history.cnt + " ");
- System.out.print(tag + "Sum " + history.total + " ");
- System.out.print(tag + "Avg " + (float) history.total / history.cnt + " ");
- }
- System.out.println();
- }
- }
+ static long lastReportTime = System.currentTimeMillis();
+ static final long reportInterval = 2000;
+ static public boolean skipReport() {
+ long currentTimeMillis = System.currentTimeMillis();
+ if (currentTimeMillis - lastReportTime > reportInterval) {
+ lastReportTime = currentTimeMillis;
+ return false;
+ }
+ return true;
+ }
+ static public void println() {
+ synchronized (histories) {
+ if (skipReport())
+ return;
+ System.out.print("Statistics: ");
+ for (Map.Entry<Tag, History> entry : histories.entrySet()) {
+ Tag tag = entry.getKey();
+ History history = entry.getValue();
+ System.out.print(tag + "Cnt " + history.cnt + " ");
+ System.out.print(tag + "Sum " + history.total + " ");
+ System.out.print(tag + "Avg " + (float) history.total / history.cnt + " ");
+ }
+ System.out.println();
+ }
+ }
}
View
80 src/main/java/com/yahoo/omid/client/ColumnFamilyAndQuantifier.java
@@ -19,44 +19,44 @@
//A wrapper for both column family and the qualifier
//Make it easier to be used in maps and hash maps
public class ColumnFamilyAndQuantifier {
- protected byte[] family;
- protected byte[] qualifier;
- protected Integer hash = null;
- ColumnFamilyAndQuantifier(byte[] f, byte[] q) {
- family = f;
- qualifier = q;
- }
- @Override
- public boolean equals(Object o) {
- if (o instanceof ColumnFamilyAndQuantifier) {
- ColumnFamilyAndQuantifier other = (ColumnFamilyAndQuantifier) o;
- if (family.length != other.family.length || qualifier.length != other.qualifier.length)
- return false;
- for (int i = 0; i < family.length; i++)
- if (family[i] != other.family[i])
- return false;
- for (int i = 0; i < qualifier.length; i++)
- if (qualifier[i] != other.qualifier[i])
- return false;
- return true;
- }
- return false;
- }
- @Override
- public int hashCode() {
- if (hash != null)
- return hash;
- int h = 0;
- h = computeHash(h, family);
- h = computeHash(h, qualifier);
- hash = h;
- return h;
- }
- private int computeHash(int hash, byte[] larray) {
- hash += larray.length;
- for (int i = 0; i < larray.length; i++) {
- hash += larray[i];
- }
- return hash;
- }
+ protected byte[] family;
+ protected byte[] qualifier;
+ protected Integer hash = null;
+ ColumnFamilyAndQuantifier(byte[] f, byte[] q) {
+ family = f;
+ qualifier = q;
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ColumnFamilyAndQuantifier) {
+ ColumnFamilyAndQuantifier other = (ColumnFamilyAndQuantifier) o;
+ if (family.length != other.family.length || qualifier.length != other.qualifier.length)
+ return false;
+ for (int i = 0; i < family.length; i++)
+ if (family[i] != other.family[i])
+ return false;
+ for (int i = 0; i < qualifier.length; i++)
+ if (qualifier[i] != other.qualifier[i])
+ return false;
+ return true;
+ }
+ return false;
+ }
+ @Override
+ public int hashCode() {
+ if (hash != null)
+ return hash;
+ int h = 0;
+ h = computeHash(h, family);
+ h = computeHash(h, qualifier);
+ hash = h;
+ return h;
+ }
+ private int computeHash(int hash, byte[] larray) {
+ hash += larray.length;
+ for (int i = 0; i < larray.length; i++) {
+ hash += larray[i];
+ }
+ return hash;
+ }
}
View
2 src/main/java/com/yahoo/omid/client/CommitCallback.java
@@ -20,6 +20,6 @@
import com.yahoo.omid.tso.RowKey;
public interface CommitCallback extends Callback {
- public void complete(Result res, long commitTimestamp, RowKey[] wwRows);
+ public void complete(Result res, long commitTimestamp, RowKey[] wwRows);
}
View
2 src/main/java/com/yahoo/omid/client/CommitQueryCallback.java
@@ -17,5 +17,5 @@
package com.yahoo.omid.client;
public interface CommitQueryCallback extends Callback {
- public void complete(boolean committed, long commitTimestamp, boolean retry);
+ public void complete(boolean committed, long commitTimestamp, boolean retry);
}
View
120 src/main/java/com/yahoo/omid/client/MinVersionsFilter.java
@@ -33,74 +33,74 @@
*/
public class MinVersionsFilter extends FilterBase {
- //read at least minVersions and go till reach startTime
- long startTime = 0;
- long endTime = Long.MAX_VALUE;
- int minVersions;
+ //read at least minVersions and go till reach startTime
+ long startTime = 0;
+ long endTime = Long.MAX_VALUE;
+ int minVersions;
- //keep track of included versions for each column qualifier of each column family
- int includedVersionsForLastColumn;
- ColumnFamilyAndQuantifier lastColumn;
+ //keep track of included versions for each column qualifier of each column family
+ int includedVersionsForLastColumn;
+ ColumnFamilyAndQuantifier lastColumn;
- /**
- * Used during deserialization. Do not use otherwise.
- */
- public MinVersionsFilter() {
- super();
- }
+ /**
+ * Used during deserialization. Do not use otherwise.
+ */
+ public MinVersionsFilter() {
+ super();
+ }
- public MinVersionsFilter(long startTime, long endTime, int minVersions) {
- this.startTime = startTime;
- this.endTime = endTime;
- this.minVersions = minVersions;
- init();
- }
+ public MinVersionsFilter(long startTime, long endTime, int minVersions) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.minVersions = minVersions;
+ init();
+ }
- private void init() {
- includedVersionsForLastColumn = 0;
- lastColumn = null;
- }
+ private void init() {
+ includedVersionsForLastColumn = 0;
+ lastColumn = null;
+ }
- private int getIncludedVersions(ColumnFamilyAndQuantifier column) {
- if (lastColumn == null || !lastColumn.equals(column)) {
- lastColumn = column;
- includedVersionsForLastColumn = 0;
- }
- return includedVersionsForLastColumn;
- }
+ private int getIncludedVersions(ColumnFamilyAndQuantifier column) {
+ if (lastColumn == null || !lastColumn.equals(column)) {
+ lastColumn = column;
+ includedVersionsForLastColumn = 0;
+ }
+ return includedVersionsForLastColumn;
+ }
- private void setIncludedVersions(ColumnFamilyAndQuantifier column, int versions) {
- includedVersionsForLastColumn = versions;
- }
+ private void setIncludedVersions(ColumnFamilyAndQuantifier column, int versions) {
+ includedVersionsForLastColumn = versions;
+ }
- @Override
- public ReturnCode filterKeyValue(KeyValue v) {
- long version = v.getTimestamp();
- if (version >= endTime)
- return ReturnCode.SKIP;
- ColumnFamilyAndQuantifier column = new ColumnFamilyAndQuantifier(v.getFamily(), v.getQualifier());
- int includedVersions = getIncludedVersions(column);
- if (includedVersions < minVersions || version > startTime) {
- includedVersions++;
- setIncludedVersions(column, includedVersions);
- return ReturnCode.INCLUDE;
- }
- return ReturnCode.NEXT_COL;
- }
+ @Override
+ public ReturnCode filterKeyValue(KeyValue v) {
+ long version = v.getTimestamp();
+ if (version >= endTime)
+ return ReturnCode.SKIP;
+ ColumnFamilyAndQuantifier column = new ColumnFamilyAndQuantifier(v.getFamily(), v.getQualifier());
+ int includedVersions = getIncludedVersions(column);
+ if (includedVersions < minVersions || version > startTime) {
+ includedVersions++;
+ setIncludedVersions(column, includedVersions);
+ return ReturnCode.INCLUDE;
+ }
+ return ReturnCode.NEXT_COL;
+ }
- @Override
- public void readFields(DataInput in) throws IOException {
- this.startTime = in.readLong();
- this.endTime = in.readLong();
- this.minVersions = in.readInt();
- init();
- }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.startTime = in.readLong();
+ this.endTime = in.readLong();
+ this.minVersions = in.readInt();
+ init();
+ }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(this.startTime);
- out.writeLong(this.endTime);
- out.writeInt(this.minVersions);
- }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(this.startTime);
+ out.writeLong(this.endTime);
+ out.writeInt(this.minVersions);
+ }
}
View
88 src/main/java/com/yahoo/omid/client/MinVersionsSingleColumnFilter.java
@@ -32,57 +32,57 @@
*/
public class MinVersionsSingleColumnFilter extends FilterBase {
- //read at least minVersions and go till reach startTime
- long startTime = 0;
- long endTime = Long.MAX_VALUE;
- int minVersions;
+ //read at least minVersions and go till reach startTime
+ long startTime = 0;
+ long endTime = Long.MAX_VALUE;
+ int minVersions;
- int includedVersions;
+ int includedVersions;
- /**
- * Used during deserialization. Do not use otherwise.
- */
- public MinVersionsSingleColumnFilter() {
- super();
- }
+ /**
+ * Used during deserialization. Do not use otherwise.
+ */
+ public MinVersionsSingleColumnFilter() {
+ super();
+ }
- public MinVersionsSingleColumnFilter(long startTime, long endTime, int minVersions) {
- this.startTime = startTime;
- this.endTime = endTime;
- this.minVersions = minVersions;
- init();
- }
+ public MinVersionsSingleColumnFilter(long startTime, long endTime, int minVersions) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.minVersions = minVersions;
+ init();
+ }
- private void init() {
- includedVersions = 0;
- }
+ private void init() {
+ includedVersions = 0;
+ }
- @Override
- public ReturnCode filterKeyValue(KeyValue v) {
- long version = v.getTimestamp();
- if (version >= endTime)
- return ReturnCode.SKIP;
- if (includedVersions < minVersions || version > startTime) {
- includedVersions++;
- return ReturnCode.INCLUDE;
- }
- return ReturnCode.NEXT_COL;
- }
+ @Override
+ public ReturnCode filterKeyValue(KeyValue v) {
+ long version = v.getTimestamp();
+ if (version >= endTime)
+ return ReturnCode.SKIP;
+ if (includedVersions < minVersions || version > startTime) {
+ includedVersions++;
+ return ReturnCode.INCLUDE;
+ }
+ return ReturnCode.NEXT_COL;
+ }
- @Override
- public void readFields(DataInput in) throws IOException {
- this.startTime = in.readLong();
- this.endTime = in.readLong();
- this.minVersions = in.readInt();
- init();
- }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.startTime = in.readLong();
+ this.endTime = in.readLong();
+ this.minVersions = in.readInt();
+ init();
+ }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(this.startTime);
- out.writeLong(this.endTime);
- out.writeInt(this.minVersions);
- }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(this.startTime);
+ out.writeLong(this.endTime);
+ out.writeInt(this.minVersions);
+ }
}
View
2 src/main/java/com/yahoo/omid/client/ReincarnationCompleteCallback.java
@@ -17,5 +17,5 @@
package com.yahoo.omid.client;
public interface ReincarnationCompleteCallback extends Callback {
- public void complete();
+ public void complete();
}
View
54 src/main/java/com/yahoo/omid/client/SyncCommitCallback.java
@@ -20,32 +20,32 @@
import com.yahoo.omid.tso.RowKey;
public class SyncCommitCallback extends SyncCallbackBase implements CommitCallback {
- private Result result;
- private long commitTimestamp;
- private RowKey[] wwRows;
-
- public Result getResult() {
- return result;
- }
-
- public long getCommitTimestamp() {
- return commitTimestamp;
- }
-
- public boolean isElder() {
- return wwRows != null && wwRows.length != 0;
- }
-
- public RowKey[] getWWRows() {
- return wwRows;
- }
-
- synchronized
- public void complete(Result res, long commitTimestamp, RowKey[] wwRows) {
- this.result = res;
- this.commitTimestamp = commitTimestamp;
- this.wwRows = wwRows;
- countDown();
- }
+ private Result result;
+ private long commitTimestamp;
+ private RowKey[] wwRows;
+
+ public Result getResult() {
+ return result;
+ }
+
+ public long getCommitTimestamp() {
+ return commitTimestamp;
+ }
+
+ public boolean isElder() {
+ return wwRows != null && wwRows.length != 0;
+ }
+
+ public RowKey[] getWWRows() {
+ return wwRows;
+ }
+
+ synchronized
+ public void complete(Result res, long commitTimestamp, RowKey[] wwRows) {
+ this.result = res;
+ this.commitTimestamp = commitTimestamp;
+ this.wwRows = wwRows;
+ countDown();
+ }
}
View
10 src/main/java/com/yahoo/omid/client/SyncReincarnationCompleteCallback.java
@@ -17,10 +17,10 @@
package com.yahoo.omid.client;
public class SyncReincarnationCompleteCallback extends SyncCallbackBase
- implements ReincarnationCompleteCallback {
+ implements ReincarnationCompleteCallback {
- synchronized
- public void complete() {
- countDown();
- }
+ synchronized
+ public void complete() {
+ countDown();
+ }
}
View
1,274 src/main/java/com/yahoo/omid/client/TSOClient.java
@@ -72,659 +72,657 @@
import com.yahoo.omid.Statistics;
public class TSOClient extends SimpleChannelHandler {
- private static final Log LOG = LogFactory.getLog(TSOClient.class);
-
- public static long askedTSO = 0;
-
- public enum Result {
- OK, ABORTED
- };
-
- private Queue<CreateCallback> createCallbacks;
- private Map<Long, CommitCallback> commitCallbacks;
- private Map<Long, List<CommitQueryCallback>> isCommittedCallbacks;
-
- private Committed committed = new Committed();
- private Set<Long> aborted = Collections.synchronizedSet(new HashSet<Long>(1000));
- public Map<Long, Long> failedElders = Collections.synchronizedMap(new HashMap<Long, Long>(1000));
- private long largestDeletedTimestamp;
- //the txn id of the in-progress elder with lowest start timestamp
- private long eldest = 0;//by default, fetch all starting from 0
- private long connectionTimestamp = 0;
- private boolean hasConnectionTimestamp = false;
-
- private ChannelFactory factory;
- private ClientBootstrap bootstrap;
- private Channel channel;
- private InetSocketAddress addr;
- private int max_retries;
- private int retries;
- private int retry_delay_ms;
- private Timer retryTimer;
-
- private enum State {
- DISCONNECTED, CONNECTING, CONNECTED, RETRY_CONNECT_WAIT
- };
-
- private interface Op {
- public void execute(Channel channel);
-
- public void error(Exception e);
- }
-
- private class AbortOp implements Op {
- long transactionId;
-
- AbortOp(long transactionid) throws IOException {
- this.transactionId = transactionid;
- }
-
- public void execute(Channel channel) {
- try {
- synchronized (commitCallbacks) {
- if (commitCallbacks.containsKey(transactionId)) {
- throw new IOException("Already committing transaction " + transactionId);
- }
+ private static final Log LOG = LogFactory.getLog(TSOClient.class);
+
+ public static long askedTSO = 0;
+
+ public enum Result {
+ OK, ABORTED
+ };
+
+ private Queue<CreateCallback> createCallbacks;
+ private Map<Long, CommitCallback> commitCallbacks;
+ private Map<Long, List<CommitQueryCallback>> isCommittedCallbacks;
+
+ private Committed committed = new Committed();
+ private Set<Long> aborted = Collections.synchronizedSet(new HashSet<Long>(1000));
+ public Map<Long, Long> failedElders = Collections.synchronizedMap(new HashMap<Long, Long>(1000));
+ private long largestDeletedTimestamp;
+ //the txn id of the in-progress elder with lowest start timestamp
+ private long eldest = 0;//by default, fetch all starting from 0
+ private long connectionTimestamp = 0;
+ private boolean hasConnectionTimestamp = false;
+
+ private ChannelFactory factory;
+ private ClientBootstrap bootstrap;
+ private Channel channel;
+ private InetSocketAddress addr;
+ private int max_retries;
+ private int retries;
+ private int retry_delay_ms;
+ private Timer retryTimer;
+
+ private enum State {
+ DISCONNECTED, CONNECTING, CONNECTED, RETRY_CONNECT_WAIT
+ };
+
+ private interface Op {
+ public void execute(Channel channel);
+
+ public void error(Exception e);
+ }
+
+ private class AbortOp implements Op {
+ long transactionId;
+
+ AbortOp(long transactionid) throws IOException {
+ this.transactionId = transactionid;
+ }
+
+ public void execute(Channel channel) {
+ try {
+ synchronized (commitCallbacks) {
+ if (commitCallbacks.containsKey(transactionId)) {
+ throw new IOException("Already committing transaction " + transactionId);
+ }
+ }
+
+ AbortRequest ar = new AbortRequest();
+ ar.startTimestamp = transactionId;
+ ChannelFuture f = channel.write(ar);
+ f.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ error(new IOException("Error writing to socket"));
+ }
+ }
+ });
+ } catch (Exception e) {
+ error(e);
+ }
+ }
+
+ public void error(Exception e) {
+ }
+ }
+
+ private class NewTimestampOp implements Op {
+ private CreateCallback cb;
+
+ NewTimestampOp(CreateCallback cb) {
+ this.cb = cb;
+ }
+
+ public void execute(Channel channel) {
+ try {
+ synchronized(createCallbacks) {
+ createCallbacks.add(cb);
+ }
+
+ TimestampRequest tr = new TimestampRequest();
+ ChannelFuture f = channel.write(tr);
+ f.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ error(new IOException("Error writing to socket"));
+ }
+ }
+ });
+ } catch (Exception e) {
+ error(e);
}
+ }
- AbortRequest ar = new AbortRequest();
- ar.startTimestamp = transactionId;
- ChannelFuture f = channel.write(ar);
- f.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- error(new IOException("Error writing to socket"));
- }
- }
- });
- } catch (Exception e) {
- error(e);
- }
- }
-
- public void error(Exception e) {
- }
- }
-
- private class NewTimestampOp implements Op {
- private CreateCallback cb;
-
- NewTimestampOp(CreateCallback cb) {
- this.cb = cb;
- }
-
- public void execute(Channel channel) {
- try {
+ public void error(Exception e) {
synchronized(createCallbacks) {
- createCallbacks.add(cb);
+ createCallbacks.remove();
+ }
+
+ cb.error(e);
+ }
+ }
+
+ private class CommitQueryOp implements Op {
+ long startTimestamp;
+ long pendingWriteTimestamp;
+ CommitQueryCallback cb;
+
+ CommitQueryOp(long startTimestamp, long pendingWriteTimestamp, CommitQueryCallback cb) {
+ this.startTimestamp = startTimestamp;
+ this.pendingWriteTimestamp = pendingWriteTimestamp;
+ this.cb = cb;
+ }
+
+ public void execute(Channel channel) {
+ try {
+ synchronized(isCommittedCallbacks) {
+ List<CommitQueryCallback> callbacks = isCommittedCallbacks.get(startTimestamp);
+ if (callbacks == null) {
+ callbacks = new ArrayList<CommitQueryCallback>(1);
+ }
+ callbacks.add(cb);
+ isCommittedCallbacks.put(startTimestamp, callbacks);
+ }
+
+ CommitQueryRequest qr = new CommitQueryRequest(startTimestamp,
+ pendingWriteTimestamp);
+ ChannelFuture f = channel.write(qr);
+ f.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ error(new IOException("Error writing to socket"));
+ }
+ }
+ });
+ } catch (Exception e) {
+ error(e);
}
+ }
- TimestampRequest tr = new TimestampRequest();
- ChannelFuture f = channel.write(tr);
- f.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- error(new IOException("Error writing to socket"));
- }
- }
- });
- } catch (Exception e) {
- error(e);
- }
- }
-
- public void error(Exception e) {
- synchronized(createCallbacks) {
- createCallbacks.remove();
- }
-
- cb.error(e);
- }
- }
-
- private class CommitQueryOp implements Op {
- long startTimestamp;
- long pendingWriteTimestamp;
- CommitQueryCallback cb;
-
- CommitQueryOp(long startTimestamp, long pendingWriteTimestamp, CommitQueryCallback cb) {
- this.startTimestamp = startTimestamp;
- this.pendingWriteTimestamp = pendingWriteTimestamp;
- this.cb = cb;
- }
-
- public void execute(Channel channel) {
- try {
+ public void error(Exception e) {
synchronized(isCommittedCallbacks) {
- List<CommitQueryCallback> callbacks = isCommittedCallbacks.get(startTimestamp);
- if (callbacks == null) {
- callbacks = new ArrayList<CommitQueryCallback>(1);
- }
- callbacks.add(cb);
- isCommittedCallbacks.put(startTimestamp, callbacks);
+ isCommittedCallbacks.remove(startTimestamp);
+ }
+
+ cb.error(e);
+ }
+ }
+
+ private class CommitOp implements Op {
+ long transactionId;
+ RowKey[] writtenRows;
+ RowKey[] readRows;
+ CommitCallback cb;
+
+ CommitOp(long transactionid, RowKey[] writtenRows, RowKey[] readRows, CommitCallback cb) throws IOException {
+ this.transactionId = transactionid;
+ this.writtenRows = writtenRows;
+ this.readRows = readRows;
+ this.cb = cb;
+ }
+
+ public void execute(Channel channel) {
+ try {
+ synchronized(commitCallbacks) {
+ if (commitCallbacks.containsKey(transactionId)) {
+ throw new IOException("Already committing transaction " + transactionId);
+ }
+ commitCallbacks.put(transactionId, cb);
+ }
+
+ CommitRequest cr = new CommitRequest();
+ cr.startTimestamp = transactionId;
+ cr.writtenRows = writtenRows;
+ cr.readRows = readRows;
+ ChannelFuture f = channel.write(cr);
+ f.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ error(new IOException("Error writing to socket"));
+ }
+ }
+ });
+ } catch (Exception e) {
+ error(e);
}
-
- CommitQueryRequest qr = new CommitQueryRequest(startTimestamp,
- pendingWriteTimestamp);
- ChannelFuture f = channel.write(qr);
- f.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- error(new IOException("Error writing to socket"));
- }
- }
- });
- } catch (Exception e) {
- error(e);
- }
- }
-
- public void error(Exception e) {
- synchronized(isCommittedCallbacks) {
- isCommittedCallbacks.remove(startTimestamp);
- }
-
- cb.error(e);
- }
- }
-
- private class CommitOp implements Op {
- long transactionId;
- RowKey[] writtenRows;
- RowKey[] readRows;
- CommitCallback cb;
-
- CommitOp(long transactionid, RowKey[] writtenRows, RowKey[] readRows, CommitCallback cb) throws IOException {
- this.transactionId = transactionid;
- this.writtenRows = writtenRows;
- this.readRows = readRows;
- this.cb = cb;
- }
-
- public void execute(Channel channel) {
- try {
+ }
+
+ public void error(Exception e) {
synchronized(commitCallbacks) {
- if (commitCallbacks.containsKey(transactionId)) {
- throw new IOException("Already committing transaction " + transactionId);
- }
- commitCallbacks.put(transactionId, cb);
+ commitCallbacks.remove(transactionId);
}
-
- CommitRequest cr = new CommitRequest();
- cr.startTimestamp = transactionId;
- cr.writtenRows = writtenRows;
- cr.readRows = readRows;
- ChannelFuture f = channel.write(cr);
- f.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- error(new IOException("Error writing to socket"));
- }
- }
- });
- } catch (Exception e) {
- error(e);
- }
- }
-
- public void error(Exception e) {
- synchronized(commitCallbacks) {
- commitCallbacks.remove(transactionId);
- }
- cb.error(e);
- }
- }
-
- private class AbortCompleteOp implements Op {
- long transactionId;
- AbortCompleteCallback cb;
-
- AbortCompleteOp(long transactionId, AbortCompleteCallback cb) throws IOException {
- this.transactionId = transactionId;
- this.cb = cb;
- }
-
- public void execute(Channel channel) {
- try {
- FullAbortReport far = new FullAbortReport();
- far.startTimestamp = transactionId;
-
- ChannelFuture f = channel.write(far);
- f.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- error(new IOException("Error writing to socket"));
- } else {
- cb.complete();
- }
- }
- });
- } catch (Exception e) {
- error(e);
- }
-
- }
-
- public void error(Exception e) {
- cb.error(e);
- }
- }
-
- private class ReincarnationCompleteOp implements Op {
- long transactionId;
- ReincarnationCompleteCallback cb;
-
- ReincarnationCompleteOp(long transactionId, ReincarnationCompleteCallback cb) throws IOException {
- this.transactionId = transactionId;
- this.cb = cb;
- }
-
- public void execute(Channel channel) {
- try {
- ReincarnationReport rr = new ReincarnationReport();
- rr.startTimestamp = transactionId;
-
- ChannelFuture f = channel.write(rr);
- f.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- error(new IOException("Error writing to socket"));
- } else {
- cb.complete();
- }
- }
- });
- } catch (Exception e) {
- error(e);
- }
-
- }
-
- public void error(Exception e) {
- cb.error(e);
- }
- }
-
- private ArrayBlockingQueue<Op> queuedOps;
-
- private State state;
-
- public final long getEldest() {
- return eldest;
- }
- public TSOClient(Configuration conf) throws IOException {
- state = State.DISCONNECTED;
- queuedOps = new ArrayBlockingQueue<Op>(200);
- retryTimer = new Timer(true);
-
- commitCallbacks = Collections.synchronizedMap(new HashMap<Long, CommitCallback>());
- isCommittedCallbacks = Collections.synchronizedMap(new HashMap<Long, List<CommitQueryCallback>>());
- createCallbacks = new ConcurrentLinkedQueue<CreateCallback>();
- channel = null;
-
- System.out.println("Starting TSOClient");
-
- // Start client with Nb of active threads = 3 as maximum.
- factory = new NioClientSocketChannelFactory(Executors
- .newCachedThreadPool(), Executors.newCachedThreadPool(), 3);
- // Create the bootstrap
- bootstrap = new ClientBootstrap(factory);
-
- int executorThreads = conf.getInt("tso.executor.threads", 3);
-
- bootstrap.getPipeline().addLast("executor", new ExecutionHandler(
- new OrderedMemoryAwareThreadPoolExecutor(executorThreads, 1024*1024, 4*1024*1024)));
- bootstrap.getPipeline().addLast("handler", this);
- bootstrap.setOption("tcpNoDelay", false);
- bootstrap.setOption("keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("connectTimeoutMillis", 100);
-
- String host = conf.get("tso.host");
- int port = conf.getInt("tso.port", 1234);
- max_retries = conf.getInt("tso.max_retries", 10);
- retry_delay_ms = conf.getInt("tso.retry_delay_ms", 3000);
-
- if (host == null) {
- throw new IOException("tso.host missing from configuration");
- }
-
- addr = new InetSocketAddress(host, port);
- connectIfNeeded();
- }
-
- private State connectIfNeeded() throws IOException {
- synchronized (state) {
- if (state == State.CONNECTED || state == State.CONNECTING) {
- return state;
- }
- if (state == State.RETRY_CONNECT_WAIT) {
- return State.CONNECTING;
- }
-
- if (retries > max_retries) {
- IOException e = new IOException("Max connection retries exceeded");
- bailout(e);
- throw e;
- }
- retries++;
- bootstrap.connect(addr);
- state = State.CONNECTING;
- return state;
- }
- }
-
- private void withConnection(Op op) throws IOException {
- State state = connectIfNeeded();
-
- if (state == State.CONNECTING) {
- try {
- queuedOps.put(op);
- } catch (InterruptedException e) {
- throw new IOException("Couldn't add new operation", e);
- }
- } else if (state == State.CONNECTED) {
- op.execute(channel);
- } else {
- throw new IOException("Invalid connection state " + state);
- }
- }
-
- public void getNewTimestamp(CreateCallback cb) throws IOException {
- withConnection(new NewTimestampOp(cb));
- }
-
- public void isCommitted(long startTimestamp, long pendingWriteTimestamp, CommitQueryCallback cb)
- throws IOException {
- withConnection(new CommitQueryOp(startTimestamp, pendingWriteTimestamp, cb));
- }
-
- public void abort(long transactionId) throws IOException {
- withConnection(new AbortOp(transactionId));
- }
-
- private static RowKey[] EMPTY_ROWS = new RowKey[0];
- public void commit(long transactionId, RowKey[] writtenRows, RowKey[] readRows, CommitCallback cb) throws IOException {
- if (writtenRows.length == 0) {
- readRows = EMPTY_ROWS;
- }
- withConnection(new CommitOp(transactionId, writtenRows, readRows, cb));
- }
-
- public void completeAbort(long transactionId, AbortCompleteCallback cb) throws IOException {
- withConnection(new AbortCompleteOp(transactionId, cb));
- }
-
- //call this function after the reincarnation is complete
- //it sends a report to the tso
- public void completeReincarnation(long transactionId, ReincarnationCompleteCallback cb) throws IOException {
- withConnection(new ReincarnationCompleteOp(transactionId, cb));
- }
-
- @Override
- synchronized
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
- e.getChannel().getPipeline().addFirst("decoder", new TSODecoder());
- e.getChannel().getPipeline().addAfter("decoder", "encoder",
- new TSOEncoder());
- }
-
- /**
- * Starts the traffic
- */
- @Override
- synchronized
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- synchronized (state) {
- channel = e.getChannel();
- state = State.CONNECTED;
- retries = 0;
- }
- clearState();
- Op o = queuedOps.poll();;
- while (o != null && state == State.CONNECTED) {
- o.execute(channel);
- o = queuedOps.poll();
- }
- }
-
- private void clearState() {
- committed = new Committed();
- aborted.clear();
- largestDeletedTimestamp = 0;
- connectionTimestamp = 0;
- hasConnectionTimestamp = false;
- }
-
- @Override
- synchronized
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- synchronized(state) {
- channel = null;
- state = State.DISCONNECTED;
- }
- }
-
- //Added by Maysam Yabandeh
- //In the new implementation, I need direct access to commit timestamp and the logic for deciding the committed version is more complex. Therefero, this function replaces validRead.
- public long commitTimestamp(long transaction, long startTimestamp) throws IOException {
- if (aborted.contains(transaction))
- return -2;//invalid read
- long commitTimestamp = committed.getCommit(transaction);
- if (commitTimestamp != -1 && commitTimestamp > startTimestamp)
- return -2;//invalid read
- if (commitTimestamp != -1 && commitTimestamp <= startTimestamp)
- return commitTimestamp;
-
- if (hasConnectionTimestamp && transaction > connectionTimestamp)
- return transaction <= largestDeletedTimestamp ? -1 : -2;
- //TODO: it works only if it runs one transaction at a time
- if (transaction <= largestDeletedTimestamp)
- return -1;//committed but the tc is lost
-
- Statistics.partialReport(Statistics.Tag.ASKTSO, 1);
- askedTSO++;
- SyncCommitQueryCallback cb = new SyncCommitQueryCallback();
- isCommitted(startTimestamp, transaction, cb);
- try {
- cb.await();
- } catch (InterruptedException e) {
- throw new IOException("Commit query didn't complete", e);
- }
- if (!cb.isAClearAnswer())
- //TODO: throw a proper exception
- throw new IOException("Either abort or retry the transaction");
- return cb.isCommitted() ? cb.commitTimestamp() : -2;
- }
-
- public boolean validRead(long transaction, long startTimestamp) throws IOException {
- if (transaction == startTimestamp)
- return true;
- if (aborted.contains(transaction))
- return false;
- long commitTimestamp = committed.getCommit(transaction);
- if (commitTimestamp != -1)
- return commitTimestamp <= startTimestamp;
- if (hasConnectionTimestamp && transaction > connectionTimestamp)
- return transaction <= largestDeletedTimestamp;
- if (transaction <= largestDeletedTimestamp)
- return true;
-// System.out.format("Asking TSO... hasConnectionTimestamp: %s connectionTimestamp: %d transaction: %d startTimestamp: %d\n",
-// Boolean.valueOf(hasConnectionTimestamp).toString(), connectionTimestamp, transaction, startTimestamp);
- askedTSO++;
- SyncCommitQueryCallback cb = new SyncCommitQueryCallback();
- isCommitted(startTimestamp, transaction, cb);
- try {
- cb.await();
- } catch (InterruptedException e) {
- throw new IOException("Commit query didn't complete", e);
- }
- if (!cb.isAClearAnswer())
- //TODO: throw a proper exception
- throw new IOException("Either abort or retry the transaction");
- return cb.isCommitted();
- }
-
- /**
- * When a message is received, handle it based on its type
- */
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("messageReceived " + e.getMessage());
- }
- Object msg = e.getMessage();
- if (msg instanceof CommitResponse) {
- CommitResponse r = (CommitResponse)msg;
- CommitCallback cb = null;
- synchronized (commitCallbacks) {
- cb = commitCallbacks.remove(r.startTimestamp);
- }
- if (cb == null) {
- LOG.error("Received a commit response for a nonexisting commit");
- return;
- }
- cb.complete(r.committed ? Result.OK : Result.ABORTED, r.commitTimestamp, r.wwRows);
- } else if (msg instanceof TimestampResponse) {
- CreateCallback cb = createCallbacks.poll();
- long timestamp = ((TimestampResponse)msg).timestamp;
- if (!hasConnectionTimestamp || timestamp < connectionTimestamp) {
- hasConnectionTimestamp = true;
- connectionTimestamp = timestamp;
- }
- if (cb == null) {
- LOG.error("Receiving a timestamp response, but none requested: " + timestamp);
- return;
- }
- cb.complete(timestamp);
- } else if (msg instanceof CommitQueryResponse) {
- CommitQueryResponse r = (CommitQueryResponse)msg;
- if (r.commitTimestamp != 0) {
- committed.commit(r.queryTimestamp, r.commitTimestamp);
- } else if (r.committed) {
- committed.commit(r.queryTimestamp, largestDeletedTimestamp);
- }
- List<CommitQueryCallback> cbs = null;
- synchronized (isCommittedCallbacks) {
- cbs = isCommittedCallbacks.remove(r.startTimestamp);
- }
- if (cbs == null) {
- LOG.error("Received a commit query response for a nonexisting request");
- return;
- }
- for (CommitQueryCallback cb : cbs) {
- cb.complete(r.committed, r.commitTimestamp, r.retry);
- }
- } else if (msg instanceof CommittedTransactionReport) {
- CommittedTransactionReport ctr = (CommittedTransactionReport) msg;
- committed.commit(ctr.startTimestamp, ctr.commitTimestamp);
- //Added by Maysam Yabandeh
- //Always add (Tc, Tc) as well since some transactions might be elders and reinsert their written data
- committed.commit(ctr.commitTimestamp, ctr.commitTimestamp);
- } else if (msg instanceof FullAbortReport) {
- FullAbortReport r = (FullAbortReport) msg;
- aborted.remove(r.startTimestamp);
- } else if (msg instanceof FailedElderReport) {
- FailedElderReport r = (FailedElderReport) msg;
- failedElders.put(r.startTimestamp, r.commitTimestamp);
- LOG.warn("Client: " + r);
- } else if (msg instanceof EldestUpdate) {
- EldestUpdate r = (EldestUpdate) msg;
- eldest = r.startTimestamp;
- //LOG.warn("Client: " + r);
- } else if (msg instanceof ReincarnationReport) {
- ReincarnationReport r = (ReincarnationReport) msg;
- Long Tc = failedElders.remove(r.startTimestamp);
- boolean res = Tc != null;
- LOG.warn("Client: " + res + " " + r);
- } else if (msg instanceof AbortedTransactionReport) {
- AbortedTransactionReport r = (AbortedTransactionReport) msg;
- aborted.add(r.startTimestamp);
- } else if (msg instanceof LargestDeletedTimestampReport) {
- LargestDeletedTimestampReport r = (LargestDeletedTimestampReport) msg;
- largestDeletedTimestamp = r.largestDeletedTimestamp;
- committed.raiseLargestDeletedTransaction(r.largestDeletedTimestamp);
- } else {
- LOG.error("Unknown message received " + msg);
- }
- processMessage((TSOMessage) msg);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx,
- ExceptionEvent e)
- throws Exception {
- System.out.println("Unexpected exception " + e.getCause());
- e.getCause().printStackTrace();
-
- synchronized(state) {
-
- if (state == State.CONNECTING) {
- state = State.RETRY_CONNECT_WAIT;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Retrying connect in " + retry_delay_ms + "ms " + retries);
- }
+ cb.error(e);
+ }
+ }
+
+ private class AbortCompleteOp implements Op {
+ long transactionId;
+ AbortCompleteCallback cb;
+
+ AbortCompleteOp(long transactionId, AbortCompleteCallback cb) throws IOException {
+ this.transactionId = transactionId;
+ this.cb = cb;
+ }
+
+ public void execute(Channel channel) {
try {
- retryTimer.schedule(new TimerTask() {
- public void run() {
- synchronized (state) {
- state = State.DISCONNECTED;
- try {
- connectIfNeeded();
- } catch (IOException e) {
- bailout(e);
- }
+ FullAbortReport far = new FullAbortReport();
+ far.startTimestamp = transactionId;
+
+ ChannelFuture f = channel.write(far);
+ f.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ error(new IOException("Error writing to socket"));
+ } else {
+ cb.complete();
}
- }
- }, retry_delay_ms);
- } catch (Exception cause) {
- bailout(cause);
+ }
+ });
+ } catch (Exception e) {
+ error(e);
}
- } else {
- LOG.error("Exception on channel", e.getCause());
- }
- }
- }
-
- public void bailout(Exception cause) {
- synchronized (state) {
- state = State.DISCONNECTED;
- }
- LOG.error("Unrecoverable error in client, bailing out", cause);
- Exception e = new IOException("Unrecoverable error", cause);
- Op o = queuedOps.poll();;
- while (o != null) {
- o.error(e);
- o = queuedOps.poll();
- }
- synchronized (createCallbacks) {
- for (CreateCallback cb : createCallbacks) {
+
+ }
+
+ public void error(Exception e) {
cb.error(e);
- }
- createCallbacks.clear();
- }
+ }
+ }
+
+ private class ReincarnationCompleteOp implements Op {
+ long transactionId;
+ ReincarnationCompleteCallback cb;
+
+ ReincarnationCompleteOp(long transactionId, ReincarnationCompleteCallback cb) throws IOException {
+ this.transactionId = transactionId;
+ this.cb = cb;
+ }
+
+ public void execute(Channel channel) {
+ try {
+ ReincarnationReport rr = new ReincarnationReport();
+ rr.startTimestamp = transactionId;
+
+ ChannelFuture f = channel.write(rr);
+ f.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ error(new IOException("Error writing to socket"));
+ } else {
+ cb.complete();
+ }
+ }
+ });
+ } catch (Exception e) {
+ error(e);
+ }
+
+ }
- synchronized(commitCallbacks) {
- for (CommitCallback cb : commitCallbacks.values()) {
+ public void error(Exception e) {
cb.error(e);
- }
- commitCallbacks.clear();
- }
-
- synchronized(isCommittedCallbacks) {
- for (List<CommitQueryCallback> cbs : isCommittedCallbacks.values()) {
- for (CommitQueryCallback cb : cbs) {
- cb.error(e);
+ }
+ }
+
+ private ArrayBlockingQueue<Op> queuedOps;
+
+ private State state;
+
+ public final long getEldest() {
+ return eldest;
+ }
+ public TSOClient(Configuration conf) throws IOException {
+ state = State.DISCONNECTED;
+ queuedOps = new ArrayBlockingQueue<Op>(200);
+ retryTimer = new Timer(true);
+
+ commitCallbacks = Collections.synchronizedMap(new HashMap<Long, CommitCallback>());
+ isCommittedCallbacks = Collections.synchronizedMap(new HashMap<Long, List<CommitQueryCallback>>());
+ createCallbacks = new ConcurrentLinkedQueue<CreateCallback>();
+ channel = null;
+
+ System.out.println("Starting TSOClient");
+
+ // Start client with Nb of active threads = 3 as maximum.
+ factory = new NioClientSocketChannelFactory(Executors
+ .newCachedThreadPool(), Executors.newCachedThreadPool(), 3);
+ // Create the bootstrap
+ bootstrap = new ClientBootstrap(factory);
+
+ int executorThreads = conf.getInt("tso.executor.threads", 3);
+
+ bootstrap.getPipeline().addLast("executor", new ExecutionHandler(
+ new OrderedMemoryAwareThreadPoolExecutor(executorThreads, 1024*1024, 4*1024*1024)));
+ bootstrap.getPipeline().addLast("handler", this);
+ bootstrap.setOption("tcpNoDelay", false);
+ bootstrap.setOption("keepAlive", true);
+ bootstrap.setOption("reuseAddress", true);
+ bootstrap.setOption("connectTimeoutMillis", 100);
+
+ String host = conf.get("tso.host");
+ int port = conf.getInt("tso.port", 1234);
+ max_retries = conf.getInt("tso.max_retries", 10);
+ retry_delay_ms = conf.getInt("tso.retry_delay_ms", 3000);
+
+ if (host == null) {
+ throw new IOException("tso.host missing from configuration");
+ }
+
+ addr = new InetSocketAddress(host, port);
+ connectIfNeeded();
+ }
+
+ private State connectIfNeeded() throws IOException {
+ synchronized (state) {
+ if (state == State.CONNECTED || state == State.CONNECTING) {
+ return state;
+ }
+ if (state == State.RETRY_CONNECT_WAIT) {
+ return State.CONNECTING;
+ }
+
+ if (retries > max_retries) {
+ IOException e = new IOException("Max connection retries exceeded");
+ bailout(e);
+ throw e;
+ }
+ retries++;
+ bootstrap.connect(addr);
+ state = State.CONNECTING;
+ return state;
+ }
+ }
+
+ private void withConnection(Op op) throws IOException {
+ State state = connectIfNeeded();
+
+ if (state == State.CONNECTING) {
+ try {
+ queuedOps.put(op);
+ } catch (InterruptedException e) {
+ throw new IOException("Couldn't add new operation", e);
+ }
+ } else if (state == State.CONNECTED) {
+ op.execute(channel);
+ } else {
+ throw new IOException("Invalid connection state " + state);
+ }
+ }
+
+ public void getNewTimestamp(CreateCallback cb) throws IOException {
+ withConnection(new NewTimestampOp(cb));
+ }
+
+ public void isCommitted(long startTimestamp, long pendingWriteTimestamp, CommitQueryCallback cb)
+ throws IOException {
+ withConnection(new CommitQueryOp(startTimestamp, pendingWriteTimestamp, cb));
+ }
+
+ public void abort(long transactionId) throws IOException {
+ withConnection(new AbortOp(transactionId));
+ }
+
+ private static RowKey[] EMPTY_ROWS = new RowKey[0];
+ public void commit(long transactionId, RowKey[] writtenRows, RowKey[] readRows, CommitCallback cb) throws IOException {
+ if (writtenRows.length == 0) {
+ readRows = EMPTY_ROWS;
+ }
+ withConnection(new CommitOp(transactionId, writtenRows, readRows, cb));
+ }
+
+ public void completeAbort(long transactionId, AbortCompleteCallback cb) throws IOException {
+ withConnection(new AbortCompleteOp(transactionId, cb));
+ }
+
+ //call this function after the reincarnation is complete
+ //it sends a report to the tso
+ public void completeReincarnation(long transactionId, ReincarnationCompleteCallback cb) throws IOException {
+ withConnection(new ReincarnationCompleteOp(transactionId, cb));
+ }
+
+ @Override
+ synchronized
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ e.getChannel().getPipeline().addFirst("decoder", new TSODecoder());
+ e.getChannel().getPipeline().addAfter("decoder", "encoder",
+ new TSOEncoder());
+ }
+
+ /**
+ * Starts the traffic
+ */
+ @Override
+ synchronized
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ synchronized (state) {
+ channel = e.getChannel();
+ state = State.CONNECTED;
+ retries = 0;
+ }
+ clearState();
+ Op o = queuedOps.poll();;
+ while (o != null && state == State.CONNECTED) {
+ o.execute(channel);
+ o = queuedOps.poll();
+ }
+ }
+
+ private void clearState() {
+ committed = new Committed();
+ aborted.clear();
+ largestDeletedTimestamp = 0;
+ connectionTimestamp = 0;
+ hasConnectionTimestamp = false;
+ }
+
+ @Override
+ synchronized
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
+ throws Exception {
+ synchronized(state) {
+ channel = null;
+ state = State.DISCONNECTED;
+ }
+ }
+
+ //In the new implementation, I need direct access to commit timestamp and the logic for deciding the committed version is more complex. Therefero, this function replaces validRead.
+ public long commitTimestamp(long transaction, long startTimestamp) throws IOException {
+ if (aborted.contains(transaction))
+ return -2;//invalid read
+ long commitTimestamp = committed.getCommit(transaction);
+ if (commitTimestamp != -1 && commitTimestamp > startTimestamp)
+ return -2;//invalid read
+ if (commitTimestamp != -1 && commitTimestamp <= startTimestamp)
+ return commitTimestamp;
+
+ if (hasConnectionTimestamp && transaction > connectionTimestamp)
+ return transaction <= largestDeletedTimestamp ? -1 : -2;
+ //TODO: it works only if it runs one transaction at a time
+ if (transaction <= largestDeletedTimestamp)
+ return -1;//committed but the tc is lost
+
+ Statistics.partialReport(Statistics.Tag.ASKTSO, 1);
+ askedTSO++;
+ SyncCommitQueryCallback cb = new SyncCommitQueryCallback();
+ isCommitted(startTimestamp, transaction, cb);
+ try {
+ cb.await();
+ } catch (InterruptedException e) {
+ throw new IOException("Commit query didn't complete", e);
+ }
+ if (!cb.isAClearAnswer())
+ //TODO: throw a proper exception
+ throw new IOException("Either abort or retry the transaction");
+ return cb.isCommitted() ? cb.commitTimestamp() : -2;
+ }
+
+ public boolean validRead(long transaction, long startTimestamp) throws IOException {
+ if (transaction == startTimestamp)
+ return true;
+ if (aborted.contains(transaction))
+ return false;
+ long commitTimestamp = committed.getCommit(transaction);
+ if (commitTimestamp != -1)
+ return commitTimestamp <= startTimestamp;
+ if (hasConnectionTimestamp && transaction > connectionTimestamp)
+ return transaction <= largestDeletedTimestamp;
+ if (transaction <= largestDeletedTimestamp)
+ return true;
+ // System.out.format("Asking TSO... hasConnectionTimestamp: %s connectionTimestamp: %d transaction: %d startTimestamp: %d\n",
+ // Boolean.valueOf(hasConnectionTimestamp).toString(), connectionTimestamp, transaction, startTimestamp);
+ askedTSO++;
+ SyncCommitQueryCallback cb = new SyncCommitQueryCallback();
+ isCommitted(startTimestamp, transaction, cb);
+ try {
+ cb.await();
+ } catch (InterruptedException e) {
+ throw new IOException("Commit query didn't complete", e);
+ }
+ if (!cb.isAClearAnswer())
+ //TODO: throw a proper exception
+ throw new IOException("Either abort or retry the transaction");
+ return cb.isCommitted();
+ }
+
+ /**
+ * When a message is received, handle it based on its type
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("messageReceived " + e.getMessage());
+ }
+ Object msg = e.getMessage();
+ if (msg instanceof CommitResponse) {
+ CommitResponse r = (CommitResponse)msg;
+ CommitCallback cb = null;
+ synchronized (commitCallbacks) {
+ cb = commitCallbacks.remove(r.startTimestamp);
+ }
+ if (cb == null) {
+ LOG.error("Received a commit response for a nonexisting commit");
+ return;
+ }
+ cb.complete(r.committed ? Result.OK : Result.ABORTED, r.commitTimestamp, r.wwRows);
+ } else if (msg instanceof TimestampResponse) {
+ CreateCallback cb = createCallbacks.poll();
+ long timestamp = ((TimestampResponse)msg).timestamp;
+ if (!hasConnectionTimestamp || timestamp < connectionTimestamp) {
+ hasConnectionTimestamp = true;
+ connectionTimestamp = timestamp;
+ }
+ if (cb == null) {
+ LOG.error("Receiving a timestamp response, but none requested: " + timestamp);
+ return;
+ }
+ cb.complete(timestamp);
+ } else if (msg instanceof CommitQueryResponse) {
+ CommitQueryResponse r = (CommitQueryResponse)msg;
+ if (r.commitTimestamp != 0) {
+ committed.commit(r.queryTimestamp, r.commitTimestamp);
+ } else if (r.committed) {
+ committed.commit(r.queryTimestamp, largestDeletedTimestamp);
+ }
+ List<CommitQueryCallback> cbs = null;
+ synchronized (isCommittedCallbacks) {
+ cbs = isCommittedCallbacks.remove(r.startTimestamp);
+ }
+ if (cbs == null) {
+ LOG.error("Received a commit query response for a nonexisting request");
+ return;
+ }
+ for (CommitQueryCallback cb : cbs) {
+ cb.complete(r.committed, r.commitTimestamp, r.retry);
+ }
+ } else if (msg instanceof CommittedTransactionReport) {
+ CommittedTransactionReport ctr = (CommittedTransactionReport) msg;
+ committed.commit(ctr.startTimestamp, ctr.commitTimestamp);
+ //Always add (Tc, Tc) as well since some transactions might be elders and reinsert their written data
+ committed.commit(ctr.commitTimestamp, ctr.commitTimestamp);
+ } else if (msg instanceof FullAbortReport) {
+ FullAbortReport r = (FullAbortReport) msg;
+ aborted.remove(r.startTimestamp);
+ } else if (msg instanceof FailedElderReport) {
+ FailedElderReport r = (FailedElderReport) msg;
+ failedElders.put(r.startTimestamp, r.commitTimestamp);
+ LOG.warn("Client: " + r);
+ } else if (msg instanceof EldestUpdate) {
+ EldestUpdate r = (EldestUpdate) msg;
+ eldest = r.startTimestamp;
+ //LOG.warn("Client: " + r);
+ } else if (msg instanceof ReincarnationReport) {
+ ReincarnationReport r = (ReincarnationReport) msg;
+ Long Tc = failedElders.remove(r.startTimestamp);
+ boolean res = Tc != null;
+ LOG.warn("Client: " + res + " " + r);
+ } else if (msg instanceof AbortedTransactionReport) {
+ AbortedTransactionReport r = (AbortedTransactionReport) msg;
+ aborted.add(r.startTimestamp);
+ } else if (msg instanceof LargestDeletedTimestampReport) {
+ LargestDeletedTimestampReport r = (LargestDeletedTimestampReport) msg;
+ largestDeletedTimestamp = r.largestDeletedTimestamp;
+ committed.raiseLargestDeletedTransaction(r.largestDeletedTimestamp);
+ } else {
+ LOG.error("Unknown message received " + msg);
+ }
+ processMessage((TSOMessage) msg);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx,
+ ExceptionEvent e)
+ throws Exception {
+ System.out.println("Unexpected exception " + e.getCause());
+ e.getCause().printStackTrace();
+
+ synchronized(state) {
+
+ if (state == State.CONNECTING) {
+ state = State.RETRY_CONNECT_WAIT;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Retrying connect in " + retry_delay_ms + "ms " + retries);
+ }
+ try {
+ retryTimer.schedule(new TimerTask() {
+ public void run() {
+ synchronized (state) {
+ state = State.DISCONNECTED;
+ try {
+ connectIfNeeded();
+ } catch (IOException e) {
+ bailout(e);
+ }
+ }
+ }
+ }, retry_delay_ms);
+ } catch (Exception cause) {
+ bailout(cause);
+ }
+ } else {
+ LOG.error("Exception on channel", e.getCause());
+ }
+ }
+ }
+
+ public void bailout(Exception cause) {
+ synchronized (state) {
+ state = State.DISCONNECTED;
+ }
+ LOG.error("Unrecoverable error in client, bailing out", cause);
+ Exception e = new IOException("Unrecoverable error", cause);
+ Op o = queuedOps.poll();;
+ while (o != null) {
+ o.error(e);
+ o = queuedOps.poll();
+ }
+ synchronized (createCallbacks) {
+ for (CreateCallback cb : createCallbacks) {
+ cb.error(e);
}
- }
- isCommittedCallbacks.clear();
- }
- }
-
- protected void processMessage(TSOMessage msg) {
- // TODO Auto-generated method stub
-
- }
+ createCallbacks.clear();
+ }
+
+ synchronized(commitCallbacks) {
+ for (CommitCallback cb : commitCallbacks.values()) {
+ cb.error(e);
+ }
+ commitCallbacks.clear();
+ }
+
+ synchronized(isCommittedCallbacks) {
+ for (List<CommitQueryCallback> cbs : isCommittedCallbacks.values()) {
+ for (CommitQueryCallback cb : cbs) {
+ cb.error(e);
+ }
+ }
+ isCommittedCallbacks.clear();
+ }
+ }
+
+ protected void processMessage(TSOMessage msg) {
+ // TODO Auto-generated method stub
+
+ }
}
View
386 src/main/java/com/yahoo/omid/client/TransactionManager.java
@@ -41,200 +41,200 @@
*
*/
public class TransactionManager {
- private static final Log LOG = LogFactory.getLog(TSOClient.class);
-
- static TSOClient tsoclient = null;
- private static Object lock = new Object();
- private Configuration conf;
- private HashMap<byte[], HTable> tableCache;
-
- public TransactionManager(Configuration conf) throws TransactionException, IOException {
- this.conf = conf;
- synchronized (lock) {
- if (tsoclient == null) {
- tsoclient = new TSOClient(conf);
- }
- }
- tableCache = new HashMap<byte[], HTable>();
- }
-
- //a temorary solution to allow only one transaction at a time
- boolean aTransactionIsInProgress = false;
-
- /**
- * Starts a new transaction.
- *
- * This method returns an opaque {@link TransactionState} object, used by {@link TransactionalTable}'s methods
- * for performing operations on a given transaction.
- *
- * @return Opaque object which identifies one transaction.
- * @throws TransactionException
- */
- public TransactionState beginTransaction() throws TransactionException {
- //TODO: it does not pass the unit tests. To a complete fix
- //assert(aTransactionIsInProgress == false);
- SyncCreateCallback cb = new SyncCreateCallback();
- try {
- tsoclient.getNewTimestamp(cb);
- cb.await();
- } catch (Exception e) {
- throw new TransactionException("Could not get new timestamp", e);
- }
- if (cb.getException() != null) {
- throw new TransactionException("Error retrieving timestamp", cb.getException());
- }
-
- aTransactionIsInProgress=true;
- return new TransactionState(cb.getStartTimestamp(), tsoclient);
- }
-
- /**
- * Commits a transaction. If the transaction is aborted it automatically rollbacks the changes and
- * throws a {@link CommitUnsuccessfulException}.
- *
- * @param transactionState Object identifying the transaction to be committed.
- * @throws CommitUnsuccessfulException
- * @throws TransactionException
- */
- public void tryCommit(TransactionState transactionState)
- throws CommitUnsuccessfulException, TransactionException {
- aTransactionIsInProgress=false;
- Statistics.fullReport(Statistics.Tag.COMMIT, 1);
- if (LOG.isTraceEnabled()) {
- LOG.trace("tryCommit " + transactionState.getStartTimestamp());
- }
- SyncCommitCallback cb = new SyncCommitCallback();
- try {
- tsoclient.commit(transactionState.getStartTimestamp(),
- transactionState.getWrittenRows(),
- transactionState.getReadRows(),cb);
- cb.await();
- } catch (Exception e) {
- throw new TransactionException("Could not commit", e);
- }
- if (cb.getException() != null) {
- throw new TransactionException("Error committing", cb.getException());
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("doneCommit " + transactionState.getStartTimestamp() +
- " TS_c: " + cb.getCommitTimestamp() +
- " Success: " + (cb.getResult() == TSOClient.Result.OK));
- }
-
- if (cb.getResult() == TSOClient.Result.ABORTED) {
- cleanup(transactionState);
- throw new CommitUnsuccessfulException();
- }
- transactionState.setCommitTimestamp(cb.getCommitTimestamp());
- if (cb.isElder()) {
- reincarnate(transactionState, cb.getWWRows());
- try {
- transactionState.tsoclient.completeReincarnation(transactionState.getStartTimestamp(), new SyncReincarnationCompleteCallback());
- } catch (IOException e) {
- LOG.error("Couldn't send reincarnation report", e);
- }
- }
- Statistics.println();
- }
-
- /**
- * Aborts a transaction and automatically rollbacks the changes.
- *
- * @param transactionState Object identifying the transaction to be committed.
- * @throws TransactionException
- */
- public void abort(TransactionState transactionState) throws TransactionException {
- aTransactionIsInProgress=false;
- if (LOG.isTraceEnabled()) {
- LOG.trace("abort " + transactionState.getStartTimestamp());
- }
- try {
- tsoclient.abort(transactionState.getStartTimestamp());
- } catch (Exception e) {
- throw new TransactionException("Could not abort", e);
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("doneAbort " + transactionState.getStartTimestamp());
- }
-
- // Make sure its commit timestamp is 0, so the cleanup does the right job
- transactionState.setCommitTimestamp(0);
- cleanup(transactionState);
- }
-
- private void reincarnate(final TransactionState transactionState, RowKey[] wwRows)
- throws TransactionException {
- Statistics.fullReport(Statistics.Tag.REINCARNATION, 1);
- //System.out.println("I am reincarnating haha");
- Map<byte[], List<Put>> putBatches = new HashMap<byte[], List<Put>>();
- for (final RowKeyFamily rowkey : transactionState.getWrittenRows()) {
- //TODO: do it only for wwRows
- List<Put> batch = putBatches.get(rowkey.getTable());
- if (batch == null) {
- batch = new ArrayList<Put>();
- putBatches.put(rowkey.getTable(), batch);
- }
- Put put = new Put(rowkey.getRow(), transactionState.getCommitTimestamp());
- for (Entry<byte[], List<KeyValue>> entry : rowkey.getFamilies().entrySet())
- for (KeyValue kv : entry.getValue())
- try {
- put.add(new KeyValue(kv.getRow(), kv.getFamily(), kv.getQualifier(), transactionState.getCommitTimestamp(), kv.getValue()));
- } catch (IOException ioe) {
- throw new TransactionException("Could not add put operation in reincarnation " + entry.getKey(), ioe);
- }
- batch.add(put);
- }
- for (final Entry<byte[], List<Put>> entry : putBatches.entrySet()) {
- try {
- HTable table = tableCache.get(entry.getKey());
- if (table == null) {
- table = new HTable(conf, entry.getKey());
- tableCache.put(entry.getKey(), table);
+ private static final Log LOG = LogFactory.getLog(TSOClient.class);
+
+ static TSOClient tsoclient = null;
+ private static Object lock = new Object();
+ private Configuration conf;
+ private HashMap<byte[], HTable> tableCache;
+
+ public TransactionManager(Configuration conf) throws TransactionException, IOException {
+ this.conf = conf;
+ synchronized (lock) {
+ if (tsoclient == null) {
+ tsoclient = new TSOClient(conf);
}
- table.put(entry.getValue());
- } catch (IOException ioe) {
- throw new TransactionException("Could not reincarnate for table " + entry.getKey(), ioe);
- }
- }
- }
-
- private void cleanup(final TransactionState transactionState)
- throws TransactionException {
- Map<byte[], List<Delete>> deleteBatches = new HashMap<byte[], List<Delete>>();
- for (final RowKeyFamily rowkey : transactionState.getWrittenRows()) {
- List<Delete> batch = deleteBatches.get(rowkey.getTable());
- if (batch == null) {
- batch = new ArrayList<Delete>();
- deleteBatches.put(rowkey.getTable(), batch);
- }
- Delete delete = new Delete(rowkey.getRow());
- for (Entry<byte[], List<KeyValue>> entry : rowkey.getFamilies().entrySet()) {
- for (KeyValue kv : entry.getValue()) {
- delete.deleteColumn(entry.getKey(), kv.getQualifier(), transactionState.getStartTimestamp());
+ }
+ tableCache = new HashMap<byte[], HTable>();
+ }
+
+ //a temorary solution to allow only one transaction at a time
+ boolean aTransactionIsInProgress = false;
+
+ /**
+ * Starts a new transaction.
+ *
+ * This method returns an opaque {@link TransactionState} object, used by {@link TransactionalTable}'s methods
+ * for performing operations on a given transaction.
+ *
+ * @return Opaque object which identifies one transaction.
+ * @throws TransactionException
+ */
+ public TransactionState beginTransaction() throws TransactionException {
+ //TODO: it does not pass the unit tests. To a complete fix
+ //assert(aTransactionIsInProgress == false);
+ SyncCreateCallback cb = new SyncCreateCallback();
+ try {
+ tsoclient.getNewTimestamp(cb);
+ cb.await();
+ } catch (Exception e) {
+ throw new TransactionException("Could not get new timestamp", e);
+ }
+ if (cb.getException() != null) {
+ throw new TransactionException("Error retrieving timestamp", cb.getException());
+ }
+
+ aTransactionIsInProgress=true;
+ return new TransactionState(cb.getStartTimestamp(), tsoclient);
+ }
+
+ /**
+ * Commits a transaction. If the transaction is aborted it automatically rollbacks the changes and
+ * throws a {@link CommitUnsuccessfulException}.
+ *
+ * @param transactionState Object identifying the transaction to be committed.
+ * @throws CommitUnsuccessfulException
+ * @throws TransactionException
+ */
+ public void tryCommit(TransactionState transactionState)
+ throws CommitUnsuccessfulException, TransactionException {
+ aTransactionIsInProgress=false;
+ Statistics.fullReport(Statistics.Tag.COMMIT, 1);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("tryCommit " + transactionState.getStartTimestamp());
+ }
+ SyncCommitCallback cb = new SyncCommitCallback();
+ try {
+ tsoclient.commit(transactionState.getStartTimestamp(),
+ transactionState.getWrittenRows(),
+ transactionState.getReadRows(),cb);
+ cb.await();
+ } catch (Exception e) {
+ throw new TransactionException("Could not commit", e);
+ }
+ if (cb.getException() != null) {
+ throw new TransactionException("Error committing", cb.getException());
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("doneCommit " + transactionState.getStartTimestamp() +
+ " TS_c: " + cb.getCommitTimestamp() +
+ " Success: " + (cb.getResult() == TSOClient.Result.OK));
+ }
+
+ if (cb.getResult() == TSOClient.Result.ABORTED) {
+ cleanup(transactionState);
+ throw new CommitUnsuccessfulException();
+ }
+ transactionState.setCommitTimestamp(cb.getCommitTimestamp());
+ if (cb.isElder()) {
+ reincarnate(transactionState, cb.getWWRows());
+ try {
+ transactionState.tsoclient.completeReincarnation(transactionState.getStartTimestamp(), new SyncReincarnationCompleteCallback());
+ } catch (IOException e) {
+ LOG.error("Couldn't send reincarnation report", e);
}
- }
- batch.add(delete);
- }
- for (final Entry<byte[], List<Delete>> entry : deleteBatches.entrySet()) {
- try {
- HTable table = tableCache.get(entry.getKey());
- if (table == null) {
- table = new HTable(conf, entry.getKey());
- tableCache.put(entry.getKey(), table);
+ }
+ Statistics.println();
+ }
+
+ /**
+ * Aborts a transaction and automatically rollbacks the changes.
+ *
+ * @param transactionState Object identifying the transaction to be committed.
+ * @throws TransactionException
+ */
+ public void abort(TransactionState transactionState) throws TransactionException {
+ aTransactionIsInProgress=false;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("abort " + transactionState.getStartTimestamp());
+ }
+ try {
+ tsoclient.abort(transactionState.getStartTimestamp());
+ } catch (Exception e) {
+ throw new TransactionException("Could not abort", e);
+ }