Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Trie Node Retrieve and Save #1569

Merged
merged 8 commits into from
Sep 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions rskj-core/src/main/java/co/rsk/cli/tools/ExecuteBlocks.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@

import co.rsk.RskContext;
import co.rsk.core.bc.BlockExecutor;
import co.rsk.core.bc.BlockResult;
import co.rsk.crypto.Keccak256;
import co.rsk.db.StateRootHandler;
import co.rsk.trie.TrieStore;
import org.ethereum.core.Block;
import org.ethereum.db.BlockStore;

import java.util.Arrays;

/**
* The entry point for execute blocks CLI tool
* This is an experimental/unsupported tool
Expand All @@ -34,19 +39,27 @@ public static void main(String[] args) {
BlockExecutor blockExecutor = ctx.getBlockExecutor();
BlockStore blockStore = ctx.getBlockStore();
TrieStore trieStore = ctx.getTrieStore();

execute(args, blockExecutor, blockStore, trieStore);
StateRootHandler stateRootHandler = ctx.getStateRootHandler();

execute(args, blockExecutor, blockStore, trieStore, stateRootHandler);
}

public static void execute(String[] args, BlockExecutor blockExecutor, BlockStore blockStore, TrieStore trieStore) {
public static void execute(String[] args, BlockExecutor blockExecutor, BlockStore blockStore, TrieStore trieStore,
StateRootHandler stateRootHandler) {
long fromBlockNumber = Long.parseLong(args[0]);
long toBlockNumber = Long.parseLong(args[1]);

for (long n = fromBlockNumber; n <= toBlockNumber; n++) {
Block block = blockStore.getChainBlockByNumber(n);
Block parent = blockStore.getBlockByHash(block.getParentHash().getBytes());

blockExecutor.execute(block, parent.getHeader(), false, false);
BlockResult blockResult = blockExecutor.execute(block, parent.getHeader(), false, false);

Keccak256 stateRootHash = stateRootHandler.translate(block.getHeader());
if (!Arrays.equals(blockResult.getFinalState().getHash().getBytes(), stateRootHash.getBytes())) {
System.err.println("Invalid state root block number " + n);
break;
}
}

trieStore.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class BootstrapImporter {

Expand Down Expand Up @@ -76,7 +77,7 @@ private static void insertState(TrieStore destinationTrieStore, RLPElement rlpEl

for (int k = 0; k < nodesData.size(); k++) {
RLPElement element = nodesData.get(k);
byte[] rlpData = element.getRLPData();
byte[] rlpData = Objects.requireNonNull(element.getRLPData());
Trie trie = Trie.fromMessage(rlpData, fakeStore);
hashMapDB.put(trie.getHash().getBytes(), rlpData);
nodes.add(trie);
Expand Down
2 changes: 1 addition & 1 deletion rskj-core/src/main/java/co/rsk/trie/MultiTrieStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Optional<Trie> retrieve(byte[] rootHash) {
if (message == null) {
continue;
}
return Optional.of(Trie.fromMessage(message, this));
return Optional.of(Trie.fromMessage(message, this).markAsSaved());
}

return Optional.empty();
Expand Down
8 changes: 7 additions & 1 deletion rskj-core/src/main/java/co/rsk/trie/NodeReference.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

public class NodeReference {

private static final NodeReference EMPTY = new NodeReference(null, null, null);

private final TrieStore store;

Expand Down Expand Up @@ -106,6 +107,11 @@ public boolean isEmbeddable() {

}

// the referenced node was loaded
public boolean wasLoaded() {
return lazyNode != null;
}

// This method should only be called from save()
public int serializedLength() {
if (!isEmpty()) {
Expand Down Expand Up @@ -149,6 +155,6 @@ private long nodeSize(Trie trie) {
}

public static NodeReference empty() {
return new NodeReference(null, null, null);
return EMPTY;
}
}
34 changes: 26 additions & 8 deletions rskj-core/src/main/java/co/rsk/trie/Trie.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,18 @@
* An empty node has no subnodes and a null value
*/
public class Trie {
private static final int ARITY = 2;
private static final int MAX_EMBEDDED_NODE_SIZE_IN_BYTES = 44;

private static final Profiler profiler = ProfilerFactory.getInstance();

private static final int ARITY = 2;
private static final int MAX_EMBEDDED_NODE_SIZE_IN_BYTES = 44;
private static final String INVALID_ARITY = "Invalid arity";

private static final int MESSAGE_HEADER_LENGTH = 2 + Short.BYTES * 2;
private static final String INVALID_VALUE_LENGTH = "Invalid value length";

// all zeroed, default hash for empty nodes
private static Keccak256 emptyHash = makeEmptyHash();
private static final Keccak256 EMPTY_HASH = makeEmptyHash();

// this node associated value, if any
private byte[] value;
Expand Down Expand Up @@ -101,7 +102,10 @@ public class Trie {
private VarInt childrenSize;

// associated store, to store or retrieve nodes in the trie
private TrieStore store;
private final TrieStore store;

// already saved in store flag
private volatile boolean saved;

// shared Path
private final TrieKeySlice sharedPath;
Expand Down Expand Up @@ -152,6 +156,7 @@ public static Trie fromMessage(byte[] message, TrieStore store) {
}

profiler.stop(metric);

return trie;
}

Expand Down Expand Up @@ -232,7 +237,9 @@ private static Trie fromMessageOrchid(byte[] message, TrieStore store) {
}

// it doesn't need to clone value since it's retrieved from store or created from message
return new Trie(store, sharedPath, value, left, right, lvalue, valueHash);
Trie trie = new Trie(store, sharedPath, value, left, right, lvalue, valueHash);

return trie;
}

