Skip to content

Commit

Permalink
简化RedoLog的实现逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Dec 2, 2017
1 parent 3ca1658 commit 206a5f1
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 261 deletions.
Expand Up @@ -20,7 +20,6 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
Expand Down Expand Up @@ -93,9 +92,8 @@ public void addMap(StorageMap<Object, TransactionalValue> map) {
void removeMap(String mapName) { void removeMap(String mapName) {
estimatedMemory.remove(mapName); estimatedMemory.remove(mapName);
maps.remove(mapName); maps.remove(mapName);
long tid = nextEvenTransactionId();
RedoLogValue rlv = new RedoLogValue(mapName); RedoLogValue rlv = new RedoLogValue(mapName);
redoLog.put(tid, rlv); redoLog.addRedoLogValue(rlv);
logSyncService.maybeWaitForSync(rlv); logSyncService.maybeWaitForSync(rlv);
} }


Expand Down Expand Up @@ -126,7 +124,6 @@ void close() {


@Override @Override
public void run() { public void run() {
Long checkpoint = null;
while (!isClosed) { while (!isClosed) {
try { try {
semaphore.tryAcquire(sleep, TimeUnit.MILLISECONDS); semaphore.tryAcquire(sleep, TimeUnit.MILLISECONDS);
Expand All @@ -136,10 +133,6 @@ public void run() {
} }


long now = System.currentTimeMillis(); long now = System.currentTimeMillis();

if (redoLog.getLastSyncKey() != null)
checkpoint = redoLog.getLastSyncKey();

boolean writeCheckpoint = false; boolean writeCheckpoint = false;
for (Entry<String, Integer> e : estimatedMemory.entrySet()) { for (Entry<String, Integer> e : estimatedMemory.entrySet()) {
if (isClosed || e.getValue() > mapCacheSize || lastSavedAt + mapSavePeriod > now) { if (isClosed || e.getValue() > mapCacheSize || lastSavedAt + mapSavePeriod > now) {
Expand All @@ -150,10 +143,8 @@ public void run() {
if (lastSavedAt + mapSavePeriod > now) if (lastSavedAt + mapSavePeriod > now)
lastSavedAt = now; lastSavedAt = now;


if (writeCheckpoint && checkpoint != null) { if (writeCheckpoint) {
RedoLogValue rlv = new RedoLogValue(checkpoint); redoLog.writeCheckpoint();
redoLog.put(checkpoint, rlv);
logSyncService.maybeWaitForSync(rlv);
} }
} }
} }
Expand Down Expand Up @@ -185,25 +176,18 @@ public synchronized void init(Map<String, String> config) {
logSyncService = redoLog.getLogSyncService(); logSyncService = redoLog.getLogSyncService();
initPendingRedoLog(); initPendingRedoLog();


Long key = redoLog.lastKey(); // 调用完initPendingRedoLog后再启动logSyncService
if (key != null) logSyncService.start();
lastTransactionId.set(key);

storageMapSaveService = new StorageMapSaveService(sleep); storageMapSaveService = new StorageMapSaveService(sleep);
storageMapSaveService.start(); storageMapSaveService.start();
} }


private void initPendingRedoLog() { private void initPendingRedoLog() {
Long checkpoint = null; long lastTransactionId = 0;
for (Entry<Long, RedoLogValue> e : redoLog.entrySet()) { for (RedoLogValue v : redoLog.getAndResetRedoLogValues()) {
if (e.getValue().checkpoint != null) if (v.transactionId != null && v.transactionId > lastTransactionId) {
checkpoint = e.getValue().checkpoint; lastTransactionId = v.transactionId;
} }

Iterator<Entry<Long, RedoLogValue>> cursor = redoLog.cursor(checkpoint);
while (cursor.hasNext()) {
Entry<Long, RedoLogValue> e = cursor.next();
RedoLogValue v = e.getValue();
if (v.droppedMap != null) { if (v.droppedMap != null) {
ArrayList<ByteBuffer> logs = pendingRedoLog.get(v.droppedMap); ArrayList<ByteBuffer> logs = pendingRedoLog.get(v.droppedMap);
if (logs != null) { if (logs != null) {
Expand All @@ -229,6 +213,7 @@ private void initPendingRedoLog() {
} }
} }
} }
this.lastTransactionId.set(lastTransactionId);
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Expand Down Expand Up @@ -316,7 +301,7 @@ void prepareCommit(MVCCTransaction t, RedoLogValue v) {
// 事务没有进行任何操作时不用同步日志 // 事务没有进行任何操作时不用同步日志
if (v != null) { if (v != null) {
// 先写redoLog // 先写redoLog
redoLog.put(t.transactionId, v); redoLog.addRedoLogValue(v);
} }
logSyncService.prepareCommit(t); logSyncService.prepareCommit(t);
} }
Expand All @@ -335,7 +320,7 @@ void commit(MVCCTransaction t) {
public void commit(MVCCTransaction t, RedoLogValue v) { public void commit(MVCCTransaction t, RedoLogValue v) {
if (v != null) { // 事务没有进行任何操作时不用同步日志 if (v != null) { // 事务没有进行任何操作时不用同步日志
// 先写redoLog // 先写redoLog
redoLog.put(t.transactionId, v); redoLog.addRedoLogValue(v);
logSyncService.maybeWaitForSync(v); logSyncService.maybeWaitForSync(v);
} }
// 分布式事务推迟提交 // 分布式事务推迟提交
Expand Down Expand Up @@ -408,7 +393,7 @@ public RedoLogValue getRedoLog(MVCCTransaction t) {
ByteBuffer values = ByteBuffer.allocateDirect(buffer.limit()); ByteBuffer values = ByteBuffer.allocateDirect(buffer.limit());
values.put(buffer); values.put(buffer);
values.flip(); values.flip();
return new RedoLogValue(values); return new RedoLogValue(t.transactionId, values);
} }
} }


Expand Down
32 changes: 11 additions & 21 deletions lealone-mvcc/src/main/java/org/lealone/mvcc/log/RedoLog.java
Expand Up @@ -18,10 +18,8 @@
package org.lealone.mvcc.log; package org.lealone.mvcc.log;


import java.io.File; import java.io.File;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Queue;
import java.util.Set;


import org.lealone.db.Constants; import org.lealone.db.Constants;
import org.lealone.storage.fs.FilePath; import org.lealone.storage.fs.FilePath;
Expand Down Expand Up @@ -86,21 +84,15 @@ else if (LOG_SYNC_TYPE_NO_SYNC.equalsIgnoreCase(logSyncType))
throw new IllegalArgumentException("Unknow log_sync_type: " + logSyncType); throw new IllegalArgumentException("Unknow log_sync_type: " + logSyncType);


current = new RedoLogChunk(lastId, config); current = new RedoLogChunk(lastId, config);

logSyncService.setRedoLog(this); logSyncService.setRedoLog(this);
logSyncService.start();
}

public void put(long key, RedoLogValue value) {
current.put(key, value);
} }


public Iterator<Entry<Long, RedoLogValue>> cursor(Long from) { public void addRedoLogValue(RedoLogValue value) {
return current.cursor(from); current.addRedoLogValue(value);
} }


public Set<Entry<Long, RedoLogValue>> entrySet() { public Queue<RedoLogValue> getAndResetRedoLogValues() {
return current.entrySet(); return current.getAndResetRedoLogValues();
} }


public void close() { public void close() {
Expand All @@ -122,16 +114,14 @@ public void save() {
} }
} }


public Long lastKey() {
return current.lastKey();
}

public Long getLastSyncKey() {
return current.getLastSyncKey();
}

public LogSyncService getLogSyncService() { public LogSyncService getLogSyncService() {
return logSyncService; return logSyncService;
} }


public void writeCheckpoint() {
RedoLogValue rlv = new RedoLogValue(true);
addRedoLogValue(rlv);
logSyncService.maybeWaitForSync(rlv);
}

} }
85 changes: 21 additions & 64 deletions lealone-mvcc/src/main/java/org/lealone/mvcc/log/RedoLogChunk.java
Expand Up @@ -19,19 +19,14 @@


import java.io.File; import java.io.File;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.concurrent.LinkedTransferQueue;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;


import org.lealone.db.DataBuffer; import org.lealone.db.DataBuffer;
import org.lealone.storage.fs.FileStorage; import org.lealone.storage.fs.FileStorage;
import org.lealone.storage.type.StorageDataType;


/** /**
* A skipList-based redo log chunk * A queue-based redo log chunk
* *
* @author zhh * @author zhh
*/ */
Expand All @@ -44,37 +39,18 @@ private static String getChunkFileName(Map<String, String> config, int id) {
return storageName + File.separator + CHUNK_FILE_NAME_PREFIX + id; return storageName + File.separator + CHUNK_FILE_NAME_PREFIX + id;
} }


private static class KeyComparator<K> implements java.util.Comparator<K> {
StorageDataType keyType;

public KeyComparator(StorageDataType keyType) {
this.keyType = keyType;
}

@Override
public int compare(K k1, K k2) {
return keyType.compare(k1, k2);
}
}

private final int id; private final int id;
private final StorageDataType keyType;
private final StorageDataType valueType;
private ConcurrentSkipListMap<Long, RedoLogValue> skipListMap;
private final FileStorage fileStorage; private final FileStorage fileStorage;


private LinkedTransferQueue<RedoLogValue> queue;
private long pos; private long pos;
private volatile Long lastSyncKey;


RedoLogChunk(int id, Map<String, String> config) { RedoLogChunk(int id, Map<String, String> config) {
this.id = id; this.id = id;
// 不使用ObjectDataType,因为ObjectDataType需要自动侦测,会有一些开销
this.keyType = new RedoLogKeyType();
this.valueType = new RedoLogValueType();
skipListMap = new ConcurrentSkipListMap<>(new KeyComparator<Long>(keyType));
String chunkFileName = getChunkFileName(config, id); String chunkFileName = getChunkFileName(config, id);
fileStorage = new FileStorage(); fileStorage = new FileStorage();
fileStorage.open(chunkFileName, config); fileStorage.open(chunkFileName, config);
queue = new LinkedTransferQueue<>();
pos = fileStorage.size(); pos = fileStorage.size();
if (pos > 0) if (pos > 0)
read(); read();
Expand All @@ -83,27 +59,26 @@ public int compare(K k1, K k2) {
private void read() { private void read() {
ByteBuffer buffer = fileStorage.readFully(0, (int) pos); ByteBuffer buffer = fileStorage.readFully(0, (int) pos);
while (buffer.remaining() > 0) { while (buffer.remaining() > 0) {
Long k = (Long) keyType.read(buffer); RedoLogValue v = RedoLogValue.read(buffer);
RedoLogValue v = (RedoLogValue) valueType.read(buffer); if (v.checkpoint)
skipListMap.put(k, v); queue = new LinkedTransferQueue<>();
lastSyncKey = k; else
queue.add(v);
} }
} }


int getId() { int getId() {
return id; return id;
} }


void put(Long key, RedoLogValue value) { void addRedoLogValue(RedoLogValue value) {
skipListMap.put(key, value); queue.add(value);
} }


Iterator<Entry<Long, RedoLogValue>> cursor(Long from) { LinkedTransferQueue<RedoLogValue> getAndResetRedoLogValues() {
return from == null ? skipListMap.entrySet().iterator() : skipListMap.tailMap(from).entrySet().iterator(); LinkedTransferQueue<RedoLogValue> oldQueue = this.queue;
} this.queue = new LinkedTransferQueue<>();

return oldQueue;
Set<Entry<Long, RedoLogValue>> entrySet() {
return skipListMap.entrySet();
} }


void close() { void close() {
Expand All @@ -112,16 +87,11 @@ void close() {
} }


synchronized void save() { synchronized void save() {
ConcurrentSkipListMap<Long, RedoLogValue> newSkipListMap = this.skipListMap; LinkedTransferQueue<RedoLogValue> oldQueue = getAndResetRedoLogValues();
this.skipListMap = new ConcurrentSkipListMap<>(new KeyComparator<Long>(keyType)); if (!oldQueue.isEmpty()) {
Long lastKey = this.lastSyncKey;
Set<Entry<Long, RedoLogValue>> entrySet = newSkipListMap.entrySet();
if (!entrySet.isEmpty()) {
try (DataBuffer buff = DataBuffer.create()) { try (DataBuffer buff = DataBuffer.create()) {
for (Entry<Long, RedoLogValue> e : entrySet) { for (RedoLogValue v : oldQueue) {
lastKey = e.getKey(); v.write(buff);
keyType.write(buff, lastKey);
valueType.write(buff, e.getValue());
} }
int chunkLength = buff.position(); int chunkLength = buff.position();
if (chunkLength > 0) { if (chunkLength > 0) {
Expand All @@ -131,9 +101,8 @@ synchronized void save() {
pos += chunkLength; pos += chunkLength;
fileStorage.sync(); fileStorage.sync();
} }
this.lastSyncKey = lastKey; for (RedoLogValue v : oldQueue) {
for (Entry<Long, RedoLogValue> e : entrySet) { v.synced = true;
e.getValue().synced = true;
} }
} }
} }
Expand All @@ -143,18 +112,6 @@ long logChunkSize() {
return pos; return pos;
} }


Long lastKey() {
try {
return skipListMap.lastKey();
} catch (NoSuchElementException e) {
return null;
}
}

Long getLastSyncKey() {
return lastSyncKey;
}

@Override @Override
public int compareTo(RedoLogChunk o) { public int compareTo(RedoLogChunk o) {
return Integer.signum(this.id - o.id); return Integer.signum(this.id - o.id);
Expand Down

This file was deleted.

0 comments on commit 206a5f1

Please sign in to comment.