Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(liteFullNodeTool): optimize liteFullNodeTool #4607

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
222 changes: 106 additions & 116 deletions framework/src/main/java/org/tron/tool/litefullnode/LiteFullNodeTool.java
Expand Up @@ -4,6 +4,7 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.internal.Lists;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.primitives.Bytes;
Expand All @@ -16,9 +17,9 @@
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.LongStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.rocksdb.RocksDBException;
Expand Down Expand Up @@ -47,50 +48,23 @@ public class LiteFullNodeTool {
private static final String INFO_FILE_NAME = "info.properties";
private static final String BACKUP_DIR_PREFIX = ".bak_";
private static final String CHECKPOINT_DB = "tmp";
private static final long VM_NEED_RECENT_BLKS = 256;
private static long RECENT_BLKS = 65536;

private static final String BLOCK_DB_NAME = "block";
private static final String BLOCK_INDEX_DB_NAME = "block-index";
private static final String TRANS_CACHE_DB_NAME = "trans-cache";
private static final String TRANS_DB_NAME = "trans";
private static final String COMMON_DB_NAME = "common";
private static final String TRANSACTION_RET_DB_NAME = "transactionRetStore";
private static final String TRANSACTION_HISTORY_DB_NAME = "transactionHistoryStore";

private static final String DIR_FORMAT_STRING = "%s%s%s";

private static List<String> archiveDbs = Arrays.asList(
BLOCK_DB_NAME,
BLOCK_INDEX_DB_NAME,
"trans",
"transactionRetStore",
"transactionHistoryStore");
private static List<String> minimumDbsForLiteNode = Arrays.asList(
"DelegatedResource",
"DelegatedResourceAccountIndex",
"IncrementalMerkleTree",
"account",
"account-index",
"accountTrie",
"accountid-index",
"asset-issue",
"asset-issue-v2",
//"block_KDB",
"code",
//"common",
"contract",
"delegation",
"exchange",
"exchange-v2",
//"nullifier",
"properties",
"proposal",
"recent-block",
//"recent-transaction",
"storage-row",
//TRANS_CACHE_DB_NAME,
//"tree-block-index",
"votes",
"witness",
"witness_schedule"
);
TRANS_DB_NAME,
TRANSACTION_RET_DB_NAME,
TRANSACTION_HISTORY_DB_NAME);

