diff --git a/tap_core/Cargo.toml b/tap_core/Cargo.toml index 3b9cece..624c5bf 100644 --- a/tap_core/Cargo.toml +++ b/tap_core/Cargo.toml @@ -24,8 +24,6 @@ strum = "0.24.1" strum_macros = "0.24.3" async-trait = "0.1.72" tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread"] } -typetag = "0.2.14" -futures = "0.3.17" [dev-dependencies] criterion = { version = "0.5", features = ["async_std"] } diff --git a/tap_core/src/adapters/escrow_adapter.rs b/tap_core/src/adapters/escrow_adapter.rs index 764bedd..4630c36 100644 --- a/tap_core/src/adapters/escrow_adapter.rs +++ b/tap_core/src/adapters/escrow_adapter.rs @@ -53,4 +53,6 @@ pub trait EscrowAdapter { sender_id: Address, value: u128, ) -> Result<(), Self::AdapterError>; + + async fn verify_signer(&self, signer_address: Address) -> Result; } diff --git a/tap_core/src/adapters/mock/executor_mock.rs b/tap_core/src/adapters/mock/executor_mock.rs index 28ee819..7a807c3 100644 --- a/tap_core/src/adapters/mock/executor_mock.rs +++ b/tap_core/src/adapters/mock/executor_mock.rs @@ -3,9 +3,10 @@ use crate::adapters::escrow_adapter::EscrowAdapter; use crate::adapters::receipt_storage_adapter::{ - safe_truncate_receipts, ReceiptRead, ReceiptStore, StoredReceipt, + safe_truncate_receipts, ReceiptDelete, ReceiptRead, ReceiptStore, StoredReceipt, }; use crate::checks::TimestampCheck; +use crate::eip_712_signed_message::MessageId; use crate::tap_receipt::ReceivedReceipt; use crate::{ adapters::rav_storage_adapter::{RAVRead, RAVStore}, @@ -18,7 +19,7 @@ use std::sync::RwLock; use std::{collections::HashMap, sync::Arc}; pub type EscrowStorage = Arc>>; -pub type QueryAppraisals = Arc>>; +pub type QueryAppraisals = Arc>>; pub type ReceiptStorage = Arc>>; pub type RAVStorage = Arc>>; @@ -36,10 +37,9 @@ pub struct ExecutorMock { rav_storage: RAVStorage, receipt_storage: ReceiptStorage, unique_id: Arc>, - sender_escrow_storage: EscrowStorage, - timestamp_check: Arc, + sender_address: Option
, } impl ExecutorMock { @@ -55,9 +55,15 @@ impl ExecutorMock { unique_id: Arc::new(RwLock::new(0)), sender_escrow_storage, timestamp_check, + sender_address: None, } } + pub fn with_sender_address(mut self, sender_address: Address) -> Self { + self.sender_address = Some(sender_address); + self + } + pub async fn retrieve_receipt_by_id( &self, receipt_id: u64, @@ -139,6 +145,7 @@ impl RAVRead for ExecutorMock { #[async_trait] impl ReceiptStore for ExecutorMock { type AdapterError = AdapterErrorMock; + async fn store_receipt(&self, receipt: ReceivedReceipt) -> Result { let mut id_pointer = self.unique_id.write().unwrap(); let id_previous = *id_pointer; @@ -147,23 +154,12 @@ impl ReceiptStore for ExecutorMock { *id_pointer += 1; Ok(id_previous) } - async fn update_receipt_by_id( - &self, - receipt_id: u64, - receipt: ReceivedReceipt, - ) -> Result<(), Self::AdapterError> { - let mut receipt_storage = self.receipt_storage.write().unwrap(); +} - if !receipt_storage.contains_key(&receipt_id) { - return Err(AdapterErrorMock::AdapterError { - error: "Invalid receipt_id".to_owned(), - }); - }; +#[async_trait] +impl ReceiptDelete for ExecutorMock { + type AdapterError = AdapterErrorMock; - receipt_storage.insert(receipt_id, receipt); - *self.unique_id.write().unwrap() += 1; - Ok(()) - } async fn remove_receipts_in_timestamp_range + std::marker::Send>( &self, timestamp_ns: R, @@ -175,7 +171,6 @@ impl ReceiptStore for ExecutorMock { Ok(()) } } - #[async_trait] impl ReceiptRead for ExecutorMock { type AdapterError = AdapterErrorMock; @@ -251,4 +246,11 @@ impl EscrowAdapter for ExecutorMock { ) -> Result<(), Self::AdapterError> { self.reduce_escrow(sender_id, value) } + + async fn verify_signer(&self, signer_address: Address) -> Result { + Ok(self + .sender_address + .map(|sender| signer_address == sender) + .unwrap_or(false)) + } } diff --git a/tap_core/src/adapters/receipt_storage_adapter.rs b/tap_core/src/adapters/receipt_storage_adapter.rs index b66715a..af04709 100644 --- a/tap_core/src/adapters/receipt_storage_adapter.rs +++ b/tap_core/src/adapters/receipt_storage_adapter.rs @@ -47,18 +47,15 @@ pub trait ReceiptStore { /// It returns a unique receipt_id associated with the stored receipt. Any errors that occur during /// this process should be captured and returned as an `AdapterError`. async fn store_receipt(&self, receipt: ReceivedReceipt) -> Result; +} - /// Updates a specific `ReceivedReceipt` identified by a unique receipt_id. +#[async_trait] +pub trait ReceiptDelete { + /// Defines the user-specified error type. /// - /// This method should be implemented to update a specific `ReceivedReceipt` identified by a unique - /// receipt_id in your storage system. Any errors that occur during this process should be captured - /// and returned as an `AdapterError`. - async fn update_receipt_by_id( - &self, - receipt_id: u64, - receipt: ReceivedReceipt, - ) -> Result<(), Self::AdapterError>; - + /// This error type should implement the `Error` and `Debug` traits from the standard library. + /// Errors of this type are returned to the user when an operation fails. + type AdapterError: std::error::Error + std::fmt::Debug + Send + Sync + 'static; /// Removes all `ReceivedReceipts` within a specific timestamp range from the storage. /// /// This method should be implemented to remove all `ReceivedReceipts` within a specific timestamp diff --git a/tap_core/src/adapters/test/receipt_storage_adapter_test.rs b/tap_core/src/adapters/test/receipt_storage_adapter_test.rs index b6158ed..3507348 100644 --- a/tap_core/src/adapters/test/receipt_storage_adapter_test.rs +++ b/tap_core/src/adapters/test/receipt_storage_adapter_test.rs @@ -5,14 +5,13 @@ mod receipt_storage_adapter_unit_test { use rand::seq::SliceRandom; use rand::thread_rng; - use std::collections::{HashMap, HashSet}; + use std::collections::HashMap; use std::str::FromStr; use std::sync::{Arc, RwLock}; use crate::checks::TimestampCheck; use crate::{ adapters::{executor_mock::ExecutorMock, receipt_storage_adapter::ReceiptStore}, - checks::{mock::get_full_list_of_checks, ReceiptCheck}, eip_712_signed_message::EIP712SignedMessage, tap_eip712_domain, tap_receipt::{Receipt, ReceivedReceipt}, @@ -28,45 +27,24 @@ mod receipt_storage_adapter_unit_test { tap_eip712_domain(1, Address::from([0x11u8; 20])) } - struct ExecutorFixture { - executor: ExecutorMock, - checks: Vec, - } - #[fixture] - fn executor_mock(domain_separator: Eip712Domain) -> ExecutorFixture { + fn executor() -> ExecutorMock { let escrow_storage = Arc::new(RwLock::new(HashMap::new())); let rav_storage = Arc::new(RwLock::new(None)); - let query_appraisals = Arc::new(RwLock::new(HashMap::new())); let receipt_storage = Arc::new(RwLock::new(HashMap::new())); let timestamp_check = Arc::new(TimestampCheck::new(0)); - let executor = ExecutorMock::new( + ExecutorMock::new( rav_storage, receipt_storage.clone(), escrow_storage.clone(), timestamp_check.clone(), - ); - let mut checks = get_full_list_of_checks( - domain_separator, - HashSet::new(), - Arc::new(RwLock::new(HashSet::new())), - receipt_storage, - query_appraisals.clone(), - ); - checks.push(timestamp_check); - - ExecutorFixture { executor, checks } + ) } #[rstest] #[tokio::test] - async fn receipt_adapter_test(domain_separator: Eip712Domain, executor_mock: ExecutorFixture) { - let ExecutorFixture { - mut executor, - checks, - } = executor_mock; - + async fn receipt_adapter_test(domain_separator: Eip712Domain, mut executor: ExecutorMock) { let wallet: LocalWallet = MnemonicBuilder::::default() .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") .build() @@ -76,7 +54,6 @@ mod receipt_storage_adapter_unit_test { Address::from_str("0xabababababababababababababababababababab").unwrap(); // Create receipts - let query_id = 10u64; let value = 100u128; let received_receipt = ReceivedReceipt::new( EIP712SignedMessage::new( @@ -85,8 +62,6 @@ mod receipt_storage_adapter_unit_test { &wallet, ) .unwrap(), - query_id, - &checks, ); let receipt_store_result = executor.store_receipt(received_receipt).await; @@ -114,13 +89,8 @@ mod receipt_storage_adapter_unit_test { #[tokio::test] async fn multi_receipt_adapter_test( domain_separator: Eip712Domain, - executor_mock: ExecutorFixture, + mut executor: ExecutorMock, ) { - let ExecutorFixture { - mut executor, - checks, - } = executor_mock; - let wallet: LocalWallet = MnemonicBuilder::::default() .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") .build() @@ -131,7 +101,7 @@ mod receipt_storage_adapter_unit_test { // Create receipts let mut received_receipts = Vec::new(); - for (query_id, value) in (50..60).enumerate() { + for value in 50..60 { received_receipts.push(ReceivedReceipt::new( EIP712SignedMessage::new( &domain_separator, @@ -139,20 +109,13 @@ mod receipt_storage_adapter_unit_test { &wallet, ) .unwrap(), - query_id as u64, - &checks, )); } let mut receipt_ids = Vec::new(); let mut receipt_timestamps = Vec::new(); for received_receipt in received_receipts { - receipt_ids.push( - executor - .store_receipt(received_receipt.clone()) - .await - .unwrap(), - ); - receipt_timestamps.push(received_receipt.signed_receipt().message.timestamp_ns) + receipt_timestamps.push(received_receipt.signed_receipt().message.timestamp_ns); + receipt_ids.push(executor.store_receipt(received_receipt).await.unwrap()); } // Retreive receipts with timestamp @@ -205,7 +168,6 @@ mod receipt_storage_adapter_unit_test { #[test] fn safe_truncate_receipts_test( domain_separator: Eip712Domain, - executor_mock: ExecutorFixture, #[case] input: Vec, #[case] limit: u64, #[case] expected: Vec, @@ -214,7 +176,6 @@ mod receipt_storage_adapter_unit_test { .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") .build() .unwrap(); - let checks = executor_mock.checks; // Vec of (id, receipt) let mut receipts_orig: Vec<(u64, ReceivedReceipt)> = Vec::new(); @@ -235,13 +196,11 @@ mod receipt_storage_adapter_unit_test { &wallet, ) .unwrap(), - i as u64, // Will use that to check the IDs - &checks, ), )); } - let mut receipts_truncated = receipts_orig.clone(); + let mut receipts_truncated = receipts_orig; // shuffle the input receipts receipts_truncated.shuffle(&mut thread_rng()); @@ -259,9 +218,6 @@ mod receipt_storage_adapter_unit_test { elem_trun.1.signed_receipt().message.timestamp_ns, *expected_timestamp ); - - // Check that the IDs are fine - assert_eq!(elem_trun.0, elem_trun.1.query_id()); } } } diff --git a/tap_core/src/checks/mod.rs b/tap_core/src/checks/mod.rs index 709708e..7a50c96 100644 --- a/tap_core/src/checks/mod.rs +++ b/tap_core/src/checks/mod.rs @@ -1,64 +1,44 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::tap_receipt::{Checking, ReceiptError, ReceiptResult, ReceiptWithState}; -use serde::{Deserialize, Serialize}; -use std::sync::{Arc, RwLock}; +use crate::tap_receipt::{Checking, ReceiptError, ReceiptWithState}; +use std::{ + ops::Deref, + sync::{Arc, RwLock}, +}; -pub type ReceiptCheck = Arc; +pub type ReceiptCheck = Arc; -#[derive(Serialize, Deserialize, Clone, Debug)] -pub enum CheckingChecks { - Pending(ReceiptCheck), - Executed(ReceiptResult<()>), -} +pub type CheckResult = anyhow::Result<()>; -impl CheckingChecks { - pub fn new(check: ReceiptCheck) -> Self { - Self::Pending(check) - } +pub struct Checks(Arc<[ReceiptCheck]>); - pub async fn execute(self, receipt: &ReceiptWithState) -> Self { - match self { - Self::Pending(check) => { - let result = check.check(receipt).await; - Self::Executed(result) - } - Self::Executed(_) => self, - } +impl Checks { + pub fn new(checks: Vec) -> Self { + Self(checks.into()) } +} - pub fn is_failed(&self) -> bool { - matches!(self, Self::Executed(Err(_))) - } +impl Deref for Checks { + type Target = [ReceiptCheck]; - pub fn is_pending(&self) -> bool { - matches!(self, Self::Pending(_)) + fn deref(&self) -> &Self::Target { + self.0.as_ref() } +} - pub fn is_complete(&self) -> bool { - matches!(self, Self::Executed(_)) - } +#[async_trait::async_trait] +pub trait Check { + async fn check(&self, receipt: &ReceiptWithState) -> CheckResult; } #[async_trait::async_trait] -#[typetag::serde(tag = "type")] -pub trait Check: std::fmt::Debug + Send + Sync { - async fn check(&self, receipt: &ReceiptWithState) -> ReceiptResult<()>; - - async fn check_batch(&self, receipts: &[ReceiptWithState]) -> Vec> { - let mut results = Vec::new(); - for receipt in receipts { - let result = self.check(receipt).await; - results.push(result); - } - results - } +pub trait CheckBatch { + async fn check_batch(&self, receipts: &[ReceiptWithState]) -> Vec; } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug)] pub struct TimestampCheck { - #[serde(skip)] min_timestamp_ns: RwLock, } @@ -75,16 +55,16 @@ impl TimestampCheck { } #[async_trait::async_trait] -#[typetag::serde] impl Check for TimestampCheck { - async fn check(&self, receipt: &ReceiptWithState) -> ReceiptResult<()> { + async fn check(&self, receipt: &ReceiptWithState) -> CheckResult { let min_timestamp_ns = *self.min_timestamp_ns.read().unwrap(); let signed_receipt = receipt.signed_receipt(); if signed_receipt.message.timestamp_ns <= min_timestamp_ns { return Err(ReceiptError::InvalidTimestamp { received_timestamp: signed_receipt.message.timestamp_ns, timestamp_min: min_timestamp_ns, - }); + } + .into()); } Ok(()) } @@ -94,24 +74,20 @@ impl Check for TimestampCheck { pub mod mock { use super::*; - use crate::tap_receipt::ReceivedReceipt; + use crate::eip_712_signed_message::MessageId; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; - use std::{ - collections::{HashMap, HashSet}, - fmt::Debug, - }; + use std::collections::{HashMap, HashSet}; pub fn get_full_list_of_checks( domain_separator: Eip712Domain, valid_signers: HashSet
, allocation_ids: Arc>>, - receipt_storage: Arc>>, - query_appraisals: Arc>>, + _query_appraisals: Arc>>, ) -> Vec { vec![ - Arc::new(UniqueCheck { receipt_storage }), - Arc::new(ValueCheck { query_appraisals }), + // Arc::new(UniqueCheck ), + // Arc::new(ValueCheck { query_appraisals }), Arc::new(AllocationIdCheck { allocation_ids }), Arc::new(SignatureCheck { domain_separator, @@ -120,37 +96,11 @@ pub mod mock { ] } - #[derive(Serialize, Deserialize)] - struct UniqueCheck { - #[serde(skip)] - receipt_storage: Arc>>, - } - impl Debug for UniqueCheck { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "UniqueCheck") - } - } + struct UniqueCheck; #[async_trait::async_trait] - #[typetag::serde] - impl Check for UniqueCheck { - async fn check(&self, receipt: &ReceiptWithState) -> ReceiptResult<()> { - let receipt_storage = self.receipt_storage.read().unwrap(); - // let receipt_id = receipt. - let unique = receipt_storage - .iter() - .all(|(_stored_receipt_id, stored_receipt)| { - stored_receipt.signed_receipt().message != receipt.signed_receipt().message - || stored_receipt.query_id() == receipt.query_id - }); - - unique.then_some(()).ok_or(ReceiptError::NonUniqueReceipt) - } - - async fn check_batch( - &self, - receipts: &[ReceiptWithState], - ) -> Vec> { + impl CheckBatch for UniqueCheck { + async fn check_batch(&self, receipts: &[ReceiptWithState]) -> Vec { let mut signatures: HashSet = HashSet::new(); let mut results = Vec::new(); @@ -159,53 +109,48 @@ pub mod mock { if signatures.insert(signature) { results.push(Ok(())); } else { - results.push(Err(ReceiptError::NonUniqueReceipt)); + results.push(Err(ReceiptError::NonUniqueReceipt.into())); } } results } } - #[derive(Debug, Serialize, Deserialize)] struct ValueCheck { - #[serde(skip)] - query_appraisals: Arc>>, + query_appraisals: Arc>>, } #[async_trait::async_trait] - #[typetag::serde] impl Check for ValueCheck { - async fn check(&self, receipt: &ReceiptWithState) -> ReceiptResult<()> { - let query_id = receipt.query_id; + async fn check(&self, receipt: &ReceiptWithState) -> CheckResult { let value = receipt.signed_receipt().message.value; let query_appraisals = self.query_appraisals.read().unwrap(); + let hash = receipt.signed_receipt().unique_hash(); let appraised_value = query_appraisals - .get(&query_id) - .ok_or(ReceiptError::CheckFailedToComplete { - source_error_message: "Could not find query_appraisals".into(), - })?; + .get(&hash) + .ok_or(ReceiptError::CheckFailedToComplete( + "Could not find query_appraisals".into(), + ))?; if value != *appraised_value { Err(ReceiptError::InvalidValue { received_value: value, - }) + } + .into()) } else { Ok(()) } } } - #[derive(Debug, Serialize, Deserialize)] struct AllocationIdCheck { - #[serde(skip)] allocation_ids: Arc>>, } #[async_trait::async_trait] - #[typetag::serde] impl Check for AllocationIdCheck { - async fn check(&self, receipt: &ReceiptWithState) -> ReceiptResult<()> { + async fn check(&self, receipt: &ReceiptWithState) -> CheckResult { let received_allocation_id = receipt.signed_receipt().message.allocation_id; if self .allocation_ids @@ -217,21 +162,20 @@ pub mod mock { } else { Err(ReceiptError::InvalidAllocationID { received_allocation_id, - }) + } + .into()) } } } - #[derive(Debug, Serialize, Deserialize)] struct SignatureCheck { domain_separator: Eip712Domain, valid_signers: HashSet
, } #[async_trait::async_trait] - #[typetag::serde] impl Check for SignatureCheck { - async fn check(&self, receipt: &ReceiptWithState) -> ReceiptResult<()> { + async fn check(&self, receipt: &ReceiptWithState) -> CheckResult { let recovered_address = receipt .signed_receipt() .recover_signer(&self.domain_separator) @@ -241,7 +185,8 @@ pub mod mock { if !self.valid_signers.contains(&recovered_address) { Err(ReceiptError::InvalidSignature { source_error_message: "Invalid signer".to_string(), - }) + } + .into()) } else { Ok(()) } diff --git a/tap_core/src/eip_712_signed_message.rs b/tap_core/src/eip_712_signed_message.rs index b68c4aa..e6b471c 100644 --- a/tap_core/src/eip_712_signed_message.rs +++ b/tap_core/src/eip_712_signed_message.rs @@ -19,6 +19,9 @@ pub struct EIP712SignedMessage { pub signature: Signature, } +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct MessageId(pub [u8; 32]); + impl EIP712SignedMessage { /// creates signed message with signed EIP712 hash of `message` using `signing_wallet` pub fn new( @@ -56,4 +59,9 @@ impl EIP712SignedMessage { .verify(recovery_message_hash, expected_address)?; Ok(()) } + + /// Use this a simple key for testing + pub fn unique_hash(&self) -> MessageId { + MessageId(self.message.eip712_hash_struct().into()) + } } diff --git a/tap_core/src/error.rs b/tap_core/src/error.rs index 566247b..26caea7 100644 --- a/tap_core/src/error.rs +++ b/tap_core/src/error.rs @@ -4,7 +4,7 @@ //! Module containing Error type and Result typedef //! -use crate::receipt_aggregate_voucher::ReceiptAggregateVoucher; +use crate::{receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_receipt::ReceiptError}; use alloy_primitives::Address; use ethers::signers::WalletError; use ethers_core::types::SignatureError; @@ -55,6 +55,12 @@ pub enum Error { min_timestamp_ns: u64, max_timestamp_ns: u64, }, + + #[error("Receipt error: {0}")] + ReceiptError(#[from] ReceiptError), + + #[error("Failed to check the signer: {0}")] + FailedToVerifySigner(String), } pub type Result = StdResult; diff --git a/tap_core/src/tap_manager/manager.rs b/tap_core/src/tap_manager/manager.rs index 1f959a6..c332e96 100644 --- a/tap_core/src/tap_manager/manager.rs +++ b/tap_core/src/tap_manager/manager.rs @@ -1,18 +1,16 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; -use futures::Future; use super::{RAVRequest, SignedRAV, SignedReceipt}; use crate::{ adapters::{ escrow_adapter::EscrowAdapter, rav_storage_adapter::{RAVRead, RAVStore}, - receipt_storage_adapter::{ReceiptRead, ReceiptStore}, + receipt_storage_adapter::{ReceiptDelete, ReceiptRead, ReceiptStore}, }, - checks::ReceiptCheck, + checks::Checks, receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_receipt::{ CategorizedReceiptsWithState, Failed, ReceiptAuditor, ReceiptWithId, ReceiptWithState, @@ -24,8 +22,10 @@ use crate::{ pub struct Manager { /// Executor that implements adapters executor: E, + /// Checks that must be completed for each receipt before being confirmed or denied for rav request - required_checks: Vec, + checks: Checks, + /// Struct responsible for doing checks for receipt. Ownership stays with manager allowing manager /// to update configuration ( like minimum timestamp ). receipt_auditor: ReceiptAuditor, @@ -39,23 +39,19 @@ where /// will complete all `required_checks` before being accepted or declined from RAV. /// `starting_min_timestamp` will be used as min timestamp until the first RAV request is created. /// - pub fn new( - domain_separator: Eip712Domain, - executor: E, - required_checks: Vec, - ) -> Self { + pub fn new(domain_separator: Eip712Domain, executor: E, checks: impl Into) -> Self { let receipt_auditor = ReceiptAuditor::new(domain_separator, executor.clone()); Self { executor, - required_checks, receipt_auditor, + checks: checks.into(), } } } impl Manager where - E: RAVStore, + E: RAVStore + EscrowAdapter, { /// Verify `signed_rav` matches all values on `expected_rav`, and that `signed_rav` has a valid signer. /// @@ -63,18 +59,13 @@ where /// /// Returns [`Error::AdapterError`] if there are any errors while storing RAV /// - pub async fn verify_and_store_rav( + pub async fn verify_and_store_rav( &self, expected_rav: ReceiptAggregateVoucher, signed_rav: SignedRAV, - verify_signer: F, - ) -> std::result::Result<(), Error> - where - F: FnOnce(Address) -> Fut, - Fut: Future>, - { + ) -> std::result::Result<(), Error> { self.receipt_auditor - .check_rav_signature(&signed_rav, verify_signer) + .check_rav_signature(&signed_rav) .await?; if signed_rav.message != expected_rav { @@ -155,7 +146,7 @@ where receipt, receipt_id: _, } = received_receipt; - let receipt = receipt.finalize_receipt_checks().await; + let receipt = receipt.finalize_receipt_checks(&self.checks).await; match receipt { Ok(checked) => awaiting_reserve_receipts.push(checked), @@ -242,7 +233,7 @@ where impl Manager where - E: ReceiptStore + RAVRead, + E: ReceiptDelete + RAVRead, { /// Removes obsolete receipts from storage. Obsolete receipts are receipts that are older than the last RAV, and /// therefore already aggregated into the RAV. @@ -271,7 +262,7 @@ where impl Manager where - E: ReceiptStore + EscrowAdapter, + E: ReceiptStore, { /// Runs `initial_checks` on `signed_receipt` for initial verification, then stores received receipt. /// The provided `query_id` will be used as a key when chaecking query appraisal. @@ -287,37 +278,17 @@ where pub async fn verify_and_store_receipt( &self, signed_receipt: SignedReceipt, - query_id: u64, - initial_checks: &[ReceiptCheck], ) -> std::result::Result<(), Error> { - let mut received_receipt = - ReceivedReceipt::new(signed_receipt, query_id, &self.required_checks); - // The receipt id is needed before `perform_checks` can be called on received receipt - // since it is needed for uniqueness check. Since the receipt_id is defined when it is stored - // This function first stores it, then checks it, then updates what was stored. - - let receipt_id = self - .executor - .store_receipt(received_receipt.clone()) - .await - .map_err(|err| Error::AdapterError { - source_error: anyhow::Error::new(err), - })?; + let mut received_receipt = ReceivedReceipt::new(signed_receipt); + // perform checks if let ReceivedReceipt::Checking(received_receipt) = &mut received_receipt { - received_receipt - .perform_checks( - initial_checks - .iter() - .map(|check| check.typetag_name()) - .collect::>() - .as_slice(), - ) - .await; + received_receipt.perform_checks(&self.checks).await?; } + // store the receipt self.executor - .update_receipt_by_id(receipt_id, received_receipt) + .store_receipt(received_receipt) .await .map_err(|err| Error::AdapterError { source_error: anyhow::Error::new(err), diff --git a/tap_core/src/tap_manager/rav_request.rs b/tap_core/src/tap_manager/rav_request.rs index 4b59b5f..a279488 100644 --- a/tap_core/src/tap_manager/rav_request.rs +++ b/tap_core/src/tap_manager/rav_request.rs @@ -9,8 +9,7 @@ use crate::{ tap_receipt::{Failed, ReceiptWithState}, }; -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(bound(deserialize = "'de: 'static"))] +#[derive(Debug, Serialize, Deserialize)] pub struct RAVRequest { pub valid_receipts: Vec, pub previous_rav: Option, diff --git a/tap_core/src/tap_manager/test/manager_test.rs b/tap_core/src/tap_manager/test/manager_test.rs index c231559..d7b3c87 100644 --- a/tap_core/src/tap_manager/test/manager_test.rs +++ b/tap_core/src/tap_manager/test/manager_test.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ collections::HashMap, - ops::Range, str::FromStr, sync::{Arc, RwLock}, }; @@ -18,14 +17,12 @@ use crate::{ executor_mock::{EscrowStorage, ExecutorMock, QueryAppraisals}, receipt_storage_adapter::ReceiptRead, }, - checks::{mock::get_full_list_of_checks, ReceiptCheck, TimestampCheck}, + checks::{mock::get_full_list_of_checks, Checks, TimestampCheck}, eip_712_signed_message::EIP712SignedMessage, get_current_timestamp_u64_ns, tap_eip712_domain, tap_receipt::Receipt, }; -const LENGTH_OF_CHECKS: usize = 4; - #[fixture] fn keys() -> (LocalWallet, Address) { let wallet: LocalWallet = MnemonicBuilder::::default() @@ -68,7 +65,7 @@ struct ExecutorFixture { executor: ExecutorMock, escrow_storage: EscrowStorage, query_appraisals: QueryAppraisals, - checks: Vec, + checks: Checks, } #[fixture] @@ -76,6 +73,7 @@ fn executor_mock( domain_separator: Eip712Domain, allocation_ids: Vec
, sender_ids: Vec
, + keys: (LocalWallet, Address), ) -> ExecutorFixture { let escrow_storage = Arc::new(RwLock::new(HashMap::new())); let rav_storage = Arc::new(RwLock::new(None)); @@ -87,16 +85,17 @@ fn executor_mock( receipt_storage.clone(), escrow_storage.clone(), timestamp_check.clone(), - ); + ) + .with_sender_address(keys.1); let mut checks = get_full_list_of_checks( domain_separator, sender_ids.iter().cloned().collect(), Arc::new(RwLock::new(allocation_ids.iter().cloned().collect())), - receipt_storage, query_appraisals.clone(), ); checks.push(timestamp_check); + let checks = Checks::new(checks); ExecutorFixture { executor, @@ -107,15 +106,11 @@ fn executor_mock( } #[rstest] -#[case::full_checks(0..LENGTH_OF_CHECKS)] -#[case::partial_checks(0..2)] -#[case::no_checks(0..0)] #[tokio::test] async fn manager_verify_and_store_varying_initial_checks( keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, - #[case] range: Range, executor_mock: ExecutorFixture, ) { let ExecutorFixture { @@ -125,10 +120,8 @@ async fn manager_verify_and_store_varying_initial_checks( escrow_storage, .. } = executor_mock; - // give receipt 5 second variance for min start time - let manager = Manager::new(domain_separator.clone(), executor, checks.clone()); + let manager = Manager::new(domain_separator.clone(), executor, checks); - let query_id = 1; let value = 20u128; let signed_receipt = EIP712SignedMessage::new( &domain_separator, @@ -136,25 +129,22 @@ async fn manager_verify_and_store_varying_initial_checks( &keys.0, ) .unwrap(); + let query_id = signed_receipt.unique_hash(); query_appraisals.write().unwrap().insert(query_id, value); escrow_storage.write().unwrap().insert(keys.1, 999999); assert!(manager - .verify_and_store_receipt(signed_receipt, query_id, &checks[range]) + .verify_and_store_receipt(signed_receipt) .await .is_ok()); } #[rstest] -#[case::full_checks(0..LENGTH_OF_CHECKS)] -#[case::partial_checks(0..2)] -#[case::no_checks(0..0)] #[tokio::test] async fn manager_create_rav_request_all_valid_receipts( keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, - #[case] range: Range, executor_mock: ExecutorFixture, ) { let ExecutorFixture { @@ -164,13 +154,11 @@ async fn manager_create_rav_request_all_valid_receipts( escrow_storage, .. } = executor_mock; - let initial_checks = &checks[range]; - - let manager = Manager::new(domain_separator.clone(), executor, checks.clone()); + let manager = Manager::new(domain_separator.clone(), executor, checks); escrow_storage.write().unwrap().insert(keys.1, 999999); let mut stored_signed_receipts = Vec::new(); - for query_id in 0..10 { + for _ in 0..10 { let value = 20u128; let signed_receipt = EIP712SignedMessage::new( &domain_separator, @@ -178,15 +166,15 @@ async fn manager_create_rav_request_all_valid_receipts( &keys.0, ) .unwrap(); + let query_id = signed_receipt.unique_hash(); stored_signed_receipts.push(signed_receipt.clone()); query_appraisals.write().unwrap().insert(query_id, value); assert!(manager - .verify_and_store_receipt(signed_receipt, query_id, initial_checks) + .verify_and_store_receipt(signed_receipt) .await .is_ok()); } let rav_request_result = manager.create_rav_request(0, None).await; - println!("{:?}", rav_request_result); assert!(rav_request_result.is_ok()); let rav_request = rav_request_result.unwrap(); @@ -202,25 +190,17 @@ async fn manager_create_rav_request_all_valid_receipts( EIP712SignedMessage::new(&domain_separator, rav_request.expected_rav.clone(), &keys.0) .unwrap(); assert!(manager - .verify_and_store_rav( - rav_request.expected_rav, - signed_rav, - |address: Address| async move { Ok(keys.1 == address) } - ) + .verify_and_store_rav(rav_request.expected_rav, signed_rav) .await .is_ok()); } #[rstest] -#[case::full_checks(0..LENGTH_OF_CHECKS)] -#[case::partial_checks(0..2)] -#[case::no_checks(0..0)] #[tokio::test] async fn manager_create_multiple_rav_requests_all_valid_receipts( keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, - #[case] range: Range, executor_mock: ExecutorFixture, ) { let ExecutorFixture { @@ -230,16 +210,14 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts( escrow_storage, .. } = executor_mock; - let initial_checks = &checks[range]; - // give receipt 5 second variance for min start time - let manager = Manager::new(domain_separator.clone(), executor, checks.clone()); + let manager = Manager::new(domain_separator.clone(), executor, checks); escrow_storage.write().unwrap().insert(keys.1, 999999); let mut stored_signed_receipts = Vec::new(); let mut expected_accumulated_value = 0; - for query_id in 0..10 { + for _ in 0..10 { let value = 20u128; let signed_receipt = EIP712SignedMessage::new( &domain_separator, @@ -247,10 +225,11 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts( &keys.0, ) .unwrap(); + let query_id = signed_receipt.unique_hash(); stored_signed_receipts.push(signed_receipt.clone()); query_appraisals.write().unwrap().insert(query_id, value); assert!(manager - .verify_and_store_receipt(signed_receipt, query_id, initial_checks) + .verify_and_store_receipt(signed_receipt) .await .is_ok()); expected_accumulated_value += value; @@ -278,16 +257,12 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts( EIP712SignedMessage::new(&domain_separator, rav_request.expected_rav.clone(), &keys.0) .unwrap(); assert!(manager - .verify_and_store_rav( - rav_request.expected_rav, - signed_rav, - |address: Address| async move { Ok(keys.1 == address) } - ) + .verify_and_store_rav(rav_request.expected_rav, signed_rav) .await .is_ok()); stored_signed_receipts.clear(); - for query_id in 10..20 { + for _ in 10..20 { let value = 20u128; let signed_receipt = EIP712SignedMessage::new( &domain_separator, @@ -295,10 +270,12 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts( &keys.0, ) .unwrap(); + + let query_id = signed_receipt.unique_hash(); stored_signed_receipts.push(signed_receipt.clone()); query_appraisals.write().unwrap().insert(query_id, value); assert!(manager - .verify_and_store_receipt(signed_receipt, query_id, initial_checks) + .verify_and_store_receipt(signed_receipt) .await .is_ok()); expected_accumulated_value += value; @@ -326,11 +303,7 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts( EIP712SignedMessage::new(&domain_separator, rav_request.expected_rav.clone(), &keys.0) .unwrap(); assert!(manager - .verify_and_store_rav( - rav_request.expected_rav, - signed_rav, - |address: Address| async move { Ok(keys.1 == address) } - ) + .verify_and_store_rav(rav_request.expected_rav, signed_rav) .await .is_ok()); } @@ -341,7 +314,6 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts_consecutive_tim keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, - #[values(0..0, 0..2, 0..LENGTH_OF_CHECKS)] range: Range, #[values(true, false)] remove_old_receipts: bool, executor_mock: ExecutorFixture, ) { @@ -352,11 +324,9 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts_consecutive_tim escrow_storage, .. } = executor_mock; - let initial_checks = &checks[range]; - // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; - let manager = Manager::new(domain_separator.clone(), executor, checks.clone()); + let manager = Manager::new(domain_separator.clone(), executor, checks); escrow_storage.write().unwrap().insert(keys.1, 999999); @@ -367,10 +337,12 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts_consecutive_tim let mut receipt = Receipt::new(allocation_ids[0], value).unwrap(); receipt.timestamp_ns = starting_min_timestamp + query_id + 1; let signed_receipt = EIP712SignedMessage::new(&domain_separator, receipt, &keys.0).unwrap(); + + let query_id = signed_receipt.unique_hash(); stored_signed_receipts.push(signed_receipt.clone()); query_appraisals.write().unwrap().insert(query_id, value); assert!(manager - .verify_and_store_receipt(signed_receipt, query_id, initial_checks) + .verify_and_store_receipt(signed_receipt) .await .is_ok()); expected_accumulated_value += value; @@ -408,11 +380,7 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts_consecutive_tim ) .unwrap(); assert!(manager - .verify_and_store_rav( - rav_request_1.expected_rav, - signed_rav_1, - |address: Address| async move { Ok(keys.1 == address) } - ) + .verify_and_store_rav(rav_request_1.expected_rav, signed_rav_1) .await .is_ok()); @@ -422,10 +390,11 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts_consecutive_tim let mut receipt = Receipt::new(allocation_ids[0], value).unwrap(); receipt.timestamp_ns = starting_min_timestamp + query_id + 1; let signed_receipt = EIP712SignedMessage::new(&domain_separator, receipt, &keys.0).unwrap(); + let query_id = signed_receipt.unique_hash(); stored_signed_receipts.push(signed_receipt.clone()); query_appraisals.write().unwrap().insert(query_id, value); assert!(manager - .verify_and_store_receipt(signed_receipt, query_id, initial_checks) + .verify_and_store_receipt(signed_receipt) .await .is_ok()); expected_accumulated_value += value; @@ -472,11 +441,7 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts_consecutive_tim ) .unwrap(); assert!(manager - .verify_and_store_rav( - rav_request_2.expected_rav, - signed_rav_2, - |address: Address| async move { Ok(keys.1 == address) } - ) + .verify_and_store_rav(rav_request_2.expected_rav, signed_rav_2) .await .is_ok()); } diff --git a/tap_core/src/tap_receipt/mod.rs b/tap_core/src/tap_receipt/mod.rs index 4261a21..7503ec8 100644 --- a/tap_core/src/tap_receipt/mod.rs +++ b/tap_core/src/tap_receipt/mod.rs @@ -4,7 +4,6 @@ mod receipt; mod receipt_auditor; mod received_receipt; -use std::collections::HashMap; use alloy_primitives::Address; pub use receipt::Receipt; @@ -17,8 +16,6 @@ pub use received_receipt::{ use serde::{Deserialize, Serialize}; use thiserror::Error; -use crate::checks::CheckingChecks; - #[derive(Error, Debug, Clone, Serialize, Deserialize)] pub enum ReceiptError { #[error("invalid allocation ID: {received_allocation_id}")] @@ -36,9 +33,8 @@ pub enum ReceiptError { NonUniqueReceipt, #[error("Attempt to collect escrow failed")] SubtractEscrowFailed, - #[error("Issue encountered while performing check: {source_error_message}")] - CheckFailedToComplete { source_error_message: String }, + #[error("Issue encountered while performing check: {0}")] + CheckFailedToComplete(String), } pub type ReceiptResult = Result; -pub type ReceiptCheckResults = HashMap<&'static str, CheckingChecks>; diff --git a/tap_core/src/tap_receipt/receipt_auditor.rs b/tap_core/src/tap_receipt/receipt_auditor.rs index 34f1057..f3c0683 100644 --- a/tap_core/src/tap_receipt/receipt_auditor.rs +++ b/tap_core/src/tap_receipt/receipt_auditor.rs @@ -1,9 +1,7 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; -use futures::Future; use crate::{ adapters::escrow_adapter::EscrowAdapter, @@ -26,25 +24,6 @@ impl ReceiptAuditor { executor, } } - - pub async fn check_rav_signature( - &self, - signed_rav: &SignedRAV, - verify_signer: F, - ) -> Result<(), Error> - where - F: FnOnce(Address) -> Fut, - Fut: Future>, - { - let recovered_address = signed_rav.recover_signer(&self.domain_separator)?; - if verify_signer(recovered_address).await? { - Ok(()) - } else { - Err(Error::InvalidRecoveredSigner { - address: recovered_address, - }) - } - } } impl ReceiptAuditor @@ -61,6 +40,7 @@ where .map_err(|err| ReceiptError::InvalidSignature { source_error_message: err.to_string(), })?; + if self .executor .subtract_escrow(receipt_signer_address, signed_receipt.message.value) @@ -72,4 +52,20 @@ where Ok(()) } + + pub async fn check_rav_signature(&self, signed_rav: &SignedRAV) -> Result<(), Error> { + let recovered_address = signed_rav.recover_signer(&self.domain_separator)?; + if self + .executor + .verify_signer(recovered_address) + .await + .map_err(|e| Error::FailedToVerifySigner(e.to_string()))? + { + Ok(()) + } else { + Err(Error::InvalidRecoveredSigner { + address: recovered_address, + }) + } + } } diff --git a/tap_core/src/tap_receipt/received_receipt.rs b/tap_core/src/tap_receipt/received_receipt.rs index 8a69d18..6545493 100644 --- a/tap_core/src/tap_receipt/received_receipt.rs +++ b/tap_core/src/tap_receipt/received_receipt.rs @@ -13,51 +13,28 @@ //! This module is useful for managing and tracking the state of received receipts, as well as //! their progress through various checks and stages of inclusion in RAV requests and received RAVs. -use std::collections::HashMap; - use serde::{Deserialize, Serialize}; -use super::{receipt_auditor::ReceiptAuditor, Receipt, ReceiptCheckResults}; +use super::{receipt_auditor::ReceiptAuditor, Receipt, ReceiptError, ReceiptResult}; use crate::{ adapters::{escrow_adapter::EscrowAdapter, receipt_storage_adapter::StoredReceipt}, - checks::{CheckingChecks, ReceiptCheck}, + checks::ReceiptCheck, eip_712_signed_message::EIP712SignedMessage, }; -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(bound(deserialize = "'de: 'static"))] -pub struct Checking { - /// A list of checks to be completed for the receipt, along with their current result - pub(crate) checks: ReceiptCheckResults, -} +#[derive(Debug, Clone)] +pub struct Checking; -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(bound(deserialize = "'de: 'static"))] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Failed { /// A list of checks to be completed for the receipt, along with their current result - pub(crate) checks: ReceiptCheckResults, + pub error: ReceiptError, } -impl From for Failed { - fn from(checking: Checking) -> Self { - Self { - checks: checking.checks, - } - } -} - -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Clone)] pub struct AwaitingReserve; -impl From for Failed { - fn from(_checking: AwaitingReserve) -> Self { - Self { - checks: HashMap::new(), - } - } -} - -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Clone)] pub struct Reserved; pub trait ReceiptState {} @@ -170,17 +147,10 @@ impl From> for CategorizedReceiptsWithState { impl ReceivedReceipt { /// Initialize a new received receipt with provided signed receipt, query id, and checks - pub fn new( - signed_receipt: EIP712SignedMessage, - query_id: u64, - required_checks: &[ReceiptCheck], - ) -> Self { - let checks = ReceiptWithState::get_empty_required_checks_hashmap(required_checks); - + pub fn new(signed_receipt: EIP712SignedMessage) -> Self { let received_receipt = ReceiptWithState { signed_receipt, - query_id, - state: Checking { checks }, + _state: Checking, }; received_receipt.into() } @@ -192,15 +162,6 @@ impl ReceivedReceipt { | ReceivedReceipt::Reserved(ReceiptWithState { signed_receipt, .. }) => signed_receipt, } } - - pub fn query_id(&self) -> u64 { - match self { - ReceivedReceipt::AwaitingReserve(ReceiptWithState { query_id, .. }) - | ReceivedReceipt::Checking(ReceiptWithState { query_id, .. }) - | ReceivedReceipt::Failed(ReceiptWithState { query_id, .. }) - | ReceivedReceipt::Reserved(ReceiptWithState { query_id, .. }) => *query_id, - } - } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -211,10 +172,8 @@ where { /// An EIP712 signed receipt message pub(crate) signed_receipt: EIP712SignedMessage, - /// A unique identifier for the query associated with the receipt - pub(crate) query_id: u64, /// The current state of the receipt (e.g., received, checking, failed, accepted, etc.) - pub(crate) state: S, + pub(crate) _state: S, } impl ReceiptWithState { @@ -227,32 +186,12 @@ impl ReceiptWithState { { match auditor.check_and_reserve_escrow(&self).await { Ok(_) => Ok(self.perform_state_changes(Reserved)), - Err(_) => Err(self.perform_state_changes_into()), + Err(e) => Err(self.perform_state_error(e)), } } } impl ReceiptWithState { - /// Completes a single *incomplete* check and stores the result, *if the check already has a result it is skipped.* - /// - /// Returns `Err` only if unable to complete the check, returns `Ok` if the check was completed (*Important:* this is not the result of the check, just the result of _completing_ the check) - /// - /// # Errors - /// - /// Returns [`Error::InvalidStateForRequestedAction`] if the requested check cannot be comleted in the receipts current internal state. All other checks must be complete before `CheckAndReserveEscrow`. - /// - /// Returns [`Error::InvalidCheckError] if requested error in not a required check (list of required checks provided by user on construction) - /// - pub async fn perform_check(&mut self, check_name: &'static str) { - // Only perform check if it is incomplete - // Don't check if already failed - let check = self.state.checks.remove(check_name); - if let Some(check) = check { - let result = check.execute(self).await; - self.state.checks.insert(check_name, result); - } - } - /// Completes a list of *incomplete* check and stores the result, if the check already has a result it is skipped /// /// Returns `Err` only if unable to complete a check, returns `Ok` if the checks were completed (*Important:* this is not the result of the check, just the result of _completing_ the check) @@ -263,74 +202,44 @@ impl ReceiptWithState { /// /// Returns [`Error::InvalidCheckError] if requested error in not a required check (list of required checks provided by user on construction) /// - pub async fn perform_checks(&mut self, checks: &[&'static str]) { + pub async fn perform_checks(&mut self, checks: &[ReceiptCheck]) -> ReceiptResult<()> { for check in checks { - self.perform_check(check).await; + // return early on an error + check + .check(self) + .await + .map_err(|e| ReceiptError::CheckFailedToComplete(e.to_string()))?; } + Ok(()) } /// Completes all remaining checks and stores the results /// /// Returns `Err` only if unable to complete a check, returns `Ok` if no check failed to complete (*Important:* this is not the result of the check, just the result of _completing_ the check) /// - pub async fn finalize_receipt_checks(mut self) -> ResultReceipt { - let incomplete_checks = self.incomplete_checks(); - - self.perform_checks(incomplete_checks.as_slice()).await; - - if self.any_check_resulted_in_error() { - let failed = self.perform_state_changes_into(); - Err(failed) + pub async fn finalize_receipt_checks( + mut self, + checks: &[ReceiptCheck], + ) -> ResultReceipt { + let all_checks_passed = self.perform_checks(checks).await; + + if let Err(e) = all_checks_passed { + Err(self.perform_state_error(e)) } else { let checked = self.perform_state_changes(AwaitingReserve); Ok(checked) } } - - /// Returns all checks that have not been completed - pub(crate) fn incomplete_checks(&self) -> Vec<&'static str> { - let incomplete_checks = self - .state - .checks - .iter() - .filter_map(|(check, result)| { - if result.is_complete() { - return None; - } - Some(*check) - }) - .collect(); - incomplete_checks - } - - fn any_check_resulted_in_error(&self) -> bool { - self.state - .checks - .iter() - .any(|(_, status)| status.is_failed()) - } - - fn get_empty_required_checks_hashmap(required_checks: &[ReceiptCheck]) -> ReceiptCheckResults { - required_checks - .iter() - .map(|check| (check.typetag_name(), CheckingChecks::Pending(check.clone()))) - .collect() - } } impl ReceiptWithState where S: ReceiptState, { - fn perform_state_changes_into(self) -> ReceiptWithState - where - T: ReceiptState, - S: Into, - { + fn perform_state_error(self, error: ReceiptError) -> ReceiptWithState { ReceiptWithState { signed_receipt: self.signed_receipt, - query_id: self.query_id, - state: self.state.into(), + _state: Failed { error }, } } @@ -340,18 +249,13 @@ where { ReceiptWithState { signed_receipt: self.signed_receipt, - query_id: self.query_id, - state: new_state, + _state: new_state, } } pub fn signed_receipt(&self) -> &EIP712SignedMessage { &self.signed_receipt } - - pub fn query_id(&self) -> u64 { - self.query_id - } } #[cfg(test)] pub mod received_receipt_unit_test; diff --git a/tap_core/src/tap_receipt/received_receipt/received_receipt_unit_test.rs b/tap_core/src/tap_receipt/received_receipt/received_receipt_unit_test.rs index 6d1a703..8c74ef6 100644 --- a/tap_core/src/tap_receipt/received_receipt/received_receipt_unit_test.rs +++ b/tap_core/src/tap_receipt/received_receipt/received_receipt_unit_test.rs @@ -17,37 +17,9 @@ use crate::{ checks::{mock::get_full_list_of_checks, ReceiptCheck, TimestampCheck}, eip_712_signed_message::EIP712SignedMessage, tap_eip712_domain, - tap_receipt::{Receipt, ReceiptAuditor, ReceiptCheckResults, ReceivedReceipt}, + tap_receipt::{Receipt, ReceiptAuditor, ReceivedReceipt}, }; -use super::{Checking, ReceiptWithState}; - -impl ReceiptWithState { - fn check_is_complete(&self, check: &str) -> bool { - self.state.checks.get(check).unwrap().is_complete() - } - - fn checking_is_complete(&self) -> bool { - self.state - .checks - .iter() - .all(|(_, status)| status.is_complete()) - } - /// Returns all checks that completed with errors - fn completed_checks_with_errors(&self) -> ReceiptCheckResults { - self.state - .checks - .iter() - .filter_map(|(check, result)| { - if result.is_failed() { - return Some((*check, result.clone())); - } - None - }) - .collect() - } -} - #[fixture] fn keys() -> (LocalWallet, Address) { let wallet: LocalWallet = MnemonicBuilder::::default() @@ -98,6 +70,7 @@ fn executor_mock( domain_separator: Eip712Domain, allocation_ids: Vec
, sender_ids: Vec
, + keys: (LocalWallet, Address), ) -> ExecutorFixture { let escrow_storage = Arc::new(RwLock::new(HashMap::new())); let rav_storage = Arc::new(RwLock::new(None)); @@ -110,12 +83,12 @@ fn executor_mock( receipt_storage.clone(), escrow_storage.clone(), timestamp_check.clone(), - ); + ) + .with_sender_address(keys.1); let mut checks = get_full_list_of_checks( domain_separator, sender_ids.iter().cloned().collect(), Arc::new(RwLock::new(allocation_ids.iter().cloned().collect())), - receipt_storage, query_appraisals.clone(), ); checks.push(timestamp_check); @@ -134,27 +107,20 @@ async fn initialization_valid_receipt( keys: (LocalWallet, Address), allocation_ids: Vec
, domain_separator: Eip712Domain, - executor_mock: ExecutorFixture, ) { - let ExecutorFixture { checks, .. } = executor_mock; - let signed_receipt = EIP712SignedMessage::new( &domain_separator, Receipt::new(allocation_ids[0], 10).unwrap(), &keys.0, ) .unwrap(); - let query_id = 1; - let received_receipt = ReceivedReceipt::new(signed_receipt, query_id, &checks); + let received_receipt = ReceivedReceipt::new(signed_receipt); - let received_receipt = match received_receipt { + match received_receipt { ReceivedReceipt::Checking(checking) => checking, _ => panic!("ReceivedReceipt should be in Checking state"), }; - - assert!(received_receipt.completed_checks_with_errors().is_empty()); - assert!(received_receipt.incomplete_checks().len() == checks.len()); } #[rstest] @@ -180,7 +146,7 @@ async fn partial_then_full_check_valid_receipt( ) .unwrap(); - let query_id = 1; + let query_id = signed_receipt.unique_hash(); // add escrow for sender escrow_storage @@ -193,25 +159,15 @@ async fn partial_then_full_check_valid_receipt( .unwrap() .insert(query_id, query_value); - let received_receipt = ReceivedReceipt::new(signed_receipt, query_id, &checks); + let received_receipt = ReceivedReceipt::new(signed_receipt); let mut received_receipt = match received_receipt { ReceivedReceipt::Checking(checking) => checking, _ => panic!("ReceivedReceipt should be in Checking state"), }; - // perform single arbitrary check - let arbitrary_check_to_perform = checks[0].typetag_name(); - - received_receipt - .perform_check(arbitrary_check_to_perform) - .await; - assert!(received_receipt.check_is_complete(arbitrary_check_to_perform)); - - received_receipt - .perform_checks(&checks.iter().map(|c| c.typetag_name()).collect::>()) - .await; - assert!(received_receipt.checking_is_complete()); + let result = received_receipt.perform_checks(&checks).await; + assert!(result.is_ok()); } #[rstest] @@ -238,8 +194,7 @@ async fn partial_then_finalize_valid_receipt( &keys.0, ) .unwrap(); - - let query_id = 1; + let query_id = signed_receipt.unique_hash(); // add escrow for sender escrow_storage @@ -252,22 +207,14 @@ async fn partial_then_finalize_valid_receipt( .unwrap() .insert(query_id, query_value); - let received_receipt = ReceivedReceipt::new(signed_receipt, query_id, &checks); + let received_receipt = ReceivedReceipt::new(signed_receipt); - let mut received_receipt = match received_receipt { + let received_receipt = match received_receipt { ReceivedReceipt::Checking(checking) => checking, _ => panic!("ReceivedReceipt should be in Checking state"), }; - // perform single arbitrary check - let arbitrary_check_to_perform = checks[0].typetag_name(); - - received_receipt - .perform_check(arbitrary_check_to_perform) - .await; - assert!(received_receipt.check_is_complete(arbitrary_check_to_perform)); - - let awaiting_escrow_receipt = received_receipt.finalize_receipt_checks().await; + let awaiting_escrow_receipt = received_receipt.finalize_receipt_checks(&checks).await; assert!(awaiting_escrow_receipt.is_ok()); let awaiting_escrow_receipt = awaiting_escrow_receipt.unwrap(); @@ -302,7 +249,7 @@ async fn standard_lifetime_valid_receipt( ) .unwrap(); - let query_id = 1; + let query_id = signed_receipt.unique_hash(); // add escrow for sender escrow_storage @@ -315,14 +262,14 @@ async fn standard_lifetime_valid_receipt( .unwrap() .insert(query_id, query_value); - let received_receipt = ReceivedReceipt::new(signed_receipt, query_id, &checks); + let received_receipt = ReceivedReceipt::new(signed_receipt); let received_receipt = match received_receipt { ReceivedReceipt::Checking(checking) => checking, _ => panic!("ReceivedReceipt should be in Checking state"), }; - let awaiting_escrow_receipt = received_receipt.finalize_receipt_checks().await; + let awaiting_escrow_receipt = received_receipt.finalize_receipt_checks(&checks).await; assert!(awaiting_escrow_receipt.is_ok()); let awaiting_escrow_receipt = awaiting_escrow_receipt.unwrap(); diff --git a/tap_integration_tests/tests/indexer_mock/mod.rs b/tap_integration_tests/tests/indexer_mock/mod.rs index cc96559..221c079 100644 --- a/tap_integration_tests/tests/indexer_mock/mod.rs +++ b/tap_integration_tests/tests/indexer_mock/mod.rs @@ -5,7 +5,6 @@ use std::sync::{ Arc, }; -use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use anyhow::{Error, Result}; use jsonrpsee::{ @@ -23,7 +22,7 @@ use tap_core::{ rav_storage_adapter::{RAVRead, RAVStore}, receipt_storage_adapter::{ReceiptRead, ReceiptStore}, }, - checks::ReceiptCheck, + checks::Checks, tap_manager::{Manager, SignedRAV, SignedReceipt}, }; /// Rpc trait represents a JSON-RPC server that has a single async method `request`. @@ -34,7 +33,6 @@ pub trait Rpc { #[method(name = "request")] async fn request( &self, - request_id: u64, // Unique identifier for the request receipt: SignedReceipt, // Signed receipt associated with the request ) -> Result<(), jsonrpsee::types::ErrorObjectOwned>; // The result of the request, a JSON-RPC error if it fails } @@ -48,11 +46,9 @@ pub trait Rpc { /// aggregator_client is an HTTP client used for making JSON-RPC requests to another server. pub struct RpcManager { manager: Arc>, // Manager object reference counted with an Arc - initial_checks: Vec, // Vector of initial checks to be performed on each request receipt_count: Arc, // Thread-safe atomic counter for receipts threshold: u64, // The count at which a RAV request will be triggered aggregator_client: (HttpClient, String), // HTTP client for sending requests to the aggregator server - sender_id: Address, // The sender address } /// Implementation for `RpcManager`, includes the constructor and the `request` method. @@ -65,10 +61,8 @@ where pub fn new( domain_separator: Eip712Domain, executor: E, - initial_checks: Vec, - required_checks: Vec, + required_checks: Checks, threshold: u64, - sender_id: Address, aggregate_server_address: String, aggregate_server_api_version: String, ) -> Result { @@ -78,10 +72,8 @@ where executor, required_checks, )), - initial_checks, receipt_count: Arc::new(AtomicU64::new(0)), threshold, - sender_id, aggregator_client: ( HttpClientBuilder::default().build(aggregate_server_address)?, aggregate_server_api_version, @@ -97,14 +89,9 @@ where { async fn request( &self, - request_id: u64, receipt: SignedReceipt, ) -> Result<(), jsonrpsee::types::ErrorObjectOwned> { - let verify_result = match self - .manager - .verify_and_store_receipt(receipt, request_id, self.initial_checks.as_slice()) - .await - { + let verify_result = match self.manager.verify_and_store_receipt(receipt).await { Ok(_) => Ok(()), Err(e) => Err(to_rpc_error( Box::new(e), @@ -125,7 +112,6 @@ where time_stamp_buffer, &self.aggregator_client, self.threshold as usize, - self.sender_id, ) .await { @@ -149,12 +135,10 @@ pub async fn run_server( port: u16, // Port on which the server will listen domain_separator: Eip712Domain, // EIP712 domain separator executor: E, // Executor instance - initial_checks: Vec, // Vector of initial checks to be performed on each request - required_checks: Vec, // Vector of required checks to be performed on each request - threshold: u64, // The count at which a RAV request will be triggered - aggregate_server_address: String, // Address of the aggregator server + required_checks: Checks, // Vector of required checks to be performed on each request + threshold: u64, // The count at which a RAV request will be triggered + aggregate_server_address: String, // Address of the aggregator server aggregate_server_api_version: String, // API version of the aggregator server - sender_id: Address, // The sender address ) -> Result<(ServerHandle, std::net::SocketAddr)> where E: ReceiptStore @@ -178,10 +162,8 @@ where let rpc_manager = RpcManager::new( domain_separator, executor, - initial_checks, required_checks, threshold, - sender_id, aggregate_server_address, aggregate_server_api_version, )?; @@ -196,7 +178,6 @@ async fn request_rav( time_stamp_buffer: u64, // Buffer for timestamping, see tap_core for details aggregator_client: &(HttpClient, String), // HttpClient for making requests to the tap_aggregator server threshold: usize, - expected_sender_id: Address, ) -> Result<()> where E: ReceiptRead + RAVRead + RAVStore + EscrowAdapter, @@ -217,11 +198,7 @@ where .request("aggregate_receipts", params) .await?; manager - .verify_and_store_rav( - rav_request.expected_rav, - remote_rav_result.data, - |address| async move { Ok(address == expected_sender_id) }, - ) + .verify_and_store_rav(rav_request.expected_rav, remote_rav_result.data) .await?; // For these tests, we expect every receipt to be valid, i.e. there should be no invalid receipts, nor any missing receipts (less than the expected threshold). diff --git a/tap_integration_tests/tests/showcase.rs b/tap_integration_tests/tests/showcase.rs index 6191166..881d996 100644 --- a/tap_integration_tests/tests/showcase.rs +++ b/tap_integration_tests/tests/showcase.rs @@ -25,8 +25,8 @@ use rstest::*; use tap_aggregator::{jsonrpsee_helpers, server as agg_server}; use tap_core::{ adapters::executor_mock::{ExecutorMock, QueryAppraisals}, - checks::{mock::get_full_list_of_checks, ReceiptCheck, TimestampCheck}, - eip_712_signed_message::EIP712SignedMessage, + checks::{mock::get_full_list_of_checks, Checks, TimestampCheck}, + eip_712_signed_message::{EIP712SignedMessage, MessageId}, tap_eip712_domain, tap_manager::SignedRAV, tap_receipt::Receipt, @@ -159,14 +159,15 @@ fn query_appraisals(query_price: &[u128]) -> QueryAppraisals { query_price .iter() .enumerate() - .map(|(i, p)| (i as u64, *p)) + // TODO update this + .map(|(i, p)| (MessageId([i as u8; 32]), *p)) .collect(), )) } struct ExecutorFixture { executor: ExecutorMock, - checks: Vec, + checks: Checks, } #[fixture] @@ -186,14 +187,14 @@ fn executor( escrow_storage.clone(), timestamp_check.clone(), ); - let mut checks = get_full_list_of_checks( + let checks = get_full_list_of_checks( domain_separator, sender_ids.iter().cloned().collect(), Arc::new(RwLock::new(allocation_ids.iter().cloned().collect())), - receipt_storage, query_appraisals, ); - checks.push(timestamp_check); + + let checks = Checks::new(checks); ExecutorFixture { executor, checks } } @@ -217,17 +218,16 @@ fn requests_1( num_batches: u64, allocation_ids: Vec
, domain_separator: Eip712Domain, -) -> Result, u64)>> { +) -> Vec> { let (sender_key, _) = keys_sender; // Create your Receipt here - let requests = generate_requests( + generate_requests( query_price, num_batches, &sender_key, allocation_ids[0], &domain_separator, - )?; - Ok(requests) + ) } #[fixture] @@ -237,17 +237,16 @@ fn requests_2( num_batches: u64, allocation_ids: Vec
, domain_separator: Eip712Domain, -) -> Result, u64)>> { +) -> Vec> { let (sender_key, _) = keys_sender; // Create your Receipt here - let requests = generate_requests( + generate_requests( query_price, num_batches, &sender_key, allocation_ids[1], &domain_separator, - )?; - Ok(requests) + ) } #[fixture] @@ -258,7 +257,7 @@ fn repeated_timestamp_request( domain_separator: Eip712Domain, num_batches: u64, receipt_threshold_1: u64, -) -> Result, u64)>> { +) -> Vec> { let (sender_key, _) = keys_sender; // Create signed receipts @@ -268,14 +267,13 @@ fn repeated_timestamp_request( &sender_key, allocation_ids[0], &domain_separator, - )?; + ); // Create a new receipt with the timestamp equal to the latest receipt in the first RAV request batch let repeat_timestamp = requests[receipt_threshold_1 as usize - 1] - .0 .message .timestamp_ns; - let target_receipt = &requests[receipt_threshold_1 as usize].0.message; + let target_receipt = &requests[receipt_threshold_1 as usize].message; let repeat_receipt = Receipt { allocation_id: target_receipt.allocation_id, timestamp_ns: repeat_timestamp, @@ -284,9 +282,9 @@ fn repeated_timestamp_request( }; // Sign the new receipt and insert it in the second batch - requests[receipt_threshold_1 as usize].0 = - EIP712SignedMessage::new(&domain_separator, repeat_receipt, &sender_key)?; - Ok(requests) + requests[receipt_threshold_1 as usize] = + EIP712SignedMessage::new(&domain_separator, repeat_receipt, &sender_key).unwrap(); + requests } #[fixture] @@ -297,7 +295,7 @@ fn repeated_timestamp_incremented_by_one_request( domain_separator: Eip712Domain, num_batches: u64, receipt_threshold_1: u64, -) -> Result, u64)>> { +) -> Vec> { let (sender_key, _) = keys_sender; // Create your Receipt here let mut requests = generate_requests( @@ -306,15 +304,14 @@ fn repeated_timestamp_incremented_by_one_request( &sender_key, allocation_ids[0], &domain_separator, - )?; + ); // Create a new receipt with the timestamp equal to the latest receipt timestamp+1 in the first RAV request batch let repeat_timestamp = requests[receipt_threshold_1 as usize - 1] - .0 .message .timestamp_ns + 1; - let target_receipt = &requests[receipt_threshold_1 as usize].0.message; + let target_receipt = &requests[receipt_threshold_1 as usize].message; let repeat_receipt = Receipt { allocation_id: target_receipt.allocation_id, timestamp_ns: repeat_timestamp, @@ -323,9 +320,10 @@ fn repeated_timestamp_incremented_by_one_request( }; // Sign the new receipt and insert it in the second batch - requests[receipt_threshold_1 as usize].0 = - EIP712SignedMessage::new(&domain_separator, repeat_receipt, &sender_key)?; - Ok(requests) + requests[receipt_threshold_1 as usize] = + EIP712SignedMessage::new(&domain_separator, repeat_receipt, &sender_key).unwrap(); + + requests } #[fixture] @@ -335,18 +333,17 @@ fn wrong_requests( num_batches: u64, allocation_ids: Vec
, domain_separator: Eip712Domain, -) -> Result, u64)>> { +) -> Vec> { let (sender_key, _) = wrong_keys_sender; // Create your Receipt here // Create your Receipt here - let requests = generate_requests( + generate_requests( query_price, num_batches, &sender_key, allocation_ids[0], &domain_separator, - )?; - Ok(requests) + ) } // Helper fixtures to start servers for tests @@ -376,7 +373,6 @@ async fn single_indexer_test_server( executor, sender_id, available_escrow, - checks.clone(), checks, receipt_threshold_1, sender_aggregator_addr, @@ -433,7 +429,6 @@ async fn two_indexers_test_servers( executor_1, sender_id, available_escrow, - checks_1.clone(), checks_1, receipt_threshold_1, sender_aggregator_addr, @@ -445,7 +440,6 @@ async fn two_indexers_test_servers( executor_2, sender_id, available_escrow, - checks_2.clone(), checks_2, receipt_threshold_1, sender_aggregator_addr, @@ -491,7 +485,6 @@ async fn single_indexer_wrong_sender_test_server( executor, sender_id, available_escrow, - checks.clone(), checks, receipt_threshold_1, sender_aggregator_addr, @@ -513,16 +506,15 @@ async fn test_manager_one_indexer( (ServerHandle, SocketAddr, ServerHandle, SocketAddr), Error, >, - requests_1: Result, u64)>>, + requests_1: Vec>, ) -> Result<(), Box> { let (_server_handle, socket_addr, _sender_handle, _sender_addr) = single_indexer_test_server.await?; let indexer_1_address = "http://".to_string() + &socket_addr.to_string(); let client_1 = HttpClientBuilder::default().build(indexer_1_address)?; - let requests = requests_1?; - for (receipt_1, id) in requests { - let result = client_1.request("request", (id, receipt_1)).await; + for receipt_1 in requests_1 { + let result = client_1.request("request", (receipt_1,)).await; match result { Ok(()) => {} @@ -547,8 +539,8 @@ async fn test_manager_two_indexers( ), Error, >, - requests_1: Result, u64)>>, - requests_2: Result, u64)>>, + requests_1: Vec>, + requests_2: Vec>, ) -> Result<()> { let ( _server_handle_1, @@ -563,12 +555,10 @@ async fn test_manager_two_indexers( let indexer_2_address = "http://".to_string() + &socket_addr_2.to_string(); let client_1 = HttpClientBuilder::default().build(indexer_1_address)?; let client_2 = HttpClientBuilder::default().build(indexer_2_address)?; - let requests_1 = requests_1?; - let requests_2 = requests_2?; - for ((receipt_1, id_1), (receipt_2, id_2)) in requests_1.iter().zip(requests_2) { - let future_1 = client_1.request("request", (id_1, receipt_1)); - let future_2 = client_2.request("request", (id_2, receipt_2)); + for (receipt_1, receipt_2) in requests_1.iter().zip(requests_2) { + let future_1 = client_1.request("request", (receipt_1,)); + let future_2 = client_2.request("request", (receipt_2,)); match tokio::try_join!(future_1, future_2) { Ok(((), ())) => {} Err(e) => panic!("Error making receipt request: {:?}", e), @@ -584,19 +574,18 @@ async fn test_manager_wrong_aggregator_keys( (ServerHandle, SocketAddr, ServerHandle, SocketAddr), Error, >, - requests_1: Result, u64)>>, + requests_1: Vec>, receipt_threshold_1: u64, ) -> Result<()> { let (_server_handle, socket_addr, _sender_handle, _sender_addr) = single_indexer_wrong_sender_test_server.await?; let indexer_1_address = "http://".to_string() + &socket_addr.to_string(); let client_1 = HttpClientBuilder::default().build(indexer_1_address)?; - let requests = requests_1?; let mut counter = 1; - for (receipt_1, id) in requests { + for receipt_1 in requests_1 { let result: Result<(), jsonrpsee::core::Error> = - client_1.request("request", (id, receipt_1)).await; + client_1.request("request", (receipt_1,)).await; // The rav request is being made with messages that have been signed with a key that differs from the sender aggregator's. // So the Sender Aggregator should send an error to the requesting Indexer. // And so the Indexer should then return an error to the clinet when a rav request is made. @@ -627,33 +616,19 @@ async fn test_manager_wrong_requestor_keys( (ServerHandle, SocketAddr, ServerHandle, SocketAddr), Error, >, - wrong_requests: Result, u64)>>, - receipt_threshold_1: u64, + wrong_requests: Vec>, ) -> Result<()> { let (_server_handle, socket_addr, _sender_handle, _sender_addr) = single_indexer_test_server.await?; let indexer_1_address = "http://".to_string() + &socket_addr.to_string(); let client_1 = HttpClientBuilder::default().build(indexer_1_address)?; - let requests = wrong_requests?; - let mut counter = 1; - for (receipt_1, id) in requests { + for receipt_1 in wrong_requests { let result: Result<(), jsonrpsee::core::Error> = - client_1.request("request", (id, receipt_1)).await; + client_1.request("request", (receipt_1,)).await; // The receipts have been signed with a key that the Indexer is not expecting. - // So the Indexer should return an error when a rav request is made, because they will not have any valid receipts for the request. - // A rav request is made when the number of receipts sent = receipt_threshold_1. - // result should be an error when counter = multiple of receipt_threshold_1 and Ok otherwise. - if (counter % receipt_threshold_1) == 0 { - assert!(result.is_err(), "Should have failed signature verification"); - } else { - assert!( - result.is_ok(), - "Error making receipt request: {:?}", - result.unwrap_err() - ); - } - counter += 1; + // This is one of the initial tests, so it should fail to receive the receipt + assert!(result.is_err(), "Should have failed signature verification"); } Ok(()) @@ -673,8 +648,8 @@ async fn test_tap_manager_rav_timestamp_cuttoff( ), Error, >, - repeated_timestamp_request: Result, u64)>>, - repeated_timestamp_incremented_by_one_request: Result, u64)>>, + repeated_timestamp_request: Vec>, + repeated_timestamp_incremented_by_one_request: Vec>, receipt_threshold_1: u64, ) -> Result<(), Box> { // This test checks that tap_core is correctly filtering receipts by timestamp. @@ -691,12 +666,11 @@ async fn test_tap_manager_rav_timestamp_cuttoff( let indexer_2_address = "http://".to_string() + &socket_addr_2.to_string(); let client_1 = HttpClientBuilder::default().build(indexer_1_address)?; let client_2 = HttpClientBuilder::default().build(indexer_2_address)?; - let requests = repeated_timestamp_request?; let mut counter = 1; - for (receipt_1, id) in requests { + for receipt_1 in repeated_timestamp_request { let result: Result<(), jsonrpsee::core::Error> = - client_1.request("request", (id, receipt_1)).await; + client_1.request("request", (receipt_1,)).await; // The first receipt in the second batch has the same timestamp as the last receipt in the first batch. // TAP manager should ignore this receipt when creating the second RAV request. @@ -718,9 +692,8 @@ async fn test_tap_manager_rav_timestamp_cuttoff( // Here the timestamp first receipt in the second batch is equal to timestamp + 1 of the last receipt in the first batch. // No errors are expected. - let requests = repeated_timestamp_incremented_by_one_request?; - for (receipt_1, id) in requests { - let result = client_2.request("request", (id, receipt_1)).await; + for receipt_1 in repeated_timestamp_incremented_by_one_request { + let result = client_2.request("request", (receipt_1,)).await; match result { Ok(()) => {} Err(e) => panic!("Error making receipt request: {:?}", e), @@ -737,8 +710,8 @@ async fn test_tap_aggregator_rav_timestamp_cuttoff( http_request_size_limit: u32, http_response_size_limit: u32, http_max_concurrent_connections: u32, - repeated_timestamp_request: Result, u64)>>, - repeated_timestamp_incremented_by_one_request: Result, u64)>>, + repeated_timestamp_request: Vec>, + repeated_timestamp_incremented_by_one_request: Vec>, receipt_threshold_1: u64, ) -> Result<(), Box> { // This test checks that tap_aggregator is correctly rejecting receipts with invalid timestamps @@ -756,25 +729,17 @@ async fn test_tap_aggregator_rav_timestamp_cuttoff( // The second batch has one receipt with the same timestamp as the latest receipt in the first batch. // The first RAV will have the same timestamp as one receipt in the second batch. // tap_aggregator should reject the second RAV request due to the repeated timestamp. - let requests = repeated_timestamp_request?; + let requests = repeated_timestamp_request; let first_batch = &requests[0..receipt_threshold_1 as usize]; let second_batch = &requests[receipt_threshold_1 as usize..2 * receipt_threshold_1 as usize]; - let receipts = first_batch - .iter() - .map(|(r, _)| r.clone()) - .collect::>(); - let params = rpc_params!(&aggregate_server_api_version(), &receipts, None::<()>); + let params = rpc_params!(&aggregate_server_api_version(), &first_batch, None::<()>); let first_rav_response: jsonrpsee_helpers::JsonRpcResponse = client.request("aggregate_receipts", params).await?; - let receipts = second_batch - .iter() - .map(|(r, _)| r.clone()) - .collect::>(); let params = rpc_params!( &aggregate_server_api_version(), - &receipts, + &second_batch, first_rav_response.data ); let second_rav_response: Result< @@ -789,25 +754,17 @@ async fn test_tap_aggregator_rav_timestamp_cuttoff( // This is the second part of the test, two batches of receipts are sent to the aggregator. // The second batch has one receipt with the timestamp = timestamp+1 of the latest receipt in the first batch. // tap_aggregator should accept the second RAV request. - let requests = repeated_timestamp_incremented_by_one_request?; + let requests = repeated_timestamp_incremented_by_one_request; let first_batch = &requests[0..receipt_threshold_1 as usize]; let second_batch = &requests[receipt_threshold_1 as usize..2 * receipt_threshold_1 as usize]; - let receipts = first_batch - .iter() - .map(|(r, _)| r.clone()) - .collect::>(); - let params = rpc_params!(&aggregate_server_api_version(), &receipts, None::<()>); + let params = rpc_params!(&aggregate_server_api_version(), &first_batch, None::<()>); let first_rav_response: jsonrpsee_helpers::JsonRpcResponse = client.request("aggregate_receipts", params).await?; - let receipts = second_batch - .iter() - .map(|(r, _)| r.clone()) - .collect::>(); let params = rpc_params!( &aggregate_server_api_version(), - &receipts, + &second_batch, first_rav_response.data ); let second_rav_response: jsonrpsee_helpers::JsonRpcResponse = @@ -815,7 +772,7 @@ async fn test_tap_aggregator_rav_timestamp_cuttoff( // Compute the expected aggregate value and check that it matches the latest RAV. let mut expected_value = 0; - for (receipt, _) in first_batch.iter().chain(second_batch.iter()) { + for receipt in first_batch.iter().chain(second_batch.iter()) { expected_value += receipt.message.value; } assert!(expected_value == second_rav_response.data.message.valueAggregate); @@ -830,26 +787,23 @@ fn generate_requests( sender_key: &LocalWallet, allocation_id: Address, domain_separator: &Eip712Domain, -) -> Result, u64)>> { - let mut requests: Vec<(EIP712SignedMessage, u64)> = Vec::new(); +) -> Vec> { + let mut requests: Vec> = Vec::new(); - let mut counter = 0; for _ in 0..num_batches { for value in query_price { - requests.push(( + requests.push( EIP712SignedMessage::new( domain_separator, - Receipt::new(allocation_id, *value)?, + Receipt::new(allocation_id, *value).unwrap(), sender_key, - )?, - counter, - )); - counter += 1; + ) + .unwrap(), + ); } - counter = 0; } - Ok(requests) + requests } // Start-up a mock Indexer. Requires a Sender Aggregator to be running. @@ -858,8 +812,7 @@ async fn start_indexer_server( mut executor: ExecutorMock, sender_id: Address, available_escrow: u128, - initial_checks: Vec, - required_checks: Vec, + required_checks: Checks, receipt_threshold: u64, agg_server_addr: SocketAddr, ) -> Result<(ServerHandle, SocketAddr)> { @@ -874,13 +827,11 @@ async fn start_indexer_server( let (server_handle, socket_addr) = indexer_mock::run_server( http_port, domain_separator, - executor, - initial_checks, + executor.with_sender_address(sender_id), required_checks, receipt_threshold, aggregate_server_address, aggregate_server_api_version(), - sender_id, ) .await?;