Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement downloading the chain storage #2876

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/full-node/src/run/consensus_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ impl SyncBackground {
.push(request.map(move |r| (request_id, r)).boxed());
}
all::DesiredRequest::GrandpaWarpSync { .. }
| all::DesiredRequest::StorageGet { .. }
| all::DesiredRequest::StorageGetKeys { .. }
| all::DesiredRequest::RuntimeCallMerkleProof { .. } => {
// Not used in "full" mode.
unreachable!()
Expand Down
118 changes: 114 additions & 4 deletions bin/light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
use crate::platform::Platform;

use alloc::{
borrow::ToOwned as _,
boxed::Box,
string::{String, ToString as _},
sync::Arc,
vec::Vec,
};
use core::{cmp, num::NonZeroUsize, pin::Pin, task::Poll, time::Duration};
use core::{cmp, num::NonZeroUsize, ops, pin::Pin, task::Poll, time::Duration};
use futures::{
channel::{mpsc, oneshot},
lock::Mutex,
Expand All @@ -59,7 +60,7 @@ use smoldot::{
network::{protocol, service},
};

pub use service::EncodedMerkleProof;
pub use service::{EncodedMerkleProof, EncodedStateResponse};

mod tasks;

Expand Down Expand Up @@ -188,6 +189,12 @@ struct SharedGuarded<TPlat: Platform> {
fnv::FnvBuildHasher,
>,

storage_requests: HashMap<
service::OutRequestId,
oneshot::Sender<Result<EncodedStateResponse, service::StateRequestError>>,
fnv::FnvBuildHasher,
>,

call_proof_requests: HashMap<
service::OutRequestId,
oneshot::Sender<Result<service::EncodedMerkleProof, service::CallProofRequestError>>,
Expand Down Expand Up @@ -268,6 +275,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
Default::default(),
),
storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
storage_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
kademlia_discovery_operations: HashMap::with_capacity_and_hasher(
2,
Expand Down Expand Up @@ -648,6 +656,89 @@ impl<TPlat: Platform> NetworkService<TPlat> {
result.map_err(StorageProofRequestError::Request)
}

/// Sends a storage request for a range of keys to the given peer.
// TODO: more docs
pub async fn state_request(
self: Arc<Self>,
chain_index: usize,
target: PeerId, // TODO: takes by value because of futures longevity issue
block_hash: [u8; 32],
keys: ops::RangeFrom<Vec<u8>>,
timeout: Duration,
) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, bool), StorageRequestError> {
let rx = {
let mut guarded = self.shared.guarded.lock().await;

// The call to `start_state_request_unchecked` below panics if we have no active
// connection.
if !guarded.network.can_start_requests(&target) {
return Err(StorageRequestError::NoConnection);
}

log::debug!(
target: "network",
"Connection({}) <= StorageRequest(chain={}, block={}, start_key={})",
target,
self.shared.log_chain_names[chain_index],
HashDisplay(&block_hash),
HashDisplay(&keys.start)
);

let request_id = guarded.network.start_state_request_unchecked(
TPlat::now(),
&target,
chain_index,
&block_hash,
&keys.start,
timeout,
);

self.shared.wake_up_main_background_task.notify(1);

let (tx, rx) = oneshot::channel();
guarded.storage_requests.insert(request_id, tx);
rx
};

let result = rx.await.unwrap();

match result {
Ok(response) => {
let decoded = response.decode();
log::debug!(
target: "network",
"Connection({}) => StorageRequest(chain={}, num_elems={}, total_size={}, complete={:?})",
target,
self.shared.log_chain_names[chain_index],
decoded.entries.len(),
BytesDisplay(decoded.entries.iter().fold(0, |a, b| a + u64::try_from(b.key.len() + b.value.len()).unwrap())),
decoded.complete,
);

// TODO: the `to_owned()` here are kind of expensive, figure if not possible to avoid them
Ok((
decoded
.entries
.into_iter()
.map(|entry| (entry.key.to_owned(), entry.value.to_owned()))
.collect::<Vec<_>>(),
decoded.complete,
))
}
Err(err) => {
log::debug!(
target: "network",
"Connection({}) => StorageRequest(chain={}, error={:?})",
target,
self.shared.log_chain_names[chain_index],
err
);

Err(StorageRequestError::Request(err))
}
}
}

/// Sends a call proof request to the given peer.
///
/// See also [`NetworkService::call_proof_request`].
Expand Down Expand Up @@ -916,6 +1007,16 @@ pub enum StorageProofRequestError {
Request(service::StorageProofRequestError),
}

/// Error returned by [`NetworkService::storage_request`].
#[derive(Debug, derive_more::Display, Clone)]
pub enum StorageRequestError {
/// No established connection with the target.
NoConnection,
/// Error during the request.
#[display(fmt = "{}", _0)]
Request(service::StateRequestError),
}

/// Error returned by [`NetworkService::call_proof_request`].
#[derive(Debug, derive_more::Display, Clone)]
pub enum CallProofRequestError {
Expand Down Expand Up @@ -1143,6 +1244,16 @@ async fn update_round<TPlat: Platform>(
.unwrap()
.send(response);
}
service::Event::StateRequestResult {
request_id,
response,
} => {
let _ = guarded
.storage_requests
.remove(&request_id)
.unwrap()
.send(response);
}
service::Event::CallProofRequestResult {
request_id,
response,
Expand All @@ -1153,8 +1264,7 @@ async fn update_round<TPlat: Platform>(
.unwrap()
.send(response);
}
service::Event::StateRequestResult { .. }
| service::Event::KademliaFindNodeRequestResult { .. } => {
service::Event::KademliaFindNodeRequestResult { .. } => {
// We never start this kind of requests.
unreachable!()
}
Expand Down
68 changes: 62 additions & 6 deletions bin/light-base/src/sync_service/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
known_finalized_runtime: None,
pending_block_requests: stream::FuturesUnordered::new(),
pending_grandpa_requests: stream::FuturesUnordered::new(),
pending_storage_requests: stream::FuturesUnordered::new(),
pending_storage_keys_requests: stream::FuturesUnordered::new(),
pending_storage_range_requests: stream::FuturesUnordered::new(),
pending_call_proof_requests: stream::FuturesUnordered::new(),
warp_sync_taking_long_time_warning: future::Either::Left(TPlat::sleep(
Duration::from_secs(15),
Expand Down Expand Up @@ -277,12 +278,12 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
}
},

(request_id, result) = task.pending_storage_requests.select_next_some() => {
(request_id, result) = task.pending_storage_keys_requests.select_next_some() => {
// A storage request has been finished.
// `result` is an error if the request got cancelled by the sync state machine.
if let Ok(result) = result {
// Inject the result of the request into the sync state machine.
task.sync.storage_get_response(
task.sync.storage_get_keys_response(
request_id,
result.map(|list| list.into_iter()),
).1
Expand All @@ -294,6 +295,23 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
}
},

(request_id, result) = task.pending_storage_range_requests.select_next_some() => {
// A storage request has been finished.
// `result` is an error if the request got cancelled by the sync state machine.
if let Ok(result) = result {
// Inject the result of the request into the sync state machine.
task.sync.storage_get_keys_range_response(
request_id,
result.map(|(list, complete)| (list.into_iter(), complete)).map_err(|_| ()),
).1

} else {
// The sync state machine has emitted a `Action::Cancel` earlier, and is
// thus no longer interested in the response.
continue;
}
},

(request_id, result) = task.pending_call_proof_requests.select_next_some() => {
// A call proof request has been finished.
// `result` is an error if the request got cancelled by the sync state machine.
Expand Down Expand Up @@ -422,7 +440,7 @@ struct Task<TPlat: Platform> {
>,

/// List of storage requests currently in progress.
pending_storage_requests: stream::FuturesUnordered<
pending_storage_keys_requests: stream::FuturesUnordered<
future::BoxFuture<
'static,
(
Expand All @@ -432,6 +450,20 @@ struct Task<TPlat: Platform> {
>,
>,

/// List of storage requests currently in progress.
pending_storage_range_requests: stream::FuturesUnordered<
future::BoxFuture<
'static,
(
all::RequestId,
Result<
Result<(Vec<(Vec<u8>, Vec<u8>)>, bool), network_service::StorageRequestError>,
future::Aborted,
>,
),
>,
>,

/// List of call proof requests currently in progress.
pending_call_proof_requests: stream::FuturesUnordered<
future::BoxFuture<
Expand Down Expand Up @@ -544,7 +576,7 @@ impl<TPlat: Platform> Task<TPlat> {
.push(async move { (request_id, grandpa_request.await) }.boxed());
}

all::DesiredRequest::StorageGet {
all::DesiredRequest::StorageGetKeys {
block_hash,
state_trie_root,
ref keys,
Expand Down Expand Up @@ -588,7 +620,31 @@ impl<TPlat: Platform> Task<TPlat> {
.sync
.add_request(source_id, request_detail.into(), abort);

self.pending_storage_requests
self.pending_storage_keys_requests
.push(async move { (request_id, storage_request.await) }.boxed());
}

all::DesiredRequest::StorageGetRange {
block_hash,
ref keys,
..
} => {
let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue

let storage_request = self.network_service.clone().state_request(
self.network_chain_index,
peer_id,
block_hash,
keys.clone(),
Duration::from_secs(16),
);

let (storage_request, abort) = future::abortable(storage_request);
let request_id = self
.sync
.add_request(source_id, request_detail.into(), abort);

self.pending_storage_range_requests
.push(async move { (request_id, storage_request.await) }.boxed());
}

Expand Down
3 changes: 1 addition & 2 deletions src/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ impl ChainSpec {
trie::calculate_root::RootMerkleValueCalculation::StorageValue(
val,
) => {
let key: alloc::vec::Vec<u8> = val.key().collect();
let value = genesis_storage.value(&key[..]);
let value = genesis_storage.value(&val.key_as_vec()[..]);
calculation = val.inject(state_version, value);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/executor/runtime_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ impl Inner {
// TODO: allocating a Vec, meh
if let Some(overlay) = self
.top_trie_changes
.diff_get(&value_request.key().collect::<Vec<_>>())
.diff_get(&value_request.key_as_vec())
{
// TODO: we only support V0 for now, see https://github.com/paritytech/smoldot/issues/1967
self.root_calculation =
Expand Down
Loading