diff --git a/.circleci/config.yml b/.circleci/config.yml index c0ea06694..c6712ad66 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -16,16 +16,21 @@ jobs: - run: name: Install Cargo Extensions command: | + # cargo-audit started requiring libcurl3 + echo "deb http://security.ubuntu.com/ubuntu xenial-security main" | sudo tee -a /etc/apt/sources.list + sudo apt-key adv --recv-keys --keyserver keyserver.ubuntu.com 3B4FE6ACC0B21F32 + sudo apt-get update + sudo apt-get install libcurl3 -y + # get libcurl to a place where it won't get overwritten + sudo cp /usr/lib/x86_64-linux-gnu/libcurl.so.3 /usr/lib + sudo apt-get install curl -y cargo install --quiet cargo-audit || true # cargo-kcov rustup component add rustfmt clippy || true - run: name: Install Redis - # Install from stretch-backports to get the latest version - # The normal repository has version 3.2, which doesn't support redis modules command: | - echo "deb http://deb.debian.org/debian stretch-backports main" | sudo tee -a /etc/apt/sources.list sudo apt-get update - sudo apt-get -t stretch-backports install redis-server + sudo apt-get install redis-server redis-server --version - run: name: Install node and ganache @@ -64,7 +69,7 @@ jobs: cargo clippy --all-targets --all-features -- -D warnings - run: name: Audit Dependencies - command: cargo audit + command: LD_PRELOAD=/usr/lib/libcurl.so.3 cargo audit # - run: # name: Install kcov # command: >- diff --git a/crates/interledger-ccp/Cargo.toml b/crates/interledger-ccp/Cargo.toml index a0efe6f3b..b59052054 100644 --- a/crates/interledger-ccp/Cargo.toml +++ b/crates/interledger-ccp/Cargo.toml @@ -21,3 +21,4 @@ parking_lot = "0.7.1" ring = "0.14.6" tokio-executor = "0.1.7" tokio-timer = "0.2.10" +serde = { version = "1.0.98", features = ["derive"] } diff --git a/crates/interledger-ccp/src/lib.rs b/crates/interledger-ccp/src/lib.rs index 1c34e0513..9afe710cd 100644 --- a/crates/interledger-ccp/src/lib.rs +++ b/crates/interledger-ccp/src/lib.rs @@ -26,8 +26,10 @@ mod test_helpers; pub use server::{CcpRouteManager, CcpRouteManagerBuilder}; +use serde::{Deserialize, Serialize}; + #[repr(u8)] -#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)] +#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Serialize, Deserialize)] pub enum RoutingRelation { Parent = 1, Peer = 2, diff --git a/crates/interledger-settlement-engines/Cargo.toml b/crates/interledger-settlement-engines/Cargo.toml index 9d4227015..8ba255f16 100644 --- a/crates/interledger-settlement-engines/Cargo.toml +++ b/crates/interledger-settlement-engines/Cargo.toml @@ -16,7 +16,6 @@ futures = "0.1.25" interledger-service = { path = "../interledger-service", version = "0.2.1" } interledger-settlement = { path = "../interledger-settlement", version = "0.1.0" } interledger-service-util = { path = "../interledger-service-util", version = "0.2.1" } -interledger-store-redis = { path = "../interledger-store-redis", version = "0.2.1" } interledger-ildcp = { path = "../interledger-ildcp", version = "0.2.1" } ethabi = "8.0.1" serde = "1.0.91" @@ -48,3 +47,4 @@ os_type = "2.2.0" rand = "0.7.0" interledger = { path = "../interledger", version = "0.4.0" } interledger-packet = { path = "../interledger-packet", version = "0.2.1" } +interledger-store-redis = { path = "../interledger-store-redis", version = "0.2.1" } diff --git a/crates/interledger-settlement-engines/src/api.rs b/crates/interledger-settlement-engines/src/api.rs index 4516ceed4..ee9d3e2d4 100644 --- a/crates/interledger-settlement-engines/src/api.rs +++ b/crates/interledger-settlement-engines/src/api.rs @@ -1,3 +1,4 @@ +use crate::stores::{IdempotentEngineData, IdempotentEngineStore}; use crate::{ApiResponse, CreateAccount, SettlementEngine}; use bytes::Bytes; use futures::{ @@ -6,7 +7,6 @@ use futures::{ }; use hyper::{Response, StatusCode}; use interledger_settlement::Quantity; -use interledger_settlement::{IdempotentData, IdempotentStore}; use log::error; use ring::digest::{digest, SHA256}; use tokio::executor::spawn; @@ -26,7 +26,7 @@ impl_web! { impl SettlementEngineApi where E: SettlementEngine + Clone + Send + Sync + 'static, - S: IdempotentStore + Clone + Send + Sync + 'static, + S: IdempotentEngineStore + Clone + Send + Sync + 'static, { /// Create a new API service by providing it with a field that /// implements the `SettlementEngine` trait @@ -85,7 +85,7 @@ impl_web! { error!("{}", error_msg); error_msg }) - .and_then(move |ret: Option| { + .and_then(move |ret: Option| { if let Some(ret) = ret { if ret.2 == input_hash { Ok(Some((ret.0, ret.1))) @@ -160,7 +160,7 @@ fn get_hash_of(preimage: &[u8]) -> [u8; 32] { impl SettlementEngineApi where E: SettlementEngine + Clone + Send + Sync + 'static, - S: IdempotentStore + Clone + Send + Sync + 'static, + S: IdempotentEngineStore + Clone + Send + Sync + 'static, { /// Serves the API /// Example: diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index 2383e8e1f..ed0341676 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -7,12 +7,10 @@ use std::collections::HashMap; use std::iter::FromIterator; use hyper::StatusCode; -use interledger_store_redis::RedisStoreBuilder; use log::info; use num_bigint::BigUint; use redis::IntoConnectionInfo; use reqwest::r#async::{Client, Response as HttpResponse}; -use ring::{digest, hmac}; use serde::{Deserialize, Serialize}; use serde_json::json; use std::{ @@ -107,7 +105,7 @@ where + Sync + 'static, Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, - A: EthereumAccount + Send + Sync + 'static, + A: EthereumAccount + Clone + Send + Sync + 'static, { pub fn new(store: S, signer: Si) -> Self { Self { @@ -234,7 +232,7 @@ where + Sync + 'static, Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, - A: EthereumAccount + Send + Sync + 'static, + A: EthereumAccount + Clone + Send + Sync + 'static, { /// Periodically spawns a job every `self.poll_frequency` that notifies the /// Settlement Engine's connectors about transactions which are sent to the @@ -721,24 +719,18 @@ where fn load_account( &self, account_id: String, - ) -> impl Future { + ) -> impl Future { let store = self.store.clone(); let addr = self.address; - result(A::AccountId::from_str(&account_id).map_err(move |_err| { - let error_msg = "Unable to parse account".to_string(); - error!("{}", error_msg); - error_msg - })) - .and_then(move |account_id| { - store - .load_account_addresses(vec![account_id]) - .map_err(move |_err| { - let error_msg = format!("[{:?}] Error getting account: {}", addr, account_id); - error!("{}", error_msg); - error_msg - }) - .and_then(move |addresses| ok((account_id, addresses[0]))) - }) + let account_id_clone = account_id.clone(); + store + .load_account_addresses(vec![account_id.clone()]) + .map_err(move |_err| { + let error_msg = format!("[{:?}] Error getting account: {}", addr, account_id_clone); + error!("{}", error_msg); + error_msg + }) + .and_then(move |addresses| ok((account_id, addresses[0]))) } } @@ -751,7 +743,7 @@ where + Sync + 'static, Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, - A: EthereumAccount + Send + Sync + 'static, + A: EthereumAccount + Clone + Send + Sync + 'static, { /// Settlement Engine's function that corresponds to the /// /accounts/:id/ endpoint (POST). It queries the connector's @@ -769,94 +761,83 @@ where let store: S = self.store.clone(); let account_id = account_id.id; - Box::new( - result(A::AccountId::from_str(&account_id).map_err({ - move |_err| { - let error_msg = "Unable to parse account".to_string(); - error!("{}", error_msg); - let status_code = StatusCode::from_u16(400).unwrap(); - (status_code, error_msg) - } - })) - .and_then(move |account_id| { - // We make a POST request to OUR connector's `messages` - // endpoint. This will in turn send an outgoing - // request to its peer connector, which will ask its - // own engine about its settlement information. Then, - // we store that information and use it when - // performing settlements. - let idempotency_uuid = Uuid::new_v4().to_hyphenated().to_string(); - let challenge = Uuid::new_v4().to_hyphenated().to_string(); - let challenge = challenge.into_bytes(); - let challenge_clone = challenge.clone(); - let client = Client::new(); - let mut url = self_clone.connector_url.clone(); - url.path_segments_mut() - .expect("Invalid connector URL") - .push("accounts") - .push(&account_id.to_string()) - .push("messages"); - let action = move || { - client - .post(url.clone()) - .header("Content-Type", "application/octet-stream") - .header("Idempotency-Key", idempotency_uuid.clone()) - .body(challenge.clone()) - .send() - }; + // We make a POST request to OUR connector's `messages` + // endpoint. This will in turn send an outgoing + // request to its peer connector, which will ask its + // own engine about its settlement information. Then, + // we store that information and use it when + // performing settlements. + let idempotency_uuid = Uuid::new_v4().to_hyphenated().to_string(); + let challenge = Uuid::new_v4().to_hyphenated().to_string(); + let challenge = challenge.into_bytes(); + let challenge_clone = challenge.clone(); + let client = Client::new(); + let mut url = self_clone.connector_url.clone(); + url.path_segments_mut() + .expect("Invalid connector URL") + .push("accounts") + .push(&account_id.to_string()) + .push("messages"); + let action = move || { + client + .post(url.clone()) + .header("Content-Type", "application/octet-stream") + .header("Idempotency-Key", idempotency_uuid.clone()) + .body(challenge.clone()) + .send() + }; - Retry::spawn( - ExponentialBackoff::from_millis(10).take(MAX_RETRIES), - action, - ) - .map_err(move |err| { - let err = format!("Couldn't notify connector {:?}", err); - error!("{}", err); - (StatusCode::from_u16(500).unwrap(), err) - }) - .and_then(move |resp| { - parse_body_into_payment_details(resp).and_then(move |payment_details| { - let data = prefixed_mesage(challenge_clone); - let challenge_hash = Sha3::digest(&data); - let recovered_address = payment_details.sig.recover(&challenge_hash); - trace!("Received payment details {:?}", payment_details); - result(recovered_address) - .map_err(move |err| { - let err = format!("Could not recover address {:?}", err); - error!("{}", err); - (StatusCode::from_u16(502).unwrap(), err) - }) - .and_then({ - let payment_details = payment_details.clone(); - move |recovered_address| { - if recovered_address.as_bytes() - == &payment_details.to.own_address.as_bytes()[..] - { - ok(()) - } else { - let error_msg = format!( - "Recovered address did not match: {:?}. Expected {:?}", - recovered_address.to_string(), - payment_details.to - ); - error!("{}", error_msg); - err((StatusCode::from_u16(502).unwrap(), error_msg)) - } + Box::new( + Retry::spawn( + ExponentialBackoff::from_millis(10).take(MAX_RETRIES), + action, + ) + .map_err(move |err| { + let err = format!("Couldn't notify connector {:?}", err); + error!("{}", err); + (StatusCode::from_u16(500).unwrap(), err) + }) + .and_then(move |resp| { + parse_body_into_payment_details(resp).and_then(move |payment_details| { + let data = prefixed_mesage(challenge_clone); + let challenge_hash = Sha3::digest(&data); + let recovered_address = payment_details.sig.recover(&challenge_hash); + trace!("Received payment details {:?}", payment_details); + result(recovered_address) + .map_err(move |err| { + let err = format!("Could not recover address {:?}", err); + error!("{}", err); + (StatusCode::from_u16(502).unwrap(), err) + }) + .and_then({ + let payment_details = payment_details.clone(); + move |recovered_address| { + if recovered_address.as_bytes() + == &payment_details.to.own_address.as_bytes()[..] + { + ok(()) + } else { + let error_msg = format!( + "Recovered address did not match: {:?}. Expected {:?}", + recovered_address.to_string(), + payment_details.to + ); + error!("{}", error_msg); + err((StatusCode::from_u16(502).unwrap(), error_msg)) } + } + }) + .and_then(move |_| { + let data = HashMap::from_iter(vec![(account_id, payment_details.to)]); + store.save_account_addresses(data).map_err(move |err| { + let err = format!("Couldn't connect to store {:?}", err); + error!("{}", err); + (StatusCode::from_u16(500).unwrap(), err) }) - .and_then(move |_| { - let data = - HashMap::from_iter(vec![(account_id, payment_details.to)]); - store.save_account_addresses(data).map_err(move |err| { - let err = format!("Couldn't connect to store {:?}", err); - error!("{}", err); - (StatusCode::from_u16(500).unwrap(), err) - }) - }) - }) + }) }) - .and_then(move |_| Ok((StatusCode::from_u16(201).unwrap(), "CREATED".to_owned()))) - }), + }) + .and_then(move |_| Ok((StatusCode::from_u16(201).unwrap(), "CREATED".to_owned()))), ) } @@ -993,7 +974,6 @@ pub fn run_ethereum_engine( redis_uri: R, ethereum_endpoint: String, settlement_port: u16, - secret_seed: &[u8; 32], private_key: Si, chain_id: u8, confirmations: u8, @@ -1007,49 +987,33 @@ where R: IntoConnectionInfo, Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, { - let redis_secret = generate_redis_secret(secret_seed); let redis_uri = redis_uri.into_connection_info().unwrap(); EthereumLedgerRedisStoreBuilder::new(redis_uri.clone()) .connect() .and_then(move |ethereum_store| { - let engine = EthereumLedgerSettlementEngineBuilder::new(ethereum_store, private_key) - .ethereum_endpoint(ðereum_endpoint) - .chain_id(chain_id) - .connector_url(&connector_url) - .confirmations(confirmations) - .asset_scale(asset_scale) - .poll_frequency(poll_frequency) - .watch_incoming(watch_incoming) - .token_address(token_address) - .connect(); - - RedisStoreBuilder::new(redis_uri, redis_secret) - .connect() - .and_then(move |store| { - let addr = SocketAddr::from(([127, 0, 0, 1], settlement_port)); - let listener = TcpListener::bind(&addr) - .expect("Unable to bind to Settlement Engine address"); - info!("Ethereum Settlement Engine listening on: {}", addr); - - let api = SettlementEngineApi::new(engine, store); - tokio::spawn(api.serve(listener.incoming())); - Ok(()) - }) + let engine = + EthereumLedgerSettlementEngineBuilder::new(ethereum_store.clone(), private_key) + .ethereum_endpoint(ðereum_endpoint) + .chain_id(chain_id) + .connector_url(&connector_url) + .confirmations(confirmations) + .asset_scale(asset_scale) + .poll_frequency(poll_frequency) + .watch_incoming(watch_incoming) + .token_address(token_address) + .connect(); + + let addr = SocketAddr::from(([127, 0, 0, 1], settlement_port)); + let listener = + TcpListener::bind(&addr).expect("Unable to bind to Settlement Engine address"); + let api = SettlementEngineApi::new(engine, ethereum_store); + tokio::spawn(api.serve(listener.incoming())); + info!("Ethereum Settlement Engine listening on: {}", addr); + Ok(()) }) } -static REDIS_SECRET_GENERATION_STRING: &str = "ilp_redis_secret"; -fn generate_redis_secret(secret_seed: &[u8; 32]) -> [u8; 32] { - let mut redis_secret: [u8; 32] = [0; 32]; - let sig = hmac::sign( - &hmac::SigningKey::new(&digest::SHA256, secret_seed), - REDIS_SECRET_GENERATION_STRING.as_bytes(), - ); - redis_secret.copy_from_slice(sig.as_ref()); - redis_secret -} - #[cfg(test)] mod tests { use super::super::fixtures::{ALICE, BOB, MESSAGES_API}; @@ -1077,7 +1041,7 @@ mod tests { let alice_store = test_store(ALICE.clone(), false, false, true); alice_store .save_account_addresses(HashMap::from_iter(vec![( - 0, + "0".to_string(), Addresses { own_address: bob.address, token_address: None, @@ -1089,7 +1053,7 @@ mod tests { let bob_store = test_store(bob.clone(), false, false, true); bob_store .save_account_addresses(HashMap::from_iter(vec![( - 42, + "42".to_string(), Addresses { own_address: alice.address, token_address: None, diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/fixtures.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/fixtures.rs index f918e7f6e..9488ee4b7 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/fixtures.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/fixtures.rs @@ -4,12 +4,12 @@ use mockito::Matcher; lazy_static! { pub static ref ALICE: TestAccount = TestAccount::new( - 1, + "1".to_string(), "3cdb3d9e1b74692bb1e3bb5fc81938151ca64b02", "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee" ); pub static ref BOB: TestAccount = TestAccount::new( - 0, + "0".to_string(), "9b925641c5ef3fd86f63bff2da55a0deeafd1263", "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee" ); diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs index ceb8ea7d7..ea123a071 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs @@ -1,5 +1,4 @@ use bytes::Bytes; -use interledger_service::{Account, AccountStore}; use tokio::runtime::Runtime; use parking_lot::RwLock; @@ -20,25 +19,23 @@ use web3::{ use super::eth_engine::{EthereumLedgerSettlementEngine, EthereumLedgerSettlementEngineBuilder}; use super::types::{Addresses, EthereumAccount, EthereumLedgerTxSigner, EthereumStore}; -use interledger_settlement::{IdempotentData, IdempotentStore}; +use crate::stores::{IdempotentEngineData, IdempotentEngineStore}; #[derive(Debug, Clone)] pub struct TestAccount { - pub id: u64, + pub id: String, pub address: Address, pub token_address: Address, pub no_details: bool, } -impl Account for TestAccount { - type AccountId = u64; +impl EthereumAccount for TestAccount { + type AccountId = String; - fn id(&self) -> u64 { - self.id + fn id(&self) -> String { + self.id.clone() } -} -impl EthereumAccount for TestAccount { fn token_address(&self) -> Option
{ if self.no_details { return None; @@ -55,8 +52,8 @@ impl EthereumAccount for TestAccount { pub struct TestStore { pub accounts: Arc>, pub should_fail: bool, - pub addresses: Arc>>, - pub address_to_id: Arc>>, + pub addresses: Arc>>, + pub address_to_id: Arc>>, #[allow(clippy::all)] pub cache: Arc>>, pub last_observed_block: Arc>, @@ -101,12 +98,12 @@ impl EthereumStore for TestStore { fn save_account_addresses( &self, - data: HashMap, + data: HashMap, ) -> Box + Send> { let mut guard = self.addresses.write(); let mut guard2 = self.address_to_id.write(); for (acc, d) in data { - (*guard).insert(acc, d); + (*guard).insert(acc.clone(), d); (*guard2).insert(d, acc); } Box::new(ok(())) @@ -114,12 +111,12 @@ impl EthereumStore for TestStore { fn load_account_addresses( &self, - account_ids: Vec, + account_ids: Vec, ) -> Box, Error = ()> + Send> { let mut v = Vec::with_capacity(account_ids.len()); let addresses = self.addresses.read(); for acc in &account_ids { - if let Some(d) = addresses.get(&acc) { + if let Some(d) = addresses.get(acc) { v.push(Addresses { own_address: d.own_address, token_address: d.token_address, @@ -150,10 +147,10 @@ impl EthereumStore for TestStore { fn load_account_id_from_address( &self, eth_address: Addresses, - ) -> Box + Send> { + ) -> Box + Send> { let addresses = self.address_to_id.read(); let d = if let Some(d) = addresses.get(ð_address) { - *d + d.clone() } else { return Box::new(err(())); }; @@ -181,37 +178,11 @@ impl EthereumStore for TestStore { } } -impl AccountStore for TestStore { - type Account = TestAccount; - - fn get_accounts( - &self, - account_ids: Vec<<::Account as Account>::AccountId>, - ) -> Box, Error = ()> + Send> { - let accounts: Vec = self - .accounts - .iter() - .filter_map(|account| { - if account_ids.contains(&account.id) { - Some(account.clone()) - } else { - None - } - }) - .collect(); - if accounts.len() == account_ids.len() { - Box::new(ok(accounts)) - } else { - Box::new(err(())) - } - } -} - -impl IdempotentStore for TestStore { +impl IdempotentEngineStore for TestStore { fn load_idempotent_data( &self, idempotency_key: String, - ) -> Box, Error = ()> + Send> { + ) -> Box, Error = ()> + Send> { let cache = self.cache.read(); if let Some(data) = cache.get(&idempotency_key) { let mut guard = self.cache_hits.write(); @@ -257,8 +228,8 @@ impl TestStore { own_address: account.address, token_address, }; - addresses.insert(account.id, addrs); - address_to_id.insert(addrs, account.id); + addresses.insert(account.id.clone(), addrs); + address_to_id.insert(addrs, account.id.clone()); } } @@ -279,9 +250,9 @@ impl TestStore { // Test Service impl TestAccount { - pub fn new(id: u64, address: &str, token_address: &str) -> Self { + pub fn new(id: String, address: &str, token_address: &str) -> Self { Self { - id, + id: id.to_string(), address: Address::from_str(address).unwrap(), token_address: Address::from_str(token_address).unwrap(), no_details: false, @@ -301,12 +272,12 @@ where Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, S: EthereumStore + LeftoversStore - + IdempotentStore + + IdempotentEngineStore + Clone + Send + Sync + 'static, - A: EthereumAccount + Send + Sync + 'static, + A: EthereumAccount + Clone + Send + Sync + 'static, { EthereumLedgerSettlementEngineBuilder::new(store, key) .connector_url(connector_url) diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/types.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/types.rs index 1b43ea95b..e06f8aa47 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/types.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/types.rs @@ -1,17 +1,33 @@ use clarity::{PrivateKey, Signature}; use ethereum_tx_sign::RawTransaction; use futures::Future; -use interledger_service::Account; use sha3::{Digest, Keccak256 as Sha3}; use std::collections::HashMap; use std::str::FromStr; use web3::types::{Address, H256, U256}; +use serde::Serialize; +use std::fmt::{Debug, Display}; /// An Ethereum account is associated with an address. We additionally require /// that an optional `token_address` is implemented. If the `token_address` of an /// Ethereum Account is not `None`, than that account is used with the ERC20 token /// associated with that `token_address`. -pub trait EthereumAccount: Account { +/// +use std::hash::Hash; +pub trait EthereumAccount { + type AccountId: Eq + + Hash + + Debug + + Display + + Default + + FromStr + + Send + + Sync + + Clone + + Serialize; + + fn id(&self) -> Self::AccountId; + fn own_address(&self) -> Address; fn token_address(&self) -> Option
{ @@ -35,13 +51,13 @@ pub trait EthereumStore { /// called when creating an account on the API. fn save_account_addresses( &self, - data: HashMap<::AccountId, Addresses>, + data: HashMap<::AccountId, Addresses>, ) -> Box + Send>; /// Loads the Ethereum address associated with this account fn load_account_addresses( &self, - account_ids: Vec<::AccountId>, + account_ids: Vec<::AccountId>, ) -> Box, Error = ()> + Send>; /// Saves the latest block number, up to which all @@ -63,7 +79,7 @@ pub trait EthereumStore { fn load_account_id_from_address( &self, eth_address: Addresses, - ) -> Box::AccountId, Error = ()> + Send>; + ) -> Box::AccountId, Error = ()> + Send>; /// Returns true if the transaction has already been processed and saved in /// the store. diff --git a/crates/interledger-settlement-engines/src/main.rs b/crates/interledger-settlement-engines/src/main.rs index a4f7601e8..061ed8258 100644 --- a/crates/interledger-settlement-engines/src/main.rs +++ b/crates/interledger-settlement-engines/src/main.rs @@ -1,5 +1,4 @@ use clap::{value_t, App, Arg, SubCommand}; -use hex; use std::str::FromStr; use tokio; use url::Url; @@ -41,11 +40,6 @@ pub fn main() { .long("redis_uri") .help("Redis database to add the account to") .default_value("redis://127.0.0.1:6379"), - Arg::with_name("server_secret") - .long("server_secret") - .help("Cryptographic seed used to derive keys") - .takes_value(true) - .required(true), Arg::with_name("chain_id") .long("chain_id") .help("The chain id so that the signer calculates the v value of the sig appropriately") @@ -87,14 +81,6 @@ pub fn main() { let connector_url: String = value_t!(matches, "connector_url", String).unwrap(); let redis_uri = value_t!(matches, "redis_uri", String).expect("redis_uri is required"); let redis_uri = Url::parse(&redis_uri).expect("redis_uri is not a valid URI"); - let server_secret: [u8; 32] = { - let encoded: String = value_t!(matches, "server_secret", String).unwrap(); - let mut server_secret = [0; 32]; - let decoded = hex::decode(encoded).expect("server_secret must be hex-encoded"); - assert_eq!(decoded.len(), 32, "server_secret must be 32 bytes"); - server_secret.clone_from_slice(&decoded); - server_secret - }; let chain_id = value_t!(matches, "chain_id", u8).unwrap(); let confirmations = value_t!(matches, "confirmations", u8).unwrap(); let asset_scale = value_t!(matches, "asset_scale", u8).unwrap(); @@ -105,7 +91,6 @@ pub fn main() { redis_uri, ethereum_endpoint, settlement_port, - &server_secret, private_key, chain_id, confirmations, diff --git a/crates/interledger-settlement-engines/src/stores/mod.rs b/crates/interledger-settlement-engines/src/stores/mod.rs index 9d33c996b..79bd5af36 100644 --- a/crates/interledger-settlement-engines/src/stores/mod.rs +++ b/crates/interledger-settlement-engines/src/stores/mod.rs @@ -1,4 +1,6 @@ +use bytes::Bytes; use futures::future::Future; +use http::StatusCode; pub mod redis_ethereum_ledger; pub mod redis_store_common; @@ -6,6 +8,27 @@ pub mod redis_store_common; #[cfg(test)] pub mod test_helpers; +pub type IdempotentEngineData = (StatusCode, Bytes, [u8; 32]); + +pub trait IdempotentEngineStore { + /// Returns the API response that was saved when the idempotency key was used + /// Also returns a hash of the input data which resulted in the response + fn load_idempotent_data( + &self, + idempotency_key: String, + ) -> Box, Error = ()> + Send>; + + /// Saves the data that was passed along with the api request for later + /// The store MUST also save a hash of the input, so that it errors out on requests + fn save_idempotent_data( + &self, + idempotency_key: String, + input_hash: [u8; 32], + status_code: StatusCode, + data: Bytes, + ) -> Box + Send>; +} + pub trait LeftoversStore { type AssetType; diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index 7461d3ae2..dba6cf77c 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -3,7 +3,8 @@ use futures::{ Future, }; -use interledger_service::Account as AccountTrait; +use bytes::Bytes; +use http::StatusCode; use std::collections::HashMap; use std::str::FromStr; use web3::types::{Address as EthAddress, H256, U256}; @@ -15,7 +16,7 @@ use redis::{self, cmd, r#async::SharedConnection, ConnectionInfo, PipelineComman use log::{error, trace}; use crate::stores::redis_store_common::{EngineRedisStore, EngineRedisStoreBuilder}; -use crate::stores::LeftoversStore; +use crate::stores::{IdempotentEngineData, IdempotentEngineStore, LeftoversStore}; use num_bigint::BigUint; // Key for the latest observed block and balance. The data is stored in order to @@ -30,19 +31,11 @@ static UNCREDITED_AMOUNT_KEY: &str = "uncredited_settlement_amount"; #[derive(Clone, Debug, Serialize)] pub struct Account { - pub(crate) id: u64, + pub(crate) id: String, pub(crate) own_address: EthAddress, pub(crate) token_address: Option, } -impl AccountTrait for Account { - type AccountId = u64; - - fn id(&self) -> Self::AccountId { - self.id - } -} - fn ethereum_transactions_key(tx_hash: H256) -> String { format!( "{}:{}:{}:{}", @@ -50,7 +43,7 @@ fn ethereum_transactions_key(tx_hash: H256) -> String { ) } -fn ethereum_ledger_key(account_id: u64) -> String { +fn ethereum_ledger_key(account_id: &str) -> String { format!( "{}:{}:{}:{}", ETHEREUM_KEY, LEDGER_KEY, SETTLEMENT_ENGINES_KEY, account_id @@ -65,6 +58,11 @@ fn ethereum_uncredited_amount_key(account_id: String) -> String { } impl EthereumAccount for Account { + type AccountId = String; + + fn id(&self) -> Self::AccountId { + self.id.clone() + } fn token_address(&self) -> Option { self.token_address } @@ -194,16 +192,36 @@ impl LeftoversStore for EthereumLedgerRedisStore { } } +impl IdempotentEngineStore for EthereumLedgerRedisStore { + fn load_idempotent_data( + &self, + idempotency_key: String, + ) -> Box, Error = ()> + Send> { + self.redis_store.load_idempotent_data(idempotency_key) + } + + fn save_idempotent_data( + &self, + idempotency_key: String, + input_hash: [u8; 32], + status_code: StatusCode, + data: Bytes, + ) -> Box + Send> { + self.redis_store + .save_idempotent_data(idempotency_key, input_hash, status_code, data) + } +} + impl EthereumStore for EthereumLedgerRedisStore { type Account = Account; fn load_account_addresses( &self, - account_ids: Vec<::AccountId>, + account_ids: Vec, ) -> Box, Error = ()> + Send> { let mut pipe = redis::pipe(); for account_id in account_ids.iter() { - pipe.hgetall(ethereum_ledger_key(*account_id)); + pipe.hgetall(ethereum_ledger_key(&account_id)); } Box::new( pipe.query_async(self.connection.clone()) @@ -253,7 +271,7 @@ impl EthereumStore for EthereumLedgerRedisStore { fn save_account_addresses( &self, - data: HashMap<::AccountId, EthereumAddresses>, + data: HashMap, ) -> Box + Send> { let mut pipe = redis::pipe(); for (account_id, d) in data { @@ -262,7 +280,7 @@ impl EthereumStore for EthereumLedgerRedisStore { } else { vec![] }; - let acc_id = ethereum_ledger_key(account_id); + let acc_id = ethereum_ledger_key(&account_id); let addrs = &[ ("own_address", d.own_address.as_bytes()), ("token_address", &token_address), @@ -315,13 +333,13 @@ impl EthereumStore for EthereumLedgerRedisStore { fn load_account_id_from_address( &self, eth_address: EthereumAddresses, - ) -> Box::AccountId, Error = ()> + Send> { + ) -> Box + Send> { let mut pipe = redis::pipe(); pipe.get(addrs_to_key(eth_address)); Box::new( pipe.query_async(self.connection.clone()) .map_err(move |err| error!("Error loading account data: {:?}", err)) - .and_then(move |(_conn, account_id): (_, Vec)| ok(account_id[0])), + .and_then(move |(_conn, account_id): (_, Vec)| ok(account_id[0].clone())), ) } @@ -410,7 +428,7 @@ mod tests { #[test] fn saves_and_loads_ethereum_addreses_properly() { block_on(test_store().and_then(|(store, context)| { - let account_ids = vec![30, 42]; + let account_ids = vec!["1".to_string(), "2".to_string()]; let account_addresses = vec![ EthereumAddresses { own_address: EthAddress::from_str("3cdb3d9e1b74692bb1e3bb5fc81938151ca64b02") @@ -426,8 +444,8 @@ mod tests { }, ]; let input = HashMap::from_iter(vec![ - (account_ids[0], account_addresses[0]), - (account_ids[1], account_addresses[1]), + (account_ids[0].clone(), account_addresses[0]), + (account_ids[1].clone(), account_addresses[1]), ]); store .save_account_addresses(input) diff --git a/crates/interledger-settlement-engines/src/stores/redis_store_common.rs b/crates/interledger-settlement-engines/src/stores/redis_store_common.rs index 57b09ed72..c67bc8f3c 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_store_common.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_store_common.rs @@ -1,7 +1,12 @@ +use crate::stores::{IdempotentEngineData, IdempotentEngineStore}; +use bytes::Bytes; use futures::{future::result, Future}; -use redis::{self, r#async::SharedConnection, Client, ConnectionInfo}; +use http::StatusCode; +use redis::{self, cmd, r#async::SharedConnection, Client, ConnectionInfo, PipelineCommands}; +use std::collections::HashMap as SlowHashMap; +use std::str::FromStr; -use log::{debug, error}; +use log::{debug, error, trace}; pub struct EngineRedisStoreBuilder { redis_uri: ConnectionInfo, @@ -33,3 +38,122 @@ impl EngineRedisStoreBuilder { pub struct EngineRedisStore { pub connection: SharedConnection, } + +impl IdempotentEngineStore for EngineRedisStore { + fn load_idempotent_data( + &self, + idempotency_key: String, + ) -> Box, Error = ()> + Send> { + let idempotency_key_clone = idempotency_key.clone(); + Box::new( + cmd("HGETALL") + .arg(idempotency_key.clone()) + .query_async(self.connection.clone()) + .map_err(move |err| { + error!( + "Error loading idempotency key {}: {:?}", + idempotency_key_clone, err + ) + }) + .and_then( + move |(_connection, ret): (_, SlowHashMap)| { + if let (Some(status_code), Some(data), Some(input_hash_slice)) = ( + ret.get("status_code"), + ret.get("data"), + ret.get("input_hash"), + ) { + trace!("Loaded idempotency key {:?} - {:?}", idempotency_key, ret); + let mut input_hash: [u8; 32] = Default::default(); + input_hash.copy_from_slice(input_hash_slice.as_ref()); + Ok(Some(( + StatusCode::from_str(status_code).unwrap(), + Bytes::from(data.clone()), + input_hash, + ))) + } else { + Ok(None) + } + }, + ), + ) + } + + fn save_idempotent_data( + &self, + idempotency_key: String, + input_hash: [u8; 32], + status_code: StatusCode, + data: Bytes, + ) -> Box + Send> { + let mut pipe = redis::pipe(); + pipe.atomic() + .cmd("HMSET") // cannot use hset_multiple since data and status_code have different types + .arg(&idempotency_key) + .arg("status_code") + .arg(status_code.as_u16()) + .arg("data") + .arg(data.as_ref()) + .arg("input_hash") + .arg(&input_hash) + .ignore() + .expire(&idempotency_key, 86400) + .ignore(); + Box::new( + pipe.query_async(self.connection.clone()) + .map_err(|err| error!("Error caching: {:?}", err)) + .and_then(move |(_connection, _): (_, Vec)| { + trace!( + "Cached {:?}: {:?}, {:?}", + idempotency_key, + status_code, + data, + ); + Ok(()) + }), + ) + } +} + +// add tests for idempotency from other store +#[cfg(test)] +mod tests { + use super::super::test_helpers::store_helpers::{block_on, test_store, IDEMPOTENCY_KEY}; + use super::*; + + #[test] + fn saves_and_loads_idempotency_key_data_properly() { + block_on(test_store().and_then(|(store, context)| { + let input_hash: [u8; 32] = Default::default(); + store + .save_idempotent_data( + IDEMPOTENCY_KEY.clone(), + input_hash, + StatusCode::OK, + Bytes::from("TEST"), + ) + .map_err(|err| eprintln!("Redis error: {:?}", err)) + .and_then(move |_| { + store + .load_idempotent_data(IDEMPOTENCY_KEY.clone()) + .map_err(|err| eprintln!("Redis error: {:?}", err)) + .and_then(move |data1| { + assert_eq!( + data1.unwrap(), + (StatusCode::OK, Bytes::from("TEST"), input_hash) + ); + let _ = context; + + store + .load_idempotent_data("asdf".to_string()) + .map_err(|err| eprintln!("Redis error: {:?}", err)) + .and_then(move |data2| { + assert!(data2.is_none()); + let _ = context; + Ok(()) + }) + }) + }) + })) + .unwrap() + } +} diff --git a/crates/interledger-settlement-engines/tests/eth_ledger_settlement.rs b/crates/interledger-settlement-engines/tests/eth_ledger_settlement.rs index 222d33b2c..3e88bf6e6 100644 --- a/crates/interledger-settlement-engines/tests/eth_ledger_settlement.rs +++ b/crates/interledger-settlement-engines/tests/eth_ledger_settlement.rs @@ -16,7 +16,10 @@ mod redis_helpers; use redis_helpers::*; mod test_helpers; -use test_helpers::{create_account, get_balance, send_money, start_eth_engine, start_ganache}; +use test_helpers::{ + accounts_to_ids, create_account_on_engine, get_all_accounts, get_balance, send_money_to_id, + start_eth_engine, start_ganache, +}; #[test] /// In this test we have Alice and Bob who have peered with each other and run @@ -57,7 +60,7 @@ fn eth_ledger_settlement() { let node1_secret = cli::random_secret(); let node1 = InterledgerNode { ilp_address: Address::from_str("example.alice").unwrap(), - default_spsp_account: Some(0), + default_spsp_account: None, admin_auth_token: "hi_alice".to_string(), redis_connection: connection_info1.clone(), btp_address: ([127, 0, 0, 1], get_open_port(None)).into(), @@ -127,7 +130,7 @@ fn eth_ledger_settlement() { let node2_secret = cli::random_secret(); let node2 = InterledgerNode { ilp_address: Address::from_str("example.bob").unwrap(), - default_spsp_account: Some(0), + default_spsp_account: None, admin_auth_token: "admin".to_string(), redis_connection: connection_info2.clone(), btp_address: ([127, 0, 0, 1], get_open_port(None)).into(), @@ -203,67 +206,89 @@ fn eth_ledger_settlement() { // create account endpoint so that they trade addresses. // This would happen automatically if we inserted the // accounts via the Accounts API. - let create1 = create_account(node1_engine, "1"); - let create2 = create_account(node2_engine, "1"); + let bob_addr = Address::from_str("example.bob").unwrap(); + let bob_addr2 = bob_addr.clone(); + let alice_addr = Address::from_str("example.alice").unwrap(); + futures::future::join_all(vec![ + get_all_accounts(node1_http, "hi_alice").map(accounts_to_ids), + get_all_accounts(node2_http, "admin").map(accounts_to_ids), + ]) + .and_then(move |ids| { + let node1_ids = ids[0].clone(); + let node2_ids = ids[1].clone(); + + let bob = node1_ids.get(&bob_addr2).unwrap().to_owned(); + let alice = node2_ids.get(&alice_addr).unwrap().to_owned(); + let bob_at_bob = node2_ids.get(&bob_addr).unwrap().to_owned(); - // Make 4 subsequent payments (we could also do a 71 payment - // directly) - let send1 = send_money(node1_http, node2_http, 10, "in_alice"); - let send2 = send_money(node1_http, node2_http, 20, "in_alice"); - let send3 = send_money(node1_http, node2_http, 39, "in_alice"); - let send4 = send_money(node1_http, node2_http, 1, "in_alice"); + let create1 = create_account_on_engine(node1_engine, bob); + let create2 = create_account_on_engine(node2_engine, alice); - create1 - .and_then(move |_| create2) - .and_then(move |_| send1) - .and_then(move |_| { - get_balance(1, node1_http, "bob").and_then(move |ret| { - assert_eq!(ret, 10); - get_balance(1, node2_http, "alice").and_then(move |ret| { - assert_eq!(ret, -10); - Ok(()) + // Make 4 subsequent payments (we could also do a 71 payment + // directly) + // Alice must know Bob's id in the store... this does not + // make much sense. + let send1 = + send_money_to_id(node1_http, node2_http, 10, bob_at_bob, "in_alice"); + let send2 = + send_money_to_id(node1_http, node2_http, 20, bob_at_bob, "in_alice"); + let send3 = + send_money_to_id(node1_http, node2_http, 39, bob_at_bob, "in_alice"); + let send4 = + send_money_to_id(node1_http, node2_http, 1, bob_at_bob, "in_alice"); + + create1 + .and_then(move |_| create2) + .and_then(move |_| send1) + .and_then(move |_| { + get_balance(bob, node1_http, "bob").and_then(move |ret| { + assert_eq!(ret, 10); + get_balance(alice, node2_http, "alice").and_then(move |ret| { + assert_eq!(ret, -10); + Ok(()) + }) }) }) - }) - .and_then(move |_| send2) - .and_then(move |_| { - get_balance(1, node1_http, "bob").and_then(move |ret| { - assert_eq!(ret, 30); - get_balance(1, node2_http, "alice").and_then(move |ret| { - assert_eq!(ret, -30); - Ok(()) + .and_then(move |_| send2) + .and_then(move |_| { + get_balance(bob, node1_http, "bob").and_then(move |ret| { + assert_eq!(ret, 30); + get_balance(alice, node2_http, "alice").and_then(move |ret| { + assert_eq!(ret, -30); + Ok(()) + }) }) }) - }) - .and_then(move |_| send3) - .and_then(move |_| { - get_balance(1, node1_http, "bob").and_then(move |ret| { - assert_eq!(ret, 69); - get_balance(1, node2_http, "alice").and_then(move |ret| { - assert_eq!(ret, -69); - Ok(()) + .and_then(move |_| send3) + .and_then(move |_| { + get_balance(bob, node1_http, "bob").and_then(move |ret| { + assert_eq!(ret, 69); + get_balance(alice, node2_http, "alice").and_then(move |ret| { + assert_eq!(ret, -69); + Ok(()) + }) }) }) - }) - // Up to here, Alice's balance should be -69 and Bob's - // balance should be 69. Once we make 1 more payment, we - // exceed the settle_threshold and thus a settlement is made - .and_then(move |_| send4) - .and_then(move |_| { - // Wait a few seconds so that the receiver's engine - // gets the data - sleep(Duration::from_secs(5)); - // Since the credit connection reached -70, and the - // settle_to is -10, a 60 Wei transaction is made. - get_balance(1, node1_http, "bob").and_then(move |ret| { - assert_eq!(ret, 10); - get_balance(1, node2_http, "alice").and_then(move |ret| { - assert_eq!(ret, -10); - ganache_pid.kill().unwrap(); - Ok(()) + // Up to here, Alice's balance should be -69 and Bob's + // balance should be 69. Once we make 1 more payment, we + // exceed the settle_threshold and thus a settlement is made + .and_then(move |_| send4) + .and_then(move |_| { + // Wait a few seconds so that the receiver's engine + // gets the data + sleep(Duration::from_secs(5)); + // Since the credit connection reached -70, and the + // settle_to is -10, a 60 Wei transaction is made. + get_balance(bob, node1_http, "bob").and_then(move |ret| { + assert_eq!(ret, 10); + get_balance(alice, node2_http, "alice").and_then(move |ret| { + assert_eq!(ret, -10); + ganache_pid.kill().unwrap(); + Ok(()) + }) }) }) - }) + }) }), ) .unwrap(); diff --git a/crates/interledger-settlement-engines/tests/eth_xrp_interoperable.rs b/crates/interledger-settlement-engines/tests/eth_xrp_interoperable.rs index 98521ed70..a391e9ebc 100644 --- a/crates/interledger-settlement-engines/tests/eth_xrp_interoperable.rs +++ b/crates/interledger-settlement-engines/tests/eth_xrp_interoperable.rs @@ -16,7 +16,8 @@ use redis_helpers::*; mod test_helpers; use test_helpers::{ - create_account, get_balance, send_money, start_eth_engine, start_ganache, start_xrp_engine, + accounts_to_ids, create_account_on_engine, get_all_accounts, get_balance, send_money_to_id, + start_eth_engine, start_ganache, start_xrp_engine, }; #[test] @@ -96,7 +97,7 @@ fn eth_xrp_interoperable() { let node1 = InterledgerNode { ilp_address: Address::from_str("example.alice").unwrap(), - default_spsp_account: Some(0), + default_spsp_account: None, admin_auth_token: "admin".to_string(), redis_connection: connection_info1, btp_address: ([127, 0, 0, 1], get_open_port(None)).into(), @@ -161,7 +162,7 @@ fn eth_xrp_interoperable() { let node2 = InterledgerNode { ilp_address: Address::from_str("example.bob").unwrap(), - default_spsp_account: Some(0), + default_spsp_account: None, admin_auth_token: "admin".to_string(), redis_connection: connection_info2, btp_address: ([127, 0, 0, 1], node2_btp).into(), @@ -243,7 +244,7 @@ fn eth_xrp_interoperable() { let node3 = InterledgerNode { ilp_address: Address::from_str("example.bob.charlie").unwrap(), - default_spsp_account: Some(0), + default_spsp_account: None, admin_auth_token: "admin".to_string(), redis_connection: connection_info3, btp_address: ([127, 0, 0, 1], get_open_port(None)).into(), @@ -316,66 +317,111 @@ fn eth_xrp_interoperable() { delay(500) .map_err(|_| panic!("Something strange happened")) .and_then(move |_| { - // Insert accounts for the 3 nodes (4 total since node2 has - // eth & xrp) - create_account(node1_engine, "1") - .and_then(move |_| create_account(node2_engine, "0")) - .and_then(move |_| create_account(node2_xrp_engine_port, "1")) - .and_then(move |_| create_account(node3_xrp_engine_port, "1")) - // Pay 69k Gwei --> 69 drops - .and_then(move |_| send_money(node1_http, node3_http, 69000, "in_alice")) - // Pay 1k Gwei --> 1 drop - // This will trigger a 60 Gwei settlement from Alice to Bob. - .and_then(move |_| send_money(node1_http, node3_http, 1000, "in_alice")) - .and_then(move |_| { - // wait for the settlements - delay(10000).map_err(|err| panic!(err)).and_then(move |_| { - futures::future::join_all(vec![ - get_balance(0, node1_http, "admin"), - get_balance(1, node1_http, "admin"), - get_balance(0, node2_http, "admin"), - get_balance(1, node2_http, "admin"), - get_balance(0, node3_http, "admin"), - get_balance(1, node3_http, "admin"), - ]) - .and_then(move |balances| { - // Alice has paid Charlie in total 70k Gwei through Bob. - assert_eq!(balances[0], -70000); - // Since Alice has configured Bob's - // `settle_threshold` and `settle_to` to be - // 70k and 10k respectively, once she - // exceeded the 70k threshold, she made a 60k - // Gwei settlement to Bob so that their debt - // settles down to 10k. - // From her perspective, Bob's account has a - // positive 10k balance since she owes him money. - assert_eq!(balances[1], 10000); - // From Bob's perspective, Alice's account - // has a negative sign since he is owed money. - assert_eq!(balances[2], -10000); - // As Bob forwards money to Charlie, he also - // eventually exceeds the `settle_threshold` - // which incidentally is set to 70k. As a - // result, he must make a XRP ledger - // settlement of 65k Drops to get his debt - // back to the `settle_to` value of charlie, - // which is 5k (70k - 5k = 65k). - assert_eq!(balances[3], 5000); - // Charlie's balance indicates that he's - // received 70k drops (the total amount Alice sent him) - assert_eq!(balances[4], 70000); - // And he sees is owed 5k by Bob. - assert_eq!(balances[5], -5000); + let charlie_addr = Address::from_str("example.bob.charlie").unwrap(); + let bob_addr = Address::from_str("example.bob").unwrap(); + let alice_addr = Address::from_str("example.alice").unwrap(); + futures::future::join_all(vec![ + get_all_accounts(node1_http, "admin").map(accounts_to_ids), + get_all_accounts(node2_http, "admin").map(accounts_to_ids), + get_all_accounts(node3_http, "admin").map(accounts_to_ids), + ]) + .and_then(move |ids| { + let node1_ids = ids[0].clone(); + let node2_ids = ids[1].clone(); + let node3_ids = ids[2].clone(); - node2_engine_redis.kill().unwrap(); - node3_engine_redis.kill().unwrap(); - node2_xrp_engine.kill().unwrap(); - node3_xrp_engine.kill().unwrap(); - ganache_pid.kill().unwrap(); - Ok(()) + let alice_on_alice = node1_ids.get(&alice_addr).unwrap().to_owned(); + let bob_on_alice = node1_ids.get(&bob_addr).unwrap().to_owned(); + + let alice_on_bob = node2_ids.get(&alice_addr).unwrap().to_owned(); + let charlie_on_bob = node2_ids.get(&charlie_addr).unwrap().to_owned(); + + let charlie_on_charlie = node3_ids.get(&charlie_addr).unwrap().to_owned(); + let bob_on_charlie = node3_ids.get(&bob_addr).unwrap().to_owned(); + + // Insert accounts for the 3 nodes (4 total since node2 has + // eth & xrp) + create_account_on_engine(node1_engine, bob_on_alice) + .and_then(move |_| create_account_on_engine(node2_engine, alice_on_bob)) + .and_then(move |_| { + create_account_on_engine(node2_xrp_engine_port, charlie_on_bob) + }) + .and_then(move |_| { + create_account_on_engine(node3_xrp_engine_port, bob_on_charlie) + }) + // Pay 69k Gwei --> 69 drops + .and_then(move |_| { + send_money_to_id( + node1_http, + node3_http, + 69000, + charlie_on_charlie, + "in_alice", + ) + }) + // Pay 1k Gwei --> 1 drop + // This will trigger a 60 Gwei settlement from Alice to Bob. + .and_then(move |_| { + send_money_to_id( + node1_http, + node3_http, + 1000, + charlie_on_charlie, + "in_alice", + ) + }) + .and_then(move |_| { + // wait for the settlements + delay(10000).map_err(|err| panic!(err)).and_then(move |_| { + futures::future::join_all(vec![ + get_balance(alice_on_alice, node1_http, "admin"), + get_balance(bob_on_alice, node1_http, "admin"), + get_balance(alice_on_bob, node2_http, "admin"), + get_balance(charlie_on_bob, node2_http, "admin"), + get_balance(charlie_on_charlie, node3_http, "admin"), + get_balance(bob_on_charlie, node3_http, "admin"), + ]) + .and_then( + move |balances| { + // Alice has paid Charlie in total 70k Gwei through Bob. + assert_eq!(balances[0], -70000); + // Since Alice has configured Bob's + // `settle_threshold` and `settle_to` to be + // 70k and 10k respectively, once she + // exceeded the 70k threshold, she made a 60k + // Gwei settlement to Bob so that their debt + // settles down to 10k. + // From her perspective, Bob's account has a + // positive 10k balance since she owes him money. + assert_eq!(balances[1], 10000); + // From Bob's perspective, Alice's account + // has a negative sign since he is owed money. + assert_eq!(balances[2], -10000); + // As Bob forwards money to Charlie, he also + // eventually exceeds the `settle_threshold` + // which incidentally is set to 70k. As a + // result, he must make a XRP ledger + // settlement of 65k Drops to get his debt + // back to the `settle_to` value of charlie, + // which is 5k (70k - 5k = 65k). + assert_eq!(balances[3], 5000); + // Charlie's balance indicates that he's + // received 70k drops (the total amount Alice sent him) + assert_eq!(balances[4], 70000); + // And he sees is owed 5k by Bob. + assert_eq!(balances[5], -5000); + + node2_engine_redis.kill().unwrap(); + node3_engine_redis.kill().unwrap(); + node2_xrp_engine.kill().unwrap(); + node3_xrp_engine.kill().unwrap(); + ganache_pid.kill().unwrap(); + Ok(()) + }, + ) }) }) - }) + }) }), ) .unwrap(); diff --git a/crates/interledger-settlement-engines/tests/test_helpers.rs b/crates/interledger-settlement-engines/tests/test_helpers.rs index fa4c2fddc..9de48c54c 100644 --- a/crates/interledger-settlement-engines/tests/test_helpers.rs +++ b/crates/interledger-settlement-engines/tests/test_helpers.rs @@ -1,8 +1,15 @@ use futures::{stream::Stream, Future}; -use interledger::cli; +use interledger_ildcp::IldcpAccount; +use interledger_packet::Address; +use interledger_service::Account as AccountTrait; use interledger_settlement_engines::engines::ethereum_ledger::run_ethereum_engine; +use interledger_store_redis::Account; +use interledger_store_redis::AccountId; use redis::ConnectionInfo; +use serde::Serialize; use serde_json::json; +use std::collections::HashMap; +use std::fmt::Display; use std::process::Command; use std::str; use std::thread::sleep; @@ -66,7 +73,6 @@ pub fn start_eth_engine( db, "http://localhost:8545".to_string(), engine_port, - &cli::random_secret(), key, 1, 0, @@ -78,9 +84,10 @@ pub fn start_eth_engine( ) } -pub fn create_account( +#[allow(unused)] +pub fn create_account_on_engine( engine_port: u16, - account_id: &'static str, + account_id: T, ) -> impl Future { let client = reqwest::r#async::Client::new(); client @@ -96,10 +103,12 @@ pub fn create_account( .and_then(move |chunk| Ok(str::from_utf8(&chunk).unwrap().to_string())) } -pub fn send_money( +#[allow(unused)] +pub fn send_money_to_id( from: u16, to: u16, amount: u64, + id: T, auth: &str, ) -> impl Future { let client = reqwest::r#async::Client::new(); @@ -107,7 +116,8 @@ pub fn send_money( .post(&format!("http://localhost:{}/pay", from)) .header("Authorization", format!("Bearer {}", auth)) .json(&json!({ - "receiver": format!("http://localhost:{}/.well-known/pay", to), + // TODO: replace with username + "receiver": format!("http://localhost:{}/spsp/{}", to, id), "source_amount": amount, })) .send() @@ -122,8 +132,39 @@ pub fn send_money( }) } -pub fn get_balance( - account_id: u64, +#[allow(unused)] +pub fn get_all_accounts( + node_port: u16, + admin_token: &str, +) -> impl Future, Error = ()> { + let client = reqwest::r#async::Client::new(); + client + .get(&format!("http://localhost:{}/accounts", node_port)) + .header("Authorization", format!("Bearer {}", admin_token)) + .send() + .and_then(|res| res.error_for_status()) + .and_then(|res| res.into_body().concat2()) + .map_err(|err| { + eprintln!("Error getting account data: {:?}", err); + }) + .and_then(move |body| { + let ret: Vec = serde_json::from_slice(&body).unwrap(); + Ok(ret) + }) +} + +#[allow(unused)] +pub fn accounts_to_ids(accounts: Vec) -> HashMap { + let mut map = HashMap::new(); + for a in accounts { + map.insert(a.client_address().clone(), a.id()); + } + map +} + +#[allow(unused)] +pub fn get_balance( + account_id: T, node_port: u16, admin_token: &str, ) -> impl Future { diff --git a/crates/interledger-settlement-engines/tests/xrp_ledger_settlement.rs b/crates/interledger-settlement-engines/tests/xrp_ledger_settlement.rs index e0419bae2..fab38aa3a 100644 --- a/crates/interledger-settlement-engines/tests/xrp_ledger_settlement.rs +++ b/crates/interledger-settlement-engines/tests/xrp_ledger_settlement.rs @@ -14,7 +14,10 @@ mod redis_helpers; use redis_helpers::*; mod test_helpers; -use test_helpers::{create_account, get_balance, send_money, start_xrp_engine}; +use test_helpers::{ + accounts_to_ids, create_account_on_engine, get_all_accounts, get_balance, send_money_to_id, + start_xrp_engine, +}; #[test] /// In this test we have Alice and Bob who have peered with each other and run @@ -72,7 +75,7 @@ fn xrp_ledger_settlement() { let node1_secret = cli::random_secret(); let node1 = InterledgerNode { ilp_address: Address::from_str("example.alice").unwrap(), - default_spsp_account: Some(0), + default_spsp_account: None, admin_auth_token: "hi_alice".to_string(), redis_connection: connection_info1.clone(), btp_address: ([127, 0, 0, 1], get_open_port(None)).into(), @@ -135,7 +138,7 @@ fn xrp_ledger_settlement() { let node2_secret = cli::random_secret(); let node2 = InterledgerNode { ilp_address: Address::from_str("example.bob").unwrap(), - default_spsp_account: Some(0), + default_spsp_account: None, admin_auth_token: "admin".to_string(), redis_connection: connection_info2.clone(), btp_address: ([127, 0, 0, 1], get_open_port(None)).into(), @@ -204,75 +207,97 @@ fn xrp_ledger_settlement() { // create account endpoint so that they trade addresses. // This would happen automatically if we inserted the // accounts via the Accounts API. - let create1 = create_account(node1_engine, "1"); - let create2 = create_account(node2_engine, "1"); + let bob_addr = Address::from_str("example.bob").unwrap(); + let bob_addr2 = bob_addr.clone(); + let alice_addr = Address::from_str("example.alice").unwrap(); + futures::future::join_all(vec![ + get_all_accounts(node1_http, "hi_alice").map(accounts_to_ids), + get_all_accounts(node2_http, "admin").map(accounts_to_ids), + ]) + .and_then(move |ids| { + let node1_ids = ids[0].clone(); + let node2_ids = ids[1].clone(); + + let bob = node1_ids.get(&bob_addr2).unwrap().to_owned(); + let alice = node2_ids.get(&alice_addr).unwrap().to_owned(); + let bob_at_bob = node2_ids.get(&bob_addr).unwrap().to_owned(); + + let create1 = create_account_on_engine(node1_engine, bob); + let create2 = create_account_on_engine(node2_engine, alice); - // Make 4 subsequent payments (we could also do a 71 payment - // directly) - let send1 = send_money(node1_http, node2_http, 10, "in_alice"); - let send2 = send_money(node1_http, node2_http, 20, "in_alice"); - let send3 = send_money(node1_http, node2_http, 39, "in_alice"); - let send4 = send_money(node1_http, node2_http, 1, "in_alice"); + // Make 4 subsequent payments (we could also do a 71 payment + // directly) + let send1 = + send_money_to_id(node1_http, node2_http, 10, bob_at_bob, "in_alice"); + let send2 = + send_money_to_id(node1_http, node2_http, 20, bob_at_bob, "in_alice"); + let send3 = + send_money_to_id(node1_http, node2_http, 39, bob_at_bob, "in_alice"); + let send4 = + send_money_to_id(node1_http, node2_http, 1, bob_at_bob, "in_alice"); - create1 - .and_then(move |_| create2) - .and_then(move |_| send1) - .and_then(move |_| { - get_balance(1, node1_http, "bob").and_then(move |ret| { - assert_eq!(ret, 10); - get_balance(1, node2_http, "alice").and_then(move |ret| { - assert_eq!(ret, -10); - Ok(()) + create1 + .and_then(move |_| create2) + .and_then(move |_| send1) + .and_then(move |_| { + get_balance(bob, node1_http, "bob").and_then(move |ret| { + assert_eq!(ret, 10); + get_balance(alice, node2_http, "alice").and_then(move |ret| { + assert_eq!(ret, -10); + Ok(()) + }) }) }) - }) - .and_then(move |_| send2) - .and_then(move |_| { - get_balance(1, node1_http, "bob").and_then(move |ret| { - assert_eq!(ret, 30); - get_balance(1, node2_http, "alice").and_then(move |ret| { - assert_eq!(ret, -30); - Ok(()) + .and_then(move |_| send2) + .and_then(move |_| { + get_balance(bob, node1_http, "bob").and_then(move |ret| { + assert_eq!(ret, 30); + get_balance(alice, node2_http, "alice").and_then(move |ret| { + assert_eq!(ret, -30); + Ok(()) + }) }) }) - }) - .and_then(move |_| send3) - .and_then(move |_| { - get_balance(1, node1_http, "bob").and_then(move |ret| { - assert_eq!(ret, 69); - get_balance(1, node2_http, "alice").and_then(move |ret| { - assert_eq!(ret, -69); - Ok(()) + .and_then(move |_| send3) + .and_then(move |_| { + get_balance(bob, node1_http, "bob").and_then(move |ret| { + assert_eq!(ret, 69); + get_balance(alice, node2_http, "alice").and_then(move |ret| { + assert_eq!(ret, -69); + Ok(()) + }) }) }) - }) - // Up to here, Alice's balance should be -69 and Bob's - // balance should be 69. Once we make 1 more payment, we - // exceed the settle_threshold and thus a settlement is made - .and_then(move |_| send4) - .and_then(move |_| { - // Since the credit connection reached -70, and the - // settle_to is -10, a 60 drops transaction is made. - get_balance(1, node1_http, "bob").and_then(move |ret| { - assert_eq!(ret, 10); - // Wait a few seconds so that the receiver's engine - // gets the data and applies it (longer than the - // Ethereum engine since we're using a public - // testnet here) - delay(10000) - .map_err(move |_| panic!("Weird error.")) - .and_then(move |_| { - get_balance(1, node2_http, "alice").and_then(move |ret| { - assert_eq!(ret, -10); - alice_engine_redis.kill().unwrap(); - engine_alice.kill().unwrap(); - bob_engine_redis.kill().unwrap(); - engine_bob.kill().unwrap(); - Ok(()) + // Up to here, Alice's balance should be -69 and Bob's + // balance should be 69. Once we make 1 more payment, we + // exceed the settle_threshold and thus a settlement is made + .and_then(move |_| send4) + .and_then(move |_| { + // Since the credit connection reached -70, and the + // settle_to is -10, a 60 drops transaction is made. + get_balance(bob, node1_http, "bob").and_then(move |ret| { + assert_eq!(ret, 10); + // Wait a few seconds so that the receiver's engine + // gets the data and applies it (longer than the + // Ethereum engine since we're using a public + // testnet here) + delay(10000) + .map_err(move |_| panic!("Weird error.")) + .and_then(move |_| { + get_balance(alice, node2_http, "alice").and_then( + move |ret| { + assert_eq!(ret, -10); + alice_engine_redis.kill().unwrap(); + engine_alice.kill().unwrap(); + bob_engine_redis.kill().unwrap(); + engine_bob.kill().unwrap(); + Ok(()) + }, + ) }) - }) + }) }) - }) + }) }), ) .unwrap(); diff --git a/crates/interledger-store-redis/Cargo.toml b/crates/interledger-store-redis/Cargo.toml index 4b4a6fc34..1f7d5064a 100644 --- a/crates/interledger-store-redis/Cargo.toml +++ b/crates/interledger-store-redis/Cargo.toml @@ -33,8 +33,10 @@ serde = { version = "1.0.89", features = ["derive"] } stream-cancel = "0.4.4" tokio-executor = "0.1.6" tokio-timer = "0.2.10" -url = "1.7.2" +url = { version = "1.7.2", features = ["serde"] } http = "0.1.17" +uuid = { version = "0.7.4", features = ["serde"] } +url_serde = "0.2.0" [dev-dependencies] diff --git a/crates/interledger-store-redis/src/account.rs b/crates/interledger-store-redis/src/account.rs index 32e0d9770..cb02d7c87 100644 --- a/crates/interledger-store-redis/src/account.rs +++ b/crates/interledger-store-redis/src/account.rs @@ -15,20 +15,22 @@ use log::error; use redis::{from_redis_value, ErrorKind, FromRedisValue, RedisError, ToRedisArgs, Value}; use ring::aead; -use serde::Serialize; use serde::Serializer; +use serde::{Deserialize, Serialize}; +use std::fmt::Display; use std::{ collections::HashMap, convert::TryFrom, str::{self, FromStr}, }; +use uuid::{parser::ParseError, Uuid}; use url::Url; const ACCOUNT_DETAILS_FIELDS: usize = 21; -#[derive(Clone, Debug, Serialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Account { - pub(crate) id: u64, + pub(crate) id: AccountId, #[serde(serialize_with = "address_to_string")] pub(crate) ilp_address: Address, // TODO add additional routes @@ -36,24 +38,23 @@ pub struct Account { pub(crate) asset_scale: u8, pub(crate) max_packet_amount: u64, pub(crate) min_balance: Option, - #[serde(serialize_with = "optional_url_to_string")] + #[serde(with = "url_serde")] pub(crate) http_endpoint: Option, #[serde(serialize_with = "optional_bytes_to_utf8")] pub(crate) http_outgoing_token: Option, - #[serde(serialize_with = "optional_url_to_string")] + #[serde(with = "url_serde")] pub(crate) btp_uri: Option, #[serde(serialize_with = "optional_bytes_to_utf8")] pub(crate) btp_outgoing_token: Option, pub(crate) settle_threshold: Option, pub(crate) settle_to: Option, - #[serde(serialize_with = "routing_relation_to_string")] pub(crate) routing_relation: RoutingRelation, pub(crate) send_routes: bool, pub(crate) receive_routes: bool, pub(crate) round_trip_time: u64, pub(crate) packets_per_minute_limit: Option, pub(crate) amount_per_minute_limit: Option, - #[serde(serialize_with = "optional_url_to_string")] + #[serde(with = "url_serde")] pub(crate) settlement_engine_url: Option, } @@ -75,31 +76,8 @@ where } } -fn optional_url_to_string(url: &Option, serializer: S) -> Result -where - S: Serializer, -{ - if let Some(ref url) = url { - serializer.serialize_str(url.as_ref()) - } else { - serializer.serialize_none() - } -} - -// This needs to be pass by ref because serde expects this function to take a ref -#[allow(clippy::trivially_copy_pass_by_ref)] -fn routing_relation_to_string( - relation: &RoutingRelation, - serializer: S, -) -> Result -where - S: Serializer, -{ - serializer.serialize_str(relation.to_string().as_str()) -} - impl Account { - pub fn try_from(id: u64, details: AccountDetails) -> Result { + pub fn try_from(id: AccountId, details: AccountDetails) -> Result { let http_endpoint = if let Some(ref url) = details.http_endpoint { Some(Url::parse(url).map_err(|err| error!("Invalid URL: {:?}", err))?) } else { @@ -180,6 +158,50 @@ impl AccountWithEncryptedTokens { } } +// Uuid does not implement ToRedisArgs and FromRedisValue. +// Rust does not allow implementing foreign traits on foreign data types. +// As a result, we wrap Uuid in a local data type, and implement the necessary +// traits for that. +#[derive(Eq, PartialEq, Hash, Debug, Default, Serialize, Deserialize, Copy, Clone)] +pub struct AccountId(Uuid); + +impl AccountId { + pub fn new() -> Self { + let uid = Uuid::new_v4(); + AccountId(uid) + } +} + +impl FromStr for AccountId { + type Err = ParseError; + + fn from_str(src: &str) -> Result { + let uid = Uuid::from_str(&src)?; + Ok(AccountId(uid)) + } +} + +impl Display for AccountId { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> { + f.write_str(&self.0.to_hyphenated().to_string()) + } +} + +impl ToRedisArgs for AccountId { + fn write_redis_args(&self, out: &mut Vec>) { + self.0.to_hyphenated().to_string().write_redis_args(out); + } +} + +impl FromRedisValue for AccountId { + fn from_redis_value(v: &Value) -> Result { + let account_id = String::from_redis_value(v)?; + let uid = Uuid::from_str(&account_id) + .map_err(|_| RedisError::from((ErrorKind::TypeError, "Invalid account id string")))?; + Ok(AccountId(uid)) + } +} + impl ToRedisArgs for AccountWithEncryptedTokens { fn write_redis_args(&self, out: &mut Vec>) { let mut rv = Vec::with_capacity(ACCOUNT_DETAILS_FIELDS * 2); @@ -366,7 +388,7 @@ fn get_bool(key: &str, map: &HashMap) -> bool { } impl AccountTrait for Account { - type AccountId = u64; + type AccountId = AccountId; fn id(&self) -> Self::AccountId { self.id @@ -489,8 +511,9 @@ mod redis_account { #[test] fn from_account_details() { - let account = Account::try_from(10, ACCOUNT_DETAILS.clone()).unwrap(); - assert_eq!(account.id(), 10); + let id = AccountId::new(); + let account = Account::try_from(id, ACCOUNT_DETAILS.clone()).unwrap(); + assert_eq!(account.id(), id); assert_eq!( account.get_http_auth_token().unwrap(), "outgoing_auth_token" diff --git a/crates/interledger-store-redis/src/lib.rs b/crates/interledger-store-redis/src/lib.rs index c1ec38a59..13236448a 100644 --- a/crates/interledger-store-redis/src/lib.rs +++ b/crates/interledger-store-redis/src/lib.rs @@ -6,6 +6,6 @@ mod account; mod crypto; mod store; -pub use account::Account; +pub use account::{Account, AccountId}; pub use redis::{ConnectionInfo, IntoConnectionInfo}; pub use store::{RedisStore, RedisStoreBuilder}; diff --git a/crates/interledger-store-redis/src/store.rs b/crates/interledger-store-redis/src/store.rs index d3afe6aa3..d9a447a10 100644 --- a/crates/interledger-store-redis/src/store.rs +++ b/crates/interledger-store-redis/src/store.rs @@ -27,6 +27,7 @@ use futures::{ use log::{debug, error, trace, warn}; use std::collections::{HashMap, HashSet}; +use super::account::AccountId; use http::StatusCode; use interledger_api::{AccountDetails, NodeStore}; use interledger_btp::BtpStore; @@ -162,13 +163,12 @@ return balance + prepaid_amount"; static ROUTES_KEY: &str = "routes:current"; static RATES_KEY: &str = "rates:current"; static STATIC_ROUTES_KEY: &str = "routes:static"; -static NEXT_ACCOUNT_ID_KEY: &str = "next_account_id"; fn prefixed_idempotency_key(idempotency_key: String) -> String { format!("idempotency-key:{}", idempotency_key) } -fn account_details_key(account_id: u64) -> String { +fn account_details_key(account_id: AccountId) -> String { format!("accounts:{}", account_id) } @@ -271,19 +271,19 @@ impl RedisStoreBuilder { pub struct RedisStore { connection: Arc, exchange_rates: Arc>>, - routes: Arc>>, + routes: Arc>>, hmac_key: Arc, // redisstore stores a key, this must be protected encryption_key: Arc, decryption_key: Arc, } impl RedisStore { - fn get_next_account_id(&self) -> impl Future { - cmd("INCR") - .arg(NEXT_ACCOUNT_ID_KEY) - .query_async(self.connection.as_ref().clone()) - .map_err(|err| error!("Error incrementing account ID: {:?}", err)) - .and_then(|(_conn, next_account_id): (_, u64)| Ok(next_account_id - 1)) + pub fn get_all_accounts_ids(&self) -> impl Future, Error = ()> { + let mut pipe = redis::pipe(); + pipe.smembers("accounts"); + pipe.query_async(self.connection.as_ref().clone()) + .map_err(|err| error!("Error getting account IDs: {:?}", err)) + .and_then(|(_conn, account_ids): (_, Vec>)| Ok(account_ids[0].clone())) } fn create_new_account( @@ -308,12 +308,10 @@ impl RedisStore { .map(|token| hmac::sign(&self.hmac_key, token.as_bytes())); let http_incoming_token_hmac_clone = http_incoming_token_hmac; + let id = AccountId::new(); + debug!("Generated account: {}", id); Box::new( - self.get_next_account_id() - .and_then(|id| { - debug!("Next account id is: {}", id); - Account::try_from(id, account) - }) + result(Account::try_from(id, account)) .and_then(move |account| { // Check that there isn't already an account with values that must be unique let mut keys: Vec = vec!["ID".to_string()]; @@ -351,6 +349,9 @@ impl RedisStore { let mut pipe = redis::pipe(); pipe.atomic(); + // Add the account key to the list of accounts + pipe.sadd("accounts", id).ignore(); + // Set account details pipe.cmd("HMSET").arg(account_details_key(account.id)).arg(account.clone().encrypt_tokens(&encryption_key)) .ignore(); @@ -396,7 +397,7 @@ impl RedisStore { ) } - fn delete_account(&self, id: u64) -> Box + Send> { + fn delete_account(&self, id: AccountId) -> Box + Send> { let connection = self.connection.as_ref().clone(); let routing_table = self.routes.clone(); @@ -408,6 +409,8 @@ impl RedisStore { let mut pipe = redis::pipe(); pipe.atomic(); + pipe.srem("accounts", account.id).ignore(); + pipe.del(account_details_key(account.id)); if account.send_routes { @@ -440,7 +443,7 @@ impl RedisStore { fn retrieve_accounts( &self, - account_ids: Vec, + account_ids: Vec, ) -> Box, Error = ()> + Send> { let decryption_key = self.decryption_key.clone(); let num_accounts = account_ids.len(); @@ -673,37 +676,41 @@ impl BtpStore for RedisStore { .arg("btp_outgoing") .query_async(self.connection.as_ref().clone()) .map_err(|err| error!("Error getting members of set btp_outgoing: {:?}", err)) - .and_then(|(connection, account_ids): (SharedConnection, Vec)| { - if account_ids.is_empty() { - Either::A(ok(Vec::new())) - } else { - let mut pipe = redis::pipe(); - for id in account_ids { - pipe.hgetall(account_details_key(id)); - } - Either::B( - pipe.query_async(connection) - .map_err(|err| { - error!( + .and_then( + |(connection, account_ids): (SharedConnection, Vec)| { + if account_ids.is_empty() { + Either::A(ok(Vec::new())) + } else { + let mut pipe = redis::pipe(); + for id in account_ids { + pipe.hgetall(account_details_key(id)); + } + Either::B( + pipe.query_async(connection) + .map_err(|err| { + error!( "Error getting accounts with outgoing BTP details: {:?}", err ) - }) - .and_then( - move |(_connection, accounts): ( - SharedConnection, - Vec, - )| { - let accounts: Vec = accounts - .into_iter() - .map(|account| account.decrypt_tokens(&decryption_key)) - .collect(); - Ok(accounts) - }, - ), - ) - } - }), + }) + .and_then( + move |(_connection, accounts): ( + SharedConnection, + Vec, + )| { + let accounts: Vec = accounts + .into_iter() + .map(|account| { + account.decrypt_tokens(&decryption_key) + }) + .collect(); + Ok(accounts) + }, + ), + ) + } + }, + ), ) } } @@ -744,7 +751,7 @@ impl HttpStore for RedisStore { } impl RouterStore for RedisStore { - fn routing_table(&self) -> HashMap { + fn routing_table(&self) -> HashMap::AccountId> { self.routes.read().clone() } } @@ -759,46 +766,35 @@ impl NodeStore for RedisStore { self.create_new_account(account) } - fn remove_account(&self, id: u64) -> Box + Send> { + fn remove_account(&self, id: AccountId) -> Box + Send> { self.delete_account(id) } // TODO limit the number of results and page through them fn get_all_accounts(&self) -> Box, Error = ()> + Send> { let decryption_key = self.decryption_key.clone(); - Box::new( - cmd("GET") - .arg(NEXT_ACCOUNT_ID_KEY) - .query_async(self.connection.as_ref().clone()) + let mut pipe = redis::pipe(); + let connection = self.connection.clone(); + pipe.smembers("accounts"); + Box::new(self.get_all_accounts_ids().and_then(move |account_ids| { + let mut pipe = redis::pipe(); + for account_id in account_ids { + pipe.hgetall(account_details_key(account_id)); + } + + pipe.query_async(connection.as_ref().clone()) + .map_err(|err| error!("Error getting account ids: {:?}", err)) .and_then( - move |(connection, next_account_id): (SharedConnection, Option)| { - if let Some(next_account_id) = next_account_id { - if next_account_id > 0 { - trace!("Getting accounts up to id: {}", next_account_id); - let mut pipe = redis::pipe(); - for i in 0..next_account_id { - pipe.hgetall(account_details_key(i)); - } - return Either::A(pipe.query_async(connection).and_then( - move |(_, accounts): ( - _, - Vec>, - )| { - let accounts: Vec = accounts - .into_iter() - .filter_map(|a| a) - .map(|account| account.decrypt_tokens(&decryption_key)) - .collect(); - Ok(accounts) - }, - )); - } - } - Either::B(ok(Vec::new())) + move |(_, accounts): (_, Vec>)| { + let accounts: Vec = accounts + .into_iter() + .filter_map(|a| a) + .map(|account| account.decrypt_tokens(&decryption_key)) + .collect(); + Ok(accounts) }, ) - .map_err(|err| error!("Error getting all accounts: {:?}", err)), - ) + })) } fn set_rates(&self, rates: R) -> Box + Send> @@ -827,14 +823,14 @@ impl NodeStore for RedisStore { // takes the prefixes as Bytes and the account as an Account object fn set_static_routes(&self, routes: R) -> Box + Send> where - R: IntoIterator, + R: IntoIterator, { - let routes: Vec<(String, u64)> = routes.into_iter().collect(); - let accounts: HashSet = - HashSet::from_iter(routes.iter().map(|(_prefix, account_id)| *account_id)); + let routes: Vec<(String, AccountId)> = routes.into_iter().collect(); + let accounts: HashSet<_> = + HashSet::from_iter(routes.iter().map(|(_prefix, account_id)| account_id)); let mut pipe = redis::pipe(); for account_id in accounts { - pipe.exists(account_details_key(account_id)); + pipe.exists(account_details_key(*account_id)); } let routing_table = self.routes.clone(); @@ -866,7 +862,7 @@ impl NodeStore for RedisStore { fn set_static_route( &self, prefix: String, - account_id: u64, + account_id: AccountId, ) -> Box + Send> { let routing_table = self.routes.clone(); let prefix_clone = prefix.clone(); @@ -912,34 +908,41 @@ impl RouteManagerStore for RedisStore { .arg("send_routes_to") .query_async(self.connection.as_ref().clone()) .map_err(|err| error!("Error getting members of set send_routes_to: {:?}", err)) - .and_then(|(connection, account_ids): (SharedConnection, Vec)| { - if account_ids.is_empty() { - Either::A(ok(Vec::new())) - } else { - let mut pipe = redis::pipe(); - for id in account_ids { - pipe.hgetall(account_details_key(id)); + .and_then( + |(connection, account_ids): (SharedConnection, Vec)| { + if account_ids.is_empty() { + Either::A(ok(Vec::new())) + } else { + let mut pipe = redis::pipe(); + for id in account_ids { + pipe.hgetall(account_details_key(id)); + } + Either::B( + pipe.query_async(connection) + .map_err(|err| { + error!( + "Error getting accounts to send routes to: {:?}", + err + ) + }) + .and_then( + move |(_connection, accounts): ( + SharedConnection, + Vec, + )| { + let accounts: Vec = accounts + .into_iter() + .map(|account| { + account.decrypt_tokens(&decryption_key) + }) + .collect(); + Ok(accounts) + }, + ), + ) } - Either::B( - pipe.query_async(connection) - .map_err(|err| { - error!("Error getting accounts to send routes to: {:?}", err) - }) - .and_then( - move |(_connection, accounts): ( - SharedConnection, - Vec, - )| { - let accounts: Vec = accounts - .into_iter() - .map(|account| account.decrypt_tokens(&decryption_key)) - .collect(); - Ok(accounts) - }, - ), - ) - } - }), + }, + ), ) } @@ -957,37 +960,41 @@ impl RouteManagerStore for RedisStore { err ) }) - .and_then(|(connection, account_ids): (SharedConnection, Vec)| { - if account_ids.is_empty() { - Either::A(ok(Vec::new())) - } else { - let mut pipe = redis::pipe(); - for id in account_ids { - pipe.hgetall(account_details_key(id)); + .and_then( + |(connection, account_ids): (SharedConnection, Vec)| { + if account_ids.is_empty() { + Either::A(ok(Vec::new())) + } else { + let mut pipe = redis::pipe(); + for id in account_ids { + pipe.hgetall(account_details_key(id)); + } + Either::B( + pipe.query_async(connection) + .map_err(|err| { + error!( + "Error getting accounts to receive routes from: {:?}", + err + ) + }) + .and_then( + move |(_connection, accounts): ( + SharedConnection, + Vec, + )| { + let accounts: Vec = accounts + .into_iter() + .map(|account| { + account.decrypt_tokens(&decryption_key) + }) + .collect(); + Ok(accounts) + }, + ), + ) } - Either::B( - pipe.query_async(connection) - .map_err(|err| { - error!( - "Error getting accounts to receive routes from: {:?}", - err - ) - }) - .and_then( - move |(_connection, accounts): ( - SharedConnection, - Vec, - )| { - let accounts: Vec = accounts - .into_iter() - .map(|account| account.decrypt_tokens(&decryption_key)) - .collect(); - Ok(accounts) - }, - ), - ) - } - }), + }, + ), ) } @@ -1000,7 +1007,9 @@ impl RouteManagerStore for RedisStore { .query_async(self.connection.as_ref().clone()) .map_err(|err| error!("Error getting static routes: {:?}", err)) .and_then( - |(_, static_routes): (SharedConnection, Vec<(String, u64)>)| Ok(static_routes), + |(_, static_routes): (SharedConnection, Vec<(String, AccountId)>)| { + Ok(static_routes) + }, ); Box::new(self.get_all_accounts().join(get_static_routes).and_then( |(accounts, static_routes)| { @@ -1010,7 +1019,7 @@ impl RouteManagerStore for RedisStore { .map(|account| (account.ilp_address.to_bytes(), account.clone())), ); - let account_map: HashMap = HashMap::from_iter(accounts.iter().map(|account| (account.id, account))); + let account_map: HashMap = HashMap::from_iter(accounts.iter().map(|account| (account.id, account))); let configured_table: HashMap = HashMap::from_iter(static_routes.into_iter() .filter_map(|(prefix, account_id)| { if let Some(account) = account_map.get(&account_id) { @@ -1030,7 +1039,7 @@ impl RouteManagerStore for RedisStore { &mut self, routes: impl IntoIterator, ) -> Box + Send> { - let routes: Vec<(String, u64)> = routes + let routes: Vec<(String, AccountId)> = routes .into_iter() .filter_map(|(prefix, account)| { if let Ok(prefix) = String::from_utf8(prefix.to_vec()) { @@ -1229,7 +1238,7 @@ impl SettlementStore for RedisStore { fn update_balance_for_incoming_settlement( &self, - account_id: u64, + account_id: AccountId, amount: u64, idempotency_key: Option, ) -> Box + Send> { @@ -1250,7 +1259,7 @@ impl SettlementStore for RedisStore { fn refund_settlement( &self, - account_id: u64, + account_id: AccountId, settle_amount: u64, ) -> Box + Send> { trace!( @@ -1303,11 +1312,11 @@ fn update_rates( } // TODO replace this with pubsub when async pubsub is added upstream: https://github.com/mitsuhiko/redis-rs/issues/183 -type RouteVec = Vec<(String, u64)>; +type RouteVec = Vec<(String, AccountId)>; fn update_routes( connection: SharedConnection, - routing_table: Arc>>, + routing_table: Arc>>, ) -> impl Future { let mut pipe = redis::pipe(); pipe.hgetall(ROUTES_KEY).hgetall(STATIC_ROUTES_KEY); diff --git a/crates/interledger-store-redis/tests/accounts_test.rs b/crates/interledger-store-redis/tests/accounts_test.rs index 39fab80b0..210693ffd 100644 --- a/crates/interledger-store-redis/tests/accounts_test.rs +++ b/crates/interledger-store-redis/tests/accounts_test.rs @@ -7,18 +7,22 @@ use interledger_btp::BtpAccount; use interledger_http::HttpAccount; use interledger_ildcp::IldcpAccount; use interledger_packet::Address; -use interledger_service::Account as AccontTrait; +use interledger_service::Account as AccountTrait; use interledger_service::AccountStore; use interledger_service_util::BalanceStore; +use interledger_store_redis::AccountId; use std::str::FromStr; #[test] fn insert_accounts() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { store .insert_account(ACCOUNT_DETAILS_2.clone()) .and_then(move |account| { - assert_eq!(account.id(), 2); + assert_eq!( + *account.client_address(), + Address::from_str("example.charlie").unwrap() + ); let _ = context; Ok(()) }) @@ -28,7 +32,7 @@ fn insert_accounts() { #[test] fn delete_accounts() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { store.get_all_accounts().and_then(move |accounts| { let id = accounts[0].id(); store.remove_account(id).and_then(move |_| { @@ -47,8 +51,8 @@ fn delete_accounts() { #[test] fn starts_with_zero_balance() { - block_on(test_store().and_then(|(store, context)| { - let account0 = Account::try_from(0, ACCOUNT_DETAILS_0.clone()).unwrap(); + block_on(test_store().and_then(|(store, context, accs)| { + let account0 = accs[0].clone(); store.get_balance(account0).and_then(move |balance| { assert_eq!(balance, 0); let _ = context; @@ -62,7 +66,7 @@ fn starts_with_zero_balance() { fn fails_on_duplicate_http_incoming_auth() { let mut account = ACCOUNT_DETAILS_2.clone(); account.http_incoming_token = Some("incoming_auth_token".to_string()); - let result = block_on(test_store().and_then(|(store, context)| { + let result = block_on(test_store().and_then(|(store, context, _accs)| { store.insert_account(account).then(move |result| { let _ = context; result @@ -75,7 +79,7 @@ fn fails_on_duplicate_http_incoming_auth() { fn fails_on_duplicate_btp_incoming_auth() { let mut account = ACCOUNT_DETAILS_2.clone(); account.btp_incoming_token = Some("btp_token".to_string()); - let result = block_on(test_store().and_then(|(store, context)| { + let result = block_on(test_store().and_then(|(store, context, _accs)| { store.insert_account(account).then(move |result| { let _ = context; result @@ -86,7 +90,7 @@ fn fails_on_duplicate_btp_incoming_auth() { #[test] fn get_all_accounts() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { store.get_all_accounts().and_then(move |accounts| { assert_eq!(accounts.len(), 2); let _ = context; @@ -98,63 +102,71 @@ fn get_all_accounts() { #[test] fn gets_single_account() { - block_on(test_store().and_then(|(store, context)| { - store.get_accounts(vec![1]).and_then(move |accounts| { - assert_eq!( - accounts[0].client_address(), - &Address::from_str("example.bob").unwrap() - ); - let _ = context; - Ok(()) - }) + block_on(test_store().and_then(|(store, context, accs)| { + let store_clone = store.clone(); + let acc = accs[0].clone(); + store_clone + .get_accounts(vec![acc.id()]) + .and_then(move |accounts| { + assert_eq!(accounts[0].client_address(), acc.client_address(),); + let _ = context; + Ok(()) + }) })) .unwrap(); } #[test] fn gets_multiple() { - block_on(test_store().and_then(|(store, context)| { - store.get_accounts(vec![1, 0]).and_then(move |accounts| { - // note reverse order is intentional - assert_eq!( - accounts[0].client_address(), - &Address::from_str("example.bob").unwrap() - ); - assert_eq!( - accounts[1].client_address(), - &Address::from_str("example.alice").unwrap() - ); - let _ = context; - Ok(()) - }) + block_on(test_store().and_then(|(store, context, accs)| { + let store_clone = store.clone(); + // set account ids in reverse order + let account_ids: Vec = accs.iter().rev().map(|a| a.id()).collect::<_>(); + store_clone + .get_accounts(account_ids) + .and_then(move |accounts| { + // note reverse order is intentional + assert_eq!(accounts[0].client_address(), accs[1].client_address(),); + assert_eq!(accounts[1].client_address(), accs[0].client_address(),); + let _ = context; + Ok(()) + }) })) .unwrap(); } #[test] -fn decrypts_outgoing_tokens() { - block_on(test_store().and_then(|(store, context)| { - store.get_accounts(vec![0]).and_then(move |accounts| { - let account = &accounts[0]; - assert_eq!( - account.get_http_auth_token().unwrap(), - "outgoing_auth_token" - ); - assert_eq!(account.get_btp_token().unwrap(), b"btp_token"); - let _ = context; - Ok(()) - }) +fn decrypts_outgoing_tokens_acc() { + block_on(test_store().and_then(|(store, context, accs)| { + let acc = accs[0].clone(); + store + .get_accounts(vec![acc.id()]) + .and_then(move |accounts| { + let account = accounts[0].clone(); + assert_eq!( + account.get_http_auth_token().unwrap(), + acc.get_http_auth_token().unwrap(), + ); + assert_eq!( + account.get_btp_token().unwrap(), + acc.get_btp_token().unwrap(), + ); + let _ = context; + Ok(()) + }) })) .unwrap() } #[test] fn errors_for_unknown_accounts() { - let result = block_on(test_store().and_then(|(store, context)| { - store.get_accounts(vec![0, 5]).then(move |result| { - let _ = context; - result - }) + let result = block_on(test_store().and_then(|(store, context, _accs)| { + store + .get_accounts(vec![AccountId::new(), AccountId::new()]) + .then(move |result| { + let _ = context; + result + }) })); assert!(result.is_err()); } diff --git a/crates/interledger-store-redis/tests/balances_test.rs b/crates/interledger-store-redis/tests/balances_test.rs index 40c4f7d2a..92ec94a89 100644 --- a/crates/interledger-store-redis/tests/balances_test.rs +++ b/crates/interledger-store-redis/tests/balances_test.rs @@ -8,15 +8,19 @@ use interledger_service::AccountStore; use interledger_service_util::BalanceStore; use std::str::FromStr; +use interledger_service::Account as AccountTrait; +use interledger_store_redis::{Account, AccountId}; + #[test] fn get_balance() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { + let account_id = AccountId::new(); context .async_connection() - .map_err(|err| panic!(err)) - .and_then(|connection| { + .map_err(move |err| panic!(err)) + .and_then(move |connection| { redis::cmd("HMSET") - .arg("accounts:0") + .arg(format!("accounts:{}", account_id)) .arg("balance") .arg(600) .arg("prepaid_amount") @@ -24,7 +28,8 @@ fn get_balance() { .query_async(connection) .map_err(|err| panic!(err)) .and_then(move |(_, _): (_, redis::Value)| { - let account = Account::try_from(0, ACCOUNT_DETAILS_0.clone()).unwrap(); + let account = + Account::try_from(account_id, ACCOUNT_DETAILS_0.clone()).unwrap(); store.get_balance(account).and_then(move |balance| { assert_eq!(balance, 1000); let _ = context; @@ -38,12 +43,12 @@ fn get_balance() { #[test] fn prepare_then_fulfill_with_settlement() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { let store_clone_1 = store.clone(); let store_clone_2 = store.clone(); store .clone() - .get_accounts(vec![0, 1]) + .get_accounts(vec![accs[0].id(), accs[1].id()]) .map_err(|_err| panic!("Unable to get accounts")) .and_then(move |accounts| { let account0 = accounts[0].clone(); @@ -96,21 +101,24 @@ fn process_fulfill_no_settle_to() { acc.settle_to = None; acc }; - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { let store_clone = store.clone(); - store.clone().insert_account(acc).and_then(move |_| { - store_clone.get_accounts(vec![2]).and_then(move |accounts| { - let acc = accounts[0].clone(); - store_clone - .clone() - .update_balances_for_fulfill(acc.clone(), 100) - .and_then(move |(balance, amount_to_settle)| { - assert_eq!(balance, 100); - assert_eq!(amount_to_settle, 0); - let _ = context; - Ok(()) - }) - }) + store.clone().insert_account(acc).and_then(move |account| { + let id = account.id(); + store_clone + .get_accounts(vec![id]) + .and_then(move |accounts| { + let acc = accounts[0].clone(); + store_clone + .clone() + .update_balances_for_fulfill(acc.clone(), 100) + .and_then(move |(balance, amount_to_settle)| { + assert_eq!(balance, 100); + assert_eq!(amount_to_settle, 0); + let _ = context; + Ok(()) + }) + }) }) })) .unwrap(); @@ -129,21 +137,24 @@ fn process_fulfill_settle_to_over_threshold() { acc.btp_incoming_token = None; acc }; - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { let store_clone = store.clone(); - store.clone().insert_account(acc).and_then(move |_| { - store_clone.get_accounts(vec![2]).and_then(move |accounts| { - let acc = accounts[0].clone(); - store_clone - .clone() - .update_balances_for_fulfill(acc.clone(), 1000) - .and_then(move |(balance, amount_to_settle)| { - assert_eq!(balance, 1000); - assert_eq!(amount_to_settle, 0); - let _ = context; - Ok(()) - }) - }) + store.clone().insert_account(acc).and_then(move |acc| { + let id = acc.id(); + store_clone + .get_accounts(vec![id]) + .and_then(move |accounts| { + let acc = accounts[0].clone(); + store_clone + .clone() + .update_balances_for_fulfill(acc.clone(), 1000) + .and_then(move |(balance, amount_to_settle)| { + assert_eq!(balance, 1000); + assert_eq!(amount_to_settle, 0); + let _ = context; + Ok(()) + }) + }) }) })) .unwrap(); @@ -162,21 +173,24 @@ fn process_fulfill_ok() { acc.btp_incoming_token = None; acc }; - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { let store_clone = store.clone(); - store.clone().insert_account(acc).and_then(move |_| { - store_clone.get_accounts(vec![2]).and_then(move |accounts| { - let acc = accounts[0].clone(); - store_clone - .clone() - .update_balances_for_fulfill(acc.clone(), 101) - .and_then(move |(balance, amount_to_settle)| { - assert_eq!(balance, 0); - assert_eq!(amount_to_settle, 101); - let _ = context; - Ok(()) - }) - }) + store.clone().insert_account(acc).and_then(move |account| { + let id = account.id(); + store_clone + .get_accounts(vec![id]) + .and_then(move |accounts| { + let acc = accounts[0].clone(); + store_clone + .clone() + .update_balances_for_fulfill(acc.clone(), 101) + .and_then(move |(balance, amount_to_settle)| { + assert_eq!(balance, 0); + assert_eq!(amount_to_settle, 101); + let _ = context; + Ok(()) + }) + }) }) })) .unwrap(); @@ -184,12 +198,12 @@ fn process_fulfill_ok() { #[test] fn prepare_then_reject() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { let store_clone_1 = store.clone(); let store_clone_2 = store.clone(); store .clone() - .get_accounts(vec![0, 1]) + .get_accounts(vec![accs[0].id(), accs[1].id()]) .map_err(|_err| panic!("Unable to get accounts")) .and_then(move |accounts| { let account0 = accounts[0].clone(); @@ -231,10 +245,10 @@ fn prepare_then_reject() { #[test] fn enforces_minimum_balance() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { store .clone() - .get_accounts(vec![0, 1]) + .get_accounts(vec![accs[0].id(), accs[1].id()]) .map_err(|_err| panic!("Unable to get accounts")) .and_then(move |accounts| { store @@ -253,16 +267,16 @@ fn enforces_minimum_balance() { // Prepare and Fulfill a packet for 100 units from Account 0 to Account 1 // Then, Prepare and Fulfill a packet for 80 units from Account 1 to Account 0 fn netting_fulfilled_balances() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { let store_clone1 = store.clone(); let store_clone2 = store.clone(); store .clone() .insert_account(ACCOUNT_DETAILS_2.clone()) - .and_then(move |_| { + .and_then(move |acc| { store .clone() - .get_accounts(vec![0, 2]) + .get_accounts(vec![accs[0].id(), acc.id()]) .map_err(|_err| panic!("Unable to get accounts")) .and_then(move |accounts| { let account0 = accounts[0].clone(); diff --git a/crates/interledger-store-redis/tests/btp_test.rs b/crates/interledger-store-redis/tests/btp_test.rs index e9898d6a9..64763ef7c 100644 --- a/crates/interledger-store-redis/tests/btp_test.rs +++ b/crates/interledger-store-redis/tests/btp_test.rs @@ -3,15 +3,20 @@ mod common; use common::*; use interledger_btp::{BtpAccount, BtpStore}; use interledger_http::HttpAccount; -use interledger_service::Account as AccountTrait; +use interledger_ildcp::IldcpAccount; +use interledger_packet::Address; +use std::str::FromStr; #[test] fn gets_account_from_btp_token() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { store .get_account_from_btp_token("other_btp_token") .and_then(move |account| { - assert_eq!(account.id(), 1); + assert_eq!( + *account.client_address(), + Address::from_str("example.bob").unwrap() + ); let _ = context; Ok(()) }) @@ -20,8 +25,8 @@ fn gets_account_from_btp_token() { } #[test] -fn decrypts_outgoing_tokens() { - block_on(test_store().and_then(|(store, context)| { +fn decrypts_outgoing_tokens_btp() { + block_on(test_store().and_then(|(store, context, _accs)| { store .get_account_from_btp_token("other_btp_token") .and_then(move |account| { @@ -42,7 +47,7 @@ fn decrypts_outgoing_tokens() { #[test] fn errors_on_unknown_btp_token() { - let result = block_on(test_store().and_then(|(store, context)| { + let result = block_on(test_store().and_then(|(store, context, _accs)| { store .get_account_from_btp_token("unknown_btp_token") .then(move |result| { diff --git a/crates/interledger-store-redis/tests/common/store_helpers.rs b/crates/interledger-store-redis/tests/common/store_helpers.rs index a436bb0c1..ae4ac7a71 100644 --- a/crates/interledger-store-redis/tests/common/store_helpers.rs +++ b/crates/interledger-store-redis/tests/common/store_helpers.rs @@ -3,7 +3,7 @@ use super::redis_helpers::*; use env_logger; use futures::Future; use interledger_api::NodeStore; -use interledger_store_redis::{RedisStore, RedisStoreBuilder}; +use interledger_store_redis::{Account, RedisStore, RedisStoreBuilder}; use lazy_static::lazy_static; use parking_lot::Mutex; use tokio::runtime::Runtime; @@ -12,17 +12,25 @@ lazy_static! { static ref TEST_MUTEX: Mutex<()> = Mutex::new(()); } -pub fn test_store() -> impl Future { +pub fn test_store() -> impl Future), Error = ()> { let context = TestContext::new(); RedisStoreBuilder::new(context.get_client_connection_info(), [0; 32]) .connect() .and_then(|store| { let store_clone = store.clone(); + let mut accs = Vec::new(); store .clone() .insert_account(ACCOUNT_DETAILS_0.clone()) - .and_then(move |_| store_clone.insert_account(ACCOUNT_DETAILS_1.clone())) - .and_then(|_| Ok((store, context))) + .and_then(move |acc| { + accs.push(acc.clone()); + store_clone + .insert_account(ACCOUNT_DETAILS_1.clone()) + .and_then(move |acc| { + accs.push(acc.clone()); + Ok((store, context, accs)) + }) + }) }) } diff --git a/crates/interledger-store-redis/tests/http_test.rs b/crates/interledger-store-redis/tests/http_test.rs index 46157fafc..b4153a3a9 100644 --- a/crates/interledger-store-redis/tests/http_test.rs +++ b/crates/interledger-store-redis/tests/http_test.rs @@ -3,15 +3,20 @@ mod common; use common::*; use interledger_btp::BtpAccount; use interledger_http::{HttpAccount, HttpStore}; -use interledger_service::Account as AccountTrait; +use interledger_ildcp::IldcpAccount; +use interledger_packet::Address; +use std::str::FromStr; #[test] fn gets_account_from_http_bearer_token() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { store .get_account_from_http_token("incoming_auth_token") .and_then(move |account| { - assert_eq!(account.id(), 0); + assert_eq!( + *account.client_address(), + Address::from_str("example.alice").unwrap() + ); assert_eq!( account.get_http_auth_token().unwrap(), "outgoing_auth_token" @@ -25,8 +30,8 @@ fn gets_account_from_http_bearer_token() { } #[test] -fn decrypts_outgoing_tokens() { - block_on(test_store().and_then(|(store, context)| { +fn decrypts_outgoing_tokens_http() { + block_on(test_store().and_then(|(store, context, _accs)| { store .get_account_from_http_token("incoming_auth_token") .and_then(move |account| { @@ -44,7 +49,7 @@ fn decrypts_outgoing_tokens() { #[test] fn errors_on_unknown_http_auth() { - let result = block_on(test_store().and_then(|(store, context)| { + let result = block_on(test_store().and_then(|(store, context, _accs)| { store .get_account_from_http_token("unknown_token") .then(move |result| { diff --git a/crates/interledger-store-redis/tests/rate_limiting_test.rs b/crates/interledger-store-redis/tests/rate_limiting_test.rs index ee04cc39a..cbca4f6cb 100644 --- a/crates/interledger-store-redis/tests/rate_limiting_test.rs +++ b/crates/interledger-store-redis/tests/rate_limiting_test.rs @@ -2,11 +2,12 @@ mod common; use common::*; use futures::future::join_all; use interledger_service_util::{RateLimitError, RateLimitStore}; +use interledger_store_redis::AccountId; #[test] fn rate_limits_number_of_packets() { - block_on(test_store().and_then(|(store, context)| { - let account = Account::try_from(0, ACCOUNT_DETAILS_0.clone()).unwrap(); + block_on(test_store().and_then(|(store, context, _accs)| { + let account = Account::try_from(AccountId::new(), ACCOUNT_DETAILS_0.clone()).unwrap(); join_all(vec![ store.clone().apply_rate_limits(account.clone(), 10), store.clone().apply_rate_limits(account.clone(), 10), @@ -24,8 +25,8 @@ fn rate_limits_number_of_packets() { #[test] fn limits_amount_throughput() { - block_on(test_store().and_then(|(store, context)| { - let account = Account::try_from(1, ACCOUNT_DETAILS_1.clone()).unwrap(); + block_on(test_store().and_then(|(store, context, _accs)| { + let account = Account::try_from(AccountId::new(), ACCOUNT_DETAILS_1.clone()).unwrap(); join_all(vec![ store.clone().apply_rate_limits(account.clone(), 500), store.clone().apply_rate_limits(account.clone(), 500), @@ -43,8 +44,8 @@ fn limits_amount_throughput() { #[test] fn refunds_throughput_limit_for_rejected_packets() { - block_on(test_store().and_then(|(store, context)| { - let account = Account::try_from(1, ACCOUNT_DETAILS_1.clone()).unwrap(); + block_on(test_store().and_then(|(store, context, _accs)| { + let account = Account::try_from(AccountId::new(), ACCOUNT_DETAILS_1.clone()).unwrap(); join_all(vec![ store.clone().apply_rate_limits(account.clone(), 500), store.clone().apply_rate_limits(account.clone(), 500), diff --git a/crates/interledger-store-redis/tests/rates_test.rs b/crates/interledger-store-redis/tests/rates_test.rs index ed8772b0f..f215ba429 100644 --- a/crates/interledger-store-redis/tests/rates_test.rs +++ b/crates/interledger-store-redis/tests/rates_test.rs @@ -8,7 +8,7 @@ use tokio_timer::sleep; #[test] fn set_rates() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { let store_clone = store.clone(); let rates = store.get_exchange_rates(&["ABC", "XYZ"]); assert!(rates.is_err()); diff --git a/crates/interledger-store-redis/tests/routing_test.rs b/crates/interledger-store-redis/tests/routing_test.rs index ed08b99d2..bf3f85c4f 100644 --- a/crates/interledger-store-redis/tests/routing_test.rs +++ b/crates/interledger-store-redis/tests/routing_test.rs @@ -4,9 +4,11 @@ use bytes::Bytes; use common::*; use interledger_api::{AccountDetails, NodeStore}; use interledger_ccp::RouteManagerStore; +use interledger_ildcp::IldcpAccount; use interledger_packet::Address; use interledger_router::RouterStore; use interledger_service::Account as AccountTrait; +use interledger_store_redis::AccountId; use std::str::FromStr; use std::{collections::HashMap, time::Duration}; use tokio_timer::sleep; @@ -26,71 +28,89 @@ fn polls_for_route_updates() { store .clone() .insert_account(ACCOUNT_DETAILS_0.clone()) - .and_then(move |_| { + .and_then(move |alice| { let routing_table = store_clone_1.routing_table(); assert_eq!(routing_table.len(), 1); assert_eq!( *routing_table.get(&Bytes::from("example.alice")).unwrap(), - 0 + alice.id() ); - store_clone_1.insert_account(AccountDetails { - ilp_address: Address::from_str("example.bob").unwrap(), - asset_scale: 6, - asset_code: "XYZ".to_string(), - max_packet_amount: 1000, - min_balance: Some(-1000), - http_endpoint: None, - http_incoming_token: None, - http_outgoing_token: None, - btp_uri: None, - btp_incoming_token: None, - settle_threshold: None, - settle_to: None, - send_routes: false, - receive_routes: false, - routing_relation: None, - round_trip_time: None, - amount_per_minute_limit: None, - packets_per_minute_limit: None, - settlement_engine_url: None, - }) - }) - .and_then(move |_| { - let routing_table = store_clone_2.routing_table(); - assert_eq!(routing_table.len(), 2); - assert_eq!(*routing_table.get(&Bytes::from("example.bob")).unwrap(), 1); - connection - .map_err(|err| panic!(err)) - .and_then(|connection| { - redis::cmd("HMSET") - .arg("routes:current") - .arg("example.alice") - .arg(1) - .arg("example.charlie") - .arg(0) - .query_async(connection) - .and_then(|(_connection, _result): (_, redis::Value)| Ok(())) - .map_err(|err| panic!(err)) - .and_then(|_| sleep(Duration::from_millis(10)).then(|_| Ok(()))) + store_clone_1 + .insert_account(AccountDetails { + ilp_address: Address::from_str("example.bob").unwrap(), + asset_scale: 6, + asset_code: "XYZ".to_string(), + max_packet_amount: 1000, + min_balance: Some(-1000), + http_endpoint: None, + http_incoming_token: None, + http_outgoing_token: None, + btp_uri: None, + btp_incoming_token: None, + settle_threshold: None, + settle_to: None, + send_routes: false, + receive_routes: false, + routing_relation: None, + round_trip_time: None, + amount_per_minute_limit: None, + packets_per_minute_limit: None, + settlement_engine_url: None, }) - .and_then(move |_| { + .and_then(move |bob| { let routing_table = store_clone_2.routing_table(); - assert_eq!(routing_table.len(), 3); - assert_eq!( - *routing_table.get(&Bytes::from("example.alice")).unwrap(), - 1 - ); + assert_eq!(routing_table.len(), 2); assert_eq!( *routing_table.get(&Bytes::from("example.bob")).unwrap(), - 1 + bob.id(), ); - assert_eq!( - *routing_table.get(&Bytes::from("example.charlie")).unwrap(), - 0 - ); - assert!(routing_table.get(&Bytes::from("example.other")).is_none()); - let _ = context; - Ok(()) + let alice_id = alice.id(); + let bob_id = bob.id(); + connection + .map_err(|err| panic!(err)) + .and_then(move |connection| { + redis::cmd("HMSET") + .arg("routes:current") + .arg("example.alice") + .arg(bob_id) + .arg("example.charlie") + .arg(alice_id) + .query_async(connection) + .and_then( + |(_connection, _result): (_, redis::Value)| Ok(()), + ) + .map_err(|err| panic!(err)) + .and_then(|_| { + sleep(Duration::from_millis(10)).then(|_| Ok(())) + }) + }) + .and_then(move |_| { + let routing_table = store_clone_2.routing_table(); + assert_eq!(routing_table.len(), 3); + assert_eq!( + *routing_table + .get(&Bytes::from("example.alice")) + .unwrap(), + bob_id + ); + assert_eq!( + *routing_table + .get(&Bytes::from("example.bob")) + .unwrap(), + bob.id(), + ); + assert_eq!( + *routing_table + .get(&Bytes::from("example.charlie")) + .unwrap(), + alice_id, + ); + assert!(routing_table + .get(&Bytes::from("example.other")) + .is_none()); + let _ = context; + Ok(()) + }) }) }) }), @@ -100,11 +120,14 @@ fn polls_for_route_updates() { #[test] fn gets_accounts_to_send_routes_to() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { store .get_accounts_to_send_routes_to() .and_then(move |accounts| { - assert_eq!(accounts[0].id(), 1); + assert_eq!( + *accounts[0].client_address(), + Address::from_str("example.bob").unwrap() + ); assert_eq!(accounts.len(), 1); let _ = context; Ok(()) @@ -115,11 +138,14 @@ fn gets_accounts_to_send_routes_to() { #[test] fn gets_accounts_to_receive_routes_from() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { store .get_accounts_to_receive_routes_from() .and_then(move |accounts| { - assert_eq!(accounts[0].id(), 0); + assert_eq!( + *accounts[0].client_address(), + Address::from_str("example.alice").unwrap() + ); assert_eq!(accounts.len(), 1); let _ = context; Ok(()) @@ -130,7 +156,7 @@ fn gets_accounts_to_receive_routes_from() { #[test] fn gets_local_and_configured_routes() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { store .get_local_and_configured_routes() .and_then(move |(local, configured)| { @@ -145,10 +171,12 @@ fn gets_local_and_configured_routes() { #[test] fn saves_routes_to_db() { - block_on(test_store().and_then(|(mut store, context)| { + block_on(test_store().and_then(|(mut store, context, _accs)| { let get_connection = context.async_connection(); - let account0 = Account::try_from(0, ACCOUNT_DETAILS_0.clone()).unwrap(); - let account1 = Account::try_from(1, ACCOUNT_DETAILS_1.clone()).unwrap(); + let account0_id = AccountId::new(); + let account1_id = AccountId::new(); + let account0 = Account::try_from(account0_id, ACCOUNT_DETAILS_0.clone()).unwrap(); + let account1 = Account::try_from(account1_id, ACCOUNT_DETAILS_1.clone()).unwrap(); store .set_routes(vec![ (Bytes::from("example.a"), account0.clone()), @@ -156,15 +184,15 @@ fn saves_routes_to_db() { (Bytes::from("example.c"), account1.clone()), ]) .and_then(move |_| { - get_connection.and_then(|connection| { + get_connection.and_then(move |connection| { redis::cmd("HGETALL") .arg("routes:current") .query_async(connection) .map_err(|err| panic!(err)) - .and_then(|(_conn, routes): (_, HashMap)| { - assert_eq!(routes["example.a"], 0); - assert_eq!(routes["example.b"], 0); - assert_eq!(routes["example.c"], 1); + .and_then(move |(_conn, routes): (_, HashMap)| { + assert_eq!(routes["example.a"], account0_id); + assert_eq!(routes["example.b"], account0_id); + assert_eq!(routes["example.c"], account1_id); assert_eq!(routes.len(), 3); Ok(()) }) @@ -180,9 +208,11 @@ fn saves_routes_to_db() { #[test] fn updates_local_routes() { - block_on(test_store().and_then(|(store, context)| { - let account0 = Account::try_from(0, ACCOUNT_DETAILS_0.clone()).unwrap(); - let account1 = Account::try_from(1, ACCOUNT_DETAILS_1.clone()).unwrap(); + block_on(test_store().and_then(|(store, context, _accs)| { + let account0_id = AccountId::new(); + let account1_id = AccountId::new(); + let account0 = Account::try_from(account0_id, ACCOUNT_DETAILS_0.clone()).unwrap(); + let account1 = Account::try_from(account1_id, ACCOUNT_DETAILS_1.clone()).unwrap(); store .clone() .set_routes(vec![ @@ -192,9 +222,9 @@ fn updates_local_routes() { ]) .and_then(move |_| { let routes = store.routing_table(); - assert_eq!(routes[&b"example.a"[..]], 0); - assert_eq!(routes[&b"example.b"[..]], 0); - assert_eq!(routes[&b"example.c"[..]], 1); + assert_eq!(routes[&b"example.a"[..]], account0_id); + assert_eq!(routes[&b"example.b"[..]], account0_id); + assert_eq!(routes[&b"example.c"[..]], account1_id); assert_eq!(routes.len(), 3); Ok(()) }) @@ -208,14 +238,14 @@ fn updates_local_routes() { #[test] fn adds_static_routes_to_redis() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { let get_connection = context.async_connection(); store .clone() .set_static_routes(vec![ - ("example.a".to_string(), 0), - ("example.b".to_string(), 0), - ("example.c".to_string(), 1), + ("example.a".to_string(), accs[0].id()), + ("example.b".to_string(), accs[0].id()), + ("example.c".to_string(), accs[1].id()), ]) .and_then(move |_| { get_connection.and_then(|connection| { @@ -223,10 +253,10 @@ fn adds_static_routes_to_redis() { .arg("routes:static") .query_async(connection) .map_err(|err| panic!(err)) - .and_then(move |(_, routes): (_, HashMap)| { - assert_eq!(routes["example.a"], 0); - assert_eq!(routes["example.b"], 0); - assert_eq!(routes["example.c"], 1); + .and_then(move |(_, routes): (_, HashMap)| { + assert_eq!(routes["example.a"], accs[0].id()); + assert_eq!(routes["example.b"], accs[0].id()); + assert_eq!(routes["example.c"], accs[1].id()); assert_eq!(routes.len(), 3); let _ = context; Ok(()) @@ -239,30 +269,32 @@ fn adds_static_routes_to_redis() { #[test] fn static_routes_override_others() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { let mut store_clone = store.clone(); store .clone() .set_static_routes(vec![ - ("example.a".to_string(), 0), - ("example.b".to_string(), 0), + ("example.a".to_string(), accs[0].id()), + ("example.b".to_string(), accs[0].id()), ]) .and_then(move |_| { - let account1 = Account::try_from(1, ACCOUNT_DETAILS_1.clone()).unwrap(); - store_clone.set_routes(vec![ - (Bytes::from("example.a"), account1.clone()), - (Bytes::from("example.b"), account1.clone()), - (Bytes::from("example.c"), account1), - ]) - }) - .and_then(move |_| { - let routes = store.routing_table(); - assert_eq!(routes[&b"example.a"[..]], 0); - assert_eq!(routes[&b"example.b"[..]], 0); - assert_eq!(routes[&b"example.c"[..]], 1); - assert_eq!(routes.len(), 3); - let _ = context; - Ok(()) + let account1_id = AccountId::new(); + let account1 = Account::try_from(account1_id, ACCOUNT_DETAILS_1.clone()).unwrap(); + store_clone + .set_routes(vec![ + (Bytes::from("example.a"), account1.clone()), + (Bytes::from("example.b"), account1.clone()), + (Bytes::from("example.c"), account1), + ]) + .and_then(move |_| { + let routes = store.routing_table(); + assert_eq!(routes[&b"example.a"[..]], accs[0].id()); + assert_eq!(routes[&b"example.b"[..]], accs[0].id()); + assert_eq!(routes[&b"example.c"[..]], account1_id); + assert_eq!(routes.len(), 3); + let _ = context; + Ok(()) + }) }) })) .unwrap() @@ -270,18 +302,18 @@ fn static_routes_override_others() { #[test] fn returns_configured_routes_for_route_manager() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { store .clone() .set_static_routes(vec![ - ("example.a".to_string(), 0), - ("example.b".to_string(), 1), + ("example.a".to_string(), accs[0].id()), + ("example.b".to_string(), accs[1].id()), ]) .and_then(move |_| store.get_local_and_configured_routes()) .and_then(move |(_local, configured)| { assert_eq!(configured.len(), 2); - assert_eq!(configured[&b"example.a"[..]].id(), 0); - assert_eq!(configured[&b"example.b"[..]].id(), 1); + assert_eq!(configured[&b"example.a"[..]].id(), accs[0].id()); + assert_eq!(configured[&b"example.b"[..]].id(), accs[1].id()); let _ = context; Ok(()) }) diff --git a/crates/interledger-store-redis/tests/settlement_test.rs b/crates/interledger-store-redis/tests/settlement_test.rs index b7fbc0f24..d03fde1b4 100644 --- a/crates/interledger-store-redis/tests/settlement_test.rs +++ b/crates/interledger-store-redis/tests/settlement_test.rs @@ -3,6 +3,7 @@ mod common; use bytes::Bytes; use common::*; use http::StatusCode; +use interledger_service::Account; use interledger_settlement::{IdempotentStore, SettlementStore}; use lazy_static::lazy_static; use redis::{cmd, r#async::SharedConnection}; @@ -13,13 +14,14 @@ lazy_static! { #[test] fn credits_prepaid_amount() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { + let id = accs[0].id(); context.async_connection().and_then(move |conn| { store - .update_balance_for_incoming_settlement(0, 100, Some(IDEMPOTENCY_KEY.clone())) + .update_balance_for_incoming_settlement(id, 100, Some(IDEMPOTENCY_KEY.clone())) .and_then(move |_| { cmd("HMGET") - .arg("accounts:0") + .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) @@ -38,7 +40,7 @@ fn credits_prepaid_amount() { #[test] fn saves_and_loads_idempotency_key_data_properly() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, _accs)| { let input_hash: [u8; 32] = Default::default(); store .save_idempotent_data( @@ -75,13 +77,14 @@ fn saves_and_loads_idempotency_key_data_properly() { #[test] fn idempotent_settlement_calls() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { + let id = accs[0].id(); context.async_connection().and_then(move |conn| { store - .update_balance_for_incoming_settlement(0, 100, Some(IDEMPOTENCY_KEY.clone())) + .update_balance_for_incoming_settlement(id, 100, Some(IDEMPOTENCY_KEY.clone())) .and_then(move |_| { cmd("HMGET") - .arg("accounts:0") + .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) @@ -92,13 +95,13 @@ fn idempotent_settlement_calls() { store .update_balance_for_incoming_settlement( - 0, + id, 100, Some(IDEMPOTENCY_KEY.clone()), // Reuse key to make idempotent request. ) .and_then(move |_| { cmd("HMGET") - .arg("accounts:0") + .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) @@ -127,13 +130,14 @@ fn idempotent_settlement_calls() { #[test] fn credits_balance_owed() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { + let id = accs[0].id(); context .shared_async_connection() .map_err(|err| panic!(err)) .and_then(move |conn| { cmd("HSET") - .arg("accounts:0") + .arg(format!("accounts:{}", id)) .arg("balance") .arg(-200) .query_async(conn) @@ -141,13 +145,13 @@ fn credits_balance_owed() { .and_then(move |(conn, _balance): (SharedConnection, i64)| { store .update_balance_for_incoming_settlement( - 0, + id, 100, Some(IDEMPOTENCY_KEY.clone()), ) .and_then(move |_| { cmd("HMGET") - .arg("accounts:0") + .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) @@ -172,13 +176,14 @@ fn credits_balance_owed() { #[test] fn clears_balance_owed() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { + let id = accs[0].id(); context .shared_async_connection() .map_err(|err| panic!(err)) .and_then(move |conn| { cmd("HSET") - .arg("accounts:0") + .arg(format!("accounts:{}", id)) .arg("balance") .arg(-100) .query_async(conn) @@ -186,13 +191,13 @@ fn clears_balance_owed() { .and_then(move |(conn, _balance): (SharedConnection, i64)| { store .update_balance_for_incoming_settlement( - 0, + id, 100, Some(IDEMPOTENCY_KEY.clone()), ) .and_then(move |_| { cmd("HMGET") - .arg("accounts:0") + .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) @@ -217,13 +222,14 @@ fn clears_balance_owed() { #[test] fn clears_balance_owed_and_puts_remainder_as_prepaid() { - block_on(test_store().and_then(|(store, context)| { + block_on(test_store().and_then(|(store, context, accs)| { + let id = accs[0].id(); context .shared_async_connection() .map_err(|err| panic!(err)) .and_then(move |conn| { cmd("HSET") - .arg("accounts:0") + .arg(format!("accounts:{}", id)) .arg("balance") .arg(-40) .query_async(conn) @@ -231,13 +237,13 @@ fn clears_balance_owed_and_puts_remainder_as_prepaid() { .and_then(move |(conn, _balance): (SharedConnection, i64)| { store .update_balance_for_incoming_settlement( - 0, + id, 100, Some(IDEMPOTENCY_KEY.clone()), ) .and_then(move |_| { cmd("HMGET") - .arg("accounts:0") + .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) diff --git a/crates/interledger/src/main.rs b/crates/interledger/src/main.rs index 1604dcb4f..de5b325ed 100644 --- a/crates/interledger/src/main.rs +++ b/crates/interledger/src/main.rs @@ -2,6 +2,7 @@ use base64; use clap::value_t; use clap::{App, Arg, ArgGroup, SubCommand}; use config; +use futures::future::Future; use hex; use interledger::{cli::*, node::*}; use interledger_ildcp::IldcpResponseBuilder; @@ -337,7 +338,10 @@ pub fn main() { .ok(), settlement_engine_url: None, }; - tokio::run(insert_account_redis(redis_uri, &server_secret, account)); + tokio::run( + insert_account_redis(redis_uri, &server_secret, account) + .and_then(move |_| Ok(())), + ); } _ => app.print_help().unwrap(), }, diff --git a/crates/interledger/src/node.rs b/crates/interledger/src/node.rs index dec9b9d8e..0236c9f28 100644 --- a/crates/interledger/src/node.rs +++ b/crates/interledger/src/node.rs @@ -15,7 +15,9 @@ use interledger_service_util::{ MaxPacketAmountService, RateLimitService, ValidatorService, }; use interledger_settlement::{SettlementApi, SettlementMessageService}; -use interledger_store_redis::{Account, ConnectionInfo, IntoConnectionInfo, RedisStoreBuilder}; +use interledger_store_redis::{ + Account, AccountId, ConnectionInfo, IntoConnectionInfo, RedisStoreBuilder, +}; use interledger_stream::StreamReceiverService; use log::{debug, error, info, trace}; use ring::{digest, hmac}; @@ -264,7 +266,10 @@ impl InterledgerNode { tokio::run(self.serve()); } - pub fn insert_account(&self, account: AccountDetails) -> impl Future { + pub fn insert_account( + &self, + account: AccountDetails, + ) -> impl Future { insert_account_redis(self.redis_connection.clone(), &self.secret_seed, account) } } @@ -276,7 +281,7 @@ pub fn insert_account_redis( redis_uri: R, secret_seed: &[u8; 32], account: AccountDetails, -) -> impl Future +) -> impl Future where R: IntoConnectionInfo, { @@ -291,7 +296,7 @@ where .map_err(|_| error!("Unable to create account")) .and_then(|account| { debug!("Created account: {}", account.id()); - Ok(()) + Ok(account.id()) }) }) } diff --git a/crates/interledger/tests/test_helpers.rs b/crates/interledger/tests/test_helpers.rs new file mode 100644 index 000000000..059d1226f --- /dev/null +++ b/crates/interledger/tests/test_helpers.rs @@ -0,0 +1,124 @@ +use futures::{stream::Stream, Future}; +use interledger_ildcp::IldcpAccount; +use interledger_packet::Address; +use interledger_service::Account as AccountTrait; +use interledger_store_redis::Account; +use interledger_store_redis::AccountId; +use serde::Serialize; +use serde_json::json; +use std::collections::HashMap; +use std::fmt::Display; +use std::str; + +#[derive(serde::Deserialize)] +pub struct DeliveryData { + pub delivered_amount: u64, +} + +#[derive(serde::Deserialize)] +pub struct BalanceData { + pub balance: String, +} + +#[allow(unused)] +pub fn create_account_on_engine( + engine_port: u16, + account_id: T, +) -> impl Future { + let client = reqwest::r#async::Client::new(); + client + .post(&format!("http://localhost:{}/accounts", engine_port)) + .header("Content-Type", "application/json") + .json(&json!({ "id": account_id })) + .send() + .and_then(move |res| res.error_for_status()) + .and_then(move |res| res.into_body().concat2()) + .map_err(|err| { + eprintln!("Error creating account: {:?}", err); + }) + .and_then(move |chunk| Ok(str::from_utf8(&chunk).unwrap().to_string())) +} + +#[allow(unused)] +pub fn send_money_to_id( + from: u16, + to: u16, + amount: u64, + id: T, + auth: &str, +) -> impl Future { + let client = reqwest::r#async::Client::new(); + client + .post(&format!("http://localhost:{}/pay", from)) + .header("Authorization", format!("Bearer {}", auth)) + .json(&json!({ + // TODO: replace with username + "receiver": format!("http://localhost:{}/spsp/{}", to, id), + "source_amount": amount, + })) + .send() + .and_then(|res| res.error_for_status()) + .and_then(|res| res.into_body().concat2()) + .map_err(|err| { + eprintln!("Error sending SPSP payment: {:?}", err); + }) + .and_then(move |body| { + let ret: DeliveryData = serde_json::from_slice(&body).unwrap(); + Ok(ret.delivered_amount) + }) +} + +#[allow(unused)] +pub fn get_all_accounts( + node_port: u16, + admin_token: &str, +) -> impl Future, Error = ()> { + let client = reqwest::r#async::Client::new(); + client + .get(&format!("http://localhost:{}/accounts", node_port)) + .header("Authorization", format!("Bearer {}", admin_token)) + .send() + .and_then(|res| res.error_for_status()) + .and_then(|res| res.into_body().concat2()) + .map_err(|err| { + eprintln!("Error getting account data: {:?}", err); + }) + .and_then(move |body| { + let ret: Vec = serde_json::from_slice(&body).unwrap(); + Ok(ret) + }) +} + +#[allow(unused)] +pub fn accounts_to_ids(accounts: Vec) -> HashMap { + let mut map = HashMap::new(); + for a in accounts { + map.insert(a.client_address().clone(), a.id()); + } + map +} + +#[allow(unused)] +pub fn get_balance( + account_id: T, + node_port: u16, + admin_token: &str, +) -> impl Future { + let client = reqwest::r#async::Client::new(); + client + .get(&format!( + "http://localhost:{}/accounts/{}/balance", + node_port, account_id + )) + .header("Authorization", format!("Bearer {}", admin_token)) + .send() + .and_then(|res| res.error_for_status()) + .and_then(|res| res.into_body().concat2()) + .map_err(|err| { + eprintln!("Error getting account data: {:?}", err); + }) + .and_then(|body| { + let ret: BalanceData = serde_json::from_slice(&body).unwrap(); + Ok(ret.balance.parse().unwrap()) + }) +} diff --git a/crates/interledger/tests/three_nodes.rs b/crates/interledger/tests/three_nodes.rs index dab35e20f..f54ba13b0 100644 --- a/crates/interledger/tests/three_nodes.rs +++ b/crates/interledger/tests/three_nodes.rs @@ -1,20 +1,22 @@ #![recursion_limit = "128"] use env_logger; -use futures::{Future, Stream}; +use futures::Future; use interledger::{ cli, node::{AccountDetails, InterledgerNode}, }; use interledger_packet::Address; use serde_json::json; -use std::str; use std::str::FromStr; use tokio::runtime::Builder as RuntimeBuilder; mod redis_helpers; use redis_helpers::*; +mod test_helpers; +use test_helpers::*; + #[test] fn three_nodes() { // Nodes 1 and 2 are peers, Node 2 is the parent of Node 3 @@ -250,113 +252,115 @@ fn three_nodes() { delay(500) .map_err(|_| panic!("Something strange happened")) .and_then(move |_| { - let client = reqwest::r#async::Client::new(); - let send_1_to_3 = client - .post(&format!("http://localhost:{}/pay", node1_http)) - .header("Authorization", "Bearer default account holder") - .json(&json!({ - "receiver": format!("http://localhost:{}/.well-known/pay", node3_http), - "source_amount": 1000, - })) - .send() - .and_then(|res| res.error_for_status()) - .and_then(|res| res.into_body().concat2()) - .and_then(|body| { - assert_eq!(str::from_utf8(body.as_ref()).unwrap(), "{\"delivered_amount\":2}"); - Ok(()) - }); + let three_addr = Address::from_str("example.two.three").unwrap(); + let one_addr = Address::from_str("example.one").unwrap(); + futures::future::join_all(vec![ + get_all_accounts(node1_http, "admin").map(accounts_to_ids), + get_all_accounts(node2_http, "admin").map(accounts_to_ids), + get_all_accounts(node3_http, "admin").map(accounts_to_ids), + ]) + .and_then(move |ids| { + let node1_ids = ids[0].clone(); + let node2_ids = ids[1].clone(); + let node3_ids = ids[2].clone(); - let send_3_to_1 = client - .post(&format!("http://localhost:{}/pay", node3_http)) - .header("Authorization", "Bearer default account holder") - .json(&json!({ - "receiver": format!("http://localhost:{}/.well-known/pay", node1_http).as_str(), - "source_amount": 1000, - })) - .send() - .and_then(|res| res.error_for_status()) - .and_then(|res| res.into_body().concat2()) - .and_then(|body| { - assert_eq!(str::from_utf8(body.as_ref()).unwrap(), "{\"delivered_amount\":500000}"); - Ok(()) - }); + let one_on_one = node1_ids.get(&one_addr).unwrap().to_owned(); + let three_on_two = node2_ids.get(&three_addr).unwrap().to_owned(); + let three_on_three = node3_ids.get(&three_addr).unwrap().to_owned(); - let get_balance = |account_id, node_port, admin_token| { - let client = reqwest::r#async::Client::new(); - client - .get(&format!( - "http://localhost:{}/accounts/{}/balance", - node_port, account_id - )) - .header("Authorization", format!("Bearer {}", admin_token)) - .send() + let send_1_to_3 = send_money_to_id( + node1_http, + node3_http, + 1000, + three_on_three, + "default account holder", + ); + let send_3_to_1 = send_money_to_id( + node3_http, + node1_http, + 1000, + one_on_one, + "default account holder", + ); + + // Node 1 sends 1000 to Node 3. However, Node1's scale is 9, + // while Node 3's scale is 6. This means that Node 3 will + // see 1000x less. In addition, the conversion rate is 2:1 + // for 3's asset, so he will receive 2 total. + send_1_to_3 .map_err(|err| { - eprintln!("Error getting account data: {:?}", err); + eprintln!("Error sending from node 1 to node 3: {:?}", err); err }) - .and_then(|res| res.error_for_status()) - .and_then(|res| res.into_body().concat2()) - }; - - // Node 1 sends 1000 to Node 3. However, Node1's scale is 9, - // while Node 3's scale is 6. This means that Node 3 will - // see 1000x less. In addition, the conversion rate is 2:1 - // for 3's asset, so he will receive 2 total. - send_1_to_3 - .map_err(|err| { - eprintln!("Error sending from node 1 to node 3: {:?}", err); - err - }) - .and_then(move |_| { - get_balance(0, node1_http, "default account holder") - .and_then(move |ret| { - let ret = str::from_utf8(&ret).unwrap(); - assert_eq!(ret, "{\"balance\":\"-1000\"}"); - Ok(()) - }).and_then(move |_| { - // Node 2 updates Node 3's balance properly. - get_balance(1, node2_http, "three").and_then(move |ret| { - let ret = str::from_utf8(&ret).unwrap(); - assert_eq!(ret, "{\"balance\":\"2\"}"); - Ok(()) - }) - }).and_then(move |_| { - // Node 3's balance is properly adjusted after - // it's received the message from node 2 - get_balance(0, node3_http, "default account holder").and_then(move |ret| { - let ret = str::from_utf8(&ret).unwrap(); - assert_eq!(ret, "{\"balance\":\"2\"}"); - Ok(()) - }) + .and_then(move |_| { + get_balance(one_on_one, node1_http, "default account holder") + .and_then(move |ret| { + assert_eq!(ret, -1000); + Ok(()) + }) + .and_then(move |_| { + // Node 2 updates Node 3's balance properly. + get_balance(three_on_two, node2_http, "three").and_then( + move |ret| { + assert_eq!(ret, 2); + Ok(()) + }, + ) + }) + .and_then(move |_| { + // Node 3's balance is properly adjusted after + // it's received the message from node 2 + get_balance( + three_on_three, + node3_http, + "default account holder", + ) + .and_then( + move |ret| { + assert_eq!(ret, 2); + Ok(()) + }, + ) + }) }) - }) - .and_then(move |_| send_3_to_1 - .map_err(|err| { - eprintln!("Error sending from node 3 to node 1: {:?}", err); - err - })) - .and_then(move |_| { - get_balance(0, node1_http, "default account holder").and_then(move |ret| { - let ret = str::from_utf8(&ret).unwrap(); - assert_eq!(ret, "{\"balance\":\"499000\"}"); - Ok(()) - }).and_then(move |_| { - // Node 2 updates Node 3's balance properly. - get_balance(1, node2_http, "three").and_then(move |ret| { - let ret = str::from_utf8(&ret).unwrap(); - assert_eq!(ret, "{\"balance\":\"-998\"}"); - Ok(()) - }) - }).and_then(move |_| { - // Node 3's balance is properly adjusted after - // it's received the message from node 2 - get_balance(0, node3_http, "default account holder").and_then(move |ret| { - let ret = str::from_utf8(&ret).unwrap(); - assert_eq!(ret, "{\"balance\":\"-998\"}"); - Ok(()) + .and_then(move |_| { + send_3_to_1.map_err(|err| { + eprintln!("Error sending from node 3 to node 1: {:?}", err); + err }) }) - }) + .and_then(move |_| { + get_balance(one_on_one, node1_http, "default account holder") + .and_then(move |ret| { + assert_eq!(ret, 499_000); + Ok(()) + }) + .and_then(move |_| { + // Node 2 updates Node 3's balance properly. + get_balance(three_on_two, node2_http, "three").and_then( + move |ret| { + assert_eq!(ret, -998); + Ok(()) + }, + ) + }) + .and_then(move |_| { + // Node 3's balance is properly adjusted after + // it's received the message from node 2 + get_balance( + three_on_three, + node3_http, + "default account holder", + ) + .and_then( + move |ret| { + assert_eq!(ret, -998); + Ok(()) + }, + ) + }) + }) + }) }), ) .unwrap();