Skip to content

Commit

Permalink
实现新的读时撤销算法,允许底层各类存储引擎随时保存未提交事务的脏数据
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 7, 2018
1 parent 426916c commit ce26dcb
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 128 deletions.
Expand Up @@ -299,7 +299,7 @@ protected void commitFinal(long tid) {
// remove the value // remove the value
map.remove(r.key); map.remove(r.key);
} else { } else {
map.put(r.key, new TransactionalValue(value.value)); map.put(r.key, TransactionalValue.createCommitted(value.value));
} }
} }
} }
Expand Down
Expand Up @@ -123,8 +123,8 @@ protected TransactionalValue getValue(K key, TransactionalValue data) {
// 数据从节点A迁移到节点B的过程中,如果把A中未提交的值也移到B中, // 数据从节点A迁移到节点B的过程中,如果把A中未提交的值也移到B中,
// 那么在节点B中会读到不一致的数据,此时需要从节点A读出正确的值 // 那么在节点B中会读到不一致的数据,此时需要从节点A读出正确的值
// TODO 如何更高效的判断,不用比较字符串 // TODO 如何更高效的判断,不用比较字符串
if (data.hostAndPort != null && !data.hostAndPort.equals(NetEndpoint.getLocalTcpHostAndPort())) { if (data.getHostAndPort() != null && !data.getHostAndPort().equals(NetEndpoint.getLocalTcpHostAndPort())) {
return getRemoteTransactionalValue(data.hostAndPort, key); return getRemoteTransactionalValue(data.getHostAndPort(), key);
} }
if (tid == transaction.transactionId) { if (tid == transaction.transactionId) {
return data; return data;
Expand All @@ -134,8 +134,10 @@ protected TransactionalValue getValue(K key, TransactionalValue data) {
if (v != null) if (v != null)
return v; return v;


if (!transaction.transactionEngine.containsTransaction(tid)) // TODO // 底层存储写入了未提交事务的脏数据,并且在事务提交前数据库崩溃了
return null; if (!transaction.transactionEngine.containsTransaction(tid)) {
return data.undo(map, key);
}


// get the value before the uncommitted transaction // get the value before the uncommitted transaction
LinkedList<LogRecord> d = transaction.transactionEngine.getTransaction(tid).logRecords; LinkedList<LogRecord> d = transaction.transactionEngine.getTransaction(tid).logRecords;
Expand All @@ -151,7 +153,7 @@ protected TransactionalValue getValue(K key, TransactionalValue data) {
"The transaction log might be corrupt for key {0}", key); "The transaction log might be corrupt for key {0}", key);
} }
} else { } else {
LogRecord r = d.get(data.logId); LogRecord r = d.get(data.getLogId());
data = r.oldValue; data = r.oldValue;
} }
} }
Expand Down Expand Up @@ -236,7 +238,7 @@ public boolean trySet(K key, V value) {
} }


