Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process SubmitBlock requests in parallel #357

Merged
merged 10 commits into from
Dec 18, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
mining_manager,
flow_context,
index_service.as_ref().map(|x| x.utxoindex().unwrap()),
config,
config.clone(),
core.clone(),
processing_counters,
wrpc_borsh_counters.clone(),
Expand All @@ -451,7 +451,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
grpc_tower_counters.clone(),
));
let grpc_service =
Arc::new(GrpcService::new(grpc_server_addr, rpc_core_service.clone(), args.rpc_max_clients, grpc_tower_counters));
Arc::new(GrpcService::new(grpc_server_addr, config, rpc_core_service.clone(), args.rpc_max_clients, grpc_tower_counters));

// Create an async runtime and register the top-level async services
let async_runtime = Arc::new(AsyncRuntime::new(args.async_threads));
Expand Down
8 changes: 2 additions & 6 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,8 @@ impl FlowContext {
// Broadcast as soon as the block has been validated and inserted into the DAG
self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) })).await;

let ctx = self.clone();
let consensus = consensus.clone();
tokio::spawn(async move {
ctx.on_new_block(&consensus, block, virtual_state_task).await;
ctx.log_block_acceptance(hash, BlockSource::Submit);
});
self.on_new_block(consensus, block, virtual_state_task).await;
self.log_block_acceptance(hash, BlockSource::Submit);

Ok(())
}
Expand Down
5 changes: 4 additions & 1 deletion rpc/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{net::AddrParseError, num::TryFromIntError};
use thiserror::Error;
use workflow_core::channel::ChannelError;

use crate::{api::ctl::RpcState, RpcHash, RpcTransactionId};
use crate::{api::ctl::RpcState, RpcHash, RpcTransactionId, SubmitBlockRejectReason};

#[derive(Clone, Debug, Error)]
pub enum RpcError {
Expand Down Expand Up @@ -77,6 +77,9 @@ pub enum RpcError {
#[error("IP {0} is not registered as banned.")]
IpIsNotBanned(IpAddress),

#[error("Block was not submitted: {0}")]
SubmitBlockError(SubmitBlockRejectReason),

#[error(transparent)]
AddressError(#[from] kaspa_addresses::AddressError),

Expand Down
8 changes: 5 additions & 3 deletions rpc/core/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,20 @@ impl SubmitBlockRequest {
}
}

#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[derive(Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
#[serde(rename_all = "camelCase")]
pub enum SubmitBlockRejectReason {
BlockInvalid = 1,
IsInIBD = 2,
RouteIsFull = 3,
}
impl SubmitBlockRejectReason {
fn as_str(&self) -> &'static str {
// see app\appmessage\rpc_submit_block.go, line 35
match self {
SubmitBlockRejectReason::BlockInvalid => "Block is invalid",
SubmitBlockRejectReason::IsInIBD => "Node is in IBD",
SubmitBlockRejectReason::BlockInvalid => "block is invalid",
SubmitBlockRejectReason::IsInIBD => "node is not synced",
SubmitBlockRejectReason::RouteIsFull => "route is full",
}
}
}
Expand Down
138 changes: 134 additions & 4 deletions rpc/grpc/core/src/convert/message.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
//! Conversions of protowire messages from and to rpc core counterparts.
//!
//! Response payloads in protowire do always contain an error field and generally a set of
//! fields providing the requested data.
//!
//! Responses in rpc core are expressed as RpcResult<XxxResponse>, where Xxx is the called
//! RPC method.
//!
//! The general conversion convention from protowire to rpc core is to consider the error
//! field first and, if present, to return a matching Err(RpcError). If absent, try to
//! convert the set of data fields into a matching XxxResponse rpc core response and, on
//! success, return Ok(XxxResponse), otherwise return a conversion error.
//!
//! Conversely, the general conversion convention from rpc core to protowire, depending on
//! a provided RpcResult is to either convert the Ok(XxxResponse) into the matching set
//! of data fields and provide no error or provide no data fields but an error field in case
//! of Err(RpcError).
//!
//! The SubmitBlockResponse is a notable exception to this general rule.

use crate::protowire::{self, submit_block_response_message::RejectReason};
use kaspa_consensus_core::network::NetworkId;
use kaspa_core::debug;
use kaspa_notify::subscription::Command;
use kaspa_rpc_core::{
RpcContextualPeerAddress, RpcError, RpcExtraData, RpcHash, RpcIpAddress, RpcNetworkType, RpcPeerAddress, RpcResult,
SubmitBlockRejectReason, SubmitBlockReport,
};
use std::str::FromStr;

Expand Down Expand Up @@ -109,14 +130,25 @@ from!(item: &kaspa_rpc_core::SubmitBlockReport, RejectReason, {
kaspa_rpc_core::SubmitBlockReport::Success => RejectReason::None,
kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::BlockInvalid) => RejectReason::BlockInvalid,
kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::IsInIBD) => RejectReason::IsInIbd,
// The conversion of RouteIsFull falls back to None since there exist no such variant in the original protowire version
// and we do not want to break backwards compatibility
kaspa_rpc_core::SubmitBlockReport::Reject(kaspa_rpc_core::SubmitBlockRejectReason::RouteIsFull) => RejectReason::None,
}
});

