Skip to content

Commit

Permalink
占有不同列锁的事务可以不按顺序提交
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 16, 2018
1 parent b620856 commit 7737831
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 58 deletions.
Expand Up @@ -96,13 +96,26 @@ public V remove(K key) {
}

@Override
public boolean replace(K key, V oldValue, V newValue) {
public synchronized boolean replace(K key, V oldValue, V newValue) {
V old = get(key);
if (areValuesEqual(old, oldValue)) {
put(key, newValue);
return true;
}
return false;
// // TODO 改成非同步版本,要考虑合并线程
// V old = get(key);
// if (areValuesEqual(old, oldValue)) {
// if (!buffer.containsKey(key)) {
// old = (V) buffer.putIfAbsent(key, newValue);
// return old == null;
// }
// if (buffer.replace(key, old, newValue)) {
// map.setMaxKey(key); // 更新最大key
// return true;
// }
// }
// return false;
}

@Override
Expand Down
Expand Up @@ -267,7 +267,7 @@ protected void commitFinal(long tid) {
if (t == null)
return;
for (TransactionalLogRecord r : t.logRecords) {
r.commit(transactionEngine);
r.commit(transactionEngine, tid);
}
t.endTransaction();
}
Expand Down
Expand Up @@ -103,7 +103,7 @@ public V get(K key) {
@SuppressWarnings("unchecked")
public V get(K key, int[] columnIndexes) {
TransactionalValue data = map.get(key, columnIndexes);
data = getValue(key, data, columnIndexes);
data = getValue(key, data);
return data == null ? null : (V) data.value;
}

Expand All @@ -115,10 +115,6 @@ public V get(K key, int[] columnIndexes) {
* @return the value
*/
protected TransactionalValue getValue(K key, TransactionalValue data) {
return getValue(key, data, null);
}

protected TransactionalValue getValue(K key, TransactionalValue data, int[] columnIndexes) {
while (true) {
if (data == null) {
// doesn't exist or deleted by a committed transaction
Expand All @@ -140,10 +136,6 @@ protected TransactionalValue getValue(K key, TransactionalValue data, int[] colu
return data;
}

if (!data.isLocked(columnIndexes)) {
return data; // data.getCommitted();
}

TransactionalValue v = getValue(key, data, tid);
if (v != null)
return v;
Expand Down Expand Up @@ -280,19 +272,21 @@ private boolean trySet(K key, V value, TransactionalValue oldValue, int[] column
return true;
}
long tid = oldValue.getTid();
if (tid == 0) {
if (tid == 0 || tid == transaction.transactionId || !oldValue.isLocked(columnIndexes)) {
// committed
transaction.log(mapName, key, oldValue, newValue);
TransactionalValue current = oldValue;
// the transaction is committed:
// overwrite the value
if (!map.replace(key, oldValue, newValue)) {
TransactionalValue old = map.get(key);
if (!old.isLocked(columnIndexes)) {
while (!map.replace(key, current, newValue)) {
current = map.get(key);
if (!current.isLocked(columnIndexes)) {
transaction.logUndo();
newValue = TransactionalValue.create(transaction, value, old, map.getValueType(), columnIndexes);
newValue = TransactionalValue.create(transaction, value, current, map.getValueType(),
columnIndexes);
// 只记录最初的oldValue,而不是current
transaction.log(mapName, key, oldValue, newValue);
map.put(key, newValue); // 强制覆盖
return true;
continue;
} else {
// somebody else was faster
transaction.logUndo();
Expand All @@ -301,17 +295,17 @@ private boolean trySet(K key, V value, TransactionalValue oldValue, int[] column
}
return true;
}
if (tid == transaction.transactionId || !oldValue.isLocked(columnIndexes)) {
// added or updated by this transaction
transaction.log(mapName, key, oldValue, newValue);
if (!map.replace(key, oldValue, newValue)) {
// strange, somebody overwrote the value
// even though the change was not committed
transaction.logUndo();
return false;
}
return true;
}
// if (tid == transaction.transactionId || !oldValue.isLocked(columnIndexes)) {
// // added or updated by this transaction
// transaction.log(mapName, key, oldValue, newValue);
// if (!map.replace(key, oldValue, newValue)) {
// // strange, somebody overwrote the value
// // even though the change was not committed
// transaction.logUndo();
// return false;
// }
// return true;
// }

// the transaction is not yet committed
return false;
Expand Down
Expand Up @@ -43,7 +43,7 @@ public TransactionalLogRecord(String mapName, Object key, TransactionalValue old
}

// 调用这个方法时事务已经提交,redo日志已经写完,这里只是在内存中更新到最新值
public void commit(MVCCTransactionEngine transactionEngine) {
public void commit(MVCCTransactionEngine transactionEngine, long tid) {
StorageMap<Object, TransactionalValue> map = transactionEngine.getMap(mapName);
if (map == null) {
// map was later removed
Expand All @@ -56,7 +56,12 @@ public void commit(MVCCTransactionEngine transactionEngine) {
map.remove(key);
} else {
// map.put(key, TransactionalValue.createCommitted(value.value));
map.put(key, value.commit());
// map.put(key, value.commit());
TransactionalValue newValue = value.commit(tid);
while (!map.replace(key, value, newValue)) {
value = map.get(key);
newValue = value.commit(tid);
}
}
}
}
Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.LinkedList;

import org.lealone.common.util.DataUtils;
import org.lealone.db.DataBuffer;
Expand Down Expand Up @@ -75,7 +76,7 @@ public TransactionalValue getCommitted() {
return this;
}

public TransactionalValue commit() {
public TransactionalValue commit(long tid) {
return this;
}

Expand Down Expand Up @@ -164,7 +165,7 @@ static class Uncommitted extends TransactionalValue {

private final long tid;
private final int logId;
private final TransactionalValue oldValue;
private TransactionalValue oldValue;
private final StorageDataType oldValueType;
private final String hostAndPort;
// 每次修改记录的事务名要全局唯一,
Expand Down Expand Up @@ -223,6 +224,16 @@ static class Uncommitted extends TransactionalValue {
this.version = version;
}

public Uncommitted copy() {
Uncommitted u = new Uncommitted(tid, value, logId, oldValue, oldValueType, hostAndPort,
globalReplicationName, version);
u.replicated = replicated;
u.rowLock = rowLock;
u.lockedColumns = lockedColumns;
u.columnIndexes = columnIndexes;
return u;
}

@Override
public long getTid() {
return tid;
Expand Down Expand Up @@ -302,26 +313,52 @@ public boolean isCommitted() {
}

@Override
public TransactionalValue commit() {
if (oldValue != null && !oldValue.isCommitted()) {
TransactionalValue v = oldValue.getCommitted();
if (v.value != null)
oldValueType.setColumns(v.value, value, columnIndexes);
setColumns(value, columnIndexes);
return oldValue;
public TransactionalValue commit(long tid) {
int[] commitColumnIndexes = null;
Object commitValue = null;
LinkedList<Uncommitted> uncommittedList = new LinkedList<>();
if (tid != this.tid) {
uncommittedList.add(this);
} else {
return createCommitted(value);
commitColumnIndexes = columnIndexes;
commitValue = value;
}
}

void setColumns(Object newValue, int[] columnIndexes) {
if (oldValue != null) {
if (oldValue.value != null)
oldValueType.setColumns(oldValue.value, newValue, columnIndexes);
TransactionalValue oldValue = this.oldValue;
while (oldValue != null) {
if (oldValue instanceof Uncommitted) {
((Uncommitted) oldValue).setColumns(newValue, columnIndexes);
Uncommitted u = (Uncommitted) oldValue;
oldValue = u.oldValue;
if (tid != u.tid) {
// 去掉当前正在提交的事务对应的条目
uncommittedList.add(u);
} else {
commitColumnIndexes = u.columnIndexes;
commitValue = u.value;
}
} else {
oldValue = null;
}
}

Uncommitted ret = null;
Uncommitted uncommitted = null;
for (Uncommitted u : uncommittedList) {
u = u.copy(); // 避免多线程执行时修改原来的链接结构
if (uncommitted == null) {
uncommitted = u;
ret = u;
} else {
uncommitted.oldValue = u;
uncommitted = u;
}
if (u.value != null)
u.oldValueType.setColumns(u.value, commitValue, commitColumnIndexes);
}

if (ret == null)
return createCommitted(value);
else
return ret;
}

private static Uncommitted read(long tid, StorageDataType valueType, ByteBuffer buff,
Expand Down
34 changes: 24 additions & 10 deletions lealone-test/src/test/java/org/lealone/test/sql/dml/UpdateTest.java
Expand Up @@ -69,14 +69,28 @@ void testColumnLock() {
try {
conn.setAutoCommit(false);
sql = "UPDATE UpdateTest SET f1 = 'a2' WHERE pk = '02'";
assertEquals(1, executeUpdate(sql));

Thread t1 = new Thread(() -> {
assertEquals(1, executeUpdate(sql));
});
t1.start();

// 因为支持列锁,所以两个事务更新同一行的不同字段不会产生冲突
Connection conn2 = getConnection();
conn2.setAutoCommit(false);
Statement stmt2 = conn2.createStatement();
String sql2 = "UPDATE UpdateTest SET f2 = 'c' WHERE pk = '02'";
stmt2.executeUpdate(sql2);
Thread t2 = new Thread(() -> {
try {
Connection conn2 = getConnection();
conn2.setAutoCommit(false);
Statement stmt2 = conn2.createStatement();
String sql2 = "UPDATE UpdateTest SET f2 = 'c' WHERE pk = '02'";
stmt2.executeUpdate(sql2);
conn2.commit();
stmt2.close();
conn2.close();
} catch (Exception e) {
e.printStackTrace();
}
});
t2.start();

// 第三个事务不能进行,因为第一个事务锁住f1字段了
Connection conn3 = null;
Expand All @@ -99,17 +113,17 @@ void testColumnLock() {
String sql4 = "UPDATE UpdateTest SET f3 = 4 WHERE pk = '02'";
stmt4.executeUpdate(sql4);

// 可以不按顺序提交事务

conn4.commit();
stmt4.close();
conn4.close();

conn2.commit();
stmt2.close();
conn2.close();

conn.commit();
conn.setAutoCommit(true);

t1.join();
t2.join();
sql = "SELECT f1, f2, f3 FROM UpdateTest WHERE pk = '02'";
assertEquals("a2", getStringValue(1));
assertEquals("c", getStringValue(2));
Expand Down

0 comments on commit 7737831

Please sign in to comment.