Skip to content

Commit

Permalink
优化HBase事务实现,使用本地事务状态表替换HTable的方式
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Jan 12, 2015
1 parent 58a99f9 commit f0913a3
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 214 deletions.
Expand Up @@ -67,6 +67,8 @@ public class FrontendSession extends SessionWithState implements DataHandler {
public static final int COMMAND_EXECUTE_DISTRIBUTED_SAVEPOINT_ADD = 104; public static final int COMMAND_EXECUTE_DISTRIBUTED_SAVEPOINT_ADD = 104;
public static final int COMMAND_EXECUTE_DISTRIBUTED_SAVEPOINT_ROLLBACK = 105; public static final int COMMAND_EXECUTE_DISTRIBUTED_SAVEPOINT_ROLLBACK = 105;


public static final int COMMAND_EXECUTE_TRANSACTION_VALIDATE = 106;

public static final int COMMAND_EXECUTE_BATCH_UPDATE_STATEMENT = 120; public static final int COMMAND_EXECUTE_BATCH_UPDATE_STATEMENT = 120;
public static final int COMMAND_EXECUTE_BATCH_UPDATE_PREPAREDSTATEMENT = 121; public static final int COMMAND_EXECUTE_BATCH_UPDATE_PREPAREDSTATEMENT = 121;


Expand Down Expand Up @@ -345,7 +347,7 @@ private void connectServer(ConnectionInfo ci) {
} }


//TODO //TODO
public void handleException(IOException e) { public void handleException(Exception e) {
checkClosed(); checkClosed();
} }


Expand Down Expand Up @@ -686,6 +688,18 @@ public synchronized void rollbackToSavepoint(String name) {
} }
} }


public synchronized boolean validateTransaction(String localTransactionName) {
checkClosed();
try {
transfer.writeInt(FrontendSession.COMMAND_EXECUTE_TRANSACTION_VALIDATE).writeString(localTransactionName);
done(transfer);
return transfer.readBoolean();
} catch (Exception e) {
handleException(e);
return false;
}
}