/**
* Create the snapshot dataset.
Expand All @@ -104,13 +78,12 @@ public void generateSnapshot(String sourceDir, String snapshotDir) {
long start = System.currentTimeMillis();
snapshotDir = Paths.get(snapshotDir, SNAPSHOT_DIR_NAME).toString();
try {
hasEnoughBlock(sourceDir);
List<String> snapshotDbs = getSnapshotDbs(sourceDir);
split(sourceDir, snapshotDir, snapshotDbs);
mergeCheckpoint2Snapshot(sourceDir, snapshotDir);
// write genesisBlock and latestBlock
fillSnapshotBlockDb(sourceDir, snapshotDir);
// create tran-cache if not exist, for compatible
checkTranCacheStore(sourceDir, snapshotDir);
// write genesisBlock , latest recent blocks and trans
fillSnapshotBlockAndTransDb(sourceDir, snapshotDir);
generateInfoProperties(Paths.get(snapshotDir, INFO_FILE_NAME).toString(), sourceDir);
} catch (IOException | RocksDBException e) {
logger.error("create snapshot failed, " + e.getMessage());
Expand All @@ -132,6 +105,11 @@ public void generateHistory(String sourceDir, String historyDir) {
long start = System.currentTimeMillis();
historyDir = Paths.get(historyDir, HISTORY_DIR_NAME).toString();
try {
if (isLite(sourceDir)) {
throw new IllegalStateException(
String.format("Unavailable sourceDir: %s is not fullNode data.", sourceDir));
}
hasEnoughBlock(sourceDir);
split(sourceDir, historyDir, archiveDbs);
mergeCheckpoint2History(sourceDir, historyDir);
generateInfoProperties(Paths.get(historyDir, INFO_FILE_NAME).toString(), sourceDir);
Expand All @@ -155,6 +133,12 @@ public void completeHistoryData(String historyDir, String databaseDir) {
long start = System.currentTimeMillis();
BlockNumInfo blockNumInfo = null;
try {
// check historyDir is from lite data
if (isLite(historyDir)) {
throw new IllegalStateException(
String.format("Unavailable history: %s is not generated by fullNode data.",
historyDir));
}
// 1. check block number and genesis block are compatible,
// and return the block numbers of snapshot and history
blockNumInfo = checkAndGetBlockNumInfo(historyDir, databaseDir);
Expand Down Expand Up @@ -183,11 +167,6 @@ private List<String> getSnapshotDbs(String sourceDir) {
.filter(File::isDirectory)
.filter(dir -> !archiveDbs.contains(dir.getName()))
.forEach(dir -> snapshotDbs.add(dir.getName()));
for (String dir : minimumDbsForLiteNode) {
if (!snapshotDbs.contains(dir)) {
throw new RuntimeException("databaseDir does not contain all the necessary databases");
}
}
return snapshotDbs;
}

Expand Down Expand Up @@ -270,88 +249,50 @@ private long getLatestBlockHeaderNum(String databaseDir) throws IOException, Roc
}

/**
* Syncing block from peer that needs latest block and genesis block,
* also VM need recent blocks.
* recent blocks, trans and genesis block.
*/
private void fillSnapshotBlockDb(String sourceDir, String snapshotDir)
private void fillSnapshotBlockAndTransDb(String sourceDir, String snapshotDir)
throws IOException, RocksDBException {
logger.info("-- begin to fill latest block and genesis block to snapshot");
logger.info("-- begin to fill {} block , genesis block and trans to snapshot", RECENT_BLKS);
DBInterface sourceBlockIndexDb = DbTool.getDB(sourceDir, BLOCK_INDEX_DB_NAME);
DBInterface sourceBlockDb = DbTool.getDB(sourceDir, BLOCK_DB_NAME);
DBInterface destBlockDb = DbTool.getDB(snapshotDir, BLOCK_DB_NAME);
DBInterface destBlockIndexDb = DbTool.getDB(snapshotDir, BLOCK_INDEX_DB_NAME);
DBInterface destTransDb = DbTool.getDB(snapshotDir, TRANS_DB_NAME);
// put genesis block and block-index into snapshot
long genesisBlockNum = 0L;
byte[] genesisBlockID = sourceBlockIndexDb.get(ByteArray.fromLong(genesisBlockNum));
destBlockIndexDb.put(ByteArray.fromLong(genesisBlockNum), genesisBlockID);
destBlockDb.put(genesisBlockID, sourceBlockDb.get(genesisBlockID));

long latestBlockNum = getLatestBlockHeaderNum(sourceDir);
long startIndex = latestBlockNum > VM_NEED_RECENT_BLKS
? latestBlockNum - VM_NEED_RECENT_BLKS : 0;
// put the recent blocks in snapshot, VM needs recent 256 blocks.
LongStream.rangeClosed(startIndex, latestBlockNum).forEach(
blockNum -> {
byte[] blockId = null;
byte[] block = null;
try {
blockId = getDataFromSourceDB(sourceDir, BLOCK_INDEX_DB_NAME,
Longs.toByteArray(blockNum));
block = getDataFromSourceDB(sourceDir, BLOCK_DB_NAME, blockId);
} catch (IOException | RocksDBException e) {
throw new RuntimeException(e.getMessage());
}
// put recent blocks index into snapshot
destBlockIndexDb.put(ByteArray.fromLong(blockNum), blockId);
// put latest blocks into snapshot
destBlockDb.put(blockId, block);
});
long startIndex = latestBlockNum - RECENT_BLKS + 1;
// put the recent blocks and trans in snapshot
for (long blockNum = startIndex; blockNum <= latestBlockNum; blockNum++) {
try {
byte[] blockId = getDataFromSourceDB(sourceDir, BLOCK_INDEX_DB_NAME,
Longs.toByteArray(blockNum));
byte[] block = getDataFromSourceDB(sourceDir, BLOCK_DB_NAME, blockId);
// put block
destBlockDb.put(blockId, block);
// put block index
destBlockIndexDb.put(ByteArray.fromLong(blockNum), blockId);
// put trans
long finalBlockNum = blockNum;
new BlockCapsule(block).getTransactions().stream().map(
tc -> tc.getTransactionId().getBytes())
.map(bytes -> Maps.immutableEntry(bytes, Longs.toByteArray(finalBlockNum)))
.forEach(e -> destTransDb.put(e.getKey(), e.getValue()));
} catch (IOException | RocksDBException | BadItemException e) {
throw new RuntimeException(e.getMessage());
}
}

DBInterface destCommonDb = DbTool.getDB(snapshotDir, COMMON_DB_NAME);
destCommonDb.put(DB_KEY_NODE_TYPE, ByteArray.fromInt(Constant.NODE_TYPE_LIGHT_NODE));
destCommonDb.put(DB_KEY_LOWEST_BLOCK_NUM, ByteArray.fromLong(startIndex));
}

