Skip to content

Commit

Permalink
优化和简化RedoLogValue的sync实现逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Dec 1, 2017
1 parent dab7c3a commit 3ca1658
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 32 deletions.
Expand Up @@ -94,8 +94,9 @@ void removeMap(String mapName) {
estimatedMemory.remove(mapName);
maps.remove(mapName);
long tid = nextEvenTransactionId();
redoLog.put(tid, new RedoLogValue(mapName));
logSyncService.maybeWaitForSync(redoLog, tid);
RedoLogValue rlv = new RedoLogValue(mapName);
redoLog.put(tid, rlv);
logSyncService.maybeWaitForSync(rlv);
}

private class StorageMapSaveService extends Thread {
Expand Down Expand Up @@ -150,8 +151,9 @@ public void run() {
lastSavedAt = now;

if (writeCheckpoint && checkpoint != null) {
redoLog.put(checkpoint, new RedoLogValue(checkpoint));
logSyncService.maybeWaitForSync(redoLog, redoLog.getLastSyncKey());
RedoLogValue rlv = new RedoLogValue(checkpoint);
redoLog.put(checkpoint, rlv);
logSyncService.maybeWaitForSync(rlv);
}
}
}
Expand Down Expand Up @@ -334,7 +336,7 @@ public void commit(MVCCTransaction t, RedoLogValue v) {
if (v != null) { // 事务没有进行任何操作时不用同步日志
// 先写redoLog
redoLog.put(t.transactionId, v);
logSyncService.maybeWaitForSync(redoLog, t.transactionId);
logSyncService.maybeWaitForSync(v);
}
// 分布式事务推迟提交
if (t.isLocal()) {
Expand Down
Expand Up @@ -34,15 +34,12 @@ public BatchLogSyncService(Map<String, String> config) {
}

@Override
public void maybeWaitForSync(RedoLog redoLog, Long lastOperationId) {
public void maybeWaitForSync(RedoLogValue redoLogValue) {
haveWork.release();
Long lastSyncKey = redoLog.getLastSyncKey();

if (lastSyncKey == null || lastSyncKey < lastOperationId) {
if (!redoLogValue.synced) {
while (true) {
WaitQueue.Signal signal = syncComplete.register();
lastSyncKey = redoLog.getLastSyncKey();
if (lastSyncKey != null && lastSyncKey >= lastOperationId) {
if (redoLogValue.synced) {
signal.cancel();
return;
} else
Expand Down
Expand Up @@ -42,7 +42,7 @@ public LogSyncService(String name) {
setDaemon(true);
}

public abstract void maybeWaitForSync(RedoLog redoLog, Long lastOperationId);
public abstract void maybeWaitForSync(RedoLogValue redoLogValue);

public void setRedoLog(RedoLog redoLog) {
this.redoLog = redoLog;
Expand Down
Expand Up @@ -34,7 +34,7 @@ public void run() {
}

@Override
public void maybeWaitForSync(RedoLog redoLog, Long lastOperationId) {
public void maybeWaitForSync(RedoLogValue redoLogValue) {
}

@Override
Expand Down
Expand Up @@ -40,11 +40,9 @@ public PeriodicLogSyncService(Map<String, String> config) {
}

@Override
public void maybeWaitForSync(RedoLog redoLog, Long lastOperationId) {
public void maybeWaitForSync(RedoLogValue redoLogValue) {
haveWork.release();
Long lastSyncKey = redoLog.getLastSyncKey();

if (lastSyncKey == null || lastSyncKey < lastOperationId) {
if (!redoLogValue.synced) {
// 因为Long.MAX_VALUE > Long.MAX_VALUE + 1
// lastSyncedAt是long类型,当lastSyncedAt为Long.MAX_VALUE时,
// 再加一个int类型的blockWhenSyncLagsMillis时还是小于Long.MAX_VALUE;
Expand All @@ -55,8 +53,7 @@ public void maybeWaitForSync(RedoLog redoLog, Long lastOperationId) {
long started = System.currentTimeMillis();
while (waitForSyncToCatchUp(started)) {
WaitQueue.Signal signal = syncComplete.register();
lastSyncKey = redoLog.getLastSyncKey();
if (lastSyncKey != null && lastSyncKey >= lastOperationId) {
if (redoLogValue.synced) {
signal.cancel();
return;
} else if (waitForSyncToCatchUp(started))
Expand Down
4 changes: 2 additions & 2 deletions lealone-mvcc/src/main/java/org/lealone/mvcc/log/RedoLog.java
Expand Up @@ -91,8 +91,8 @@ else if (LOG_SYNC_TYPE_NO_SYNC.equalsIgnoreCase(logSyncType))
logSyncService.start();
}

public RedoLogValue put(Long key, RedoLogValue value) {
return current.put(key, value);
public void put(long key, RedoLogValue value) {
current.put(key, value);
}

public Iterator<Entry<Long, RedoLogValue>> cursor(Long from) {
Expand Down
19 changes: 8 additions & 11 deletions lealone-mvcc/src/main/java/org/lealone/mvcc/log/RedoLogChunk.java
Expand Up @@ -60,8 +60,7 @@ public int compare(K k1, K k2) {
private final int id;
private final StorageDataType keyType;
private final StorageDataType valueType;
private final ConcurrentSkipListMap<Long, RedoLogValue> skipListMap;
private ConcurrentSkipListMap<Long, RedoLogValue> newSkipListMap;
private ConcurrentSkipListMap<Long, RedoLogValue> skipListMap;
private final FileStorage fileStorage;

private long pos;
Expand All @@ -73,7 +72,6 @@ public int compare(K k1, K k2) {
this.keyType = new RedoLogKeyType();
this.valueType = new RedoLogValueType();
skipListMap = new ConcurrentSkipListMap<>(new KeyComparator<Long>(keyType));
newSkipListMap = new ConcurrentSkipListMap<>(new KeyComparator<Long>(keyType));
String chunkFileName = getChunkFileName(config, id);
fileStorage = new FileStorage();
fileStorage.open(chunkFileName, config);
Expand All @@ -96,10 +94,8 @@ int getId() {
return id;
}

RedoLogValue put(Long key, RedoLogValue value) {
// TODO 如果先开始的事务后提交了,不用新的SkipListMap要如何优化?
newSkipListMap.put(key, value);
return skipListMap.put(key, value);
void put(Long key, RedoLogValue value) {
skipListMap.put(key, value);
}

Iterator<Entry<Long, RedoLogValue>> cursor(Long from) {
Expand All @@ -116,11 +112,9 @@ void close() {
}

synchronized void save() {
ConcurrentSkipListMap<Long, RedoLogValue> newSkipListMap = this.newSkipListMap;
this.newSkipListMap = new ConcurrentSkipListMap<>(new KeyComparator<Long>(keyType));
ConcurrentSkipListMap<Long, RedoLogValue> newSkipListMap = this.skipListMap;
this.skipListMap = new ConcurrentSkipListMap<>(new KeyComparator<Long>(keyType));
Long lastKey = this.lastSyncKey;
// Set<Entry<Long, RedoLogValue>> entrySet = lastKey == null ? skipListMap.entrySet()
// : skipListMap.tailMap(lastKey, false).entrySet();
Set<Entry<Long, RedoLogValue>> entrySet = newSkipListMap.entrySet();
if (!entrySet.isEmpty()) {
try (DataBuffer buff = DataBuffer.create()) {
Expand All @@ -138,6 +132,9 @@ synchronized void save() {
fileStorage.sync();
}
this.lastSyncKey = lastKey;
for (Entry<Long, RedoLogValue> e : entrySet) {
e.getValue().synced = true;
}
}
}
}
Expand Down
Expand Up @@ -35,6 +35,8 @@ public class RedoLogValue {
// 4. 已经被删除的map
public String droppedMap;

volatile boolean synced;

public RedoLogValue() {
}

Expand Down

0 comments on commit 3ca1658

Please sign in to comment.