Browse files

Merge pull request #26 from dgomezferro/api

Api Improvements
  • Loading branch information...
2 parents 0899de0 + 2c7ec93 commit 9d9fca90cc367152c8d6b6624e502074027ef8a5 @dgomezferro dgomezferro committed Mar 21, 2013
Showing with 1,863 additions and 2,104 deletions.
  1. +4 −4 README.md
  2. +11 −3 conf/log4j.properties
  3. +1 −75 pom.xml
  4. +0 −181 src/main/java/com/yahoo/omid/client/TransactionManager.java
  5. +0 −64 src/main/java/com/yahoo/omid/client/TransactionState.java
  6. +0 −314 src/main/java/com/yahoo/omid/client/TransactionalTable.java
  7. +14 −2 src/main/java/com/yahoo/omid/{client → }/regionserver/Compacter.java
  8. +2 −2 ...ava/com/yahoo/omid/{client/CommitUnsuccessfulException.java → transaction/RollbackException.java}
  9. +562 −0 src/main/java/com/yahoo/omid/transaction/TTable.java
  10. +31 −0 src/main/java/com/yahoo/omid/transaction/Transaction.java
  11. +1 −1 src/main/java/com/yahoo/omid/{client → transaction}/TransactionException.java
  12. +221 −0 src/main/java/com/yahoo/omid/transaction/TransactionManager.java
  13. +78 −0 src/main/java/com/yahoo/omid/transaction/TransactionState.java
  14. +118 −223 src/main/java/com/yahoo/omid/tso/CommitHashMap.java
  15. +48 −78 src/main/java/com/yahoo/omid/tso/RowKey.java
  16. +572 −571 src/main/java/com/yahoo/omid/tso/TSOHandler.java
  17. +3 −2 src/main/java/com/yahoo/omid/tso/TSOState.java
  18. +2 −2 src/main/java/com/yahoo/omid/tso/ThroughputMonitor.java
  19. +0 −64 src/main/native/Makefile
  20. +0 −339 src/main/native/nativelib.cc
  21. +1 −1 src/main/resources/log4j.properties
  22. +1 −1 src/test/java/com/yahoo/omid/OmidTestBase.java
  23. +9 −9 src/test/java/com/yahoo/omid/TestAbortTransaction.java
  24. +56 −56 src/test/java/com/yahoo/omid/TestBasicTransaction.java
  25. +29 −29 src/test/java/com/yahoo/omid/TestCompaction.java
  26. +9 −9 src/test/java/com/yahoo/omid/TestMultiplePut.java
  27. +6 −6 src/test/java/com/yahoo/omid/TestNonexistentRow.java
  28. +10 −10 src/test/java/com/yahoo/omid/TestSingleColumnFamily.java
  29. +36 −36 src/test/java/com/yahoo/omid/TestTransactionConflict.java
  30. +21 −21 src/test/java/com/yahoo/omid/TestUpdateScan.java
  31. +11 −0 src/test/java/com/yahoo/omid/TestUtils.java
  32. +5 −1 src/test/java/com/yahoo/omid/tso/TestClientHandler.java
  33. +1 −0 src/test/java/com/yahoo/omid/tso/TestCommitQuery.java
