-
Notifications
You must be signed in to change notification settings - Fork 366
/
Copy pathactor.rs
244 lines (218 loc) · 8.1 KB
/
actor.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::any::type_name;
use std::fmt;
use std::sync::Arc;
use async_trait::async_trait;
use thiserror::Error;
use tracing::error;
use crate::{ActorContext, QueueCapacity, SendError};
/// The actor exit status represents the outcome of the execution of an actor,
/// after the end of the execution.
///
/// It is, in many ways, similar to the exit status code of a program.
///
/// The `#[error(...)]` attribute on each variant supplies the text used by its
/// `Display` implementation (derived through `thiserror`).
#[derive(Clone, Debug, Error)]
pub enum ActorExitStatus {
/// The actor successfully exited.
///
/// It happens either because:
/// - all of the existing mailboxes were dropped and the actor message queue was exhausted. No
///   new message could ever arrive to the actor. (This exit is triggered by the framework.) or
/// - one of the actor's message handlers (e.g. `Handler::handle`) returned
///   `Err(ActorExitStatus::Success)`. (This exit is triggered by the actor implementer.)
///
/// (This is equivalent to exit status code 0.)
/// Note that this is not really an error.
#[error("success")]
Success,
/// The actor was asked to gracefully shut down.
///
/// (Semantically equivalent to exit status code 130, triggered by SIGINT aka Ctrl-C, or
/// SIGQUIT)
#[error("quit")]
Quit,
/// The actor tried to send a message to a downstream actor and failed.
/// The logic ruled that the actor should be killed.
///
/// (Semantically equivalent to exit status code 141, triggered by SIGPIPE)
#[error("downstream actor exited")]
DownstreamClosed,
/// The actor was killed.
///
/// It can happen because:
/// - it received `Command::Kill`.
/// - its kill switch was activated.
///
/// (Semantically equivalent to exit status code 137, triggered by SIGKILL)
#[error("killed")]
Killed,
/// An unexpected error happened while processing a message.
///
/// The underlying error is stored behind an `Arc`, so cloning this status only
/// bumps a reference count.
#[error("failure(cause={0:?})")]
Failure(Arc<anyhow::Error>),
/// The thread or the task executing the actor loop panicked.
#[error("panicked")]
Panicked,
}
impl From<anyhow::Error> for ActorExitStatus {
fn from(err: anyhow::Error) -> Self {
ActorExitStatus::Failure(Arc::new(err))
}
}
impl ActorExitStatus {
pub fn is_success(&self) -> bool {
matches!(self, ActorExitStatus::Success)
}
}
impl From<SendError> for ActorExitStatus {
fn from(_: SendError) -> Self {
ActorExitStatus::DownstreamClosed
}
}
/// An actor owns some internal state and processes a stream of messages.
/// Every actor has a mailbox in which messages wait until they are processed.
///
/// While processing a message, an actor typically
/// - updates its state;
/// - emits one or more messages to other actors.
#[async_trait]
pub trait Actor: Send + Sized + 'static {
    /// Piece of state that can be copied out of the actor, e.g. for assertions in
    /// unit tests or for admin purposes.
    type ObservableState: Clone + fmt::Debug + serde::Serialize + Send + Sync;

    /// A name identifying the type of actor.
    ///
    /// Ideally, it respects the `CamelCase` convention.
    ///
    /// The name does not have to be unique per instance: it can simply be the name
    /// of the actor implementation. The default implementation returns the Rust
    /// type name of the implementor.
    fn name(&self) -> String {
        type_name::<Self>().to_owned()
    }

    /// Returns the handle of the tokio runtime associated with this actor.
    ///
    /// Overriding this method makes it possible to decide the execution
    /// environment of the actor. An actor whose handlers may block for more than
    /// 50 microseconds should be given a dedicated execution environment.
    ///
    /// NOTE(review): this advice was originally phrased in terms of
    /// `ActorRunner::DedicatedThread`, a name that is not visible from this file;
    /// confirm what the currently recommended runtime is.
    ///
    /// # Panics
    ///
    /// The default implementation calls `tokio::runtime::Handle::current()`, which
    /// panics when invoked outside of the context of a tokio runtime.
    fn runtime_handle(&self) -> tokio::runtime::Handle {
        tokio::runtime::Handle::current()
    }

    /// Whether the actor yields after every single message. Defaults to `true`.
    ///
    /// Actors that call `.await` regularly may obtain better performance by
    /// returning `false`.
    fn yield_after_each_message(&self) -> bool {
        true
    }

    /// Capacity of the actor's incoming mailbox queue. It is set when the actor is
    /// spawned.
    ///
    /// Defaults to `QueueCapacity::Unbounded`.
    fn queue_capacity(&self) -> QueueCapacity {
        QueueCapacity::Unbounded
    }

    /// Extracts an observable state. Useful for unit tests and admin UIs.
    ///
    /// Implementations should return quickly.
    fn observable_state(&self) -> Self::ObservableState;

    /// Called before the actor starts running.
    ///
    /// It is handy, for instance, to schedule the initial message of a looping
    /// actor, and can be thought of as an implicit "initial" message.
    ///
    /// Consequently, returning an `ActorExitStatus` has the same effect as
    /// returning it from a message handler (e.g. the actor stops, the `finalize`
    /// method is called, the kill switch may be activated, etc.).
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn initialize(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
        Ok(())
    }

    /// Called once a series of one or several messages has been processed and no
    /// more message is available.
    ///
    /// It is a great place to have the actor "sleep".
    ///
    /// Quickwit's Indexer actor, for instance, uses `on_drained_messages` to
    /// schedule indexing in such a way that an indexer drains all of its
    /// available messages and then sleeps for some amount of time.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn on_drained_messages(
        &mut self,
        _ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        Ok(())
    }

    /// Hook defining what should happen upon actor exit.
    /// It is called only once.
    ///
    /// The hook is always called, regardless of the reason why the actor exited.
    /// The exit status is passed as an argument so that implementations can act
    /// conditionally upon it.
    /// For instance, it is often better to do as little work as possible on a
    /// killed actor: check `exit_status` and return early if it is
    /// `ActorExitStatus::Killed`.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn finalize(
        &mut self,
        _exit_status: &ActorExitStatus,
        _ctx: &ActorContext<Self>,
    ) -> anyhow::Result<()> {
        Ok(())
    }
}
/// Message handler that allows an actor to defer its reply.
///
/// Instead of returning the reply, `handle_message` is handed a `reply` callback
/// to which the reply is passed. The callback is `FnOnce`, so it can be invoked at
/// most once.
#[async_trait::async_trait]
pub trait DeferableReplyHandler<M>: Actor {
/// Type of the value passed to the `reply` callback.
type Reply: Send + 'static;
/// Processes `message`, delivering the reply through the `reply` callback.
///
/// The callback is `Send + Sync + 'static`: it borrows nothing from the caller,
/// which is what makes it possible to hold on to it and call it later.
///
/// NOTE(review): presumably returning an `Err` makes the actor exit with that
/// status, as documented on `Handler::handle` — confirm against the actor loop.
async fn handle_message(
&mut self,
message: M,
reply: impl FnOnce(Self::Reply) + Send + Sync + 'static,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus>
where
M: Send + 'static;
}
/// Message handler that requires the actor to provide an immediate response:
/// the reply is the value returned by `handle`.
#[async_trait::async_trait]
pub trait Handler<M>: Actor {
/// Type of the reply produced for a message of type `M`.
type Reply: Send + 'static;
/// Processes a message and returns its reply.
///
/// If an exit status is returned as an error, the actor will exit.
/// It will stop processing further messages, the `finalize` method will be called,
/// and its exit status will be the one defined in the error.
async fn handle(
&mut self,
message: M,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus>;
}
/// Blanket implementation: every `Handler<M>` is also a `DeferableReplyHandler<M>`.
///
/// The reply is not actually deferred here: the message is processed with
/// `Handler::handle` and, on success, the resulting reply is handed to the `reply`
/// callback right away.
#[async_trait::async_trait]
impl<H, M> DeferableReplyHandler<M> for H
where H: Handler<M>
{
    type Reply = H::Reply;

    /// Runs `Handler::handle` and forwards its reply to `reply`.
    ///
    /// If `handle` returns an error, `reply` is never invoked and the
    /// `ActorExitStatus` is propagated unchanged.
    async fn handle_message(
        &mut self,
        message: M,
        // Bounds kept identical to the `DeferableReplyHandler::handle_message`
        // declaration (`Send + Sync + 'static`).
        reply: impl FnOnce(Self::Reply) + Send + Sync + 'static,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus>
    where
        M: Send + 'static,
    {
        self.handle(message, ctx).await.map(reply)
    }
}