Skip to content

Commit

Permalink
feat: add AsyncPoolChanges filters
Browse files Browse the repository at this point in the history
  • Loading branch information
aoudiamoncef committed Oct 11, 2023
1 parent 9ffe095 commit 85a8a1a
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 34 additions & 28 deletions massa-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,50 @@ documentation = "https://docs.massa.net/"
testing = []

[dependencies]
massa-proto-rs = { workspace = true, "features" = ["tonic"] }
displaydoc = { workspace = true }
thiserror = { workspace = true }
tonic = { workspace = true, "features" = ["gzip", "tls"] }
tonic-web = { workspace = true }
tonic-reflection = { workspace = true }
tonic-health = { workspace = true }
tower-http = { workspace = true, "features" = ["cors"] }
hyper = { workspace = true }
futures-util = { workspace = true }
serde = { workspace = true, "features" = ["derive"] }
tokio = { workspace = true, "features" = ["rt-multi-thread", "macros"] }
tokio-stream = { workspace = true } # BOM UPGRADE Revert to "0.1.12" if problem
tracing = { workspace = true }
parking_lot = { workspace = true, "features" = ["deadlock_detection"] }
h2 = { workspace = true }
itertools = { workspace = true }

# Internal packages
massa_bootstrap = { workspace = true }
massa_consensus_exports = { workspace = true }
massa_execution_exports = { workspace = true }
massa_hash = { workspace = true }
massa_ledger_exports = { workspace = true }
massa_models = { workspace = true }
massa_pos_exports = { workspace = true }
massa_pool_exports = { workspace = true }
massa_pos_exports = { workspace = true }
massa_protocol_exports = { workspace = true }
massa_execution_exports = { workspace = true }
massa_sdk = { workspace = true }
massa_serialization = { workspace = true }
massa_signature = { workspace = true }
massa_storage = { workspace = true }
massa_time = { workspace = true }
massa_wallet = { workspace = true }
massa_serialization = { workspace = true }
massa_versioning = { workspace = true }
massa_signature = { workspace = true }
massa_bootstrap = { workspace = true }
massa_sdk = { workspace = true }
massa_wallet = { workspace = true }

# Massa projects dependencies
massa-proto-rs = { workspace = true, "features" = ["tonic"] }

# Common dependencies
displaydoc = { workspace = true }
futures-util = { workspace = true }
h2 = { workspace = true }
hyper = { workspace = true }
itertools = { workspace = true }

parking_lot = { workspace = true, "features" = ["deadlock_detection"] }
serde = { workspace = true, "features" = ["derive"] }
thiserror = { workspace = true }
tokio = { workspace = true, "features" = ["rt-multi-thread", "macros"] }
tokio-stream = { workspace = true } # BOM UPGRADE Revert to "0.1.12" if problem
tonic = { workspace = true, "features" = ["gzip", "tls"] }
tonic-health = { workspace = true }
tonic-reflection = { workspace = true }
tonic-web = { workspace = true }
tower-http = { workspace = true, "features" = ["cors"] }
tracing = { workspace = true }

[dev-dependencies]
num = { workspace = true }
massa_consensus_exports = { workspace = true, "features" = ["testing"] }
massa_channel = { workspace = true }
mockall = { workspace = true }
massa_consensus_exports = { workspace = true, "features" = ["testing"] }
massa_final_state = { workspace = true }
mockall = { workspace = true }
num = { workspace = true }
tokio = { workspace = true, "features" = ["test-util", "time"] }
117 changes: 116 additions & 1 deletion massa-grpc/src/stream/new_slot_execution_outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::server::MassaPublicGrpc;
use crate::SlotRange;
use futures_util::StreamExt;
use massa_execution_exports::{ExecutionOutput, SlotExecutionOutput};
use massa_ledger_exports::SetUpdateOrDelete;
use massa_models::address::Address;
use massa_models::operation::OperationId;
use massa_models::slot::Slot;
Expand Down Expand Up @@ -481,10 +482,124 @@ fn filter_map_exec_output(
}
}
}
//TODO to be implemented

