Skip to content

Commit

Permalink
add grpc call for search for commitment
Browse files Browse the repository at this point in the history
changed to bytes
refactor variable name
fix UI feedback
  • Loading branch information
SWvheerden committed Dec 17, 2021
1 parent 80854b2 commit 0ef2cd3
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 11 deletions.
7 changes: 7 additions & 0 deletions applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ service BaseNode {
rpc GetTipInfo(Empty) returns (TipInfoResponse);
// Search for blocks containing the specified kernels
rpc SearchKernels(SearchKernelsRequest) returns (stream HistoricalBlock);
// Search for blocks containing the specified commitments
rpc SearchUtxos(SearchUtxosRequest) returns (stream HistoricalBlock);
// Fetch any utxos that exist in the main chain
rpc FetchMatchingUtxos(FetchMatchingUtxosRequest) returns (stream FetchMatchingUtxosResponse);
// get all peers from the base node
Expand Down Expand Up @@ -289,6 +291,11 @@ message SearchKernelsRequest{
repeated Signature signatures = 1;
}

// This is the request type for the Search Utxo rpc
message SearchUtxosRequest{
repeated bytes commitments = 1;
}

message FetchMatchingUtxosRequest {
repeated bytes hashes = 1;
}
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl CommandHandler {
Ok(mut data) => match data.pop() {
Some(v) => println!("{}", v.block()),
_ => println!(
"Pruned node: utxo found, but block not found for utxo commitment {}",
"Block not found for utxo commitment {}",
commitment.to_hex()
),
},
Expand Down
59 changes: 58 additions & 1 deletion applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tari_app_grpc::{
tari_rpc::{CalcType, Sorting},
};
use tari_app_utilities::consts;
use tari_common_types::types::Signature;
use tari_common_types::types::{Commitment, Signature};
use tari_comms::{Bytes, CommsNode};
use tari_core::{
base_node::{
Expand Down Expand Up @@ -122,6 +122,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
type GetTokensInCirculationStream = mpsc::Receiver<Result<tari_rpc::ValueAtHeightResponse, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<tari_rpc::BlockHeader, Status>>;
type SearchKernelsStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
type SearchUtxosStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;

async fn get_network_difficulty(
&self,
Expand Down Expand Up @@ -794,6 +795,62 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
Ok(Response::new(rx))
}

async fn search_utxos(
&self,
request: Request<tari_rpc::SearchUtxosRequest>,
) -> Result<Response<Self::SearchUtxosStream>, Status> {
debug!(target: LOG_TARGET, "Incoming GRPC request for SearchUtxos");
let request = request.into_inner();

let converted: Result<Vec<_>, _> = request
.commitments
.into_iter()
.map(|s| Commitment::from_bytes(&s))
.collect();
let outputs = converted.map_err(|_| Status::internal("Failed to convert one or more arguments."))?;

let mut handler = self.node_service.clone();

let (mut tx, rx) = mpsc::channel(GET_BLOCKS_PAGE_SIZE);
task::spawn(async move {
let blocks = match handler.fetch_blocks_with_utxos(outputs).await {
Err(err) => {
warn!(
target: LOG_TARGET,
"Error communicating with local base node: {:?}", err,
);
return;
},
Ok(data) => data,
};
for block in blocks {
match tx
.send(
block
.try_into()
.map_err(|err| Status::internal(format!("Could not provide block:{}", err))),
)
.await
{
Ok(_) => (),
Err(err) => {
warn!(target: LOG_TARGET, "Error sending header via GRPC: {}", err);
match tx.send(Err(Status::unknown("Error sending data"))).await {
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
return;
},
}
}
});

debug!(target: LOG_TARGET, "Sending SearchUtxos response stream to client");
Ok(Response::new(rx))
}

#[allow(clippy::useless_conversion)]
async fn fetch_matching_utxos(
&self,
Expand Down
19 changes: 10 additions & 9 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,24 +292,25 @@ where T: BlockchainBackend + 'static
}
Ok(NodeCommsResponse::HistoricalBlocks(blocks))
},
NodeCommsRequest::FetchBlocksWithUtxos(hashes) => {
let mut blocks = Vec::with_capacity(hashes.len());
for hash in hashes {
let hash_hex = hash.to_hex();
NodeCommsRequest::FetchBlocksWithUtxos(commitments) => {
let mut blocks = Vec::with_capacity(commitments.len());
for commitment in commitments {
let commitment_hex = commitment.to_hex();
debug!(
target: LOG_TARGET,
"A peer has requested a block with hash {}", hash_hex,
"A peer has requested a block with commitment {}", commitment_hex,
);
match self.blockchain_db.fetch_block_with_utxo(hash).await {
match self.blockchain_db.fetch_block_with_utxo(commitment).await {
Ok(Some(block)) => blocks.push(block),
Ok(None) => warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because not stored", hash_hex,
"Could not provide requested block with commitment {} to peer because not stored",
commitment_hex,
),
Err(e) => warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because: {}",
hash_hex,
"Could not provide requested block with commitment {} to peer because: {}",
commitment_hex,
e.to_string()
),
}
Expand Down

0 comments on commit 0ef2cd3

Please sign in to comment.