View
8 README.md
@@ -1,7 +1,7 @@
Omid
=====
-The Omid project provides transactional support for key-value stores using Snapshot Isolation. Omid stands for Optimistically transactional Management in Datastores. At this stage of the project, HBase is the only supported data-store.
+The Omid project provides transactional support for key-value stores using Snapshot Isolation. Omid stands for Optimistically transactional Management in Datasources. At this stage of the project, HBase is the only supported data-store.
If you have any question, please take a look to the [Wiki](https://github.com/yahoo/omid/wiki) or contact us at omid-project@googlegroups.com or read [the online archives](https://groups.google.com/forum/?fromgroups=#!forum/omid-project)
@@ -22,7 +22,7 @@ The core architecture of the software is described in more detail in the [Techni
Compilation
-----------
-Omid uses Maven as its build system. We are using a temporary repository for Zookeeper and Bookkeeper packages to ease the installation procedure.
+Omid uses Maven for its build system. We are using a temporary repository for Zookeeper and Bookkeeper packages to ease the installation procedure.
To compile Omid:
@@ -48,7 +48,7 @@ Hence, the order of starting should be:
3. TSO
4. Hbase
-### Zookeeper & Bookkeeper
+### Zookeeper & Bookkeepergit
Omid doesn't use anything special in Zookeeper or Bookkeeper, so you can use any install for these. However, if you are running this anywhere but localhost, you need to update the setting for HBase and TSO. See the HBase docs for changing the Zookeeper quorum. For TSO, you need to modify bin/omid.sh.
For simplicity we've included a utility script which starts Zookeeper and Bookkeeper. Run:
@@ -91,4 +91,4 @@ The logging preferences can be adjusted in src/main/resources/log4j.properties.
Acknowledgement
-------
-This project has been partially supported by the EU Comission through the Cumulo Nimbo project (FP7-257993).
+This project has been partially supported by the EU Comission through the Cumulo Nimbo project (FP7-257993).
View
14 conf/log4j.properties
@@ -1,3 +1,6 @@
+
+
+
########################################################################
#
# Copyright (c) 2011 Yahoo! Inc. All rights reserved.
@@ -39,7 +42,12 @@ log4j.appender.R.MaxBackupIndex=5
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.O.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=[%d{ISO8601}]%5p%6.6r[%t]%x - %C.%M(%F:%L) - %m%n
-log4j.appender.O.layout.ConversionPattern=[%d{ISO8601}]%5p%6.6r[%t]%x - %C.%M(%F:%L) - %m%n
+#log4j.appender.R.layout.ConversionPattern=[%d{ISO8601}]%5p%6.6r[%t]%x - %C.%M(%F:%L) - %m%n
+#log4j.appender.O.layout.ConversionPattern=[%d{ISO8601}]%5p%6.6r[%t]%x - %C.%M(%F:%L) - %m%n
+
+log4j.appender.R.layout.ConversionPattern=[%d{HH:mm:ss,SSS}]%5p%6.6r[%t]%x - %C{1}.%M(%F:%L) - %m%n
+log4j.appender.O.layout.ConversionPattern=[%d{HH:mm:ss,SSS}]%5p%6.6r[%t]%x - %C{1}.%M(%F:%L) - %m%n
-log4j.logger.com.yahoo.omid.tso.ThroughputMonitor=TRACE
+log4j.logger.com.yahoo.omid.tso.ThroughputMonitor=INFO
+log4j.logger.com.yahoo.omid.notifications=TRACE
+log4j.logger.com.yahoo.omid.examples.notifications=TRACE
View
76 pom.xml
@@ -66,62 +66,7 @@
</execution>
</executions>
</plugin>
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <!-- Build native code after .java compilation -->
- <id>build-native</id>
- <phase>process-classes</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <target name="jni"
- description="Generate headers and compile the native code">
- <echo>Generating JNI headers</echo>
- <exec executable="javah">
- <arg value="-d" />
- <arg
- value="${project.basedir}/target/main/native" />
- <arg value="-classpath" />
- <arg
- value="${project.build.outputDirectory}" />
- <arg value="-jni" />
- <arg
- value="com.yahoo.omid.tso.CommitHashMap" />
- </exec>
- <exec
- dir="${project.basedir}/src/main/native"
- executable="make" failonerror="true">
- <arg value="all" />
- </exec>
- </target>
- </configuration>
- </execution>
- <execution>
- <!-- Make clean in the Maven clean phase -->
- <id>clean-lib</id>
- <phase>clean</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <target name="clean-lib" description="Delete native lib">
- <echo>Deleting native lib</echo>
- <exec
- dir="${project.basedir}/src/main/native"
- executable="make">
- <arg value="clean" />
- </exec>
- </target>
- </configuration>
- </execution>
- </executions>
- </plugin>
</plugins>
-
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse
@@ -139,23 +84,6 @@
org.apache.maven.plugins
</groupId>
<artifactId>
- maven-antrun-plugin
- </artifactId>
- <versionRange>[1.3,)</versionRange>
- <goals>
- <goal>run</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore></ignore>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>
- org.apache.maven.plugins
- </groupId>
- <artifactId>
maven-dependency-plugin
</artifactId>
<versionRange>[2.1,)</versionRange>
@@ -198,9 +126,7 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
- <version>0.92.0</version>
- <type>jar</type>
- <scope>compile</scope>
+ <version>0.94.3</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
View
181 src/main/java/com/yahoo/omid/client/TransactionManager.java
@@ -1,181 +0,0 @@
-/**
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-
-package com.yahoo.omid.client;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
-
-/**
- * Provides the methods necessary to create and commit transactions.
- *
- * @see TransactionalTable
- *
- */
-public class TransactionManager {
- private static final Log LOG = LogFactory.getLog(TSOClient.class);
-
- static TSOClient tsoclient = null;
- private static Object lock = new Object();
- private Configuration conf;
- private HashMap<byte[], HTable> tableCache;
-
- public TransactionManager(Configuration conf) throws TransactionException, IOException {
- this.conf = conf;
- synchronized (lock) {
- if (tsoclient == null) {
- tsoclient = new TSOClient(conf);
- }
- }
- tableCache = new HashMap<byte[], HTable>();
- }
-
- /**
- * Starts a new transaction.
- *
- * This method returns an opaque {@link TransactionState} object, used by {@link TransactionalTable}'s methods
- * for performing operations on a given transaction.
- *
- * @return Opaque object which identifies one transaction.
- * @throws TransactionException
- */
- public TransactionState beginTransaction() throws TransactionException {
- SyncCreateCallback cb = new SyncCreateCallback();
- try {
- tsoclient.getNewTimestamp(cb);
- cb.await();
- } catch (Exception e) {
- throw new TransactionException("Could not get new timestamp", e);
- }
- if (cb.getException() != null) {
- throw new TransactionException("Error retrieving timestamp", cb.getException());
- }
-
- return new TransactionState(cb.getStartTimestamp(), tsoclient);
- }
-
- /**
- * Commits a transaction. If the transaction is aborted it automatically rollbacks the changes and
- * throws a {@link CommitUnsuccessfulException}.
- *
- * @param transactionState Object identifying the transaction to be committed.
- * @throws CommitUnsuccessfulException
- * @throws TransactionException
- */
- public void tryCommit(TransactionState transactionState)
- throws CommitUnsuccessfulException, TransactionException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("tryCommit " + transactionState.getStartTimestamp());
- }
- SyncCommitCallback cb = new SyncCommitCallback();
- try {
- tsoclient.commit(transactionState.getStartTimestamp(),
- transactionState.getRows(), cb);
- cb.await();
- } catch (Exception e) {
- throw new TransactionException("Could not commit", e);
- }
- if (cb.getException() != null) {
- throw new TransactionException("Error committing", cb.getException());
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("doneCommit " + transactionState.getStartTimestamp() +
- " TS_c: " + cb.getCommitTimestamp() +
- " Success: " + (cb.getResult() == TSOClient.Result.OK));
- }
-
- if (cb.getResult() == TSOClient.Result.ABORTED) {
- cleanup(transactionState);
- throw new CommitUnsuccessfulException();
- }
- transactionState.setCommitTimestamp(cb.getCommitTimestamp());
- }
-
- /**
- * Aborts a transaction and automatically rollbacks the changes.
- *
- * @param transactionState Object identifying the transaction to be committed.
- * @throws TransactionException
- */
- public void abort(TransactionState transactionState) throws TransactionException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("abort " + transactionState.getStartTimestamp());
- }
- try {
- tsoclient.abort(transactionState.getStartTimestamp());
- } catch (Exception e) {
- throw new TransactionException("Could not abort", e);
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("doneAbort " + transactionState.getStartTimestamp());
- }
-
- // Make sure its commit timestamp is 0, so the cleanup does the right job
- transactionState.setCommitTimestamp(0);
- cleanup(transactionState);
- }
-
- private void cleanup(final TransactionState transactionState)
- throws TransactionException {
- Map<byte[], List<Delete>> deleteBatches = new HashMap<byte[], List<Delete>>();
- for (final RowKeyFamily rowkey : transactionState.getRows()) {
- List<Delete> batch = deleteBatches.get(rowkey.getTable());
- if (batch == null) {
- batch = new ArrayList<Delete>();
- deleteBatches.put(rowkey.getTable(), batch);
- }
- Delete delete = new Delete(rowkey.getRow());
- for (Entry<byte[], List<KeyValue>> entry : rowkey.getFamilies().entrySet()) {
- for (KeyValue kv : entry.getValue()) {
- delete.deleteColumn(entry.getKey(), kv.getQualifier(), transactionState.getStartTimestamp());
- }
- }
- batch.add(delete);
- }
- for (final Entry<byte[], List<Delete>> entry : deleteBatches.entrySet()) {
- try {
- HTable table = tableCache.get(entry.getKey());
- if (table == null) {
- table = new HTable(conf, entry.getKey());
- tableCache.put(entry.getKey(), table);
- }
- table.delete(entry.getValue());
- } catch (IOException ioe) {
- throw new TransactionException("Could not clean up for table " + entry.getKey(), ioe);
- }
- }
- AbortCompleteCallback cb = new SyncAbortCompleteCallback();
- try {
- tsoclient.completeAbort(transactionState.getStartTimestamp(), cb );
- } catch (IOException ioe) {
- throw new TransactionException("Could not notify TSO about cleanup completion for transaction " +
- transactionState.getStartTimestamp(), ioe);
- }
- }
-}
View
64 src/main/java/com/yahoo/omid/client/TransactionState.java
@@ -1,64 +0,0 @@
-/**
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-
-package com.yahoo.omid.client;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- *
- * This class contains the required information to represent an Omid's transaction, including the set of rows modified.
- *
- */
-public class TransactionState {
- private long startTimestamp;
- private long commitTimestamp;
- private Set<RowKeyFamily> rows;
-
- public TSOClient tsoclient;
-
- TransactionState(long startTimestamp, TSOClient client) {
- this.rows = new HashSet<RowKeyFamily>();
- this.startTimestamp = startTimestamp;;
- this.commitTimestamp = 0;
- this.tsoclient = client;
- }
-
- public long getStartTimestamp() {
- return startTimestamp;
- }
-
- public long getCommitTimestamp() {
- return commitTimestamp;
- }
-
- public void setCommitTimestamp(long commitTimestamp) {
- this.commitTimestamp = commitTimestamp;
- }
-
- public RowKeyFamily[] getRows() {
- return rows.toArray(new RowKeyFamily[0]);
- }
-
- public void addRow(RowKeyFamily row) {
- rows.add(row);
- }
-
- public String toString() {
- return "Transaction-" + Long.toHexString(startTimestamp);
- }
-}
View
314 src/main/java/com/yahoo/omid/client/TransactionalTable.java
@@ -1,314 +0,0 @@
-/**
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-
-package com.yahoo.omid.client;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Provides transactional methods for accessing and modifying a given snapshot of data identified by an opaque
- * {@link TransactionState} object.
- *
- */
-public class TransactionalTable extends HTable {
-
- public static long getsPerformed = 0;
- public static long elementsGotten = 0;
- public static long elementsRead = 0;
- public static long extraGetsPerformed = 0;
- public static double extraVersionsAvg = 3;
-
- /** We always ask for CACHE_VERSIONS_OVERHEAD extra versions */
- private static int CACHE_VERSIONS_OVERHEAD = 3;
- /** Average number of versions needed to reach the right snapshot */
- public double versionsAvg = 3;
- /** How fast do we adapt the average */
- private static final double alpha = 0.975;
-
- public TransactionalTable(Configuration conf, byte[] tableName) throws IOException {
- super(conf, tableName);
- }
-
- public TransactionalTable(Configuration conf, String tableName) throws IOException {
- this(conf, Bytes.toBytes(tableName));
- }
-
- /**
- * Transactional version of {@link HTable#get(Get)}
- *
- * @param transactionState Identifier of the transaction
- * @see HTable#get(Get)
- * @throws IOException
- */
- public Result get(TransactionState transactionState, final Get get) throws IOException {
- final int requestedVersions = (int) (versionsAvg + CACHE_VERSIONS_OVERHEAD);
- final long readTimestamp = transactionState.getStartTimestamp();
- final Get tsget = new Get(get.getRow());
- TimeRange timeRange = get.getTimeRange();
- long startTime = timeRange.getMin();
- long endTime = Math.min(timeRange.getMax(), readTimestamp + 1);
- tsget.setTimeRange(startTime, endTime).setMaxVersions(requestedVersions);
- Map<byte[], NavigableSet<byte[]>> kvs = get.getFamilyMap();
- for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
- byte[] family = entry.getKey();
- NavigableSet<byte[]> qualifiers = entry.getValue();
- if (qualifiers == null || qualifiers.isEmpty()) {
- tsget.addFamily(family);
- } else {
- for (byte[] qualifier : qualifiers) {
- tsget.addColumn(family, qualifier);
- }
- }
- }
- getsPerformed++;
- // Return the KVs that belong to the transaction snapshot, ask for more versions if needed
- return new Result( filter(transactionState, super.get(tsget).list(), requestedVersions) );
- }
-
- /**
- * Transactional version of {@link HTable#delete(Delete)}
- *
- * @param transactionState Identifier of the transaction
- * @see HTable#delete(Delete)
- * @throws IOException
- */
- public void delete(TransactionState transactionState, Delete delete) throws IOException {
- final long startTimestamp = transactionState.getStartTimestamp();
- boolean issueGet = false;
-
- final Put deleteP = new Put(delete.getRow(), startTimestamp);
- final Get deleteG = new Get(delete.getRow());
- Map<byte[], List<KeyValue>> fmap = delete.getFamilyMap();
- if (fmap.isEmpty()) {
- issueGet = true;
- }
- for (List<KeyValue> kvl : fmap.values()) {
- for (KeyValue kv : kvl) {
- switch(KeyValue.Type.codeToType(kv.getType())) {
- case DeleteColumn:
- deleteP.add(kv.getFamily(), kv.getQualifier(), startTimestamp, null);
- break;
- case DeleteFamily:
- deleteG.addFamily(kv.getFamily());
- issueGet = true;
- break;
- case Delete:
- if (kv.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
- deleteP.add(kv.getFamily(), kv.getQualifier(), startTimestamp, null);
- break;
- } else {
- throw new UnsupportedOperationException("Cannot delete specific versions on Snapshot Isolation.");
- }
- }
- }
- }
- if (issueGet) {
- // It's better to perform a transactional get to avoid deleting more than necessary
- Result result = this.get(transactionState, deleteG);
- for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap().entrySet()) {
- byte[] family = entryF.getKey();
- for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
- byte[] qualifier = entryQ.getKey();
- deleteP.add(family, qualifier, null);
- }
- }
- }
-
- transactionState.addRow(new RowKeyFamily(delete.getRow(), getTableName(), deleteP.getFamilyMap()));
-
- put(deleteP);
- }
-
- /**
- * Transactional version of {@link HTable#put(Put)}
- *
- * @param transactionState Identifier of the transaction
- * @see HTable#put(Put)
- * @throws IOException
- */
- public void put(TransactionState transactionState, Put put) throws IOException, IllegalArgumentException {
- final long startTimestamp = transactionState.getStartTimestamp();
- // create put with correct ts
- final Put tsput = new Put(put.getRow(), startTimestamp);
- Map<byte[], List<KeyValue>> kvs = put.getFamilyMap();
- for (List<KeyValue> kvl : kvs.values()) {
- for (KeyValue kv : kvl) {
- tsput.add(new KeyValue(kv.getRow(), kv.getFamily(), kv.getQualifier(), startTimestamp, kv.getValue()));
- }
- }
-
- // should add the table as well
- transactionState.addRow(new RowKeyFamily(tsput.getRow(), getTableName(), tsput.getFamilyMap()));
-
- put(tsput);
- }
-
- /**
- * Transactional version of {@link HTable#getScanner(Scan)}
- *
- * @param transactionState Identifier of the transaction
- * @see HTable#getScanner(Scan)
- * @throws IOException
- */
- public ResultScanner getScanner(TransactionState transactionState, Scan scan) throws IOException {
- Scan tsscan = new Scan(scan);
- tsscan.setMaxVersions((int) (versionsAvg + CACHE_VERSIONS_OVERHEAD));
- tsscan.setTimeRange(0, transactionState.getStartTimestamp() + 1);
- ClientScanner scanner = new ClientScanner(transactionState, tsscan, (int) (versionsAvg + CACHE_VERSIONS_OVERHEAD));
- scanner.initialize();
- return scanner;
- }
-
- /**
- * Filters the raw results returned from HBase and returns only those belonging to the current snapshot, as
- * defined by the transactionState object. If the raw results don't contain enough information for a particular
- * qualifier, it will request more versions from HBase.
- *
- * @param transactionState Defines the current snapshot
- * @param kvs Raw KVs that we are going to filter
- * @param localVersions Number of versions requested from hbase
- * @return Filtered KVs belonging to the transaction snapshot
- * @throws IOException
- */
- private List<KeyValue> filter(TransactionState transactionState, List<KeyValue> kvs, int localVersions) throws IOException {
- final int requestVersions = localVersions * 2 + CACHE_VERSIONS_OVERHEAD;
- if (kvs == null) {
- return Collections.emptyList();
- }
-
- long startTimestamp = transactionState.getStartTimestamp();
- // Filtered kvs
- List<KeyValue> filtered = new ArrayList<KeyValue>();
- // Map from column to older uncommitted timestamp
- List<Get> pendingGets = new ArrayList<Get>();
- ColumnWrapper lastColumn = new ColumnWrapper(null, null);
- long oldestUncommittedTS = Long.MAX_VALUE;
- boolean validRead = true;
- // Number of versions needed to reach a committed value
- int versionsProcessed = 0;
-
- for (KeyValue kv : kvs) {
- ColumnWrapper currentColumn = new ColumnWrapper(kv.getFamily(), kv.getQualifier());
- if (!currentColumn.equals(lastColumn)) {
- // New column, if we didn't read a committed value for last one, add it to pending
- if (!validRead && versionsProcessed == localVersions) {
- Get get = new Get(kv.getRow());
- get.addColumn(kv.getFamily(), kv.getQualifier());
- get.setMaxVersions(requestVersions); // TODO set maxVersions wisely
- get.setTimeRange(0, oldestUncommittedTS - 1);
- pendingGets.add(get);
- }
- validRead = false;
- versionsProcessed = 0;
- oldestUncommittedTS = Long.MAX_VALUE;
- lastColumn = currentColumn;
- }
- if (validRead) {
- // If we already have a committed value for this column, skip kv
- continue;
- }
- versionsProcessed++;
- if (transactionState.tsoclient.validRead(kv.getTimestamp(), startTimestamp)) {
- // Valid read, add it to result unless it's a delete
- if (kv.getValueLength() > 0) {
- filtered.add(kv);
- }
- validRead = true;
- // Update versionsAvg: increase it quickly, decrease it slowly
- versionsAvg =
- versionsProcessed > versionsAvg ?
- versionsProcessed :
- alpha * versionsAvg + (1 - alpha) * versionsProcessed;
- } else {
- // Uncomitted, keep track of oldest uncommitted timestamp
- oldestUncommittedTS = Math.min(oldestUncommittedTS, kv.getTimestamp());
- }
- }
-
- // If we have pending columns, request (and filter recursively) them
- if (!pendingGets.isEmpty()) {
- Result[] results = this.get(pendingGets);
- for (Result r : results) {
- filtered.addAll(filter(transactionState, r.list(), requestVersions));
- }
- }
- Collections.sort(filtered, KeyValue.COMPARATOR);
- return filtered;
- }
-
- protected class ClientScanner extends HTable.ClientScanner {
- private TransactionState state;
- private int maxVersions;
-
- ClientScanner(TransactionState state, Scan scan, int maxVersions) {
- super(scan);
- this.state = state;
- this.maxVersions = maxVersions;
- }
-
- @Override
- public Result next() throws IOException {
- List<KeyValue> filteredResult = Collections.emptyList();
- while (filteredResult.isEmpty()) {
- Result result = super.next();
- if (result == null) {
- return null;
- }
- filteredResult = filter(state, result.list(), maxVersions);
- }
- return new Result(filteredResult);
- }
-
- // In principle no need to override, copied from super.next(int) to make sure it works even if super.next(int)
- // changes its implementation
- @Override
- public Result [] next(int nbRows) throws IOException {
- // Collect values to be returned here
- ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
- for(int i = 0; i < nbRows; i++) {
- Result next = next();
- if (next != null) {
- resultSets.add(next);
- } else {
- break;
- }
- }
- return resultSets.toArray(new Result[resultSets.size()]);
- }
-
- }
-
-}
View
16 ...o/omid/client/regionserver/Compacter.java → ...om/yahoo/omid/regionserver/Compacter.java
@@ -1,4 +1,4 @@
-package com.yahoo.omid.client.regionserver;
+package com.yahoo.omid.regionserver;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -151,7 +151,19 @@ public boolean next(List<KeyValue> result, int limit) throws IOException {
public void close() throws IOException {
internalScanner.close();
}
-
+
+ @Override
+ public boolean next(List<KeyValue> results, String metric)
+ throws IOException {
+ return next(results);
+ }
+
+ @Override
+ public boolean next(List<KeyValue> result, int limit, String metric)
+ throws IOException {
+ return next(result, limit);
+ }
+
}
private class Handler extends SimpleChannelUpstreamHandler {
View
4 ...d/client/CommitUnsuccessfulException.java → ...o/omid/transaction/RollbackException.java
@@ -14,8 +14,8 @@
* limitations under the License. See accompanying LICENSE file.
*/
-package com.yahoo.omid.client;
+package com.yahoo.omid.transaction;
-public class CommitUnsuccessfulException extends Exception {
+public class RollbackException extends Exception {
private static final long serialVersionUID = -9163407697376986830L;
}
View
562 src/main/java/com/yahoo/omid/transaction/TTable.java
@@ -0,0 +1,562 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.transaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.ClientScanner;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.yahoo.omid.client.ColumnWrapper;
+import com.yahoo.omid.client.RowKeyFamily;
+
+/**
+ * Provides transactional methods for accessing and modifying a given snapshot
+ * of data identified by an opaque {@link Transaction} object.
+ *
+ */
+public class TTable {
+
+ public static long getsPerformed = 0;
+ public static long elementsGotten = 0;
+ public static long elementsRead = 0;
+ public static long extraGetsPerformed = 0;
+ public static double extraVersionsAvg = 3;
+
+ /** We always ask for CACHE_VERSIONS_OVERHEAD extra versions */
+ private static int CACHE_VERSIONS_OVERHEAD = 3;
+ /** Average number of versions needed to reach the right snapshot */
+ public double versionsAvg = 3;
+ /** How fast do we adapt the average */
+ private static final double alpha = 0.975;
+
+ private HTable table;
+
+ public TTable(Configuration conf, byte[] tableName) throws IOException {
+ table = new HTable(conf, tableName);
+ }
+
+ public TTable(Configuration conf, String tableName) throws IOException {
+ this(conf, Bytes.toBytes(tableName));
+ }
+
+ /**
+ * Extracts certain cells from a given row.
+ *
+ * @param get
+ * The object that specifies what data to fetch and from which
+ * row.
+ * @return The data coming from the specified row, if it exists. If the row
+ * specified doesn't exist, the {@link Result} instance returned
+ * won't contain any {@link KeyValue}, as indicated by
+ * {@link Result#isEmpty()}.
+ * @throws IOException
+ * if a remote or network exception occurs.
+ */
+ public Result get(Transaction transaction, final Get get) throws IOException {
+ if (!(transaction instanceof TransactionState)) {
+ throw new IllegalArgumentException("transaction should be an instance of " + TransactionState.class);
+ }
+ TransactionState transactionState = (TransactionState) transaction;
+
+ final int requestedVersions = (int) (versionsAvg + CACHE_VERSIONS_OVERHEAD);
+ final long readTimestamp = transactionState.getStartTimestamp();
+ final Get tsget = new Get(get.getRow());
+ TimeRange timeRange = get.getTimeRange();
+ long startTime = timeRange.getMin();
+ long endTime = Math.min(timeRange.getMax(), readTimestamp + 1);
+ tsget.setTimeRange(startTime, endTime).setMaxVersions(requestedVersions);
+ Map<byte[], NavigableSet<byte[]>> kvs = get.getFamilyMap();
+ for (Map.Entry<byte[], NavigableSet<byte[]>> entry : kvs.entrySet()) {
+ byte[] family = entry.getKey();
+ NavigableSet<byte[]> qualifiers = entry.getValue();
+ if (qualifiers == null || qualifiers.isEmpty()) {
+ tsget.addFamily(family);
+ } else {
+ for (byte[] qualifier : qualifiers) {
+ tsget.addColumn(family, qualifier);
+ }
+ }
+ }
+ getsPerformed++;
+ // Return the KVs that belong to the transaction snapshot, ask for more
+ // versions if needed
+ return new Result(filter(transactionState, table.get(tsget).list(), requestedVersions));
+ }
+
+ /**
+ * Deletes the specified cells/row.
+ *
+ * @param delete
+ * The object that specifies what to delete.
+ * @throws IOException
+ * if a remote or network exception occurs.
+ */
+ public void delete(Transaction transaction, Delete delete) throws IOException {
+ if (!(transaction instanceof TransactionState)) {
+ throw new IllegalArgumentException("transaction should be an instance of " + TransactionState.class);
+ }
+ TransactionState transactionState = (TransactionState) transaction;
+
+ final long startTimestamp = transactionState.getStartTimestamp();
+ boolean issueGet = false;
+
+ final Put deleteP = new Put(delete.getRow(), startTimestamp);
+ final Get deleteG = new Get(delete.getRow());
+ Map<byte[], List<KeyValue>> fmap = delete.getFamilyMap();
+ if (fmap.isEmpty()) {
+ issueGet = true;
+ }
+ for (List<KeyValue> kvl : fmap.values()) {
+ for (KeyValue kv : kvl) {
+ switch (KeyValue.Type.codeToType(kv.getType())) {
+ case DeleteColumn:
+ deleteP.add(kv.getFamily(), kv.getQualifier(), startTimestamp, null);
+ break;
+ case DeleteFamily:
+ deleteG.addFamily(kv.getFamily());
+ issueGet = true;
+ break;
+ case Delete:
+ if (kv.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
+ deleteP.add(kv.getFamily(), kv.getQualifier(), startTimestamp, null);
+ break;
+ } else {
+ throw new UnsupportedOperationException(
+ "Cannot delete specific versions on Snapshot Isolation.");
+ }
+ }
+ }
+ }
+ if (issueGet) {
+ // It's better to perform a transactional get to avoid deleting more
+ // than necessary
+ Result result = this.get(transactionState, deleteG);
+ for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap().entrySet()) {
+ byte[] family = entryF.getKey();
+ for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) {
+ byte[] qualifier = entryQ.getKey();
+ deleteP.add(family, qualifier, null);
+ }
+ }
+ }
+
+ transactionState.addRow(new RowKeyFamily(delete.getRow(), getTableName(), deleteP.getFamilyMap()));
+
+ table.put(deleteP);
+ }
+
+ /**
+ * Puts some data in the table.
+ * <p>
+ * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
+ * until the internal buffer is full.
+ *
+ * @param put
+ * The data to put.
+ * @throws IOException
+ * if a remote or network exception occurs.
+ * @since 0.20.0
+ */
+ public void put(Transaction transaction, Put put) throws IOException, IllegalArgumentException {
+ if (!(transaction instanceof TransactionState)) {
+ throw new IllegalArgumentException("transaction should be an instance of " + TransactionState.class);
+ }
+ TransactionState transactionState = (TransactionState) transaction;
+
+ final long startTimestamp = transactionState.getStartTimestamp();
+ // create put with correct ts
+ final Put tsput = new Put(put.getRow(), startTimestamp);
+ Map<byte[], List<KeyValue>> kvs = put.getFamilyMap();
+ for (List<KeyValue> kvl : kvs.values()) {
+ for (KeyValue kv : kvl) {
+ tsput.add(new KeyValue(kv.getRow(), kv.getFamily(), kv.getQualifier(), startTimestamp, kv.getValue()));
+ }
+ }
+
+ // should add the table as well
+ transactionState.addRow(new RowKeyFamily(tsput.getRow(), getTableName(), tsput.getFamilyMap()));
+
+ table.put(tsput);
+ }
+
+ /**
+ * Returns a scanner on the current table as specified by the {@link Scan}
+ * object. Note that the passed {@link Scan}'s start row and caching
+ * properties maybe changed.
+ *
+ * @param scan
+ * A configured {@link Scan} object.
+ * @return A scanner.
+ * @throws IOException
+ * if a remote or network exception occurs.
+ */
+ public ResultScanner getScanner(Transaction transaction, Scan scan) throws IOException {
+ if (!(transaction instanceof TransactionState)) {
+ throw new IllegalArgumentException("transaction should be an instance of " + TransactionState.class);
+ }
+ TransactionState transactionState = (TransactionState) transaction;
+
+ Scan tsscan = new Scan(scan);
+ tsscan.setMaxVersions((int) (versionsAvg + CACHE_VERSIONS_OVERHEAD));
+ tsscan.setTimeRange(0, transactionState.getStartTimestamp() + 1);
+ TransactionalClientScanner scanner = new TransactionalClientScanner(transactionState, getConfiguration(),
+ tsscan, getTableName(), (int) (versionsAvg + CACHE_VERSIONS_OVERHEAD));
+ return scanner;
+ }
+
+ /**
+ * Filters the raw results returned from HBase and returns only those
+ * belonging to the current snapshot, as defined by the transactionState
+ * object. If the raw results don't contain enough information for a
+ * particular qualifier, it will request more versions from HBase.
+ *
+ * @param transactionState
+ * Defines the current snapshot
+ * @param kvs
+ * Raw KVs that we are going to filter
+ * @param localVersions
+ * Number of versions requested from hbase
+ * @return Filtered KVs belonging to the transaction snapshot
+ * @throws IOException
+ */
+ private List<KeyValue> filter(TransactionState transactionState, List<KeyValue> kvs, int localVersions)
+ throws IOException {
+ final int requestVersions = localVersions * 2 + CACHE_VERSIONS_OVERHEAD;
+ if (kvs == null) {
+ return Collections.emptyList();
+ }
+
+ long startTimestamp = transactionState.getStartTimestamp();
+ // Filtered kvs
+ List<KeyValue> filtered = new ArrayList<KeyValue>();
+ // Map from column to older uncommitted timestamp
+ List<Get> pendingGets = new ArrayList<Get>();
+ ColumnWrapper lastColumn = new ColumnWrapper(null, null);
+ long oldestUncommittedTS = Long.MAX_VALUE;
+ boolean validRead = true;
+ // Number of versions needed to reach a committed value
+ int versionsProcessed = 0;
+
+ for (KeyValue kv : kvs) {
+ ColumnWrapper currentColumn = new ColumnWrapper(kv.getFamily(), kv.getQualifier());
+ if (!currentColumn.equals(lastColumn)) {
+ // New column, if we didn't read a committed value for last one,
+ // add it to pending
+ if (!validRead && versionsProcessed == localVersions) {
+ Get get = new Get(kv.getRow());
+ get.addColumn(kv.getFamily(), kv.getQualifier());
+ get.setMaxVersions(requestVersions); // TODO set maxVersions
+ // wisely
+ get.setTimeRange(0, oldestUncommittedTS - 1);
+ pendingGets.add(get);
+ }
+ validRead = false;
+ versionsProcessed = 0;
+ oldestUncommittedTS = Long.MAX_VALUE;
+ lastColumn = currentColumn;
+ }
+ if (validRead) {
+ // If we already have a committed value for this column, skip kv
+ continue;
+ }
+ versionsProcessed++;
+ if (transactionState.tsoclient.validRead(kv.getTimestamp(), startTimestamp)) {
+ // Valid read, add it to result unless it's a delete
+ if (kv.getValueLength() > 0) {
+ filtered.add(kv);
+ }
+ validRead = true;
+ // Update versionsAvg: increase it quickly, decrease it slowly
+ versionsAvg = versionsProcessed > versionsAvg ? versionsProcessed : alpha * versionsAvg + (1 - alpha)
+ * versionsProcessed;
+ } else {
+ // Uncomitted, keep track of oldest uncommitted timestamp
+ oldestUncommittedTS = Math.min(oldestUncommittedTS, kv.getTimestamp());
+ }
+ }
+
+ // If we have pending columns, request (and filter recursively) them
+ if (!pendingGets.isEmpty()) {
+ Result[] results = table.get(pendingGets);
+ for (Result r : results) {
+ filtered.addAll(filter(transactionState, r.list(), requestVersions));
+ }
+ }
+ Collections.sort(filtered, KeyValue.COMPARATOR);
+ return filtered;
+ }
+
+ protected class TransactionalClientScanner extends ClientScanner {
+ private TransactionState state;
+ private int maxVersions;
+
+ TransactionalClientScanner(TransactionState state, Configuration conf, Scan scan, byte[] table, int maxVersions)
+ throws IOException {
+ super(conf, scan, table);
+ this.state = state;
+ this.maxVersions = maxVersions;
+ }
+
+ @Override
+ public Result next() throws IOException {
+ List<KeyValue> filteredResult = Collections.emptyList();
+ while (filteredResult.isEmpty()) {
+ Result result = super.next();
+ if (result == null) {
+ return null;
+ }
+ filteredResult = filter(state, result.list(), maxVersions);
+ }
+ return new Result(filteredResult);
+ }
+
+ // In principle no need to override, copied from super.next(int) to make
+ // sure it works even if super.next(int)
+ // changes its implementation
+ @Override
+ public Result[] next(int nbRows) throws IOException {
+ // Collect values to be returned here
+ ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+ for (int i = 0; i < nbRows; i++) {
+ Result next = next();
+ if (next != null) {
+ resultSets.add(next);
+ } else {
+ break;
+ }
+ }
+ return resultSets.toArray(new Result[resultSets.size()]);
+ }
+
+ }
+
+ /**
+ * Gets the name of this table.
+ *
+ * @return the table name.
+ */
+ public byte[] getTableName() {
+ return table.getTableName();
+ }
+
+ /**
+ * Returns the {@link Configuration} object used by this instance.
+ * <p>
+ * The reference returned is not a copy, so any change made to it will
+ * affect this instance.
+ */
+ public Configuration getConfiguration() {
+ return table.getConfiguration();
+ }
+
+ /**
+ * Gets the {@link HTableDescriptor table descriptor} for this table.
+ *
+ * @throws IOException
+ * if a remote or network exception occurs.
+ */
+ public HTableDescriptor getTableDescriptor() throws IOException {
+ return table.getTableDescriptor();
+ }
+
+ /**
+ * Test for the existence of columns in the table, as specified in the Get.
+ * <p>
+ *
+ * This will return true if the Get matches one or more keys, false if not.
+ * <p>
+ *
+ * This is a server-side call so it prevents any data from being transfered
+ * to the client.
+ *
+ * @param get
+ * the Get
+ * @return true if the specified Get matches one or more keys, false if not
+ * @throws IOException
+ * e
+ */
+ public boolean exists(Transaction transaction, Get get) throws IOException {
+ Result result = get(transaction, get);
+ return !result.isEmpty();
+ }
+
+ /*
+ * @Override public void batch(Transaction transaction, List<? extends Row>
+ * actions, Object[] results) throws IOException, InterruptedException { //
+ * TODO Auto-generated method stub
+ *
+ * }
+ *
+ * @Override public Object[] batch(Transaction transaction, List<? extends
+ * Row> actions) throws IOException, InterruptedException { // TODO
+ * Auto-generated method stub return null; }
+ *
+ * @Override public <R> void batchCallback(Transaction transaction, List<?
+ * extends Row> actions, Object[] results, Callback<R> callback) throws
+ * IOException, InterruptedException { // TODO Auto-generated method stub
+ *
+ * }
+ *
+ * @Override public <R> Object[] batchCallback(List<? extends Row> actions,
+ * Callback<R> callback) throws IOException, InterruptedException { // TODO
+ * Auto-generated method stub return null; }
+ */
+
+ /**
+ * Extracts certain cells from the given rows, in batch.
+ *
+ * @param gets
+ * The objects that specify what data to fetch and from which
+ * rows.
+ *
+ * @return The data coming from the specified rows, if it exists. If the row
+ * specified doesn't exist, the {@link Result} instance returned
+ * won't contain any {@link KeyValue}, as indicated by
+ * {@link Result#isEmpty()}. If there are any failures even after
+ * retries, there will be a null in the results array for those
+ * Gets, AND an exception will be thrown.
+ * @throws IOException
+ * if a remote or network exception occurs.
+ *
+ */
+ public Result[] get(Transaction transaction, List<Get> gets) throws IOException {
+ Result[] results = new Result[gets.size()];
+ int i = 0;
+ for (Get get : gets) {
+ results[i++] = get(transaction, get);
+ }
+ return results;
+ }
+
+ /**
+ * Gets a scanner on the current table for the given family.
+ *
+ * @param family
+ * The column family to scan.
+ * @return A scanner.
+ * @throws IOException
+ * if a remote or network exception occurs.
+ */
+ public ResultScanner getScanner(Transaction transaction, byte[] family) throws IOException {
+ Scan scan = new Scan();
+ scan.addFamily(family);
+ return getScanner(transaction, scan);
+ }
+
+ /**
+ * Gets a scanner on the current table for the given family and qualifier.
+ *
+ * @param family
+ * The column family to scan.
+ * @param qualifier
+ * The column qualifier to scan.
+ * @return A scanner.
+ * @throws IOException
+ * if a remote or network exception occurs.
+ */
+ public ResultScanner getScanner(Transaction transaction, byte[] family, byte[] qualifier) throws IOException {
+ Scan scan = new Scan();
+ scan.addColumn(family, qualifier);
+ return getScanner(transaction, scan);
+ }
+
+ /**
+ * Puts some data in the table, in batch.
+ * <p>
+ * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
+ * until the internal buffer is full.
+ * <p>
+ * This can be used for group commit, or for submitting user defined
+ * batches. The writeBuffer will be periodically inspected while the List is
+ * processed, so depending on the List size the writeBuffer may flush not at
+ * all, or more than once.
+ *
+ * @param puts
+ * The list of mutations to apply. The batch put is done by
+ * aggregating the iteration of the Puts over the write buffer at
+ * the client-side for a single RPC call.
+ * @throws IOException
+ * if a remote or network exception occurs.
+ */
+ public void put(Transaction transaction, List<Put> puts) throws IOException {
+ for (Put put : puts) {
+ put(transaction, put);
+ }
+ }
+
+ /**
+ * Deletes the specified cells/rows in bulk.
+ *
+ * @param deletes
+ * List of things to delete. List gets modified by this method
+ * (in particular it gets re-ordered, so the order in which the
+ * elements are inserted in the list gives no guarantee as to the
+ * order in which the {@link Delete}s are executed).
+ * @throws IOException
+ * if a remote or network exception occurs. In that case the
+ * {@code deletes} argument will contain the {@link Delete}
+ * instances that have not be successfully applied.
+ */
+ public void delete(Transaction transaction, List<Delete> deletes) throws IOException {
+ for (Delete delete : deletes) {
+ delete(transaction, delete);
+ }
+ }
+
+ /**
+ * Provides access to the underliying HTable in order to configure it or to
+ * perform unsafe (non-transactional) operations. The latter would break the
+ * transactional guarantees of the whole system.
+ *
+ * @return The underlying HTable object
+ */
+ public HTableInterface getHTable() {
+ return table;
+ }
+
+ /**
+ * Releases any resources held or pending changes in internal buffers.
+ *
+ * @throws IOException
+ * if a remote or network exception occurs.
+ */
+ public void close() throws IOException {
+ table.close();
+ }
+
+}
View
31 src/main/java/com/yahoo/omid/transaction/Transaction.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.transaction;
+
+/**
+ *
+ * This class contains the required information to represent an Omid's
+ * transaction, including the set of rows modified.
+ *
+ */
+public interface Transaction {
+ /**
+ * Modify the transaction associated with the current thread such that the
+ * only possible outcome of the transaction is to roll back the transaction.
+ */
+ public void setRollbackOnly();
+}
View
2 ...hoo/omid/client/TransactionException.java → ...mid/transaction/TransactionException.java
@@ -14,7 +14,7 @@
* limitations under the License. See accompanying LICENSE file.
*/
-package com.yahoo.omid.client;
+package com.yahoo.omid.transaction;
public class TransactionException extends Exception {
View
221 src/main/java/com/yahoo/omid/transaction/TransactionManager.java
@@ -0,0 +1,221 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.transaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+
+import com.yahoo.omid.client.AbortCompleteCallback;
+import com.yahoo.omid.client.RowKeyFamily;
+import com.yahoo.omid.client.SyncAbortCompleteCallback;
+import com.yahoo.omid.client.SyncCommitCallback;
+import com.yahoo.omid.client.SyncCreateCallback;
+import com.yahoo.omid.client.TSOClient;
+
+/**
+ * Provides the methods necessary to create and commit transactions.
+ *
+ * @see TTable
+ *
+ */
+public class TransactionManager {
+ private static final Log LOG = LogFactory.getLog(TSOClient.class);
+
+ static TSOClient tsoclient = null;
+ private static Object lock = new Object();
+ private Configuration conf;
+ private HashMap<byte[], HTable> tableCache;
+
+ public TransactionManager(Configuration conf) throws IOException {
+ this.conf = conf;
+ synchronized (lock) {
+ if (tsoclient == null) {
+ tsoclient = new TSOClient(conf);
+ }
+ }
+ tableCache = new HashMap<byte[], HTable>();
+ }
+
+ /**
+ * Starts a new transaction.
+ *
+ * This method returns an opaque {@link Transaction} object, used by
+ * {@link TTable}'s methods for performing operations on a given
+ * transaction.
+ *
+ * @return Opaque object which identifies one transaction.
+ * @throws TransactionException
+ */
+ public Transaction begin() throws TransactionException {
+ SyncCreateCallback cb = new SyncCreateCallback();
+ try {
+ tsoclient.getNewTimestamp(cb);
+ cb.await();
+ } catch (Exception e) {
+ throw new TransactionException("Could not get new timestamp", e);
+ }
+ if (cb.getException() != null) {
+ throw new TransactionException("Error retrieving timestamp", cb.getException());
+ }
+
+ return new TransactionState(cb.getStartTimestamp(), tsoclient);
+ }
+
+ /**
+ * Commits a transaction. If the transaction is aborted it automatically
+ * rollbacks the changes and throws a {@link RollbackException}.
+ *
+ * @param transaction
+ * Object identifying the transaction to be committed.
+ * @throws RollbackException
+ * @throws TransactionException
+ */
+ public void commit(Transaction transaction) throws RollbackException, TransactionException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("commit " + transaction);
+ }
+
+ if (!(transaction instanceof TransactionState)) {
+ throw new IllegalArgumentException("transaction should be an instance of " + TransactionState.class);
+ }
+ TransactionState transactionState = (TransactionState) transaction;
+
+ // Check rollbackOnly status
+ if (transactionState.isRollbackOnly()) {
+ rollback(transactionState);
+ throw new RollbackException();
+ }
+
+ SyncCommitCallback cb = new SyncCommitCallback();
+ try {
+ tsoclient.commit(transactionState.getStartTimestamp(), transactionState.getRows(), cb);
+ cb.await();
+ } catch (Exception e) {
+ throw new TransactionException("Could not commit", e);
+ }
+ if (cb.getException() != null) {
+ throw new TransactionException("Error committing", cb.getException());
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("doneCommit " + transactionState.getStartTimestamp() + " TS_c: " + cb.getCommitTimestamp()
+ + " Success: " + (cb.getResult() == TSOClient.Result.OK));
+ }
+
+ if (cb.getResult() == TSOClient.Result.ABORTED) {
+ cleanup(transactionState);
+ throw new RollbackException();
+ }
+ transactionState.setCommitTimestamp(cb.getCommitTimestamp());
+ }
+
+ /**
+ * Aborts a transaction and automatically rollbacks the changes.
+ *
+ * @param transaction
+ * Object identifying the transaction to be committed.
+ */
+ public void rollback(Transaction transaction) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("abort " + transaction);
+ }
+
+ if (!(transaction instanceof TransactionState)) {
+ throw new IllegalArgumentException("transaction should be an instance of " + TransactionState.class);
+ }
+ TransactionState transactionState = (TransactionState) transaction;
+
+ try {
+ tsoclient.abort(transactionState.getStartTimestamp());
+ } catch (Exception e) {
+ LOG.warn("Couldn't notify TSO about the abort", e);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("doneAbort " + transactionState.getStartTimestamp());
+ }
+
+ // Make sure its commit timestamp is 0, so the cleanup does the right job
+ transactionState.setCommitTimestamp(0);
+ cleanup(transactionState);
+ }
+
+ private void cleanup(final TransactionState transactionState) {
+ Map<byte[], List<Delete>> deleteBatches = new HashMap<byte[], List<Delete>>();
+ for (final RowKeyFamily rowkey : transactionState.getRows()) {
+ List<Delete> batch = deleteBatches.get(rowkey.getTable());
+ if (batch == null) {
+ batch = new ArrayList<Delete>();
+ deleteBatches.put(rowkey.getTable(), batch);
+ }
+ Delete delete = new Delete(rowkey.getRow());
+ for (Entry<byte[], List<KeyValue>> entry : rowkey.getFamilies().entrySet()) {
+ for (KeyValue kv : entry.getValue()) {
+ delete.deleteColumn(entry.getKey(), kv.getQualifier(), transactionState.getStartTimestamp());
+ }
+ }
+ batch.add(delete);
+ }
+
+ boolean cleanupFailed = false;
+ List<HTable> tablesToFlush = new ArrayList<HTable>();
+ for (final Entry<byte[], List<Delete>> entry : deleteBatches.entrySet()) {
+ try {
+ HTable table = tableCache.get(entry.getKey());
+ if (table == null) {
+ table = new HTable(conf, entry.getKey());
+ table.setAutoFlush(false, true);
+ tableCache.put(entry.getKey(), table);
+ }
+ table.delete(entry.getValue());
+ tablesToFlush.add(table);
+ } catch (IOException ioe) {
+ cleanupFailed = true;
+ }
+ }
+ for (HTable table : tablesToFlush) {
+ try {
+ table.flushCommits();
+ } catch (IOException e) {
+ cleanupFailed = true;
+ }
+ }
+
+ if (cleanupFailed) {
+ LOG.warn("Cleanup failed, some values not deleted");
+ // we can't notify the TSO of completion
+ return;
+ }
+ AbortCompleteCallback cb = new SyncAbortCompleteCallback();
+ try {
+ tsoclient.completeAbort(transactionState.getStartTimestamp(), cb);
+ } catch (IOException ioe) {
+ LOG.warn("Coudldn't notify the TSO of rollback completion", ioe);
+ }
+ }
+}
View
78 src/main/java/com/yahoo/omid/transaction/TransactionState.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.transaction;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.yahoo.omid.client.RowKeyFamily;
+import com.yahoo.omid.client.TSOClient;
+
+/**
+ *
+ * This class contains the required information to represent an Omid's
+ * transaction, including the set of rows modified.
+ *
+ */
+class TransactionState implements Transaction {
+ private boolean rollbackOnly;
+ private long startTimestamp;
+ private long commitTimestamp;
+ private Set<RowKeyFamily> rows;
+
+ public TSOClient tsoclient;
+
+ TransactionState(long startTimestamp, TSOClient client) {
+ this.rows = new HashSet<RowKeyFamily>();
+ this.startTimestamp = startTimestamp;
+ this.commitTimestamp = 0;
+ this.tsoclient = client;
+ }
+
+ public long getStartTimestamp() {
+ return startTimestamp;
+ }
+
+ public long getCommitTimestamp() {
+ return commitTimestamp;
+ }
+
+ public void setCommitTimestamp(long commitTimestamp) {
+ this.commitTimestamp = commitTimestamp;
+ }
+
+ public RowKeyFamily[] getRows() {
+ return rows.toArray(new RowKeyFamily[0]);
+ }
+
+ public void addRow(RowKeyFamily row) {
+ rows.add(row);
+ }
+
+ public String toString() {
+ return "Transaction-" + Long.toHexString(startTimestamp);
+ }
+
+ @Override
+ public void setRollbackOnly() {
+ rollbackOnly = true;
+ }
+
+ public boolean isRollbackOnly() {
+ return rollbackOnly;
+ }
+}
View
341 src/main/java/com/yahoo/omid/tso/CommitHashMap.java
@@ -23,233 +23,128 @@
import org.jboss.netty.util.internal.ConcurrentHashMap;
/**
- * A hash map that uses byte[] for the key rather than longs.
+ * This class stores the mapping between start a commit timestamps and between
+ * modified row and commit timestamp.
*
- * Change it to lazyly clean the old entries, i.e., upon a hit This would reduce
- * the mem access benefiting from cache locality
+ * Both mappings are respresented as a long->long mapping, each of them
+ * implemented using a single long []
*
- * @author maysam
+ * For a map of size N we create an array of size 2*N and store the keys on even
+ * indexes and values on odd indexes.
+ *
+ * Each time an entry is removed, we update the largestDeletedTimestamp if the
+ * entry's commit timestamp is greater than this value.
+ *
+ * Rationale: we want queries to be fast and touch as least memory regions as
+ * possible
+ *
+ * TODO: improve garbage collection, right now an entry is picked at random (by
+ * hash) which could cause the eviction of a very recent timestamp
*/
class CommitHashMap {
- native void init(int initialCapacity, int maxCommits, float loadFactor);
-
- native static long gettotalput();
-
- native static long gettotalget();
-
- native static long gettotalwalkforput();
-
- native static long gettotalwalkforget();
-
- // Load the library
- static {
- System.loadLibrary("tso-commithashmap");
- }
-
- /**
- * Constructs a new, empty hashtable with a default capacity and load factor,
- * which is <code>1000</code> and <code>0.75</code> respectively.
- */
- public CommitHashMap() {
- this(1000, 0.75f);
- }
-
- /**
- * Constructs a new, empty hashtable with the specified initial capacity and
- * default load factor, which is <code>0.75</code>.
- *
- * @param initialCapacity
- * the initial capacity of the hashtable.
- * @throws IllegalArgumentException
- * if the initial capacity is less than zero.
- */
- public CommitHashMap(int initialCapacity) {
- this(initialCapacity, 0.75f);
- }
-
- /**
- * Constructs a new, empty hashtable with the specified initial capacity and
- * the specified load factor.
- *
- * @param initialCapacity
- * the initial capacity of the hashtable.
- * @param loadFactor
- * the load factor of the hashtable.
- * @throws IllegalArgumentException
- * if the initial capacity is less than zero, or if the load
- * factor is nonpositive.
- */
- public CommitHashMap(int initialCapacity, float loadFactor) {
- if (initialCapacity < 0) {
- throw new IllegalArgumentException("Illegal Capacity: " + initialCapacity);
- }
- if (loadFactor <= 0) {
- throw new IllegalArgumentException("Illegal Load: " + loadFactor);
- }
- if (initialCapacity == 0) {
- initialCapacity = 1;
- }
-
- //assuming the worst case that each transaction modifies a value, this is the right size because it is proportional to the hashmap size
- int txnCommitArraySize = (int) (initialCapacity * loadFactor);
- this.init(initialCapacity, txnCommitArraySize, loadFactor);
- }
-
- /**
- * Returns the value to which the specified key is mapped in this map. If
- * there are multiple values with the same key, return the first The first is
- * the one with the largest key, because (i) put always put the recent ones
- * ahead, (ii) a new put on the same key has always larger value (because
- * value is commit timestamp and the map is atmoic)
- *
- * @param key
- * a key in the hashtable.
- * @return the value to which the key is mapped in this hashtable;
- * <code>null</code> if the key is not mapped to any value in this
- * hashtable.
- * @see #put(int, Object)
- */
- native long get(byte[] rowId, byte[] tableId, int hash);
-
- /**
- * Maps the specified <code>key</code> to the specified <code>value</code> in
- * this hashtable. The key cannot be <code>null</code>.
- *
- * The value can be retrieved by calling the <code>get</code> method with a
- * key that is equal to the original key.
- *
- * It guarantees that if multiple entries with the same keys exist then the
- * first one is the most fresh one, i.e., with the largest value
- *
- * @param key
- * the hashtable key.
- * @param value
- * the value.
- * @throws NullPointerException
- * if the key is <code>null</code>. return true if the vlaue is
- * replaced
- */
- native long put(byte[] rowId, byte[] tableId, long value, int hash, long largestDeletedTimestamp);
-
- /**
- * Returns the commit timestamp
- *
- * @param startTimestamp the transaction start timestamp
- * @return commit timestamp if such mapping exist, 0 otherwise
- */
- native long getCommittedTimestamp(long startTimestamp);
- native long setCommitted(long startTimestamp, long commitTimestamp, long largestDeletedTimestamp);
-
- // set of half aborted transactions
- // TODO: set the initial capacity in a smarter way
- Set<AbortedTransaction> halfAborted = Collections.newSetFromMap(new ConcurrentHashMap<AbortedTransaction, Boolean>(10000));
-
- private AtomicLong abortedSnapshot = new AtomicLong();
-
- long getAndIncrementAbortedSnapshot() {
- return abortedSnapshot.getAndIncrement();
- }
-
- // add a new half aborted transaction
- void setHalfAborted(long startTimestamp) {
- halfAborted.add(new AbortedTransaction(startTimestamp, abortedSnapshot.get()));
- }
-
- // call when a half aborted transaction is fully aborted
- void setFullAborted(long startTimestamp) {
- halfAborted.remove(new AbortedTransaction(startTimestamp, 0));
- }
-
- // query to see if a transaction is half aborted
- boolean isHalfAborted(long startTimestamp) {
- return halfAborted.contains(new AbortedTransaction(startTimestamp, 0));
- }
+ private final int size;
+ private long largestDeletedTimestamp;
+ private final long[] startCommitMapping;
+ private final long[] rowsCommitMapping;
+
+ /**
+ * Constructs a new, empty hashtable with a default size of 1000
+ */
+ public CommitHashMap() {
+ this(1000);
+ }
+
+ /**
+ * Constructs a new, empty hashtable with the specified size
+ *
+ * @param size
+ * the initial size of the hashtable.
+ * @throws IllegalArgumentException
+ * if the size is less than zero.
+ */
+ public CommitHashMap(int size) {
+ if (size < 0) {
+ throw new IllegalArgumentException("Illegal size: " + size);
+ }
+
+ this.size = size;
+ this.startCommitMapping = new long[size * 2];
+ this.rowsCommitMapping = new long[size * 2];
+ }
+
+ private int index(long hash) {
+ return (int) (Math.abs(hash) % size);
+ }
+
+ public long getLatestWriteForRow(long hash) {
+ int index = index(hash);
+ return rowsCommitMapping[index];
+ }
+
+ public void putLatestWriteForRow(long hash, long commitTimestamp) {
+ int index = index(hash);
+ long oldCommitTS = rowsCommitMapping[index];
+
+ if (oldCommitTS == commitTimestamp)
+ return;
+
+ rowsCommitMapping[index] = commitTimestamp;
+ largestDeletedTimestamp = Math.max(oldCommitTS, largestDeletedTimestamp);
+ }
+
+ public long getCommittedTimestamp(long startTimestamp) {
+ int indexStart = 2 * index(startTimestamp);
+ int indexCommit = indexStart + 1;
+
+ if (startCommitMapping[indexStart] == startTimestamp) {
+ return startCommitMapping[indexCommit];
+ } else {
+ return 0;
+ }
+ }
+
+ public void setCommittedTimestamp(long startTimestamp, long commitTimestamp) {
+ int indexStart = 2 * index(startTimestamp);
+ int indexCommit = indexStart + 1;
+
+ long oldCommitTS = startCommitMapping[indexCommit];
+ if (oldCommitTS == commitTimestamp)
+ return;
+
+ startCommitMapping[indexStart] = startTimestamp;
+ startCommitMapping[indexCommit] = commitTimestamp;
+ largestDeletedTimestamp = Math.max(oldCommitTS, largestDeletedTimestamp);
+ }
+
+ // set of half aborted transactions
+ // TODO: set the initial capacity in a smarter way
+ Set<AbortedTransaction> halfAborted = Collections.newSetFromMap(new ConcurrentHashMap<AbortedTransaction, Boolean>(
+ 10000));
+
+ private AtomicLong abortedSnapshot = new AtomicLong();
+
+ long getAndIncrementAbortedSnapshot() {
+ return abortedSnapshot.getAndIncrement();
+ }
+
+ // add a new half aborted transaction
+ void setHalfAborted(long startTimestamp) {
+ halfAborted.add(new AbortedTransaction(startTimestamp, abortedSnapshot.get()));
+ }
+
+ // call when a half aborted transaction is fully aborted
+ void setFullAborted(long startTimestamp) {
+ halfAborted.remove(new AbortedTransaction(startTimestamp, 0));
+ }
+
+ // query to see if a transaction is half aborted
+ boolean isHalfAborted(long startTimestamp) {
+ return halfAborted.contains(new AbortedTransaction(startTimestamp, 0));
+ }
+
+ public long getLargestDeletedTimestamp() {
+ return largestDeletedTimestamp;
+ }
}
-
-/*
- * class CommitHashMap {
- *
- * private Entry table[];
- *
- * private int count;
- *
- * private int threshold;
- *
- * private static long largestOrder = 1;
- *
- * private TSOState sharedState = null;
- *
- * public static int totalget = 0; public static int totalwalkforget = 0; public
- * static int totalput = 0; public static int totalwalkforput = 0;
- *
- * private static class Entry { long order;//the assigned order after insert int
- * hash;//keep the computed hash for efficient comparison of keys byte[]
- * key;//which is row id; long tag;//which is the start timestamp; long
- * value;//which is commit timestamp Entry next;
- *
- * protected Entry(byte[] key, int hash, long tag, long value, Entry next) {
- * this.order = ++largestOrder; this.hash = hash; this.key = key; this.tag =
- * tag; this.value = value; this.next = next; } protected Entry(Entry next) {
- * this.order = 0; this.hash = 0; this.key = new byte[8]; this.tag = 0;
- * this.value = 0; this.next = next; } }
- *
- * public CommitHashMap(TSOState state) { this(1000, 0.75f, state); }
- *
- * public CommitHashMap(int initialCapacity, TSOState state) {
- * this(initialCapacity, 0.75f, state); }
- *
- * public CommitHashMap(int initialCapacity, float loadFactor, TSOState state) {
- * super(); if (initialCapacity < 0) { throw new
- * IllegalArgumentException("Illegal Capacity: " + initialCapacity); } if
- * (loadFactor <= 0) { throw new IllegalArgumentException("Illegal Load: " +
- * loadFactor); } if (initialCapacity == 0) { initialCapacity = 1; }
- *
- * table = new Entry[initialCapacity]; this.sharedState = state; threshold =
- * (int) (initialCapacity * loadFactor); //Initialze the table not to interfere
- * with garbage collection
- * System.out.println("MEMORY initialization start ..."); for (int i = 0; i <
- * initialCapacity; i++) { if (i%1000000==0) System.out.println("MEMORY i="+ i);
- * Entry e1 = new Entry(null); Entry e2 = new Entry(null); Entry e3 = new
- * Entry(null); e1.next = e2; e2.next = e3; table[i] = e1; }
- * System.out.println("MEMORY initialization end"); }
- *
- * public int size() { return count; }
- *
- * public boolean isEmpty() { return count == 0; }
- *
- * public long get(byte[] key, int hash) { totalget++; totalwalkforget++;//at
- * least one for array access Entry tab[] = table; //int hash = (int)key; int
- * index = (hash & 0x7FFFFFFF) % tab.length; for (Entry e = tab[index]; e !=
- * null; e = e.next) { totalwalkforget++; if (e.order == 0)//empty break; if
- * (e.hash == hash) //if (java.util.Arrays.equals(e.key, key)) //if (e.key ==
- * key) return e.value; } return 0; }
- *
- * public boolean put(byte[] key, long tag, long value, int hash) { totalput++;
- * totalwalkforput++;//at least one for array access // Makes sure the key is
- * not already in the hashtable. Entry tab[] = table; //int hash = (int)key; int
- * index = (hash & 0x7FFFFFFF) % tab.length; Entry firstReference = null;
- * //boolean thereIsOld = false; for (Entry e = tab[index]; e != null; e =
- * e.next) { totalwalkforput++; boolean isOld = e.order == 0 ? true :
- * largestOrder - e.order > threshold; if (isOld) { if (e.value >
- * sharedState.largestDeletedTimestamp) sharedState.largestDeletedTimestamp =
- * e.value; if (firstReference != null) {//swap it with e //e.key =
- * firstReference.key; for (byte b = 0; b < 8; b++) e.key[b] =
- * firstReference.key[b]; e.hash = firstReference.hash; e.tag =
- * firstReference.tag; e.value = firstReference.value; e.order =
- * firstReference.order; e = firstReference; } for (byte b = 0; b < 8; b++)
- * e.key[b] = key[b]; //e.key = key; e.hash = hash; e.tag = tag; e.value =
- * value; e.order = ++largestOrder; totalgced++; totalgcrun++; return true; }
- * //update the first reference to the key if (e.hash == hash && firstReference
- * == null) //if (java.util.Arrays.equals(e.key, key) && firstReference == null)
- * //if (e.key == key && firstReference == null) firstReference = e; //if
- * (!thereIsOld && isOld)//there are some old items here //thereIsOld = true; }
- *
- * // Creates the new entry. Entry e = new Entry(key, hash, tag, value,
- * tab[index]); if (count % 100000 == 0)
- * System.out.println("NNNNNNNNNNNNNNNNNNNew Entry " + count); tab[index] = e;
- * count++; return false; }
- *
- * public static int totalgcrun = 0; public static int totalgced = 0; }
- */
View
126 src/main/java/com/yahoo/omid/tso/RowKey.java
@@ -25,90 +25,60 @@
import org.jboss.netty.buffer.ChannelBuffer;
public class RowKey {
- private byte[] rowId;
- private byte[] tableId;
- private int hash = 0;
+ private byte[] rowId;
+ private byte[] tableId;
+ private int hash = 0;
- public RowKey() {
- rowId = new byte[0];
- tableId = new byte[0];
- }
+ public RowKey() {
+ rowId = new byte[0];
+ tableId = new byte[0];
+ }
- public RowKey(byte[] r, byte[] t) {
- rowId = r;
- tableId = t;
- }
- public byte[] getTable() {
- return tableId;
- }
+ public RowKey(byte[] r, byte[] t) {
+ rowId = r;
+ tableId = t;
+ }
- public byte[] getRow() {
- return rowId;
- }
+ public byte[] getTable() {
+ return tableId;
+ }
- public String toString() {
- return new String(tableId) + ":" + new String(rowId);
- }
+ public byte[] getRow() {
+ return rowId;
+ }
- public static RowKey readObject(ChannelBuffer aInputStream) {
- int hash = aInputStream.readInt();
- short len = aInputStream.readByte();
-// byte[] rowId = RowKeyBuffer.nextRowKey(len);
- byte[] rowId = new byte[len];
- aInputStream.readBytes(rowId, 0, len);
- len = aInputStream.readByte();
-// byte[] tableId = RowKeyBuffer.nextRowKey(len);
- byte[] tableId = new byte[len];
- aInputStream.readBytes(tableId, 0, len);
- RowKey rk = new RowKey(rowId, tableId);
- rk.hash = hash;
- return rk;
- }
+ public String toString() {
+ return Bytes.toString(tableId) + ":" + Bytes.toString(rowId);
+ }
- public void writeObject(DataOutputStream aOutputStream)
- throws IOException {
- hashCode();
- aOutputStream.writeInt(hash);
- aOutputStream.writeByte(rowId.length);
- aOutputStream.write(rowId,0,rowId.length);
- aOutputStream.writeByte(tableId.length);
- aOutputStream.write(tableId,0,tableId.length);
- }
+ public static RowKey readObject(ChannelBuffer aInputStream) {
+ RowKey rk = new RowKey(null, null);
+ rk.hash = aInputStream.readInt();
+ return rk;
+ }
- public boolean equals(Object obj) {
- if (obj instanceof RowKey) {
- RowKey other = (RowKey)obj;
-
- return Bytes.equals(other.rowId, rowId)
- && Bytes.equals(other.tableId, tableId);
- }
- return false;
- }
+ public void writeObject(DataOutputStream aOutputStream) throws IOException {
+ hashCode();
+ aOutputStream.writeInt(hash);
+ }
- public int hashCode() {
- if (hash != 0) {
- return hash;
- }
- //hash is the xor or row and table id
- /*int h = 0;
- for(int i =0; i < Math.min(8, rowId.length); i++){
- h <<= 8;
- h ^= (int)rowId[i] & 0xFF;
- }
- hash = h;
- h = 0;
- for(int i =0; i < Math.min(8,tableId.length); i++){
- h <<= 8;
- h ^= (int)tableId[i] & 0xFF;
- }
- hash ^= h;
- return hash;*/
- byte[] key = Arrays.copyOf(tableId, tableId.length + rowId.length);
- System.arraycopy(rowId, 0, key, tableId.length, rowId.length);
- hash = MurmurHash.getInstance().hash(key, 0, key.length, 0xdeadbeef);
- //return MurmurHash3.MurmurHash3_x64_32(rowId, 0xDEADBEEF);
- // return (31*Arrays.hashCode(tableId)) + Arrays.hashCode(rowId);
- return hash;
- }
-}
+ public boolean equals(Object obj) {
+ if (obj instanceof RowKey) {
+ RowKey other = (RowKey) obj;
+
+ return Bytes.equals(other.rowId, rowId)
+ && Bytes.equals(other.tableId, tableId);
+ }
+ return false;
+ }
+ public int hashCode() {
+ if (hash != 0) {
+ return hash;
+ }
+ byte[] key = Arrays.copyOf(tableId, tableId.length + rowId.length);
+ System.arraycopy(rowId, 0, key, tableId.length, rowId.length);
+ hash = MurmurHash.getInstance().hash(key, 0, key.length, 0xdeadbeef);
+ return hash;
+ }
+}
View
1,143 src/main/java/com/yahoo/omid/tso/TSOHandler.java
@@ -1,571 +1,572 @@
-/**
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-
-package com.yahoo.omid.tso;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-
-import com.yahoo.omid.replication.SharedMessageBuffer.ReadingBuffer;
-import com.yahoo.omid.tso.messages.AbortRequest;
-import com.yahoo.omid.tso.messages.AbortedTransactionReport;
-import com.yahoo.omid.tso.messages.CommitQueryRequest;
-import com.yahoo.omid.tso.messages.CommitQueryResponse;
-import com.yahoo.omid.tso.messages.CommitRequest;
-import com.yahoo.omid.tso.messages.CommitResponse;
-import com.yahoo.omid.tso.messages.FullAbortRequest;
-import com.yahoo.omid.tso.messages.TimestampRequest;
-import com.yahoo.omid.tso.messages.TimestampResponse;
-import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
-import com.yahoo.omid.tso.persistence.LoggerException;
-import com.yahoo.omid.tso.persistence.LoggerException.Code;
-import com.yahoo.omid.tso.persistence.LoggerProtocol;
-
-/**
- * ChannelHandler for the TSO Server
- * @author maysam
- *
- */
-public class TSOHandler extends SimpleChannelHandler {
-
- private static final Log LOG = LogFactory.getLog(TSOHandler.class);
-
- /**
- * Bytes monitor
- */
- public static final AtomicInteger transferredBytes = new AtomicInteger();
-// public static int transferredBytes = 0;
- public static int abortCount = 0;
- public static int hitCount = 0;
- public static long queries = 0;
-
- /**
- * Channel Group
- */
- private ChannelGroup channelGroup = null;
-
- private Map<Channel, ReadingBuffer> messageBuffersMap = new HashMap<Channel, ReadingBuffer>();
-
- /**
- * Timestamp Oracle
- */
- private TimestampOracle timestampOracle = null;
-
- /**
- * The wrapper for the shared state of TSO
- */
- private TSOState sharedState;
-
- private FlushThread flushThread;
- private ScheduledExecutorService scheduledExecutor;
- private ScheduledFuture<?> flushFuture;
-
- private ExecutorService executor;
-
- /**
- * Constructor
- * @param channelGroup
- */
- public TSOHandler(ChannelGroup channelGroup, TSOState state) {
- this.channelGroup = channelGroup;
- this.timestampOracle = state.getSO();
- this.sharedState = state;
- }
-
- public void start() {
- this.flushThread = new FlushThread();
- this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(Thread.currentThread().getThreadGroup(), r);
- t.setDaemon(true);
- t.setName("Flush Thread");
- return t;
- }
- });
- this.flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
- this.executor = Executors.newSingleThreadExecutor();
- }
-
- /**
- * Returns the number of transferred bytes
- * @return the number of transferred bytes
- */
- public static long getTransferredBytes() {
- return transferredBytes.longValue();
- }