if let Some(async_pool_changes_filter) = &filters.async_pool_changes_filter {
if async_pool_changes_filter.none.is_some() {
exec_output.state_changes.async_pool_changes.0.clear();
} else {
exec_output
.state_changes
.async_pool_changes
.0
.retain(|_, change| {
match change {
SetUpdateOrDelete::Set(value) => {
if let Some(change_types) = &async_pool_changes_filter.change_types {
if !change_types
.contains(&(grpc_model::AsyncPoolChangeType::Set as i32))
{
return false;
}
}
//TODO to be confirmed
if let Some(handlers) = &async_pool_changes_filter.handlers {
if !handlers.contains(&value.function) {
return false;
}
}
if let Some(destination_addresses) =
&async_pool_changes_filter.destination_addresses
{
if !destination_addresses.contains(&value.destination) {
return false;
}
}

if let Some(emitter_addresses) =
&async_pool_changes_filter.emitter_addresses
{
if !emitter_addresses.contains(&value.sender) {
return false;
}
}

if let Some(can_be_executed) = async_pool_changes_filter.can_be_executed
{
if value.can_be_executed != can_be_executed {
return false;
}
}
}
SetUpdateOrDelete::Update(value) => {
if let Some(change_types) = &async_pool_changes_filter.change_types {
if !change_types
.contains(&(grpc_model::AsyncPoolChangeType::Set as i32))
{
return false;
}
}
//TODO to be confirmed
if let Some(handlers) = &async_pool_changes_filter.handlers {
match value.function.clone() {
massa_ledger_exports::SetOrKeep::Set(function_name) => {
if !handlers.contains(&function_name) {
return false;
}
}
//TODO to be confirmed
massa_ledger_exports::SetOrKeep::Keep => {
return false;
}
}
}

if let Some(destination_addresses) =
&async_pool_changes_filter.destination_addresses
{
match value.destination {
massa_ledger_exports::SetOrKeep::Set(addr) => {
if !destination_addresses.contains(&addr) {
return false;
}
}
massa_ledger_exports::SetOrKeep::Keep => {}
}
}

if let Some(emitter_addresses) =
&async_pool_changes_filter.emitter_addresses
{
match value.sender {
massa_ledger_exports::SetOrKeep::Set(addr) => {
if !emitter_addresses.contains(&addr) {
return false;
}
}
massa_ledger_exports::SetOrKeep::Keep => {}
}
}

if let Some(can_be_executed) = async_pool_changes_filter.can_be_executed
{
match value.can_be_executed {
massa_ledger_exports::SetOrKeep::Set(can_be_ex) => {
if can_be_executed != can_be_ex {
return false;
}
}
massa_ledger_exports::SetOrKeep::Keep => {}
}
}
}
SetUpdateOrDelete::Delete => {}
}

true
});

if exec_output.state_changes.async_pool_changes.0.is_empty() {
return None;
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions massa-grpc/src/tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,7 @@ async fn new_slot_execution_outputs() {

// TODO add test when filter is updated

/* filter = massa_proto_rs::massa::api::v1::NewSlotExecutionOutputsFilter {
filter = massa_proto_rs::massa::api::v1::NewSlotExecutionOutputsFilter {
filter: Some(
massa_proto_rs::massa::api::v1::new_slot_execution_outputs_filter::Filter::EventFilter(
massa_proto_rs::massa::api::v1::ExecutionEventFilter {
Expand Down Expand Up @@ -1217,7 +1217,7 @@ async fn new_slot_execution_outputs() {

let result = tokio::time::timeout(Duration::from_secs(2), resp_stream.next()).await;
dbg!(&result);
assert!(result.is_err()); */
assert!(result.is_err());

stop_handle.stop();
}
Expand Down

0 comments on commit 85a8a1a

Please sign in to comment.