Skip to content

Commit

Permalink
根据map的Key/ValueType提前序列化redo log的key、value
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 14, 2015
1 parent ae56597 commit 1ebf6a5
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 22 deletions.
Expand Up @@ -22,6 +22,7 @@
import org.lealone.storage.Storage; import org.lealone.storage.Storage;
import org.lealone.storage.StorageMap; import org.lealone.storage.StorageMap;
import org.lealone.storage.type.DataType; import org.lealone.storage.type.DataType;
import org.lealone.storage.type.ObjectDataType;


/** /**
* A transaction. * A transaction.
Expand Down Expand Up @@ -155,11 +156,15 @@ public <K, V> MVCCTransactionMap<K, V> openMap(String name, DataType keyType, Da
@Override @Override
public <K, V> MVCCTransactionMap<K, V> openMap(String name, String mapType, DataType keyType, DataType valueType, public <K, V> MVCCTransactionMap<K, V> openMap(String name, String mapType, DataType keyType, DataType valueType,
Storage storage) { Storage storage) {
if (keyType == null)
keyType = new ObjectDataType();
if (valueType == null)
valueType = new ObjectDataType();

checkNotClosed(); checkNotClosed();
StorageMap<K, VersionedValue> map = storage.openMap(name, mapType, keyType, new VersionedValueType(valueType), StorageMap<K, VersionedValue> map = storage.openMap(name, mapType, keyType, new VersionedValueType(valueType),
null); null);
transactionEngine.addMap((StorageMap<Object, VersionedValue>) map); transactionEngine.addMap((StorageMap<Object, VersionedValue>) map);
transactionEngine.changeDataType(valueType);
return new MVCCTransactionMap<>(this, map); return new MVCCTransactionMap<>(this, map);
} }


Expand Down
Expand Up @@ -6,6 +6,7 @@
package org.lealone.transaction; package org.lealone.transaction;


import java.io.File; import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
Expand All @@ -19,9 +20,13 @@
import org.lealone.storage.StorageMap; import org.lealone.storage.StorageMap;
import org.lealone.storage.type.DataType; import org.lealone.storage.type.DataType;
import org.lealone.storage.type.ObjectDataType; import org.lealone.storage.type.ObjectDataType;
import org.lealone.storage.type.WriteBuffer;
import org.lealone.transaction.log.LogChunkMap;
import org.lealone.transaction.log.LogMap; import org.lealone.transaction.log.LogMap;
import org.lealone.transaction.log.LogStorage; import org.lealone.transaction.log.LogStorage;
import org.lealone.transaction.log.LogStorageBuilder; import org.lealone.transaction.log.LogStorageBuilder;
import org.lealone.transaction.log.RedoLogValue;
import org.lealone.transaction.log.RedoLogValueType;


/** /**
* The transaction engine that supports concurrent MVCC read-committed transactions. * The transaction engine that supports concurrent MVCC read-committed transactions.
Expand Down Expand Up @@ -60,9 +65,7 @@ public class MVCCTransactionEngine extends TransactionEngineBase {
* *
* Key: opId, value: [ mapId, key, newValue ]. * Key: opId, value: [ mapId, key, newValue ].
*/ */
LogMap<Long, Object[]> redoLog; LogMap<Long, RedoLogValue> redoLog;

private Class<?> dataTypeClass;


