Skip to content

Commit

Permalink
重构mvcc模块的部分代码,让各个类的职责更清晰
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 5, 2018
1 parent 90fe0f2 commit 8e890ae
Show file tree
Hide file tree
Showing 9 changed files with 439 additions and 407 deletions.
Expand Up @@ -47,8 +47,6 @@ public interface Transaction {

long getTransactionId();

// long getCommitTimestamp();

boolean isAutoCommit();

void setAutoCommit(boolean autoCommit);
Expand Down
Expand Up @@ -17,39 +17,46 @@
*/
package org.lealone.transaction.mvcc;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import org.lealone.common.exceptions.DbException;
import org.lealone.common.util.DataUtils;
import org.lealone.db.DataBuffer;
import org.lealone.db.Session;
import org.lealone.db.api.ErrorCode;
import org.lealone.db.value.ValueLong;
import org.lealone.db.value.ValueString;
import org.lealone.storage.Storage;
import org.lealone.storage.StorageMap;
import org.lealone.storage.type.ObjectDataType;
import org.lealone.storage.type.StorageDataType;
import org.lealone.transaction.Transaction;
import org.lealone.transaction.mvcc.MVCCTransactionMap.MVCCReplicationMap;
import org.lealone.transaction.mvcc.log.LogSyncService;
import org.lealone.transaction.mvcc.log.RedoLogRecord;

public class MVCCTransaction implements Transaction {

// 以下几个public或包级别的字段是在其他地方频繁使用的,
// 为了使用方便或节省一点点性能开销就不通过getter方法访问了
final MVCCTransactionEngine transactionEngine;
public final long transactionId;
public final String transactionName;

String globalTransactionName;
int logId;
int status;
boolean autoCommit;
protected long commitTimestamp;
protected Session session;
LinkedList<LogRecord> logRecords = new LinkedList<>();

private HashMap<String, Integer> savepoints;
private final LogSyncService logSyncService;

LinkedList<LogRecord> logRecords = new LinkedList<>();
private HashMap<String, Integer> savepoints;
private Session session;
private int status;
private boolean autoCommit;
private boolean prepared;

public MVCCTransaction(MVCCTransactionEngine engine, long tid) {
this(engine, tid, null);
Expand All @@ -59,12 +66,13 @@ public MVCCTransaction(MVCCTransactionEngine engine, long tid, String hostAndPor
transactionEngine = engine;
transactionId = tid;
transactionName = getTransactionName(hostAndPort, tid);
logSyncService = engine.getLogSyncService();
status = Transaction.STATUS_OPEN;
}

static class LogRecord {
final String mapName;
Object key;
Object key; // 没有用final,在replicationPrepareCommit方法那里有特殊用途
final TransactionalValue oldValue;
final TransactionalValue newValue;

Expand All @@ -86,6 +94,29 @@ public void logUndo() {
--logId;
}

@Override
public String getGlobalTransactionName() {
return globalTransactionName;
}

@Override
public void setGlobalTransactionName(String globalTransactionName) {
this.globalTransactionName = globalTransactionName;
}

@Override
public void setSession(Session session) {
this.session = session;
}

public Session getSession() {
return session;
}

public boolean isShardingMode() {
return session != null && !session.isLocal() && session.isShardingMode();
}

@Override
public int getStatus() {
return status;
Expand Down Expand Up @@ -165,7 +196,7 @@ public <K, V> MVCCTransactionMap<K, V> openMap(String name, String mapType, Stor
}
StorageMap<K, TransactionalValue> map = storage.openMap(name, mapType, keyType, valueType, parameters);
if (!map.isInMemory()) {
transactionEngine.redo(map);
logSyncService.redo(map);
}
transactionEngine.addMap((StorageMap<Object, TransactionalValue>) map);
return createTransactionMap(map, isShardingMode);
Expand All @@ -192,14 +223,18 @@ public int getSavepointId() {
return logId;
}

private boolean prepared;

@Override
public void prepareCommit() {
checkNotClosed();
prepared = true;
RedoLogRecord r = transactionEngine.createRedoLogRecord(this);
transactionEngine.prepareCommit(this, r);

RedoLogRecord r = createRedoLogRecord();
// 事务没有进行任何操作时不用同步日志
if (r != null) {
// 先写redoLog
logSyncService.addRedoLogRecord(r);
}
logSyncService.prepareCommit(this);
}

@Override
Expand All @@ -217,21 +252,109 @@ public void commit(String allLocalTransactionNames) {
commitLocal();
}

void endTransaction() {
protected void commitLocal() {
checkNotClosed();
if (prepared) {
commitFinal();
if (session != null && session.getRunnable() != null) {
try {
session.getRunnable().run();
} catch (Exception e) {
throw DbException.convert(e);
}
}
} else {
RedoLogRecord r = createRedoLogRecord();
if (r != null) { // 事务没有进行任何操作时不用同步日志
// 先写redoLog
logSyncService.addAndMaybeWaitForSync(r);
}
// 分布式事务推迟提交
if (isLocal()) {
commitFinal();
}
}
}

protected void commitFinal() {
commitFinal(transactionId);
}

// tid在分布式场景下可能是其他事务的tid
protected void commitFinal(long tid) {
// 避免并发提交(TransactionValidator线程和其他读写线程都有可能在检查到分布式事务有效后帮助提交最终事务)
MVCCTransaction t = transactionEngine.removeTransaction(tid);
if (t == null)
return;
StorageMap<Object, TransactionalValue> map;
for (LogRecord r : t.logRecords) {
map = transactionEngine.getMap(r.mapName);
if (map == null) {
// map was later removed
} else {
TransactionalValue value = map.get(r.key);
if (value == null) {
// nothing to do
} else if (value.value == null) {
// remove the value
map.remove(r.key);
} else {
map.put(r.key, new TransactionalValue(value.value));
}
}
}
t.endTransaction();
}

private void endTransaction() {
savepoints = null;
logRecords = null;
status = STATUS_CLOSED;
transactionEngine.removeTransaction(transactionId);
}

protected RedoLogRecord createRedoLogRecord() {
if (logRecords.isEmpty())
return null;
try (DataBuffer writeBuffer = DataBuffer.create()) {
String mapName;
TransactionalValue value;
StorageMap<?, ?> map;
int lastPosition = 0, keyValueStart, memory;

for (LogRecord r : logRecords) {
mapName = r.mapName;
value = r.newValue;
map = transactionEngine.getMap(mapName);

// 有可能在执行DROP DATABASE时删除了
if (map == null) {
continue;
}

transactionEngine.currentTransactions.remove(transactionId);
}
ValueString.type.write(writeBuffer, mapName);
keyValueStart = writeBuffer.position();
writeBuffer.putInt(0);

protected void commitLocal() {
checkNotClosed();
if (prepared) {
transactionEngine.commit(this);
} else {
RedoLogRecord r = transactionEngine.createRedoLogRecord(this);
transactionEngine.commit(this, r);
map.getKeyType().write(writeBuffer, r.key);
if (value.value == null)
writeBuffer.put((byte) 0);
else {
writeBuffer.put((byte) 1);
((TransactionalValueType) map.getValueType()).valueType.write(writeBuffer, value.value);
}

writeBuffer.putInt(keyValueStart, writeBuffer.position() - keyValueStart - 4);
memory = writeBuffer.position() - lastPosition;
lastPosition = writeBuffer.position();
transactionEngine.incrementEstimatedMemory(mapName, memory);
}

ByteBuffer buffer = writeBuffer.getAndFlipBuffer();
ByteBuffer values = ByteBuffer.allocateDirect(buffer.limit());
values.put(buffer);
values.flip();
return new RedoLogRecord(transactionId, values);
}
}

Expand Down Expand Up @@ -277,14 +400,6 @@ public void rollbackToSavepoint(int savepointId) {
logId = savepointId;
}

public long getCommitTimestamp() {
return commitTimestamp;
}

protected void setCommitTimestamp(long commitTimestamp) {
this.commitTimestamp = commitTimestamp;
}

private void rollbackTo(long toLogId) {
while (--logId >= toLogId) {
LogRecord r = logRecords.removeLast();
Expand Down Expand Up @@ -324,32 +439,9 @@ public static String getTransactionName(String hostAndPort, long tid) {
return buff.toString();
}

@Override
public String getGlobalTransactionName() {
return globalTransactionName;
}

@Override
public void setGlobalTransactionName(String globalTransactionName) {
this.globalTransactionName = globalTransactionName;
}

@Override
public void setSession(Session session) {
this.session = session;
}

public Session getSession() {
return session;
}

public boolean isShardingMode() {
return session != null && !session.isLocal() && session.isShardingMode();
}

Object lastKey;
TransactionalValue lastValue;
StorageMap<Object, TransactionalValue> lastStorageMap;
private Object lastKey;
private TransactionalValue lastValue;
private StorageMap<Object, TransactionalValue> lastStorageMap;

@Override
public void replicationPrepareCommit(long validKey) {
Expand All @@ -363,4 +455,11 @@ public void replicationPrepareCommit(long validKey) {
logRecords.getLast().key = key; // 替换原来的key
}
}

void logAppend(StorageMap<Object, TransactionalValue> map, Object key, TransactionalValue newValue) {
log(map.getName(), key, null, newValue);
lastKey = key;
lastValue = newValue;
lastStorageMap = map;
}
}

0 comments on commit 8e890ae

Please sign in to comment.