diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1ed6dbd..9de075c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,15 +11,13 @@ jobs: steps: - name: Install latest stable Rust - uses: actions-rs/toolchain@v1 + uses: dtolnay/rust-toolchain@stable with: - toolchain: stable - override: true components: clippy - name: Checkout crate - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Run tests run: bash ci/test.bash @@ -33,14 +31,12 @@ jobs: steps: - name: Install latest nightly Rust - uses: actions-rs/toolchain@v1 + uses: dtolnay/rust-toolchain@nightly with: - toolchain: nightly - override: true components: clippy - name: Checkout crate - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Run clippy diff --git a/CHANGELOG.md b/CHANGELOG.md index 09ac124..08597f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,29 @@ [Unreleased]: https://github.com/najamelan/async_executors/compare/release...dev +## [0.3.0] - 2022-07-01 + +[0.3.0]: https://github.com/najamelan/async_executors/compare/0.2.1...0.3.0 + +### Upgraded + + - __BREAKING__: Update thespis to 0.2.0. + - __BREAKING__: Change the `ActorBuilder::channel` signature so you don't + have to manually convert the error type on the `Sink`. + +### Changed + + - __BREAKING__: Names have become mandatory on `Mailbox` and `ActorBuilder`. + - __BREAKING__: Improved ergonomics of using custom channels. + - switch to edition 2021 + +### Added + - more examples and tests + + ## [0.2.1] - 2022-05-26 -[0.2.0]: https://github.com/najamelan/async_executors/compare/0.2.0...0.2.1 +[0.2.1]: https://github.com/najamelan/async_executors/compare/0.2.0...0.2.1 ### Fixed diff --git a/Cargo.toml b/Cargo.toml index e6c2a42..c097506 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ version = "^0.3" version = "^0.1" [dependencies.thespis] -version = "^0.1" +version = "^0.2" [dependencies.tracing-futures] features = ["futures-03"] @@ -47,6 +47,7 @@ version = "^0.2" [dev-dependencies] async_nursery = "^0.5" +async_progress = "^0.2" futures-timer = "^3" ring-channel = "^0.11" @@ -129,11 +130,6 @@ name = "concurrent_nursery" path = "examples/concurrent_nursery.rs" required-features = ["not_wasm"] -[[example]] -name = "deadlock_prio" -path = "examples/deadlock_prio.rs" -required-features = ["not_wasm"] - [[example]] name = "desugar" path = "examples/desugar.rs" @@ -179,6 +175,11 @@ name = "throttle" path = "examples/throttle.rs" required-features = ["not_wasm"] +[[example]] +name = "tokio_channel" +path = "examples/tokio_channel.rs" +required-features = ["not_wasm"] + [[example]] name = "tracing" path = "examples/tracing.rs" @@ -196,15 +197,14 @@ authors = ["Naja Melan "] categories = ["asynchronous", "concurrency"] description = "Reference implementation for the thespis actor model" documentation = "https://docs.rs/thespis_impl" -edition = "2018" +edition = "2021" homepage = "https://github.com/thespis-rs/thespis_impl" keywords = ["async", "futures", "actor", "thespis"] license = "Unlicense" name = "thespis_impl" readme = "README.md" repository = "https://github.com/thespis-rs/thespis_impl" -resolver = "2" -version = "0.2.1" +version = "0.3.0" [package.metadata] [package.metadata.docs] @@ -219,6 +219,7 @@ actix = "^0.13" actix-rt = "^2" [target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.async_chanx] +features = ["tokio"] version = "^0.1.0-alpha" [target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.async_executors] @@ -227,7 +228,7 @@ version = "^0.6" [target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.criterion] features = [] -version = "^0.3" +version = "^0.4" [target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.stream_throttle] default-features = false @@ -241,6 +242,9 @@ version = "^1" [target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.tokio-stream] version = "^0.1" +[target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.tokio-util] +version = "^0.7" + [target."cfg(target_arch = \"wasm32\")"] [target."cfg(target_arch = \"wasm32\")".dev-dependencies] wasm-bindgen = "^0.2" diff --git a/Cargo.yml b/Cargo.yml index e2b5e06..e082725 100644 --- a/Cargo.yml +++ b/Cargo.yml @@ -25,10 +25,9 @@ package: # - `git tag x.x.x` with version number. # - `git push && git push --tags` # - version : 0.2.1 + version : 0.3.0 name : thespis_impl - edition : '2018' - resolver : '2' + edition : '2021' authors : [ Naja Melan ] description : Reference implementation for the thespis actor model license : Unlicense @@ -75,7 +74,7 @@ dependencies: # async_executors : { version: ^0.6 } futures : { version: ^0.3, features: [ std, compat ], default-features: false } - thespis : { version: ^0.1 } + thespis : { version: ^0.2 } tracing : ^0.1 # private dependencies. @@ -93,6 +92,8 @@ dev-dependencies: async-std : { version: ^1, features: [ attributes ] } # tracing-test : { version: ^0.1, path: ../../RUST/tracing-test/tracing-test } futures-timer : ^3 + async_progress : ^0.2 + target: @@ -112,16 +113,17 @@ target: dev-dependencies: async_executors : { version: ^0.6, features: [localpool, threadpool, async_std, tokio_ct, tokio_tp, tracing] } - async_chanx : { version: ^0.1.0-alpha } + async_chanx : { version: ^0.1.0-alpha, features: [tokio] } stream_throttle : { version: ^0.4, default-features: false, features: [timer-futures-timer] } + tokio-stream : { version: ^0.1 } + tokio-util : { version: ^0.7 } # for benchmarks # actix-rt : ^2 actix : ^0.13 - criterion : { version: ^0.3, features: [] } + criterion : { version: ^0.4, features: [] } tokio : { version: ^1, features: [ sync, macros ] } - tokio-stream : { version: ^0.1 } build-dependencies: @@ -198,10 +200,6 @@ example: path : examples/concurrent_nursery.rs required-features : [ not_wasm ] - - name : deadlock_prio - path : examples/deadlock_prio.rs - required-features : [ not_wasm ] - - name : desugar path : examples/desugar.rs required-features : [ not_wasm ] @@ -238,6 +236,11 @@ example: path : examples/throttle.rs required-features : [ not_wasm ] + - name : tokio_channel + path : examples/tokio_channel.rs + required-features : [ not_wasm ] + - name : tracing path : examples/tracing.rs required-features : [ not_wasm ] + diff --git a/README.md b/README.md index 707f7cf..fa2e273 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # thespis_impl [![standard-readme compliant](https://img.shields.io/badge/readme%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) -[![Build Status](https://api.travis-ci.org/najamelan/thespis_impl.svg?branch=release)](https://travis-ci.org/najamelan/thespis_impl) +[![Build Status](https://github.com/thespis-rs/thespis_impl/actions/workflows/ci.yml/badge.svg?branch=release&event=push)](https://github.com/thespis-rs/thespis_impl/actions/workflows/ci.yml) [![Docs](https://docs.rs/thespis_impl/badge.svg)](https://docs.rs/thespis_impl) [![crates.io](https://img.shields.io/crates/v/thespis_impl.svg)](https://crates.io/crates/thespis_impl) @@ -34,14 +34,14 @@ With [cargo yaml](https://gitlab.com/storedbox/cargo-yaml): ```yaml dependencies: - thespis_impl: ^0.2 + thespis_impl: ^0.3 ``` In Cargo.toml: ```toml [dependencies] - thespis_impl = "0.2" + thespis_impl = "0.3" ``` ### Upgrade diff --git a/TODO.md b/TODO.md index aa3d518..97d1c9a 100644 --- a/TODO.md +++ b/TODO.md @@ -1,14 +1,19 @@ ## TODO +- Document the limitation of futures channels of not waking up the sender, as discovered in the deadlock test. What is the consequence for other actors? +- align all repo's and update the super repo. + fix code coverage, currently still on travis. - verify log spans. - see tests/tracing.rs See: https://github.com/dbrgn/tracing-test/issues/4 - neither wasm-logger, nor tracing-wasm show the current span atm. See: https://github.com/storyai/tracing-wasm/issues/17 + - log where addr are created and dropped for easier debugging when we have an addr that we forgot to drop. ### Tests - flesh out tests. +- deadlock still happening # Perf diff --git a/benches/multi_thread/contention.rs b/benches/multi_thread/contention.rs index 3b050be..209f57b 100644 --- a/benches/multi_thread/contention.rs +++ b/benches/multi_thread/contention.rs @@ -209,8 +209,8 @@ fn mpsc( c: &mut Criterion ) ( || { - let (sum_in_addr , sum_in_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; - let (mut sum_addr, sum_mb ) = Addr::builder().bounded( Some(BOUNDED) ).build() ; + let (sum_in_addr , sum_in_mb) = Addr::builder( "sum_in" ).bounded( Some(BOUNDED) ).build() ; + let (mut sum_addr, sum_mb ) = Addr::builder( "sum" ).bounded( Some(BOUNDED) ).build() ; let sum = Sum { total: 5, inner: sum_in_addr } ; let sum_in = SumIn { count: 0 } ; @@ -328,8 +328,8 @@ fn mpsc( c: &mut Criterion ) ( || { - let (sum_in_addr , sum_in_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; - let (mut sum_addr, sum_mb ) = Addr::builder().bounded( Some(BOUNDED) ).build() ; + let (sum_in_addr , sum_in_mb) = Addr::builder( "sum_in" ).bounded( Some(BOUNDED) ).build() ; + let (mut sum_addr, sum_mb ) = Addr::builder( "sum" ).bounded( Some(BOUNDED) ).build() ; let sum = Sum { total: 5, inner: sum_in_addr } ; let sum_in = SumIn { count: 0 } ; diff --git a/benches/multi_thread/delivery.rs b/benches/multi_thread/delivery.rs index 03c1778..8e0e72f 100644 --- a/benches/multi_thread/delivery.rs +++ b/benches/multi_thread/delivery.rs @@ -176,9 +176,9 @@ fn spsc( c: &mut Criterion ) ( move || // setup { - let (sum_in_addr, sum_in_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; + let (sum_in_addr, sum_in_mb) = Addr::builder( "sum_in" ).bounded( Some(BOUNDED) ).build() ; + let (sum_addr, sum_mb) = Addr::builder( "sum" ).bounded( Some(BOUNDED) ).build() ; let sum = Sum{ total: 5, inner: sum_in_addr } ; - let (sum_addr, sum_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; let sumin_thread = thread::spawn( move || { @@ -207,7 +207,7 @@ fn spsc( c: &mut Criterion ) let res = sum_addr.call( Show{} ).await.expect( "Call failed" ); - assert_eq!( *msgs as u64 *10 + 5 + termial( *msgs as u64 ), res ); + assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); }); sumin_thread.join().expect( "join sum_in thread" ); @@ -266,7 +266,7 @@ fn spsc( c: &mut Criterion ) // let res = sum_addr.call( Show{} ).await.expect( "Call failed" ); - // assert_eq!( *msgs as u64 *10 + 5 + termial( *msgs as u64 ), res ); + // assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); // }); // sumin_thread.join().expect( "join sum_in thread" ); @@ -304,7 +304,7 @@ fn spsc( c: &mut Criterion ) let res = sum_addr.send( Show{} ).await.expect( "Call failed" ); - assert_eq!( *msgs as u64 *10 + 5 + termial( *msgs as u64 ), res ); + assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); sum_in_thread.stop(); sum_thread .stop(); @@ -327,9 +327,9 @@ fn spsc( c: &mut Criterion ) ( move || // setup { - let (sum_in_addr, sum_in_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; - let sum = Sum{ total: 5, inner: sum_in_addr } ; - let (sum_addr, sum_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; + let (sum_in_addr, sum_in_mb) = Addr::builder( "sum_in" ).bounded( Some(BOUNDED) ).build() ; + let (sum_addr , sum_mb ) = Addr::builder( "sum" ).bounded( Some(BOUNDED) ).build() ; + let sum = Sum{ total: 5, inner: sum_in_addr } ; let sumin_thread = thread::spawn( move || @@ -359,7 +359,7 @@ fn spsc( c: &mut Criterion ) let res = sum_addr.call( Show{} ).await.expect( "Call failed" ); - assert_eq!( *msgs as u64 *10 + 5 + termial( *msgs as u64 ), res ); + assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); }); sumin_thread.join().expect( "join sum_in thread" ); @@ -419,7 +419,7 @@ fn spsc( c: &mut Criterion ) // let res = sum_addr.call( Show{} ).await.expect( "Call failed" ); - // assert_eq!( *msgs as u64 *10 + 5 + termial( *msgs as u64 ), res ); + // assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); // }); // sumin_thread.join().expect( "join sum_in thread" ); diff --git a/benches/single_thread/delivery.rs b/benches/single_thread/delivery.rs index 40ea6c1..09ce6c6 100644 --- a/benches/single_thread/delivery.rs +++ b/benches/single_thread/delivery.rs @@ -176,9 +176,9 @@ fn seq( c: &mut Criterion ) ( move || // setup { - let (sum_in_addr, sum_in_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; - let sum = Sum{ total: 5, inner: sum_in_addr } ; - let (sum_addr, sum_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; + let (sum_in_addr, sum_in_mb) = Addr::builder( "sum_in" ).bounded( Some(BOUNDED) ).build() ; + let (sum_addr, sum_mb) = Addr::builder( "sum" ).bounded( Some(BOUNDED) ).build() ; + let sum = Sum{ total: 5, inner: sum_in_addr } ; let exec = TokioCtBuilder::new().build().expect( "build runtime" ); let sumin = SumIn{ count: 0 }; @@ -201,7 +201,7 @@ fn seq( c: &mut Criterion ) let res = sum_addr.call( Show{} ).await.expect( "Call failed" ); - assert_eq!( *msgs as u64 *10 + 5 + termial( *msgs as u64 ), res ); + assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); drop( sum_addr ); drop( sum_handle .await ); @@ -240,7 +240,7 @@ fn seq( c: &mut Criterion ) // let res = sum_addr.send( Show{} ).await.expect( "Call failed" ); - // assert_eq!( *msgs as u64 *10 + 5 + termial( *msgs as u64 ), res ); + // assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); // actix_rt::System::current().stop(); // }); @@ -257,9 +257,9 @@ fn seq( c: &mut Criterion ) ( move || // setup { - let (sum_in_addr, sum_in_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; - let sum = Sum{ total: 5, inner: sum_in_addr } ; - let (sum_addr, sum_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; + let (sum_in_addr, sum_in_mb) = Addr::builder( "sum_in" ).bounded( Some(BOUNDED) ).build() ; + let (sum_addr , sum_mb ) = Addr::builder( "sum" ).bounded( Some(BOUNDED) ).build() ; + let sum = Sum{ total: 5, inner: sum_in_addr } ; let exec = TokioCtBuilder::new().build().expect( "build runtime" ); let sumin = SumIn{ count: 0 }; @@ -282,7 +282,7 @@ fn seq( c: &mut Criterion ) let res = sum_addr.call( Show{} ).await.expect( "Call failed" ); - assert_eq!( *msgs as u64 *10 + 5 + termial( *msgs as u64 ), res ); + assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); drop( sum_addr ); @@ -319,7 +319,7 @@ fn seq( c: &mut Criterion ) let res = sum_addr.send( Show{} ).await.expect( "Call failed" ); - assert_eq!( *msgs as u64 *10 + 5 + termial( *msgs as u64 ), res ); + assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); actix_rt::System::current().stop(); }); diff --git a/examples/across_yields.rs b/examples/across_yields.rs index 6d59805..5f3f17d 100644 --- a/examples/across_yields.rs +++ b/examples/across_yields.rs @@ -64,7 +64,7 @@ async fn main() -> Result< (), Box > let a = MyActor{ seed: "seed - ".into() }; - let mut addr = Addr::builder().spawn( a, &AsyncStd )?; + let mut addr = Addr::builder( "My actor" ).spawn( a, &AsyncStd )?; let mut addr2 = addr.clone(); trace!( "calling addr.call( Ping('ping') )" ); diff --git a/examples/addr_is_sink.rs b/examples/addr_is_sink.rs index ffc0f22..1c1292e 100644 --- a/examples/addr_is_sink.rs +++ b/examples/addr_is_sink.rs @@ -32,7 +32,7 @@ impl Handler< Count > for MyActor async fn main() -> Result< (), Box > { let a = MyActor { count: 0 }; - let mut addr = Addr::builder().spawn( a, &AsyncStd )?; + let mut addr = Addr::builder( "My actor" ).spawn( a, &AsyncStd )?; // Create an ad hoc stream. // diff --git a/examples/basic.rs b/examples/basic.rs index 58ae9ab..bf77676 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -51,7 +51,7 @@ async fn main() -> Result< (), Box > // structured concurrency approach, use `start_handle`, so you can await // the output of the mailbox. // - let mut addr = Addr::builder().spawn( MyActor, &AsyncStd )?; + let mut addr = Addr::builder( "basic" ).spawn( MyActor, &AsyncStd )?; // Call is request-response, as opposed to `send` which comes from `Sink` // and is a one way message. diff --git a/examples/concurrent.rs b/examples/concurrent.rs index d30a088..ae7a199 100644 --- a/examples/concurrent.rs +++ b/examples/concurrent.rs @@ -90,7 +90,7 @@ impl Handler for MyActor async fn main() -> Result< (), DynError > { let actor = MyActor{ exec: Box::new(AsyncStd) }; - let mut addr = Addr::builder().spawn( actor, &AsyncStd )?; + let mut addr = Addr::builder( "concurrent" ).spawn( actor, &AsyncStd )?; // Admittedly, this looks a bit weird. Call is fallible, and it returns a result over // the SpawnError, since the handler needs to spawn and spawning is fallible. diff --git a/examples/concurrent_nursery.rs b/examples/concurrent_nursery.rs index dfb94aa..55ca6e7 100644 --- a/examples/concurrent_nursery.rs +++ b/examples/concurrent_nursery.rs @@ -99,7 +99,7 @@ impl Handler for MyActor async fn main() -> Result< (), DynError > { let actor = MyActor::new( Box::new(AsyncStd) )?; - let mut addr = Addr::builder().spawn( actor, &AsyncStd )?; + let mut addr = Addr::builder( "nursed" ).spawn( actor, &AsyncStd )?; // Admittedly, this looks a bit weird. Call is fallible, and it returns a result over // the NurseErr, since the handler needs to spawn and spawning is fallible. diff --git a/examples/deadlock_prio.rs b/examples/deadlock_prio.rs deleted file mode 100644 index 204826b..0000000 --- a/examples/deadlock_prio.rs +++ /dev/null @@ -1,189 +0,0 @@ -//! One of the prominent problems with the actor model are deadlocks. -//! The one we deal with here occurs when we use bounded channels for -//! back pressure. However one actor sits at the gate, processing both -//! incoming and outgoing messages. -//! -//! When the incoming outpace the capacity for processing, the system -//! fills up and when the gate's mailbox is full, outgoing messages -//! cannot get out, leading to a deadlock. -//! -//! The problem is described at length in this blogpost: -//! https://elizarov.medium.com/deadlocks-in-non-hierarchical-csp-e5910d137cc -//! -//! The solution we develop here is rather simple. As thespis uses interfaces -//! like Sink+Stream for communication between addresses and mailboxes, we -//! can simply combine two channels with futures::stream::select_with_strategy so that -//! one of them takes priority. This way we give outgoing messages priority. -//! -use -{ - tracing :: { * } , - thespis :: { * } , - thespis_impl :: { * } , - async_executors :: { AsyncStd, SpawnHandleExt } , - std :: { error::Error, time::Duration } , - futures_timer :: { Delay } , - futures :: { stream::{ select_with_strategy, PollNext } } , -}; - -static BOUNDED: usize = 5; - -pub type DynResult = Result< T, Box >; - - -#[ derive( Actor ) ] struct Gate { worker: WeakAddr } -#[ derive( Actor ) ] struct Worker { gate : Addr } - - -struct Request(usize); -struct Response(usize); - -impl Message for Request { type Return = (); } -impl Message for Response { type Return = (); } - - - -impl Handler< Request > for Gate -{ - #[async_fn] fn handle( &mut self, req: Request ) - { - info!( "Gate: New request: {}.", req.0 ); - - self.worker.send( Request(req.0) ).await.expect( "send" ); - } -} - - - -impl Handler< Response > for Gate -{ - #[async_fn] fn handle( &mut self, resp: Response ) - { - info!( "Gate: Sending reponse to request: {}.", resp.0 ); - } -} - - - -impl Handler< Request > for Worker -{ - #[async_fn] fn handle( &mut self, work: Request ) - { - info!( "Worker: Grinding on request: {}.", work.0 ); - - // Processing takes a little bit of time. - // - Delay::new( Duration::from_millis(50) ).await; - - self.gate.send( Response(work.0) ).await.expect( "send" ); - } -} - - - -#[ async_std::main ] -// -async fn main() -> DynResult<()> -{ - let _ = tracing_subscriber::fmt::Subscriber::builder() - - .with_max_level(tracing::Level::TRACE) - .with_env_filter( "info,thespis_impl=debug" ) - // .json() - .try_init() - ; - - // The naive implementation deadlocks. - // - // _naive().await?; - - // The solution. - // - fancy().await?; - - Ok(()) -} - - -async fn _naive() -> DynResult<()> -{ - let (mut gate_addr, gate_mb) = Addr::builder().bounded( Some(BOUNDED) ).build(); - let ( worker_addr, worker_mb) = Addr::builder().bounded( Some(BOUNDED) ).build(); - - let gate = Gate { worker: worker_addr.weak() }; - let worker = Worker { gate : gate_addr.clone() }; - - let gate_handle = AsyncStd.spawn_handle( gate_mb.start( gate ) )?; - let worker_handle = AsyncStd.spawn_handle( worker_mb.start( worker ) )?; - - for i in 1..100 - { - gate_addr.send( Request(i) ).await?; - } - - // This synchronizes the last one so we don't drop the addresses to early. - // - gate_addr.call( Request(100) ).await?; - - drop(worker_addr); - drop(gate_addr); - - worker_handle.await; - gate_handle.await; - - Ok(()) -} - - -async fn fancy() -> DynResult<()> -{ - // We will create 2 separate channels to the same mailbox. - // One for high priority (outbound) and one for low priority - // (inbound). - // - // Both will be bounded, but delivering outbound work is always - // prioritized over taking more inbound work. This will keep the - // system from congesting. - // - // Another solution is to use an unbounded channel for the outbound. - // - let ( low_tx, low_rx) = futures::channel::mpsc::channel( BOUNDED ); - let (high_tx, high_rx) = futures::channel::mpsc::channel( BOUNDED ); - - let strategy = |_: &mut ()| PollNext::Left; - let gate_rx = Box::new( select_with_strategy( high_rx, low_rx, strategy ) ); - - let gate_mb = Mailbox::new( Some("gate"), gate_rx ); - - let gate_low_tx = low_tx .sink_map_err( |e| Box::new(e) as DynError ); - let gate_high_tx = high_tx.sink_map_err( |e| Box::new(e) as DynError ); - - let mut gate_low_addr = gate_mb.addr( Box::new( gate_low_tx ) ); - let gate_high_addr = gate_mb.addr( Box::new( gate_high_tx ) ); - - let (worker_addr, worker_mb) = Addr::builder().bounded( Some(BOUNDED) ).build(); - - let gate = Gate { worker: worker_addr.weak() }; - let worker = Worker { gate : gate_high_addr }; - - let gate_handle = AsyncStd.spawn_handle( gate_mb.start( gate ) )?; - let worker_handle = AsyncStd.spawn_handle( worker_mb.start( worker ) )?; - - for i in 1..100 - { - gate_low_addr.send( Request(i) ).await?; - } - - // This synchronizes the last one so we don't drop the addresses to early. - // - gate_low_addr.call( Request(100) ).await?; - - - drop(gate_low_addr); - drop(worker_addr); - - worker_handle.await; - gate_handle.await; - - Ok(()) -} diff --git a/examples/desugar.rs b/examples/desugar.rs index 9a85ef2..5c9e2c7 100644 --- a/examples/desugar.rs +++ b/examples/desugar.rs @@ -4,7 +4,7 @@ use { thespis :: { Return, Actor, Message, Handler, Address } , - thespis_impl :: { DynError, Mailbox, MailboxEnd } , + thespis_impl :: { DynError, Mailbox, MailboxEnd } , async_executors :: { AsyncStd, SpawnHandleExt } , std :: { error::Error } , futures :: { channel::mpsc, FutureExt, SinkExt } , @@ -49,7 +49,7 @@ async fn main() -> Result< (), Box > // Manually create a mailbox, with a name for the actor and the receiver of // our channel. // - let mb = Mailbox::new( Some("HelloWorld"), Box::new(rx) ); + let mb = Mailbox::new( "HelloWorld", Box::new(rx) ); // The mailbox gives us the address. // diff --git a/examples/drop_channel.rs b/examples/drop_channel.rs index ab0bcfe..3bca8a5 100644 --- a/examples/drop_channel.rs +++ b/examples/drop_channel.rs @@ -63,23 +63,24 @@ async fn main() -> Result< (), Box > let tx = tx.sink_map_err( |_| { // The error from ring_channel is not Sync, because it contains the message. - // This is a problem because we don't require messages to be Sync. That would - // imply making thespis unsafe by addin a Sync impl to our wrapper for messages - // on the channel. + // This is a problem because we don't require messages to be `Sync`, however + // we do want our error type to be `Send` + `Sync`, so it can't have the message. + // If we would not require `Sync` on the error type, it couldn't be send across + // threads unless it was `Clone`, but we don't require the user's messages to + // be clone either... // - // Alternatively, we just don't count on recovering the message and construct a + // We don't count on recovering the message and construct a // simple io error here. Note that we could have wanted to use ThesErr::MailboxClosed, // but that requires ActorInfo and as we haven't spawned our actor yet, we don't really // know our ActorInfo. You can still do it by not using the builder and first creating // the Mailbox manually, however I don't think it's worth it. // - let error = std::io::Error::from(std::io::ErrorKind::NotConnected); - Box::new(error) as DynError + std::io::Error::from(std::io::ErrorKind::NotConnected) }); - let (mut accu_addr , accu_mb) = Addr::builder() + let (mut accu_addr , accu_mb) = Addr::builder( "accu" ) - .channel( Box::new(tx), Box::new(rx) ) + .channel( tx, rx ) .build() ; diff --git a/examples/local_spawn.rs b/examples/local_spawn.rs index f54ed05..613e1f0 100644 --- a/examples/local_spawn.rs +++ b/examples/local_spawn.rs @@ -61,7 +61,7 @@ fn main() -> Result< (), Box > let exec = pool.spawner(); let actor = MyActor { i: 3, nosend: PhantomData }; - let mut addr = Addr::builder().spawn_local( actor, &exec )?; + let mut addr = Addr::builder( "local" ).spawn_local( actor, &exec )?; exec.spawn_local( async move { diff --git a/examples/move_fut.rs b/examples/move_fut.rs index 56b5934..b41fd22 100644 --- a/examples/move_fut.rs +++ b/examples/move_fut.rs @@ -1,4 +1,4 @@ -//! Showning how the futures produced by thespis can be ran to completion on any thread. +//! Showning how the futures produced by thespis can be run to completion on any thread. // use { @@ -38,7 +38,7 @@ impl Handler< Ping > for MyActor async fn main() -> Result< (), Box > { let exec = ThreadPool::new()?; - let mut addr = Addr::builder().spawn( MyActor, &exec )?; + let mut addr = Addr::builder( "my actor" ).spawn( MyActor, &exec )?; // call uses &mut self for Addr, so it's borrowed by the future. This means we can't just // move the future to another thread or spawn it directly. We have to move Addr with it. diff --git a/examples/perf/thespis.rs b/examples/perf/thespis.rs index 8e05585..00656f4 100644 --- a/examples/perf/thespis.rs +++ b/examples/perf/thespis.rs @@ -15,8 +15,8 @@ const SENDERS: usize = 1 ; // async fn main() { - let (sum_in_addr, sum_in_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; - let (mut sum_addr, sum_mb) = Addr::builder().bounded( Some(BOUNDED) ).build() ; + let (sum_in_addr, sum_in_mb) = Addr::builder( "sum_in" ).bounded( Some(BOUNDED) ).build() ; + let (mut sum_addr, sum_mb) = Addr::builder( "sum" ).bounded( Some(BOUNDED) ).build() ; let sum_in = SumIn{ count: 0 } ; diff --git a/examples/perf/thespis_local.rs b/examples/perf/thespis_local.rs index ab918dc..d242d15 100644 --- a/examples/perf/thespis_local.rs +++ b/examples/perf/thespis_local.rs @@ -10,10 +10,10 @@ fn main() -> Result< (), DynError > let exec = TokioCtBuilder::new().build().unwrap(); let sum_in = SumIn{ count: 0 }; - let sum_in_addr = Addr::builder().bounded( Some(BOUNDED) ).spawn_local( sum_in, &exec )?; + let sum_in_addr = Addr::builder( "sum_in" ).bounded( Some(BOUNDED) ).spawn_local( sum_in, &exec )?; let sum = Sum{ total: 5, inner: sum_in_addr, _nosend: PhantomData }; - let mut sum_addr = Addr::builder().bounded( Some(BOUNDED) ).spawn_local( sum , &exec )?; + let mut sum_addr = Addr::builder( "sum" ).bounded( Some(BOUNDED) ).spawn_local( sum , &exec )?; exec.block_on( async move diff --git a/examples/perf/thespis_mpsc.rs b/examples/perf/thespis_mpsc.rs index c9bd58f..408b323 100644 --- a/examples/perf/thespis_mpsc.rs +++ b/examples/perf/thespis_mpsc.rs @@ -6,8 +6,8 @@ use common::*; // async fn main() -> Result< (), DynError > { - let (sum_in_addr , sum_in_mb) = Addr::builder().bounded( Some(MPSC_BOUNDED) ).build() ; - let (mut sum_addr, sum_mb ) = Addr::builder().bounded( Some(MPSC_BOUNDED) ).build() ; + let (sum_in_addr , sum_in_mb) = Addr::builder( "sum_in" ).bounded( Some(MPSC_BOUNDED) ).build() ; + let (mut sum_addr, sum_mb ) = Addr::builder( "sum" ).bounded( Some(MPSC_BOUNDED) ).build() ; // Create sender threads. diff --git a/examples/perf/thespis_unbounded.rs b/examples/perf/thespis_unbounded.rs index 4d86e5e..6190417 100644 --- a/examples/perf/thespis_unbounded.rs +++ b/examples/perf/thespis_unbounded.rs @@ -6,12 +6,10 @@ // use { - async_chanx :: { * } , + async_chanx :: { tokio } , thespis :: { * } , thespis_impl :: { * } , std :: { thread } , - tokio :: { sync::mpsc } , - tokio_stream :: { wrappers::UnboundedReceiverStream } , }; @@ -78,16 +76,21 @@ impl Handler< Show > for SumIn fn main() { - let (tx, rx) = mpsc::unbounded_channel() ; - let tx = Box::new( TokioUnboundedSender::new( tx ).sink_map_err( |e| Box::new(e) as DynError ) ) ; - let sum_in_mb = Mailbox::new( None, Box::new( UnboundedReceiverStream::new(rx) ) ) ; - let sum_in_addr = sum_in_mb.addr( tx ) ; - - let (tx, rx) = mpsc::unbounded_channel() ; - let tx = Box::new( TokioUnboundedSender::new( tx ).sink_map_err( |e| Box::new(e) as DynError ) ) ; - let sum_mb = Mailbox::new( None, Box::new( UnboundedReceiverStream::new(rx) ) ) ; - let mut sum_addr = sum_mb.addr( tx ) ; - let sum = Sum{ total: 5, inner: sum_in_addr } ; + let (tx, rx) = tokio::mpsc::unbounded_channel(); + + let (sum_in_addr, sum_in_mb) = Addr::builder( "sum_in" ) + .channel( tx, rx ) + .build() + ; + + let (tx, rx) = tokio::mpsc::unbounded_channel(); + + let (mut sum_addr, sum_mb) = Addr::builder( "sum" ) + .channel( tx, rx ) + .build() + ; + + let sum = Sum{ total: 5, inner: sum_in_addr } ; let sumin_thread = thread::spawn( move || { diff --git a/examples/readme.md b/examples/readme.md index 9af46d0..7ceb6e3 100644 --- a/examples/readme.md +++ b/examples/readme.md @@ -15,3 +15,5 @@ These examples demonstrate how to use thespis with the reference implementation 11. [*concurrent_nursery*](/concurrent_nursery.rs): Let an actor process messages concurrently when no mutable state is needed. This time we make sure that none of the spawned subtasks can outlive our actor. 12. [*drop_channel*](/drop_channel.rs): An example of using a channel that overwrites older messages instead of providing back pressure. 13. [*supervisor*](/supervisor.rs): How to supervise an actor in case it panics. +14. [*tokio_channel*](/tokio_channel): How to use a tokio channel with thespis. +15. [*deadlock_prio*](../tests/deadlock.rs): Use a priority channel with thespis to give an actor a double mailbox and avoid a deadlock in a specific situation. Note you can also use priority channels like this for other usecases, eg. you can make a channel that will poll different addresses in an alternating pattern. diff --git a/examples/recipient.rs b/examples/recipient.rs index c84be39..dda66a1 100644 --- a/examples/recipient.rs +++ b/examples/recipient.rs @@ -38,8 +38,8 @@ async fn main() -> Result< (), Box > // As you can see, the addresses are generic over the actor type. // So we can't just store them in a collection. // - let addr1: Addr = Addr::builder().spawn( Actor1, &AsyncStd )?; - let addr2: Addr = Addr::builder().spawn( Actor2, &AsyncStd )?; + let addr1: Addr = Addr::builder( "actor 1" ).spawn( Actor1, &AsyncStd )?; + let addr2: Addr = Addr::builder( "actor 2" ).spawn( Actor2, &AsyncStd )?; // pub type BoxAddress = Box< dyn Address + Send + Unpin >; // Type can be elided here. It's just there for illustrative purposes. diff --git a/examples/recipient_any.rs b/examples/recipient_any.rs index 1f3e6fb..f589247 100644 --- a/examples/recipient_any.rs +++ b/examples/recipient_any.rs @@ -43,8 +43,8 @@ async fn main() -> Result< (), Box > // As you can see, the addresses are generic over the actor type. // So we can't just store them in a collection. // - let addr1: Addr = Addr::builder().spawn( Actor1, &AsyncStd )?; - let addr2: Addr = Addr::builder().spawn( Actor2, &AsyncStd )?; + let addr1: Addr = Addr::builder( "actor 1" ).spawn( Actor1, &AsyncStd )?; + let addr2: Addr = Addr::builder( "actor 2" ).spawn( Actor2, &AsyncStd )?; // pub type BoxAddress = Box< dyn Address + Send + Unpin >; // Type can be elided here. It's just there for illustrative purposes. diff --git a/examples/supervisor.rs b/examples/supervisor.rs index 231efff..caacffc 100644 --- a/examples/supervisor.rs +++ b/examples/supervisor.rs @@ -84,7 +84,10 @@ impl Handler< Supervise > for Supervisor let mut mb_handle = if actor.mailbox.is_none() { - let (addr_new, mb_handle) = Addr::builder().spawn_handle( (actor.create)(), &AsyncStd ).unwrap(); + let (addr_new, mb_handle) = Addr::builder( "supervised" ) + .spawn_handle( (actor.create)(), &AsyncStd ) + .unwrap() + ; addr = Some(addr_new); @@ -129,7 +132,9 @@ async fn main() -> Result< (), Box > .init() ; - let mut supervisor = Addr::builder().spawn( Supervisor{ exec: Box::new( AsyncStd ) }, &AsyncStd )?; + let mut supervisor = Addr::builder( "supervisor" ) + .spawn( Supervisor{ exec: Box::new( AsyncStd ) }, &AsyncStd )? + ; // Here we use a closure to create new actors, but if you don't need to capture diff --git a/examples/throttle.rs b/examples/throttle.rs index 3dd9552..316d6fb 100644 --- a/examples/throttle.rs +++ b/examples/throttle.rs @@ -4,8 +4,8 @@ use { thespis :: { * } , - thespis_impl :: { DynError, Mailbox } , - async_executors :: { AsyncStd, SpawnHandleExt } , + thespis_impl :: { Addr } , + async_executors :: { AsyncStd, } , std :: { error::Error, time::Duration } , futures :: { channel::mpsc, SinkExt } , stream_throttle :: { ThrottleRate, ThrottlePool, ThrottledStream } , @@ -55,12 +55,10 @@ async fn main() -> Result< (), Box > let pool = ThrottlePool::new( rate ); let rx = rx.throttle( pool ); - let tx = Box::new( tx.sink_map_err( |e| Box::new(e) as DynError ) ); - let mb = Mailbox::new( Some("Throttled"), Box::new(rx) ); - let mut addr = mb.addr( tx ); - - - let mb_handle = AsyncStd.spawn_handle( mb.start( MyActor{ count: 0 } ) )?; + let (mut addr, mb_handle) = Addr::builder( "Throttled" ) + .channel( tx, rx ) + .spawn_handle( MyActor{ count: 0 }, &AsyncStd )? + ; for _ in 0..10 diff --git a/examples/tokio_channel.rs b/examples/tokio_channel.rs new file mode 100644 index 0000000..bdf30bd --- /dev/null +++ b/examples/tokio_channel.rs @@ -0,0 +1,79 @@ +//! Illustration of how to use tokio channels. +// +use +{ + tokio_util :: { sync::PollSender } , + tokio_stream :: { wrappers::ReceiverStream } , + thespis :: { * } , + thespis_impl :: { * } , + async_executors :: { AsyncStd } , + std :: { error::Error } , +}; + + +#[ derive( Actor ) ] +// +struct MyActor; +struct Ping; + +impl Message for Ping +{ + type Return = &'static str; +} + + +impl Handler< Ping > for MyActor +{ + #[async_fn] fn handle( &mut self, _msg: Ping ) -> &'static str + { + "pong" + } +} + + +#[async_std::main] +// +async fn main() -> Result< (), Box > +{ + let (tx, rx) = tokio::sync::mpsc::channel( 16 ); + let rx = ReceiverStream::new(rx); + + let tx = PollSender::new(tx).sink_map_err( |_| + { + // The error from tokio-util is not Sync, because it contains the message. + // This is a problem because we don't require messages to be `Sync`, however + // we do want our error type to be `Send` + `Sync`, so it can't have the message. + // If we would not require `Sync` on the error type, it couldn't be send across + // threads unless it was `Clone`, but we don't require the user's messages to + // be clone either... + // + // We don't count on recovering the message and construct a + // simple io error here. Note that we could have wanted to use ThesErr::MailboxClosed, + // but that requires ActorInfo and as we haven't spawned our actor yet, we don't really + // know our ActorInfo. You can still do it by not using the builder and first creating + // the Mailbox manually, however I don't think it's worth it. + // + std::io::Error::from(std::io::ErrorKind::NotConnected) + }); + + // We pass in our custom channel here as long as it implements `Sink + Clone + Unpin + 'static` + // on the sender and `Stream + Unpin + 'static` on the receiver. + // + let (mut addr, mb_handle) = Addr::builder( "powerd by tokio" ) + .channel( tx, rx ) + .spawn_handle( MyActor, &AsyncStd )? + ; + + let result = addr.call( Ping ).await?; + + assert_eq!( "pong", result ); + dbg!( result ); + + // will cause the mailbox future to end. + // + drop( addr ); + + mb_handle.await; + + Ok(()) +} diff --git a/examples/tracing.rs b/examples/tracing.rs index f8d7ea0..6f789ea 100644 --- a/examples/tracing.rs +++ b/examples/tracing.rs @@ -69,8 +69,8 @@ async fn main() -> Result< (), Box > // structured concurrency approach, use `start_handle`, so you can await // the output of the mailbox. // - let mut addr = Addr::builder().name( "Alice" ).spawn( MyActor, &AsyncStd )?; - let mut addr2 = Addr::builder().name( "Bob" ).spawn( MyActor, &AsyncStd )?; + let mut addr = Addr::builder( "Alice" ).spawn( MyActor, &AsyncStd )?; + let mut addr2 = Addr::builder( "Bob" ).spawn( MyActor, &AsyncStd )?; // Call is request-response, as opposed to `send` which comes from `Sink` // and is a one way message. diff --git a/src/actor_builder.rs b/src/actor_builder.rs index 8265d73..38a2b38 100644 --- a/src/actor_builder.rs +++ b/src/actor_builder.rs @@ -1,4 +1,4 @@ -use crate::{ import::*, ChanSender, ChanReceiver, Addr, ThesErr, Mailbox, MailboxEnd, DynError }; +use crate::{ import::*, BoxEnvelope, ChanSender, ChanReceiver, CloneSinkExt, Addr, ThesErr, Mailbox, MailboxEnd }; /// Default buffer size for bounded channel between Addr and Mailbox. // @@ -10,50 +10,58 @@ pub const BOUNDED: usize = 16; /// /// Also provides methods for spawning the mailbox immediately as well as a /// [`build`](ActorBuilder::build) method which lets you do it manually. +/// +/// ## Example +/// +/// ```rust +/// # futures::executor::block_on(async { +/// use +/// { +/// thespis :: { * } , +/// thespis_impl :: { * } , +/// async_executors :: { AsyncStd } , +/// }; +/// +/// #[ derive( Actor ) ] +/// // +/// struct MyActor; +/// +/// let (mut addr, mb_handle) = Addr::builder( "my very own actor" ) +/// .bounded( Some(24) ) +/// .spawn_handle( MyActor, &AsyncStd )? +/// ; +/// +/// // mb will end when the last addr is dropped. +/// // +/// drop(addr); +/// mb_handle.await; +/// +/// # Ok::<(), ThesErr>(()) +/// # }).unwrap(); +/// ``` // pub struct ActorBuilder { tx : Option< ChanSender > , rx : Option< ChanReceiver > , bounded: Option< usize > , - name : Option< Arc > , -} - - - -impl Default for ActorBuilder -{ - fn default() -> Self - { - Self - { - tx : None , - rx : None , - bounded: Some( BOUNDED ) , - name : None , - } - } + name : Arc , } - impl ActorBuilder { /// Create a new ActorBuilder with default settings. // - pub fn new() -> Self - { - Self::default() - } - - - /// Configure a name for this actor. This will be helpful for interpreting - /// debug logs. You can also retrieve the name later on both the `Addr` and the `Mailbox`. - // - pub fn name( mut self, name: impl AsRef ) -> Self + pub fn new( name: impl AsRef ) -> Self { - self.name = Some( name.as_ref().into() ); - self + Self + { + tx : None , + rx : None , + bounded: Some( BOUNDED ) , + name : name.as_ref().into() , + } } @@ -83,17 +91,26 @@ impl ActorBuilder /// Set the channel to use for communication between `Addr` and `Mailbox`. /// - /// This option is incompatible with bounded. + /// This option is incompatible with [`ActorBuilder::bounded`]. /// /// ## Panics /// In debug mode this will panic if you have already called [`ActorBuilder::bounded`]. + /// + /// ## Example + /// + /// [This example](https://github.com/thespis-rs/thespis_impl/blob/dev/examples/tokio_channel.rs) + /// shows how to use tokio channels instead. // - pub fn channel( mut self, tx: ChanSender, rx: ChanReceiver ) -> Self + pub fn channel( mut self, tx:TX, rx: RX ) -> Self + + where TX: Sink, Error=E> + Clone + Unpin + Send + 'static, + E : Error + Sync + Send + 'static, + RX: Stream> + Send + Unpin + 'static, { debug_assert!( self.bounded == Some( BOUNDED ) ); - self.tx = tx.into(); - self.rx = rx.into(); + self.tx = Some( tx.dyned() ); + self.rx = Some( Box::new(rx) ); self } @@ -113,25 +130,21 @@ impl ActorBuilder if let Some( bounded ) = self.bounded { let (tx, rx) = futures::channel::mpsc::channel( bounded ); - let tx = tx.sink_map_err( |e| -> DynError { Box::new(e) } ); - - self.tx = Some( Box::new(tx) ); + self.tx = Some( tx.dyned() ); self.rx = Some( Box::new(rx) ); } else { let (tx, rx) = futures::channel::mpsc::unbounded(); - let tx = tx.sink_map_err( |e| -> DynError { Box::new(e) } ); - - self.tx = Some( Box::new(tx) ); + self.tx = Some( tx.dyned() ); self.rx = Some( Box::new(rx) ); } } let rx = self.rx.unwrap(); - let mb = Mailbox::new( self.name.as_deref(), rx ); + let mb = Mailbox::new( self.name, rx ); let addr = mb.addr( self.tx.unwrap() ); (addr, mb) diff --git a/src/actor_info.rs b/src/actor_info.rs index fa5cef9..4edc6ed 100644 --- a/src/actor_info.rs +++ b/src/actor_info.rs @@ -9,9 +9,9 @@ use crate::import::*; // pub struct ActorInfo { - pub(crate) id : usize , - pub(crate) name : Option< Arc > , - pub(crate) type_name: String , + pub(crate) id : usize , + pub(crate) name : Arc , + pub(crate) type_name: String , } @@ -19,7 +19,7 @@ impl ActorInfo { /// Setup actor information. // - pub(crate) fn new( id: usize, name: Option> ) -> Self + pub(crate) fn new( id: usize, name: Arc ) -> Self { Self { @@ -56,14 +56,14 @@ impl ActorInfo // pub fn span( &self ) -> Span { - if let Some( name ) = &self.name + if self.name.is_empty() { - error_span!( "actor", id = self.id, "type" = self.type_name(), a_name = name.as_ref() ) + error_span!( "actor", id = self.id, "type" = self.type_name() ) } else { - error_span!( "actor", id = self.id, "type" = self.type_name() ) + error_span!( "actor", id = self.id, "type" = self.type_name(), a_name = self.name.as_ref() ) } } } @@ -77,7 +77,7 @@ impl Identify for ActorInfo self.id } - fn name( &self ) -> Option< Arc > + fn name( &self ) -> Arc { self.name.clone() } @@ -98,14 +98,14 @@ impl fmt::Display for ActorInfo { fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - if let Some(name) = &self.name + if self.name.is_empty() { - write!( f, "{}: id={}, name={}", self.type_name(), self.id, name ) + write!( f, "{}: id={}", self.type_name(), self.id ) } else { - write!( f, "{}: id={}", self.type_name(), self.id ) + write!( f, "{}: id={}, name={}", self.type_name(), self.id, self.name ) } } } diff --git a/src/addr.rs b/src/addr.rs index bd0e824..6cd6ef8 100644 --- a/src/addr.rs +++ b/src/addr.rs @@ -18,7 +18,7 @@ impl< A: Actor > Clone for Addr fn clone( &self ) -> Self { let _s = self.info().span().entered(); - trace!( "CREATE (clone) Addr" ); + trace!( "CREATE (clone) Addr id:{}", self.info().id() ); self.inner.strong.lock().expect( "Mutex poisoned" ).increment(); @@ -48,10 +48,10 @@ impl fmt::Debug for Addr { fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - let name = match &self.name() + let name = match self.name().is_empty() { - Some( s ) => format!( ", {}", s ) , - None => String::new() , + true => String::new(), + false => format!( ", {}", self.name() ) }; write! @@ -71,17 +71,16 @@ impl fmt::Display for Addr { fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - match &self.name() + match self.name().is_empty() { - Some(n) => write!( f, "{} ({}, {})", self.inner.type_name(), self.id(), n ) , - None => write!( f, "{} ({})" , self.inner.type_name(), self.id() ) , + true => write!( f, "{} ({})" , self.inner.type_name(), self.id() ) , + false => write!( f, "{} ({}, {})", self.inner.type_name(), self.id(), self.name() ) , } } } - impl Addr where A: Actor { // Create a new address. This is restricted to the crate because StrongCount does not @@ -98,7 +97,7 @@ impl Addr where A: Actor let inner = AddrInner::new( tx, info, strong ); let _s = inner.span().entered(); - trace!( "CREATE Addr" ); + trace!( "CREATE Addr id:{}", inner.id() ); Self{ inner } } @@ -106,9 +105,9 @@ impl Addr where A: Actor /// Produces a builder for convenient creation of both [`Addr`] and [`Mailbox`](crate::Mailbox). // - pub fn builder() -> ActorBuilder + pub fn builder( name: impl AsRef ) -> ActorBuilder { - Default::default() + ActorBuilder::new( name ) } @@ -135,7 +134,7 @@ impl Drop for Addr fn drop( &mut self ) { let _s = self.info().span().entered(); - trace!( "DROP Addr" ); + trace!( "DROP Addr id:{}", self.info().id() ); self.inner.strong.lock().expect( "Mutex poisoned" ).decrement(); } @@ -179,7 +178,7 @@ impl Identify for Addr self.inner.id() } - fn name( &self ) -> Option< Arc > + fn name( &self ) -> Arc { self.inner.name() } diff --git a/src/addr_inner.rs b/src/addr_inner.rs index ed84bc0..2fc8759 100644 --- a/src/addr_inner.rs +++ b/src/addr_inner.rs @@ -51,10 +51,10 @@ impl fmt::Debug for AddrInner { fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - let name = match &self.info.name + let name = match &self.info.name().is_empty() { - Some( s ) => format!( ", {}", s ) , - None => String::new() , + true => String::new(), + false => format!( ", {}", &self.info.name ) }; write! @@ -62,7 +62,7 @@ impl fmt::Debug for AddrInner f , "AddrInner<{}> ~ {}{}" , std::any::type_name::() , - &self.info.id , + &self.info.id , name , ) } @@ -158,7 +158,7 @@ impl Identify for AddrInner self.info.id() } - fn name( &self ) -> Option< Arc > + fn name( &self ) -> Arc { self.info.name() } @@ -180,14 +180,24 @@ impl Sink for AddrInner { Poll::Ready( p ) => match p { - Ok (_) => Poll::Ready( Ok(()) ), + Ok (_) => + { + self.info.span().in_scope(|| trace!( "Mailbox ready for message." )); + Poll::Ready( Ok(()) ) + } + Err(e) => { - Poll::Ready( Err( ThesErr::MailboxClosed{ info: self.info.clone(), src: e.into() } ) ) + let err = ThesErr::MailboxClosed{ info: self.info.clone(), src: e.into() }; + Poll::Ready( Err(err) ) } } - Poll::Pending => Poll::Pending + Poll::Pending => + { + self.info.span().in_scope(|| trace!( "Mailbox giving backpressure." )); + Poll::Pending + } } } diff --git a/src/envelope.rs b/src/envelope.rs index b361984..b8afbb6 100644 --- a/src/envelope.rs +++ b/src/envelope.rs @@ -1,4 +1,4 @@ -use crate::import::*; +use crate::{ import::*, BoxEnvelope }; /// Wrapper for a message that is generic over actor instead of over message type. @@ -140,3 +140,11 @@ impl Envelope for CallEnvelope } } + +impl fmt::Debug for BoxEnvelope +{ + fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result + { + write!( f, "BoxEnvelope<{}>", std::any::type_name::() ) + } +} diff --git a/src/lib.rs b/src/lib.rs index 22861d6..45cd62e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,6 +67,8 @@ pub type BoxEnvelope = Box< dyn envelope::Envelope + Send >; pub type DynError = Box< dyn std::error::Error + Send + Sync >; /// Type of boxed channel sender for Addr. +/// Can be created conveniently with [`CloneSinkExt::dyned`], but that is rarely +/// needed as you can use the [`ActorBuilder`](ActorBuilder::channel) to override default channels. // pub type ChanSender = Box< dyn CloneSink<'static, BoxEnvelope, DynError> >; @@ -75,9 +77,11 @@ pub type ChanSender = Box< dyn CloneSink<'static, BoxEnvelope, DynError> > pub type ChanReceiver = Box< dyn futures::Stream> + Send + Unpin >; -/// Interface for T: Sink + Clone +/// Interface for `T: Sink + Clone + Unpin + Send`. +/// This is object safe, so you can clone on a boxed trait. In _thespis_impl_ it is used +/// for the channel sender that goes in the [actor address](Addr). // -pub trait CloneSink<'a, Item, E>: Sink + Unpin + Send +pub trait CloneSink<'a, Item, E>: Sink + Unpin + Send + 'a { /// Clone this sink. // @@ -97,6 +101,44 @@ impl<'a, T, Item, E> CloneSink<'a, Item, E> for T } +/// Helper trait to smoothen API for converting a `T: `[`CloneSink`] into [`ChanSender`], +/// which is `Box< dyn CloneSink<'static, BoxEnvelope, DynError> >`. +/// +/// Blanket implemented. +// +pub trait CloneSinkExt +{ + /// Convert a `T: CloneSink` into `ChanSender`, which is + /// `Box< dyn CloneSink<'static, BoxEnvelope, DynError> >`. + // + fn dyned( self ) -> ChanSender + + where Self: CloneSink<'static, BoxEnvelope, E> + Clone + Sized, + E: std::error::Error + Sync + Send + 'static + { + let closure = |e| -> DynError { Box::new(e) }; + + Box::new( self.sink_map_err( closure ) ) + } +} + +impl CloneSinkExt for T + + where T: Sink, Error=E> + Clone + Unpin + Send + 'static, + E: std::error::Error + Sync + Send + 'static +{} + + +/// Turn into a boxed error +// +pub fn dyn_err<'a, T, Item, E>( sink: T ) -> impl CloneSink<'a, Item, DynError> + + where T: 'a + Sink + Clone + Unpin + Send + ?Sized, + E: std::error::Error + Sync + Send + 'static +{ + sink.sink_map_err( |e| -> DynError { Box::new(e) } ) +} + // Import module. Avoid * imports here. These are all the foreign names that exist throughout // the crate. The must all be unique. diff --git a/src/mailbox.rs b/src/mailbox.rs index 4d9e92c..36e2367 100644 --- a/src/mailbox.rs +++ b/src/mailbox.rs @@ -39,7 +39,7 @@ impl Mailbox where A: Actor { /// Create a new inbox. // - pub fn new( name: Option<&str>, rx: ChanReceiver ) -> Self + pub fn new( name: impl AsRef, rx: ChanReceiver ) -> Self { static MB_COUNTER: AtomicUsize = AtomicUsize::new( 1 ); @@ -49,7 +49,7 @@ impl Mailbox where A: Actor let id = MB_COUNTER.fetch_add( 1, Ordering::Relaxed ); let rx = RxStrong::new(rx); - let info = Arc::new( ActorInfo::new::( id, name.map( |n| n.into() ) ) ); + let info = Arc::new( ActorInfo::new::( id, name.as_ref().into() ) ); Self { rx, info } } @@ -167,7 +167,7 @@ impl Identify for Mailbox - fn name( &self ) -> Option> + fn name( &self ) -> Arc { self.info.name.clone() } @@ -178,7 +178,11 @@ impl fmt::Debug for Mailbox { fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - write!( f, "Mailbox<{}> ~ {}", std::any::type_name::(), &self.info.id ) + match self.info.name.is_empty() + { + true => write!( f, "Mailbox<{}> ~ id: {}" , std::any::type_name::(), &self.info.id ) , + false => write!( f, "Mailbox<{}> ~ id: {}, name: {}", std::any::type_name::(), &self.info.id, self.info.name ) , + } } } @@ -187,10 +191,10 @@ impl fmt::Display for Mailbox { fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - match &self.info.name + match self.info.name.is_empty() { - Some(n) => write!( f, "{} ({}, {})", self.info.type_name(), self.info.id, n ) , - None => write!( f, "{} ({})" , self.info.type_name(), self.info.id ) , + true => write!( f, "{} ({})" , self.info.type_name(), self.info.id ) , + false => write!( f, "{} ({}, {})", self.info.type_name(), self.info.id, self.info.name ) , } } } diff --git a/src/rx_strong.rs b/src/rx_strong.rs index 09e7049..d592f03 100644 --- a/src/rx_strong.rs +++ b/src/rx_strong.rs @@ -3,7 +3,8 @@ use crate::{ import::*, BoxEnvelope, ChanReceiver, StrongCount }; /// This wraps a channel receiver in order to do an extra check when the channel returns pending. /// We want strong and weak addresses. When there are no strong addresses left, we shall return -/// `Poll::Ready(None)` instead of `Poll::Pending`. +/// `Poll::Ready(None)` instead of `Poll::Pending`. This causes the mailbox to stop as it thinks +/// the channel is closed. /// /// A waker is stored in case the strong count goes to zero while we are already pending. // @@ -41,13 +42,13 @@ impl Stream for RxStrong where A: Actor fn poll_next( mut self: Pin<&mut Self>, cx: &mut TaskContext<'_> ) -> Poll< Option > { - let size_hint = self.rx.size_hint(); + trace!( "RxStrong polled" ); match Pin::new( &mut self.rx ).poll_next( cx ) { Poll::Pending => { - trace!( "size hint is: {:?}", size_hint ); + trace!( "RxStrong: inner channel returned Pending" ); let count = self.count.lock().expect( "Mutex poisoned" ); @@ -68,7 +69,11 @@ impl Stream for RxStrong where A: Actor // pass through anything but Pending to the channel. // - x => x, + x => + { + trace!( "RxStrong: inner channel returned Poll::Ready(Some(_)): {}", matches!( x, Poll::Ready(Some(_)) ) ); + x + } } } diff --git a/src/weak_addr.rs b/src/weak_addr.rs index 0fae84b..e2c5a75 100644 --- a/src/weak_addr.rs +++ b/src/weak_addr.rs @@ -47,10 +47,10 @@ impl fmt::Debug for WeakAddr { fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - let name = match &self.name() + let name = match &self.name().is_empty() { - Some( s ) => format!( ", {}", s ) , - None => String::new() , + true => String::new(), + false => format!( ", {}", &self.name() ) }; write! @@ -70,11 +70,7 @@ impl fmt::Display for WeakAddr { fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result { - match &self.name() - { - Some(n) => write!( f, "{} ({}, {})", self.inner.type_name(), self.id(), n ) , - None => write!( f, "{} ({})" , self.inner.type_name(), self.id() ) , - } + write!( f, "{} ({}, {})", self.inner.type_name(), self.id(), &self.name() ) } } @@ -150,7 +146,7 @@ impl Identify for WeakAddr self.inner.id() } - fn name( &self ) -> Option< Arc > + fn name( &self ) -> Arc { self.inner.name() } diff --git a/tests/addr.rs b/tests/addr.rs index c0e9a66..e910a6a 100644 --- a/tests/addr.rs +++ b/tests/addr.rs @@ -21,7 +21,7 @@ async fn stop_when_addresses_dropped_before_start_mb() -> Result<(), DynError > { // let _ = flexi_logger::Logger::with_str( "trace" ).start(); - let (addr, mb) = Addr::builder().build(); + let (addr, mb) = Addr::builder( "dropped" ).build(); let addr2 = addr.clone(); @@ -44,7 +44,7 @@ async fn stop_when_addresses_dropped() -> Result<(), DynError > { // let _ = flexi_logger::Logger::with_str( "trace" ).start(); - let (mut addr, mb) = Addr::builder().build(); + let (mut addr, mb) = Addr::builder( "stop" ).build(); let mb_handle = AsyncStd.spawn_handle( mb.start( Sum(5) ) )?; diff --git a/tests/basic.rs b/tests/basic.rs index 27af46a..21a6211 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -20,9 +20,9 @@ use #[async_std::test] // -async fn test_basic_send() -> Result<(), DynError > +async fn basic_send() -> Result<(), DynError > { - let mut addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let mut addr = Addr::builder( "basic_send" ).spawn( Sum(5), &AsyncStd )?; addr.send( Add( 10 ) ).await?; @@ -35,9 +35,9 @@ async fn test_basic_send() -> Result<(), DynError > #[async_std::test] // -async fn test_basic_call() -> Result<(), DynError > +async fn basic_call() -> Result<(), DynError > { - let mut addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let mut addr = Addr::builder( "basic_call" ).spawn( Sum(5), &AsyncStd )?; addr.call( Add(10) ).await?; @@ -52,7 +52,7 @@ async fn test_basic_call() -> Result<(), DynError > // async fn send_from_multiple_addrs() -> Result<(), DynError > { - let mut addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let mut addr = Addr::builder( "send_from_multiple_addrs").spawn( Sum(5), &AsyncStd )?; let mut addr2 = addr.clone(); addr .send( Add( 10 ) ).await?; @@ -69,7 +69,7 @@ async fn send_from_multiple_addrs() -> Result<(), DynError > // async fn call_from_multiple_addrs() -> Result<(), DynError > { - let mut addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let mut addr = Addr::builder( "call_from_multiple_addrs" ).spawn( Sum(5), &AsyncStd )?; let mut addr2 = addr.clone(); addr .call( Add( 10 ) ).await?; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4ab25ad..7a439e0 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -7,13 +7,13 @@ pub mod import { pub use { - futures :: { future::{ FutureExt }, stream, SinkExt, StreamExt, task::{ Spawn, SpawnExt }, channel::* } , - thespis :: { * } , - thespis_impl :: { * } , - tracing :: { trace, error_span } , - tracing_futures::Instrument, - std :: { marker::PhantomData, error::Error, sync::{ Arc, Mutex, atomic::Ordering::SeqCst }, num::NonZeroUsize } , + futures :: { future::{ FutureExt }, stream, SinkExt, StreamExt, task::{ Spawn, SpawnExt }, channel::* } , + thespis :: { * } , + thespis_impl :: { * } , + tracing :: { trace, error_span } , + std :: { marker::PhantomData, error::Error, sync::{ Arc, Mutex, atomic::Ordering::SeqCst }, num::NonZeroUsize } , async_executors :: { * } , + tracing_futures::Instrument, }; } diff --git a/tests/deadlock.rs.bak b/tests/deadlock.rs.bak new file mode 100644 index 0000000..89342c5 --- /dev/null +++ b/tests/deadlock.rs.bak @@ -0,0 +1,250 @@ +//! TODO: this test is still broken. It's still deadlocking. Unfortunately adding +//! log statementes makes the deadlock go away. +//! It will need further debugging. + +//! Test for the gateway type deadlock. Verify that by using a priority channel +//! we can avoid it. +//! +//! When an actor sits on the intersection between incoming requests and outgoing responses, +//! like when it's managing a network connection, a deadlock can arise. +//! +//! If processing cannot keep up with incoming requests, the inbox of the gate will fill up. +//! This will provide back pressure. If the solution to this is to send some response out +//! to free up space, an issue arises as the gate mailbox wont take any outgoing message as +//! it's already full and the gate actor itself is not taking anything out of it's mailbox +//! as it's currently blocked on an incoming message. +//! +//! In the most simple scenario we can work around this by using a priority channel for the +//! gate mailbox. This way even if the incoming side is fully backed up, a worker actor can +//! still deliver the outgoing response to the gate. It will now take its next message out of +//! it's mailbox and the free slot will propagate back up to the gate. It will now get it's +//! next message and as the mailbox is a priority channel, the outgoing side will get handled +//! first. Even a buffer of one message on the outgoing side is enough to prevent a deadlock. +//! +//! The downside is that this only works in this most simple use case. As soon as you have +//! more than one source of new messages (either more connections sending requests, or new +//! messages being created internally), you cannot guarantee the free slot propagates to the +//! gate that received the response. Actually an actor can block itself by sending itself +//! a message while it's mailbox is full. +//! +//! My conclusion is that in a more complex system using channels for backpressure is not +//! an option. There are a few things you can do based on the architecture of your application: +//! +//! - use a semaphore to keep track of how many requests are currently being +//! processed by the system. +//! - spawn tasks so that they can operate concurrently, so if they block on a send, they are +//! not blocking a processing actor. +//! - use unbounded channels for outgoing messages, for example with a priority channel +//! as in this test. +//! +//! This test is an implementation of the priority channel solution. Note that this does +//! not work with futures channels as they don't guarantee to wake up a sender when +//! a message is read from a reader. +// +#![ cfg(not( target_arch = "wasm32" )) ] +#![allow(clippy::suspicious_else_formatting)] +use +{ + tracing :: { * } , + thespis :: { * } , + thespis_impl :: { * } , + async_executors :: { AsyncStd, SpawnHandleExt } , + std :: { error::Error, future::Future, pin::Pin } , + futures :: { stream::{ select_with_strategy, PollNext }, FutureExt } , + async_progress :: { Progress } , + tokio_util :: { sync::PollSender } , + tokio_stream :: { wrappers::ReceiverStream } , +}; + + + + +const BOUNDED: usize = 1; + +pub type DynResult = Result< T, Box >; + + +#[ derive( Actor ) ] struct Gate +{ + worker: WeakAddr , +} + + +#[ derive( Actor ) ] struct Worker +{ + gate : Addr , + steps : Progress , + send_out: Option + Send >>>, +} + + +struct Request (usize); +struct Response(usize); + +impl Message for Request { type Return = (); } +impl Message for Response { type Return = (); } + + + +impl Handler< Request > for Gate +{ + #[async_fn] fn handle( &mut self, req: Request ) + { + info!( "Gate: New request: {}.", req.0 ); + + if req.0 == 2 + { + let mut f = self.worker.send( Request(req.0) ); + + let f2 = futures::future::poll_fn( |cx: &mut std::task::Context| + { + let p = Pin::new( &mut f ).poll(cx); + debug!( "result of trying to send: {:?}", &p ); + p + }); + + f2.await.expect("send"); + } + + else + { + self.worker.send( Request(req.0) ).await.expect( "send" ); + } + + info!( "Gate: Successfully sent: {}.", req.0 ); + } +} + + + +impl Handler< Response > for Gate +{ + #[async_fn] fn handle( &mut self, resp: Response ) + { + info!( "Gate: Sending reponse to request: {}.", resp.0 ); + } +} + + + +impl Handler< Request > for Worker +{ + #[async_fn] fn handle( &mut self, work: Request ) + { + info!( "Worker: Grinding on request: {}.", work.0 ); + + if work.0 == 1 + { + self.steps.set_state( GateStep::BackedUp ).await; + } + + + info!( "Worker: Waiting for SendOut." ); + if let Some(f) = self.send_out.take() { f.await; } + info!( "Worker: Green light from SendOut." ); + + self.gate.send( Response(work.0) ).await.expect( "send" ); + info!( "Worker: Response sent." ); + + } +} + + +// Steps we need to take. +// +// - send BOUNDED *2 + 2 messages. This should fill the both mailboxes and +// both actors should have a message ready to go out. +// - verify that that the mailbox in for gate gives back pressure +// - verify that we can send out with the mailbox out of gate. +// - verify that we take the next message out of our mailbox in worker, this now +// allows gate to forward the message it was waiting with. +// - now gate can take the next message from it's mailbox which should prioritize the outgoing one. + +// Some steps in our flow. +// +#[ derive( Debug, Clone, PartialEq, Eq )] +// +enum GateStep +{ + Fill, + BackedUp, + SendOut, +} + + +#[ async_std::test ] +// +async fn deadlock() -> DynResult<()> +{ + // let _ = tracing_subscriber::fmt::Subscriber::builder() + + // .with_max_level(tracing::Level::TRACE) + // .with_env_filter( "trace" ) + // .json() + // .try_init() + // ; + + let steps = Progress::new( GateStep::Fill ); + let backed_up = steps.once( GateStep::BackedUp ); + let send_out = steps.once( GateStep::SendOut ).map(|_|()); + + let ( low_tx, low_rx) = tokio::sync::mpsc::channel( BOUNDED ); + let (high_tx, high_rx) = tokio::sync::mpsc::channel( BOUNDED ); + + let low_rx = ReceiverStream::new( low_rx); + let high_rx = ReceiverStream::new(high_rx); + + // tokio error contains the message which isn't guaranteed to be `Sync`. + // + let low_tx = PollSender::new( low_tx).sink_map_err( |_| std::io::Error::from(std::io::ErrorKind::NotConnected) ); + let high_tx = PollSender::new(high_tx).sink_map_err( |_| std::io::Error::from(std::io::ErrorKind::NotConnected) ); + + let strategy = |_: &mut ()| PollNext::Left; + let gate_rx = Box::new( select_with_strategy( high_rx, low_rx, strategy ) ); + + let gate_mb = Mailbox::new( "gate", gate_rx ); + let mut gate_low_addr = gate_mb.addr( low_tx.dyned() ); + let gate_high_addr = gate_mb.addr( high_tx.dyned() ); + + let (worker_addr, worker_mb) = Addr::builder( "worker" ).bounded( Some(BOUNDED) ).build(); + + let gate = Gate { worker: worker_addr.weak() }; + let worker = Worker { gate : gate_high_addr, steps: steps.clone(), send_out: send_out.boxed().into() }; + + let gate_handle = AsyncStd.spawn_handle( gate_mb.start( gate ) )?; + let worker_handle = AsyncStd.spawn_handle( worker_mb.start( worker ) )?; + + // Fill both queues and one message for each actor. + // + let fill = BOUNDED*2 + 2; + + for i in 1..=fill + { + info!( "main: prepping request: {i}." ); + gate_low_addr.send( Request(i) ).await?; + info!( "main: sent request: {i}." ); + } + + backed_up.await; + steps.set_state( GateStep::SendOut ).await; + + info!( "main: sent SendOut." ); + + for i in (fill+1)..10 + { + gate_low_addr.send( Request(i) ).await?; + } + + // This synchronizes the last one so we don't drop the addresses to early. + // + gate_low_addr.call( Request(10) ).await?; + + + drop(gate_low_addr); + drop(worker_addr); + + worker_handle.await; + gate_handle.await; + + Ok(()) +} diff --git a/tests/errors.rs b/tests/errors.rs index bffffd6..82f9e76 100644 --- a/tests/errors.rs +++ b/tests/errors.rs @@ -21,9 +21,9 @@ use // #[async_std::test] // -async fn test_mb_closed() -> Result<(), DynError > +async fn mb_closed() -> Result<(), DynError > { - let (mut addr, mb) = Addr::builder().build(); + let (mut addr, mb) = Addr::builder( "mb_closed" ).build(); let (trigger_tx, trigger_rx) = oneshot::channel(); @@ -84,11 +84,11 @@ impl Handler for Panic #[async_std::test] // -async fn test_mb_closed_before_response() -> Result<(), DynError > +async fn mb_closed_before_response() -> Result<(), DynError > { // flexi_logger::Logger::with_str( "warn, thespis_impl=trace" ).start().expect( "flexi_logger"); - let (mut addr, mb) = Addr::builder().build(); + let (mut addr, mb) = Addr::builder( "mb_closed_before_response" ).build(); let mb_task = async move diff --git a/tests/local_spawn.rs b/tests/local_spawn.rs index bbd4509..fd60112 100644 --- a/tests/local_spawn.rs +++ b/tests/local_spawn.rs @@ -20,7 +20,7 @@ use #[test] // -fn test_not_send_actor() -> Result<(), DynError > +fn not_send_actor() -> Result<(), DynError > { let mut pool = LocalPool::new(); let exec = pool.spawner(); @@ -32,7 +32,7 @@ fn test_not_send_actor() -> Result<(), DynError > // If we inline this in the next statement, it actually compiles with rt::spawn( program ) instead // of spawn_local. // - let mut addr = Addr::builder().spawn_local( SumNoSend::new(5), &exec2 ).expect( "start mailbox" ); + let mut addr = Addr::builder( "not_send_actor" ).spawn_local( SumNoSend::new(5), &exec2 ).expect( "start mailbox" ); addr.send( Add( 10 ) ).await.expect( "Send" ); @@ -53,7 +53,7 @@ fn test_not_send_actor() -> Result<(), DynError > #[test] // -fn test_send_actor() -> Result<(), DynError > +fn send_actor() -> Result<(), DynError > { let mut pool = LocalPool::new(); let exec = pool.spawner(); @@ -64,7 +64,7 @@ fn test_send_actor() -> Result<(), DynError > // If we inline this in the next statement, it actually compiles with rt::spawn( program ) instead // of spawn_local. // - let mut addr = Addr::builder().spawn_local( Sum(5), &exec2 ).expect( "spawn actor mailbox" ); + let mut addr = Addr::builder( "send_actor" ).spawn_local( Sum(5), &exec2 ).expect( "spawn actor mailbox" ); addr.send( Add( 10 ) ).await.expect( "Send failed" ); @@ -94,8 +94,8 @@ fn test_manually_not_send_actor() -> Result<(), DynError > let actor = SumNoSend::new(5); let (tx, rx) = mpsc::unbounded() ; - let mb = Mailbox::new( Some("SumNoSend"), Box::new(rx) ) ; - let tx = Box::new(tx.sink_map_err( |e| Box::new(e) as DynError )) ; + let mb = Mailbox::new( "SumNoSend", Box::new(rx) ) ; + let tx = Box::new(tx.sink_map_err( |e| Box::new(e) as DynError )) ; let mut addr = mb.addr( tx ) ; exec2.spawn_local( async { mb.start_local( actor ).await; } ).expect( "spawn actor mailbox" ); @@ -131,8 +131,8 @@ fn test_manually_send_actor() -> Result<(), DynError > // let actor = Sum(5) ; let (tx, rx) = mpsc::unbounded() ; - let mb = Mailbox::new( Some("Sum"), Box::new(rx) ) ; - let tx = Box::new(tx.sink_map_err( |e| Box::new(e) as DynError )) ; + let mb = Mailbox::new( "Sum", Box::new(rx) ) ; + let tx = Box::new(tx.sink_map_err( |e| Box::new(e) as DynError )) ; let mut addr = mb.addr( tx ) ; exec2.spawn_local( async { mb.start_local( actor ).await; } ).expect( "spawn actor mailbox" ); diff --git a/tests/multi_thread.rs b/tests/multi_thread.rs index b379ad9..683929d 100644 --- a/tests/multi_thread.rs +++ b/tests/multi_thread.rs @@ -21,8 +21,8 @@ use async fn move_addr_send() -> Result { - let mut addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; - let mut addr2 = addr.clone(); + let mut addr = Addr::builder( "move_addr_send" ).spawn( Sum(5), &AsyncStd )?; + let mut addr2 = addr.clone(); thread::spawn( move || { @@ -42,8 +42,8 @@ async fn move_addr_send() -> Result async fn move_addr() -> Result { - let mut addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; - let mut addr2 = addr.clone(); + let mut addr = Addr::builder( "move_addr" ).spawn( Sum(5), &AsyncStd )?; + let mut addr2 = addr.clone(); let (tx, rx) = oneshot::channel::<()>(); @@ -72,7 +72,7 @@ async fn move_addr() -> Result async fn move_call() -> Result { - let mut addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let mut addr = Addr::builder( "move_call" ).spawn( Sum(5), &AsyncStd )?; let mut addr2 = addr.clone(); let (tx, rx) = oneshot::channel::<()>(); let call = async move { addr2.call( Add( 10 ) ).await.expect( "call addr2" ) }; diff --git a/tests/supervisor.rs b/tests/supervisor.rs index 166c982..968f488 100644 --- a/tests/supervisor.rs +++ b/tests/supervisor.rs @@ -61,7 +61,7 @@ impl Handler< Supervise > for Supervisor let mut mb_handle = if actor.inbox.is_none() { - let (addr_new, mb_handle) = Addr::builder().spawn_handle( (actor.create)(), &AsyncStd ).unwrap(); + let (addr_new, mb_handle) = Addr::builder( "supervised" ).spawn_handle( (actor.create)(), &AsyncStd ).unwrap(); addr = Some(addr_new); @@ -93,7 +93,7 @@ impl Handler< Supervise > for Supervisor // async fn supervise() -> Result< (), DynError > { - let (mut addr, mut mb_handle) = Addr::builder().spawn_handle( Counter, &AsyncStd )?; + let (mut addr, mut mb_handle) = Addr::builder( "supervised").spawn_handle( Counter, &AsyncStd )?; let supervisor = async move @@ -123,7 +123,7 @@ async fn supervise() -> Result< (), DynError > // async fn supervisor() -> Result< (), DynError > { - let mut supervisor = Addr::builder().spawn( Supervisor{ exec: Box::new( AsyncStd ) }, &AsyncStd )?; + let mut supervisor = Addr::builder( "supervisor" ).spawn( Supervisor{ exec: Box::new( AsyncStd ) }, &AsyncStd )?; let supervise = Supervise { diff --git a/tests/wasm.rs b/tests/wasm.rs index dd9a702..080fa44 100644 --- a/tests/wasm.rs +++ b/tests/wasm.rs @@ -28,7 +28,7 @@ async fn stop_when_addresses_dropped_before_start_mb() { // let _ = flexi_logger::Logger::with_str( "trace" ).start(); - let (addr, mb) = Addr::builder().build(); + let (addr, mb) = Addr::builder( "stop_when_addresses_dropped_before_start_mb" ).build(); let addr2 = addr.clone(); @@ -46,9 +46,9 @@ async fn stop_when_addresses_dropped_before_start_mb() #[ wasm_bindgen_test ] // -async fn test_basic_send() +async fn basic_send() { - let mut addr = Addr::builder().spawn( Sum(5), &Bindgen ).unwrap_throw(); + let mut addr = Addr::builder( "basic_send" ).spawn( Sum(5), &Bindgen ).unwrap_throw(); addr.send( Add( 10 ) ).await.unwrap_throw(); @@ -59,9 +59,9 @@ async fn test_basic_send() #[ wasm_bindgen_test ] // -async fn test_basic_call() +async fn basic_call() { - let mut addr = Addr::builder().spawn( Sum(5), &Bindgen ).unwrap_throw(); + let mut addr = Addr::builder( "basic_call" ).spawn( Sum(5), &Bindgen ).unwrap_throw(); addr.call( Add(10) ).await.unwrap_throw(); @@ -74,7 +74,7 @@ async fn test_basic_call() // async fn send_from_multiple_addrs() { - let mut addr = Addr::builder().spawn( Sum(5), &Bindgen ).unwrap_throw(); + let mut addr = Addr::builder( "send_from_multiple_addrs" ).spawn( Sum(5), &Bindgen ).unwrap_throw(); let mut addr2 = addr.clone(); addr .send( Add( 10 ) ).await.unwrap_throw(); @@ -89,7 +89,7 @@ async fn send_from_multiple_addrs() // async fn call_from_multiple_addrs() { - let mut addr = Addr::builder().spawn( Sum(5), &Bindgen ).unwrap_throw(); + let mut addr = Addr::builder( "call_from_multiple_addrs" ).spawn( Sum(5), &Bindgen ).unwrap_throw(); let mut addr2 = addr.clone(); addr .call( Add( 10 ) ).await.unwrap_throw(); diff --git a/tests/weak_addr.rs b/tests/weak_addr.rs index c08b4a8..d85a602 100644 --- a/tests/weak_addr.rs +++ b/tests/weak_addr.rs @@ -24,7 +24,7 @@ use // async fn weak_basic_use() -> Result<(), DynError > { - let addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let addr = Addr::builder( "weak_basic_use" ).spawn( Sum(5), &AsyncStd )?; let mut weak = addr.weak(); weak.send( Add( 10 ) ).await?; @@ -39,7 +39,7 @@ async fn weak_basic_use() -> Result<(), DynError > // async fn weak_plenty() -> Result<(), DynError > { - let addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let addr = Addr::builder( "weak_plenty" ).spawn( Sum(5), &AsyncStd )?; let mut weak = addr.weak(); let weak2 = addr.weak(); let _weak3 = addr.weak(); @@ -69,7 +69,7 @@ async fn weak_plenty() -> Result<(), DynError > // async fn weak_refuse() -> Result<(), DynError > { - let addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let addr = Addr::builder( "weak_refuse" ).spawn( Sum(5), &AsyncStd )?; let mut weak = addr.weak(); let addr2 = weak.strong()?; @@ -91,7 +91,7 @@ async fn weak_refuse() -> Result<(), DynError > // async fn weak_upgrade() -> Result<(), DynError > { - let addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let addr = Addr::builder( "weak_upgrade" ).spawn( Sum(5), &AsyncStd )?; let weak = addr.weak(); let mut upgrade = weak.strong()?; @@ -107,7 +107,7 @@ async fn weak_upgrade() -> Result<(), DynError > // async fn weak_upgrade_refuse() -> Result<(), DynError > { - let addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let addr = Addr::builder( "weak_upgrade_refuse" ).spawn( Sum(5), &AsyncStd )?; let weak = addr.weak(); drop(addr); @@ -128,7 +128,7 @@ async fn weak_upgrade_refuse() -> Result<(), DynError > // async fn strong_drop_close_mailbox() -> Result<(), DynError > { - let (addr, mb_handle) = Addr::builder().spawn_handle( Sum(5), &AsyncStd )?; + let (addr, mb_handle) = Addr::builder( "strong_drop_close_mailbox" ).spawn_handle( Sum(5), &AsyncStd )?; let _weak = addr.weak(); drop(addr); @@ -146,7 +146,7 @@ async fn strong_drop_close_mailbox() -> Result<(), DynError > // async fn weak_drop_dont_close_mailbox() -> Result<(), DynError > { - let mut addr = Addr::builder().spawn( Sum(5), &AsyncStd )?; + let mut addr = Addr::builder( "weak_drop_dont_close_mailbox" ).spawn( Sum(5), &AsyncStd )?; let weak = addr.weak(); drop(weak); @@ -184,7 +184,7 @@ async fn drop_strong_while_mb_pending() -> Result<(), DynError > let shared = Arc::new(Mutex::new( () )); let shared2 = shared.clone(); - let (addr, mb) = Addr::builder().bounded( Some(1) ).build(); + let (addr, mb) = Addr::builder( "drop_strong_while_mb_pending" ).bounded( Some(1) ).build(); let mut weak = addr.weak();