Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rpc transaction routing #1544

Merged
merged 9 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 71 additions & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use near_primitives::receipt::Receipt;
use near_primitives::sharding::{
ChunkHash, ChunkHashHeight, ReceiptProof, ShardChunk, ShardChunkHeader, ShardProof,
};
use near_primitives::transaction::ExecutionOutcome;
use near_primitives::transaction::{ExecutionOutcome, ExecutionStatus};
use near_primitives::types::{AccountId, Balance, BlockIndex, ChunkExtra, Gas, ShardId};
use near_store::{Store, COL_STATE_HEADERS};

Expand All @@ -27,6 +27,10 @@ use crate::types::{
ReceiptProofResponse, ReceiptResponse, RootProof, RuntimeAdapter, ShardStateSyncResponseHeader,
ShardStateSyncResponsePart, StateHeaderKey, Tip, ValidatorSignatureVerificationResult,
};
use near_primitives::views::{
ExecutionOutcomeView, ExecutionOutcomeWithIdView, ExecutionStatusView,
FinalExecutionOutcomeView, FinalExecutionStatus,
};

/// Maximum number of orphans chain can store.
pub const MAX_ORPHAN_SIZE: usize = 1024;
Expand Down Expand Up @@ -1307,6 +1311,72 @@ impl Chain {

Ok(())
}

pub fn get_transaction_execution_result(
&mut self,
hash: &CryptoHash,
) -> Result<ExecutionOutcomeView, String> {
match self.get_transaction_result(hash) {
Ok(result) => Ok(result.clone().into()),
Err(err) => match err.kind() {
ErrorKind::DBNotFoundErr(_) => {
Ok(ExecutionOutcome { status: ExecutionStatus::Unknown, ..Default::default() }
.into())
}
_ => Err(err.to_string()),
},
}
}

fn get_recursive_transaction_results(
&mut self,
hash: &CryptoHash,
) -> Result<Vec<ExecutionOutcomeWithIdView>, String> {
let outcome = self.get_transaction_execution_result(hash)?;
let receipt_ids = outcome.receipt_ids.clone();
let mut transactions = vec![ExecutionOutcomeWithIdView { id: (*hash).into(), outcome }];
for hash in &receipt_ids {
transactions
.extend(self.get_recursive_transaction_results(&hash.clone().into())?.into_iter());
}
Ok(transactions)
}

pub fn get_final_transaction_result(
&mut self,
hash: &CryptoHash,
) -> Result<FinalExecutionOutcomeView, String> {
let mut outcomes = self.get_recursive_transaction_results(hash)?;
let mut looking_for_id = (*hash).into();
let num_outcomes = outcomes.len();
let status = outcomes
.iter()
.find_map(|outcome_with_id| {
if outcome_with_id.id == looking_for_id {
match &outcome_with_id.outcome.status {
ExecutionStatusView::Unknown if num_outcomes == 1 => {
Some(FinalExecutionStatus::NotStarted)
}
ExecutionStatusView::Unknown => Some(FinalExecutionStatus::Started),
ExecutionStatusView::Failure(e) => {
Some(FinalExecutionStatus::Failure(e.clone()))
}
ExecutionStatusView::SuccessValue(v) => {
Some(FinalExecutionStatus::SuccessValue(v.clone()))
}
ExecutionStatusView::SuccessReceiptId(id) => {
looking_for_id = id.clone();
None
}
}
} else {
None
}
})
.expect("results should resolve to a final outcome");
let receipts = outcomes.split_off(1);
Ok(FinalExecutionOutcomeView { status, transaction: outcomes.pop().unwrap(), receipts })
}
}

/// Various chain getters.
Expand Down
92 changes: 76 additions & 16 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ use cached::{Cached, SizedCache};
use chrono::Utc;
use log::{debug, error, info, warn};

