Skip to content

Commit

Permalink
feat: Update hermes event queue error handling (#219)
Browse files Browse the repository at this point in the history
* update error handling for event queue

* wip

---------

Co-authored-by: Oleksandr Prokhorenko <djminikin@gmail.com>
  • Loading branch information
Mr-Leshiy and minikin committed May 9, 2024
1 parent 145bfee commit 0f96431
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 110 deletions.
5 changes: 3 additions & 2 deletions hermes/bin/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ pub(crate) trait HermesEventPayload: Send + Sync + 'static {
/// Target Hermes app to execute the event
pub(crate) enum TargetApp {
/// Execute for all available apps
_All,
#[allow(dead_code)]
All,
/// Execute for a specific list of apps
List(Vec<HermesAppName>),
}
Expand All @@ -37,7 +38,7 @@ pub(crate) enum TargetModule {
/// Execute for all available modules
All,
/// Execute for a specific list of modules
_List(Vec<ModuleId>),
List(Vec<ModuleId>),
}

/// Hermes event
Expand Down
197 changes: 96 additions & 101 deletions hermes/bin/src/event/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,28 @@ use crate::{
/// Singleton instance of the Hermes event queue.
static EVENT_QUEUE_INSTANCE: OnceCell<HermesEventQueue> = OnceCell::new();

/// Hermes event queue error
/// Failed to add event into the event queue. Event queue is closed.
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum Error {
/// Target app not found.
#[error("Target app not found, app name: {0:?}.")]
AppNotFound(HermesAppName),

/// Target module not found.
#[error("Target module not found, module id: {0:?}.")]
ModuleNotFound(ModuleId),

/// Failed to add event into the event queue. Event queue is closed.
#[error("Failed to add event into the event queue. Event queue is closed.")]
CannotAddEvent,

/// Failed when event queue already been initialized.
#[error("Event queue already been initialized.")]
AlreadyInitialized,

/// Failed when event queue not been initialized.
#[error("Event queue not been initialized. Call `HermesEventQueue::init` first.")]
NotInitialized,

/// Event loop has crashed unexpectedly.
#[error("Event loop has crashed unexpectedly.")]
EventLoopPanics,
}
#[error("Failed to add event into the event queue. Event queue is closed.")]
pub(crate) struct CannotAddEventError;

/// Failed when event queue already been initialized.
#[derive(thiserror::Error, Debug, Clone)]
#[error("Event queue already been initialized.")]
pub(crate) struct AlreadyInitializedError;

/// Failed when event queue not been initialized.
#[derive(thiserror::Error, Debug, Clone)]
#[error("Event queue not been initialized. Call `HermesEventQueue::init` first.")]
pub(crate) struct NotInitializedError;

/// Event loop has crashed unexpectedly.
#[derive(thiserror::Error, Debug, Clone)]
#[error("Event loop has crashed unexpectedly.")]
pub(crate) struct EventLoopPanicsError;

/// Hermes event execution context
type ExecutionContext<'a> = (&'a HermesAppName, &'a ModuleId, &'a Module);

/// Hermes event queue.
/// It is a singleton struct.
Expand All @@ -59,97 +54,63 @@ struct HermesEventQueue {
/// Hermes event queue execution loop thread handler
pub(crate) struct HermesEventLoopHandler {
/// Hermes event queue execution loop thread handler
handle: Option<JoinHandle<anyhow::Result<()>>>,
handle: Option<JoinHandle<()>>,
}

impl HermesEventLoopHandler {
/// Join the event loop thread
///
/// # Errors:
/// - `EventLoopPanicsError`
pub(crate) fn join(&mut self) -> anyhow::Result<()> {
match self.handle.take() {
Some(handle) => handle.join().map_err(|_| Error::EventLoopPanics)?,
None => Ok(()),
if let Some(handle) = self.handle.take() {
handle.join().map_err(|_| EventLoopPanicsError)?;
}
Ok(())
}
}

/// Creates a new instance of the `HermesEventQueue`.
/// Runs an event loop thread.
///
/// # Errors:
/// - `Error::AlreadyInitialized`
/// - `AlreadyInitializedError`
pub(crate) fn init(indexed_apps: Arc<IndexedApps>) -> anyhow::Result<HermesEventLoopHandler> {
let (sender, receiver) = std::sync::mpsc::channel();

EVENT_QUEUE_INSTANCE
.set(HermesEventQueue { sender })
.map_err(|_| Error::AlreadyInitialized)?;
.map_err(|_| AlreadyInitializedError)?;

Ok(HermesEventLoopHandler {
handle: Some(thread::spawn(move || {
event_execution_loop(&indexed_apps, receiver)
event_execution_loop(&indexed_apps, receiver);
})),
})
}

/// Add event into the event queue
///
/// # Errors:
/// - `Error::CannotAddEvent`
/// - `Error::NotInitialized`
pub(crate) fn send(event: HermesEvent) -> anyhow::Result<()> {
let queue = EVENT_QUEUE_INSTANCE.get().ok_or(Error::NotInitialized)?;

queue
.sender
.send(event)
.map_err(|_| Error::CannotAddEvent)?;
Ok(())
}

/// Execute a hermes event on the provided module and all necessary info.
///
/// # Errors:
/// - `wasm::module::BadWASMModuleError`
pub(crate) fn event_dispatch(
app_name: HermesAppName, module_id: ModuleId, event: &dyn HermesEventPayload, module: &Module,
) -> anyhow::Result<()> {
let runtime_context = HermesRuntimeContext::new(
app_name,
module_id,
event.event_name().to_string(),
module.exec_counter(),
);

// Advise Runtime Extensions of a new context
new_context(&runtime_context);

module.execute_event(event, runtime_context)?;
Ok(())
}

/// Executes provided Hermes event filtering by target app and target module.
///
/// # Errors:
/// - `Error::ModuleNotFound`
/// - `Error::AppNotFound`
/// - `wasm::module::BadWASMModuleError`
fn targeted_event_execution(indexed_apps: &IndexedApps, event: &HermesEvent) -> anyhow::Result<()> {
/// Get execution context
fn get_execution_context<'a>(
target_app: &'a TargetApp, target_module: &'a TargetModule, indexed_apps: &'a IndexedApps,
) -> Vec<ExecutionContext<'a>> {
// Gather target apps
let target_apps = match event.target_app() {
TargetApp::_All => indexed_apps.iter().collect(),
let target_apps = match target_app {
TargetApp::All => indexed_apps.iter().collect(),
TargetApp::List(target_apps) => {
let mut res = Vec::new();
for app_name in target_apps {
let app = indexed_apps
.get(app_name)
.ok_or(Error::AppNotFound(app_name.to_owned()))?;
let Some(app) = indexed_apps.get(app_name) else {
tracing::error!("Target app not found, app name: {:?}", app_name);
continue;
};

res.push((app_name, app));
}
res
},
};
// Gather target modules
let target_modules = match event.target_module() {
match target_module {
TargetModule::All => {
let mut res = Vec::new();
for (app_name, app) in target_apps {
Expand All @@ -159,39 +120,73 @@ fn targeted_event_execution(indexed_apps: &IndexedApps, event: &HermesEvent) ->
}
res
},
TargetModule::_List(target_modules) => {
TargetModule::List(target_modules) => {
let mut res = Vec::new();
for (app_name, app) in target_apps {
for module_id in target_modules {
let module = app
.indexed_modules()
.get(module_id)
.ok_or(Error::ModuleNotFound(module_id.to_owned()))?;
let Some(module) = app.indexed_modules().get(module_id) else {
tracing::error!(
"Target module not found, app name: {:?}, module id: {:?}",
app_name,
module_id
);
continue;
};

res.push((app_name, module_id, module));
}
}
res
},
};
}
}

/// Add event into the event queue
///
/// # Errors:
/// - `CannotAddEventError`
/// - `NotInitializedError`
pub(crate) fn send(event: HermesEvent) -> anyhow::Result<()> {
let queue = EVENT_QUEUE_INSTANCE.get().ok_or(NotInitializedError)?;

queue.sender.send(event).map_err(|_| CannotAddEventError)?;

Ok(())
}

/// Execute a hermes event on the provided module and all necessary info.
pub(crate) fn event_dispatch(
app_name: HermesAppName, module_id: ModuleId, module: &Module, event: &dyn HermesEventPayload,
) {
let runtime_context = HermesRuntimeContext::new(
app_name,
module_id,
event.event_name().to_string(),
module.exec_counter(),
);

// Advise Runtime Extensions of a new context
new_context(&runtime_context);

if let Err(err) = module.execute_event(event, runtime_context) {
tracing::error!("Error executing event, err: {err}");
}
}

/// Executes provided Hermes event filtering by target app and target module.
fn targeted_event_execution(indexed_apps: &IndexedApps, event: &HermesEvent) {
let execution_contexts =
get_execution_context(event.target_app(), event.target_module(), indexed_apps);

// Event dispatch
for (app_name, module_id, module) in target_modules {
event_dispatch(app_name.clone(), module_id.clone(), event.payload(), module)?;
for (app_name, module_id, module) in execution_contexts {
event_dispatch(app_name.clone(), module_id.clone(), module, event.payload());
}
Ok(())
}

/// Executes Hermes events from the provided receiver .
///
/// # Errors:
/// - `Error::ModuleNotFound`
/// - `Error::AppNotFound`
/// - `wasm::module::BadWASMModuleError`
fn event_execution_loop(
indexed_apps: &IndexedApps, receiver: Receiver<HermesEvent>,
) -> anyhow::Result<()> {
fn event_execution_loop(indexed_apps: &IndexedApps, receiver: Receiver<HermesEvent>) {
for event in receiver {
targeted_event_execution(indexed_apps, &event)?;
targeted_event_execution(indexed_apps, &event);
}
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ fn build_and_send_block_event(
crate::event::queue::send(HermesEvent::new(
on_block_event,
TargetApp::List(vec![module_state_key.0.clone()]),
TargetModule::_List(vec![module_state_key.1.clone()]),
TargetModule::List(vec![module_state_key.1.clone()]),
))
}

Expand All @@ -331,7 +331,7 @@ fn build_and_send_txns_event(
crate::event::queue::send(HermesEvent::new(
on_txn_event,
TargetApp::List(vec![module_state_key.0.clone()]),
TargetModule::_List(vec![module_state_key.1.clone()]),
TargetModule::List(vec![module_state_key.1.clone()]),
))?;
}

Expand All @@ -351,7 +351,7 @@ fn build_and_send_rollback_event(
crate::event::queue::send(HermesEvent::new(
on_rollback_event,
TargetApp::List(vec![module_state_key.0.clone()]),
TargetModule::_List(vec![module_state_key.1.clone()]),
TargetModule::List(vec![module_state_key.1.clone()]),
))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ pub fn execute_event(
event_dispatch(
app_name,
module.id().clone(),
on_bench_event.as_ref(),
module,
)?;
on_bench_event.as_ref(),
);
// module.execute_event(&on_bench_event)?;
BENCH_RESULT_QUEUE.get_or_init(SegQueue::new).pop()
},
Expand All @@ -102,9 +102,9 @@ pub fn execute_event(
event_dispatch(
app_name,
module.id().clone(),
on_test_event.as_ref(),
module,
)?;
on_test_event.as_ref(),
);
TEST_RESULT_QUEUE.get_or_init(SegQueue::new).pop()
},
};
Expand Down

0 comments on commit 0f96431

Please sign in to comment.