Skip to content

Commit

Permalink
WIP: Add TxIndex to Responder
Browse files Browse the repository at this point in the history
Still need to do a full pass through the reorg logic to make sure things are still consistent.
Some tests using `BitcoindMock::in_mempool` will also be nice to have.
  • Loading branch information
sr-gi committed Oct 7, 2022
1 parent 4826b5e commit fc04b8e
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 404 deletions.
175 changes: 70 additions & 105 deletions teos/src/carrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::{Arc, Condvar, Mutex};
use crate::responder::ConfirmationStatus;
use crate::{errors, rpc_errors};

use bitcoin::{BlockHash, Transaction, Txid};
use bitcoin::{Transaction, Txid};
use bitcoincore_rpc::{
jsonrpc::error::Error::Rpc as RpcError, jsonrpc::error::Error::Transport as TransportError,
Client as BitcoindClient, Error::JsonRpc as JsonRpcError, RpcApi,
Expand All @@ -22,7 +22,7 @@ pub struct Carrier {
/// A map of receipts already issued by the [Carrier].
/// Used to prevent potentially re-sending the same transaction over and over.
issued_receipts: HashMap<Txid, ConfirmationStatus>,
/// The last known block header.
/// The last known block height.
block_height: u32,
}

Expand All @@ -41,6 +41,11 @@ impl Carrier {
}
}

/// The last known block height.
pub(crate) fn block_height(&self) -> u32 {
self.block_height
}