from!(item: &kaspa_rpc_core::SubmitBlockRequest, protowire::SubmitBlockRequestMessage, {
Self { block: Some((&item.block).into()), allow_non_daa_blocks: item.allow_non_daa_blocks }
});
// This conversion breaks the general conversion convention (see file header) since the message may
// contain both a non default reject_reason and a matching error message. In the RouteIsFull case
// reject_reason is None (because this reason has no variant in protowire) but a specific error
// message is provided.
from!(item: RpcResult<&kaspa_rpc_core::SubmitBlockResponse>, protowire::SubmitBlockResponseMessage, {
Self { reject_reason: RejectReason::from(&item.report) as i32, error: None }
let error: Option<protowire::RpcError> = match item.report {
kaspa_rpc_core::SubmitBlockReport::Success => None,
kaspa_rpc_core::SubmitBlockReport::Reject(reason) => Some(RpcError::SubmitBlockError(reason).into())
};
Self { reject_reason: RejectReason::from(&item.report) as i32, error }
});

from!(item: &kaspa_rpc_core::GetBlockTemplateRequest, protowire::GetBlockTemplateRequestMessage, {
Expand Down Expand Up @@ -464,9 +496,31 @@ try_from!(item: &protowire::SubmitBlockRequestMessage, kaspa_rpc_core::SubmitBlo
allow_non_daa_blocks: item.allow_non_daa_blocks,
}
});
try_from!(item: &protowire::SubmitBlockResponseMessage, RpcResult<kaspa_rpc_core::SubmitBlockResponse>, {
Self { report: RejectReason::try_from(item.reject_reason).map_err(|_| RpcError::PrimitiveToEnumConversionError)?.into() }
});
impl TryFrom<&protowire::SubmitBlockResponseMessage> for kaspa_rpc_core::SubmitBlockResponse {
type Error = RpcError;
// This conversion breaks the general conversion convention (see file header) since the message may
// contain both a non-None reject_reason and a matching error message. Things get even challenging
// in the RouteIsFull case where reject_reason is None (because this reason has no variant in protowire)
// but a specific error message is provided.
fn try_from(item: &protowire::SubmitBlockResponseMessage) -> RpcResult<Self> {
let report: SubmitBlockReport =
RejectReason::try_from(item.reject_reason).map_err(|_| RpcError::PrimitiveToEnumConversionError)?.into();
if let Some(ref err) = item.error {
match report {
SubmitBlockReport::Success => {
if err.message == RpcError::SubmitBlockError(SubmitBlockRejectReason::RouteIsFull).to_string() {
Ok(Self { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::RouteIsFull) })
} else {
Err(err.into())
}
}
SubmitBlockReport::Reject(_) => Ok(Self { report }),
}
} else {
Ok(Self { report })
}
}
}