private void checkTranCacheStore(String sourceDir, String snapshotDir)
throws IOException, RocksDBException {
logger.info("-- create trans-cache db if not exists.");
if (FileUtil.isExists(String.format(DIR_FORMAT_STRING, snapshotDir,
File.separator, TRANS_CACHE_DB_NAME))) {
return;
}
// fullnode is old version, create trans-cache database
DBInterface recentBlockDb = DbTool.getDB(snapshotDir, "recent-block");
DBInterface transCacheDb = DbTool.getDB(snapshotDir, TRANS_CACHE_DB_NAME);
long headNum = getLatestBlockHeaderNum(sourceDir);
long recentBlockCount = recentBlockDb.size();

LongStream.rangeClosed(headNum - recentBlockCount + 1, headNum).forEach(
blockNum -> {
byte[] blockId = null;
byte[] block = null;
try {
blockId = getDataFromSourceDB(sourceDir, BLOCK_INDEX_DB_NAME,
Longs.toByteArray(blockNum));
block = getDataFromSourceDB(sourceDir, BLOCK_DB_NAME, blockId);
} catch (IOException | RocksDBException e) {
throw new RuntimeException(e.getMessage());
}
BlockCapsule blockCapsule = null;
try {
blockCapsule = new BlockCapsule(block);
} catch (BadItemException e) {
throw new RuntimeException("construct block failed, num: " + blockNum);
}
if (blockCapsule.getTransactions().isEmpty()) {
return;
}
blockCapsule.getTransactions().stream()
.map(tc -> tc.getTransactionId().getBytes())
.map(bytes -> Maps.immutableEntry(bytes, Longs.toByteArray(blockNum)))
.forEach(e -> transCacheDb.put(e.getKey(), e.getValue()));
});
}