private static Trie fromMessageRskip107(ByteBuffer message, TrieStore store) {
Expand Down Expand Up @@ -320,7 +327,9 @@ private static Trie fromMessageRskip107(ByteBuffer message, TrieStore store) {
throw new IllegalArgumentException("The message had more data than expected");
}

return new Trie(store, sharedPath, value, left, right, lvalue, valueHash, childrenSize);
Trie trie = new Trie(store, sharedPath, value, left, right, lvalue, valueHash, childrenSize);

return trie;
}

/**
Expand All @@ -342,7 +351,7 @@ public Keccak256 getHash() {
}

if (isEmptyTrie()) {
return emptyHash.copy();
return EMPTY_HASH.copy();
}

byte[] message = this.toMessage();
Expand All @@ -361,7 +370,7 @@ public Keccak256 getHashOrchid(boolean isSecure) {
}

if (isEmptyTrie()) {
return emptyHash.copy();
return EMPTY_HASH.copy();
}

byte[] message = this.toMessageOrchid(isSecure);
Expand Down Expand Up @@ -1435,4 +1444,13 @@ private List<Trie> findNodes(TrieKeySlice key) {

return subnodes;
}

public boolean wasSaved() {
return this.saved;
}

public Trie markAsSaved() {
this.saved = true;
return this;
}
}
117 changes: 93 additions & 24 deletions rskj-core/src/main/java/co/rsk/trie/TrieStoreImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@

package co.rsk.trie;

import org.ethereum.datasource.DataSourceWithCache;
import org.ethereum.datasource.KeyValueDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import javax.annotation.Nullable;
import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;

/**
* TrieStoreImpl store and retrieve Trie node by hash
Expand All @@ -40,11 +39,9 @@ public class TrieStoreImpl implements TrieStore {

private static final Logger logger = LoggerFactory.getLogger("triestore");

private KeyValueDataSource store;
private static final ThreadLocal<TraceInfo> traceInfoLocal = ThreadLocal.withInitial(TraceInfo::new);

/** Weak references are removed once the tries are garbage collected */
private Set<Trie> savedTries = Collections
.newSetFromMap(Collections.synchronizedMap(new WeakHashMap<>()));
private final KeyValueDataSource store;

