Skip to content

Commit

Permalink
secure pools (#4067)
Browse files Browse the repository at this point in the history
* Update testnet24

* Split massa db as worker and exports (#4045)

* 1st step to have massa_db split between exports and worker

* Create MassaDBController trait

* Implement MassaDBController for MassaDB

* Added read function to MassaDBController

* Started cleaning up build errors - not finished

* Implemented more trait methods, cleaned a bit

Still has lifetime issues and some trait methods missing, e.g. for bootstrap

* Fixed build errors

* Fix after rebase on testnet24

* Fix clippy warnings

* Simplified lifetime definitions

* Added doc comments

* Updated doc comments for get_batch and get_versioning_batch

* In pos-exports, put the massa-db-worker as a dev-dependency instead of optional dependancy

* Make pub

* Make RawMassaDB fileds pub

* Clean bootstrap state and versioning cursors (#4053)

* Initial rework of get_batch_to_stream and get_versioning_batch_to_stream

* Updated write_batch_bootstrap_client and server cursors' logic

* secure denunciation pool

* correct errors

* operation pool refactoring

* op pool filtering (no scoring yet)

* updates

* update executed ops

* scoring ops

* full compile

* optimize

* Fix test compilation

* better endo expiry

* update constants

* minor improvements on scoring

* Fix tests of pool.

* Revert "Clean bootstrap state and versioning cursors (#4053)"

This reverts commit 08f7601.

* Revert "Make RawMassaDB fileds pub"

This reverts commit 730c6e2.

* Revert "Make pub"

This reverts commit 9e43bbe.

* Revert "Split massa db as worker and exports (#4045)"

This reverts commit c1379f2.

* Use cargo lock from main

* Fix grpc compilation

* Fix config variable

* Test endorsements in protocol

* Fix tests

* Fix protocol test

---------

Co-authored-by: AurelienFT <aurelien.foucault@epitech.eu>
Co-authored-by: Leo-Besancon <lb@massa.net>
Co-authored-by: Leo-Besancon <leoleo38@live.fr>
  • Loading branch information
4 people committed Jun 15, 2023
1 parent c9030a7 commit dc353ea
Show file tree
Hide file tree
Showing 48 changed files with 1,376 additions and 830 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 14 additions & 22 deletions massa-api/src/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,30 +514,22 @@ impl MassaRpcServer for API<Public> {
// ask pool whether it carries the operations
let in_pool = self.0.pool_command_sender.contains_operations(&ops);

let (speculative_op_exec_statuses, final_op_exec_statuses) =
self.0.execution_controller.get_op_exec_status();
let op_exec_statuses = self.0.execution_controller.get_ops_exec_status(&ops);

// compute operation finality and operation execution status from *_op_exec_statuses
let (is_operation_final, statuses): (Vec<Option<bool>>, Vec<Option<bool>>) = ops
.iter()
.map(|op| {
match (
final_op_exec_statuses.get(op),
speculative_op_exec_statuses.get(op),
) {
// op status found in the final hashmap, so the op is "final"(first value of the tuple: Some(true))
// and we keep its status (copied as the second value of the tuple)
(Some(val), _) => (Some(true), Some(*val)),
// op status NOT found in the final hashmap but in the speculative one, so the op is "not final"(first value of the tuple: Some(false))
// and we keep its status (copied as the second value of the tuple)
(None, Some(val)) => (Some(false), Some(*val)),
// op status not defined in any hashmap, finality and status are unknow hence (None, None)
(None, None) => (None, None),
}
})
.collect::<Vec<(Option<bool>, Option<bool>)>>()
.into_iter()
.unzip();
let (is_operation_final, statuses): (Vec<Option<bool>>, Vec<Option<bool>>) =
op_exec_statuses
.into_iter()
.map(|(spec_exec, final_exec)| match (spec_exec, final_exec) {
(Some(true), Some(true)) => (Some(true), Some(true)),
(Some(false), Some(false)) => (Some(true), Some(false)),
(Some(true), None) => (Some(false), Some(true)),
(Some(false), None) => (Some(false), Some(false)),
_ => (None, None),
})
.collect::<Vec<(Option<bool>, Option<bool>)>>()
.into_iter()
.unzip();

// gather all values into a vector of OperationInfo instances
let mut res: Vec<OperationInfo> = Vec::with_capacity(ops.len());
Expand Down
10 changes: 10 additions & 0 deletions massa-executed-ops/src/executed_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ impl ExecutedOps {
}
}

/// Get the execution statuses of a set of operations.
/// Returns a list where each element is None if no execution was found for that op,
/// or a boolean indicating whether the execution was successful (true) or had an error (false).
pub fn get_ops_exec_status(&self, batch: &[OperationId]) -> Vec<Option<bool>> {
batch
.iter()
.map(|op_id| self.op_exec_status.get(op_id).copied())
.collect()
}

/// Recomputes the local caches after bootstrap or loading the state from disk
pub fn recompute_sorted_ops_and_op_exec_status(&mut self) {
self.sorted_ops.clear();
Expand Down
2 changes: 1 addition & 1 deletion massa-executed-ops/src/ops_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use nom::{
};
use std::ops::Bound::{Excluded, Included};

/// Speculative changes for ExecutedOps
/// Changes for ExecutedOps (was_successful, op_expiry_slot)
pub type ExecutedOpsChanges = PreHashMap<OperationId, (bool, Slot)>;

/// `ExecutedOps` Serializer
Expand Down
23 changes: 7 additions & 16 deletions massa-execution-exports/src/controller_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use massa_models::execution::EventFilter;
use massa_models::operation::OperationId;
use massa_models::output_event::SCOutputEvent;
use massa_models::prehash::PreHashMap;
use massa_models::prehash::PreHashSet;
use massa_models::slot::Slot;
use massa_models::stats::ExecutionStats;
use massa_storage::Storage;
Expand Down Expand Up @@ -53,15 +52,14 @@ pub trait ExecutionController: Send + Sync {
addresses: &[Address],
) -> Vec<(Option<Amount>, Option<Amount>)>;

/// Get the execution status of operation that have been executed both speculatively or finaly
/// Get the execution status of a batch of operations.
///
/// Return value
/// `(speculative_statuses, final_statuses)`
/// for each hashmap:
/// key: the operation id
/// value: true: operation executed successfully,
/// false: operation failed
fn get_op_exec_status(&self) -> (HashMap<OperationId, bool>, HashMap<OperationId, bool>);
/// Return value: vector of
/// `(Option<speculative_status>, Option<final_status>)`
/// If an Option is None it means that the op execution was not found.
/// Note that old op executions are forgotten.
/// Otherwise, the status is a boolean indicating whether the execution was successful (true) or if there was an error (false.)
fn get_ops_exec_status(&self, batch: &[OperationId]) -> Vec<(Option<bool>, Option<bool>)>;

/// Get a copy of a single datastore entry with its final and active values
///
Expand Down Expand Up @@ -92,13 +90,6 @@ pub trait ExecutionController: Send + Sync {
req: ReadOnlyExecutionRequest,
) -> Result<ReadOnlyExecutionOutput, ExecutionError>;

/// List which operations inside the provided list were not executed
fn unexecuted_ops_among(
&self,
ops: &PreHashSet<OperationId>,
thread: u8,
) -> PreHashSet<OperationId>;

/// Check if a denunciation has been executed given a `DenunciationIndex`
fn is_denunciation_executed(&self, denunciation_index: &DenunciationIndex) -> bool;

Expand Down
26 changes: 2 additions & 24 deletions massa-execution-exports/src/test_exports/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,28 +205,6 @@ impl ExecutionController for MockExecutionController {
response_rx.recv().unwrap()
}

fn unexecuted_ops_among(
&self,
ops: &PreHashSet<OperationId>,
thread: u8,
) -> PreHashSet<OperationId> {
let (response_tx, response_rx) = mpsc::channel();
if let Err(err) = self
.0
.lock()
.send(MockExecutionControllerMessage::UnexecutedOpsAmong {
ops: ops.clone(),
thread,
response_tx,
})
{
println!("mock error {err}");
}
response_rx
.recv_timeout(Duration::from_millis(100))
.unwrap()
}

fn is_denunciation_executed(&self, denunciation_index: &DenunciationIndex) -> bool {
let (response_tx, response_rx) = mpsc::channel();
if let Err(err) =
Expand All @@ -248,7 +226,7 @@ impl ExecutionController for MockExecutionController {
Box::new(self.clone())
}

fn get_op_exec_status(&self) -> (HashMap<OperationId, bool>, HashMap<OperationId, bool>) {
(HashMap::new(), HashMap::new())
fn get_ops_exec_status(&self, batch: &[OperationId]) -> Vec<(Option<bool>, Option<bool>)> {
vec![(None, None); batch.len()]
}
}
37 changes: 25 additions & 12 deletions massa-execution-worker/src/active_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use massa_ledger_exports::{
Applicable, LedgerEntry, LedgerEntryUpdate, SetOrDelete, SetOrKeep, SetUpdateOrDelete,
};
use massa_models::denunciation::DenunciationIndex;
use massa_models::prehash::{CapacityAllocator, PreHashMap, PreHashSet};
use massa_models::{
address::Address, amount::Amount, bytecode::Bytecode, operation::OperationId, slot::Slot,
};
use massa_pos_exports::DeferredCredits;
use std::collections::{HashMap, VecDeque};
use std::collections::VecDeque;

#[derive(Default)]
/// History of the outputs of recently executed slots.
Expand Down Expand Up @@ -311,18 +312,30 @@ impl ActiveHistory {
}
}

/// Get the execution statuses of operations
///
/// # Return Value
///
/// A hashmap with
/// * the operation id as the key
/// * and a bool as the value: true: execution succeeded, false: execution failed
pub fn get_op_exec_status(&self) -> HashMap<OperationId, bool> {
self.0
/// Get the execution statuses of a set of operations.
/// Returns a list where each element is None if no execution was found for that op,
/// or a boolean indicating whether the execution was successful (true) or had an error (false).
pub fn get_ops_exec_status(&self, batch: &[OperationId]) -> Vec<Option<bool>> {
let mut to_find: PreHashSet<OperationId> = batch.iter().copied().collect();
let mut found = PreHashMap::with_capacity(to_find.len());
for hist_item in self.0.iter().rev() {
to_find.retain(|op_id| {
if let Some((success, _expiry_slot)) =
hist_item.state_changes.executed_ops_changes.get(op_id)
{
found.insert(*op_id, *success);
false
} else {
true
}
});
if to_find.is_empty() {
break;
}
}
batch
.iter()
.flat_map(|exec_output| exec_output.state_changes.executed_ops_changes.clone())
.map(|(op_id, (status, _))| (op_id, status))
.map(|op_id| found.get(op_id).copied())
.collect()
}
}
17 changes: 3 additions & 14 deletions massa-execution-worker/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use massa_execution_exports::{
use massa_models::denunciation::DenunciationIndex;
use massa_models::execution::EventFilter;
use massa_models::output_event::SCOutputEvent;
use massa_models::prehash::{PreHashMap, PreHashSet};
use massa_models::prehash::PreHashMap;
use massa_models::stats::ExecutionStats;
use massa_models::{address::Address, amount::Amount, operation::OperationId};
use massa_models::{block_id::BlockId, slot::Slot};
Expand Down Expand Up @@ -216,17 +216,6 @@ impl ExecutionController for ExecutionControllerImpl {
}
}

/// List which operations inside the provided list were not executed
fn unexecuted_ops_among(
&self,
ops: &PreHashSet<OperationId>,
thread: u8,
) -> PreHashSet<OperationId> {
self.execution_state
.read()
.unexecuted_ops_among(ops, thread)
}

/// Check if a denunciation has been executed given a `DenunciationIndex`
fn is_denunciation_executed(&self, denunciation_index: &DenunciationIndex) -> bool {
self.execution_state
Expand Down Expand Up @@ -272,8 +261,8 @@ impl ExecutionController for ExecutionControllerImpl {
}

/// See trait definition
fn get_op_exec_status(&self) -> (HashMap<OperationId, bool>, HashMap<OperationId, bool>) {
self.execution_state.read().get_op_exec_status()
fn get_ops_exec_status(&self, batch: &[OperationId]) -> Vec<(Option<bool>, Option<bool>)> {
self.execution_state.read().get_ops_exec_status(batch)
}
}

Expand Down
77 changes: 24 additions & 53 deletions massa-execution-worker/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use massa_models::bytecode::Bytecode;
use massa_models::denunciation::{Denunciation, DenunciationIndex};
use massa_models::execution::EventFilter;
use massa_models::output_event::SCOutputEvent;
use massa_models::prehash::PreHashSet;
use massa_models::stats::ExecutionStats;
use massa_models::timeslots::get_block_slot_timestamp;
use massa_models::{
Expand All @@ -44,7 +43,7 @@ use massa_sc_runtime::{Interface, Response, VMError};
use massa_storage::Storage;
use massa_versioning::versioning::MipStore;
use parking_lot::{Mutex, RwLock};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use tracing::{debug, info, trace, warn};

Expand Down Expand Up @@ -1620,46 +1619,6 @@ impl ExecutionState {
}
}

/// List which operations inside the provided list were not executed
pub fn unexecuted_ops_among(
&self,
ops: &PreHashSet<OperationId>,
thread: u8,
) -> PreHashSet<OperationId> {
let mut ops = ops.clone();

if ops.is_empty() {
return ops;
}

{
// check active history
let history = self.active_history.read();
for hist_item in history.0.iter().rev() {
if hist_item.slot.thread != thread {
continue;
}
ops.retain(|op_id| {
!hist_item
.state_changes
.executed_ops_changes
.contains_key(op_id)
});
if ops.is_empty() {
return ops;
}
}
}

{
// check final state
let final_state = self.final_state.read();
ops.retain(|op_id| !final_state.executed_ops.contains(op_id));
}

ops
}

/// Check if a denunciation has been executed given a `DenunciationIndex`
pub fn is_denunciation_executed(&self, denunciation_index: &DenunciationIndex) -> bool {
// check active history
Expand Down Expand Up @@ -1689,18 +1648,30 @@ impl ExecutionState {
context_guard!(self).get_address_future_deferred_credits(address, self.config.thread_count)
}

/// Get the execution statuses of both speculative and final executions
///
/// # Return
/// Get the execution status of a batch of operations.
///
/// * A tuple of hashmaps with:
/// * first the statuses for speculative executions
/// * second the statuses for final executions
pub fn get_op_exec_status(&self) -> (HashMap<OperationId, bool>, HashMap<OperationId, bool>) {
(
self.active_history.read().get_op_exec_status(),
self.final_state.read().executed_ops.op_exec_status.clone(),
)
/// Return value: vector of
/// `(Option<speculative_status>, Option<final_status>)`
/// If an Option is None it means that the op execution was not found.
/// Note that old op executions are forgotten.
/// Otherwise, the status is a boolean indicating whether the execution was successful (true) or if there was an error (false.)
pub fn get_ops_exec_status(&self, batch: &[OperationId]) -> Vec<(Option<bool>, Option<bool>)> {
let speculative_exec = self.active_history.read().get_ops_exec_status(batch);
let final_exec = self
.final_state
.read()
.executed_ops
.get_ops_exec_status(batch);
speculative_exec
.into_iter()
.zip(final_exec.into_iter())
.map(|(speculative_v, final_v)| {
match (speculative_v, final_v) {
(None, Some(f)) => (Some(f), Some(f)), // special case: a final execution should also appear as speculative
(s, f) => (s, f),
}
})
.collect()
}

/// Update MipStore with block header stats
Expand Down
Loading

0 comments on commit dc353ea

Please sign in to comment.