Skip to content

Commit

Permalink
检查点的触发条件之一包括: 已提交数据占用内存总大小超过阈值
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 8, 2018
1 parent 515a1a9 commit 40d0b63
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 32 deletions.
Expand Up @@ -211,12 +211,14 @@ public boolean areValuesEqual(Object a, Object b) {


@Override @Override
public int size() { public int size() {
return buffer.size() + map.size(); merge(); // 如果不先merge,可能两边都存在相同的key,会导致重复计数
return map.size();
} }


@Override @Override
public long sizeAsLong() { public long sizeAsLong() {
return buffer.size() + map.sizeAsLong(); merge();
return map.sizeAsLong();
} }


@Override @Override
Expand Down Expand Up @@ -245,13 +247,13 @@ public StorageMapCursor<K, V> cursor(List<PageKey> pageKeys, K from) {
} }


@Override @Override
public void clear() { public synchronized void clear() {
buffer.clear(); buffer.clear();
map.clear(); map.clear();
} }


@Override @Override
public void remove() { public synchronized void remove() {
buffer.clear(); buffer.clear();
map.remove(); map.remove();
} }
Expand All @@ -262,14 +264,16 @@ public boolean isClosed() {
} }


@Override @Override
public void close() { public synchronized void close() {
merge();
buffer.clear(); buffer.clear();
map.close(); map.close();
AOStorageService.removeBufferedMap(this); AOStorageService.removeBufferedMap(this);
} }


@Override @Override
public void save() { public synchronized void save() {
merge();
map.save(); map.save();
} }


Expand All @@ -279,12 +283,14 @@ public Void call() throws Exception {
return null; return null;
} }


public void merge() { // 不允许多个线程同时调用,因为没有意义,反而会重复merge一样的值
for (Object key : buffer.keySet()) { public synchronized void merge() {
// 不能先remove再put,因为刚刚remove后,要是在put之前有一个读线程进来,那么它就读不到值了 for (Entry<Object, Object> entry : buffer.entrySet()) {
Object value = buffer.get(key); Object key = entry.getKey();
Object value = entry.getValue();
map.put((K) key, (V) value); map.put((K) key, (V) value);
buffer.remove(key); // 执行完put后,可能又有相同key的值来了,此时不能删,只有跟原来相同时才删
buffer.remove(key, value);
} }
} }


Expand Down
Expand Up @@ -19,7 +19,6 @@


