Skip to content

Commit

Permalink
feat(db): add checkpoint v2 in case db inconsistent when ungraceful s…
Browse files Browse the repository at this point in the history
…hutdown
  • Loading branch information
tomatoishealthy committed Sep 1, 2022
1 parent 626e31e commit 2f8f8d0
Show file tree
Hide file tree
Showing 17 changed files with 615 additions and 143 deletions.
Expand Up @@ -133,9 +133,11 @@ private void openDatabase(Options dbOptions) throws IOException {
}
try {
database = factory.open(dbPath.toFile(), dbOptions);
logger.info("DB {} open success with : writeBufferSize {}M,cacheSize {}M,maxOpenFiles {}.",
this.getDBName(), dbOptions.writeBufferSize() / 1024 / 1024,
dbOptions.cacheSize() / 1024 / 1024, dbOptions.maxOpenFiles());
if (!this.getDBName().startsWith("checkpoint")) {
logger.info("DB {} open success with : writeBufferSize {}M,cacheSize {}M,maxOpenFiles {}.",
this.getDBName(), dbOptions.writeBufferSize() / 1024 / 1024,
dbOptions.cacheSize() / 1024 / 1024, dbOptions.maxOpenFiles());
}
} catch (IOException e) {
if (e.getMessage().contains("Corruption:")) {
factory.repair(dbPath.toFile(), dbOptions);
Expand Down
17 changes: 11 additions & 6 deletions chainbase/src/main/java/org/tron/core/db/TronDatabase.java
Expand Up @@ -2,14 +2,14 @@

import com.google.protobuf.InvalidProtocolBufferException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.PostConstruct;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.iq80.leveldb.WriteOptions;
import org.rocksdb.DirectComparator;
import org.springframework.beans.factory.annotation.Autowired;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.storage.WriteOptionsWrapper;
Expand All @@ -18,11 +18,8 @@
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl;
import org.tron.common.utils.StorageUtils;
import org.tron.core.db.common.DbSourceInter;
import org.tron.core.db2.common.LevelDB;
import org.tron.core.db2.common.RocksDB;
import org.tron.core.db2.common.WrappedByteArray;
import org.tron.core.db2.core.ITronChainBase;
import org.tron.core.db2.core.SnapshotRoot;
import org.tron.core.exception.BadItemException;
import org.tron.core.exception.ItemNotFoundException;

Expand All @@ -46,7 +43,7 @@ protected TronDatabase(String dbName) {
dbSource =
new LevelDbDataSourceImpl(StorageUtils.getOutputDirectoryByDbName(dbName),
dbName,
StorageUtils.getOptionsByDbName(dbName),
getOptionsByDbNameForLevelDB(dbName),
new WriteOptions().sync(CommonParameter.getInstance()
.getStorage().isDbSync()));
} else if ("ROCKSDB".equals(CommonParameter.getInstance()
Expand All @@ -55,7 +52,7 @@ protected TronDatabase(String dbName) {
CommonParameter.getInstance().getStorage().getDbDirectory()).toString();
dbSource =
new RocksDbDataSourceImpl(parentName, dbName, CommonParameter.getInstance()
.getRocksDBCustomSettings());
.getRocksDBCustomSettings(), getDirectComparator());
}

dbSource.initDB();
Expand All @@ -69,6 +66,14 @@ private void init() {
protected TronDatabase() {
}

protected org.iq80.leveldb.Options getOptionsByDbNameForLevelDB(String dbName) {
return StorageUtils.getOptionsByDbName(dbName);
}

protected DirectComparator getDirectComparator() {
return null;
}

public DbSourceInter<byte[]> getDbSource() {
return dbSource;
}
Expand Down
218 changes: 206 additions & 12 deletions chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java
Expand Up @@ -7,6 +7,9 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.io.File;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -15,17 +18,26 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;

import io.prometheus.client.Histogram;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.tron.common.error.TronDBException;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.prometheus.MetricKeys;
import org.tron.common.prometheus.Metrics;
import org.tron.common.storage.WriteOptionsWrapper;
import org.tron.common.utils.FileUtil;
import org.tron.common.utils.StorageUtils;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.db.RevokingDatabase;
import org.tron.core.db2.ISession;
import org.tron.core.db2.common.DB;
Expand All @@ -34,14 +46,17 @@
import org.tron.core.db2.common.Value;
import org.tron.core.db2.common.WrappedByteArray;
import org.tron.core.exception.RevokingStoreIllegalStateException;
import org.tron.core.store.CheckPointV2Store;
import org.tron.core.store.CheckTmpStore;

@Slf4j(topic = "DB")
public class SnapshotManager implements RevokingDatabase {

public static final int DEFAULT_MAX_FLUSH_COUNT = 500;
public static final int DEFAULT_MAX_FLUSH_COUNT = 200;
public static final int DEFAULT_MIN_FLUSH_COUNT = 1;
private static final int DEFAULT_STACK_MAX_SIZE = 256;
private static final long ONE_MINUTE_MILLS = 60*1000L;
private static final String CHECKPOINT_V2_DIR = "checkpoint";
@Getter
private List<Chainbase> dbs = new ArrayList<>();
@Getter
Expand All @@ -63,6 +78,8 @@ public class SnapshotManager implements RevokingDatabase {

private Map<String, ListeningExecutorService> flushServices = new HashMap<>();

private ScheduledExecutorService pruneCheckpointThread = Executors.newSingleThreadScheduledExecutor();

@Autowired
@Setter
@Getter
Expand All @@ -71,11 +88,28 @@ public class SnapshotManager implements RevokingDatabase {
@Setter
private volatile int maxFlushCount = DEFAULT_MIN_FLUSH_COUNT;

private long currentBlockNum = -1;

private int checkpointVersion = 1; // default v1

public SnapshotManager(String checkpointPath) {
}

@PostConstruct
public void init() {
checkpointVersion = CommonParameter.getInstance().getStorage().getCheckpointVersion();
// prune checkpoint
pruneCheckpointThread.scheduleWithFixedDelay(() -> {
try {
if (isV2Open() && !unChecked) {
pruneCheckpoint();
}
} catch (Throwable t) {
logger.error("Exception in prune checkpoint", t);
}
}, 10000, 3600, TimeUnit.MILLISECONDS);


exitThread = new Thread(() -> {
LockSupport.park();
// to Guarantee Some other thread invokes unpark with the current thread as the target
Expand Down Expand Up @@ -244,6 +278,7 @@ public void shutdown() {
System.err.println("******** before revokingDb size:" + size);
checkTmpStore.close();
System.err.println("******** end to pop revokingDb ********");
pruneCheckpointThread.shutdown();
}

public void updateSolidity(int hops) {
Expand Down Expand Up @@ -307,8 +342,22 @@ public void flush() {
if (shouldBeRefreshed()) {
try {
long start = System.currentTimeMillis();
deleteCheckpoint();
createCheckpoint();
if (!isV2Open()) {
Histogram.Timer requestTimer = Metrics.histogramStartTimer(
MetricKeys.Histogram.DB_FLUSH, "delete");
deleteCheckpoint();
Metrics.histogramObserve(requestTimer);

Histogram.Timer createTimer = Metrics.histogramStartTimer(
MetricKeys.Histogram.DB_FLUSH, "create");
createCheckpoint();
Metrics.histogramObserve(createTimer);
} else {
Histogram.Timer createV2 = Metrics.histogramStartTimer(
MetricKeys.Histogram.DB_FLUSH, "create2");
createCheckpointV2();
Metrics.histogramObserve(createV2);
}
long checkPointEnd = System.currentTimeMillis();
refresh();
flushCount = 0;
Expand Down Expand Up @@ -360,25 +409,120 @@ private void createCheckpoint() {
}
}

private void deleteCheckpoint() {
private void createCheckpointV2() {
CheckPointV2Store checkPointV2Store = null;
try {
Map<byte[], byte[]> hmap = new HashMap<>();
if (!checkTmpStore.getDbSource().allKeys().isEmpty()) {
for (Map.Entry<byte[], byte[]> e : checkTmpStore.getDbSource()) {
hmap.put(e.getKey(), null);
Map<WrappedByteArray, WrappedByteArray> batch = new HashMap<>();
for (Chainbase db : dbs) {
Snapshot head = db.getHead();
if (Snapshot.isRoot(head)) {
return;
}

String dbName = db.getDbName();
Snapshot next = head.getRoot();
for (int i = 0; i < flushCount; ++i) {
next = next.getNext();
SnapshotImpl snapshot = (SnapshotImpl) next;
DB<Key, Value> keyValueDB = snapshot.getDb();
for (Map.Entry<Key, Value> e : keyValueDB) {
Key k = e.getKey();
Value v = e.getValue();
batch.put(WrappedByteArray.of(Bytes.concat(simpleEncode(dbName), k.getBytes())),
WrappedByteArray.of(v.encode()));
if ("block".equals(db.getDbName())) {
currentBlockNum = new BlockCapsule(v.getBytes()).getNum();
}
}
}
}

checkTmpStore.getDbSource().updateByBatch(hmap);
} catch (Exception e) {
if (currentBlockNum == -1) {
logger.error("create checkpoint failed, currentBlockNum: {}", currentBlockNum);
System.exit(-1);
}
String dbName = System.currentTimeMillis() + "_" + currentBlockNum;
checkPointV2Store = getCheckpointDB(dbName);
checkPointV2Store.getDbSource().updateByBatch(batch.entrySet().stream()
.map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes()))
.collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll),
WriteOptionsWrapper.getInstance().sync(CommonParameter
.getInstance().getStorage().isCheckpointSync()));

} catch ( Exception e) {
throw new TronDBException(e);
} finally {
if (checkPointV2Store != null) {
checkPointV2Store.close();
}
}
}

private CheckPointV2Store getCheckpointDB(String dbName) {
return new CheckPointV2Store(CHECKPOINT_V2_DIR+"/"+dbName);
}

private List<String> getCheckpointList() {
String dbPath = Paths.get(StorageUtils.getOutputDirectoryByDbName(CHECKPOINT_V2_DIR),
CommonParameter.getInstance().getStorage().getDbDirectory()).toString();
File file = new File(Paths.get(dbPath, CHECKPOINT_V2_DIR).toString());
if (file.exists() && file.isDirectory()) {
String[] subDirs = file.list();
if (subDirs != null) {
return Arrays.stream(subDirs).sorted().collect(Collectors.toList());
}
}
return null;
}

private void deleteCheckpoint() {
checkTmpStore.reset();
}

private void pruneCheckpoint() {
if (unChecked) {
return;
}
List<String> cpList = getCheckpointList();
if (cpList == null) {
return;
}
if (cpList.size() < 3) {
return;
}
for (String cp: cpList.subList(0,3)) {
long timestamp = Long.parseLong(cp.split("_")[0]);
long blockNumber = Long.parseLong(cp.split("_")[1]);
if (System.currentTimeMillis() - timestamp < ONE_MINUTE_MILLS*2) {
break;
}
String checkpointPath = Paths.get(StorageUtils.getOutputDirectoryByDbName(CHECKPOINT_V2_DIR),
CommonParameter.getInstance().getStorage().getDbDirectory(), CHECKPOINT_V2_DIR).toString();
if (!FileUtil.recursiveDelete(Paths.get(checkpointPath, cp).toString())) {
logger.error("checkpoint prune failed, number: {}", blockNumber);
return;
}
logger.debug("checkpoint prune success, number: {}", blockNumber);
}
}

// ensure run this method first after process start.
@Override
public void check() {
for (Chainbase db : dbs) {
if (!isV2Open()) {
List<String> cpList = getCheckpointList();
if (cpList != null && cpList.size() != 0) {
logger.error("checkpoint check failed, can't convert checkpoint from v2 to v1");
System.exit(-1);
}
checkV1();
} else {
checkV2();
}
}

private void checkV1() {
for (Chainbase db: dbs) {
if (!Snapshot.isRoot(db.getHead())) {
throw new IllegalStateException("first check.");
}
Expand All @@ -389,7 +533,7 @@ public void check() {
.map(db -> Maps.immutableEntry(db.getDbName(), db))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
advance();
for (Map.Entry<byte[], byte[]> e : checkTmpStore.getDbSource()) {
for (Map.Entry<byte[], byte[]> e: checkTmpStore.getDbSource()) {
byte[] key = e.getKey();
byte[] value = e.getValue();
String db = simpleDecode(key);
Expand All @@ -414,6 +558,56 @@ public void check() {
unChecked = false;
}

private void checkV2() {
logger.info("checkpoint version: {}", CommonParameter.getInstance().getStorage().getCheckpointVersion());
logger.info("checkpoint sync: {}", CommonParameter.getInstance().getStorage().isCheckpointSync());
List<String> cpList = getCheckpointList();
if (cpList == null || cpList.size() == 0) {
logger.info("checkpoint size is 0, using v1 recover");
checkV1();
deleteCheckpoint();
return;
}

Map<String, Chainbase> dbMap = dbs.stream()
.map(db -> Maps.immutableEntry(db.getDbName(), db))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
for (String cp: cpList) {
long blockNumber = Long.parseLong(cp.split("_")[1]);
if (blockNumber < 0) {
logger.error("checkpoint illegal, block number: {}", blockNumber);
System.exit(-1);
}
CheckPointV2Store checkPointV2Store = getCheckpointDB(cp);
advance();
for (Map.Entry<byte[], byte[]> e: checkPointV2Store.getDbSource()) {
byte[] key = e.getKey();
byte[] value = e.getValue();
String db = simpleDecode(key);
if (dbMap.get(db) == null) {
continue;
}
byte[] realKey = Arrays.copyOfRange(key, db.getBytes().length + 4, key.length);
byte[] realValue = value.length == 1 ? null : Arrays.copyOfRange(value, 1, value.length);
if (realValue != null) {
dbMap.get(db).getHead().put(realKey, realValue);
} else {
dbMap.get(db).getHead().remove(realKey);
}
}

dbs.forEach(db -> db.getHead().getRoot().merge(db.getHead()));
retreat();
checkPointV2Store.close();
logger.info("checkpoint recover success, block number{}", blockNumber);
}
unChecked = false;
}

private boolean isV2Open() {
return checkpointVersion == 2;
}

private byte[] simpleEncode(String s) {
byte[] bytes = s.getBytes();
byte[] length = Ints.toByteArray(bytes.length);
Expand Down

0 comments on commit 2f8f8d0

Please sign in to comment.