Skip to content

Commit

Permalink
简单重构MVCC事务引擎方便扩展
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Feb 20, 2017
1 parent 9193f48 commit 35f3860
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 68 deletions.
46 changes: 26 additions & 20 deletions lealone-mvcc/src/main/java/org/lealone/mvcc/MVCCTransaction.java
Expand Up @@ -34,25 +34,29 @@
public class MVCCTransaction implements Transaction {

final MVCCTransactionEngine transactionEngine;
final long transactionId;
final String transactionName;
public final long transactionId;
public final String transactionName;

String globalTransactionName;
int logId;

private int status;
private boolean autoCommit;

private long commitTimestamp;
int status;
boolean autoCommit;
protected long commitTimestamp;
protected Session session;

private HashMap<String, Integer> savepoints;

LinkedList<LogRecord> logRecords = new LinkedList<>();

MVCCTransaction(MVCCTransactionEngine engine, long tid) {
public MVCCTransaction(MVCCTransactionEngine engine, long tid) {
this(engine, tid, null);
}

public MVCCTransaction(MVCCTransactionEngine engine, long tid, String hostAndPort) {
transactionEngine = engine;
transactionId = tid;
transactionName = getTransactionName(null, tid);
status = MVCCTransaction.STATUS_OPEN;
status = Transaction.STATUS_OPEN;
}

static class LogRecord {
Expand All @@ -69,12 +73,12 @@ public LogRecord(String mapName, Object key, TransactionalValue oldValue, Transa
}
}

void log(String mapName, Object key, TransactionalValue oldValue, TransactionalValue newValue) {
public void log(String mapName, Object key, TransactionalValue oldValue, TransactionalValue newValue) {
logRecords.add(new LogRecord(mapName, key, oldValue, newValue));
logId++;
}

void logUndo() {
public void logUndo() {
logRecords.removeLast();
--logId;
}
Expand Down Expand Up @@ -108,6 +112,10 @@ public void setAutoCommit(boolean autoCommit) {
public void setLocal(boolean local) {
}

public boolean isLocal() {
return true;
}

@Override
public void addLocalTransactionNames(String localTransactionNames) {
}
Expand Down Expand Up @@ -198,7 +206,7 @@ void endTransaction() {
transactionEngine.currentTransactions.remove(transactionId);
}

private void commitLocal() {
protected void commitLocal() {
checkNotClosed();
if (prepared) {
transactionEngine.commit(this);
Expand Down Expand Up @@ -241,7 +249,6 @@ public void rollbackToSavepoint(String name) {
}
}
}

}

@Override
Expand All @@ -251,11 +258,11 @@ public void rollbackToSavepoint(int savepointId) {
logId = savepointId;
}

long getCommitTimestamp() {
public long getCommitTimestamp() {
return commitTimestamp;
}

void setCommitTimestamp(long commitTimestamp) {
protected void setCommitTimestamp(long commitTimestamp) {
this.commitTimestamp = commitTimestamp;
}

Expand All @@ -278,7 +285,7 @@ private void rollbackTo(long toLogId) {
}
}

void checkNotClosed() {
protected void checkNotClosed() {
if (status == STATUS_CLOSED) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_CLOSED, "Transaction is closed");
}
Expand All @@ -289,7 +296,7 @@ public String toString() {
return "" + transactionId;
}

static String getTransactionName(String hostAndPort, long tid) {
public static String getTransactionName(String hostAndPort, long tid) {
if (hostAndPort == null)
hostAndPort = "0:0";
StringBuilder buff = new StringBuilder(hostAndPort);
Expand All @@ -300,15 +307,14 @@ static String getTransactionName(String hostAndPort, long tid) {

@Override
public String getGlobalTransactionName() {
return null;
return globalTransactionName;
}

@Override
public void setGlobalTransactionName(String globalTransactionName) {
this.globalTransactionName = globalTransactionName;
}

private Session session;

@Override
public void setSession(Session session) {
this.session = session;
Expand Down
Expand Up @@ -32,7 +32,6 @@

import org.lealone.common.exceptions.DbException;
import org.lealone.common.util.DataUtils;
import org.lealone.db.Constants;
import org.lealone.mvcc.MVCCTransaction.LogRecord;
import org.lealone.mvcc.log.LogStorage;
import org.lealone.mvcc.log.LogSyncService;
Expand All @@ -48,6 +47,7 @@

public class MVCCTransactionEngine extends TransactionEngineBase {

private static final String NAME = "MVCC";
private static final int DEFAULT_MAP_CACHE_SIZE = 32 * 1024 * 1024; // 32M
private static final int DEFAULT_MAP_SAVE_PERIOD = 1 * 60 * 60 * 1000; // 1小时

Expand All @@ -63,6 +63,8 @@ public class MVCCTransactionEngine extends TransactionEngineBase {
private final AtomicLong lastTransactionId = new AtomicLong();
// key: mapName, value: map key/value ByteBuffer list
private final HashMap<String, ArrayList<ByteBuffer>> pendingRedoLog = new HashMap<>();
// key: mapName
private final ConcurrentHashMap<String, TransactionMap<?, ?>> tmaps = new ConcurrentHashMap<>();

private LogStorage logStorage;
private RedoLog redoLog;
Expand All @@ -71,17 +73,21 @@ public class MVCCTransactionEngine extends TransactionEngineBase {
private boolean init;

// key: transactionId
final ConcurrentSkipListMap<Long, MVCCTransaction> currentTransactions = new ConcurrentSkipListMap<>();
public final ConcurrentSkipListMap<Long, MVCCTransaction> currentTransactions = new ConcurrentSkipListMap<>();

public MVCCTransactionEngine() {
super(Constants.DEFAULT_TRANSACTION_ENGINE_NAME);
super(NAME);
}

public MVCCTransactionEngine(String name) {
super(name);
}

StorageMap<Object, TransactionalValue> getMap(String mapName) {
return maps.get(mapName);
}

void addMap(StorageMap<Object, TransactionalValue> map) {
public void addMap(StorageMap<Object, TransactionalValue> map) {
estimatedMemory.put(map.getName(), 0);
maps.put(map.getName(), map);
}
Expand Down Expand Up @@ -236,7 +242,7 @@ private void initPendingRedoLog() {
}

@SuppressWarnings("unchecked")
<K> void redo(StorageMap<K, TransactionalValue> map) {
public <K> void redo(StorageMap<K, TransactionalValue> map) {
ArrayList<ByteBuffer> logs = pendingRedoLog.remove(map.getName());
if (logs != null && !logs.isEmpty()) {
K key;
Expand All @@ -260,13 +266,17 @@ public MVCCTransaction beginTransaction(boolean autoCommit, boolean isShardingMo
if (!init) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE, "Not initialized");
}
long tid = getTransactionId(autoCommit);
MVCCTransaction t = new MVCCTransaction(this, tid);
long tid = getTransactionId(autoCommit, isShardingMode);
MVCCTransaction t = createTransaction(tid);
t.setAutoCommit(autoCommit);
currentTransactions.put(tid, t);
return t;
}

protected MVCCTransaction createTransaction(long tid) {
return new MVCCTransaction(this, tid);
}

@Override
public void close() {
logStorage.close();
Expand All @@ -279,11 +289,15 @@ public void close() {
}
}

private long getTransactionId(boolean autoCommit) {
private long getTransactionId(boolean autoCommit, boolean isShardingMode) {
// 分布式事务使用奇数的事务ID
if (!autoCommit && isShardingMode) {
return nextOddTransactionId();
}
return nextEvenTransactionId();
}

long nextOddTransactionId() {
public long nextOddTransactionId() {
return nextTransactionId(false);
}

Expand Down Expand Up @@ -331,17 +345,19 @@ void commit(MVCCTransaction t) {
}
}

void commit(MVCCTransaction t, RedoLogValue v) {
public void commit(MVCCTransaction t, RedoLogValue v) {
if (v != null) { // 事务没有进行任何操作时不用同步日志
// 先写redoLog
redoLog.put(t.transactionId, v);
logSyncService.maybeWaitForSync(redoLog, t.transactionId);
}

commitFinal(t.transactionId);
// 分布式事务推迟提交
if (t.isLocal()) {
commitFinal(t.transactionId);
}
}

private void commitFinal(long tid) {
protected void commitFinal(long tid) {
// 避免并发提交(TransactionValidator线程和其他读写线程都有可能在检查到分布式事务有效后帮助提交最终事务)
MVCCTransaction t = currentTransactions.remove(tid);
if (t == null)
Expand All @@ -368,7 +384,7 @@ private void commitFinal(long tid) {
t.endTransaction();
}

RedoLogValue getRedoLog(MVCCTransaction t) {
public RedoLogValue getRedoLog(MVCCTransaction t) {
if (t.logRecords.isEmpty())
return null;
WriteBuffer writeBuffer = WriteBufferPool.poll();
Expand Down Expand Up @@ -418,16 +434,17 @@ public boolean supportsMVCC() {
}

@Override
public void addTransactionMap(TransactionMap<?, ?> map) {
public boolean validateTransaction(String localTransactionName) {
return false;
}

@Override
public TransactionMap<?, ?> getTransactionMap(String name) {
return null;
public void addTransactionMap(TransactionMap<?, ?> map) {
tmaps.put(map.getName(), map);
}

@Override
public boolean validateTransaction(String localTransactionName) {
return false;
public TransactionMap<?, ?> getTransactionMap(String name) {
return tmaps.get(name);
}
}
Expand Up @@ -41,9 +41,9 @@ public class MVCCTransactionMap<K, V> implements TransactionMap<K, V> {
* Key: the key of the data.
* Value: { transactionId, logId, value }
*/
private final StorageMap<K, TransactionalValue> map;
protected final StorageMap<K, TransactionalValue> map;

MVCCTransactionMap(MVCCTransaction transaction, StorageMap<K, TransactionalValue> map) {
public MVCCTransactionMap(MVCCTransaction transaction, StorageMap<K, TransactionalValue> map) {
this.transaction = transaction;
this.map = map;
}
Expand Down Expand Up @@ -84,7 +84,7 @@ public V get(K key) {
* @param data the value stored in the main map
* @return the value
*/
private TransactionalValue getValue(K key, TransactionalValue data) {
protected TransactionalValue getValue(K key, TransactionalValue data) {
while (true) {
if (data == null) {
// doesn't exist or deleted by a committed transaction
Expand All @@ -99,12 +99,15 @@ private TransactionalValue getValue(K key, TransactionalValue data) {
return data;
}

TransactionalValue v = getValue(key, data, tid);
if (v != null)
return v;

if (!transaction.transactionEngine.currentTransactions.containsKey(tid)) // TODO
return null;

// get the value before the uncommitted transaction
LinkedList<LogRecord> d = transaction.transactionEngine.currentTransactions.get(tid).logRecords;

if (d == null) {
// this entry should be committed or rolled back
// in the meantime (the transaction might still be open)
Expand All @@ -123,6 +126,10 @@ private TransactionalValue getValue(K key, TransactionalValue data) {
}
}

protected TransactionalValue getValue(K key, TransactionalValue data, long tid) {
return null;
}

/**
* Update the value for the given key.
* <p>
Expand Down Expand Up @@ -185,12 +192,11 @@ public boolean tryPut(K key, V value) {
*
* @param key the key
* @param value the new value (null to remove the value)
* @return true if the value was set, false if there was a concurrent
* update
* @return true if the value was set, false if there was a concurrent update
*/
public boolean trySet(K key, V value) {
TransactionalValue current = map.get(key);
TransactionalValue newValue = new TransactionalValue(transaction.transactionId, transaction.logId, value);
TransactionalValue newValue = new TransactionalValue(transaction, value);

String mapName = getName();
if (current == null) {
Expand Down Expand Up @@ -403,8 +409,8 @@ public long sizeAsLong() {
String mapName = getName();
Storage storage = map.getStorage();
String tmpMapName = storage.nextTemporaryMapName();
StorageMap<Object, Integer> temp = storage.openMap(tmpMapName, null, new ObjectDataType(),
new ObjectDataType(), null);
StorageMap<Object, Integer> temp = storage.openMap(tmpMapName, null, new ObjectDataType(), new ObjectDataType(),
null);
try {
for (MVCCTransaction t : transaction.transactionEngine.currentTransactions.values()) {
LinkedList<LogRecord> records = t.logRecords;
Expand Down

0 comments on commit 35f3860

Please sign in to comment.