Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

collator protocol changes for elastic scaling (validator side) #3302

Merged
merged 14 commits into from
Mar 15, 2024
Merged
8 changes: 5 additions & 3 deletions polkadot/node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ async fn construct_and_distribute_receipt(
} = collation;

let persisted_validation_data_hash = validation_data.hash();
let parent_head_data = validation_data.parent_head.clone();
let parent_head_data_hash = validation_data.parent_head.hash();

// Apply compression to the block data.
Expand Down Expand Up @@ -551,12 +552,13 @@ async fn construct_and_distribute_receipt(
metrics.on_collation_generated();

sender
.send_message(CollatorProtocolMessage::DistributeCollation(
ccr,
.send_message(CollatorProtocolMessage::DistributeCollation {
candidate_receipt: ccr,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
))
})
.await;
}

Expand Down
33 changes: 17 additions & 16 deletions polkadot/node/collation-generation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,11 @@ fn sends_distribute_collation_message() {

assert_eq!(to_collator_protocol.len(), 1);
match AllMessages::from(to_collator_protocol.pop().unwrap()) {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..,
)) => {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
candidate_receipt,
..
}) => {
let CandidateReceipt { descriptor, .. } = candidate_receipt;
// signature generation is non-deterministic, so we can't just assert that the
// expected descriptor is correct. What we can do is validate that the produced
// descriptor has a valid signature, then just copy in the generated signature
Expand Down Expand Up @@ -529,11 +529,11 @@ fn fallback_when_no_validation_code_hash_api() {

assert_eq!(to_collator_protocol.len(), 1);
match &to_collator_protocol[0] {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..,
)) => {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
candidate_receipt,
..
}) => {
let CandidateReceipt { descriptor, .. } = candidate_receipt;
assert_eq!(expect_validation_code_hash, descriptor.validation_code_hash);
},
_ => panic!("received wrong message type"),
Expand Down Expand Up @@ -619,15 +619,16 @@ fn submit_collation_leads_to_distribution() {

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
ccr,
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
candidate_receipt,
parent_head_data_hash,
..
)) => {
}) => {
let CandidateReceipt { descriptor, .. } = candidate_receipt;
assert_eq!(parent_head_data_hash, parent_head.hash());
assert_eq!(ccr.descriptor().persisted_validation_data_hash, expected_pvd.hash());
assert_eq!(ccr.descriptor().para_head, dummy_head_data().hash());
assert_eq!(ccr.descriptor().validation_code_hash, validation_code_hash);
assert_eq!(descriptor.persisted_validation_data_hash, expected_pvd.hash());
assert_eq!(descriptor.para_head, dummy_head_data().hash());
assert_eq!(descriptor.validation_code_hash, validation_code_hash);
}
);

Expand Down
17 changes: 12 additions & 5 deletions polkadot/node/core/prospective-parachains/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ use futures::{channel::oneshot, prelude::*};
use polkadot_node_subsystem::{
messages::{
Ancestors, ChainApiMessage, FragmentTreeMembership, HypotheticalCandidate,
HypotheticalFrontierRequest, IntroduceCandidateRequest, ProspectiveParachainsMessage,
ProspectiveValidationDataRequest, RuntimeApiMessage, RuntimeApiRequest,
HypotheticalFrontierRequest, IntroduceCandidateRequest, ParentHeadData,
ProspectiveParachainsMessage, ProspectiveValidationDataRequest, RuntimeApiMessage,
RuntimeApiRequest,
},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
};
Expand Down Expand Up @@ -764,8 +765,14 @@ fn answer_prospective_validation_data_request(
Some(s) => s,
};

let mut head_data =
storage.head_data_by_hash(&request.parent_head_data_hash).map(|x| x.clone());
let (mut head_data, parent_head_data_hash) = match request.parent_head_data {
ParentHeadData::OnlyHash(parent_head_data_hash) => (
storage.head_data_by_hash(&parent_head_data_hash).map(|x| x.clone()),
parent_head_data_hash,
),
ParentHeadData::WithData { head_data, hash } => (Some(head_data), hash),
};

let mut relay_parent_info = None;
let mut max_pov_size = None;

Expand All @@ -783,7 +790,7 @@ fn answer_prospective_validation_data_request(
}
if head_data.is_none() {
let required_parent = &fragment_tree.scope().base_constraints().required_parent;
if required_parent.hash() == request.parent_head_data_hash {
if required_parent.hash() == parent_head_data_hash {
head_data = Some(required_parent.clone());
}
}
Expand Down
4 changes: 2 additions & 2 deletions polkadot/node/core/prospective-parachains/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use assert_matches::assert_matches;
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{
AllMessages, HypotheticalFrontierRequest, ProspectiveParachainsMessage,
AllMessages, HypotheticalFrontierRequest, ParentHeadData, ProspectiveParachainsMessage,
ProspectiveValidationDataRequest,
},
};
Expand Down Expand Up @@ -468,7 +468,7 @@ async fn get_pvd(
let request = ProspectiveValidationDataRequest {
para_id,
candidate_relay_parent,
parent_head_data_hash: parent_head_data.hash(),
parent_head_data: ParentHeadData::OnlyHash(parent_head_data.hash()),
};
let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use polkadot_node_network_protocol::{
PeerId,
};
use polkadot_node_primitives::PoV;
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, Id as ParaId};
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, HeadData, Id as ParaId};