private byte[] getGenesisBlockHash(String parentDir) throws IOException, RocksDBException {
long genesisBlockNum = 0L;
DBInterface blockIndexDb = DbTool.getDB(parentDir, BLOCK_INDEX_DB_NAME);
Expand Down Expand Up @@ -424,8 +365,8 @@ private void trimHistory(String databaseDir, BlockNumInfo blockNumInfo)
logger.info("-- begin to trim the history data.");
DBInterface blockIndexDb = DbTool.getDB(databaseDir, BLOCK_INDEX_DB_NAME);
DBInterface blockDb = DbTool.getDB(databaseDir, BLOCK_DB_NAME);
DBInterface transDb = DbTool.getDB(databaseDir, "trans");
DBInterface tranRetDb = DbTool.getDB(databaseDir, "transactionRetStore");
DBInterface transDb = DbTool.getDB(databaseDir, TRANS_DB_NAME);
DBInterface tranRetDb = DbTool.getDB(databaseDir, TRANSACTION_RET_DB_NAME);
for (long n = blockNumInfo.getHistoryBlkNum(); n > blockNumInfo.getSnapshotBlkNum(); n--) {
byte[] blockIdHash = blockIndexDb.get(ByteArray.fromLong(n));
BlockCapsule block = new BlockCapsule(blockDb.get(blockIdHash));
Expand Down Expand Up @@ -460,12 +401,15 @@ private void mergeBak2Database(String databaseDir) throws IOException, RocksDBEx
private byte[] getDataFromSourceDB(String sourceDir, String dbName, byte[] key)
throws IOException, RocksDBException {
DBInterface sourceDb = DbTool.getDB(sourceDir, dbName);
DBInterface checkpointDb = DbTool.getDB(sourceDir, "tmp");
byte[] value = sourceDb.get(key);
if (isEmptyBytes(value)) {
byte[] valueFromTmp = checkpointDb.get(Bytes.concat(simpleEncode(dbName), key));
DBInterface checkpointDb = DbTool.getDB(sourceDir, CHECKPOINT_DB);
// get data from tmp first.
byte[] valueFromTmp = checkpointDb.get(Bytes.concat(simpleEncode(dbName), key));
byte[] value;
if (isEmptyBytes(valueFromTmp)) {
value = sourceDb.get(key);
} else {
value = valueFromTmp.length == 1
? null : Arrays.copyOfRange(valueFromTmp, 1, valueFromTmp.length);
? null : Arrays.copyOfRange(valueFromTmp, 1, valueFromTmp.length);
}
if (isEmptyBytes(value)) {
throw new RuntimeException(String.format("data not found in store, dbName: %s, key: %s",
Expand All @@ -489,8 +433,7 @@ private static boolean isEmptyBytes(byte[] b) {
private void deleteSnapshotFlag(String databaseDir) throws IOException, RocksDBException {
logger.info("-- delete the info file.");
Files.delete(Paths.get(databaseDir, INFO_FILE_NAME));
DBInterface destBlockIndexDb = DbTool.getDB(databaseDir, BLOCK_INDEX_DB_NAME);
if (destBlockIndexDb.get(ByteArray.fromLong(1)) != null) {
if (!isLite(databaseDir)) {
DBInterface destCommonDb = DbTool.getDB(databaseDir, COMMON_DB_NAME);
destCommonDb.delete(DB_KEY_NODE_TYPE);
destCommonDb.delete(DB_KEY_LOWEST_BLOCK_NUM);
Expand All @@ -500,6 +443,53 @@ private void deleteSnapshotFlag(String databaseDir) throws IOException, RocksDBE

}

private void hasEnoughBlock(String sourceDir) throws RocksDBException, IOException {
// check latest
long latest = getLatestBlockHeaderNum(sourceDir);
// check first, not 0;
long first = 0;
DBInterface sourceBlockIndexDb = DbTool.getDB(sourceDir, BLOCK_INDEX_DB_NAME);
DBIterator iterator = sourceBlockIndexDb.iterator();
iterator.seekToFirst();
if (iterator.hasNext()) {
iterator.next();
if (iterator.hasNext()) {
first = Longs.fromByteArray(iterator.getKey());
}
}

if (latest - first + 1 < RECENT_BLKS) {
throw new NoSuchElementException(
String.format("At least %d blocks in block store, actual latestBlock:%d, firstBlock:%d.",
RECENT_BLKS, latest, first));
}
}

private boolean isLite(String databaseDir) throws RocksDBException, IOException {
DBInterface sourceDb = DbTool.getDB(databaseDir, BLOCK_INDEX_DB_NAME);
DBInterface checkpointDb = DbTool.getDB(databaseDir, CHECKPOINT_DB);
byte[] key = ByteArray.fromLong(1);
byte[] valueFromTmp = checkpointDb.get(Bytes.concat(simpleEncode(BLOCK_INDEX_DB_NAME), key));
byte[] value;
if (isEmptyBytes(valueFromTmp)) {
value = sourceDb.get(key);
} else {
value = valueFromTmp.length == 1
? null : Arrays.copyOfRange(valueFromTmp, 1, valueFromTmp.length);
}
return isEmptyBytes(value);
}

@VisibleForTesting
public static void setRecentBlks(long recentBlks) {
RECENT_BLKS = recentBlks;
}

@VisibleForTesting
public static void reSetRecentBlks() {
RECENT_BLKS = 65536;
}

private void run(Args argv) {
if (StringUtils.isBlank(argv.fnDataPath) || StringUtils.isBlank(argv.datasetPath)) {
throw new ParameterException("fnDataPath or datasetPath can't be null");
Expand Down