Skip to content

Commit

Permalink
事务日志中存放mapName,不再存mapId(要考虑全局唯一性,用mapName更简单灵活)
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 18, 2015
1 parent 05d14cf commit 1ac726c
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 47 deletions.
Expand Up @@ -65,7 +65,7 @@ public StandardPrimaryIndex(ServerSession session, StandardTable table) {
}
ValueDataType keyType = new ValueDataType(null, null, null);
ValueDataType valueType = new ValueDataType(database.getCompareMode(), database, sortTypes);
mapName = StandardTable.getMapNameForTable(getId());
mapName = table.getMapNameForTable(getId());

Storage storage = database.getStorage(table.getStorageEngine());
TransactionEngine transactionEngine = database.getTransactionEngine();
Expand Down
Expand Up @@ -57,7 +57,7 @@ public StandardSecondaryIndex(ServerSession session, StandardTable table, int id
sortTypes[keyColumns - 1] = SortOrder.ASCENDING;
ValueDataType keyType = new ValueDataType(db.getCompareMode(), db, sortTypes);
ValueDataType valueType = new ValueDataType(null, null, null);
mapName = StandardTable.getMapNameForIndex(getId());
mapName = table.getMapNameForIndex(getId());

Storage storage = database.getStorage(table.getStorageEngine());
TransactionEngine transactionEngine = database.getTransactionEngine();
Expand Down
13 changes: 7 additions & 6 deletions lealone-db/src/main/java/org/lealone/db/table/StandardTable.java
Expand Up @@ -402,8 +402,8 @@ public Row getRow(ServerSession session, long key) {
}

@Override
public Index addIndex(ServerSession session, String indexName, int indexId, IndexColumn[] cols, IndexType indexType,
boolean create, String indexComment) {
public Index addIndex(ServerSession session, String indexName, int indexId, IndexColumn[] cols,
IndexType indexType, boolean create, String indexComment) {
if (indexType.isPrimaryKey()) {
for (IndexColumn c : cols) {
Column column = c.column;
Expand Down Expand Up @@ -817,12 +817,13 @@ public boolean containsGlobalUniqueIndex() {
// return rowVersionMap;
// }

public static String getMapNameForTable(int id) {
return getMapName("table", id);
// 只要组合数据库id和表或索引的id就能得到一个全局唯一的map名了
public String getMapNameForTable(int id) {
return getMapName("t", database.getId(), id);
}

public static String getMapNameForIndex(int id) {
return getMapName("index", id);
public String getMapNameForIndex(int id) {
return getMapName("i", database.getId(), id);
}

private static String getMapName(Object... args) {
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.lealone.test.UnitTestBase;

public class DatabaseEngineTest extends UnitTestBase {

@Test
public void run() {

Expand Down
Expand Up @@ -329,12 +329,12 @@ public long setSavepoint() {
/**
* Add a log entry.
*
* @param mapId the map id
* @param mapName the map name
* @param key the key
* @param oldValue the old value
*/
void log(int mapId, Object key, Object oldValue) {
transactionEngine.log(this, logId, mapId, key, oldValue);
void log(String mapName, Object key, Object oldValue) {
transactionEngine.log(this, logId, mapName, key, oldValue);
// only increment the log id if logging was successful
logId++;
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
import org.lealone.storage.StorageMap;
import org.lealone.storage.type.DataType;
import org.lealone.storage.type.ObjectDataType;
import org.lealone.storage.type.StringDataType;
import org.lealone.storage.type.WriteBuffer;
import org.lealone.transaction.log.LogChunkMap;
import org.lealone.transaction.log.LogMap;
Expand All @@ -32,7 +33,7 @@
*/
public class MVCCTransactionEngine extends TransactionEngineBase {

private final ConcurrentHashMap<Integer, StorageMap<Object, VersionedValue>> maps = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, StorageMap<Object, VersionedValue>> maps = new ConcurrentHashMap<>();
private final AtomicInteger lastTransactionId = new AtomicInteger();
private int maxTransactionId = 0xffff;

Expand All @@ -50,31 +51,31 @@ public class MVCCTransactionEngine extends TransactionEngineBase {
* is not possible). Log entries are written before the data is changed
* (write-ahead).
* <p>
* Key: opId, value: [ mapId, key, oldValue ].
* Key: opId, value: [ mapName, key, oldValue ].
*/
LogMap<Long, Object[]> undoLog;

/**
* The redo log.
*
* Key: opId, value: [ mapId, key, newValue ].
* Key: opId, value: [ mapName, key, newValue ].
*/
LogMap<Long, RedoLogValue> redoLog;

public MVCCTransactionEngine() {
super(Constants.DEFAULT_TRANSACTION_ENGINE_NAME);
}

StorageMap<Object, VersionedValue> getMap(int mapId) {
return maps.get(mapId);
StorageMap<Object, VersionedValue> getMap(String mapName) {
return maps.get(mapName);
}

void addMap(StorageMap<Object, VersionedValue> map) {
maps.put(map.getId(), map);
maps.put(map.getName(), map);
}

void removeMap(int mapId) {
maps.remove(mapId);
void removeMap(String mapName) {
maps.remove(mapName);
}

@Override
Expand All @@ -91,7 +92,7 @@ public synchronized void init(Map<String, String> config) {
// 就算是在同一个事务中也可能涉及同一个数据库中的多个表甚至多个数据库的多个表,
// 所以要序列化数据,只能用ObjectDataType
VersionedValueType oldValueType = new VersionedValueType(new ObjectDataType());
ArrayType undoLogValueType = new ArrayType(new DataType[] { new ObjectDataType(), new ObjectDataType(),
ArrayType undoLogValueType = new ArrayType(new DataType[] { StringDataType.INSTANCE, new ObjectDataType(),
oldValueType });
undoLog = logStorage.openLogMap("undoLog", new ObjectDataType(), undoLogValueType);

Expand Down Expand Up @@ -267,13 +268,13 @@ static long getLogId(long operationId) {
*
* @param t the transaction
* @param logId the log id
* @param mapId the map id
* @param mapName the map name
* @param key the key
* @param oldValue the old value
*/
void log(MVCCTransaction t, long logId, int mapId, Object key, Object oldValue) {
void log(MVCCTransaction t, long logId, String mapName, Object key, Object oldValue) {
Long undoKey = getOperationId(t.transactionId, logId);
Object[] log = new Object[] { mapId, key, oldValue };
Object[] log = new Object[] { mapName, key, oldValue };
synchronized (undoLog) {
if (logId == 0) {
if (undoLog.containsKey(undoKey)) {
Expand Down Expand Up @@ -321,9 +322,9 @@ void commitAfterValidate(int tid) {
removeUndoLog(tid, Long.MAX_VALUE);
}

private void redoLog(Long operationId, int mapId, Object key, VersionedValue value) {
private void redoLog(Long operationId, String mapName, Object key, VersionedValue value) {
WriteBuffer writeBuffer = WriteBufferPool.poll();
StorageMap<?, ?> map = maps.get(mapId);
StorageMap<?, ?> map = maps.get(mapName);

map.getKeyType().write(writeBuffer, key);
ByteBuffer keyBuffer = writeBuffer.getBuffer();
Expand All @@ -342,7 +343,7 @@ private void redoLog(Long operationId, int mapId, Object key, VersionedValue val
valueBuffer = LogChunkMap.EMPTY_BUFFER;
}

RedoLogValue v = new RedoLogValue(mapId, keyBuffer, valueBuffer);
RedoLogValue v = new RedoLogValue(mapName, keyBuffer, valueBuffer);
redoLog.put(operationId, v);

WriteBufferPool.offer(writeBuffer);
Expand All @@ -366,8 +367,8 @@ private void removeUndoLog(int tid, long maxLogId) {
logId = getLogId(undoKey) - 1;
continue;
}
int mapId = (Integer) op[0];
StorageMap<Object, VersionedValue> map = getMap(mapId);
String mapName = (String) op[0];
StorageMap<Object, VersionedValue> map = getMap(mapName);
if (map == null) {
// map was later removed
} else {
Expand All @@ -376,15 +377,15 @@ private void removeUndoLog(int tid, long maxLogId) {
if (value == null) {
// nothing to do
} else if (value.value == null) {
redoLog(undoKey, mapId, key, null);
redoLog(undoKey, mapName, key, null);
// remove the value
// map.remove(key);
logs.add(new Object[] { map, key, undoKey });
lastOperationId = undoKey;
} else {
VersionedValue v2 = new VersionedValue();
v2.value = value.value;
redoLog(undoKey, mapId, key, v2);
redoLog(undoKey, mapName, key, v2);
// map.put(key, v2);
logs.add(new Object[] { map, key, v2, undoKey });
lastOperationId = undoKey;
Expand Down Expand Up @@ -442,8 +443,8 @@ void rollbackTo(MVCCTransaction t, long maxLogId, long toLogId) {
logId = getLogId(undoKey) + 1;
continue;
}
int mapId = ((Integer) op[0]).intValue();
StorageMap<Object, VersionedValue> map = getMap(mapId);
String mapName = (String) op[0];
StorageMap<Object, VersionedValue> map = getMap(mapName);
if (map != null) {
Object key = op[1];
VersionedValue oldValue = (VersionedValue) op[2];
Expand Down Expand Up @@ -515,8 +516,8 @@ private void fetchNext() {
logId = getLogId(undoKey);
continue;
}
int mapId = ((Integer) op[0]).intValue();
StorageMap<Object, VersionedValue> m = getMap(mapId);
String mapName = (String) op[0];
StorageMap<Object, VersionedValue> m = getMap(mapName);
if (m == null) {
// map was removed later on
} else {
Expand Down
Expand Up @@ -220,10 +220,10 @@ public boolean trySet(K key, V value) {
VersionedValue newValue = new VersionedValue();
newValue.operationId = MVCCTransactionEngine.getOperationId(transaction.transactionId, transaction.logId);
newValue.value = value;
int mapId = getId();
String mapName = getName();
if (current == null) {
// a new value
transaction.log(mapId, key, current);
transaction.log(mapName, key, current);
VersionedValue old = map.putIfAbsent(key, newValue);
if (old != null) {
transaction.logUndo();
Expand All @@ -234,7 +234,7 @@ public boolean trySet(K key, V value) {
long id = current.operationId;
if (id == 0) {
// committed
transaction.log(mapId, key, current);
transaction.log(mapName, key, current);
// the transaction is committed:
// overwrite the value
if (!map.replace(key, current, newValue)) {
Expand All @@ -247,7 +247,7 @@ public boolean trySet(K key, V value) {
int tx = MVCCTransactionEngine.getTransactionId(current.operationId);
if (tx == transaction.transactionId) {
// added or updated by this transaction
transaction.log(mapId, key, current);
transaction.log(mapName, key, current);
if (!map.replace(key, current, newValue)) {
// strange, somebody overwrote the value
// even though the change was not committed
Expand Down Expand Up @@ -530,7 +530,7 @@ public void clear() {
@Override
public void remove() {
map.remove();
transaction.transactionEngine.removeMap(getId());
transaction.transactionEngine.removeMap(getName());
}

@Override
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.lealone.common.util.New;
import org.lealone.storage.type.ObjectDataType;
import org.lealone.storage.type.StringDataType;
import org.lealone.transaction.log.LogMap;
import org.lealone.transaction.log.LogStorage;

Expand Down Expand Up @@ -50,7 +51,7 @@ private static TransactionStatusCache newCache(String hostAndPort) {

static synchronized void init(LogStorage logStorage) {
if (map == null) {
map = logStorage.openLogMap("transactionStatusTable", new ObjectDataType(), new ObjectDataType());
map = logStorage.openLogMap("transactionStatusTable", StringDataType.INSTANCE, new ObjectDataType());
}
}

Expand Down
Expand Up @@ -20,12 +20,12 @@
import java.nio.ByteBuffer;

public class RedoLogValue {
final int mapId;
final String mapName;
final ByteBuffer key;
final ByteBuffer value;

public RedoLogValue(int mapId, ByteBuffer key, ByteBuffer value) {
this.mapId = mapId;
public RedoLogValue(String mapName, ByteBuffer key, ByteBuffer value) {
this.mapName = mapName;
this.key = key;
this.value = value;
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.lealone.common.message.DbException;
import org.lealone.common.util.DataUtils;
import org.lealone.storage.type.DataType;
import org.lealone.storage.type.StringDataType;
import org.lealone.storage.type.WriteBuffer;

public class RedoLogValueType implements DataType {
Expand All @@ -39,7 +40,7 @@ public int getMemory(Object obj) {
@Override
public void write(WriteBuffer buff, Object obj) {
RedoLogValue v = (RedoLogValue) obj;
buff.putVarInt(v.mapId);
StringDataType.INSTANCE.write(buff, v.mapName);
buff.putVarInt(v.key.remaining());
buff.put(v.key);
buff.putVarInt(v.value.remaining());
Expand All @@ -55,7 +56,7 @@ public void write(WriteBuffer buff, Object[] obj, int len, boolean key) {

@Override
public Object read(ByteBuffer buff) {
int mapId = DataUtils.readVarInt(buff);
String mapName = StringDataType.INSTANCE.read(buff);

byte[] key = new byte[DataUtils.readVarInt(buff)];
buff.get(key);
Expand All @@ -64,7 +65,7 @@ public Object read(ByteBuffer buff) {
byte[] value = new byte[DataUtils.readVarInt(buff)];
buff.get(value);
ByteBuffer valueBuffer = ByteBuffer.wrap(value);
return new RedoLogValue(mapId, keyBuffer, valueBuffer);
return new RedoLogValue(mapName, keyBuffer, valueBuffer);
}

@Override
Expand Down

0 comments on commit 1ac726c

Please sign in to comment.