Allow querying all keys with a prefix through storage queries (#576)
* Add trie_node_info function

* Allow passing nibbles for the node info

* Add a `prefix_proof` module

* Comment and fmt

* Finish changes

* Build fix

* Fix doc

* Verbose panic message

* More docfix

* Add comment

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
tomaka and mergify[bot] committed Mar 1, 2021
1 parent a91bca4 commit 02ac82d
Showing 7 changed files with 425 additions and 28 deletions.
56 changes: 50 additions & 6 deletions bin/wasm-node/rust/src/json_rpc_service.rs
@@ -754,6 +754,49 @@ impl JsonRpcService {
.to_json_response(request_id),
);
}
methods::MethodCall::state_getKeysPaged {
prefix,
count,
start_key,
hash,
} => {
assert!(hash.is_none()); // TODO: not implemented

let mut lock = self.blocks.lock().await;

let block_hash = lock.best_block;
let state_root = lock.known_blocks.get(&block_hash).unwrap().state_root;
drop(lock);

let outcome = self
.network_service
.clone()
.storage_prefix_keys_query(
self.network_chain_index,
&block_hash,
&prefix.unwrap().0, // TODO: don't unwrap! what is this Option?
&state_root,
)
.await;

self.send_back(&match outcome {
Ok(keys) => {
// TODO: instead of requesting all keys with that prefix from the network, pass `start_key` to the network service
let out = keys
.into_iter()
.filter(|k| start_key.as_ref().map_or(true, |start| k >= &start.0)) // TODO: not sure if start should be in the set or not?
.map(methods::HexString)
.take(usize::try_from(count).unwrap_or(usize::max_value()))
.collect::<Vec<_>>();
methods::Response::state_getKeysPaged(out).to_json_response(request_id)
}
Err(error) => json_rpc::parse::build_error_response(
request_id,
json_rpc::parse::ErrorResponse::ServerError(-32000, &error.to_string()),
None,
),
});
}
methods::MethodCall::state_queryStorageAt { keys, at } => {
let blocks = self.blocks.lock().await;
let at = at.as_ref().map(|h| h.0).unwrap_or(blocks.best_block);
@@ -1621,12 +1664,13 @@ impl JsonRpcService {
}
executor::read_only_runtime_host::RuntimeHostVm::StorageGet(get) => {
let requested_key = get.key_as_vec(); // TODO: optimization: don't use as_vec
let storage_value = proof_verify::verify_proof(proof_verify::Config {
requested_key: &requested_key,
trie_root_hash: &runtime_block_state_root,
proof: call_proof.iter().map(|v| &v[..]),
})
.unwrap(); // TODO: shouldn't unwrap but do storage_proof instead
let storage_value =
proof_verify::verify_proof(proof_verify::VerifyProofConfig {
requested_key: &requested_key,
trie_root_hash: &runtime_block_state_root,
proof: call_proof.iter().map(|v| &v[..]),
})
.unwrap(); // TODO: shouldn't unwrap but do storage_proof instead
runtime_call = get.inject_value(storage_value.as_ref().map(iter::once));
}
executor::read_only_runtime_host::RuntimeHostVm::NextKey(_) => {
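The paging logic in the new `state_getKeysPaged` handler above keeps every key greater than or equal to `start_key` (whether the bound should be inclusive is still an open TODO in the code) and truncates the result to `count` entries. A minimal, self-contained sketch of just that filter, with illustrative names not taken from the commit:

use std::convert::TryFrom;

fn page_keys(keys: Vec<Vec<u8>>, start_key: Option<Vec<u8>>, count: u32) -> Vec<Vec<u8>> {
    keys.into_iter()
        // Mirrors the handler: `start_key` itself is kept in the result set.
        .filter(|k| start_key.as_ref().map_or(true, |start| k >= start))
        // `count` is a u32 in the JSON-RPC API; clamp the conversion to usize.
        .take(usize::try_from(count).unwrap_or(usize::max_value()))
        .collect()
}

For sorted keys `[a, b, c]`, calling `page_keys(keys, Some(b), 1)` would return `[b]`.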
66 changes: 64 additions & 2 deletions bin/wasm-node/rust/src/network_service.rs
@@ -54,7 +54,7 @@ use smoldot::{
peer_id::PeerId,
},
network::{protocol, service},
trie::proof_verify,
trie::{self, prefix_proof, proof_verify},
};
use std::{collections::HashSet, sync::Arc};

@@ -538,7 +538,7 @@ impl NetworkService {
let mut result = Vec::with_capacity(outcome.len());
for key in requested_keys.clone() {
result.push(
proof_verify::verify_proof(proof_verify::Config {
proof_verify::verify_proof(proof_verify::VerifyProofConfig {
proof: outcome.iter().map(|nv| &nv[..]),
requested_key: key.as_ref(),
trie_root_hash: &storage_trie_root,
@@ -565,6 +565,68 @@
})
}

pub async fn storage_prefix_keys_query(
self: Arc<Self>,
chain_index: usize,
block_hash: &[u8; 32],
prefix: &[u8],
storage_trie_root: &[u8; 32],
) -> Result<Vec<Vec<u8>>, StorageQueryError> {
let mut prefix_scan = prefix_proof::prefix_scan(prefix_proof::Config {
prefix,
trie_root_hash: *storage_trie_root,
});

'main_scan: loop {
const NUM_ATTEMPTS: usize = 3;

let mut outcome_errors = Vec::with_capacity(NUM_ATTEMPTS);

// TODO: better peers selection ; don't just take the first 3
// TODO: must only ask the peers that know about this block
for target in self.peers_list().await.take(NUM_ATTEMPTS) {
let result = self
.clone()
.storage_proof_request(
chain_index,
target,
protocol::StorageProofRequestConfig {
block_hash: *block_hash,
keys: prefix_scan.requested_keys().map(|nibbles| {
trie::nibbles_to_bytes_extend(nibbles).collect::<Vec<_>>()
}),
},
)
.await
.map_err(StorageQueryErrorDetail::Network);

match result {
Ok(proof) => {
match prefix_scan.resume(proof.iter().map(|v| &v[..])) {
Ok(prefix_proof::ResumeOutcome::InProgress(scan)) => {
// Continue next step of the proof.
prefix_scan = scan;
continue 'main_scan;
}
Ok(prefix_proof::ResumeOutcome::Success { keys }) => {
return Ok(keys);
}
Err(err) => todo!("{:?}", err),
}
}
Err(err) => {
outcome_errors.push(err);
}
}
}

debug_assert_eq!(outcome_errors.len(), outcome_errors.capacity());
return Err(StorageQueryError {
errors: outcome_errors,
});
}
}

/// Sends a storage proof request to the given peer.
///
/// See also [`NetworkService::storage_query`].
12 changes: 7 additions & 5 deletions bin/wasm-node/rust/src/sync_service.rs
@@ -333,11 +333,13 @@ async fn start_sync(
// TODO: log what happens
keys.into_iter()
.map(|key| {
proof_verify::verify_proof(proof_verify::Config {
proof: outcome.iter().map(|nv| &nv[..]),
requested_key: key.as_ref(),
trie_root_hash: &state_trie_root,
})
proof_verify::verify_proof(
proof_verify::VerifyProofConfig {
proof: outcome.iter().map(|nv| &nv[..]),
requested_key: key.as_ref(),
trie_root_hash: &state_trie_root,
},
)
.map_err(|_err| {
panic!("{:?}", _err); // TODO: remove panic, it's just for debugging
()
6 changes: 5 additions & 1 deletion src/trie.rs
@@ -91,10 +91,14 @@ mod nibble;

pub mod calculate_root;
pub mod node_value;
pub mod prefix_proof;
pub mod proof_verify;
pub mod trie_structure;

pub use nibble::{bytes_to_nibbles, BytesToNibbles, Nibble, NibbleFromU8Error};
pub use nibble::{
all_nibbles, bytes_to_nibbles, nibbles_to_bytes_extend, BytesToNibbles, Nibble,
NibbleFromU8Error,
};

/// Radix-16 Merkle-Patricia trie.
// TODO: probably useless, remove
22 changes: 21 additions & 1 deletion src/trie/nibble.rs
@@ -15,7 +15,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use core::{convert::TryFrom, fmt};
use core::{convert::TryFrom, fmt, iter};

/// A single nibble with four bits.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
@@ -53,6 +53,26 @@ pub enum NibbleFromU8Error {
TooLarge,
}

/// Returns an iterator of all possible nibble values.
pub fn all_nibbles() -> impl ExactSizeIterator<Item = Nibble> {
(0..16).map(Nibble)
}

/// Turns an iterator of nibbles into an iterator of bytes.
///
/// If the number of nibbles is uneven, adds a `0` nibble at the end.
// TODO: ExactSizeIterator
pub fn nibbles_to_bytes_extend<I: Iterator<Item = Nibble>>(
mut nibbles: I,
) -> impl Iterator<Item = u8> {
iter::from_fn(move || {
let n1 = nibbles.next()?;
let n2 = nibbles.next().unwrap_or(Nibble(0));
let byte = (n1.0 << 4) | n2.0;
Some(byte)
})
}

/// Turns an iterator of bytes into an iterator of nibbles corresponding to these bytes.
///
/// For each byte, the iterator yields a nibble containing the 4 most significant bits then a
/// nibble containing the 4 least significant bits.
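A quick illustrative sketch of the two nibble helpers added above (not part of the commit; it assumes `Nibble` implements `TryFrom<u8>`, which the `NibbleFromU8Error` type suggests):

use core::convert::TryFrom;
use smoldot::trie::{bytes_to_nibbles, nibbles_to_bytes_extend, Nibble};

fn main() {
    // Splitting bytes into nibbles and reassembling them is lossless.
    let nibbles = bytes_to_nibbles([0x12u8, 0x34].iter().copied()).collect::<Vec<_>>();
    let bytes = nibbles_to_bytes_extend(nibbles.into_iter()).collect::<Vec<u8>>();
    assert_eq!(bytes, [0x12, 0x34]);

    // An uneven number of nibbles is padded with a trailing 0 nibble:
    // [0x1, 0x2, 0x3] reassembles into [0x12, 0x30].
    let odd = [1u8, 2, 3].iter().map(|&n| Nibble::try_from(n).unwrap());
    assert_eq!(nibbles_to_bytes_extend(odd).collect::<Vec<u8>>(), [0x12, 0x30]);
}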
162 changes: 162 additions & 0 deletions src/trie/prefix_proof.rs
@@ -0,0 +1,162 @@
// Smoldot
// Copyright (C) 2019-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

//! Scanning, through trie proofs, the list of all keys that share a certain prefix.
//!
//! This module is a helper whose objective is to find out the list of all keys that start with
//! a certain prefix by performing storage proofs.
//!
//! The total number of storage proofs required is equal to the maximum depth of the tree below
//! the requested prefix, plus one. For example, if a tree has the nodes `[1, 5]`, `[1, 5, 8, 9]`,
//! and `[1, 5, 8, 9, 2]`, then four queries are necessary to find all the keys whose prefix
//! is `[1]`.

// TODO: usage example

// TODO: this code is entirely untested; no idea if it works

use super::{nibble, proof_verify};

use alloc::{vec, vec::Vec};
use core::{fmt, iter};

/// Configuration to pass to [`prefix_scan`].
pub struct Config<'a> {
/// Prefix that all the keys must share.
pub prefix: &'a [u8],

/// Merkle value (or node value) of the root node of the trie.
///
/// > **Note**: The Merkle value and node value are always the same for the root node.
pub trie_root_hash: [u8; 32],
}

/// Start a new scanning process.
pub fn prefix_scan(config: Config<'_>) -> PrefixScan {
PrefixScan {
trie_root_hash: config.trie_root_hash,
next_queries: vec![nibble::bytes_to_nibbles(config.prefix.iter().copied()).collect()],
final_result: Vec::with_capacity(32),
}
}

/// Scan of a prefix in progress.
pub struct PrefixScan {
trie_root_hash: [u8; 32],
// TODO: we have lots of Vecs here; maybe find a way to optimize
next_queries: Vec<Vec<nibble::Nibble>>,
// TODO: we have lots of Vecs here; maybe find a way to optimize
final_result: Vec<Vec<u8>>,
}

impl PrefixScan {
/// Returns the list of keys whose storage proof must be queried.
pub fn requested_keys(
&'_ self,
) -> impl Iterator<Item = impl Iterator<Item = nibble::Nibble> + '_> + '_ {
self.next_queries.iter().map(|l| l.iter().copied())
}

/// Injects the proof presumably containing the keys returned by [`PrefixScan::requested_keys`].
///
/// Returns an error if the proof is invalid. In that case, `self` isn't modified.
pub fn resume<'a>(
mut self,
proof: impl Iterator<Item = &'a [u8]> + Clone + 'a,
) -> Result<ResumeOutcome, proof_verify::Error> {
// The entire body is executed as long as verifying at least one proof succeeds.
for is_first_iteration in iter::once(true).chain(iter::repeat(false)) {
// Filled with the queries to perform at the next iteration.
// Capacity assumes a maximum of 2 children per node on average. This value was chosen
// completely arbitrarily.
let mut next = Vec::with_capacity(self.next_queries.len() * 2);

// True if any proof verification has succeeded during this iteration.
// Controls whether we continue iterating.
let mut any_successful_proof = false;

for query in &self.next_queries {
let info = match proof_verify::trie_node_info(proof_verify::TrieNodeInfoConfig {
requested_key: query.iter().cloned(),
trie_root_hash: &self.trie_root_hash,
proof: proof.clone(),
}) {
Ok(info) => info,
Err(err) if is_first_iteration => return Err(err),
Err(_) => continue,
};

any_successful_proof = true;

if info.node_value.is_some() {
// Trie nodes with a value are always aligned to "bytes-keys". In other words, the
// number of nibbles is always even.
debug_assert_eq!(query.len() % 2, 0);
let key = query
.chunks(2)
.map(|n| (u8::from(n[0]) << 4) | u8::from(n[1]))
.collect::<Vec<_>>();

// Insert in final results, making sure we check for duplicates.
debug_assert!(!self.final_result.iter().any(|n| *n == key));
self.final_result.push(key);
}

for child_nibble in info.children.next_nibbles() {
let mut next_query = Vec::with_capacity(query.len() + 1);
next_query.extend_from_slice(&query);
next_query.push(child_nibble);
next.push(next_query);
}
}

// Finished when nothing more to request.
if next.is_empty() {
return Ok(ResumeOutcome::Success {
keys: self.final_result,
});
}

// Update `next_queries` for the next iteration.
self.next_queries = next;

if !any_successful_proof {
break;
}
}

Ok(ResumeOutcome::InProgress(self))
}
}

impl fmt::Debug for PrefixScan {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("PrefixScan").finish()
}
}

/// Outcome of calling [`PrefixScan::resume`].
#[derive(Debug)]
pub enum ResumeOutcome {
/// Scan must continue with the next storage proof query.
InProgress(PrefixScan),
/// Scan has succeeded.
Success {
/// List of keys with the requested prefix.
keys: Vec<Vec<u8>>,
},
}
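The module above leaves a `// TODO: usage example`; here is a hedged sketch of the intended driver loop, matching how `network_service.rs` uses the API in this commit, with `fetch_storage_proof` standing in for a hypothetical networking call:

use smoldot::trie::{self, prefix_proof};

// Hypothetical: obtains a storage proof covering `keys` from a full node.
fn fetch_storage_proof(keys: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
    unimplemented!()
}

fn keys_with_prefix(prefix: &[u8], trie_root_hash: [u8; 32]) -> Vec<Vec<u8>> {
    let mut scan = prefix_proof::prefix_scan(prefix_proof::Config {
        prefix,
        trie_root_hash,
    });

    loop {
        // Each round requests a proof for the keys discovered so far, one trie
        // level deeper than the previous round.
        let keys = scan
            .requested_keys()
            .map(|nibbles| trie::nibbles_to_bytes_extend(nibbles).collect::<Vec<_>>())
            .collect::<Vec<_>>();
        let proof = fetch_storage_proof(keys);

        // A real caller would retry with a proof from another peer rather than
        // unwrap on an invalid proof.
        match scan.resume(proof.iter().map(|v| &v[..])).unwrap() {
            prefix_proof::ResumeOutcome::InProgress(next) => scan = next,
            prefix_proof::ResumeOutcome::Success { keys } => return keys,
        }
    }
}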