private boolean trySet(K key, V value, TransactionalValue oldValue) { private boolean trySet(K key, V value, TransactionalValue oldValue) {
TransactionalValue newValue = new TransactionalValue(transaction, value); TransactionalValue newValue = TransactionalValue.create(transaction, value, oldValue, map.getValueType());
String mapName = getName(); String mapName = getName();
if (oldValue == null) { if (oldValue == null) {
// a new value // a new value
Expand Down Expand Up @@ -524,7 +526,7 @@ public void remove() {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public K append(V value) { // 追加新记录时不会产生事务冲突 public K append(V value) { // 追加新记录时不会产生事务冲突
TransactionalValue newValue = new TransactionalValue(transaction, value); TransactionalValue newValue = TransactionalValue.create(transaction, value, null, null);
K key = map.append(newValue); K key = map.append(newValue);
// 记事务log和append新值都是更新内存中的相应数据结构,所以不必把log调用放在append前面 // 记事务log和append新值都是更新内存中的相应数据结构,所以不必把log调用放在append前面
// 放在前面的话调用log方法时就不知道key是什么,当事务要rollback时就不知道如何修改map的内存数据 // 放在前面的话调用log方法时就不知道key是什么,当事务要rollback时就不知道如何修改map的内存数据
Expand All @@ -551,7 +553,7 @@ public MVCCTransactionMap<K, V> getInstance(Transaction transaction) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public V putCommitted(K key, V value) { public V putCommitted(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null"); DataUtils.checkArgument(value != null, "The value may not be null");
TransactionalValue newValue = new TransactionalValue(value); TransactionalValue newValue = TransactionalValue.createCommitted(value);
TransactionalValue oldValue = map.put(key, newValue); TransactionalValue oldValue = map.put(key, newValue);
return (V) (oldValue == null ? null : oldValue.value); return (V) (oldValue == null ? null : oldValue.value);
} }
Expand Down
Expand Up @@ -17,56 +17,226 @@
*/ */
package org.lealone.transaction.mvcc; package org.lealone.transaction.mvcc;


import java.nio.ByteBuffer;

import org.lealone.common.util.DataUtils;
import org.lealone.db.DataBuffer;
import org.lealone.db.value.ValueString;
import org.lealone.net.NetEndpoint; import org.lealone.net.NetEndpoint;
import org.lealone.storage.StorageMap;
import org.lealone.storage.type.StorageDataType;


public class TransactionalValue { public class TransactionalValue {


// 每次修改记录的事务名要全局唯一, public final long tid; // 如果是0代表事务已经提交
// 比如用节点的IP拼接一个本地递增的计数器组成字符串就足够了
public final String globalReplicationName;
public final long tid;
public final int logId;
public final Object value; public final Object value;


public long version; // 每次更新时自动加1 public TransactionalValue(long tid, Object value) {
public boolean replicated; this.tid = tid;
public String hostAndPort; this.value = value;
}


public TransactionalValue(Object value) { public int getLogId() {
this(0, 0, value); return 0;
} }


public TransactionalValue(long tid, int logId, Object value) { public String getHostAndPort() {
this.tid = tid; return null;
this.logId = logId;
this.globalReplicationName = null;
this.value = value;
} }


public TransactionalValue(MVCCTransaction transaction, Object value) { public String getGlobalReplicationName() {
this.tid = transaction.transactionId; return null;
this.logId = transaction.logId;
this.globalReplicationName = transaction.globalTransactionName;
this.value = value;
this.hostAndPort = NetEndpoint.getLocalTcpHostAndPort();
} }


public TransactionalValue(long tid, int logId, Object value, long version, String globalTransactionName) { public boolean isReplicated() {
this.tid = tid; return false;
this.logId = logId; }
this.value = value;
this.version = version; public void setReplicated(boolean replicated) {
this.globalReplicationName = globalTransactionName; }

public void incrementVersion() {
}

public <K> TransactionalValue undo(StorageMap<K, TransactionalValue> map, K key) {
return this;
} }


@Override public void write(DataBuffer buff, StorageDataType valueType) {
public String toString() { buff.putVarLong(tid);
StringBuilder buff = new StringBuilder("TransactionalValue[ "); if (value == null) {
buff.append("version = ").append(version); buff.put((byte) 0);
buff.append(", globalReplicationName = ").append(globalReplicationName); } else {
buff.append(", tid = ").append(tid); buff.put((byte) 1);
buff.append(", logId = ").append(logId); valueType.write(buff, value);
buff.append(", value = ").append(value).append(" ]"); }
return buff.toString(); }

/**
 * Deserializes a transactional value from {@code buff}.
 * <p>
 * Consumes, in order: a var-long transaction id, a one-byte presence flag and, when that flag
 * is 1, the value payload decoded by {@code valueType}. A transaction id of 0 produces a
 * committed value; any other id hands the buffer on to {@code NotCommitted.read} so it can
 * consume the remaining uncommitted-only fields.
 *
 * @param buff buffer positioned at the start of a serialized value
 * @param valueType codec used to decode the value payload
 * @param oldValueType codec passed on for decoding the previous value of an uncommitted record
 * @return the decoded value, committed or not
 */
public static TransactionalValue read(ByteBuffer buff, StorageDataType valueType, StorageDataType oldValueType) {
    final long tid = DataUtils.readVarLong(buff);
    final boolean hasValue = buff.get() == 1;
    final Object value = hasValue ? valueType.read(buff) : null;
    return tid == 0 ? createCommitted(value) : NotCommitted.read(tid, value, buff, oldValueType);
}

/**
 * Creates the value written by a transaction that has not committed yet.
 *
 * @param transaction the writing transaction
 * @param value the new value
 * @param oldValue the value being replaced, or {@code null} when there was none
 * @param oldValueType codec later used to serialize {@code oldValue}; may be {@code null}
 *        (the append path passes {@code null} for both)
 * @return a new uncommitted value
 */
public static TransactionalValue create(MVCCTransaction transaction, Object value, TransactionalValue oldValue,
        StorageDataType oldValueType) {
    final TransactionalValue uncommitted = new NotCommitted(transaction, value, oldValue, oldValueType);
    return uncommitted;
}

/**
 * Wraps {@code value} as an already-committed transactional value.
 *
 * @param value the committed value; may be {@code null}
 * @return a new committed value
 */
public static TransactionalValue createCommitted(Object value) {
    final TransactionalValue committed = new Committed(value);
    return committed;
}

/** A value whose transaction has committed; it is always constructed with transaction id 0. */
static class Committed extends TransactionalValue {

    Committed(Object value) {
        super(0, value);
    }

    /** Renders as {@code Committed[ value = <value> ]}. */
    @Override
    public String toString() {
        return "Committed[ value = " + value + " ]";
    }
}

static class NotCommitted extends TransactionalValue {

// Log id taken from the writing transaction (or read back from storage).
private final int logId;
// Value that was in place before this write; null when this write was an insert.
private final TransactionalValue oldValue;
// Codec used by write() to serialize oldValue.
private final StorageDataType oldValueType;
// Host and port of the node that created this value (or the deserialized one).
private final String hostAndPort;
// The name of the transaction that modifies a record must be globally unique;
// for example, a node's IP concatenated with a locally increasing counter is enough.
private final String globalReplicationName;
// Incremented by 1 on every update. NOTE(review): write() only persists it when
// globalReplicationName is non-null -- confirm that this is intended.
private long version;
// In-memory flag only; write() does not persist it.
private boolean replicated;

/**
 * Builds the uncommitted value for a write made by {@code transaction} on the local node.
 * The transaction id, log id and global name are taken from the transaction, the host and
 * port from {@code NetEndpoint.getLocalTcpHostAndPort()}, and the version starts at 0.
 *
 * @param transaction the writing transaction
 * @param value the new value
 * @param oldValue the value being replaced, or {@code null}
 * @param oldValueType codec used to serialize {@code oldValue}
 */
NotCommitted(MVCCTransaction transaction, Object value, TransactionalValue oldValue,
        StorageDataType oldValueType) {
    this(transaction.transactionId, value, transaction.logId, oldValue, oldValueType,
            NetEndpoint.getLocalTcpHostAndPort(), transaction.globalTransactionName, 0L);
}

/**
 * Builds an uncommitted value from explicit field values; used when reading one back from
 * a buffer.
 *
 * @param tid id of the writing transaction
 * @param value the new value
 * @param logId log id of the writing transaction
 * @param oldValue the value that was replaced, or {@code null}
 * @param oldValueType codec used to serialize {@code oldValue}
 * @param hostAndPort host and port recorded with the value, or {@code null}
 * @param globalTransactionName global replication name, or {@code null}
 * @param version initial version counter
 */
NotCommitted(long tid, Object value, int logId, TransactionalValue oldValue, StorageDataType oldValueType,
        String hostAndPort, String globalTransactionName, long version) {
    super(tid, value);
    // Previous-value state.
    this.oldValue = oldValue;
    this.oldValueType = oldValueType;
    // Transaction and replication bookkeeping.
    this.logId = logId;
    this.version = version;
    this.globalReplicationName = globalTransactionName;
    this.hostAndPort = hostAndPort;
}

/** @return the log id stored with this uncommitted value */
@Override
public int getLogId() {
    return this.logId;
}

/** @return the host and port stored with this value, or {@code null} */
@Override
public String getHostAndPort() {
    return this.hostAndPort;
}

/** @return the global replication name stored with this value, or {@code null} */
@Override
public String getGlobalReplicationName() {
    return this.globalReplicationName;
}

/** @return the current value of the replicated flag */
@Override
public boolean isReplicated() {
    return this.replicated;
}

/**
 * Sets the replicated flag.
 *
 * @param replicated the new flag value
 */
@Override
public void setReplicated(boolean replicated) {
    this.replicated = replicated;
}

/** Increases the version counter by one. */
@Override
public void incrementVersion() {
    this.version += 1;
}

/**
 * Reverts this uncommitted write in {@code map} and returns the value that is left.
 * <p>
 * With no previous value the write was an insert, so the key is removed and {@code null}
 * is returned. Otherwise the previous value is put back and then asked to undo itself,
 * so a chain of uncommitted values is unwound one step at a time.
 *
 * @param map the storage map holding the value
 * @param key the key this value is stored under
 * @return the result of undoing the previous value, or {@code null} for an insert
 */
@Override
public <K> TransactionalValue undo(StorageMap<K, TransactionalValue> map, K key) {
    if (oldValue == null) {
        // insert: nothing existed before this write
        map.remove(key);
        return null;
    }
    // update or delete: restore the previous value, then keep unwinding
    map.put(key, oldValue);
    return oldValue.undo(map, key);
}

/**
 * Serializes this uncommitted value into {@code buff}.
 * <p>
 * After the fields written by the superclass, the layout is: var-int log id; a presence
 * byte followed by the old value when present; a presence byte followed by the host and
 * port when present; a presence byte followed by the var-long version and the global
 * replication name when that name is present.
 *
 * @param buff destination buffer
 * @param valueType codec for the current value, forwarded to the superclass
 */
@Override
public void write(DataBuffer buff, StorageDataType valueType) {
    super.write(buff, valueType);
    buff.putVarInt(logId);

    final boolean hasOldValue = oldValue != null;
    buff.put((byte) (hasOldValue ? 1 : 0));
    if (hasOldValue) {
        oldValueType.write(buff, oldValue);
    }

    final boolean hasHostAndPort = hostAndPort != null;
    buff.put((byte) (hasHostAndPort ? 1 : 0));
    if (hasHostAndPort) {
        ValueString.type.write(buff, hostAndPort);
    }

    final boolean hasReplicationName = globalReplicationName != null;
    buff.put((byte) (hasReplicationName ? 1 : 0));
    if (hasReplicationName) {
        // The version is only stored together with the replication name.
        buff.putVarLong(version);
        ValueString.type.write(buff, globalReplicationName);
    }
}

/**
 * Reads the uncommitted-only fields that follow the transaction id and value, mirroring
 * the order used by {@code write}: log id, optional old value, optional host and port,
 * optional (version, global replication name) pair.
 *
 * @param tid transaction id already read by the caller
 * @param value value already read by the caller; may be {@code null}
 * @param buff buffer positioned right after the value
 * @param oldValueType codec used to decode the old value
 * @return the reconstructed uncommitted value
 */
private static NotCommitted read(long tid, Object value, ByteBuffer buff, StorageDataType oldValueType) {
    final int logId = DataUtils.readVarInt(buff);

    final boolean hasOldValue = buff.get() == 1;
    final TransactionalValue oldValue = hasOldValue ? (TransactionalValue) oldValueType.read(buff) : null;

    final boolean hasHostAndPort = buff.get() == 1;
    final String hostAndPort = hasHostAndPort ? ValueString.type.read(buff) : null;

    // The version defaults to 0 when no replication name was stored.
    long version = 0;
    String globalReplicationName = null;
    final boolean hasReplicationName = buff.get() == 1;
    if (hasReplicationName) {
        version = DataUtils.readVarLong(buff);
        globalReplicationName = ValueString.type.read(buff);
    }

    return new NotCommitted(tid, value, logId, oldValue, oldValueType, hostAndPort, globalReplicationName,
            version);
}

/** Renders the transaction id, log id, version, global replication name and value. */
@Override
public String toString() {
    return "NotCommitted[ "
            + "tid = " + tid
            + ", logId = " + logId
            + ", version = " + version
            + ", globalReplicationName = " + globalReplicationName
            + ", value = " + value + " ]";
}
} }
} }

0 comments on commit ce26dcb

Please sign in to comment.