public TrieStoreImpl(KeyValueDataSource store) {
this.store = store;
Expand All @@ -55,33 +52,79 @@ public TrieStoreImpl(KeyValueDataSource store) {
*/
@Override
public void save(Trie trie) {
logger.trace("Start saving trie root.");
save(trie, true, 0);
logger.trace("End saving trie root.");
TraceInfo traceInfo = null;
if (logger.isTraceEnabled()) {
traceInfo = traceInfoLocal.get();
traceInfo.numOfRetrievesInSaveTrie = 0;
traceInfo.numOfSavesInSaveTrie = 0;
traceInfo.numOfNoSavesInSaveTrie = 0;

logger.trace("Start saving trie root.");
}

// save a trie recursively
save(trie, true, 0, traceInfo);

if (traceInfo != null) {
logger.trace("End saving trie root. No. Retrieves: {}. No. Saves: {}. No. No Saves: {}",
traceInfo.numOfRetrievesInSaveTrie, traceInfo.numOfSavesInSaveTrie, traceInfo.numOfNoSavesInSaveTrie);
logger.trace("End process block. No. Retrieves: {}. No. Saves: {}. No. No Saves: {}",
traceInfo.numOfRetrievesInBlockProcess, traceInfo.numOfSavesInBlockProcess, traceInfo.numOfNoSavesInBlockProcess);

traceInfo.numOfRetrievesInBlockProcess = 0;
traceInfo.numOfSavesInBlockProcess = 0;
traceInfo.numOfNoSavesInBlockProcess = 0;

if (store instanceof DataSourceWithCache) {
((DataSourceWithCache) store).emitLogs();
}

traceInfoLocal.remove();
}
}

/**
* @param forceSaveRoot allows saving the root node even if it's embeddable
* @param isRootNode it is the root node of the trie
*/
private void save(Trie trie, boolean forceSaveRoot, int level) {
logger.trace("Start saving trie, level : {}", level);
if (savedTries.contains(trie)) {
// it is guaranteed that the children of a saved node are also saved
private void save(Trie trie, boolean isRootNode, int level, @Nullable TraceInfo traceInfo) {
if (trie.wasSaved()) {
return;
}

logger.trace("Start saving trie, level : {}", level);

byte[] trieKeyBytes = trie.getHash().getBytes();

if (forceSaveRoot && this.store.get(trieKeyBytes) != null) {
if (isRootNode && this.store.get(trieKeyBytes) != null) {
// the full trie is already saved
logger.trace("End saving trie, level : {}, already saved.", level);

if (traceInfo != null) {
traceInfo.numOfNoSavesInSaveTrie++;
traceInfo.numOfNoSavesInBlockProcess++;
}

return;
}

logger.trace("Start left trie. Level: {}", level);
trie.getLeft().getNode().ifPresent(t -> save(t, false, level + 1));
logger.trace("Start right trie. Level: {}", level);
trie.getRight().getNode().ifPresent(t -> save(t, false, level + 1));
if (traceInfo != null) {
traceInfo.numOfSavesInSaveTrie++;
traceInfo.numOfSavesInBlockProcess++;
}

NodeReference leftNodeReference = trie.getLeft();

if (leftNodeReference.wasLoaded()) {
logger.trace("Start left trie. Level: {}", level);
leftNodeReference.getNode().ifPresent(t -> save(t, false, level + 1, traceInfo));
}

NodeReference rightNodeReference = trie.getRight();

if (rightNodeReference.wasLoaded()) {
logger.trace("Start right trie. Level: {}", level);
rightNodeReference.getNode().ifPresent(t -> save(t, false, level + 1, traceInfo));
}

if (trie.hasLongValue()) {
// Note that there is no distinction in keys between node data and value data. This could bring problems in
Expand All @@ -98,15 +141,15 @@ private void save(Trie trie, boolean forceSaveRoot, int level) {
logger.trace("End Putting in store, hasLongValue. Level: {}", level);
}

if (trie.isEmbeddable() && !forceSaveRoot) {
if (trie.isEmbeddable() && !isRootNode) {
logger.trace("End Saving. Level: {}", level);
return;
}

logger.trace("Putting in store trie root.");
this.store.put(trieKeyBytes, trie.toMessage());
trie.markAsSaved();
logger.trace("End putting in store trie root.");
savedTries.add(trie);
logger.trace("End Saving trie, level: {}.", level);
}

Expand All @@ -118,22 +161,48 @@ public void flush(){
@Override
public Optional<Trie> retrieve(byte[] hash) {
byte[] message = this.store.get(hash);

if (message == null) {
return Optional.empty();
}

Trie trie = Trie.fromMessage(message, this);
savedTries.add(trie);
if (logger.isTraceEnabled()) {
TraceInfo traceInfo = traceInfoLocal.get();
traceInfo.numOfRetrievesInSaveTrie++;
traceInfo.numOfRetrievesInBlockProcess++;
}

Trie trie = Trie.fromMessage(message, this).markAsSaved();
return Optional.of(trie);
}

@Override
public byte[] retrieveValue(byte[] hash) {
if (logger.isTraceEnabled()) {
TraceInfo traceInfo = traceInfoLocal.get();
traceInfo.numOfRetrievesInSaveTrie++;
traceInfo.numOfRetrievesInBlockProcess++;
}

return this.store.get(hash);
}

@Override
public void dispose() {
store.close();
}

/**
* This holds tracing information during execution of the {@link #save(Trie)} method.
* Should not be used when logger tracing is disabled ({@link Logger#isTraceEnabled()} is {@code false}).
*/
private static final class TraceInfo {
private int numOfRetrievesInBlockProcess;
private int numOfSavesInBlockProcess;
private int numOfNoSavesInBlockProcess;

private int numOfRetrievesInSaveTrie;
private int numOfSavesInSaveTrie;
private int numOfNoSavesInSaveTrie;
}
}
Loading