Skip to content

Commit

Permalink
执行checkpoint命令时强制保存所有的StorageMap
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 18, 2019
1 parent 08dc16b commit a6d81c3
Showing 1 changed file with 2 additions and 7 deletions.
Expand Up @@ -266,7 +266,6 @@ private class CheckpointService extends Thread {

private static final int DEFAULT_COMMITTED_DATA_CACHE_SIZE = 32 * 1024 * 1024; // 32M
private static final int DEFAULT_CHECKPOINT_PERIOD = 1 * 60 * 60 * 1000; // 1小时
private final AtomicBoolean checking = new AtomicBoolean(false);
private final Semaphore semaphore = new Semaphore(1);
private final int committedDataCacheSize;
private final long checkpointPeriod;
Expand Down Expand Up @@ -316,9 +315,7 @@ void checkpoint() {
}

// 按周期自动触发
private void checkpoint(boolean force) {
if (!checking.compareAndSet(false, true))
return;
private synchronized void checkpoint(boolean force) {
long now = System.currentTimeMillis();
boolean executeCheckpoint = force || isClosed || (lastSavedAt + checkpointPeriod < now);

Expand All @@ -330,7 +327,6 @@ private void checkpoint(boolean force) {
}
executeCheckpoint = totalEstimatedMemory > committedDataCacheSize;
}

if (executeCheckpoint) {
for (StorageMap<Object, TransactionalValue> map : maps.values()) {
if (map.isClosed())
Expand All @@ -341,14 +337,13 @@ private void checkpoint(boolean force) {
// 因为在保存未提交数据时,也同时保存了原来的数据,如果在读到未提交数据时发现了异常,就会进行撤销,
// 读时撤销机制在TransactionalValue类中实现。
AtomicInteger counter = estimatedMemory.get(map.getName());
if (counter != null && counter.getAndSet(0) > 0) {
if (force || counter != null && counter.getAndSet(0) > 0) {
map.save();
}
}
lastSavedAt = now;
logSyncService.checkpoint(nextEvenTransactionId());
}
checking.set(false);
}

@Override
Expand Down

0 comments on commit a6d81c3

Please sign in to comment.