From 224a7f1b3bf8b9f68f3fb70ecbc4742be3b2b0d0 Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Tue, 6 Feb 2024 13:49:31 -0700 Subject: [PATCH 1/7] cleanup and start errors --- arbiter-engine/src/agent.rs | 35 +++------------ arbiter-engine/src/errors.rs | 12 ++++++ arbiter-engine/src/lib.rs | 9 ++-- arbiter-engine/src/machine.rs | 79 ++++++++++++++++++++-------------- arbiter-engine/src/messager.rs | 1 - arbiter-engine/src/universe.rs | 5 +-- arbiter-engine/src/world.rs | 29 +++---------- 7 files changed, 79 insertions(+), 91 deletions(-) create mode 100644 arbiter-engine/src/errors.rs diff --git a/arbiter-engine/src/agent.rs b/arbiter-engine/src/agent.rs index d7330a48..b428f62b 100644 --- a/arbiter-engine/src/agent.rs +++ b/arbiter-engine/src/agent.rs @@ -1,12 +1,8 @@ //! The agent module contains the core agent abstraction for the Arbiter Engine. -use std::{fmt::Debug, sync::Arc}; - -use arbiter_core::middleware::RevmMiddleware; -use serde::{de::DeserializeOwned, Serialize}; -use thiserror::Error; - +use super::*; use crate::{ + errors::ArbiterEngineError, machine::{Behavior, Engine, StateMachine}, messager::Messager, }; @@ -21,6 +17,7 @@ use crate::{ /// each of its [`Behavior`]s `startup()` methods. The [`Behavior`]s themselves /// will return a stream of events that then let the [`Behavior`] move into the /// `State::Processing` stage. +#[derive(Debug)] pub struct Agent { /// Identifier for this agent. /// Used for routing messages. @@ -38,17 +35,6 @@ pub struct Agent { pub(crate) behavior_engines: Vec>, } -impl Debug for Agent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Agent") - .field("id", &self.id) - .field("messager", &self.messager) - .field("client", &self.client) - .field("behavior_engines", &self.behavior_engines) - .finish() - } -} - impl Agent { /// Creates a new [`AgentBuilder`] instance with a specified identifier. /// @@ -165,7 +151,7 @@ impl AgentBuilder { self, client: Arc, messager: Messager, - ) -> Result { + ) -> Result { match self.behavior_engines { Some(engines) => Ok(Agent { id: self.id, @@ -173,16 +159,9 @@ impl AgentBuilder { client, behavior_engines: engines, }), - None => Err(AgentBuildError::MissingBehaviorEngines), + None => Err(ArbiterEngineError::AgentBuildError( + "Missing behavior engines".to_owned(), + )), } } } - -/// enum representing the possible error states encountered by the agent builder -#[derive(Debug, Error, Clone, PartialEq, Eq)] -pub enum AgentBuildError { - /// Error representing the case where the agent is missing behavior engines; - /// an agent has to have behaviors to be useful! - #[error("Agent is missing behavior engines")] - MissingBehaviorEngines, -} diff --git a/arbiter-engine/src/errors.rs b/arbiter-engine/src/errors.rs new file mode 100644 index 00000000..54883e73 --- /dev/null +++ b/arbiter-engine/src/errors.rs @@ -0,0 +1,12 @@ +use thiserror::Error; + +use super::*; + +#[derive(Debug, Error)] +pub enum ArbiterEngineError { + #[error("AgentBuildError: {0}")] + AgentBuildError(String), + + #[error("StateMachineError: {0}")] + JoinError(#[from] tokio::task::JoinError), +} diff --git a/arbiter-engine/src/lib.rs b/arbiter-engine/src/lib.rs index c85ee2f1..3cf3dddb 100644 --- a/arbiter-engine/src/lib.rs +++ b/arbiter-engine/src/lib.rs @@ -5,14 +5,17 @@ //! distributed fashion where each agent is running in its own process and //! communicating with other agents via a messaging layer. -use std::collections::HashMap; +use std::{collections::HashMap, fmt::Debug, sync::Arc}; use anyhow::{anyhow, Result}; -use serde::{Deserialize, Serialize}; -#[allow(unused)] +use arbiter_core::middleware::RevmMiddleware; +use futures_util::future::join_all; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio::task::{spawn, JoinError}; use tracing::{debug, trace, warn}; pub mod agent; +pub mod errors; pub mod examples; pub mod machine; pub mod messager; diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index 99b5c63a..e08b51cb 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -1,13 +1,12 @@ //! The [`StateMachine`] trait, [`Behavior`] trait, and the [`Engine`] that runs //! [`Behavior`]s. -use std::{fmt::Debug, pin::Pin, sync::Arc}; +use std::pin::Pin; -use arbiter_core::middleware::RevmMiddleware; use futures_util::{Stream, StreamExt}; -use serde::de::DeserializeOwned; +use tokio::task::JoinHandle; -use self::messager::Messager; +use self::{errors::ArbiterEngineError, messager::Messager}; use super::*; /// A type alias for a pinned, boxed stream of events. @@ -29,15 +28,21 @@ pub enum MachineInstruction { /// Used to make a [`StateMachine`] process events. /// This will offload the process into a task that can be halted by sending - /// a [`MachineHalt`] message from the [`Messager`]. For our purposes, the - /// [`crate::world::World`] will handle this. + /// a [`ControlFlow::Halt`] message from the [`Messager`]. For our purposes, + /// the [`crate::world::World`] will handle this. Process, } -/// The message that can be used in a [`StateMachine`] to halt its processing. -/// Optionally returned by [`Behavior::process`] to close tasks. +/// The message that is used in a [`StateMachine`] to continue or halt its +/// processing. #[derive(Clone, Copy, Debug, Serialize, Deserialize)] -pub struct MachineHalt; +pub enum ControlFlow { + /// Used to halt the processing of a [`StateMachine`]. + Halt, + + /// Used to continue on the processing of a [`StateMachine`]. + Continue, +} /// The state used by any entity implementing [`StateMachine`]. #[derive(Clone, Copy, Debug)] @@ -68,12 +73,16 @@ pub trait Behavior: Serialize + DeserializeOwned + Send + Sync + Debug + 'sta /// Used to start the agent. /// This is where the agent can engage in its specific start up activities /// that it can do given the current state of the world. - async fn startup(&mut self, client: Arc, messager: Messager) -> EventStream; + async fn startup( + &mut self, + client: Arc, + messager: Messager, + ) -> Result, ArbiterEngineError>; /// Used to process events. /// This is where the agent can engage in its specific processing /// of events that can lead to actions being taken. - async fn process(&mut self, event: E) -> Option; + async fn process(&mut self, event: E) -> Result; } /// A trait for creating a state machine. /// @@ -130,7 +139,7 @@ pub trait StateMachine: Send + Sync + Debug + 'static { /// This method does not return a value, but it may result in state changes /// within the implementing type or the generation of further instructions /// or events. - async fn execute(&mut self, instruction: MachineInstruction); + async fn execute(&mut self, instruction: MachineInstruction) -> Result<(), ArbiterEngineError>; } /// The `Engine` struct represents the core logic unit of a state machine-based @@ -201,39 +210,43 @@ where B: Behavior + Debug + Serialize + DeserializeOwned, E: DeserializeOwned + Serialize + Send + Sync + Debug + 'static, { - async fn execute(&mut self, instruction: MachineInstruction) { + async fn execute(&mut self, instruction: MachineInstruction) -> Result<(), ArbiterEngineError> { match instruction { MachineInstruction::Start(client, messager) => { - trace!("Behavior is starting up."); self.state = State::Starting; let mut behavior = self.behavior.take().unwrap(); - let behavior_task = tokio::spawn(async move { - let id = messager.id.clone(); - let stream = behavior.startup(client, messager).await; - debug!("startup complete for {:?}!", id); - (stream, behavior) - }); - let (stream, behavior) = behavior_task.await.unwrap(); + let behavior_task: JoinHandle, B), ArbiterEngineError>> = + tokio::spawn(async move { + let id = messager.id.clone(); + let stream = behavior.startup(client, messager).await?; + debug!("startup complete for {:?}!", id); + Ok((stream, behavior)) + }); + let (stream, behavior) = behavior_task.await??; self.event_stream = Some(stream); self.behavior = Some(behavior); - // TODO: This feels weird but I think it works properly? self.execute(MachineInstruction::Process).await; + Ok(()) } MachineInstruction::Process => { let mut behavior = self.behavior.take().unwrap(); let mut stream = self.event_stream.take().unwrap(); - let behavior_task = tokio::spawn(async move { - while let Some(event) = stream.next().await { - let halt_option = behavior.process(event).await; - if halt_option.is_some() { - break; + let behavior_task: JoinHandle> = + tokio::spawn(async move { + while let Some(event) = stream.next().await { + match behavior.process(event).await? { + ControlFlow::Halt => { + break; + } + ControlFlow::Continue => {} + } } - } - behavior - }); - // TODO: This could be removed as we probably don't need to have the behavior - // stored once its done. - self.behavior = Some(behavior_task.await.unwrap()); + Ok(behavior) + }); + // TODO: We don't have to store the behavior again here, we could just discard + // it. + self.behavior = Some(behavior_task.await??); + Ok(()) } } } diff --git a/arbiter-engine/src/messager.rs b/arbiter-engine/src/messager.rs index 7f4032ac..c0f54b54 100644 --- a/arbiter-engine/src/messager.rs +++ b/arbiter-engine/src/messager.rs @@ -4,7 +4,6 @@ // TODO: It might be nice to have some kind of messaging header so that we can // pipe messages to agents and pipe messages across worlds. -use serde::Serialize; use tokio::sync::broadcast::{channel, Receiver, Sender}; use self::machine::EventStream; diff --git a/arbiter-engine/src/universe.rs b/arbiter-engine/src/universe.rs index 66154dff..c63f0c42 100644 --- a/arbiter-engine/src/universe.rs +++ b/arbiter-engine/src/universe.rs @@ -1,12 +1,9 @@ //! The [`universe`] module contains the [`Universe`] struct which is the //! primary interface for creating and running many `World`s in parallel. -use std::collections::HashMap; - use anyhow::Result; -use futures_util::future::join_all; -use tokio::task::{spawn, JoinError}; +use super::*; use crate::world::World; /// The [`Universe`] struct is the primary interface for creating and running diff --git a/arbiter-engine/src/world.rs b/arbiter-engine/src/world.rs index 53981cdc..e21a454d 100644 --- a/arbiter-engine/src/world.rs +++ b/arbiter-engine/src/world.rs @@ -1,30 +1,15 @@ -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// TODO: Notes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// * Probably should move labels to world instead of on the environment. -// * One thing that is different about the Arbiter world is that give a bunch of -// different channels to communicate with the Environment's tx thread. This is -// different from a connection to a blockchain where you typically will just -// have a single HTTP/WS connection. What we want is some kind of way of -// having the world own a reference to a provider or something -// * Can add a messager as an interconnect and have the manager give each world -// it owns a clone of the same messager. -// * The worlds now are just going to be revm worlds. We can generalize this -// later. -// * Can we give the world an address book?? -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - //! The world module contains the core world abstraction for the Arbiter Engine. -use std::{collections::VecDeque, fmt::Debug}; +use std::collections::VecDeque; -use arbiter_core::{environment::Environment, middleware::RevmMiddleware}; -use futures_util::future::join_all; -use serde::de::DeserializeOwned; -use tokio::spawn; +use arbiter_core::environment::Environment; -use self::{agent::AgentBuilder, machine::MachineInstruction}; use super::*; -use crate::{agent::Agent, machine::CreateStateMachine, messager::Messager}; +use crate::{ + agent::{Agent, AgentBuilder}, + machine::{CreateStateMachine, MachineInstruction}, + messager::Messager, +}; /// A world is a collection of agents that use the same type of provider, e.g., /// operate on the same blockchain or same `Environment`. The world is From 80a92a61c35f6a76f3f1376df2a71cefdf10d529 Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Tue, 6 Feb 2024 15:09:12 -0700 Subject: [PATCH 2/7] saving --- arbiter-engine/src/agent.rs | 2 +- arbiter-engine/src/errors.rs | 8 +++++++- .../src/examples/minter/behaviors/token_admin.rs | 10 +++++----- .../examples/minter/behaviors/token_requester.rs | 15 +++++++++------ arbiter-engine/src/examples/minter/mod.rs | 2 +- arbiter-engine/src/examples/mod.rs | 3 ++- arbiter-engine/src/examples/timed_message/mod.rs | 14 +++++--------- arbiter-engine/src/machine.rs | 4 ++-- arbiter-engine/src/messager.rs | 16 +++++++++------- arbiter-engine/src/world.rs | 1 + 10 files changed, 42 insertions(+), 33 deletions(-) diff --git a/arbiter-engine/src/agent.rs b/arbiter-engine/src/agent.rs index b428f62b..0ee5119a 100644 --- a/arbiter-engine/src/agent.rs +++ b/arbiter-engine/src/agent.rs @@ -105,7 +105,7 @@ impl AgentBuilder { /// # Returns /// /// Returns the `AgentBuilder` instance to allow for method chaining. - pub fn with_engine(mut self, engine: Box) -> Self { + pub(crate) fn with_engine(mut self, engine: Box) -> Self { if let Some(engines) = &mut self.behavior_engines { engines.push(engine); } else { diff --git a/arbiter-engine/src/errors.rs b/arbiter-engine/src/errors.rs index 54883e73..7dab0999 100644 --- a/arbiter-engine/src/errors.rs +++ b/arbiter-engine/src/errors.rs @@ -7,6 +7,12 @@ pub enum ArbiterEngineError { #[error("AgentBuildError: {0}")] AgentBuildError(String), - #[error("StateMachineError: {0}")] + #[error("MessagerError: {0}")] + MessagerError(String), + + #[error(transparent)] JoinError(#[from] tokio::task::JoinError), + + #[error(transparent)] + SerdeJsonError(#[from] serde_json::Error), } diff --git a/arbiter-engine/src/examples/minter/behaviors/token_admin.rs b/arbiter-engine/src/examples/minter/behaviors/token_admin.rs index 59d02271..7ab29d20 100644 --- a/arbiter-engine/src/examples/minter/behaviors/token_admin.rs +++ b/arbiter-engine/src/examples/minter/behaviors/token_admin.rs @@ -31,7 +31,7 @@ impl Behavior for TokenAdmin { &mut self, client: Arc, messager: Messager, - ) -> EventStream { + ) -> Result, ArbiterEngineError> { self.messager = Some(messager.clone()); self.client = Some(client.clone()); for token_data in self.token_data.values_mut() { @@ -53,12 +53,12 @@ impl Behavior for TokenAdmin { .get_or_insert_with(HashMap::new) .insert(token_data.name.clone(), token.clone()); } - Box::pin(messager.stream()) + Ok(Box::pin(messager.stream())) } #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] - async fn process(&mut self, event: Message) -> Option { + async fn process(&mut self, event: Message) -> Result { if self.tokens.is_none() { error!( "There were no tokens to deploy! You must add tokens to @@ -98,10 +98,10 @@ impl Behavior for TokenAdmin { self.count += 1; if self.count == self.max_count.unwrap_or(u64::MAX) { warn!("Reached max count. Halting behavior."); - return Some(MachineHalt); + return Ok(ControlFlow::Halt); } } } - None + Ok(ControlFlow::Continue) } } diff --git a/arbiter-engine/src/examples/minter/behaviors/token_requester.rs b/arbiter-engine/src/examples/minter/behaviors/token_requester.rs index c662d396..204dc1e6 100644 --- a/arbiter-engine/src/examples/minter/behaviors/token_requester.rs +++ b/arbiter-engine/src/examples/minter/behaviors/token_requester.rs @@ -2,7 +2,10 @@ use arbiter_bindings::bindings::arbiter_token::TransferFilter; use arbiter_core::data_collection::EventLogger; use token_admin::{MintRequest, TokenAdminQuery}; -use self::{examples::minter::agents::token_requester::TokenRequester, machine::EventStream}; +use self::{ + errors::ArbiterEngineError, examples::minter::agents::token_requester::TokenRequester, + machine::EventStream, +}; use super::*; #[async_trait::async_trait] @@ -12,7 +15,7 @@ impl Behavior for TokenRequester { &mut self, client: Arc, mut messager: Messager, - ) -> EventStream { + ) -> Result, ArbiterEngineError> { messager .send( To::Agent(self.request_to.clone()), @@ -35,18 +38,18 @@ impl Behavior for TokenRequester { self.messager = Some(messager.clone()); self.client = Some(client.clone()); - return Box::pin( + Ok(Box::pin( EventLogger::builder() .add_stream(token.transfer_filter()) .stream() .unwrap() .map(|value| serde_json::from_str(&value).unwrap()), - ); + )) } #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] - async fn process(&mut self, event: TransferFilter) -> Option { + async fn process(&mut self, event: TransferFilter) -> Result { let messager = self.messager.as_ref().unwrap(); while (self.count < self.max_count.unwrap()) { debug!("sending message from requester"); @@ -60,6 +63,6 @@ impl Behavior for TokenRequester { .await; self.count += 1; } - Some(MachineHalt) + Ok(ControlFlow::Halt) } } diff --git a/arbiter-engine/src/examples/minter/mod.rs b/arbiter-engine/src/examples/minter/mod.rs index ba5ec2e3..b6bf9e33 100644 --- a/arbiter-engine/src/examples/minter/mod.rs +++ b/arbiter-engine/src/examples/minter/mod.rs @@ -14,7 +14,7 @@ use tracing::error; use super::*; use crate::{ agent::Agent, - machine::{Behavior, MachineHalt, MachineInstruction, StateMachine}, + machine::{Behavior, ControlFlow, MachineInstruction, StateMachine}, messager::To, world::World, }; diff --git a/arbiter-engine/src/examples/mod.rs b/arbiter-engine/src/examples/mod.rs index 592e9e64..63147553 100644 --- a/arbiter-engine/src/examples/mod.rs +++ b/arbiter-engine/src/examples/mod.rs @@ -11,8 +11,9 @@ use futures_util::{stream, StreamExt}; use super::*; use crate::{ agent::Agent, + errors::ArbiterEngineError, machine::{ - Behavior, CreateStateMachine, Engine, EventStream, MachineHalt, State, StateMachine, + Behavior, ControlFlow, CreateStateMachine, Engine, EventStream, State, StateMachine, }, messager::{Message, Messager, To}, world::World, diff --git a/arbiter-engine/src/examples/timed_message/mod.rs b/arbiter-engine/src/examples/timed_message/mod.rs index 2fd858d2..025e733e 100644 --- a/arbiter-engine/src/examples/timed_message/mod.rs +++ b/arbiter-engine/src/examples/timed_message/mod.rs @@ -16,10 +16,6 @@ fn default_max_count() -> Option { Some(3) } -fn default_max_count() -> Option { - Some(3) -} - #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct TimedMessage { delay: u64, @@ -60,24 +56,24 @@ impl Behavior for TimedMessage { &mut self, _client: Arc, messager: Messager, - ) -> EventStream { + ) -> Result, ArbiterEngineError> { if let Some(startup_message) = &self.startup_message { messager.send(To::All, startup_message).await; } self.messager = Some(messager.clone()); - return messager.stream(); + Ok(messager.stream()) } - async fn process(&mut self, event: Message) -> Option { + async fn process(&mut self, event: Message) -> Result { if event.data == serde_json::to_string(&self.receive_data).unwrap() { let messager = self.messager.clone().unwrap(); messager.send(To::All, self.send_data.clone()).await; self.count += 1; } if self.count == self.max_count.unwrap_or(u64::MAX) { - return Some(MachineHalt); + return Ok(ControlFlow::Halt); } - return None; + Ok(ControlFlow::Continue) } } diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index e08b51cb..45fcb0f5 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -157,7 +157,7 @@ pub trait StateMachine: Send + Sync + Debug + 'static { /// - `behavior`: An optional behavior that the engine is currently managing. /// This is where the engine's logic is primarily executed in response to /// events. -pub struct Engine +pub(crate) struct Engine where B: Behavior, { @@ -219,7 +219,7 @@ where tokio::spawn(async move { let id = messager.id.clone(); let stream = behavior.startup(client, messager).await?; - debug!("startup complete for {:?}!", id); + debug!("startup complete for behavior {:?}", id); Ok((stream, behavior)) }); let (stream, behavior) = behavior_task.await??; diff --git a/arbiter-engine/src/messager.rs b/arbiter-engine/src/messager.rs index c0f54b54..b8e7f60a 100644 --- a/arbiter-engine/src/messager.rs +++ b/arbiter-engine/src/messager.rs @@ -1,13 +1,10 @@ //! The messager module contains the core messager layer for the Arbiter Engine. -// TODO: Allow for modulating the capacity of the messager. -// TODO: It might be nice to have some kind of messaging header so that we can -// pipe messages to agents and pipe messages across worlds. - use tokio::sync::broadcast::{channel, Receiver, Sender}; use self::machine::EventStream; use super::*; +use crate::errors::ArbiterEngineError; /// A message that can be sent between agents. #[derive(Clone, Debug, Deserialize, Serialize)] @@ -78,16 +75,21 @@ impl Messager { /// utility function for getting the next value from the broadcast_receiver /// without streaming - pub async fn get_next(&mut self) -> Message { + pub async fn get_next(&mut self) -> Result { + if self.broadcast_receiver.is_none() { + return Err(ArbiterEngineError::MessagerError( + "Receiver has been taken! Are you already streaming?".to_owned(), + )); + } while let Ok(message) = self.broadcast_receiver.as_mut().unwrap().recv().await { match &message.to { To::All => { - return message; + return Ok(message); } To::Agent(id) => { if let Some(self_id) = &self.id { if id == self_id { - return message; + return Ok(message); } } continue; diff --git a/arbiter-engine/src/world.rs b/arbiter-engine/src/world.rs index e21a454d..3ddbb14c 100644 --- a/arbiter-engine/src/world.rs +++ b/arbiter-engine/src/world.rs @@ -113,6 +113,7 @@ impl World { for (agent, behaviors) in agents_map { let mut next_agent = Agent::builder(&agent); for behavior in behaviors { + println!("Behavior: {:?}", behavior); let engine = behavior.create_state_machine(); next_agent = next_agent.with_engine(engine); } From 91bcc33d0514a76b690c250568ea1751ded6cf17 Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Tue, 6 Feb 2024 17:51:19 -0700 Subject: [PATCH 3/7] save again --- arbiter-engine/src/messager.rs | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/arbiter-engine/src/messager.rs b/arbiter-engine/src/messager.rs index b8e7f60a..87c56d36 100644 --- a/arbiter-engine/src/messager.rs +++ b/arbiter-engine/src/messager.rs @@ -76,12 +76,16 @@ impl Messager { /// utility function for getting the next value from the broadcast_receiver /// without streaming pub async fn get_next(&mut self) -> Result { - if self.broadcast_receiver.is_none() { - return Err(ArbiterEngineError::MessagerError( - "Receiver has been taken! Are you already streaming?".to_owned(), - )); - } - while let Ok(message) = self.broadcast_receiver.as_mut().unwrap().recv().await { + let mut receiver = match self.broadcast_receiver.take() { + Some(receiver) => receiver, + None => { + return Err(ArbiterEngineError::MessagerError( + "Receiver has been taken! Are you already streaming on this messager?" + .to_owned(), + )) + } + }; + while let Ok(message) = receiver.recv().await { match &message.to { To::All => { return Ok(message); @@ -101,9 +105,17 @@ impl Messager { /// Returns a stream of messages that are either sent to [`To::All`] or to /// the agent via [`To::Agent(id)`]. - pub fn stream(mut self) -> EventStream { - let mut receiver = self.broadcast_receiver.take().unwrap(); - Box::pin(async_stream::stream! { + pub fn stream(mut self) -> Result, ArbiterEngineError> { + let mut receiver = match self.broadcast_receiver.take() { + Some(receiver) => receiver, + None => { + return Err(ArbiterEngineError::MessagerError( + "Receiver has been taken! Are you already streaming on this messager?" + .to_owned(), + )) + } + }; + Ok(Box::pin(async_stream::stream! { while let Ok(message) = receiver.recv().await { match &message.to { To::All => { @@ -118,7 +130,7 @@ impl Messager { } } } - }) + })) } /// Asynchronously sends a message to a specified recipient. /// From e89b89a75eae392a09f3eb843b63e17d96fe6616 Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Tue, 6 Feb 2024 18:27:34 -0700 Subject: [PATCH 4/7] cleanup --- Cargo.lock | 1 - arbiter-engine/Cargo.toml | 1 - arbiter-engine/src/agent.rs | 6 +- arbiter-engine/src/errors.rs | 31 +++++++++- .../examples/minter/behaviors/token_admin.rs | 2 +- .../minter/behaviors/token_requester.rs | 2 +- .../src/examples/timed_message/mod.rs | 8 +-- arbiter-engine/src/lib.rs | 3 +- arbiter-engine/src/machine.rs | 10 ++-- arbiter-engine/src/messager.rs | 24 +++++--- arbiter-engine/src/universe.rs | 10 ++-- arbiter-engine/src/world.rs | 58 +++++-------------- 12 files changed, 80 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0010a7c1..6fd35d45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -291,7 +291,6 @@ dependencies = [ name = "arbiter-engine" version = "0.1.0" dependencies = [ - "anyhow", "arbiter-bindings", "arbiter-core", "arbiter-macros", diff --git a/arbiter-engine/Cargo.toml b/arbiter-engine/Cargo.toml index 0059563a..ebb80572 100644 --- a/arbiter-engine/Cargo.toml +++ b/arbiter-engine/Cargo.toml @@ -23,7 +23,6 @@ serde_json.workspace = true serde.workspace = true tokio.workspace = true async-stream.workspace = true -anyhow = { version = "=1.0.79" } tracing.workspace = true tokio-stream = "0.1.14" futures = "0.3.30" diff --git a/arbiter-engine/src/agent.rs b/arbiter-engine/src/agent.rs index 0ee5119a..9f5fd59b 100644 --- a/arbiter-engine/src/agent.rs +++ b/arbiter-engine/src/agent.rs @@ -1,11 +1,7 @@ //! The agent module contains the core agent abstraction for the Arbiter Engine. use super::*; -use crate::{ - errors::ArbiterEngineError, - machine::{Behavior, Engine, StateMachine}, - messager::Messager, -}; +use crate::machine::{Behavior, Engine, StateMachine}; /// An agent is an entity capable of processing events and producing actions. /// These are the core actors in simulations or in onchain systems. diff --git a/arbiter-engine/src/errors.rs b/arbiter-engine/src/errors.rs index 7dab0999..f06199b2 100644 --- a/arbiter-engine/src/errors.rs +++ b/arbiter-engine/src/errors.rs @@ -1,18 +1,45 @@ +//! Error types for the arbiter engine. + use thiserror::Error; use super::*; +/// Errors that can occur in the arbiter engine. #[derive(Debug, Error)] pub enum ArbiterEngineError { + /// Error occured with the [`Messager`]. + #[error("MessagerError: {0}")] + MessagerError(String), + + /// Error occured with the [`crate::agent::Agent`]. #[error("AgentBuildError: {0}")] AgentBuildError(String), - #[error("MessagerError: {0}")] - MessagerError(String), + /// Error occured with the [`crate::world::World`]. + #[error("WorldError: {0}")] + WorldError(String), + + /// Error occured with the [`crate::universe::Universe`]. + #[error("UniverseError: {0}")] + UniverseError(String), + /// Error occured in joining a task. #[error(transparent)] JoinError(#[from] tokio::task::JoinError), + /// Error occured in sending a message. + #[error(transparent)] + SendError(#[from] tokio::sync::broadcast::error::SendError), + + /// Error occured in deserializing json. #[error(transparent)] SerdeJsonError(#[from] serde_json::Error), + + /// Error occured in reading in a file. + #[error(transparent)] + IoError(#[from] std::io::Error), + + /// Error occured in deserializing toml. + #[error(transparent)] + TomlError(#[from] toml::de::Error), } diff --git a/arbiter-engine/src/examples/minter/behaviors/token_admin.rs b/arbiter-engine/src/examples/minter/behaviors/token_admin.rs index 7ab29d20..f4d96898 100644 --- a/arbiter-engine/src/examples/minter/behaviors/token_admin.rs +++ b/arbiter-engine/src/examples/minter/behaviors/token_admin.rs @@ -53,7 +53,7 @@ impl Behavior for TokenAdmin { .get_or_insert_with(HashMap::new) .insert(token_data.name.clone(), token.clone()); } - Ok(Box::pin(messager.stream())) + messager.stream() } #[tracing::instrument(skip(self), fields(id = diff --git a/arbiter-engine/src/examples/minter/behaviors/token_requester.rs b/arbiter-engine/src/examples/minter/behaviors/token_requester.rs index 204dc1e6..1616905b 100644 --- a/arbiter-engine/src/examples/minter/behaviors/token_requester.rs +++ b/arbiter-engine/src/examples/minter/behaviors/token_requester.rs @@ -22,7 +22,7 @@ impl Behavior for TokenRequester { &TokenAdminQuery::AddressOf(self.token_data.name.clone()), ) .await; - let message = messager.get_next().await; + let message = messager.get_next().await.unwrap(); let token_address = serde_json::from_str::
(&message.data).unwrap(); let token = ArbiterToken::new(token_address, client.clone()); self.token_data.address = Some(token_address); diff --git a/arbiter-engine/src/examples/timed_message/mod.rs b/arbiter-engine/src/examples/timed_message/mod.rs index 025e733e..ed1cb324 100644 --- a/arbiter-engine/src/examples/timed_message/mod.rs +++ b/arbiter-engine/src/examples/timed_message/mod.rs @@ -61,7 +61,7 @@ impl Behavior for TimedMessage { messager.send(To::All, startup_message).await; } self.messager = Some(messager.clone()); - Ok(messager.stream()) + messager.stream() } async fn process(&mut self, event: Message) -> Result { @@ -94,7 +94,7 @@ async fn echoer() { world.run().await; - let mut stream = Box::pin(messager.stream()); + let mut stream = messager.stream().unwrap(); let mut idx = 0; loop { @@ -136,7 +136,7 @@ async fn ping_pong() { let messager = world.messager.for_agent("outside_world"); world.run().await; - let mut stream = Box::pin(messager.stream()); + let mut stream = messager.stream().unwrap(); let mut idx = 0; loop { @@ -177,7 +177,7 @@ async fn ping_pong_two_agent() { let messager = world.messager.for_agent("outside_world"); world.run().await; - let mut stream = Box::pin(messager.stream()); + let mut stream = messager.stream().unwrap(); let mut idx = 0; loop { diff --git a/arbiter-engine/src/lib.rs b/arbiter-engine/src/lib.rs index 3cf3dddb..b304a573 100644 --- a/arbiter-engine/src/lib.rs +++ b/arbiter-engine/src/lib.rs @@ -7,13 +7,14 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; -use anyhow::{anyhow, Result}; use arbiter_core::middleware::RevmMiddleware; use futures_util::future::join_all; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::task::{spawn, JoinError}; use tracing::{debug, trace, warn}; +use crate::{errors::ArbiterEngineError, messager::Messager}; + pub mod agent; pub mod errors; pub mod examples; diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index 45fcb0f5..e1c31b4a 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -6,7 +6,6 @@ use std::pin::Pin; use futures_util::{Stream, StreamExt}; use tokio::task::JoinHandle; -use self::{errors::ArbiterEngineError, messager::Messager}; use super::*; /// A type alias for a pinned, boxed stream of events. @@ -162,10 +161,10 @@ where B: Behavior, { /// The behavior the `Engine` runs. - pub behavior: Option, + behavior: Option, /// The current state of the [`Engine`]. - pub state: State, + state: State, /// The receiver of events that the [`Engine`] will process. /// The [`State::Processing`] stage will attempt a decode of the [`String`]s @@ -211,6 +210,8 @@ where E: DeserializeOwned + Serialize + Send + Sync + Debug + 'static, { async fn execute(&mut self, instruction: MachineInstruction) -> Result<(), ArbiterEngineError> { + // NOTE: The unwraps here are safe because the `Behavior` in an engine is only + // accessed here and it is private. match instruction { MachineInstruction::Start(client, messager) => { self.state = State::Starting; @@ -225,10 +226,11 @@ where let (stream, behavior) = behavior_task.await??; self.event_stream = Some(stream); self.behavior = Some(behavior); - self.execute(MachineInstruction::Process).await; + self.execute(MachineInstruction::Process).await?; Ok(()) } MachineInstruction::Process => { + trace!("Behavior is starting up."); let mut behavior = self.behavior.take().unwrap(); let mut stream = self.event_stream.take().unwrap(); let behavior_task: JoinHandle> = diff --git a/arbiter-engine/src/messager.rs b/arbiter-engine/src/messager.rs index 87c56d36..6d95b4a0 100644 --- a/arbiter-engine/src/messager.rs +++ b/arbiter-engine/src/messager.rs @@ -2,9 +2,8 @@ use tokio::sync::broadcast::{channel, Receiver, Sender}; -use self::machine::EventStream; use super::*; -use crate::errors::ArbiterEngineError; +use crate::machine::EventStream; /// A message that can be sent between agents. #[derive(Clone, Debug, Deserialize, Serialize)] @@ -151,13 +150,20 @@ impl Messager { /// a broadcast to all agents. /// - `data`: The data to be sent in the message. This data is serialized /// into JSON format. - pub async fn send(&self, to: To, data: S) { + pub async fn send(&self, to: To, data: S) -> Result<(), ArbiterEngineError> { trace!("Sending message via messager."); - let message = Message { - from: self.id.clone().unwrap(), - to, - data: serde_json::to_string(&data).unwrap(), - }; - self.broadcast_sender.send(message).unwrap(); + if let Some(id) = &self.id { + let message = Message { + from: id.clone(), + to, + data: serde_json::to_string(&data)?, + }; + self.broadcast_sender.send(message)?; + Ok(()) + } else { + Err(ArbiterEngineError::MessagerError( + "Messager has no ID! You must have an ID to send messages!".to_owned(), + )) + } } } diff --git a/arbiter-engine/src/universe.rs b/arbiter-engine/src/universe.rs index c63f0c42..0a967183 100644 --- a/arbiter-engine/src/universe.rs +++ b/arbiter-engine/src/universe.rs @@ -1,8 +1,6 @@ //! The [`universe`] module contains the [`Universe`] struct which is the //! primary interface for creating and running many `World`s in parallel. -use anyhow::Result; - use super::*; use crate::world::World; @@ -33,12 +31,14 @@ impl Universe { } /// Runs all of the [`World`]s in the [`Universe`] in parallel. - pub async fn run_worlds(&mut self) -> Result<()> { + pub async fn run_worlds(&mut self) -> Result<(), ArbiterEngineError> { if self.is_online() { - return Err(anyhow::anyhow!("Universe is already running.")); + return Err(ArbiterEngineError::UniverseError( + "Universe is already running.".to_owned(), + )); } let mut tasks = Vec::new(); - // TODO: These unwraps need to be checkdd a bit. + // NOTE: Unwrap is safe because we checked if the universe is online. for (_, mut world) in self.worlds.take().unwrap().drain() { tasks.push(spawn(async move { world.run().await.unwrap(); diff --git a/arbiter-engine/src/world.rs b/arbiter-engine/src/world.rs index 3ddbb14c..9dca5211 100644 --- a/arbiter-engine/src/world.rs +++ b/arbiter-engine/src/world.rs @@ -8,7 +8,6 @@ use super::*; use crate::{ agent::{Agent, AgentBuilder}, machine::{CreateStateMachine, MachineInstruction}, - messager::Messager, }; /// A world is a collection of agents that use the same type of provider, e.g., @@ -98,17 +97,15 @@ impl World { pub fn from_config( &mut self, config_path: &str, - ) { - let cwd = std::env::current_dir().expect("Failed to determine current working directory"); + ) -> Result<(), ArbiterEngineError> { + let cwd = std::env::current_dir()?; let path = cwd.join(config_path); - let mut file = File::open(path).expect("Failed to open configuration file"); + let mut file = File::open(path)?; let mut contents = String::new(); - file.read_to_string(&mut contents) - .expect("Failed to read configuration file to string"); + file.read_to_string(&mut contents)?; - let agents_map: HashMap> = - toml::from_str(&contents).expect("Failed to deserialize configuration file"); + let agents_map: HashMap> = toml::from_str(&contents)?; for (agent, behaviors) in agents_map { let mut next_agent = Agent::builder(&agent); @@ -119,6 +116,7 @@ impl World { } self.add_agent(next_agent); } + Ok(()) } /// Adds an agent, constructed from the provided `AgentBuilder`, to the @@ -176,13 +174,16 @@ impl World { /// Returns an error if no agents are found in the world, possibly /// indicating that the world has already been run or that no agents /// were added prior to execution. - pub async fn run(&mut self) -> Result<()> { + pub async fn run(&mut self) -> Result<(), ArbiterEngineError> { + let agents = match self.agents.take() { + Some(agents) => agents, + None => { + return Err(ArbiterEngineError::WorldError( + "No agents found. Has the world already been ran?".to_owned(), + )) + } + }; let mut tasks = vec![]; - // Retrieve the agents, erroring if none are found. - let agents = self - .agents - .take() - .ok_or_else(|| anyhow!("No agents found! Has the world already been run?"))?; // Prepare a queue for messagers corresponding to each behavior engine. let mut messagers = VecDeque::new(); // Populate the messagers queue. @@ -192,6 +193,7 @@ impl World { } } // For each agent, spawn a task for each of its behavior engines. + // Unwrap here is safe as we just built the dang thing. for (_, mut agent) in agents { for mut engine in agent.behavior_engines.drain(..) { let client = agent.client.clone(); @@ -208,31 +210,3 @@ impl World { Ok(()) } } - -#[cfg(test)] -mod tests { - use std::{str::FromStr, sync::Arc}; - - use arbiter_bindings::bindings::weth::WETH; - use ethers::{ - providers::{Middleware, Provider, Ws}, - types::Address, - }; - use futures_util::StreamExt; - - #[ignore = "This is unnecessary to run on CI currently."] - #[tokio::test] - async fn mainnet_ws() { - let ws_url = std::env::var("MAINNET_WS_URL").expect("MAINNET_WS_URL must be set"); - let ws = Ws::connect(ws_url).await.unwrap(); - let provider = Provider::new(ws); - let client = Arc::new(provider); - let weth = WETH::new( - Address::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap(), - client.clone(), - ); - let filter = weth.approval_filter().filter; - let mut subscription = client.subscribe_logs(&filter).await.unwrap(); - println!("next: {:?}", subscription.next().await); - } -} From b17117d48f46dc57a5b33c0098e451540c93f09e Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Tue, 6 Feb 2024 18:35:36 -0700 Subject: [PATCH 5/7] fixes --- .../arbiter_engine/agents_and_engines.md | 4 ++-- .../src/usage/arbiter_engine/behaviors.md | 20 +++++++++---------- .../src/usage/arbiter_engine/configuration.md | 6 +++--- .../arbiter_engine/worlds_and_universes.md | 8 ++++---- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/documentation/src/usage/arbiter_engine/agents_and_engines.md b/documentation/src/usage/arbiter_engine/agents_and_engines.md index 193f53c4..69872dd2 100644 --- a/documentation/src/usage/arbiter_engine/agents_and_engines.md +++ b/documentation/src/usage/arbiter_engine/agents_and_engines.md @@ -13,7 +13,7 @@ The latter will also require a `stream::select` type of operation to merge all t The `Agent` struct is the primary struct that you will be working with. It contains an ID, a client (`Arc`) that provides means to send calls and transactions to an Arbiter `Environment`, and a `Messager`. It looks like this: -```rust +```rust, ignore pub struct Agent { pub id: String, pub messager: Messager, @@ -39,7 +39,7 @@ This encapsulation also allows the `Agent` to hold onto `Behavior` for variou ## Example Let's create an `Agent` that has two `Behavior`s using the `Replier` behavior from before. -```rust +```rust, ignore use arbiter_engine::agent::Agent; use crate::Replier; diff --git a/documentation/src/usage/arbiter_engine/behaviors.md b/documentation/src/usage/arbiter_engine/behaviors.md index dd51377e..6d842eae 100644 --- a/documentation/src/usage/arbiter_engine/behaviors.md +++ b/documentation/src/usage/arbiter_engine/behaviors.md @@ -7,10 +7,10 @@ All you should be looking for is how to define your `Agent`s behaviors and what ## `trait Behavior` To define a `Behavior`, you need to implement the `Behavior` trait on a struct of your own design. The `Behavior` trait is defined as follows: -```rust +```rust, ignore pub trait Behavior { - fn startup(&mut self, client: Arc, messager: Messager) -> EventStream; - fn process(&mut self, event: E) -> Option; + fn startup(&mut self, client: Arc, messager: Messager) -> Result, ArbiterEngineError>; + fn process(&mut self, event: E) -> Result; } ``` To outline the design principles here: @@ -28,11 +28,11 @@ Otherwise you risk having a `Behavior` that is too complex and difficult to unde ### Example To see this in use, let's take a look at an example of a `Behavior` called `Replier` that replies to a message with a message of its own, and stops once it has replied a certain number of times. -```rust +```rust, ignore use std::sync::Arc; use arbiter_core::middleware::RevmMiddleware; use arbiter_engine::{ - machine::{Behavior, MachineHalt}, + machine::{Behavior, ControlFlow}, messager::{Messager, To}, EventStream}; @@ -68,23 +68,23 @@ impl Behavior for Replier { &mut self, client: Arc, messager: Messager, - ) -> EventStream { + ) -> Result, ArbiterEngineError> { if let Some(startup_message) = &self.startup_message { messager.send(To::All, startup_message).await; } self.messager = Some(messager.clone()); - return messager.stream(); + messager.stream() } - async fn process(&mut self, event: Message) -> Option { + async fn process(&mut self, event: Message) -> Result { if event.data == self.receive_data { self.messager.unwrap().messager.send(To::All, send_data).await; self.count += 1; } if self.count == self.max_count { - return Some(MachineHalt); + return Ok(ControlFlow::Halt); } - return None + Ok(ControlFlow::Continue) } } ``` diff --git a/documentation/src/usage/arbiter_engine/configuration.md b/documentation/src/usage/arbiter_engine/configuration.md index 071145db..19b5873d 100644 --- a/documentation/src/usage/arbiter_engine/configuration.md +++ b/documentation/src/usage/arbiter_engine/configuration.md @@ -6,7 +6,7 @@ Let's take a look at how to do this. It is good practice to take your `Behavior`s and wrap them in an `enum` so that you can use them in a configuration file. For instance, let's say you have two struct `Maker` and `Taker` that implement `Behavior` for their own `E`. Then you can make your `enum` like this: -```rust +```rust, ignore use arbiter_macros::Behaviors; #[derive(Behaviors)] @@ -23,7 +23,7 @@ The macro solely requires that the `Behavior`s you have implement the `Behavior` Now that you have your `enum` of `Behavior`s, you can configure your `World` and the `Agent`s inside of it from configuration file. Since the `World` and your simulation is completely defined by the `Agent` `Behavior`s you make, all you need to do is specify your `Agent`s in the configuration file. For example, let's say we have the `Replier` behavior from before, so we have: -```rust +```rust, ignore #[derive(Behaviors)] pub enum Behaviors { Replier(Replier), @@ -57,7 +57,7 @@ Replier = { send_data = "pong", receive_data = "ping", max_count = 5 } ## Loading the Configuration Once you have your configuration file located at `./path/to/config.toml`, you can load it and run your simulation like this: -```rust +```rust, ignore fn main() { let world = World::from_config("./path/to/config.toml")?; world.run().await; diff --git a/documentation/src/usage/arbiter_engine/worlds_and_universes.md b/documentation/src/usage/arbiter_engine/worlds_and_universes.md index 3503ce2c..aa207c25 100644 --- a/documentation/src/usage/arbiter_engine/worlds_and_universes.md +++ b/documentation/src/usage/arbiter_engine/worlds_and_universes.md @@ -10,7 +10,7 @@ The choice is yours. ## `struct Universe` The `Universe` struct looks like this: -```rust +```rust, ignore pub struct Universe { worlds: Option>, world_tasks: Option>>, @@ -24,7 +24,7 @@ The `Universe::run_worlds` currently iterates through the `World`s and starts th ## `struct World` The `World` struct looks like this: -```rust +```rust, ignore pub struct World { pub id: String, pub agents: Option>, @@ -42,7 +42,7 @@ In future development, the `World` will be generic over your choice of `Provider ## Example Let's first do a quick example where we take a `World` and add an `Agent` to it. -```rust +```rust, ignore use arbiter_engine::{agent::Agent, world::World}; use crate::Replier; @@ -62,7 +62,7 @@ fn main() { } ``` If you wanted to extend this to use a `Universe`, you would simply create a `Universe` and add the `World` to it. -```rust +```rust, ignore use arbiter_engine::{agent::Agent, world::World}; use crate::Replier; From 02ac956771344d2ec0b1736583d98b774be4c632 Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Tue, 6 Feb 2024 18:39:09 -0700 Subject: [PATCH 6/7] version the macros --- Cargo.lock | 2 +- arbiter-macros/Cargo.toml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 6fd35d45..7e7f6306 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -313,7 +313,7 @@ dependencies = [ [[package]] name = "arbiter-macros" -version = "0.0.0" +version = "0.1.0" dependencies = [ "quote", "syn 2.0.48", diff --git a/arbiter-macros/Cargo.toml b/arbiter-macros/Cargo.toml index 3754898f..8ea92a88 100644 --- a/arbiter-macros/Cargo.toml +++ b/arbiter-macros/Cargo.toml @@ -1,5 +1,6 @@ [package] name = "arbiter-macros" +version = "0.1.0" [lib] proc-macro = true From 21217573f462e4096ab52cf7f9e4b7f937433591 Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Tue, 6 Feb 2024 18:41:28 -0700 Subject: [PATCH 7/7] typos --- arbiter-engine/src/errors.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/arbiter-engine/src/errors.rs b/arbiter-engine/src/errors.rs index f06199b2..8dcdce64 100644 --- a/arbiter-engine/src/errors.rs +++ b/arbiter-engine/src/errors.rs @@ -7,39 +7,39 @@ use super::*; /// Errors that can occur in the arbiter engine. #[derive(Debug, Error)] pub enum ArbiterEngineError { - /// Error occured with the [`Messager`]. + /// Error occurred with the [`Messager`]. #[error("MessagerError: {0}")] MessagerError(String), - /// Error occured with the [`crate::agent::Agent`]. + /// Error occurred with the [`crate::agent::Agent`]. #[error("AgentBuildError: {0}")] AgentBuildError(String), - /// Error occured with the [`crate::world::World`]. + /// Error occurred with the [`crate::world::World`]. #[error("WorldError: {0}")] WorldError(String), - /// Error occured with the [`crate::universe::Universe`]. + /// Error occurred with the [`crate::universe::Universe`]. #[error("UniverseError: {0}")] UniverseError(String), - /// Error occured in joining a task. + /// Error occurred in joining a task. #[error(transparent)] JoinError(#[from] tokio::task::JoinError), - /// Error occured in sending a message. + /// Error occurred in sending a message. #[error(transparent)] SendError(#[from] tokio::sync::broadcast::error::SendError), - /// Error occured in deserializing json. + /// Error occurred in deserializing json. #[error(transparent)] SerdeJsonError(#[from] serde_json::Error), - /// Error occured in reading in a file. + /// Error occurred in reading in a file. #[error(transparent)] IoError(#[from] std::io::Error), - /// Error occured in deserializing toml. + /// Error occurred in deserializing toml. #[error(transparent)] TomlError(#[from] toml::de::Error), }