Skip to content

Commit

Permalink
fix: nested filters definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
aoudiamoncef committed Oct 10, 2023
1 parent 8f79432 commit 1e86981
Showing 1 changed file with 38 additions and 42 deletions.
80 changes: 38 additions & 42 deletions massa-grpc/src/stream/new_slot_execution_outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ struct Filter {
struct AsyncPoolChangesFilter {
// Do not return any message
none: Option<()>,
// The type of the change
change_type: Option<i32>,
// The handler function name within the destination address bytecode
handler: Option<String>,
// The address towards which the message is being sent
destination_address: Option<Address>,
// The address that sent the message
emitter_address: Option<Address>,
// The types of the change
change_types: Option<HashSet<i32>>,
// The handlers functions names within the destination address bytecode
handlers: Option<HashSet<String>>,
// The addresses towards which the message is being sent
destination_addresses: Option<HashSet<Address>>,
// The addresses that sent the message
emitter_addresses: Option<HashSet<Address>>,
// Boolean that determine if the message can be executed. For messages without filter this boolean is always true.
// For messages with filter, this boolean is true if the filter has been matched between `validity_start` and current slot.
can_be_executed: Option<bool>,
Expand All @@ -75,12 +75,12 @@ struct ExecutedDenounciationFilter {
struct ExecutionEventFilter {
// Do not return any message
none: Option<()>,
// Caller address
caller_address: Option<Address>,
// Emitter address
emitter_address: Option<Address>,
// Original operation id
original_operation_id: Option<OperationId>,
// Caller addresses
caller_addresses: Option<HashSet<Address>>,
// Emitter addresses
emitter_addresses: Option<HashSet<Address>>,
// Original operation ids
original_operation_ids: Option<HashSet<OperationId>>,
// Whether the event is a failure
is_failure: Option<bool>,
}
Expand All @@ -89,16 +89,16 @@ struct ExecutionEventFilter {
struct ExecutedOpsChangesFilter {
// Do not return any message
none: Option<()>,
// Operation id
operation_id: Option<OperationId>,
// Operation ids
operation_ids: Option<HashSet<OperationId>>,
}

#[derive(Clone, Debug, Default)]
struct LedgerChangesFilter {
// Do not return any message
none: Option<()>,
// Address for which we have ledger changes
address: Option<Address>,
// Addresses for which we have ledger changes
addresses: Option<HashSet<Address>>,
}

/// Creates a new stream of new produced and received slot execution outputs
Expand Down Expand Up @@ -265,28 +265,27 @@ fn get_filter(
},
grpc_api::new_slot_execution_outputs_filter::Filter::AsyncPoolChangesFilter(filter) => {
if let Some(filter) = filter.filter {
let mut nested_filter = AsyncPoolChangesFilter::default();
let nested_filter = async_pool_changes_filter.get_or_insert(AsyncPoolChangesFilter::default());
match filter {
grpc_api::async_pool_changes_filter::Filter::None(_) => {
nested_filter.none = Some(());
},
grpc_api::async_pool_changes_filter::Filter::Type(change_type) => {
nested_filter.change_type = Some(change_type);
nested_filter.change_types.get_or_insert_with(HashSet::new).insert(change_type);
},
grpc_api::async_pool_changes_filter::Filter::Handler(function) => {
nested_filter.handler = Some(function);
nested_filter.handlers.get_or_insert_with(HashSet::new).insert(function);
},
grpc_api::async_pool_changes_filter::Filter::DestinationAddress(addr) => {
nested_filter.destination_address = Some(Address::from_str(&addr)?);
nested_filter.destination_addresses.get_or_insert_with(HashSet::new).insert(Address::from_str(&addr)?);
},
grpc_api::async_pool_changes_filter::Filter::EmitterAddress(addr) => {
nested_filter.emitter_address = Some(Address::from_str(&addr)?);
nested_filter.emitter_addresses.get_or_insert_with(HashSet::new).insert(Address::from_str(&addr)?);
},
grpc_api::async_pool_changes_filter::Filter::CanBeExecuted(can_be_executed) => {
nested_filter.can_be_executed = Some(can_be_executed);
},
}
async_pool_changes_filter = Some(nested_filter);
};
}
},
grpc_api::new_slot_execution_outputs_filter::Filter::ExecutedDenounciationFilter(filter) => {
Expand All @@ -302,53 +301,50 @@ fn get_filter(
},
grpc_api::new_slot_execution_outputs_filter::Filter::EventFilter(filter) => {
if let Some(filter) = filter.filter {
let mut nested_filter = ExecutionEventFilter::default();
let nested_filter = execution_event_filter.get_or_insert(ExecutionEventFilter::default());
match filter {
grpc_api::execution_event_filter::Filter::None(_) => {
nested_filter.none = Some(());
},
grpc_api::execution_event_filter::Filter::CallerAddress(addr) => {
nested_filter.caller_address = Some(Address::from_str(&addr)?);
nested_filter.caller_addresses.get_or_insert_with(HashSet::new).insert(Address::from_str(&addr)?);
},
grpc_api::execution_event_filter::Filter::EmitterAddress(addr) => {
nested_filter.emitter_address = Some(Address::from_str(&addr)?);
nested_filter.emitter_addresses.get_or_insert_with(HashSet::new).insert(Address::from_str(&addr)?);
},
grpc_api::execution_event_filter::Filter::OriginalOperationId(op) => {
nested_filter.original_operation_id = Some(OperationId::from_str(&op)?);
grpc_api::execution_event_filter::Filter::OriginalOperationId(op_id) => {
nested_filter.original_operation_ids.get_or_insert_with(HashSet::new).insert(OperationId::from_str(&op_id)?);
},
grpc_api::execution_event_filter::Filter::IsFailure(is_failure) => {
nested_filter.is_failure = Some(is_failure);
},
}
execution_event_filter = Some(nested_filter);
};
}
},
grpc_api::new_slot_execution_outputs_filter::Filter::ExecutedOpsChangesFilter(filter) => {
if let Some(filter) = filter.filter {
let mut nested_filter = ExecutedOpsChangesFilter::default();
let nested_filter = executed_ops_changes_filter.get_or_insert(ExecutedOpsChangesFilter::default());
match filter {
grpc_api::executed_ops_changes_filter::Filter::None(_) => {
nested_filter.none = Some(());
},
grpc_api::executed_ops_changes_filter::Filter::OperationId(op_id) => {
nested_filter.operation_id = Some(OperationId::from_str(&op_id)?);
nested_filter.operation_ids.get_or_insert_with(HashSet::new).insert(OperationId::from_str(&op_id)?);
},
}
executed_ops_changes_filter = Some(nested_filter);
}
},
grpc_api::new_slot_execution_outputs_filter::Filter::LedgerChangesFilter(filter) => {
if let Some(filter) = filter.filter {
let mut nested_filter = LedgerChangesFilter::default();
let nested_filter = ledger_changes_filter.get_or_insert(LedgerChangesFilter::default());
match filter {
grpc_api::ledger_changes_filter::Filter::None(_) => {
nested_filter.none = Some(());
},
grpc_api::ledger_changes_filter::Filter::Address(addr) => {
nested_filter.address = Some(Address::from_str(&addr)?);
nested_filter.addresses.get_or_insert_with(HashSet::new).insert(Address::from_str(&addr)?);
},
}
ledger_changes_filter = Some(nested_filter);
}
},
}
Expand Down Expand Up @@ -461,22 +457,22 @@ fn filter_map_exec_output(
if let Some(executed_ops_changes_filter) = &filters.executed_ops_changes_filter {
if executed_ops_changes_filter.none.is_some() {
exec_output.state_changes.executed_ops_changes.clear();
} else if let Some(op_id) = executed_ops_changes_filter.operation_id {
} else if let Some(operation_ids) = executed_ops_changes_filter.operation_ids.clone() {
exec_output
.state_changes
.executed_ops_changes
.retain(|operation_id, _| operation_id == &op_id);
.retain(|operation_id, _| operation_ids.contains(operation_id));
}
}
if let Some(ledger_changes_filter) = &filters.ledger_changes_filter {
if ledger_changes_filter.none.is_some() {
exec_output.state_changes.ledger_changes.0.clear();
} else if let Some(addr) = ledger_changes_filter.address {
} else if let Some(addresses) = ledger_changes_filter.addresses.clone() {
exec_output
.state_changes
.ledger_changes
.0
.retain(|address, _| address == &addr);
.retain(|address, _| addresses.contains(address));
}
}

Expand Down

0 comments on commit 1e86981

Please sign in to comment.