diff --git a/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/MVCCTransaction.java b/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/MVCCTransaction.java index 61c8bd661..a25fb9cdc 100644 --- a/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/MVCCTransaction.java +++ b/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/MVCCTransaction.java @@ -299,7 +299,7 @@ protected void commitFinal(long tid) { // remove the value map.remove(r.key); } else { - map.put(r.key, new TransactionalValue(value.value)); + map.put(r.key, TransactionalValue.createCommitted(value.value)); } } } diff --git a/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/MVCCTransactionMap.java b/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/MVCCTransactionMap.java index c6fed7b75..f120f99d8 100644 --- a/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/MVCCTransactionMap.java +++ b/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/MVCCTransactionMap.java @@ -123,8 +123,8 @@ protected TransactionalValue getValue(K key, TransactionalValue data) { // 数据从节点A迁移到节点B的过程中,如果把A中未提交的值也移到B中, // 那么在节点B中会读到不一致的数据,此时需要从节点A读出正确的值 // TODO 如何更高效的判断,不用比较字符串 - if (data.hostAndPort != null && !data.hostAndPort.equals(NetEndpoint.getLocalTcpHostAndPort())) { - return getRemoteTransactionalValue(data.hostAndPort, key); + if (data.getHostAndPort() != null && !data.getHostAndPort().equals(NetEndpoint.getLocalTcpHostAndPort())) { + return getRemoteTransactionalValue(data.getHostAndPort(), key); } if (tid == transaction.transactionId) { return data; @@ -134,8 +134,10 @@ protected TransactionalValue getValue(K key, TransactionalValue data) { if (v != null) return v; - if (!transaction.transactionEngine.containsTransaction(tid)) // TODO - return null; + // 底层存储写入了未提交事务的脏数据,并且在事务提交前数据库崩溃了 + if (!transaction.transactionEngine.containsTransaction(tid)) { + return data.undo(map, key); + } // get the value before the uncommitted transaction LinkedList d = transaction.transactionEngine.getTransaction(tid).logRecords; @@ -151,7 +153,7 @@ protected TransactionalValue getValue(K key, TransactionalValue data) { "The transaction log might be corrupt for key {0}", key); } } else { - LogRecord r = d.get(data.logId); + LogRecord r = d.get(data.getLogId()); data = r.oldValue; } } @@ -236,7 +238,7 @@ public boolean trySet(K key, V value) { } private boolean trySet(K key, V value, TransactionalValue oldValue) { - TransactionalValue newValue = new TransactionalValue(transaction, value); + TransactionalValue newValue = TransactionalValue.create(transaction, value, oldValue, map.getValueType()); String mapName = getName(); if (oldValue == null) { // a new value @@ -524,7 +526,7 @@ public void remove() { @SuppressWarnings("unchecked") @Override public K append(V value) { // 追加新记录时不会产生事务冲突 - TransactionalValue newValue = new TransactionalValue(transaction, value); + TransactionalValue newValue = TransactionalValue.create(transaction, value, null, null); K key = map.append(newValue); // 记事务log和append新值都是更新内存中的相应数据结构,所以不必把log调用放在append前面 // 放在前面的话调用log方法时就不知道key是什么,当事务要rollback时就不知道如何修改map的内存数据 @@ -551,7 +553,7 @@ public MVCCTransactionMap getInstance(Transaction transaction) { @SuppressWarnings("unchecked") public V putCommitted(K key, V value) { DataUtils.checkArgument(value != null, "The value may not be null"); - TransactionalValue newValue = new TransactionalValue(value); + TransactionalValue newValue = TransactionalValue.createCommitted(value); TransactionalValue oldValue = map.put(key, newValue); return (V) (oldValue == null ? null : oldValue.value); } diff --git a/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/TransactionalValue.java b/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/TransactionalValue.java index f900b966a..c9a779377 100644 --- a/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/TransactionalValue.java +++ b/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/TransactionalValue.java @@ -17,56 +17,226 @@ */ package org.lealone.transaction.mvcc; +import java.nio.ByteBuffer; + +import org.lealone.common.util.DataUtils; +import org.lealone.db.DataBuffer; +import org.lealone.db.value.ValueString; import org.lealone.net.NetEndpoint; +import org.lealone.storage.StorageMap; +import org.lealone.storage.type.StorageDataType; public class TransactionalValue { - // 每次修改记录的事务名要全局唯一, - // 比如用节点的IP拼接一个本地递增的计数器组成字符串就足够了 - public final String globalReplicationName; - public final long tid; - public final int logId; + public final long tid; // 如果是0代表事务已经提交 public final Object value; - public long version; // 每次更新时自动加1 - public boolean replicated; - public String hostAndPort; + public TransactionalValue(long tid, Object value) { + this.tid = tid; + this.value = value; + } - public TransactionalValue(Object value) { - this(0, 0, value); + public int getLogId() { + return 0; } - public TransactionalValue(long tid, int logId, Object value) { - this.tid = tid; - this.logId = logId; - this.globalReplicationName = null; - this.value = value; + public String getHostAndPort() { + return null; } - public TransactionalValue(MVCCTransaction transaction, Object value) { - this.tid = transaction.transactionId; - this.logId = transaction.logId; - this.globalReplicationName = transaction.globalTransactionName; - this.value = value; - this.hostAndPort = NetEndpoint.getLocalTcpHostAndPort(); + public String getGlobalReplicationName() { + return null; } - public TransactionalValue(long tid, int logId, Object value, long version, String globalTransactionName) { - this.tid = tid; - this.logId = logId; - this.value = value; - this.version = version; - this.globalReplicationName = globalTransactionName; + public boolean isReplicated() { + return false; + } + + public void setReplicated(boolean replicated) { + } + + public void incrementVersion() { + } + + public TransactionalValue undo(StorageMap map, K key) { + return this; } - @Override - public String toString() { - StringBuilder buff = new StringBuilder("TransactionalValue[ "); - buff.append("version = ").append(version); - buff.append(", globalReplicationName = ").append(globalReplicationName); - buff.append(", tid = ").append(tid); - buff.append(", logId = ").append(logId); - buff.append(", value = ").append(value).append(" ]"); - return buff.toString(); + public void write(DataBuffer buff, StorageDataType valueType) { + buff.putVarLong(tid); + if (value == null) { + buff.put((byte) 0); + } else { + buff.put((byte) 1); + valueType.write(buff, value); + } + } + + public static TransactionalValue read(ByteBuffer buff, StorageDataType valueType, StorageDataType oldValueType) { + long tid = DataUtils.readVarLong(buff); + Object value = null; + if (buff.get() == 1) { + value = valueType.read(buff); + } + if (tid == 0) { + return createCommitted(value); + } else { + return NotCommitted.read(tid, value, buff, oldValueType); + } + } + + public static TransactionalValue create(MVCCTransaction transaction, Object value, TransactionalValue oldValue, + StorageDataType oldValueType) { + return new NotCommitted(transaction, value, oldValue, oldValueType); + } + + public static TransactionalValue createCommitted(Object value) { + return new Committed(value); + } + + static class Committed extends TransactionalValue { + Committed(Object value) { + super(0, value); + } + + @Override + public String toString() { + StringBuilder buff = new StringBuilder("Committed[ value = "); + buff.append(value).append(" ]"); + return buff.toString(); + } + } + + static class NotCommitted extends TransactionalValue { + + // 每次修改记录的事务名要全局唯一, + // 比如用节点的IP拼接一个本地递增的计数器组成字符串就足够了 + private final int logId; + private final TransactionalValue oldValue; + private final StorageDataType oldValueType; + private final String hostAndPort; + private final String globalReplicationName; + private long version; // 每次更新时自动加1 + private boolean replicated; + + NotCommitted(MVCCTransaction transaction, Object value, TransactionalValue oldValue, + StorageDataType oldValueType) { + super(transaction.transactionId, value); + this.logId = transaction.logId; + this.oldValue = oldValue; + this.oldValueType = oldValueType; + this.hostAndPort = NetEndpoint.getLocalTcpHostAndPort(); + this.globalReplicationName = transaction.globalTransactionName; + } + + NotCommitted(long tid, Object value, int logId, TransactionalValue oldValue, StorageDataType oldValueType, + String hostAndPort, String globalTransactionName, long version) { + super(tid, value); + this.logId = logId; + this.oldValue = oldValue; + this.oldValueType = oldValueType; + this.hostAndPort = hostAndPort; + this.globalReplicationName = globalTransactionName; + this.version = version; + } + + @Override + public int getLogId() { + return logId; + } + + @Override + public String getHostAndPort() { + return hostAndPort; + } + + @Override + public String getGlobalReplicationName() { + return globalReplicationName; + } + + @Override + public boolean isReplicated() { + return replicated; + } + + @Override + public void setReplicated(boolean replicated) { + this.replicated = replicated; + } + + @Override + public void incrementVersion() { + version++; + } + + @Override + public TransactionalValue undo(StorageMap map, K key) { + if (oldValue == null) { // insert + map.remove(key); + } else { + map.put(key, oldValue); // update或delete + } + if (oldValue != null) { + return oldValue.undo(map, key); + } + return oldValue; + } + + @Override + public void write(DataBuffer buff, StorageDataType valueType) { + super.write(buff, valueType); + buff.putVarInt(logId); + if (oldValue == null) { + buff.put((byte) 0); + } else { + buff.put((byte) 1); + oldValueType.write(buff, oldValue); + } + if (hostAndPort == null) { + buff.put((byte) 0); + } else { + buff.put((byte) 1); + ValueString.type.write(buff, hostAndPort); + } + if (globalReplicationName == null) { + buff.put((byte) 0); + } else { + buff.put((byte) 1); + buff.putVarLong(version); + ValueString.type.write(buff, globalReplicationName); + } + } + + private static NotCommitted read(long tid, Object value, ByteBuffer buff, StorageDataType oldValueType) { + int logId = DataUtils.readVarInt(buff); + TransactionalValue oldValue = null; + if (buff.get() == 1) { + oldValue = (TransactionalValue) oldValueType.read(buff); + } + String hostAndPort = null; + if (buff.get() == 1) { + hostAndPort = ValueString.type.read(buff); + } + String globalReplicationName = null; + long version = 0; + if (buff.get() == 1) { + version = DataUtils.readVarLong(buff); + globalReplicationName = ValueString.type.read(buff); + } + return new NotCommitted(tid, value, logId, oldValue, oldValueType, hostAndPort, globalReplicationName, + version); + } + + @Override + public String toString() { + StringBuilder buff = new StringBuilder("NotCommitted[ "); + buff.append("tid = ").append(tid); + buff.append(", logId = ").append(logId); + buff.append(", version = ").append(version); + buff.append(", globalReplicationName = ").append(globalReplicationName); + buff.append(", value = ").append(value).append(" ]"); + return buff.toString(); + } } } diff --git a/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/TransactionalValueType.java b/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/TransactionalValueType.java index 6294f9dd4..d70303d8b 100644 --- a/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/TransactionalValueType.java +++ b/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/TransactionalValueType.java @@ -1,23 +1,27 @@ /* - * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0, - * and the EPL 1.0 (http://h2database.com/html/license.html). - * Initial Developer: H2 Group + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.lealone.transaction.mvcc; import java.nio.ByteBuffer; -import org.lealone.common.util.DataUtils; import org.lealone.db.DataBuffer; -import org.lealone.db.value.ValueString; import org.lealone.storage.type.StorageDataType; -/** - * The value type for a transactional value. - * - * @author H2 Group - * @author zhh - */ public class TransactionalValueType implements StorageDataType { public final StorageDataType valueType; @@ -29,7 +33,8 @@ public TransactionalValueType(StorageDataType valueType) { @Override public int getMemory(Object obj) { TransactionalValue v = (TransactionalValue) obj; - return valueType.getMemory(v.value); + // tid最大8字节 + return 8 + valueType.getMemory(v.value); // TODO 由于BufferedMap的合并与复制逻辑的验证是并行的, // 可能导致split时三个复制节点中某些相同的TransactionalValue有些globalReplicationName为null,有些不为null // 这样就会得到不同的内存大小,从而使得splitKey不同 @@ -46,7 +51,7 @@ public int compare(Object aObj, Object bObj) { TransactionalValue b = (TransactionalValue) bObj; long comp = a.tid - b.tid; if (comp == 0) { - comp = a.logId - b.logId; + comp = a.getLogId() - b.getLogId(); if (comp == 0) return valueType.compare(a.value, b.value); } @@ -55,90 +60,26 @@ public int compare(Object aObj, Object bObj) { @Override public void read(ByteBuffer buff, Object[] obj, int len) { - if (buff.get() == 0) { - // fast path (no tid/logId or null entries) - for (int i = 0; i < len; i++) { - obj[i] = new TransactionalValue(valueType.read(buff)); - } - } else { - // slow path (some entries may be null) - for (int i = 0; i < len; i++) { - obj[i] = read(buff); - } + for (int i = 0; i < len; i++) { + obj[i] = read(buff); } } @Override public Object read(ByteBuffer buff) { - long tid = DataUtils.readVarLong(buff); - int logId = DataUtils.readVarInt(buff); - Object value = null; - if (buff.get() == 1) { - value = valueType.read(buff); - } - TransactionalValue transactionalValue = null; - if (buff.get() == 1) { - long version = DataUtils.readVarLong(buff); - String globalTransactionName = ValueString.type.read(buff); - transactionalValue = new TransactionalValue(tid, logId, value, version, globalTransactionName); - } else { - transactionalValue = new TransactionalValue(tid, logId, value); - } - - if (buff.get() == 1) { - transactionalValue.hostAndPort = ValueString.type.read(buff); - } - return transactionalValue; + return TransactionalValue.read(buff, valueType, this); } @Override public void write(DataBuffer buff, Object[] obj, int len) { - boolean fastPath = true; for (int i = 0; i < len; i++) { - TransactionalValue v = (TransactionalValue) obj[i]; - if (v.tid != 0 || v.value == null || v.globalReplicationName != null) { - fastPath = false; - } - } - if (fastPath) { - buff.put((byte) 0); - for (int i = 0; i < len; i++) { - TransactionalValue v = (TransactionalValue) obj[i]; - valueType.write(buff, v.value); - } - } else { - // slow path: - // store tid/logId, and some entries may be null - buff.put((byte) 1); - for (int i = 0; i < len; i++) { - write(buff, obj[i]); - } + write(buff, obj[i]); } } @Override public void write(DataBuffer buff, Object obj) { TransactionalValue v = (TransactionalValue) obj; - buff.putVarLong(v.tid); - buff.putVarInt(v.logId); - if (v.value == null) { - buff.put((byte) 0); - } else { - buff.put((byte) 1); - valueType.write(buff, v.value); - } - if (v.globalReplicationName == null) { - buff.put((byte) 0); - } else { - buff.put((byte) 1); - buff.putVarLong(v.version); - ValueString.type.write(buff, v.globalReplicationName); - } - if (v.hostAndPort == null) { - buff.put((byte) 0); - } else { - buff.put((byte) 1); - ValueString.type.write(buff, v.hostAndPort); - } + v.write(buff, valueType); } } diff --git a/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/log/LogSyncService.java b/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/log/LogSyncService.java index c4fc9cfec..d339bde28 100644 --- a/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/log/LogSyncService.java +++ b/lealone-mvcc/src/main/java/org/lealone/transaction/mvcc/log/LogSyncService.java @@ -186,7 +186,7 @@ public void redo(StorageMap map) { map.remove(key); else { value = vt.read(log); - map.put(key, new TransactionalValue(value)); + map.put(key, TransactionalValue.createCommitted(value)); } } }