public void setTransaction(TransactionInterface transaction) { public void setTransaction(TransactionInterface transaction) {
this.transaction = transaction; this.transaction = transaction;
} }
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.lealone.result.ResultColumn; import org.lealone.result.ResultColumn;
import org.lealone.result.ResultInterface; import org.lealone.result.ResultInterface;
import org.lealone.store.LobStorage; import org.lealone.store.LobStorage;
import org.lealone.transaction.TransactionStatusTable;
import org.lealone.util.IOUtils; import org.lealone.util.IOUtils;
import org.lealone.util.New; import org.lealone.util.New;
import org.lealone.util.SmallLRUCache; import org.lealone.util.SmallLRUCache;
Expand Down Expand Up @@ -477,6 +478,20 @@ private void process() throws IOException {
transfer.flush(); transfer.flush();
break; break;
} }
case FrontendSession.COMMAND_EXECUTE_TRANSACTION_VALIDATE: {
int old = session.getModificationId();
boolean isValid = TransactionStatusTable.isValid(transfer.readString());
int status;
if (session.isClosed()) {
status = FrontendSession.STATUS_CLOSED;
} else {
status = getState(old);
}
transfer.writeInt(status);
transfer.writeBoolean(isValid);
transfer.flush();
break;
}
case FrontendSession.COMMAND_EXECUTE_BATCH_UPDATE_STATEMENT: { case FrontendSession.COMMAND_EXECUTE_BATCH_UPDATE_STATEMENT: {
int size = transfer.readInt(); int size = transfer.readInt();
ArrayList<String> batchCommands = New.arrayList(size); ArrayList<String> batchCommands = New.arrayList(size);
Expand Down
Expand Up @@ -51,10 +51,7 @@ public ResultInterface query(int limit, ResultTarget target) {
ResultInterface result; ResultInterface result;


String[] localRegionNames = whereClauseSupport.getLocalRegionNames(); String[] localRegionNames = whereClauseSupport.getLocalRegionNames();
if (isLocal()) { if (localRegionNames != null && localRegionNames.length != 0) {
result = super.query(limit, target);
addRowToResultTarget = false;
} else if (localRegionNames != null && localRegionNames.length != 0) {
if (localRegionNames.length == 1) { if (localRegionNames.length == 1) {
whereClauseSupport.setCurrentRegionName(localRegionNames[0]); whereClauseSupport.setCurrentRegionName(localRegionNames[0]);
result = super.query(limit, target); result = super.query(limit, target);
Expand All @@ -64,6 +61,9 @@ public ResultInterface query(int limit, ResultTarget target) {
sqlRoutingInfo.localRegions = Arrays.asList(localRegionNames); sqlRoutingInfo.localRegions = Arrays.asList(localRegionNames);
result = CommandParallel.executeQuery(session, sqlRoutingInfo, this, limit, false); result = CommandParallel.executeQuery(session, sqlRoutingInfo, this, limit, false);
} }
} else if (isLocal()) {
result = super.query(limit, target);
addRowToResultTarget = false;
} else { } else {
try { try {
sqlRoutingInfo = HBaseUtils.getSQLRoutingInfo((HBaseSession) session, whereClauseSupport, this); sqlRoutingInfo = HBaseUtils.getSQLRoutingInfo((HBaseSession) session, whereClauseSupport, this);
Expand Down
Expand Up @@ -55,6 +55,7 @@
import org.lealone.message.DbException; import org.lealone.message.DbException;
import org.lealone.result.Row; import org.lealone.result.Row;
import org.lealone.result.RowList; import org.lealone.result.RowList;
import org.lealone.transaction.TransactionInterface;
import org.lealone.util.New; import org.lealone.util.New;
import org.lealone.util.StatementBuilder; import org.lealone.util.StatementBuilder;
import org.lealone.value.Value; import org.lealone.value.Value;
Expand Down Expand Up @@ -790,4 +791,9 @@ public String getPrimaryKeyName() {


return primaryKeyName; return primaryKeyName;
} }

@Override
public TransactionInterface getTransaction(Session session) {
return session.getTransaction();
}
} }
Expand Up @@ -30,9 +30,7 @@
import org.lealone.hbase.dbobject.HBaseSequence; import org.lealone.hbase.dbobject.HBaseSequence;
import org.lealone.hbase.result.HBaseRow; import org.lealone.hbase.result.HBaseRow;
import org.lealone.hbase.transaction.HBaseTransaction; import org.lealone.hbase.transaction.HBaseTransaction;
import org.lealone.message.DbException;
import org.lealone.result.Row; import org.lealone.result.Row;
import org.lealone.transaction.TransactionInterface;


public class HBaseSession extends Session { public class HBaseSession extends Session {


Expand All @@ -46,8 +44,6 @@ public class HBaseSession extends Session {
*/ */
private HRegionServer regionServer; private HRegionServer regionServer;


private volatile HBaseTransaction transaction;

public HBaseSession(Database database, User user, int id) { public HBaseSession(Database database, User user, int id) {
super(database, user, id); super(database, user, id);
} }
Expand Down Expand Up @@ -110,23 +106,14 @@ public String getHostAndPort() {
} }


public void log(HBaseRow row) { public void log(HBaseRow row) {
if (transaction == null) this.getTransaction().log(row);
throw DbException.throwInternalError();
transaction.log(row);
} }


@Override @Override
public HBaseTransaction getTransaction() { public HBaseTransaction getTransaction() {
if (transaction == null) { if (super.getTransaction() == null) {
transaction = new HBaseTransaction(this); super.setTransaction(new HBaseTransaction(this));
super.setTransaction(transaction);
} }
return transaction; return (HBaseTransaction) super.getTransaction();
}

@Override
public void setTransaction(TransactionInterface t) {
transaction = (HBaseTransaction) t;
super.setTransaction(t);
} }
} }

This file was deleted.

Expand Up @@ -24,6 +24,7 @@
import org.lealone.hbase.result.HBaseRow; import org.lealone.hbase.result.HBaseRow;
import org.lealone.hbase.util.HBaseUtils; import org.lealone.hbase.util.HBaseUtils;
import org.lealone.transaction.GlobalTransaction; import org.lealone.transaction.GlobalTransaction;
import org.lealone.transaction.TransactionManager;
import org.lealone.value.Value; import org.lealone.value.Value;


public class HBaseTransaction extends GlobalTransaction { public class HBaseTransaction extends GlobalTransaction {
Expand All @@ -32,7 +33,7 @@ public class HBaseTransaction extends GlobalTransaction {


public HBaseTransaction(Session session) { public HBaseTransaction(Session session) {
super(session); super(session);
String hostAndPort = session.getHostAndPort(); String hostAndPort = TransactionManager.getHostAndPort();


transactionMetaAdd = Bytes.toBytes(hostAndPort + "," + transactionId + "," + HBaseConstants.Tag.ADD); transactionMetaAdd = Bytes.toBytes(hostAndPort + "," + transactionId + "," + HBaseConstants.Tag.ADD);
transactionMetaDelete = Bytes.toBytes(hostAndPort + "," + transactionId + "," + HBaseConstants.Tag.DELETE); transactionMetaDelete = Bytes.toBytes(hostAndPort + "," + transactionId + "," + HBaseConstants.Tag.DELETE);
Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.lealone.hbase.engine.HBaseConstants; import org.lealone.hbase.engine.HBaseConstants;
import org.lealone.hbase.engine.HBaseSession; import org.lealone.hbase.engine.HBaseSession;
import org.lealone.hbase.metadata.TransactionStatusTable; import org.lealone.transaction.TransactionStatusTable;
import org.lealone.util.StringUtils; import org.lealone.util.StringUtils;


/** /**
Expand All @@ -38,7 +38,6 @@
* *
*/ */
public class ValidityChecker { public class ValidityChecker {
private final static TransactionStatusTable transactionStatusTable = TransactionStatusTable.getInstance();


public static Result checkResult(byte[] defaultColumnFamilyName, HBaseSession session, HRegionServer regionServer, public static Result checkResult(byte[] defaultColumnFamilyName, HBaseSession session, HRegionServer regionServer,
byte[] regionName, HBaseTransaction t, Result r) throws IOException { byte[] regionName, HBaseTransaction t, Result r) throws IOException {
Expand Down Expand Up @@ -68,7 +67,7 @@ public static Result checkResult(byte[] defaultColumnFamilyName, HBaseSession se
return r; return r;
} }


if ((oldTid % 2 == 0) || !transactionStatusTable.isValid(hostAndPort, oldTid, t)) { if ((oldTid % 2 == 0) || !TransactionStatusTable.isValid(session, hostAndPort, oldTid, t)) {
Get get = new Get(r.getRow()); Get get = new Get(r.getRow());
get.setMaxVersions(1); get.setMaxVersions(1);
get.setTimeRange(0, oldTid - 1); get.setTimeRange(0, oldTid - 1);
Expand Down

0 comments on commit f0913a3

Please sign in to comment.