public MVCCTransactionEngine() { public MVCCTransactionEngine() {
super(Constants.DEFAULT_TRANSACTION_ENGINE_NAME); super(Constants.DEFAULT_TRANSACTION_ENGINE_NAME);
Expand All @@ -80,19 +83,6 @@ void removeMap(int mapId) {
maps.remove(mapId); maps.remove(mapId);
} }


// TODO 只是临时方案
void changeDataType(DataType dataType) {
if (dataType != null && (dataTypeClass == null || (dataTypeClass != dataType.getClass()))) {
dataTypeClass = dataType.getClass();
undoLog.close();
redoLog.close();
VersionedValueType oldValueType = new VersionedValueType(dataType);
ArrayType undoLogValueType = new ArrayType(new DataType[] { new ObjectDataType(), dataType, oldValueType });
undoLog = logStorage.openLogMap("undoLog", new ObjectDataType(), undoLogValueType);
redoLog = logStorage.openLogMap("redoLog", new ObjectDataType(), undoLogValueType);
}
}

/** /**
* Initialize the store. This is needed before a transaction can be opened. * Initialize the store. This is needed before a transaction can be opened.
* If the transaction store is corrupt, this method can throw an exception, * If the transaction store is corrupt, this method can throw an exception,
Expand Down Expand Up @@ -126,7 +116,7 @@ public synchronized void init(Map<String, String> config) {
"Undo map open with a different value type"); "Undo map open with a different value type");
} }


redoLog = logStorage.openLogMap("redoLog", new ObjectDataType(), undoLogValueType); redoLog = logStorage.openLogMap("redoLog", new ObjectDataType(), new RedoLogValueType());


initTransactions(); initTransactions();


Expand Down Expand Up @@ -354,7 +344,30 @@ void commitAfterValidate(int tid) {
} }


private void redoLog(Long operationId, int mapId, Object key, VersionedValue value) { private void redoLog(Long operationId, int mapId, Object key, VersionedValue value) {
redoLog.put(operationId, new Object[] { mapId, key, value }); WriteBuffer writeBuffer = LogChunkMap.getWriteBuffer();
StorageMap<?, ?> map = maps.get(mapId);

map.getKeyType().write(writeBuffer, key);
ByteBuffer keyBuffer = writeBuffer.getBuffer();
keyBuffer.flip();
keyBuffer = keyBuffer.duplicate();

writeBuffer.clear();

ByteBuffer valueBuffer;
if (value != null) {
((VersionedValueType) map.getValueType()).valueType.write(writeBuffer, value.value);
valueBuffer = writeBuffer.getBuffer();
valueBuffer.flip();
valueBuffer = valueBuffer.duplicate();
} else {
valueBuffer = LogChunkMap.EMPTY_BUFFER;
}

RedoLogValue v = new RedoLogValue(mapId, keyBuffer, valueBuffer);
redoLog.put(operationId, v);

LogChunkMap.releaseWriteBuffer(writeBuffer);
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Expand Down
Expand Up @@ -16,7 +16,7 @@
*/ */
class VersionedValueType implements DataType { class VersionedValueType implements DataType {


private final DataType valueType; final DataType valueType;


public VersionedValueType(DataType valueType) { public VersionedValueType(DataType valueType) {
this.valueType = valueType; this.valueType = valueType;
Expand Down
Expand Up @@ -37,9 +37,11 @@
* @author zhh * @author zhh
*/ */
public class LogChunkMap<K, V> extends MemoryMap<K, V> { public class LogChunkMap<K, V> extends MemoryMap<K, V> {
public static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

private static WriteBuffer writeBuffer; private static WriteBuffer writeBuffer;


private static WriteBuffer getWriteBuffer() { public static WriteBuffer getWriteBuffer() {
WriteBuffer buff; WriteBuffer buff;
if (writeBuffer != null) { if (writeBuffer != null) {
buff = writeBuffer; buff = writeBuffer;
Expand All @@ -50,7 +52,7 @@ private static WriteBuffer getWriteBuffer() {
return buff; return buff;
} }


private static void releaseWriteBuffer(WriteBuffer buff) { public static void releaseWriteBuffer(WriteBuffer buff) {
if (buff.capacity() <= 4 * 1024 * 1024) { if (buff.capacity() <= 4 * 1024 * 1024) {
writeBuffer = buff; writeBuffer = buff;
} }
Expand Down
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.transaction.log;

import java.nio.ByteBuffer;

public class RedoLogValue {
int mapId;
ByteBuffer key;
ByteBuffer value;

public RedoLogValue(int mapId, ByteBuffer key, ByteBuffer value) {
this.mapId = mapId;
this.key = key;
this.value = value;
}
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.transaction.log;

import java.nio.ByteBuffer;

import org.lealone.common.message.DbException;
import org.lealone.common.util.DataUtils;
import org.lealone.storage.type.DataType;
import org.lealone.storage.type.WriteBuffer;

public class RedoLogValueType implements DataType {

@Override
public int compare(Object a, Object b) {
throw DbException.getUnsupportedException("compare");
}

@Override
public int getMemory(Object obj) {
throw DbException.getUnsupportedException("getMemory");
}

@Override
public void write(WriteBuffer buff, Object obj) {
RedoLogValue v = (RedoLogValue) obj;
buff.putVarInt(v.mapId);
buff.putVarInt(v.key.remaining());
buff.put(v.key);
buff.putVarInt(v.value.remaining());
buff.put(v.value);
}

@Override
public void write(WriteBuffer buff, Object[] obj, int len, boolean key) {
for (int i = 0; i < len; i++) {
write(buff, obj[i]);
}
}

@Override
public Object read(ByteBuffer buff) {
int mapId = DataUtils.readVarInt(buff);

byte[] key = new byte[DataUtils.readVarInt(buff)];
buff.get(key);
ByteBuffer keyBuffer = ByteBuffer.wrap(key);

byte[] value = new byte[DataUtils.readVarInt(buff)];
buff.get(value);
ByteBuffer valueBuffer = ByteBuffer.wrap(value);
return new RedoLogValue(mapId, keyBuffer, valueBuffer);
}

@Override
public void read(ByteBuffer buff, Object[] obj, int len, boolean key) {
for (int i = 0; i < len; i++) {
obj[i] = read(buff);
}
}

}

0 comments on commit 1ebf6a5

Please sign in to comment.