Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add GRPC call to search for utxo via commitment hex #3666

Merged
merged 3 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ service BaseNode {
rpc GetTipInfo(Empty) returns (TipInfoResponse);
// Search for blocks containing the specified kernels
rpc SearchKernels(SearchKernelsRequest) returns (stream HistoricalBlock);
// Search for blocks containing the specified commitments
rpc SearchUtxos(SearchUtxosRequest) returns (stream HistoricalBlock);
// Fetch any utxos that exist in the main chain
rpc FetchMatchingUtxos(FetchMatchingUtxosRequest) returns (stream FetchMatchingUtxosResponse);
// get all peers from the base node
Expand Down Expand Up @@ -289,6 +291,11 @@ message SearchKernelsRequest{
repeated Signature signatures = 1;
}

// This is the request type for the Search Utxo rpc
message SearchUtxosRequest{
repeated bytes commitments = 1;
}

message FetchMatchingUtxosRequest {
repeated bytes hashes = 1;
}
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl CommandHandler {
Ok(mut data) => match data.pop() {
Some(v) => println!("{}", v.block()),
_ => println!(
"Pruned node: utxo found, but block not found for utxo commitment {}",
"Block not found for utxo commitment {}",
commitment.to_hex()
),
},
Expand Down
59 changes: 58 additions & 1 deletion applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tari_app_grpc::{
tari_rpc::{CalcType, Sorting},
};
use tari_app_utilities::consts;
use tari_common_types::types::Signature;
use tari_common_types::types::{Commitment, Signature};
use tari_comms::{Bytes, CommsNode};
use tari_core::{
base_node::{
Expand Down Expand Up @@ -122,6 +122,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
type GetTokensInCirculationStream = mpsc::Receiver<Result<tari_rpc::ValueAtHeightResponse, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<tari_rpc::BlockHeader, Status>>;
type SearchKernelsStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
type SearchUtxosStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;

async fn get_network_difficulty(
&self,
Expand Down Expand Up @@ -794,6 +795,62 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
Ok(Response::new(rx))
}

async fn search_utxos(
&self,
request: Request<tari_rpc::SearchUtxosRequest>,
) -> Result<Response<Self::SearchUtxosStream>, Status> {
debug!(target: LOG_TARGET, "Incoming GRPC request for SearchUtxos");
let request = request.into_inner();

let converted: Result<Vec<_>, _> = request
.commitments
.into_iter()
.map(|s| Commitment::from_bytes(&s))
.collect();
let outputs = converted.map_err(|_| Status::internal("Failed to convert one or more arguments."))?;

let mut handler = self.node_service.clone();

let (mut tx, rx) = mpsc::channel(GET_BLOCKS_PAGE_SIZE);
task::spawn(async move {
let blocks = match handler.fetch_blocks_with_utxos(outputs).await {
Err(err) => {
warn!(
target: LOG_TARGET,
"Error communicating with local base node: {:?}", err,
);
return;
},
Ok(data) => data,
};
for block in blocks {
match tx
.send(
block
.try_into()
.map_err(|err| Status::internal(format!("Could not provide block:{}", err))),
)
.await
{
Ok(_) => (),
Err(err) => {
warn!(target: LOG_TARGET, "Error sending header via GRPC: {}", err);
match tx.send(Err(Status::unknown("Error sending data"))).await {
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
return;
},
}
}
});

debug!(target: LOG_TARGET, "Sending SearchUtxos response stream to client");
Ok(Response::new(rx))
}

#[allow(clippy::useless_conversion)]
async fn fetch_matching_utxos(
&self,
Expand Down
19 changes: 10 additions & 9 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,24 +292,25 @@ where T: BlockchainBackend + 'static
}
Ok(NodeCommsResponse::HistoricalBlocks(blocks))
},
NodeCommsRequest::FetchBlocksWithUtxos(hashes) => {
let mut blocks = Vec::with_capacity(hashes.len());
for hash in hashes {
let hash_hex = hash.to_hex();
NodeCommsRequest::FetchBlocksWithUtxos(commitments) => {
let mut blocks = Vec::with_capacity(commitments.len());
for commitment in commitments {
let commitment_hex = commitment.to_hex();
debug!(
target: LOG_TARGET,
"A peer has requested a block with hash {}", hash_hex,
"A peer has requested a block with commitment {}", commitment_hex,
);
match self.blockchain_db.fetch_block_with_utxo(hash).await {
match self.blockchain_db.fetch_block_with_utxo(commitment).await {
Ok(Some(block)) => blocks.push(block),
Ok(None) => warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because not stored", hash_hex,
"Could not provide requested block with commitment {} to peer because not stored",
commitment_hex,
),
Err(e) => warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because: {}",
hash_hex,
"Could not provide requested block with commitment {} to peer because: {}",
commitment_hex,
e.to_string()
),
}
Expand Down