Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/prompts/ai-review.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ Actor framework-specific considerations:
- Timer and interval handling

Be concise and specific. Provide line references when suggesting changes.
If the code looks good, acknowledge it briefly.
If the code looks good, acknowledge it briefly.
8 changes: 8 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ jobs:
- name: Run Tests
run: cargo test --verbose

fmt:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Run cargo fmt
run: cargo fmt --all -- --check

clippy:
runs-on: ubuntu-latest
needs: build
Expand Down
5 changes: 4 additions & 1 deletion concurrency/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ pub enum RegistryError {
///
/// Returns `Err(AlreadyRegistered)` if the name is already taken.
/// Use [`unregister`] first if you need to replace an existing entry.
pub fn register<T: Clone + Send + Sync + 'static>(name: &str, value: T) -> Result<(), RegistryError> {
pub fn register<T: Clone + Send + Sync + 'static>(
name: &str,
value: T,
) -> Result<(), RegistryError> {
use std::collections::hash_map::Entry;
let mut store = global_store().write().unwrap_or_else(|p| p.into_inner());
match store.entry(name.to_string()) {
Expand Down
10 changes: 4 additions & 6 deletions concurrency/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,10 @@ impl<T: Send + 'static> Future for Response<T> {
}
std::task::Poll::Pending => std::task::Poll::Pending,
},
ResponseState::Ready(_) => {
match std::mem::replace(&mut this.0, ResponseState::Done) {
ResponseState::Ready(result) => std::task::Poll::Ready(result),
_ => unreachable!(),
}
}
ResponseState::Ready(_) => match std::mem::replace(&mut this.0, ResponseState::Done) {
ResponseState::Ready(result) => std::task::Poll::Ready(result),
_ => unreachable!(),
},
ResponseState::Done => panic!("Response polled after completion"),
}
}
Expand Down
90 changes: 50 additions & 40 deletions concurrency/src/tasks/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use spawned_rt::{
tasks::{self as rt, mpsc, oneshot, timeout, watch, CancellationToken, JoinHandle},
threads,
};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, pin::Pin, sync::Arc, time::Duration};
use std::{
fmt::Debug, future::Future, panic::AssertUnwindSafe, pin::Pin, sync::Arc, time::Duration,
};

pub use crate::response::DEFAULT_REQUEST_TIMEOUT;

