Skip to content

Commit

Permalink
事务的启动从TransactionEngine开始
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Jan 14, 2015
1 parent c36ed7c commit fab6b94
Show file tree
Hide file tree
Showing 16 changed files with 60 additions and 91 deletions.
Expand Up @@ -136,7 +136,6 @@ public ResultInterface executeQuery(int maxRows, boolean scrollable) {
prepareIfRequired(); prepareIfRequired();
try { try {
boolean isDistributedQuery = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); boolean isDistributedQuery = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
//boolean isDistributedQuery = session.isDistributed();
if (isDistributedQuery) { if (isDistributedQuery) {
session.traceOperation("COMMAND_EXECUTE_DISTRIBUTED_QUERY", id); session.traceOperation("COMMAND_EXECUTE_DISTRIBUTED_QUERY", id);
transfer.writeInt(FrontendSession.COMMAND_EXECUTE_DISTRIBUTED_QUERY).writeInt(id).writeInt(objectId) transfer.writeInt(FrontendSession.COMMAND_EXECUTE_DISTRIBUTED_QUERY).writeInt(id).writeInt(objectId)
Expand Down Expand Up @@ -186,7 +185,6 @@ public int executeUpdate() {
prepareIfRequired(); prepareIfRequired();
try { try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
//boolean isDistributedUpdate = session.isDistributed();
if (isDistributedUpdate) { if (isDistributedUpdate) {
session.traceOperation("COMMAND_EXECUTE_DISTRIBUTED_UPDATE", id); session.traceOperation("COMMAND_EXECUTE_DISTRIBUTED_UPDATE", id);
transfer.writeInt(FrontendSession.COMMAND_EXECUTE_DISTRIBUTED_UPDATE).writeInt(id); transfer.writeInt(FrontendSession.COMMAND_EXECUTE_DISTRIBUTED_UPDATE).writeInt(id);
Expand Down
Expand Up @@ -106,8 +106,6 @@ public static FrontendCommand getFrontendCommand(Session originalSession, Prepar
if (fs.getTransaction() == null) if (fs.getTransaction() == null)
fs.setTransaction(originalSession.getTransaction()); fs.setTransaction(originalSession.getTransaction());


//fs.setDistributed(!originalSession.getAutoCommit());

if (isNew) if (isNew)
originalSession.addFrontendSession(url, fs); originalSession.addFrontendSession(url, fs);


Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.lealone.result.SimpleRow; import org.lealone.result.SimpleRow;
import org.lealone.result.SimpleRowValue; import org.lealone.result.SimpleRowValue;
import org.lealone.result.SortOrder; import org.lealone.result.SortOrder;
import org.lealone.transaction.TransactionInterface;
import org.lealone.util.New; import org.lealone.util.New;
import org.lealone.value.CompareMode; import org.lealone.value.CompareMode;
import org.lealone.value.Value; import org.lealone.value.Value;
Expand Down Expand Up @@ -1165,8 +1164,4 @@ public boolean isColumnsModified() {
public void setColumnsModified(boolean modified) { public void setColumnsModified(boolean modified) {
this.isColumnsModified = modified; this.isColumnsModified = modified;
} }

public TransactionInterface getTransaction(Session session) {
return null;
}
} }
2 changes: 2 additions & 0 deletions lealone-sql/src/main/java/org/lealone/engine/Database.java
Expand Up @@ -2079,6 +2079,8 @@ public void setTransactionEngine(TransactionEngine transactionEngine) {
} }


public TransactionEngine getTransactionEngine() { public TransactionEngine getTransactionEngine() {
if (transactionEngine == null)
throw new IllegalStateException("transactionEngine not init");
return transactionEngine; return transactionEngine;
} }
} }
9 changes: 4 additions & 5 deletions lealone-sql/src/main/java/org/lealone/engine/Session.java
Expand Up @@ -1363,14 +1363,13 @@ public String getURL(InetAddress host) {
private volatile TransactionInterface transaction; private volatile TransactionInterface transaction;


public TransactionInterface getTransaction() { public TransactionInterface getTransaction() {
if (transaction == null) {
transaction = database.getTransactionEngine().beginTransaction(this);
transaction.setAutoCommit(autoCommit);
}
return transaction; return transaction;
} }


public void setTransaction(TransactionInterface t) {
transaction = t;
transaction.setAutoCommit(autoCommit);
}

private static Router router = LocalRouter.getInstance(); private static Router router = LocalRouter.getInstance();


public static Router getRouter() { public static Router getRouter() {
Expand Down
Expand Up @@ -43,7 +43,7 @@
import org.lealone.result.Row; import org.lealone.result.Row;
import org.lealone.result.SortOrder; import org.lealone.result.SortOrder;
import org.lealone.transaction.local.LocalTransaction; import org.lealone.transaction.local.LocalTransaction;
import org.lealone.transaction.local.TransactionStore; import org.lealone.transaction.local.DefaultTransactionEngine;
import org.lealone.util.MathUtils; import org.lealone.util.MathUtils;
import org.lealone.util.New; import org.lealone.util.New;
import org.lealone.value.DataType; import org.lealone.value.DataType;
Expand Down Expand Up @@ -73,12 +73,12 @@ public class CBaseTable extends TableBase {
private boolean containsLargeObject; private boolean containsLargeObject;
private Column rowIdColumn; private Column rowIdColumn;


private final TransactionStore store; private final DefaultTransactionEngine transactionEngine;


public CBaseTable(CreateTableData data, CBaseStorageEngine.Store store) { public CBaseTable(CreateTableData data, CBaseStorageEngine.Store store) {
super(data); super(data);
nextAnalyze = database.getSettings().analyzeAuto; nextAnalyze = database.getSettings().analyzeAuto;
this.store = store.getTransactionStore(); this.transactionEngine = store.getTransactionEngine();
this.isHidden = data.isHidden; this.isHidden = data.isHidden;
for (Column col : getColumns()) { for (Column col : getColumns()) {
if (DataType.isLargeObject(col.getType())) { if (DataType.isLargeObject(col.getType())) {
Expand Down Expand Up @@ -411,7 +411,7 @@ public Index addIndex(Session session, String indexName, int indexId, IndexColum
int mainIndexColumn; int mainIndexColumn;
mainIndexColumn = getMainIndexColumn(indexType, cols); mainIndexColumn = getMainIndexColumn(indexType, cols);
if (database.isStarting()) { if (database.isStarting()) {
if (store.store.hasMap("index." + indexId)) { if (transactionEngine.store.hasMap("index." + indexId)) {
mainIndexColumn = -1; mainIndexColumn = -1;
} }
} else if (primaryIndex.getRowCountMax() != 0) { } else if (primaryIndex.getRowCountMax() != 0) {
Expand Down Expand Up @@ -770,15 +770,7 @@ public void checkRename() {
* @param session the session * @param session the session
* @return the transaction * @return the transaction
*/ */
@Override
public LocalTransaction getTransaction(Session session) { public LocalTransaction getTransaction(Session session) {
if (session.getTransaction() == null) {
LocalTransaction t = store.beginTransaction(session);
session.setTransaction(t);
t.setSession(session);
return t;
}

return (LocalTransaction) session.getTransaction(); return (LocalTransaction) session.getTransaction();
} }


Expand Down
Expand Up @@ -31,9 +31,9 @@
import org.lealone.mvstore.MVStoreTool; import org.lealone.mvstore.MVStoreTool;
import org.lealone.store.fs.FileChannelInputStream; import org.lealone.store.fs.FileChannelInputStream;
import org.lealone.store.fs.FileUtils; import org.lealone.store.fs.FileUtils;
import org.lealone.transaction.local.DefaultTransactionEngine;
import org.lealone.transaction.local.LocalTransaction; import org.lealone.transaction.local.LocalTransaction;
import org.lealone.transaction.local.TransactionMap; import org.lealone.transaction.local.TransactionMap;
import org.lealone.transaction.local.TransactionStore;
import org.lealone.util.BitField; import org.lealone.util.BitField;
import org.lealone.util.DataUtils; import org.lealone.util.DataUtils;
import org.lealone.util.New; import org.lealone.util.New;
Expand Down Expand Up @@ -63,7 +63,7 @@ public Table createTable(CreateTableData data) {
if (stores.get(db.getName()) == null) { if (stores.get(db.getName()) == null) {
store = init(db); store = init(db);
stores.put(db.getName(), store); stores.put(db.getName(), store);
db.setTransactionEngine(store.getTransactionStore()); db.setTransactionEngine(store.getTransactionEngine());
} }
} }
} }
Expand Down Expand Up @@ -170,26 +170,26 @@ public static class Store {
private final MVStore store; private final MVStore store;


/** /**
* The transaction store. * The transaction engine.
*/ */
private final TransactionStore transactionStore; private final DefaultTransactionEngine transactionEngine;


private long statisticsStart; private long statisticsStart;


private int temporaryMapId; private int temporaryMapId;


public Store(Database db, MVStore.Builder builder) { public Store(Database db, MVStore.Builder builder) {
this.store = builder.open(); this.store = builder.open();
this.transactionStore = new TransactionStore(store, new ValueDataType(null, db, null)); this.transactionEngine = new DefaultTransactionEngine(store, new ValueDataType(null, db, null));
transactionStore.init(); transactionEngine.init();
} }


public MVStore getStore() { public MVStore getStore() {
return store; return store;
} }


public TransactionStore getTransactionStore() { public DefaultTransactionEngine getTransactionEngine() {
return transactionStore; return transactionEngine;
} }


public HashMap<String, CBaseTable> getTables() { public HashMap<String, CBaseTable> getTables() {
Expand Down Expand Up @@ -233,7 +233,7 @@ public void closeImmediately() {
* rollback all open transactions. * rollback all open transactions.
*/ */
public void initTransactions() { public void initTransactions() {
List<LocalTransaction> list = transactionStore.getOpenTransactions(); List<LocalTransaction> list = transactionEngine.getOpenTransactions();
for (LocalTransaction t : list) { for (LocalTransaction t : list) {
if (t.getStatus() == LocalTransaction.STATUS_COMMITTING) { if (t.getStatus() == LocalTransaction.STATUS_COMMITTING) {
t.commit(); t.commit();
Expand All @@ -258,9 +258,9 @@ public void removeTemporaryMaps(Session session, BitField objectIds) {
if (!objectIds.get(id)) { if (!objectIds.get(id)) {
ValueDataType keyType = new ValueDataType(null, null, null); ValueDataType keyType = new ValueDataType(null, null, null);
ValueDataType valueType = new ValueDataType(null, null, null); ValueDataType valueType = new ValueDataType(null, null, null);
LocalTransaction t = transactionStore.beginTransaction(session); LocalTransaction t = transactionEngine.beginTransaction(session);
TransactionMap<?, ?> m = t.openMap(mapName, keyType, valueType); TransactionMap<?, ?> m = t.openMap(mapName, keyType, valueType);
transactionStore.removeMap(m); transactionEngine.removeMap(m);
t.commit(); t.commit();
} }
} }
Expand Down Expand Up @@ -290,7 +290,7 @@ public void prepareCommit(Session session, String transactionName) {
} }


public ArrayList<InDoubtTransaction> getInDoubtTransactions() { public ArrayList<InDoubtTransaction> getInDoubtTransactions() {
List<LocalTransaction> list = transactionStore.getOpenTransactions(); List<LocalTransaction> list = transactionEngine.getOpenTransactions();
ArrayList<InDoubtTransaction> result = New.arrayList(); ArrayList<InDoubtTransaction> result = New.arrayList();
for (LocalTransaction t : list) { for (LocalTransaction t : list) {
if (t.getStatus() == LocalTransaction.STATUS_PREPARED) { if (t.getStatus() == LocalTransaction.STATUS_PREPARED) {
Expand Down Expand Up @@ -353,7 +353,7 @@ public void close(long maxCompactTime) {
if (!store.isClosed() && store.getFileStore() != null) { if (!store.isClosed() && store.getFileStore() != null) {
boolean compactFully = false; boolean compactFully = false;
if (!store.getFileStore().isReadOnly()) { if (!store.getFileStore().isReadOnly()) {
transactionStore.close(); transactionEngine.close();
if (maxCompactTime == Long.MAX_VALUE) { if (maxCompactTime == Long.MAX_VALUE) {
compactFully = true; compactFully = true;
} }
Expand Down
Expand Up @@ -55,7 +55,6 @@
import org.lealone.message.DbException; import org.lealone.message.DbException;
import org.lealone.result.Row; import org.lealone.result.Row;
import org.lealone.result.RowList; import org.lealone.result.RowList;
import org.lealone.transaction.TransactionInterface;
import org.lealone.util.New; import org.lealone.util.New;
import org.lealone.util.StatementBuilder; import org.lealone.util.StatementBuilder;
import org.lealone.value.Value; import org.lealone.value.Value;
Expand Down Expand Up @@ -791,9 +790,4 @@ public String getPrimaryKeyName() {


return primaryKeyName; return primaryKeyName;
} }

@Override
public TransactionInterface getTransaction(Session session) {
return session.getTransaction();
}
} }
Expand Up @@ -45,6 +45,7 @@ public class HBaseDatabase extends Database {


public HBaseDatabase(DatabaseEngine dbEngine, boolean persistent) { public HBaseDatabase(DatabaseEngine dbEngine, boolean persistent) {
super(dbEngine, persistent); super(dbEngine, persistent);
setTransactionEngine(HBaseTransactionEngine.getInstance());
} }


@Override @Override
Expand Down
Expand Up @@ -111,9 +111,6 @@ public void log(HBaseRow row) {


@Override @Override
public HBaseTransaction getTransaction() { public HBaseTransaction getTransaction() {
if (super.getTransaction() == null) {
super.setTransaction(new HBaseTransaction(this));
}
return (HBaseTransaction) super.getTransaction(); return (HBaseTransaction) super.getTransaction();
} }
} }
Expand Up @@ -53,7 +53,6 @@ protected TransactionBase(Session session) {
} }


public void setSession(Session session) { public void setSession(Session session) {
session.setTransaction(this);
this.session = session; this.session = session;
autoCommit = session.getAutoCommit(); autoCommit = session.getAutoCommit();
} }
Expand Down
Expand Up @@ -46,38 +46,32 @@ public int executeDefineCommand(DefineCommand defineCommand) {


@Override @Override
public int executeInsert(Insert insert) { public int executeInsert(Insert insert) {
insert.getTable().getTransaction(insert.getSession());
return execute(insert.isBatch(), insert); return execute(insert.isBatch(), insert);
} }


@Override @Override
public int executeMerge(Merge merge) { public int executeMerge(Merge merge) {
merge.getTable().getTransaction(merge.getSession());
return execute(merge.isBatch(), merge); return execute(merge.isBatch(), merge);
} }


@Override @Override
public int executeDelete(Delete delete) { public int executeDelete(Delete delete) {
delete.getTable().getTransaction(delete.getSession());
return execute(true, delete); return execute(true, delete);
} }


@Override @Override
public int executeUpdate(Update update) { public int executeUpdate(Update update) {
update.getTable().getTransaction(update.getSession());
return execute(true, update); return execute(true, update);
} }


@Override @Override
public ResultInterface executeSelect(Select select, int maxRows, boolean scrollable) { public ResultInterface executeSelect(Select select, int maxRows, boolean scrollable) {
select.getTable().getTransaction(select.getSession());
beginTransaction(select); beginTransaction(select);
return nestedRouter.executeSelect(select, maxRows, scrollable); return nestedRouter.executeSelect(select, maxRows, scrollable);
} }


private void beginTransaction(Prepared p) { private void beginTransaction(Prepared p) {
// if (p.getSession().getTransaction() == null) p.getSession().getTransaction();
// TransactionManager.beginTransaction(p.getSession());
} }


private int execute(boolean isBatch, Prepared p) { private int execute(boolean isBatch, Prepared p) {
Expand Down
Expand Up @@ -20,9 +20,9 @@
import org.lealone.util.New; import org.lealone.util.New;


/** /**
* A store that supports concurrent MVCC read-committed transactions. * The default transaction engine that supports concurrent MVCC read-committed transactions.
*/ */
public class TransactionStore implements TransactionEngine { public class DefaultTransactionEngine implements TransactionEngine {


/** /**
* The store. * The store.
Expand Down Expand Up @@ -76,7 +76,7 @@ public class TransactionStore implements TransactionEngine {
* *
* @param store the store * @param store the store
*/ */
public TransactionStore(MVStore store) { public DefaultTransactionEngine(MVStore store) {
this(store, new ObjectDataType()); this(store, new ObjectDataType());
} }


Expand All @@ -86,7 +86,7 @@ public TransactionStore(MVStore store) {
* @param store the store * @param store the store
* @param dataType the data type for map keys and values * @param dataType the data type for map keys and values
*/ */
public TransactionStore(MVStore store, DataType dataType) { public DefaultTransactionEngine(MVStore store, DataType dataType) {
this.store = store; this.store = store;
this.dataType = dataType; this.dataType = dataType;
preparedTransactions = store.openMap("openTransactions", new MVMap.Builder<Integer, Object[]>()); preparedTransactions = store.openMap("openTransactions", new MVMap.Builder<Integer, Object[]>());
Expand Down

0 comments on commit fab6b94

Please sign in to comment.