Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,7 @@
}

fn dispatch_lifecycle_error(&self) -> Option<anyhow::Error> {
// TODO: Share admission policy with RegistryDispatcher::active_actor.
if self.ctx.destroy_requested() {
self.ctx.warn_work_sent_to_stopping_instance("dispatch");
return Some(ActorLifecycleError::Destroying.build());
Expand Down Expand Up @@ -2156,7 +2157,7 @@
self.lifecycle = lifecycle;
if matches!(lifecycle, LifecycleState::Started) {
// A restarted actor is a new generation. Clear shutdown state that was
// only meant to stop the previous generation.

Check warning on line 2160 in rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
self.ctx.reset_abort_signal_for_start();
self.ctx.clear_sleep_requested();
}
Expand Down
13 changes: 13 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/registry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,19 @@ pub(super) fn normalize_actor_request_path(path: &str) -> String {
}
}

#[cfg(test)]
pub(super) fn is_actor_request_path(path: &str) -> bool {
let Some(stripped) = path.strip_prefix("/request") else {
return false;
};

stripped.is_empty()
|| stripped
.as_bytes()
.first()
.is_some_and(|byte| matches!(byte, b'/' | b'?'))
}

pub(super) fn build_envoy_response(response: Response) -> Result<HttpResponse> {
let (status, headers, body) = response.to_parts();

Expand Down
1 change: 1 addition & 0 deletions rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@
Ok(instance) => {
let pending_stop = self
.pending_stops
.remove_async(&request.actor_id.clone())

Check warning on line 678 in rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
.await
.map(|(_, pending_stop)| pending_stop);
if let Some(pending_stop) = pending_stop {
Expand Down Expand Up @@ -754,7 +754,7 @@
SccEntry::Vacant(entry) => {
entry.insert_entry(state);
}
}

Check warning on line 757 in rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
}

async fn transition_actor_to_stopping(
Expand Down Expand Up @@ -784,7 +784,7 @@
}
}

async fn remove_stopping_actor_instance(&self, actor_id: &str, expected: &ActiveActorInstance) {

Check warning on line 787 in rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
match self.actor_instances.entry_async(actor_id.to_owned()).await {
SccEntry::Occupied(entry) => {
let should_remove = match entry.get() {
Expand All @@ -803,11 +803,12 @@
}
}

async fn active_actor(&self, actor_id: &str) -> Result<Arc<ActorTaskHandle>> {

Check warning on line 806 in rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
if let Some(instance) = self.actor_instances.get_async(&actor_id.to_owned()).await {
match instance.get() {
ActorInstanceState::Active(instance) => {
let instance = instance.clone();
// TODO: Share admission policy with ActorTask::dispatch_lifecycle_error.
if instance.ctx.started() {
if instance.ctx.destroy_requested() {
instance
Expand Down
Loading