Expand Down Expand Up @@ -56,11 +58,7 @@ pub trait Actor: Send + Sized + 'static {
/// Uses RPITIT (return-position `impl Trait` in traits), which means this trait
/// is **not** object-safe. For type-erased references, use [`Receiver<M>`] / [`Recipient<M>`].
pub trait Handler<M: Message>: Actor {
fn handle(
&mut self,
msg: M,
ctx: &Context<Self>,
) -> impl Future<Output = M::Result> + Send;
fn handle(&mut self, msg: M, ctx: &Context<Self>) -> impl Future<Output = M::Result> + Send;
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -165,10 +163,7 @@ impl<A: Actor> Context<A> {
M: Message,
{
let (tx, rx) = oneshot::channel();
let envelope = MessageEnvelope {
msg,
tx: Some(tx),
};
let envelope = MessageEnvelope { msg, tx: Some(tx) };
self.sender
.send(Box::new(envelope))
.map_err(|_| ActorError::ActorStopped)?;
Expand All @@ -181,7 +176,8 @@ impl<A: Actor> Context<A> {
A: Handler<M>,
M: Message,
{
self.request_with_timeout(msg, DEFAULT_REQUEST_TIMEOUT).await
self.request_with_timeout(msg, DEFAULT_REQUEST_TIMEOUT)
.await
}

/// Send a request and wait for the reply with a custom timeout.
Expand Down Expand Up @@ -324,10 +320,7 @@ impl<A: Actor> ActorRef<A> {
M: Message,
{
let (tx, rx) = oneshot::channel();
let envelope = MessageEnvelope {
msg,
tx: Some(tx),
};
let envelope = MessageEnvelope { msg, tx: Some(tx) };
self.sender
.send(Box::new(envelope))
.map_err(|_| ActorError::ActorStopped)?;
Expand All @@ -340,7 +333,8 @@ impl<A: Actor> ActorRef<A> {
A: Handler<M>,
M: Message,
{
self.request_with_timeout(msg, DEFAULT_REQUEST_TIMEOUT).await
self.request_with_timeout(msg, DEFAULT_REQUEST_TIMEOUT)
.await
}

/// Send a request and wait for the reply with a custom timeout.
Expand Down Expand Up @@ -435,14 +429,10 @@ impl<A: Actor> ActorRef<A> {
let _handle = rt::spawn(inner_future);
}
Backend::Blocking => {
let _handle = rt::spawn_blocking(move || {
rt::block_on(inner_future)
});
let _handle = rt::spawn_blocking(move || rt::block_on(inner_future));
}
Backend::Thread => {
let _handle = threads::spawn(move || {
threads::block_on(inner_future)
});
let _handle = threads::spawn(move || threads::block_on(inner_future));
}
}

Expand All @@ -456,9 +446,7 @@ async fn run_actor<A: Actor>(
mut rx: mpsc::Receiver<Box<dyn Envelope<A> + Send>>,
cancellation_token: CancellationToken,
) {
let start_result = AssertUnwindSafe(actor.started(&ctx))
.catch_unwind()
.await;
let start_result = AssertUnwindSafe(actor.started(&ctx)).catch_unwind().await;
if let Err(panic) = start_result {
tracing::error!("Panic in started() callback: {panic:?}");
cancellation_token.cancel();
Expand Down Expand Up @@ -497,9 +485,7 @@ async fn run_actor<A: Actor>(
}

cancellation_token.cancel();
let stop_result = AssertUnwindSafe(actor.stopped(&ctx))
.catch_unwind()
.await;
let stop_result = AssertUnwindSafe(actor.stopped(&ctx)).catch_unwind().await;
if let Err(panic) = stop_result {
tracing::error!("Panic in stopped() callback: {panic:?}");
}
Expand Down Expand Up @@ -620,13 +606,19 @@ mod tests {
}

struct GetCount;
impl Message for GetCount { type Result = u64; }
impl Message for GetCount {
type Result = u64;
}

struct Increment;
impl Message for Increment { type Result = u64; }
impl Message for Increment {
type Result = u64;
}

struct StopCounter;
impl Message for StopCounter { type Result = u64; }
impl Message for StopCounter {
type Result = u64;
}

impl Actor for Counter {}

Expand Down Expand Up @@ -784,7 +776,9 @@ mod tests {
runtime.block_on(async move {
struct SlowActor;
struct SlowOp;
impl Message for SlowOp { type Result = (); }
impl Message for SlowOp {
type Result = ();
}
impl Actor for SlowActor {}
impl Handler<SlowOp> for SlowActor {
async fn handle(&mut self, _msg: SlowOp, _ctx: &Context<Self>) {
Expand Down Expand Up @@ -812,7 +806,9 @@ mod tests {
assert_eq!(result, 42);

// Also test request helper
let result = request(&*recipient, GetCount, Duration::from_secs(5)).await.unwrap();
let result = request(&*recipient, GetCount, Duration::from_secs(5))
.await
.unwrap();
assert_eq!(result, 42);
});
}
Expand All @@ -822,7 +818,9 @@ mod tests {
struct SlowShutdownActor;

struct StopSlow;
impl Message for StopSlow { type Result = (); }
impl Message for StopSlow {
type Result = ();
}

impl Actor for SlowShutdownActor {
async fn stopped(&mut self, _ctx: &Context<Self>) {
Expand Down Expand Up @@ -903,7 +901,9 @@ mod tests {
struct BadlyBehavedTask;

struct DoBlock;
impl Message for DoBlock { type Result = (); }
impl Message for DoBlock {
type Result = ();
}

impl Actor for BadlyBehavedTask {}

Expand All @@ -916,7 +916,9 @@ mod tests {
}

struct IncrementWell;
impl Message for IncrementWell { type Result = (); }
impl Message for IncrementWell {
type Result = ();
}

struct WellBehavedTask {
pub count: u64,
Expand Down Expand Up @@ -998,7 +1000,9 @@ mod tests {
runtime.block_on(async move {
struct PanicOnStart;
struct Ping;
impl Message for Ping { type Result = (); }
impl Message for Ping {
type Result = ();
}
impl Actor for PanicOnStart {
async fn started(&mut self, _ctx: &Context<Self>) {
panic!("boom in started");
Expand All @@ -1021,9 +1025,13 @@ mod tests {
runtime.block_on(async move {
struct PanicOnMsg;
struct Explode;
impl Message for Explode { type Result = (); }
impl Message for Explode {
type Result = ();
}
struct Check;
impl Message for Check { type Result = u32; }
impl Message for Check {
type Result = u32;
}
impl Actor for PanicOnMsg {}
impl Handler<Explode> for PanicOnMsg {
async fn handle(&mut self, _msg: Explode, _ctx: &Context<Self>) {
Expand All @@ -1050,7 +1058,9 @@ mod tests {
runtime.block_on(async move {
struct PanicOnStop;
struct StopMe;
impl Message for StopMe { type Result = (); }
impl Message for StopMe {
type Result = ();
}
impl Actor for PanicOnStop {
async fn stopped(&mut self, _ctx: &Context<Self>) {
panic!("boom in stopped");
Expand Down
23 changes: 11 additions & 12 deletions concurrency/src/tasks/stream_tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::tasks::{
send_after, Actor, ActorStart, Context, Handler,
stream::spawn_listener,
};
use crate::message::Message;
use crate::tasks::{send_after, stream::spawn_listener, Actor, ActorStart, Context, Handler};
use futures::{stream, StreamExt};
use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream};
use std::time::Duration;
Expand All @@ -14,15 +11,21 @@ enum StreamMsg {
Add(u16),
Error,
}
impl Message for StreamMsg { type Result = (); }
impl Message for StreamMsg {
type Result = ();
}

#[derive(Debug)]
struct StopSum;
impl Message for StopSum { type Result = (); }
impl Message for StopSum {
type Result = ();
}

#[derive(Debug)]
struct GetValue;
impl Message for GetValue { type Result = u16; }
impl Message for GetValue {
type Result = u16;
}

// --- Summatory Actor ---

Expand Down Expand Up @@ -158,11 +161,7 @@ pub fn test_stream_cancellation() {
.filter_map(|result| async move { result.ok().map(StreamMsg::Add) }),
);

let _ = send_after(
Duration::from_millis(STOP_TIME),
ctx,
StopSum,
);
let _ = send_after(Duration::from_millis(STOP_TIME), ctx, StopSum);

rt::sleep(Duration::from_millis(READ_TIME)).await;
let val = summatory.request(GetValue).await.unwrap();
Expand Down
Loading
Loading