/// Clears the receipts cached by the [Carrier]. Should be called periodically to prevent it from
/// growing unbounded.
pub(crate) fn clear_receipts(&mut self) {
Expand Down Expand Up @@ -100,11 +105,14 @@ impl Carrier {
}
rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN => {
log::info!(
"Transaction is already in the blockchain: {}. Getting confirmation count",
"Transaction was confirmed long ago, not keeping track of it: {}",
tx.txid()
);

ConfirmationStatus::ConfirmedIn(self.get_tx_height(&tx.txid()).unwrap())
// Given we are not using txindex, if a transaction bounces we cannot get its confirmation count. However, [send_transaction] is guarded by
// checking whether the transaction id can be found in the [Responder]'s [TxIndex], meaning that if the transaction bounces it was confirmed long
// ago (> IRREVOCABLY_RESOLVED), so we don't need to worry about it.
ConfirmationStatus::IrrevocablyResolved
}
rpc_errors::RPC_DESERIALIZATION_ERROR => {
// Adding this here just for completeness. We should never end up here. The Carrier only sends txs handed by the Responder,
Expand Down Expand Up @@ -139,77 +147,40 @@ impl Carrier {
receipt
}

/// Gets the block height at where a given [Transaction] was confirmed at (if any).
fn get_tx_height(&self, txid: &Txid) -> Option<u32> {
if let Some(block_hash) = self.get_block_hash_for_tx(txid) {
self.get_block_height(&block_hash)
} else {
None
}
}

/// Queries the height of a given [Block](bitcoin::Block). Returns it if the block can be found. Returns [None] otherwise.
fn get_block_height(&self, block_hash: &BlockHash) -> Option<u32> {
self.hang_until_bitcoind_reachable();

match self.bitcoin_cli.get_block_header_info(block_hash) {
Ok(header_data) => Some(header_data.height as u32),
Err(JsonRpcError(RpcError(rpcerr))) => match rpcerr.code {
rpc_errors::RPC_INVALID_ADDRESS_OR_KEY => {
log::info!("Block not found: {}", block_hash);
None
}
e => {
log::error!("Unexpected error code when calling getblockheader: {}", e);
None
}
},
Err(JsonRpcError(TransportError(_))) => {
// Connection refused, bitcoind is down.
log::error!("Connection lost with bitcoind, retrying request when possible");
self.flag_bitcoind_unreachable();
self.get_block_height(block_hash)
}
// TODO: This may need finer catching.
Err(e) => {
log::error!("Unexpected JSONRPCError when calling getblockheader: {}", e);
None
}
}
}

/// Gets the block hash where a given [Transaction] was confirmed at (if any).
pub(crate) fn get_block_hash_for_tx(&self, txid: &Txid) -> Option<BlockHash> {
/// Checks whether a given transaction can be found in the mempool.
pub(crate) fn in_mempool(&self, txid: &Txid) -> bool {
self.hang_until_bitcoind_reachable();

match self.bitcoin_cli.get_raw_transaction_info(txid, None) {
Ok(tx_data) => tx_data.blockhash,
Ok(_) => true,
Err(JsonRpcError(RpcError(rpcerr))) => match rpcerr.code {
rpc_errors::RPC_INVALID_ADDRESS_OR_KEY => {
log::info!("Transaction not found in mempool nor blockchain: {}", txid);
None
log::info!("Transaction not found in mempool: {}", txid);
false
}
e => {
// DISCUSS: This could result in a silent error with unknown consequences
log::error!(
"Unexpected error code when calling getrawtransaction: {}",
e
);
None
false
}
},
Err(JsonRpcError(TransportError(_))) => {
// Connection refused, bitcoind is down.
log::error!("Connection lost with bitcoind, retrying request when possible");
self.flag_bitcoind_unreachable();
self.get_block_hash_for_tx(txid)
self.in_mempool(txid)
}
// TODO: This may need finer catching.
Err(e) => {
// DISCUSS: This could result in a silent error with unknown consequences
log::error!(
"Unexpected JSONRPCError when calling getrawtransaction: {}",
e
);
None
false
}
}
}
Expand All @@ -221,7 +192,7 @@ mod tests {
use std::thread;

use crate::test_utils::{get_random_tx, start_server, BitcoindMock, MockOptions, START_HEIGHT};
use teos_common::test_utils::TX_HEX;
use teos_common::test_utils::{TXID_HEX, TX_HEX};

use bitcoin::consensus;
use bitcoin::hashes::hex::FromHex;
Expand All @@ -241,11 +212,10 @@ mod tests {

#[test]
fn test_clear_receipts() {
let bitcoind_mock = BitcoindMock::new(MockOptions::empty());
let bitcoind_mock = BitcoindMock::new(MockOptions::default());
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
let start_height = START_HEIGHT as u32;
start_server(bitcoind_mock.server);

let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);

Expand All @@ -265,7 +235,7 @@ mod tests {

#[test]
fn test_send_transaction_ok() {
let bitcoind_mock = BitcoindMock::new(MockOptions::empty());
let bitcoind_mock = BitcoindMock::new(MockOptions::default());
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
let start_height = START_HEIGHT as u32;
Expand All @@ -275,6 +245,7 @@ mod tests {
let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap();
let r = carrier.send_transaction(&tx);

// Notice this covers both the case where the tx was already in our mempool or when we just put it there
assert_eq!(r, ConfirmationStatus::InMempoolSince(start_height));

// Check the receipt is on the cache
Expand Down Expand Up @@ -328,20 +299,19 @@ mod tests {

#[test]
fn test_send_transaction_verify_already_in_chain() {
let bitcoind_mock = BitcoindMock::new(MockOptions::new(
let bitcoind_mock = BitcoindMock::new(MockOptions::with_error(
rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64,
BlockHash::default(),
START_HEIGHT,
));
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
let start_height = START_HEIGHT as u32;
start_server(bitcoind_mock.server);

let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap();
let r = carrier.send_transaction(&tx);

assert_eq!(r, ConfirmationStatus::ConfirmedIn(start_height));
assert_eq!(r, ConfirmationStatus::IrrevocablyResolved);

// Check the receipt is on the cache
assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r);
Expand Down Expand Up @@ -372,7 +342,7 @@ mod tests {
#[test]
fn test_send_transaction_connection_error() {
// Try to connect to an offline bitcoind.
let bitcoind_mock = BitcoindMock::new(MockOptions::empty());
let bitcoind_mock = BitcoindMock::new(MockOptions::default());
let bitcoind_reachable = Arc::new((Mutex::new(false), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
let start_height = START_HEIGHT as u32;
Expand All @@ -399,91 +369,86 @@ mod tests {
}

#[test]
fn test_get_tx_height_ok() {
let target_height = 21;
let bitcoind_mock =
BitcoindMock::new(MockOptions::with_block(BlockHash::default(), target_height));
fn test_in_mempool() {
let bitcoind_mock = BitcoindMock::new(MockOptions::default().in_mempool());
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
let start_height = START_HEIGHT as u32;
start_server(bitcoind_mock.server);

let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
let tx = consensus::deserialize::<Transaction>(&Vec::from_hex(TX_HEX).unwrap()).unwrap();
assert_eq!(
carrier.get_tx_height(&tx.txid()),
Some(target_height as u32)
);
let txid = Txid::from_hex(TXID_HEX).unwrap();
assert!(carrier.in_mempool(&txid));
}

#[test]
fn test_get_tx_height_not_found() {
// Hee we are not testing the case where the block hash is unknown (which will also return None). This is because we only
// learn block hashes from bitcoind, and once a block is known, it cannot disappear (ir can be disconnected, but not banish).
let bitcoind_mock = BitcoindMock::new(MockOptions::empty());
fn test_not_in_mempool() {
let bitcoind_mock = BitcoindMock::new(MockOptions::default());
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
let start_height = START_HEIGHT as u32;
start_server(bitcoind_mock.server);

let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
let tx = consensus::deserialize::<Transaction>(&Vec::from_hex(TX_HEX).unwrap()).unwrap();
assert_eq!(carrier.get_tx_height(&tx.txid()), None);
let txid = Txid::from_hex(TXID_HEX).unwrap();
assert!(!carrier.in_mempool(&txid));
}

#[test]
fn test_get_block_height_ok() {
let target_height = 21;
let block_hash = BlockHash::default();
let bitcoind_mock = BitcoindMock::new(MockOptions::with_block(block_hash, target_height));
fn test_not_in_mempool_via_error() {
let bitcoind_mock = BitcoindMock::new(MockOptions::with_error(
rpc_errors::RPC_INVALID_ADDRESS_OR_KEY as i64,
));
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
let start_height = START_HEIGHT as u32;
start_server(bitcoind_mock.server);

let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
assert_eq!(
carrier.get_block_height(&block_hash),
Some(target_height as u32)
);
let txid = Txid::from_hex(TXID_HEX).unwrap();
assert!(!carrier.in_mempool(&txid));
}

#[test]
fn test_get_block_height_not_found() {
let bitcoind_mock = BitcoindMock::new(MockOptions::empty());
fn test_in_mempool_unexpected_error() {
let bitcoind_mock =
BitcoindMock::new(MockOptions::with_error(rpc_errors::RPC_MISC_ERROR as i64));
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
let start_height = START_HEIGHT as u32;
start_server(bitcoind_mock.server);

let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
assert_eq!(carrier.get_block_height(&BlockHash::default()), None);
let txid = Txid::from_hex(TXID_HEX).unwrap();
assert!(!carrier.in_mempool(&txid));
}

#[test]
fn test_get_block_hash_for_tx_ok() {
let block_hash = BlockHash::default();
let bitcoind_mock = BitcoindMock::new(MockOptions::with_block(block_hash, 21));
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
fn test_in_mempool_connection_error() {
// Try to connect to an offline bitcoind.
let bitcoind_mock = BitcoindMock::new(MockOptions::default());
let bitcoind_reachable = Arc::new((Mutex::new(false), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
let start_height = START_HEIGHT as u32;
start_server(bitcoind_mock.server);
let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable.clone(), start_height);

let tx = consensus::deserialize::<Transaction>(&Vec::from_hex(TX_HEX).unwrap()).unwrap();
let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
assert_eq!(carrier.get_block_hash_for_tx(&tx.txid()), Some(block_hash));
}
let txid = Txid::from_hex(TXID_HEX).unwrap();
let delay = std::time::Duration::new(3, 0);

#[test]
fn test_get_block_hash_for_tx_not_found() {
let bitcoind_mock = BitcoindMock::new(MockOptions::empty());
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
let start_height = START_HEIGHT as u32;
start_server(bitcoind_mock.server);
thread::spawn(move || {
thread::sleep(delay);
let (reachable, notifier) = &*bitcoind_reachable;
*reachable.lock().unwrap() = true;
notifier.notify_all();
});

let tx = consensus::deserialize::<Transaction>(&Vec::from_hex(TX_HEX).unwrap()).unwrap();
let carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
assert_eq!(carrier.get_block_hash_for_tx(&tx.txid()), None);
let before = std::time::Instant::now();
carrier.in_mempool(&txid);

// Check the request has hanged for ~delay
assert_eq!(
(std::time::Instant::now() - before).as_secs(),
delay.as_secs()
);
}
}
Loading

0 comments on commit fc04b8e

Please sign in to comment.