Skip to content

Commit

Permalink
rename setTransaction to setParentTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 31, 2017
1 parent 2bf3ee8 commit 6de04e6
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 65 deletions.
Expand Up @@ -958,6 +958,7 @@ public Object append(Object value, DataType valueType, Session session) {
ReplicationSession rs = new ReplicationSession(sessions);
rs.setRpcTimeout(ConfigDescriptor.getRpcTimeout());
rs.setAutoCommit(session.isAutoCommit());
rs.setParentTransaction(s.getTransaction());
StorageCommand c = null;
try {
c = rs.createStorageCommand();
Expand Down
37 changes: 21 additions & 16 deletions lealone-client/src/main/java/org/lealone/client/ClientCommand.java
Expand Up @@ -187,7 +187,8 @@ private Result executeQuery(int maxRows, boolean scrollable, AsyncHandler<AsyncR
int resultId = session.getNextId();
Result result = null;
try {
boolean isDistributedQuery = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
boolean isDistributedQuery = session.getParentTransaction() != null
&& !session.getParentTransaction().isAutoCommit();

if (prepared) {
if (isDistributedQuery) {
Expand Down Expand Up @@ -232,7 +233,7 @@ private Result getQueryResult(boolean isDistributedQuery, int fetch, int resultI
public void runInternal() {
try {
if (isDistributedQuery)
session.getTransaction().addLocalTransactionNames(transfer.readString());
session.getParentTransaction().addLocalTransactionNames(transfer.readString());

int columnCount = transfer.readInt();
int rowCount = transfer.readInt();
Expand Down Expand Up @@ -291,7 +292,8 @@ private int executeUpdate(String replicationName, AsyncHandler<AsyncResult<Integ
}
int updateCount = 0;
try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
boolean isDistributedUpdate = session.getParentTransaction() != null
&& !session.getParentTransaction().isAutoCommit();

if (prepared) {
if (isDistributedUpdate) {
Expand Down Expand Up @@ -340,7 +342,7 @@ private int getUpdateCount(boolean isDistributedUpdate, int id, AsyncHandler<Asy
public void runInternal() {
try {
if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString());
session.getParentTransaction().addLocalTransactionNames(transfer.readString());

int updateCount = transfer.readInt();
long key = transfer.readLong();
Expand Down Expand Up @@ -444,8 +446,9 @@ public Object executePut(String replicationName, String mapName, ByteBuffer key,
byte[] bytes = null;
int id = session.getNextId();
try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) {
boolean isDistributed = session.getParentTransaction() != null
&& !session.getParentTransaction().isAutoCommit();
if (isDistributed) {
session.traceOperation("COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_PUT", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_PUT);
} else if (replicationName != null) {
Expand All @@ -463,8 +466,8 @@ public Object executePut(String replicationName, String mapName, ByteBuffer key,
@Override
public void runInternal() {
try {
if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString());
if (isDistributed)
session.getParentTransaction().addLocalTransactionNames(transfer.readString());
resultRef.set(transfer.readBytes());
} catch (IOException e) {
throw DbException.convert(e);
Expand All @@ -486,8 +489,9 @@ public Object executeGet(String mapName, ByteBuffer key) {
byte[] bytes = null;
int id = session.getNextId();
try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) {
boolean isDistributed = session.getParentTransaction() != null
&& !session.getParentTransaction().isAutoCommit();
if (isDistributed) {
session.traceOperation("COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_GET", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_GET);
} else {
Expand All @@ -500,8 +504,8 @@ public Object executeGet(String mapName, ByteBuffer key) {
@Override
public void runInternal() {
try {
if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString());
if (isDistributed)
session.getParentTransaction().addLocalTransactionNames(transfer.readString());
resultRef.set(transfer.readBytes());
} catch (IOException e) {
throw DbException.convert(e);
Expand Down Expand Up @@ -551,8 +555,9 @@ public Object executeAppend(String replicationName, String mapName, ByteBuffer v
AtomicLong resultAL = new AtomicLong();
int id = session.getNextId();
try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) {
boolean isDistributed = session.getParentTransaction() != null
&& !session.getParentTransaction().isAutoCommit();
if (isDistributed) {
session.traceOperation("COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_APPEND", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_APPEND);
} else {
Expand All @@ -566,8 +571,8 @@ public Object executeAppend(String replicationName, String mapName, ByteBuffer v
@Override
public void runInternal() {
try {
if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString());
if (isDistributed)
session.getParentTransaction().addLocalTransactionNames(transfer.readString());
resultAL.set(transfer.readLong());
} catch (IOException e) {
throw DbException.convert(e);
Expand Down
12 changes: 0 additions & 12 deletions lealone-client/src/main/java/org/lealone/client/ClientSession.java
Expand Up @@ -79,7 +79,6 @@ public class ClientSession extends SessionBase implements DataHandler, Transacti
private final Object lobSyncObject = new Object();
private int sessionId;
private LobStorage lobStorage;
private Transaction transaction;
private AsyncConnection asyncConnection;
private InetSocketAddress inetSocketAddress;

Expand Down Expand Up @@ -610,17 +609,6 @@ public void runInternal() {
}
}

// 要加synchronized,避免ClientCommand在执行更新和查询时其他线程把transaction置null
@Override
public synchronized void setTransaction(Transaction transaction) {
this.transaction = transaction;
}

@Override
public Transaction getTransaction() {
return transaction;
}

public synchronized ClientBatchCommand getClientBatchCommand(ArrayList<String> batchCommands) {
checkClosed();
return new ClientBatchCommand(this, transfer.copy(this), batchCommands);
Expand Down
4 changes: 2 additions & 2 deletions lealone-common/src/main/java/org/lealone/db/Session.java
Expand Up @@ -153,9 +153,9 @@ public interface Session extends Closeable, Transaction.Participant {

Transaction getTransaction();

boolean containsTransaction();
Transaction getParentTransaction();

void setTransaction(Transaction transaction);
void setParentTransaction(Transaction transaction);

void rollback();

Expand Down
22 changes: 17 additions & 5 deletions lealone-common/src/main/java/org/lealone/db/SessionBase.java
Expand Up @@ -21,6 +21,7 @@

import org.lealone.common.exceptions.DbException;
import org.lealone.storage.StorageMap;
import org.lealone.transaction.Transaction;

public abstract class SessionBase implements Session {

Expand All @@ -33,6 +34,7 @@ public abstract class SessionBase implements Session {
protected boolean invalid;

protected boolean autoCommit = true;
protected Transaction parentTransaction;

@Override
public String getReplicationName() {
Expand All @@ -59,11 +61,6 @@ public StorageMap<Object, Object> getStorageMap(String mapName) {
throw DbException.getUnsupportedException("getStorageMap");
}

@Override
public boolean containsTransaction() {
return false;
}

@Override
public int getNextId() {
return nextId.incrementAndGet();
Expand Down Expand Up @@ -146,4 +143,19 @@ public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
}

@Override
public Transaction getTransaction() {
throw DbException.getUnsupportedException("getTransaction");
}

@Override
public Transaction getParentTransaction() {
return parentTransaction;
}

// 要加synchronized,避免ClientCommand在执行更新和查询时其他线程把transaction置null
@Override
public synchronized void setParentTransaction(Transaction parentTransaction) {
this.parentTransaction = parentTransaction;
}
}
Expand Up @@ -200,14 +200,9 @@ public int getModificationId() {
}

@Override
public Transaction getTransaction() {
return sessions[0].getTransaction();
}

@Override
public void setTransaction(Transaction transaction) {
public void setParentTransaction(Transaction transaction) {
for (int i = 0; i < n; i++)
sessions[i].setTransaction(transaction);
sessions[i].setParentTransaction(transaction);
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions lealone-db/src/main/java/org/lealone/db/ServerCommand.java
Expand Up @@ -27,6 +27,7 @@
import org.lealone.storage.StorageMap;
import org.lealone.storage.type.WriteBuffer;
import org.lealone.storage.type.WriteBufferPool;
import org.lealone.transaction.Transaction;

public class ServerCommand extends CommandBase implements StorageCommand {

Expand Down Expand Up @@ -138,6 +139,10 @@ public Object executeAppend(String replicationName, String mapName, ByteBuffer v
StorageMap<Object, Object> map = session.getStorageMap(mapName);
Object result = map.append(map.getValueType().read(value));
commandUpdateResult.addResult(this, ((ValueLong) result).getLong());
Transaction parentTransaction = session.getParentTransaction();
if (parentTransaction != null && !parentTransaction.isAutoCommit()) {
parentTransaction.addLocalTransactionNames(session.getTransaction().getLocalTransactionNames());
}
return result;
}

Expand Down
25 changes: 4 additions & 21 deletions lealone-db/src/main/java/org/lealone/db/ServerSession.java
Expand Up @@ -536,19 +536,11 @@ public void commit(String allLocalTransactionNames) {
}
unlockAll();
clean();
releaseSessionCache();
sessionStatus = SessionStatus.NO_TRANSACTION;
}

private void endTransaction() {
if (!sessionCache.isEmpty()) {
for (Session s : sessionCache.values()) {
s.setTransaction(null);
SessionPool.release(s);
}

sessionCache.clear();
}

if (!isRoot)
setAutoCommit(true);

Expand Down Expand Up @@ -598,6 +590,8 @@ public void rollback() {
// currentStatements.get(i).rollback();
// }
currentStatements.clear();
clean();
releaseSessionCache();
}

/**
Expand Down Expand Up @@ -690,7 +684,6 @@ private void unlockAll() {
}
locks.clear();
}
releaseSessionCache();
}

public ArrayList<ServerSession> checkDeadlock() {
Expand All @@ -702,7 +695,7 @@ public ArrayList<ServerSession> checkDeadlock() {
private void releaseSessionCache() {
if (!sessionCache.isEmpty()) {
for (Session s : sessionCache.values()) {
s.setTransaction(null);
s.setParentTransaction(null);
SessionPool.release(s);
}

Expand Down Expand Up @@ -1303,11 +1296,6 @@ public SQLParser getParser() {
return database.createParser(this);
}

@Override
public void setTransaction(Transaction transaction) {
this.transaction = transaction;
}

@Override
public Session connect() {
return this;
Expand Down Expand Up @@ -1345,11 +1333,6 @@ public StorageMap<Object, Object> getStorageMap(String mapName) {
return (StorageMap<Object, Object>) transactionEngine.getTransactionMap(mapName).getInstance(getTransaction());
}

@Override
public boolean containsTransaction() {
return transaction != null;
}

private SessionStatus sessionStatus = SessionStatus.NO_TRANSACTION;

@Override
Expand Down
4 changes: 2 additions & 2 deletions lealone-db/src/main/java/org/lealone/db/SessionPool.java
Expand Up @@ -95,8 +95,8 @@ public static Command getCommand(ServerSession originalSession, PreparedStatemen
session = getSession(originalSession, url, true);
}

if (session.getTransaction() == null)
session.setTransaction(originalSession.getTransaction());
if (session.getParentTransaction() == null)
session.setParentTransaction(originalSession.getTransaction());

if (isNew)
originalSession.addSession(url, session);
Expand Down

0 comments on commit 6de04e6

Please sign in to comment.