Skip to content

Commit

Permalink
Updating defunct docs from new factory migration (#241)
Browse files Browse the repository at this point in the history
* Updating defunct docs from new factory migration

* Version bump to 0.10.1
  • Loading branch information
slawlor committed May 22, 2024
1 parent d5e3c92 commit c48de11
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 134 deletions.
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.10.0"
version = "0.10.1"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
2 changes: 2 additions & 0 deletions ractor/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ impl<T> std::fmt::Debug for MessagingErr<T> {
}
}

// SAFETY: This is required in order to map [MessagingErr] to
// ActorProcessingErr which requires errors to be Sync.
impl<T> std::error::Error for MessagingErr<T> {}

impl<T> From<tokio::sync::mpsc::error::SendError<T>> for MessagingErr<T> {
Expand Down
2 changes: 1 addition & 1 deletion ractor/src/factory/discard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl DiscardSettings {
pub trait DynamicDiscardController: Send + Sync + 'static {
/// Compute the new threshold for discarding
///
/// If you want to utilize metrics exposed in [crate::modular_factory::stats] you can gather them
/// If you want to utilize metrics exposed in [crate::factory::stats] you can gather them
/// by utilizing `stats_facebook::service_data::get_service_data_singleton` to retrieve a
/// accessor to `ServiceData` which you can then resolve stats by name (either timeseries or
/// counters)
Expand Down
2 changes: 1 addition & 1 deletion ractor/src/factory/factoryimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ where
/// * `router`: The message routing implementation the factory should use. Implements
/// the [Router] trait.
/// * `queue`: The message queueing implementation the factory should use. Implements
/// the [FactoryQueue] trait.
/// the [Queue] trait.
pub fn new<TBuilder: WorkerBuilder<TWorker, TWorkerStart> + 'static>(
worker_builder: TBuilder,
router: TRouter,
Expand Down
269 changes: 140 additions & 129 deletions ractor/src/factory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,149 +13,160 @@
//!
//! ## Worker message routing mode
//!
//! The factory has a series of dispatch modes which are defined in [RoutingMode] and
//! The factory has a series of dispatch modes which are defined in the [routing] module and
//! control the way the factory dispatches work to workers. This should be selected based
//! on the intended workload. Some general guidance:
//!
//! 1. If you need to process a sequence of operations on a given key (i.e. the Job is a user, and
//! there's a sequential list of updates to that user). You then want the job to land on the same
//! worker and should select [RoutingMode::KeyPersistent] or [RoutingMode::StickyQueuer].
//! 2. If you don't need a sequence of operations then [RoutingMode::Queuer] is likely a good choice.
//! 3. If your workers are making remote calls to other services/actors you probably want [RoutingMode::Queuer]
//! or [RoutingMode::StickyQueuer] to prevent head-of-the-line contention. Otherwise [RoutingMode::KeyPersistent]
//! worker and should select [routing::KeyPersistentRouting] or [routing::StickyQueuerRouting].
//! 2. If you don't need a sequence of operations then [routing::QueuerRouting] is likely a good choice.
//! 3. If your workers are making remote calls to other services/actors you probably want [routing::QueuerRouting]
//! or [routing::StickyQueuerRouting] to prevent head-of-the-line contention. Otherwise [routing::KeyPersistentRouting]
//! is sufficient.
//! 4. For some custom defined routing, you can define your own [CustomHashFunction] which will be
//! used in conjunction with [RoutingMode::CustomHashFunction] to take the incoming job key and
//! 4. For some custom defined routing, you can define your own [routing::CustomHashFunction] which will be
//! used in conjunction with [routing::CustomRouting] to take the incoming job key and
//! the space which should be hashed to (i.e. the number of workers).
//! 5. If you just want load balancing there's also [RoutingMode::RoundRobin] and [RoutingMode::Random]
//! for general 1-off dispatching of jobs
//! 5. If you just want load balancing there's also [routing::RoundRobinRouting] for general 1-off
//! dispatching of jobs
//!
//! ## Factory queueing
//!
//! The factory can also support factory-side or worker-side queueing of extra work messages based on the definition
//! of the [routing::Router] and [queues::Queue] assigned to the factory.
//!
//! Supported queueing protocols today for factory-side queueing is
//!
//! 1. Default, no-priority, queueing: [queues::DefaultQueue]
//! 2. Priority-based queuing, based on a constant number of priorities [queues::PriorityQueue]
//!
//! ## Worker lifecycle
//!
//! A worker's lifecycle is managed by the factory. If the worker dies or crashes, the factory will
//! replace the worker with a new instance and continue processing jobs for that worker. The
//! factory also maintains the worker's message queue's so messages won't be lost which were in the
//! "worker"'s queue.
#![cfg_attr(
not(feature = "cluster"),
doc = "
## Example Factory
```rust
use ractor::concurrency::Duration;
use ractor::factory::*;
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
#[derive(Debug)]
enum ExampleMessage {
PrintValue(u64),
EchoValue(u64, RpcReplyPort<u64>),
}
/// The worker's specification for the factory. This defines
/// the business logic for each message that will be done in parallel.
struct ExampleWorker;
#[cfg_attr(feature = \"async-trait\", ractor::async_trait)]
impl Actor for ExampleWorker {
type Msg = WorkerMessage<(), ExampleMessage>;
type State = WorkerStartContext<(), ExampleMessage, ()>;
type Arguments = WorkerStartContext<(), ExampleMessage, ()>;
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
startup_context: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(startup_context)
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
WorkerMessage::FactoryPing(time) => {
// This is a message which all factory workers **must**
// adhere to. It is a background processing message from the
// factory which is used for (a) metrics and (b) detecting
// stuck workers, i.e. workers which aren't making progress
// processing their messages
state
.factory
.cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
}
WorkerMessage::Dispatch(job) => {
// Actual business logic that we want to parallelize
tracing::trace!(\"Worker {} received {:?}\", state.wid, job.msg);
match job.msg {
ExampleMessage::PrintValue(value) => {
tracing::info!(\"Worker {} printing value {value}\", state.wid);
}
ExampleMessage::EchoValue(value, reply) => {
tracing::info!(\"Worker {} echoing value {value}\", state.wid);
let _ = reply.send(value);
}
}
// job finished, on success or err we report back to the factory
state
.factory
.cast(FactoryMessage::Finished(state.wid, job.key))?;
}
}
Ok(())
}
}
/// Used by the factory to build new [ExampleWorker]s.
struct ExampleWorkerBuilder;
impl WorkerBuilder<ExampleWorker, ()> for ExampleWorkerBuilder {
fn build(&self, _wid: usize) -> (ExampleWorker, ()) {
(ExampleWorker, ())
}
}
#[tokio::main]
async fn main() {
let factory_def = Factory::<
(),
ExampleMessage,
(),
ExampleWorker,
routing::QueuerRouting<(), ExampleMessage>,
queues::DefaultQueue<(), ExampleMessage>
>::default();
let factory_args = FactoryArgumentsBuilder::new(ExampleWorkerBuilder, Default::default(), Default::default())
.with_number_of_initial_workers(5)
.build();
let (factory, handle) = Actor::spawn(None, factory_def, factory_args)
.await
.expect(\"Failed to startup factory\");
for i in 0..99 {
factory
.cast(FactoryMessage::Dispatch(Job {
key: (),
msg: ExampleMessage::PrintValue(i),
options: JobOptions::default(),
}))
.expect(\"Failed to send to factory\");
}
let reply = factory
.call(
|prt| {
FactoryMessage::Dispatch(Job {
key: (),
msg: ExampleMessage::EchoValue(123, prt),
options: JobOptions::default(),
})
},
None,
)
.await
.expect(\"Failed to send to factory\")
.expect(\"Failed to parse reply\");
assert_eq!(reply, 123);
factory.stop(None);
handle.await.unwrap();
}
```
"
)]
//!
//! ## Example Factory
//! ```rust
//! use ractor::concurrency::Duration;
//! use ractor::factory::*;
//! use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
//!
//! #[derive(Debug)]
//! enum ExampleMessage {
//! PrintValue(u64),
//! EchoValue(u64, RpcReplyPort<u64>),
//! }
//!
//! #[cfg(feature = "cluster")]
//! impl ractor::Message for ExampleMessage {}
//!
//! /// The worker's specification for the factory. This defines
//! /// the business logic for each message that will be done in parallel.
//! struct ExampleWorker;
//! #[cfg_attr(feature = "async-trait", ractor::async_trait)]
//! impl Actor for ExampleWorker {
//! type Msg = WorkerMessage<(), ExampleMessage>;
//! type State = WorkerStartContext<(), ExampleMessage, ()>;
//! type Arguments = WorkerStartContext<(), ExampleMessage, ()>;
//! async fn pre_start(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! startup_context: Self::Arguments,
//! ) -> Result<Self::State, ActorProcessingErr> {
//! Ok(startup_context)
//! }
//! async fn handle(
//! &self,
//! _myself: ActorRef<Self::Msg>,
//! message: Self::Msg,
//! state: &mut Self::State,
//! ) -> Result<(), ActorProcessingErr> {
//! match message {
//! WorkerMessage::FactoryPing(time) => {
//! // This is a message which all factory workers **must**
//! // adhere to. It is a background processing message from the
//! // factory which is used for (a) metrics and (b) detecting
//! // stuck workers, i.e. workers which aren't making progress
//! // processing their messages
//! state
//! .factory
//! .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
//! }
//! WorkerMessage::Dispatch(job) => {
//! // Actual business logic that we want to parallelize
//! tracing::trace!("Worker {} received {:?}", state.wid, job.msg);
//! match job.msg {
//! ExampleMessage::PrintValue(value) => {
//! tracing::info!("Worker {} printing value {value}", state.wid);
//! }
//! ExampleMessage::EchoValue(value, reply) => {
//! tracing::info!("Worker {} echoing value {value}", state.wid);
//! let _ = reply.send(value);
//! }
//! }
//! // job finished, on success or err we report back to the factory
//! state
//! .factory
//! .cast(FactoryMessage::Finished(state.wid, job.key))?;
//! }
//! }
//! Ok(())
//! }
//! }
//! /// Used by the factory to build new [ExampleWorker]s.
//! struct ExampleWorkerBuilder;
//! impl WorkerBuilder<ExampleWorker, ()> for ExampleWorkerBuilder {
//! fn build(&self, _wid: usize) -> (ExampleWorker, ()) {
//! (ExampleWorker, ())
//! }
//! }
//! #[tokio::main]
//! async fn main() {
//! let factory_def = Factory::<
//! (),
//! ExampleMessage,
//! (),
//! ExampleWorker,
//! routing::QueuerRouting<(), ExampleMessage>,
//! queues::DefaultQueue<(), ExampleMessage>
//! >::default();
//! let factory_args = FactoryArgumentsBuilder::new(ExampleWorkerBuilder, Default::default(), Default::default())
//! .with_number_of_initial_workers(5)
//! .build();
//!
//! let (factory, handle) = Actor::spawn(None, factory_def, factory_args)
//! .await
//! .expect("Failed to startup factory");
//! for i in 0..99 {
//! factory
//! .cast(FactoryMessage::Dispatch(Job {
//! key: (),
//! msg: ExampleMessage::PrintValue(i),
//! options: JobOptions::default(),
//! }))
//! .expect("Failed to send to factory");
//! }
//! let reply = factory
//! .call(
//! |prt| {
//! FactoryMessage::Dispatch(Job {
//! key: (),
//! msg: ExampleMessage::EchoValue(123, prt),
//! options: JobOptions::default(),
//! })
//! },
//! None,
//! )
//! .await
//! .expect("Failed to send to factory")
//! .expect("Failed to parse reply");
//! assert_eq!(reply, 123);
//! factory.stop(None);
//! handle.await.unwrap();
//! }
//! ```

use crate::concurrency::{Duration, Instant};
#[cfg(feature = "cluster")]
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
version = "0.10.0"
version = "0.10.1"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster_derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster_derive"
version = "0.10.0"
version = "0.10.1"
authors = ["Sean Lawlor <seanlawlor@fb.com>"]
description = "Derives for ractor_cluster"
license = "MIT"
Expand Down

0 comments on commit c48de11

Please sign in to comment.