diff --git a/lealone-transaction/src/main/java/org/lealone/transaction/MVCCTransaction.java b/lealone-transaction/src/main/java/org/lealone/transaction/MVCCTransaction.java index 661f4208e..8aa6a8b72 100644 --- a/lealone-transaction/src/main/java/org/lealone/transaction/MVCCTransaction.java +++ b/lealone-transaction/src/main/java/org/lealone/transaction/MVCCTransaction.java @@ -22,6 +22,7 @@ import org.lealone.storage.Storage; import org.lealone.storage.StorageMap; import org.lealone.storage.type.DataType; +import org.lealone.storage.type.ObjectDataType; /** * A transaction. @@ -155,11 +156,15 @@ public MVCCTransactionMap openMap(String name, DataType keyType, Da @Override public MVCCTransactionMap openMap(String name, String mapType, DataType keyType, DataType valueType, Storage storage) { + if (keyType == null) + keyType = new ObjectDataType(); + if (valueType == null) + valueType = new ObjectDataType(); + checkNotClosed(); StorageMap map = storage.openMap(name, mapType, keyType, new VersionedValueType(valueType), null); transactionEngine.addMap((StorageMap) map); - transactionEngine.changeDataType(valueType); return new MVCCTransactionMap<>(this, map); } diff --git a/lealone-transaction/src/main/java/org/lealone/transaction/MVCCTransactionEngine.java b/lealone-transaction/src/main/java/org/lealone/transaction/MVCCTransactionEngine.java index e0a9d7b2c..3c18f6a48 100644 --- a/lealone-transaction/src/main/java/org/lealone/transaction/MVCCTransactionEngine.java +++ b/lealone-transaction/src/main/java/org/lealone/transaction/MVCCTransactionEngine.java @@ -6,6 +6,7 @@ package org.lealone.transaction; import java.io.File; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -19,9 +20,13 @@ import org.lealone.storage.StorageMap; import org.lealone.storage.type.DataType; import org.lealone.storage.type.ObjectDataType; +import org.lealone.storage.type.WriteBuffer; +import org.lealone.transaction.log.LogChunkMap; import org.lealone.transaction.log.LogMap; import org.lealone.transaction.log.LogStorage; import org.lealone.transaction.log.LogStorageBuilder; +import org.lealone.transaction.log.RedoLogValue; +import org.lealone.transaction.log.RedoLogValueType; /** * The transaction engine that supports concurrent MVCC read-committed transactions. @@ -60,9 +65,7 @@ public class MVCCTransactionEngine extends TransactionEngineBase { * * Key: opId, value: [ mapId, key, newValue ]. */ - LogMap redoLog; - - private Class dataTypeClass; + LogMap redoLog; public MVCCTransactionEngine() { super(Constants.DEFAULT_TRANSACTION_ENGINE_NAME); @@ -80,19 +83,6 @@ void removeMap(int mapId) { maps.remove(mapId); } - // TODO 只是临时方案 - void changeDataType(DataType dataType) { - if (dataType != null && (dataTypeClass == null || (dataTypeClass != dataType.getClass()))) { - dataTypeClass = dataType.getClass(); - undoLog.close(); - redoLog.close(); - VersionedValueType oldValueType = new VersionedValueType(dataType); - ArrayType undoLogValueType = new ArrayType(new DataType[] { new ObjectDataType(), dataType, oldValueType }); - undoLog = logStorage.openLogMap("undoLog", new ObjectDataType(), undoLogValueType); - redoLog = logStorage.openLogMap("redoLog", new ObjectDataType(), undoLogValueType); - } - } - /** * Initialize the store. This is needed before a transaction can be opened. * If the transaction store is corrupt, this method can throw an exception, @@ -126,7 +116,7 @@ public synchronized void init(Map config) { "Undo map open with a different value type"); } - redoLog = logStorage.openLogMap("redoLog", new ObjectDataType(), undoLogValueType); + redoLog = logStorage.openLogMap("redoLog", new ObjectDataType(), new RedoLogValueType()); initTransactions(); @@ -354,7 +344,30 @@ void commitAfterValidate(int tid) { } private void redoLog(Long operationId, int mapId, Object key, VersionedValue value) { - redoLog.put(operationId, new Object[] { mapId, key, value }); + WriteBuffer writeBuffer = LogChunkMap.getWriteBuffer(); + StorageMap map = maps.get(mapId); + + map.getKeyType().write(writeBuffer, key); + ByteBuffer keyBuffer = writeBuffer.getBuffer(); + keyBuffer.flip(); + keyBuffer = keyBuffer.duplicate(); + + writeBuffer.clear(); + + ByteBuffer valueBuffer; + if (value != null) { + ((VersionedValueType) map.getValueType()).valueType.write(writeBuffer, value.value); + valueBuffer = writeBuffer.getBuffer(); + valueBuffer.flip(); + valueBuffer = valueBuffer.duplicate(); + } else { + valueBuffer = LogChunkMap.EMPTY_BUFFER; + } + + RedoLogValue v = new RedoLogValue(mapId, keyBuffer, valueBuffer); + redoLog.put(operationId, v); + + LogChunkMap.releaseWriteBuffer(writeBuffer); } @SuppressWarnings("unchecked") diff --git a/lealone-transaction/src/main/java/org/lealone/transaction/VersionedValueType.java b/lealone-transaction/src/main/java/org/lealone/transaction/VersionedValueType.java index 0f643b427..ff933fe4f 100644 --- a/lealone-transaction/src/main/java/org/lealone/transaction/VersionedValueType.java +++ b/lealone-transaction/src/main/java/org/lealone/transaction/VersionedValueType.java @@ -16,7 +16,7 @@ */ class VersionedValueType implements DataType { - private final DataType valueType; + final DataType valueType; public VersionedValueType(DataType valueType) { this.valueType = valueType; diff --git a/lealone-transaction/src/main/java/org/lealone/transaction/log/LogChunkMap.java b/lealone-transaction/src/main/java/org/lealone/transaction/log/LogChunkMap.java index 221e60164..57bf0a153 100644 --- a/lealone-transaction/src/main/java/org/lealone/transaction/log/LogChunkMap.java +++ b/lealone-transaction/src/main/java/org/lealone/transaction/log/LogChunkMap.java @@ -37,9 +37,11 @@ * @author zhh */ public class LogChunkMap extends MemoryMap { + public static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); + private static WriteBuffer writeBuffer; - private static WriteBuffer getWriteBuffer() { + public static WriteBuffer getWriteBuffer() { WriteBuffer buff; if (writeBuffer != null) { buff = writeBuffer; @@ -50,7 +52,7 @@ private static WriteBuffer getWriteBuffer() { return buff; } - private static void releaseWriteBuffer(WriteBuffer buff) { + public static void releaseWriteBuffer(WriteBuffer buff) { if (buff.capacity() <= 4 * 1024 * 1024) { writeBuffer = buff; } diff --git a/lealone-transaction/src/main/java/org/lealone/transaction/log/RedoLogValue.java b/lealone-transaction/src/main/java/org/lealone/transaction/log/RedoLogValue.java new file mode 100644 index 000000000..555d1c1be --- /dev/null +++ b/lealone-transaction/src/main/java/org/lealone/transaction/log/RedoLogValue.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.transaction.log; + +import java.nio.ByteBuffer; + +public class RedoLogValue { + int mapId; + ByteBuffer key; + ByteBuffer value; + + public RedoLogValue(int mapId, ByteBuffer key, ByteBuffer value) { + this.mapId = mapId; + this.key = key; + this.value = value; + } +} diff --git a/lealone-transaction/src/main/java/org/lealone/transaction/log/RedoLogValueType.java b/lealone-transaction/src/main/java/org/lealone/transaction/log/RedoLogValueType.java new file mode 100644 index 000000000..aeb685528 --- /dev/null +++ b/lealone-transaction/src/main/java/org/lealone/transaction/log/RedoLogValueType.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.transaction.log; + +import java.nio.ByteBuffer; + +import org.lealone.common.message.DbException; +import org.lealone.common.util.DataUtils; +import org.lealone.storage.type.DataType; +import org.lealone.storage.type.WriteBuffer; + +public class RedoLogValueType implements DataType { + + @Override + public int compare(Object a, Object b) { + throw DbException.getUnsupportedException("compare"); + } + + @Override + public int getMemory(Object obj) { + throw DbException.getUnsupportedException("getMemory"); + } + + @Override + public void write(WriteBuffer buff, Object obj) { + RedoLogValue v = (RedoLogValue) obj; + buff.putVarInt(v.mapId); + buff.putVarInt(v.key.remaining()); + buff.put(v.key); + buff.putVarInt(v.value.remaining()); + buff.put(v.value); + } + + @Override + public void write(WriteBuffer buff, Object[] obj, int len, boolean key) { + for (int i = 0; i < len; i++) { + write(buff, obj[i]); + } + } + + @Override + public Object read(ByteBuffer buff) { + int mapId = DataUtils.readVarInt(buff); + + byte[] key = new byte[DataUtils.readVarInt(buff)]; + buff.get(key); + ByteBuffer keyBuffer = ByteBuffer.wrap(key); + + byte[] value = new byte[DataUtils.readVarInt(buff)]; + buff.get(value); + ByteBuffer valueBuffer = ByteBuffer.wrap(value); + return new RedoLogValue(mapId, keyBuffer, valueBuffer); + } + + @Override + public void read(ByteBuffer buff, Object[] obj, int len, boolean key) { + for (int i = 0; i < len; i++) { + obj[i] = read(buff); + } + } + +}