try_from!(item: &protowire::GetBlockTemplateRequestMessage, kaspa_rpc_core::GetBlockTemplateRequest, {
Self { pay_address: item.pay_address.clone().try_into()?, extra_data: RpcExtraData::from_iter(item.extra_data.bytes()) }
Expand Down Expand Up @@ -825,3 +879,79 @@ try_from!(&protowire::NotifySinkBlueScoreChangedResponseMessage, RpcResult<kaspa
// ----------------------------------------------------------------------------

// TODO: tests

#[cfg(test)]
mod tests {
use kaspa_rpc_core::{RpcError, RpcResult, SubmitBlockRejectReason, SubmitBlockReport, SubmitBlockResponse};

use crate::protowire::{self, submit_block_response_message::RejectReason, SubmitBlockResponseMessage};

#[test]
fn test_submit_block_response() {
struct Test {
rpc_core: RpcResult<kaspa_rpc_core::SubmitBlockResponse>,
protowire: protowire::SubmitBlockResponseMessage,
}
impl Test {
fn new(
rpc_core: RpcResult<kaspa_rpc_core::SubmitBlockResponse>,
protowire: protowire::SubmitBlockResponseMessage,
) -> Self {
Self { rpc_core, protowire }
}
}
let tests = vec![
Test::new(
Ok(SubmitBlockResponse { report: SubmitBlockReport::Success }),
SubmitBlockResponseMessage { reject_reason: RejectReason::None as i32, error: None },
),
Test::new(
Ok(SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::BlockInvalid) }),
SubmitBlockResponseMessage {
reject_reason: RejectReason::BlockInvalid as i32,
error: Some(protowire::RpcError {
message: RpcError::SubmitBlockError(SubmitBlockRejectReason::BlockInvalid).to_string(),
}),
},
),
Test::new(
Ok(SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::IsInIBD) }),
SubmitBlockResponseMessage {
reject_reason: RejectReason::IsInIbd as i32,
error: Some(protowire::RpcError {
message: RpcError::SubmitBlockError(SubmitBlockRejectReason::IsInIBD).to_string(),
}),
},
),
Test::new(
Ok(SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::RouteIsFull) }),
SubmitBlockResponseMessage {
reject_reason: RejectReason::None as i32, // This rpc core reject reason has no matching protowire variant
error: Some(protowire::RpcError {
message: RpcError::SubmitBlockError(SubmitBlockRejectReason::RouteIsFull).to_string(),
}),
},
),
];

for test in tests {
let cnv_protowire: SubmitBlockResponseMessage = test.rpc_core.as_ref().map_err(|x| x.clone()).into();
assert_eq!(cnv_protowire.reject_reason, test.protowire.reject_reason);
assert_eq!(cnv_protowire.error.is_some(), test.protowire.error.is_some());
assert_eq!(cnv_protowire.error, test.protowire.error);

let cnv_rpc_core: RpcResult<SubmitBlockResponse> = (&test.protowire).try_into();
assert_eq!(cnv_rpc_core.is_ok(), test.rpc_core.is_ok());
match cnv_rpc_core {
Ok(ref cnv_response) => {
let Ok(ref response) = test.rpc_core else { panic!() };
assert_eq!(cnv_response.report, response.report);
}
Err(ref cnv_err) => {
let Err(ref err) = test.rpc_core else { panic!() };
assert_eq!(cnv_err.to_string(), err.to_string());
}
}
}
}
}
3 changes: 2 additions & 1 deletion rpc/grpc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ include.workspace = true
license.workspace = true

[dependencies]
kaspa-consensus-core.workspace = true
kaspa-core.workspace = true
kaspa-grpc-core.workspace = true
kaspa-notify.workspace = true
kaspa-rpc-core.workspace = true
kaspa-rpc-macros.workspace = true
kaspa-rpc-service.workspace = true
kaspa-utils.workspace = true
kaspa-utils-tower.workspace = true
kaspa-utils.workspace = true

async-channel.workspace = true
async-stream.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion rpc/grpc/server/src/adaptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ impl Adaptor {

pub fn server(
serve_address: NetAddress,
network_bps: u64,
manager: Manager,
core_service: DynRpcService,
core_notifier: Arc<Notifier<Notification, ChannelConnection>>,
counters: Arc<TowerConnectionCounters>,
) -> Arc<Self> {
let (manager_sender, manager_receiver) = mpsc_channel(Self::manager_channel_size());
let connection_handler = ConnectionHandler::new(manager_sender, core_service.clone(), core_notifier, counters);
let connection_handler = ConnectionHandler::new(network_bps, manager_sender, core_service.clone(), core_notifier, counters);
let server_termination = connection_handler.serve(serve_address);
let adaptor = Arc::new(Adaptor::new(Some(server_termination), connection_handler, manager, serve_address));
adaptor.manager.clone().start_event_loop(manager_receiver);
Expand Down
Loading