From b16f23d5481de65658b08544c083e2849821370e Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Wed, 7 Feb 2024 17:34:35 -0800 Subject: [PATCH] feat(aggregator): Add support for multiple signers in input (#211) Fixes #205 Signed-off-by: Alexis Asseman --- tap_aggregator/Cargo.toml | 1 + tap_aggregator/src/aggregator.rs | 48 +++++++---- tap_aggregator/src/main.rs | 19 ++++- tap_aggregator/src/server.rs | 109 ++++++++++++++++++------ tap_core/src/error.rs | 2 +- tap_integration_tests/tests/showcase.rs | 3 + 6 files changed, 141 insertions(+), 41 deletions(-) diff --git a/tap_aggregator/Cargo.toml b/tap_aggregator/Cargo.toml index 79b010ca..0459ac40 100644 --- a/tap_aggregator/Cargo.toml +++ b/tap_aggregator/Cargo.toml @@ -34,4 +34,5 @@ ruint = "1.10.1" [dev-dependencies] jsonrpsee = { version = "0.18.0", features = ["http-client", "jsonrpsee-core"] } +rand = "0.8.5" rstest = "0.17.0" diff --git a/tap_aggregator/src/aggregator.rs b/tap_aggregator/src/aggregator.rs index efd07684..7d83e95b 100644 --- a/tap_aggregator/src/aggregator.rs +++ b/tap_aggregator/src/aggregator.rs @@ -1,13 +1,13 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::collections::hash_set; +use std::collections::{hash_set, HashSet}; use alloy_primitives::Address; -use alloy_sol_types::Eip712Domain; -use anyhow::{Ok, Result}; +use alloy_sol_types::{Eip712Domain, SolStruct}; +use anyhow::{bail, Ok, Result}; use ethers_core::types::Signature; -use ethers_signers::{LocalWallet, Signer}; +use ethers_signers::LocalWallet; use tap_core::{ eip_712_signed_message::EIP712SignedMessage, @@ -19,22 +19,26 @@ pub async fn check_and_aggregate_receipts( receipts: &[EIP712SignedMessage], previous_rav: Option>, wallet: &LocalWallet, + accepted_addresses: &HashSet
, ) -> Result> { - // Get the address of the wallet - let address: [u8; 20] = wallet.address().into(); - let address: Address = address.into(); - - // Check that the receipts are unique check_signatures_unique(receipts)?; - // Check that the receipts are signed by ourselves - receipts - .iter() - .try_for_each(|receipt| receipt.verify(domain_separator, address))?; + // Check that the receipts are signed by an accepted signer address + receipts.iter().try_for_each(|receipt| { + check_signature_is_from_one_of_addresses( + receipt.clone(), + domain_separator, + accepted_addresses, + ) + })?; - // Check that the previous rav is signed by ourselves + // Check that the previous rav is signed by an accepted signer address if let Some(previous_rav) = &previous_rav { - previous_rav.verify(domain_separator, address)?; + check_signature_is_from_one_of_addresses( + previous_rav.clone(), + domain_separator, + accepted_addresses, + )?; } // Check that the receipts timestamp is greater than the previous rav @@ -68,6 +72,20 @@ pub async fn check_and_aggregate_receipts( Ok(EIP712SignedMessage::new(domain_separator, rav, wallet).await?) } +fn check_signature_is_from_one_of_addresses( + message: EIP712SignedMessage, + domain_separator: &Eip712Domain, + accepted_addresses: &HashSet
, +) -> Result<()> { + let recovered_address = message.recover_signer(domain_separator)?; + if !accepted_addresses.contains(&recovered_address) { + bail!(tap_core::Error::InvalidRecoveredSigner { + address: recovered_address, + }); + } + Ok(()) +} + fn check_allocation_id( receipts: &[EIP712SignedMessage], allocation_id: Address, diff --git a/tap_aggregator/src/main.rs b/tap_aggregator/src/main.rs index 086c7d93..4ecf87ff 100644 --- a/tap_aggregator/src/main.rs +++ b/tap_aggregator/src/main.rs @@ -4,6 +4,7 @@ #![doc = include_str!("../README.md")] use std::borrow::Cow; +use std::collections::HashSet; use std::str::FromStr; use alloy_primitives::{Address, FixedBytes, U256}; @@ -25,10 +26,18 @@ struct Args { #[arg(long, default_value_t = 8080, env = "TAP_PORT")] port: u16, - /// Sender private key for signing Receipt Aggregate Vouchers, as a hex string. + /// Signer private key for signing Receipt Aggregate Vouchers, as a hex string. #[arg(long, env = "TAP_PRIVATE_KEY")] private_key: String, + /// Signer public keys. Not the counterpart of the signer private key. Signers that are allowed + /// for the incoming receipts / RAV to aggregate. Useful when needing to accept receipts that + /// were signed with a different key (e.g. a recent key rotation, or receipts coming from a + /// different gateway / aggregator that use a different signing key). + /// Expects a comma-separated list of Ethereum addresses. + #[arg(long, env = "TAP_PUBLIC_KEYS")] + public_keys: Option>, + /// Maximum request body size in bytes. /// Defaults to 10MB. #[arg(long, default_value_t = 10 * 1024 * 1024, env = "TAP_MAX_REQUEST_BODY_SIZE")] @@ -94,11 +103,19 @@ async fn main() -> Result<()> { // Create the EIP-712 domain separator. let domain_separator = create_eip712_domain(&args)?; + // Create HashSet of *all* allowed signers + let mut accepted_addresses: HashSet
= std::collections::HashSet::new(); + accepted_addresses.insert(wallet.address().0.into()); + if let Some(public_keys) = &args.public_keys { + accepted_addresses.extend(public_keys.iter().cloned()); + } + // Start the JSON-RPC server. // This await is non-blocking let (handle, _) = server::run_server( args.port, wallet, + accepted_addresses, domain_separator, args.max_request_body_size, args.max_response_body_size, diff --git a/tap_aggregator/src/server.rs b/tap_aggregator/src/server.rs index 762aef8e..d4b89047 100644 --- a/tap_aggregator/src/server.rs +++ b/tap_aggregator/src/server.rs @@ -1,8 +1,9 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::str::FromStr; +use std::{collections::HashSet, str::FromStr}; +use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use anyhow::Result; use ethers_signers::LocalWallet; @@ -96,6 +97,7 @@ pub trait Rpc { struct RpcImpl { wallet: LocalWallet, + accepted_addresses: HashSet
, domain_separator: Eip712Domain, } @@ -132,6 +134,7 @@ fn check_api_version_deprecation(api_version: &TapRpcApiVersion) -> Option, domain_separator: &Eip712Domain, receipts: Vec>, previous_rav: Option>, @@ -154,7 +157,14 @@ async fn aggregate_receipts_( let res = match api_version { TapRpcApiVersion::V0_0 => { - check_and_aggregate_receipts(domain_separator, &receipts, previous_rav, wallet).await + check_and_aggregate_receipts( + domain_separator, + &receipts, + previous_rav, + wallet, + accepted_addresses, + ) + .await } }; @@ -188,6 +198,7 @@ impl RpcServer for RpcImpl { match aggregate_receipts_( api_version, &self.wallet, + &self.accepted_addresses, &self.domain_separator, receipts, previous_rav, @@ -211,6 +222,7 @@ impl RpcServer for RpcImpl { pub async fn run_server( port: u16, wallet: LocalWallet, + accepted_addresses: HashSet
, domain_separator: Eip712Domain, max_request_body_size: u32, max_response_body_size: u32, @@ -229,6 +241,7 @@ pub async fn run_server( println!("Listening on: {}", addr); let rpc_impl = RpcImpl { wallet, + accepted_addresses, domain_separator, }; let handle = server.start(rpc_impl.into_rpc())?; @@ -238,12 +251,15 @@ pub async fn run_server( #[cfg(test)] #[allow(clippy::too_many_arguments)] mod tests { + use std::collections::HashSet; use std::str::FromStr; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use ethers_signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Signer}; use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; + use rand::prelude::*; + use rand::seq::SliceRandom; use rstest::*; use crate::server; @@ -253,17 +269,27 @@ mod tests { tap_receipt::Receipt, }; - #[fixture] - fn keys() -> (LocalWallet, Address) { + #[derive(Clone)] + struct Keys { + wallet: LocalWallet, + address: Address, + } + + fn keys(index: u32) -> Keys { let wallet: LocalWallet = MnemonicBuilder::::default() .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") + .index(index) + .unwrap() .build() .unwrap(); // Alloy library does not have feature parity with ethers library (yet) This workaround is needed to get the address // to convert to an alloy Address. This will not be needed when the alloy library has wallet support. let address: [u8; 20] = wallet.address().into(); - (wallet, address.into()) + Keys { + wallet, + address: address.into(), + } } #[fixture] @@ -299,16 +325,19 @@ mod tests { #[rstest] #[tokio::test] async fn protocol_version( - keys: (LocalWallet, Address), domain_separator: Eip712Domain, http_request_size_limit: u32, http_response_size_limit: u32, http_max_concurrent_connections: u32, ) { + // The keys that will be used to sign the new RAVs + let keys_main = keys(0); + // Start the JSON-RPC server. let (handle, local_addr) = server::run_server( 0, - keys.0, + keys_main.wallet, + HashSet::from([keys_main.address]), domain_separator, http_request_size_limit, http_response_size_limit, @@ -335,7 +364,6 @@ mod tests { #[case::rav_from_zero_valued_receipts (vec![0,0,0,0])] #[tokio::test] async fn signed_rav_is_valid_with_no_previous_rav( - keys: (LocalWallet, Address), domain_separator: Eip712Domain, http_request_size_limit: u32, http_response_size_limit: u32, @@ -343,11 +371,23 @@ mod tests { allocation_ids: Vec
, #[case] values: Vec, #[values("0.0")] api_version: &str, + #[values(0, 1, 2)] random_seed: u64, ) { + // The keys that will be used to sign the new RAVs + let keys_main = keys(0); + // Extra keys to test the server's ability to accept multiple signers as input + let keys_0 = keys(1); + let keys_1 = keys(2); + // Vector of all wallets to make it easier to select one randomly + let all_wallets = vec![keys_main.clone(), keys_0.clone(), keys_1.clone()]; + // PRNG for selecting a random wallet + let mut rng = StdRng::seed_from_u64(random_seed); + // Start the JSON-RPC server. let (handle, local_addr) = server::run_server( 0, - keys.0.clone(), + keys_main.wallet.clone(), + HashSet::from([keys_main.address, keys_0.address, keys_1.address]), domain_separator.clone(), http_request_size_limit, http_response_size_limit, @@ -368,7 +408,7 @@ mod tests { EIP712SignedMessage::new( &domain_separator, Receipt::new(allocation_ids[0], value).unwrap(), - &keys.0, + &all_wallets.choose(&mut rng).unwrap().wallet, ) .await .unwrap(), @@ -395,7 +435,7 @@ mod tests { assert!(remote_rav.message.timestamp_ns == local_rav.timestamp_ns); assert!(remote_rav.message.value_aggregate == local_rav.value_aggregate); - assert!(remote_rav.recover_signer(&domain_separator).unwrap() == keys.1); + assert!(remote_rav.recover_signer(&domain_separator).unwrap() == keys_main.address); handle.stop().unwrap(); handle.stopped().await; @@ -406,7 +446,6 @@ mod tests { #[case::rav_from_zero_valued_receipts (vec![0,0,0,0])] #[tokio::test] async fn signed_rav_is_valid_with_previous_rav( - keys: (LocalWallet, Address), domain_separator: Eip712Domain, http_request_size_limit: u32, http_response_size_limit: u32, @@ -414,11 +453,23 @@ mod tests { allocation_ids: Vec
, #[case] values: Vec, #[values("0.0")] api_version: &str, + #[values(0, 1, 2, 3, 4)] random_seed: u64, ) { + // The keys that will be used to sign the new RAVs + let keys_main = keys(0); + // Extra keys to test the server's ability to accept multiple signers as input + let keys_0 = keys(1); + let keys_1 = keys(2); + // Vector of all wallets to make it easier to select one randomly + let all_wallets = vec![keys_main.clone(), keys_0.clone(), keys_1.clone()]; + // PRNG for selecting a random wallet + let mut rng = StdRng::seed_from_u64(random_seed); + // Start the JSON-RPC server. let (handle, local_addr) = server::run_server( 0, - keys.0.clone(), + keys_main.wallet.clone(), + HashSet::from([keys_main.address, keys_0.address, keys_1.address]), domain_separator.clone(), http_request_size_limit, http_response_size_limit, @@ -439,7 +490,7 @@ mod tests { EIP712SignedMessage::new( &domain_separator, Receipt::new(allocation_ids[0], value).unwrap(), - &keys.0, + &all_wallets.choose(&mut rng).unwrap().wallet, ) .await .unwrap(), @@ -453,9 +504,13 @@ mod tests { None, ) .unwrap(); - let signed_prev_rav = EIP712SignedMessage::new(&domain_separator, prev_rav, &keys.0) - .await - .unwrap(); + let signed_prev_rav = EIP712SignedMessage::new( + &domain_separator, + prev_rav, + &all_wallets.choose(&mut rng).unwrap().wallet, + ) + .await + .unwrap(); // Create new RAV from last half of receipts and prev_rav through the JSON-RPC server let res: server::JsonRpcResponse> = client @@ -472,7 +527,7 @@ mod tests { let rav = res.data; - assert!(rav.recover_signer(&domain_separator).unwrap() == keys.1); + assert!(rav.recover_signer(&domain_separator).unwrap() == keys_main.address); handle.stop().unwrap(); handle.stopped().await; @@ -481,17 +536,20 @@ mod tests { #[rstest] #[tokio::test] async fn invalid_api_version( - keys: (LocalWallet, Address), domain_separator: Eip712Domain, http_request_size_limit: u32, http_response_size_limit: u32, http_max_concurrent_connections: u32, allocation_ids: Vec
, ) { + // The keys that will be used to sign the new RAVs + let keys_main = keys(0); + // Start the JSON-RPC server. let (handle, local_addr) = server::run_server( 0, - keys.0.clone(), + keys_main.wallet.clone(), + HashSet::from([keys_main.address]), domain_separator.clone(), http_request_size_limit, http_response_size_limit, @@ -509,7 +567,7 @@ mod tests { let receipts = vec![EIP712SignedMessage::new( &domain_separator, Receipt::new(allocation_ids[0], 42).unwrap(), - &keys.0, + &keys_main.wallet, ) .await .unwrap()]; @@ -562,13 +620,15 @@ mod tests { #[rstest] #[tokio::test] async fn request_size_limit( - keys: (LocalWallet, Address), domain_separator: Eip712Domain, http_response_size_limit: u32, http_max_concurrent_connections: u32, allocation_ids: Vec
, #[values("0.0")] api_version: &str, ) { + // The keys that will be used to sign the new RAVs + let keys_main = keys(0); + // Set the request byte size limit to a value that easily triggers the HTTP 413 // error. let http_request_size_limit = 100 * 1024; @@ -581,7 +641,8 @@ mod tests { // Start the JSON-RPC server. let (handle, local_addr) = server::run_server( 0, - keys.0.clone(), + keys_main.wallet.clone(), + HashSet::from([keys_main.address]), domain_separator.clone(), http_request_size_limit, http_response_size_limit, @@ -602,7 +663,7 @@ mod tests { EIP712SignedMessage::new( &domain_separator, Receipt::new(allocation_ids[0], u128::MAX / 1000).unwrap(), - &keys.0, + &keys_main.wallet, ) .await .unwrap(), diff --git a/tap_core/src/error.rs b/tap_core/src/error.rs index 5637dfef..566247bf 100644 --- a/tap_core/src/error.rs +++ b/tap_core/src/error.rs @@ -29,7 +29,7 @@ pub enum Error { WalletError(#[from] WalletError), #[error(transparent)] SignatureError(#[from] SignatureError), - #[error("Recovered sender address invalid{address}")] + #[error("Recovered sender address invalid {address}")] InvalidRecoveredSigner { address: Address }, #[error("Received RAV does not match expexted RAV")] InvalidReceivedRAV { diff --git a/tap_integration_tests/tests/showcase.rs b/tap_integration_tests/tests/showcase.rs index 0735e3ff..66a5342a 100644 --- a/tap_integration_tests/tests/showcase.rs +++ b/tap_integration_tests/tests/showcase.rs @@ -958,9 +958,12 @@ async fn start_sender_aggregator( listener.local_addr()?.port() }; + let accepted_addresses = HashSet::from([keys.1]); + let (server_handle, socket_addr) = agg_server::run_server( http_port, keys.0, + accepted_addresses, domain_separator, http_request_size_limit, http_response_size_limit,