use crate::metrics;
use near_chain::types::{
AcceptedBlock, LatestKnown, ReceiptResponse, ValidatorSignatureVerificationResult,
};
use near_chain::{
BlockApproval, BlockStatus, Chain, ChainGenesis, ChainStoreAccess, Provenance, RuntimeAdapter,
Tip,
BlockApproval, BlockStatus, Chain, ChainGenesis, ChainStoreAccess, ErrorKind, Provenance,
RuntimeAdapter, Tip,
};
use near_chunks::{NetworkAdapter, ProcessChunkOnePartResult, ShardsManager};
use near_crypto::Signature;
Expand All @@ -32,8 +31,10 @@ use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, BlockIndex, EpochId, ShardId};
use near_primitives::unwrap_or_return;
use near_primitives::utils::to_timestamp;
use near_primitives::views::FinalExecutionOutcomeView;
use near_store::Store;

use crate::metrics;
use crate::sync::{BlockSync, HeaderSync, StateSync, StateSyncResult};
use crate::types::{Error, ShardSyncDownload};
use crate::{BlockProducer, ClientConfig, SyncStatus};
Expand All @@ -44,6 +45,9 @@ const NUM_BLOCKS_FOR_APPROVAL: usize = 20;
/// Over this number of blocks in advance if we are not chunk producer - route tx to upcoming validators.
const TX_ROUTING_HEIGHT_HORIZON: BlockIndex = 4;

/// Max number of transaction status query that we keep.
const TX_STATUS_REQUEST_LIMIT: usize = 500;

/// Block economics config taken from genesis config
struct BlockEconomicsConfig {
gas_price_adjustment_rate: u8,
Expand Down Expand Up @@ -74,6 +78,10 @@ pub struct Client {
pub state_sync: StateSync,
/// Block economics, relevant to changes when new block must be produced.
block_economics_config: BlockEconomicsConfig,
/// Transaction query that needs to be forwarded to other shards
pub tx_status_requests: SizedCache<CryptoHash, ()>,
/// Transaction status response
pub tx_status_response: SizedCache<CryptoHash, FinalExecutionOutcomeView>,
}

impl Client {
Expand Down Expand Up @@ -113,6 +121,8 @@ impl Client {
block_economics_config: BlockEconomicsConfig {
gas_price_adjustment_rate: chain_genesis.gas_price_adjustment_rate,
},
tx_status_requests: SizedCache::with_size(TX_STATUS_REQUEST_LIMIT),
tx_status_response: SizedCache::with_size(TX_STATUS_REQUEST_LIMIT),
})
}

Expand Down Expand Up @@ -761,22 +771,27 @@ impl Client {
true
}