/// The status of a collation as seen from the collator.
pub enum CollationStatus {
Expand Down Expand Up @@ -63,6 +63,8 @@ pub struct Collation {
pub parent_head_data_hash: Hash,
/// Proof to verify the state transition of the parachain.
pub pov: PoV,
/// Parent head-data needed for elastic scaling.
pub parent_head_data: HeadData,
/// Collation status.
pub status: CollationStatus,
}
Expand Down
65 changes: 47 additions & 18 deletions polkadot/node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::{
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState,
GroupIndex, Hash, Id as ParaId, SessionIndex,
GroupIndex, Hash, HeadData, Id as ParaId, SessionIndex,
};

use super::LOG_TARGET;
Expand Down Expand Up @@ -347,6 +347,7 @@ async fn distribute_collation<Context>(
receipt: CandidateReceipt,
parent_head_data_hash: Hash,
pov: PoV,
parent_head_data: HeadData,
result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
) -> Result<()> {
let candidate_relay_parent = receipt.descriptor.relay_parent;
Expand Down Expand Up @@ -465,7 +466,13 @@ async fn distribute_collation<Context>(

per_relay_parent.collations.insert(
candidate_hash,
Collation { receipt, parent_head_data_hash, pov, status: CollationStatus::Created },
Collation {
receipt,
parent_head_data_hash,
pov,
parent_head_data,
status: CollationStatus::Created,
},
);

// If prospective parachains are disabled, a leaf should be known to peer.
Expand Down Expand Up @@ -763,20 +770,26 @@ async fn process_msg<Context>(
CollateOn(id) => {
state.collating_on = Some(id);
},
DistributeCollation(receipt, parent_head_data_hash, pov, result_sender) => {
DistributeCollation {
candidate_receipt,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
} => {
let _span1 = state
.span_per_relay_parent
.get(&receipt.descriptor.relay_parent)
.get(&candidate_receipt.descriptor.relay_parent)
.map(|s| s.child("distributing-collation"));
let _span2 = jaeger::Span::new(&pov, "distributing-collation");

match state.collating_on {
Some(id) if receipt.descriptor.para_id != id => {
Some(id) if candidate_receipt.descriptor.para_id != id => {
// If the ParaId of a collation requested to be distributed does not match
// the one we expect, we ignore the message.
gum::warn!(
target: LOG_TARGET,
para_id = %receipt.descriptor.para_id,
para_id = %candidate_receipt.descriptor.para_id,
collating_on = %id,
"DistributeCollation for unexpected para_id",
);
Expand All @@ -788,17 +801,18 @@ async fn process_msg<Context>(
runtime,
state,
id,
receipt,
candidate_receipt,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
)
.await?;
},
None => {
gum::warn!(
target: LOG_TARGET,
para_id = %receipt.descriptor.para_id,
para_id = %candidate_receipt.descriptor.para_id,
"DistributeCollation message while not collating on any",
);
},
Expand Down Expand Up @@ -835,20 +849,30 @@ async fn send_collation(
request: VersionedCollationRequest,
receipt: CandidateReceipt,
pov: PoV,
_parent_head_data: HeadData,
) {
let (tx, rx) = oneshot::channel();

let relay_parent = request.relay_parent();
let peer_id = request.peer_id();
let candidate_hash = receipt.hash();

// The response payload is the same for both versions of protocol
// The response payload is the same for v1 and v2 versions of protocol
// and doesn't have v2 alias for simplicity.
let response = OutgoingResponse {
result: Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov)),
reputation_changes: Vec::new(),
sent_feedback: Some(tx),
};
// For now, we don't send parent head data to the collation requester.
let result =
// if assigned_multiple_cores {
// Ok(request_v1::CollationFetchingResponse::CollationWithParentHeadData {
// receipt,
// pov,
// parent_head_data,
// })
// } else {
Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov))
// }
;
let response =
OutgoingResponse { result, reputation_changes: Vec::new(), sent_feedback: Some(tx) };

if let Err(_) = request.send_outgoing_response(response) {
gum::warn!(target: LOG_TARGET, "Sending collation response failed");
Expand Down Expand Up @@ -1027,9 +1051,13 @@ async fn handle_incoming_request<Context>(
return Ok(())
},
};
let (receipt, pov) = if let Some(collation) = collation {
let (receipt, pov, parent_head_data) = if let Some(collation) = collation {
collation.status.advance_to_requested();
(collation.receipt.clone(), collation.pov.clone())
(
collation.receipt.clone(),
collation.pov.clone(),
collation.parent_head_data.clone(),
)
} else {
gum::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1068,7 +1096,7 @@ async fn handle_incoming_request<Context>(
waiting.collation_fetch_active = true;
// Obtain a timer for sending collation
let _ = state.metrics.time_collation_distribution("send");
send_collation(state, req, receipt, pov).await;
send_collation(state, req, receipt, pov, parent_head_data).await;
}
},
Some(our_para_id) => {
Expand Down Expand Up @@ -1453,8 +1481,9 @@ async fn run_inner<Context>(
if let Some(collation) = next_collation {
let receipt = collation.receipt.clone();
let pov = collation.pov.clone();
let parent_head_data = collation.parent_head_data.clone();

send_collation(&mut state, next, receipt, pov).await;
send_collation(&mut state, next, receipt, pov, parent_head_data).await;
}
},
(candidate_hash, peer_id) = state.advertisement_timeouts.select_next_some() => {
Expand Down
Loading
Loading