From 1a141ff8e247efd943249c76739c1a6757aada3b Mon Sep 17 00:00:00 2001 From: Moncef AOUDIA Date: Tue, 10 Oct 2023 19:03:29 +0200 Subject: [PATCH] feat: add AsyncPoolChanges filters --- Cargo.lock | 1 + massa-grpc/Cargo.toml | 62 +++++----- .../src/stream/new_slot_execution_outputs.rs | 117 +++++++++++++++++- massa-grpc/src/tests/stream.rs | 4 +- 4 files changed, 153 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 293dfaba4a1..030cfaed326 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2978,6 +2978,7 @@ dependencies = [ "massa_execution_exports", "massa_final_state", "massa_hash", + "massa_ledger_exports", "massa_models", "massa_pool_exports", "massa_pos_exports", diff --git a/massa-grpc/Cargo.toml b/massa-grpc/Cargo.toml index 9b80cd1a9c0..9ce7cf04b1b 100644 --- a/massa-grpc/Cargo.toml +++ b/massa-grpc/Cargo.toml @@ -11,44 +11,50 @@ documentation = "https://docs.massa.net/" testing = [] [dependencies] -massa-proto-rs = { workspace = true, "features" = ["tonic"] } -displaydoc = { workspace = true } -thiserror = { workspace = true } -tonic = { workspace = true, "features" = ["gzip", "tls"] } -tonic-web = { workspace = true } -tonic-reflection = { workspace = true } -tonic-health = { workspace = true } -tower-http = { workspace = true, "features" = ["cors"] } -hyper = { workspace = true } -futures-util = { workspace = true } -serde = { workspace = true, "features" = ["derive"] } -tokio = { workspace = true, "features" = ["rt-multi-thread", "macros"] } -tokio-stream = { workspace = true } # BOM UPGRADE Revert to "0.1.12" if problem -tracing = { workspace = true } -parking_lot = { workspace = true, "features" = ["deadlock_detection"] } -h2 = { workspace = true } -itertools = { workspace = true } - +# Internal packages +massa_bootstrap = { workspace = true } massa_consensus_exports = { workspace = true } +massa_execution_exports = { workspace = true } massa_hash = { workspace = true } +massa_ledger_exports = { workspace = true } massa_models = { workspace = true } -massa_pos_exports = { workspace = true } massa_pool_exports = { workspace = true } +massa_pos_exports = { workspace = true } massa_protocol_exports = { workspace = true } -massa_execution_exports = { workspace = true } +massa_sdk = { workspace = true } +massa_serialization = { workspace = true } +massa_signature = { workspace = true } massa_storage = { workspace = true } massa_time = { workspace = true } -massa_wallet = { workspace = true } -massa_serialization = { workspace = true } massa_versioning = { workspace = true } -massa_signature = { workspace = true } -massa_bootstrap = { workspace = true } -massa_sdk = { workspace = true } +massa_wallet = { workspace = true } + +# Massa projects dependencies +massa-proto-rs = { workspace = true, "features" = ["tonic"] } + +# Common dependencies +displaydoc = { workspace = true } +futures-util = { workspace = true } +h2 = { workspace = true } +hyper = { workspace = true } +itertools = { workspace = true } + +parking_lot = { workspace = true, "features" = ["deadlock_detection"] } +serde = { workspace = true, "features" = ["derive"] } +thiserror = { workspace = true } +tokio = { workspace = true, "features" = ["rt-multi-thread", "macros"] } +tokio-stream = { workspace = true } # BOM UPGRADE Revert to "0.1.12" if problem +tonic = { workspace = true, "features" = ["gzip", "tls"] } +tonic-health = { workspace = true } +tonic-reflection = { workspace = true } +tonic-web = { workspace = true } +tower-http = { workspace = true, "features" = ["cors"] } +tracing = { workspace = true } [dev-dependencies] -num = { workspace = true } -massa_consensus_exports = { workspace = true, "features" = ["testing"] } massa_channel = { workspace = true } -mockall = { workspace = true } +massa_consensus_exports = { workspace = true, "features" = ["testing"] } massa_final_state = { workspace = true } +mockall = { workspace = true } +num = { workspace = true } tokio = { workspace = true, "features" = ["test-util", "time"] } diff --git a/massa-grpc/src/stream/new_slot_execution_outputs.rs b/massa-grpc/src/stream/new_slot_execution_outputs.rs index 7c49f6b2f46..afebf3ba5bb 100644 --- a/massa-grpc/src/stream/new_slot_execution_outputs.rs +++ b/massa-grpc/src/stream/new_slot_execution_outputs.rs @@ -6,6 +6,7 @@ use crate::server::MassaPublicGrpc; use crate::SlotRange; use futures_util::StreamExt; use massa_execution_exports::{ExecutionOutput, SlotExecutionOutput}; +use massa_ledger_exports::SetUpdateOrDelete; use massa_models::address::Address; use massa_models::operation::OperationId; use massa_models::slot::Slot; @@ -481,10 +482,124 @@ fn filter_map_exec_output( } } } - //TODO to be implemented + if let Some(async_pool_changes_filter) = &filters.async_pool_changes_filter { if async_pool_changes_filter.none.is_some() { exec_output.state_changes.async_pool_changes.0.clear(); + } else { + exec_output + .state_changes + .async_pool_changes + .0 + .retain(|_, change| { + match change { + SetUpdateOrDelete::Set(value) => { + if let Some(change_types) = &async_pool_changes_filter.change_types { + if !change_types + .contains(&(grpc_model::AsyncPoolChangeType::Set as i32)) + { + return false; + } + } + //TODO to be confirmed + if let Some(handlers) = &async_pool_changes_filter.handlers { + if !handlers.contains(&value.function) { + return false; + } + } + if let Some(destination_addresses) = + &async_pool_changes_filter.destination_addresses + { + if !destination_addresses.contains(&value.destination) { + return false; + } + } + + if let Some(emitter_addresses) = + &async_pool_changes_filter.emitter_addresses + { + if !emitter_addresses.contains(&value.sender) { + return false; + } + } + + if let Some(can_be_executed) = async_pool_changes_filter.can_be_executed + { + if value.can_be_executed != can_be_executed { + return false; + } + } + } + SetUpdateOrDelete::Update(value) => { + if let Some(change_types) = &async_pool_changes_filter.change_types { + if !change_types + .contains(&(grpc_model::AsyncPoolChangeType::Set as i32)) + { + return false; + } + } + //TODO to be confirmed + if let Some(handlers) = &async_pool_changes_filter.handlers { + match value.function.clone() { + massa_ledger_exports::SetOrKeep::Set(function_name) => { + if !handlers.contains(&function_name) { + return false; + } + } + //TODO to be confirmed + massa_ledger_exports::SetOrKeep::Keep => { + return false; + } + } + } + + if let Some(destination_addresses) = + &async_pool_changes_filter.destination_addresses + { + match value.destination { + massa_ledger_exports::SetOrKeep::Set(addr) => { + if !destination_addresses.contains(&addr) { + return false; + } + } + massa_ledger_exports::SetOrKeep::Keep => {} + } + } + + if let Some(emitter_addresses) = + &async_pool_changes_filter.emitter_addresses + { + match value.sender { + massa_ledger_exports::SetOrKeep::Set(addr) => { + if !emitter_addresses.contains(&addr) { + return false; + } + } + massa_ledger_exports::SetOrKeep::Keep => {} + } + } + + if let Some(can_be_executed) = async_pool_changes_filter.can_be_executed + { + match value.can_be_executed { + massa_ledger_exports::SetOrKeep::Set(can_be_ex) => { + if can_be_executed != can_be_ex { + return false; + } + } + massa_ledger_exports::SetOrKeep::Keep => {} + } + } + } + SetUpdateOrDelete::Delete => {} + } + + true + }); + + if exec_output.state_changes.async_pool_changes.0.is_empty() { + return None; + } } } diff --git a/massa-grpc/src/tests/stream.rs b/massa-grpc/src/tests/stream.rs index b59d6c6de70..06731ead1cd 100644 --- a/massa-grpc/src/tests/stream.rs +++ b/massa-grpc/src/tests/stream.rs @@ -1189,7 +1189,7 @@ async fn new_slot_execution_outputs() { // TODO add test when filter is updated - /* filter = massa_proto_rs::massa::api::v1::NewSlotExecutionOutputsFilter { + filter = massa_proto_rs::massa::api::v1::NewSlotExecutionOutputsFilter { filter: Some( massa_proto_rs::massa::api::v1::new_slot_execution_outputs_filter::Filter::EventFilter( massa_proto_rs::massa::api::v1::ExecutionEventFilter { @@ -1217,7 +1217,7 @@ async fn new_slot_execution_outputs() { let result = tokio::time::timeout(Duration::from_secs(2), resp_stream.next()).await; dbg!(&result); - assert!(result.is_err()); */ + assert!(result.is_err()); stop_handle.stop(); }