import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -233,12 +232,11 @@ public void checkpoint() {


private class CheckpointService extends Thread { private class CheckpointService extends Thread {


private static final int DEFAULT_STORAGE_MAP_COMMITTED_DATA_CACHE_SIZE = 32 * 1024 * 1024; // 32M private static final int DEFAULT_COMMITTED_DATA_CACHE_SIZE = 32 * 1024 * 1024; // 32M
private static final int DEFAULT_CHECKPOINT_PERIOD = 1 * 60 * 60 * 1000; // 1小时 private static final int DEFAULT_CHECKPOINT_PERIOD = 1 * 60 * 60 * 1000; // 1小时

private final AtomicBoolean checking = new AtomicBoolean(false); private final AtomicBoolean checking = new AtomicBoolean(false);
private final Semaphore semaphore = new Semaphore(1); private final Semaphore semaphore = new Semaphore(1);
private final int mapCacheSize; private final int committedDataCacheSize;
private final int checkpointPeriod; private final int checkpointPeriod;
private final int sleep; private final int sleep;


Expand All @@ -249,11 +247,11 @@ private class CheckpointService extends Thread {
setName(getClass().getSimpleName()); setName(getClass().getSimpleName());
setDaemon(true); setDaemon(true);


String v = config.get("storage_map_committed_data_cache_size_in_mb"); String v = config.get("committed_data_cache_size_in_mb");
if (v != null) if (v != null)
mapCacheSize = Integer.parseInt(v) * 1024 * 1024; committedDataCacheSize = Integer.parseInt(v) * 1024 * 1024;
else else
mapCacheSize = DEFAULT_STORAGE_MAP_COMMITTED_DATA_CACHE_SIZE; committedDataCacheSize = DEFAULT_COMMITTED_DATA_CACHE_SIZE;


v = config.get("checkpoint_period"); v = config.get("checkpoint_period");
if (v != null) if (v != null)
Expand Down Expand Up @@ -291,16 +289,28 @@ private void checkpoint(boolean force) {
if (!checking.compareAndSet(false, true)) if (!checking.compareAndSet(false, true))
return; return;
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
boolean executeCheckpoint = false; boolean executeCheckpoint = force || isClosed || (lastSavedAt + checkpointPeriod < now);
boolean forceSave = force || isClosed || (lastSavedAt + checkpointPeriod < now);
for (Entry<String, AtomicInteger> e : estimatedMemory.entrySet()) { // 如果上面的条件都不满足,那么再看看已经提交的数据占用的预估总内存大小是否大于阈值
if (forceSave || executeCheckpoint || e.getValue().get() > mapCacheSize) { if (!executeCheckpoint) {
maps.get(e.getKey()).save(); long totalEstimatedMemory = 0;
e.getValue().set(0); for (AtomicInteger counter : estimatedMemory.values()) {
executeCheckpoint = true; totalEstimatedMemory += counter.get();
} }
executeCheckpoint = totalEstimatedMemory > committedDataCacheSize;
} }

if (executeCheckpoint) { if (executeCheckpoint) {
for (StorageMap<Object, TransactionalValue> map : maps.values()) {
// 在这里有可能把已提交和未提交事务的数据都保存了,
// 不过不要紧,如果在生成检查点之后系统崩溃了导致未提交事务不能正常完成,还有读时撤销机制保证数据完整性,
// 因为在保存未提交数据时,也同时保存了原来的数据,如果在读到未提交数据时发现了异常,就会进行撤销,
// 读时撤销机制在TransactionalValue类中实现。
AtomicInteger counter = estimatedMemory.get(map.getName());
if (counter != null && counter.getAndSet(0) > 0) {
map.save();
}
}
lastSavedAt = now; lastSavedAt = now;
logSyncService.checkpoint(nextEvenTransactionId()); logSyncService.checkpoint(nextEvenTransactionId());
} }
Expand Down
Expand Up @@ -122,6 +122,10 @@ static class NotCommitted extends TransactionalValue {
NotCommitted(MVCCTransaction transaction, Object value, TransactionalValue oldValue, NotCommitted(MVCCTransaction transaction, Object value, TransactionalValue oldValue,
StorageDataType oldValueType) { StorageDataType oldValueType) {
super(transaction.transactionId, value); super(transaction.transactionId, value);
// 避免同一个事务对同一行不断更新导致过长的oldValue链,只取最早的oldValue即可
if (oldValue != null && oldValue.tid == transaction.transactionId && (oldValue instanceof NotCommitted)) {
oldValue = ((NotCommitted) oldValue).oldValue;
}
this.logId = transaction.logId; this.logId = transaction.logId;
this.oldValue = oldValue; this.oldValue = oldValue;
this.oldValueType = oldValueType; this.oldValueType = oldValueType;
Expand Down
Expand Up @@ -45,25 +45,78 @@ public static Storage getStorage() {
return storage; return storage;
} }


public static TransactionEngine getTransactionEngine() {
return getTransactionEngine(null, false);
}

public static TransactionEngine getTransactionEngine(boolean isDistributed) { public static TransactionEngine getTransactionEngine(boolean isDistributed) {
return getTransactionEngine(null, isDistributed);
}

public static TransactionEngine getTransactionEngine(Map<String, String> config) {
return getTransactionEngine(config, false);
}

public static TransactionEngine getTransactionEngine(Map<String, String> config, boolean isDistributed) {
TransactionEngine te = TransactionEngineManager.getInstance() TransactionEngine te = TransactionEngineManager.getInstance()
.getEngine(Constants.DEFAULT_TRANSACTION_ENGINE_NAME); .getEngine(Constants.DEFAULT_TRANSACTION_ENGINE_NAME);
assertEquals(Constants.DEFAULT_TRANSACTION_ENGINE_NAME, te.getName()); assertEquals(Constants.DEFAULT_TRANSACTION_ENGINE_NAME, te.getName());
if (config == null)
config = getDefaultConfig();
if (isDistributed) {
config.put("host_and_port", Constants.DEFAULT_HOST + ":" + Constants.DEFAULT_TCP_PORT);
}
te.init(config);
return te;
}


public static Map<String, String> getDefaultConfig() {
Map<String, String> config = new HashMap<>(); Map<String, String> config = new HashMap<>();
config.put("base_dir", joinDirs("mvcc")); config.put("base_dir", joinDirs("mvcc"));
config.put("redo_log_dir", "redo_log"); config.put("redo_log_dir", "redo_log");
config.put("log_sync_type", LogSyncService.LOG_SYNC_TYPE_BATCH); config.put("log_sync_type", LogSyncService.LOG_SYNC_TYPE_BATCH);
config.put("log_sync_batch_window", "10"); // 10ms config.put("log_sync_batch_window", "10"); // 10ms

// config.put("log_sync_type", LogSyncService.LOG_SYNC_TYPE_PERIODIC); // config.put("log_sync_type", LogSyncService.LOG_SYNC_TYPE_PERIODIC);
// config.put("log_sync_period", "500"); // 500ms // config.put("log_sync_period", "500"); // 500ms
return config;
}


if (isDistributed) { @Test
config.put("host_and_port", Constants.DEFAULT_HOST + ":" + Constants.DEFAULT_TCP_PORT); public void testCheckpoint() {
Map<String, String> config = getDefaultConfig();
config.put("committed_data_cache_size_in_mb", "1");
config.put("checkpoint_service_sleep_interval", "100"); // 100ms
config.put("log_sync_type", LogSyncService.LOG_SYNC_TYPE_PERIODIC);
TransactionEngine te = getTransactionEngine(config);
Storage storage = getStorage();

Transaction t1 = te.beginTransaction(false, false);
TransactionMap<String, String> map = t1.openMap("testCheckpoint", storage);
map.remove();
map = t1.openMap("testCheckpoint", storage);
assertEquals(0, map.getDiskSpaceUsed());
assertEquals(0, map.size());

for (int i = 1; i <= 50000; i++) {
map.put("key" + i, "value" + i);
} }
te.init(config); t1.commit();
return te; assertEquals(50000, map.size());
try {
Thread.sleep(2000); // 等待后端检查点线程完成数据保存
} catch (InterruptedException e) {
}
assertTrue(map.getDiskSpaceUsed() > 0);

map.remove();
Transaction t2 = te.beginTransaction(false, false);
map = t2.openMap("testCheckpoint", storage);
assertEquals(0, map.getDiskSpaceUsed());
map.put("abc", "value123");
t2.commit();

te.checkpoint();
assertTrue(map.getDiskSpaceUsed() > 0);
} }


@Test @Test
Expand Down Expand Up @@ -125,8 +178,6 @@ public void run() {
assertEquals(1, map.size()); assertEquals(1, map.size());
t4.commit(); t4.commit();


te.checkpoint();

Transaction t5 = te.beginTransaction(false, false); Transaction t5 = te.beginTransaction(false, false);
map = map.getInstance(t5); map = map.getInstance(t5);
map.put("6", "g"); map.put("6", "g");
Expand Down

0 comments on commit 40d0b63

Please sign in to comment.