/// Forwards given transaction to upcoming validators.
fn forward_tx(&mut self, tx: SignedTransaction) -> NetworkClientResponses {
let head = unwrap_or_return!(self.chain.head(), NetworkClientResponses::NoResponse);
let me = self.block_producer.as_ref().map(|bp| &bp.account_id);
let shard_id = self.runtime_adapter.account_id_to_shard_id(&tx.transaction.signer_id);
/// Find a validator that is responsible for a given shard to forward requests to
fn find_validator_for_forwarding(
&self,
shard_id: ShardId,
) -> Result<AccountId, near_chain::Error> {
let head = self.chain.head()?;
// TODO(MarX, #1366): Forward tx even if I am a validator.
// How many validators ahead of current time should we forward tx?
let target_height = head.height + TX_ROUTING_HEIGHT_HORIZON - 1;

let validator = unwrap_or_return!(
self.runtime_adapter.get_chunk_producer(&head.epoch_id, target_height, shard_id),
{
warn!(target: "client", "Me: {:?} Dropping tx: {:?}", me, tx);
NetworkClientResponses::NoResponse
}
);
self.runtime_adapter.get_chunk_producer(&head.epoch_id, target_height, shard_id)
}

/// Forwards given transaction to upcoming validators.
fn forward_tx(&self, tx: SignedTransaction) -> NetworkClientResponses {
let shard_id = self.runtime_adapter.account_id_to_shard_id(&tx.transaction.signer_id);
let me = self.block_producer.as_ref().map(|bp| &bp.account_id);
let validator = unwrap_or_return!(self.find_validator_for_forwarding(shard_id), {
warn!(target: "client", "Me: {:?} Dropping tx: {:?}", me, tx);
NetworkClientResponses::NoResponse
});

debug!(target: "client",
"I'm {:?}, routing a transaction to {}, shard_id = {}",
Expand All @@ -788,7 +803,52 @@ impl Client {
// Send message to network to actually forward transaction.
self.network_adapter.send(NetworkRequests::ForwardTx(validator, tx));

NetworkClientResponses::NoResponse
NetworkClientResponses::RequestRouted
}

pub fn get_tx_status(
&mut self,
tx_hash: CryptoHash,
signer_account_id: AccountId,
) -> NetworkClientResponses {
if let Some(res) = self.tx_status_response.cache_remove(&tx_hash) {
self.tx_status_requests.cache_remove(&tx_hash);
return NetworkClientResponses::TxStatus(res);
}
let me = self.block_producer.as_ref().map(|bp| &bp.account_id);
let has_tx_result = match self.chain.get_transaction_result(&tx_hash) {
Ok(_) => true,
Err(e) => match e.kind() {
ErrorKind::DBNotFoundErr(_) => false,
_ => {
warn!(target: "client", "Error trying to get transaction result: {}", e.to_string());
return NetworkClientResponses::NoResponse;
}
},
};
if has_tx_result {
let tx_result = unwrap_or_return!(
self.chain.get_final_transaction_result(&tx_hash),
NetworkClientResponses::NoResponse
);
return NetworkClientResponses::TxStatus(tx_result);
}
let target_shard_id = self.runtime_adapter.account_id_to_shard_id(&signer_account_id);
let validator = unwrap_or_return!(self.find_validator_for_forwarding(target_shard_id), {
warn!(target: "client", "Me: {:?} Dropping tx: {:?}", me, tx_hash);
NetworkClientResponses::NoResponse
});

if let Some(account_id) = me {
if account_id == &validator {
// this probably means that we are crossing epoch boundary and the current node
// does not have state for the next epoch. TODO: figure out what to do in this case
return NetworkClientResponses::NoResponse;
}
}
self.tx_status_requests.cache_set(tx_hash, ());
self.network_adapter.send(NetworkRequests::TxStatus(validator, signer_account_id, tx_hash));
NetworkClientResponses::RequestRouted
}

/// Process transaction and either add it to the mempool or return to redirect to another validator.
Expand Down
11 changes: 11 additions & 0 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::types::{
Status, StatusSyncInfo, SyncStatus,
};
use crate::{sync, StatusResponse};
use cached::Cached;

enum AccountAnnounceVerificationResult {
Valid,
Expand Down Expand Up @@ -176,6 +177,16 @@ impl Handler<NetworkClientMessages> for ClientActor {
fn handle(&mut self, msg: NetworkClientMessages, _ctx: &mut Context<Self>) -> Self::Result {
match msg {
NetworkClientMessages::Transaction(tx) => self.client.process_tx(tx),
NetworkClientMessages::TxStatus { tx_hash, signer_account_id } => {
self.client.get_tx_status(tx_hash, signer_account_id)
}
NetworkClientMessages::TxStatusResponse(tx_result) => {
let tx_hash = tx_result.transaction.id;
if self.client.tx_status_requests.cache_remove(&tx_hash).is_some() {
self.client.tx_status_response.cache_set(tx_hash, tx_result);
}
NetworkClientResponses::NoResponse
}
NetworkClientMessages::BlockHeader(header, peer_id) => {
self.receive_header(header, peer_id)
}
Expand Down
4 changes: 2 additions & 2 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ extern crate lazy_static;
pub use crate::client::Client;
pub use crate::client_actor::ClientActor;
pub use crate::types::{
BlockProducer, ClientConfig, Error, GetBlock, GetChunk, Query, Status, StatusResponse,
SyncStatus, TxDetails, TxStatus, GetNetworkInfo,
BlockProducer, ClientConfig, Error, GetBlock, GetChunk, GetNetworkInfo, Query, Status,
StatusResponse, SyncStatus, TxDetails, TxStatus,
};
pub use crate::view_client::ViewClientActor;

Expand Down
3 changes: 2 additions & 1 deletion chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ pub fn setup_mock_all_validators(
| NetworkRequests::PingTo(_, _)
| NetworkRequests::FetchPingPongInfo
| NetworkRequests::BanPeer { .. }
| NetworkRequests::BlockHeaderAnnounce { .. } => {}
| NetworkRequests::BlockHeaderAnnounce { .. }
| NetworkRequests::TxStatus(_, _, _) => {}
};
}
Box::new(Some(resp))
Expand Down
Loading