diff --git a/engine/packages/api-peer/src/runner_configs.rs b/engine/packages/api-peer/src/runner_configs.rs index e10fcd92b7..8324984794 100644 --- a/engine/packages/api-peer/src/runner_configs.rs +++ b/engine/packages/api-peer/src/runner_configs.rs @@ -124,14 +124,14 @@ pub async fn upsert( }) } -#[derive(Debug, Serialize, Deserialize, IntoParams)] +#[derive(Debug, Serialize, Clone, Deserialize, IntoParams)] #[serde(deny_unknown_fields)] #[into_params(parameter_in = Query)] pub struct DeleteQuery { pub namespace: String, } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] #[serde(deny_unknown_fields)] pub struct DeletePath { pub runner_name: String, diff --git a/engine/packages/api-public/src/runner_configs/delete.rs b/engine/packages/api-public/src/runner_configs/delete.rs index 0d7f2245c0..caa966cfd0 100644 --- a/engine/packages/api-public/src/runner_configs/delete.rs +++ b/engine/packages/api-public/src/runner_configs/delete.rs @@ -1,5 +1,6 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use axum::response::{IntoResponse, Response}; +use futures_util::{StreamExt, TryStreamExt}; use rivet_api_builder::{ ApiError, extract::{Extension, Json, Path, Query}, @@ -38,30 +39,43 @@ pub async fn delete( async fn delete_inner(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result { ctx.auth().await?; - for dc in &ctx.config().topology().datacenters { - if ctx.config().dc_label() == dc.datacenter_label { - rivet_api_peer::runner_configs::delete( - ctx.clone().into(), - DeletePath { - runner_name: path.runner_name.clone(), - }, - DeleteQuery { - namespace: query.namespace.clone(), - }, - ) - .await?; - } else { - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::DELETE, - Some(&query), - Option::<&()>::None, - ) - .await?; - } - } + let dcs = ctx.config().topology().datacenters.clone(); + futures_util::stream::iter(dcs) + .map(|dc| { + let ctx = ctx.clone(); + let query = query.clone(); + let path = path.clone(); + async move { + if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::delete( + ctx.clone().into(), + DeletePath { + runner_name: path.runner_name.clone(), + }, + DeleteQuery { + namespace: query.namespace.clone(), + }, + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::DELETE, + Some(&query), + Option::<&()>::None, + ) + .await?; + } + + anyhow::Ok(()) + } + }) + .buffer_unordered(16) + .try_collect::>() + // NOTE: We must error when any peer request fails, not all + .await?; // Resolve namespace let namespace = ctx diff --git a/engine/packages/api-public/src/runner_configs/upsert.rs b/engine/packages/api-public/src/runner_configs/upsert.rs index d8d2b67049..b13d311ca4 100644 --- a/engine/packages/api-public/src/runner_configs/upsert.rs +++ b/engine/packages/api-public/src/runner_configs/upsert.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use anyhow::Result; use axum::response::{IntoResponse, Response}; +use futures_util::{StreamExt, TryStreamExt}; use rivet_api_builder::{ ApiError, extract::{Extension, Json, Path, Query}, @@ -85,58 +86,75 @@ async fn upsert_inner( }) .next(); - // Apply config - let mut any_endpoint_config_changed = false; - for dc in &ctx.config().topology().datacenters { - if let Some(runner_config) = body.datacenters.remove(&dc.name) { - let response = if ctx.config().dc_label() == dc.datacenter_label { - rivet_api_peer::runner_configs::upsert( - ctx.clone().into(), - path.clone(), - query.clone(), - rivet_api_peer::runner_configs::UpsertRequest(runner_config), - ) - .await? - } else { - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::PUT, - Some(&query), - Some(&runner_config), - ) - .await? - }; + let dcs = ctx + .config() + .topology() + .datacenters + .iter() + .map(|dc| (dc.clone(), body.datacenters.remove(&dc.name))) + .collect::>(); + let any_endpoint_config_changed = futures_util::stream::iter(dcs) + .map(|(dc, runner_config)| { + let ctx = ctx.clone(); + let query = query.clone(); + let path = path.clone(); + async move { + if let Some(runner_config) = runner_config { + let response = if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::upsert( + ctx.clone().into(), + path.clone(), + query.clone(), + rivet_api_peer::runner_configs::UpsertRequest(runner_config), + ) + .await? + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::PUT, + Some(&query), + Some(&runner_config), + ) + .await? + }; - if response.endpoint_config_changed { - any_endpoint_config_changed = true; - } - } else { - if ctx.config().dc_label() == dc.datacenter_label { - rivet_api_peer::runner_configs::delete( - ctx.clone().into(), - DeletePath { - runner_name: path.runner_name.clone(), - }, - DeleteQuery { - namespace: query.namespace.clone(), - }, - ) - .await?; - } else { - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::DELETE, - Some(&query), - Option::<&()>::None, - ) - .await?; + anyhow::Ok(response.endpoint_config_changed) + } else { + if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::delete( + ctx.clone().into(), + DeletePath { + runner_name: path.runner_name.clone(), + }, + DeleteQuery { + namespace: query.namespace.clone(), + }, + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::DELETE, + Some(&query), + Option::<&()>::None, + ) + .await?; + } + + Ok(false) + } } - } - } + }) + .buffer_unordered(16) + .try_collect::>() + // NOTE: We must error when any peer request fails, not all + .await? + .into_iter() + .any(|endpoint_config_changed| endpoint_config_changed); // Update runner metadata // diff --git a/engine/packages/api-public/src/runner_configs/utils.rs b/engine/packages/api-public/src/runner_configs/utils.rs index 5c11284f34..cb2d0939c3 100644 --- a/engine/packages/api-public/src/runner_configs/utils.rs +++ b/engine/packages/api-public/src/runner_configs/utils.rs @@ -89,6 +89,7 @@ pub async fn fetch_serverless_runner_metadata( .headers(header_map) .timeout(Duration::from_secs(10)) .send() + .custom_instrument(tracing::info_span!("fetch_metadata_request")) .await .map_err(|err| { if err.is_timeout() { diff --git a/engine/packages/api-util/src/lib.rs b/engine/packages/api-util/src/lib.rs index 1c67533c45..b67a2fb3cd 100644 --- a/engine/packages/api-util/src/lib.rs +++ b/engine/packages/api-util/src/lib.rs @@ -109,38 +109,39 @@ where A: Fn(u16, I, &mut R), R: Default + Send + 'static, { - let dcs = &ctx.config().topology().datacenters; - - let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { - let ctx = ctx.clone(); - let query = query.clone(); - let endpoint = endpoint.to_string(); - let local_handler = local_handler.clone(); - - async move { - if dc.datacenter_label == ctx.config().dc_label() { - // Local datacenter - use direct API call - (dc.datacenter_label, local_handler(ctx, query).await) - } else { - // Remote datacenter - HTTP request - ( - dc.datacenter_label, - request_remote_datacenter::( - ctx.config(), + let dcs = ctx.config().topology().datacenters.clone(); + + let results = futures_util::stream::iter(dcs) + .map(|dc| { + let ctx = ctx.clone(); + let query = query.clone(); + let endpoint = endpoint.to_string(); + let local_handler = local_handler.clone(); + + async move { + if dc.datacenter_label == ctx.config().dc_label() { + // Local datacenter - use direct API call + (dc.datacenter_label, local_handler(ctx, query).await) + } else { + // Remote datacenter - HTTP request + ( dc.datacenter_label, - &endpoint, - Method::GET, - Some(&query), - Option::<&()>::None, + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &endpoint, + Method::GET, + Some(&query), + Option::<&()>::None, + ) + .await, ) - .await, - ) + } } - } - })) - .buffer_unordered(16) - .collect::>() - .await; + }) + .buffer_unordered(16) + .collect::>() + .await; // Aggregate results let result_count = results.len(); @@ -159,7 +160,7 @@ where // Error only if all requests failed if result_count == errors.len() { if let Some(res) = errors.into_iter().next() { - return Err(res).with_context(|| "all datacenter requests failed"); + return Err(res).context("all datacenter requests failed"); } } diff --git a/engine/packages/engine/src/util/wf/signal.rs b/engine/packages/engine/src/util/wf/signal.rs index 11c968ee2d..b7ca3c30ca 100644 --- a/engine/packages/engine/src/util/wf/signal.rs +++ b/engine/packages/engine/src/util/wf/signal.rs @@ -54,10 +54,6 @@ pub async fn print_signals(signals: Vec, pretty: bool) -> Result<()> ); } - if let Some(workflow_id) = &signal.workflow_id { - println!(" {} {}", style("to workflow id").bold(), workflow_id,); - } - if let Some(workflow_id) = signal.workflow_id { println!(" {} {}", style("to workflow id").bold(), workflow_id); } diff --git a/engine/packages/gasoline/src/ctx/workflow.rs b/engine/packages/gasoline/src/ctx/workflow.rs index ed0b1b066e..870f2ba146 100644 --- a/engine/packages/gasoline/src/ctx/workflow.rs +++ b/engine/packages/gasoline/src/ctx/workflow.rs @@ -536,7 +536,7 @@ impl WorkflowCtx { pub(crate) fn check_stop(&self) -> WorkflowResult<()> { if self.stop.has_changed().unwrap_or(true) { - Err(WorkflowError::WorkflowStopped) + Err(WorkflowError::WorkflowEvicted) } else { Ok(()) } @@ -546,7 +546,7 @@ impl WorkflowCtx { // We have to clone here because this function can't have a mutable reference to self. The state of // the stop channel doesn't matter because it only ever receives one message let _ = self.stop.clone().changed().await; - Err(WorkflowError::WorkflowStopped) + Err(WorkflowError::WorkflowEvicted) } } diff --git a/engine/packages/gasoline/src/error.rs b/engine/packages/gasoline/src/error.rs index 7553046375..49ba8def6e 100644 --- a/engine/packages/gasoline/src/error.rs +++ b/engine/packages/gasoline/src/error.rs @@ -28,8 +28,8 @@ pub enum WorkflowError { #[error("workflow not found")] WorkflowNotFound, - #[error("workflow stopped")] - WorkflowStopped, + #[error("workflow evicted")] + WorkflowEvicted, #[error("history diverged: {0}")] HistoryDiverged(String), @@ -180,7 +180,7 @@ pub enum WorkflowError { impl WorkflowError { pub(crate) fn wake_immediate(&self) -> bool { - matches!(self, WorkflowError::WorkflowStopped) + matches!(self, WorkflowError::WorkflowEvicted) } /// Returns the next deadline for a workflow to be woken up again based on the error. @@ -225,7 +225,7 @@ impl WorkflowError { | WorkflowError::NoSignalFoundAndSleep(_, _) | WorkflowError::SubWorkflowIncomplete(_) | WorkflowError::Sleep(_) - | WorkflowError::WorkflowStopped => true, + | WorkflowError::WorkflowEvicted => true, _ => false, } } diff --git a/engine/packages/guard/src/routing/pegboard_gateway.rs b/engine/packages/guard/src/routing/pegboard_gateway.rs index 76e777514e..f8770bf933 100644 --- a/engine/packages/guard/src/routing/pegboard_gateway.rs +++ b/engine/packages/guard/src/routing/pegboard_gateway.rs @@ -101,6 +101,9 @@ pub async fn route_request( let mut ready_sub = ctx .subscribe::(("actor_id", actor_id)) .await?; + let mut stopped_sub = ctx + .subscribe::(("actor_id", actor_id)) + .await?; let mut fail_sub = ctx .subscribe::(("actor_id", actor_id)) .await?; @@ -122,6 +125,8 @@ pub async fn route_request( // Wake actor if sleeping if actor.sleeping { + tracing::debug!(?actor_id, "actor sleeping, waking"); + ctx.signal(pegboard::workflows::actor::Wake {}) .to_workflow_id(actor.workflow_id) .send() @@ -133,20 +138,51 @@ pub async fn route_request( } else { tracing::debug!(?actor_id, "waiting for actor to become ready"); + let mut wake_retries = 0; + // Wait for ready, fail, or destroy - tokio::select! { - res = ready_sub.next() => { res?.runner_id }, - res = fail_sub.next() => { - let msg = res?; - return Err(msg.error.clone().build()); - } - res = destroy_sub.next() => { - res?; - return Err(pegboard::errors::Actor::DestroyedWhileWaitingForReady.build()); - } - // Ready timeout - _ = tokio::time::sleep(ACTOR_READY_TIMEOUT) => { - return Err(errors::ActorReadyTimeout { actor_id }.build()); + loop { + tokio::select! { + res = ready_sub.next() => break res?.runner_id, + res = stopped_sub.next() => { + res?; + + // Attempt to rewake once + if wake_retries < 3 { + tracing::debug!(?actor_id, ?wake_retries, "actor stopped while we were waiting for it to beocme ready, attempting rewake"); + wake_retries += 1; + + let res = ctx.signal(pegboard::workflows::actor::Wake {}) + .to_workflow_id(actor.workflow_id) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + ?actor_id, + "actor workflow not found for rewake" + ); + } else { + res?; + } + } + } + res = fail_sub.next() => { + let msg = res?; + return Err(msg.error.clone().build()); + } + res = destroy_sub.next() => { + res?; + return Err(pegboard::errors::Actor::DestroyedWhileWaitingForReady.build()); + } + // Ready timeout + _ = tokio::time::sleep(ACTOR_READY_TIMEOUT) => { + return Err(errors::ActorReadyTimeout { actor_id }.build()); + } } } }; diff --git a/engine/packages/namespace/src/ops/resolve_for_name_global.rs b/engine/packages/namespace/src/ops/resolve_for_name_global.rs index 296ecffa9e..c78089b6af 100644 --- a/engine/packages/namespace/src/ops/resolve_for_name_global.rs +++ b/engine/packages/namespace/src/ops/resolve_for_name_global.rs @@ -35,6 +35,7 @@ pub async fn namespace_resolve_for_name_global( .get(url) .query(&[("name", &input.name)]) .send() + .custom_instrument(tracing::info_span!("namespaces_http_request")) .await?; let res = rivet_api_util::parse_response::< diff --git a/engine/packages/pegboard-serverless/src/lib.rs b/engine/packages/pegboard-serverless/src/lib.rs index a26b2a9408..fb58597f64 100644 --- a/engine/packages/pegboard-serverless/src/lib.rs +++ b/engine/packages/pegboard-serverless/src/lib.rs @@ -26,7 +26,7 @@ const X_RIVET_ENDPOINT: HeaderName = HeaderName::from_static("x-rivet-endpoint") const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token"); const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-slots"); const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name"); -const X_RIVET_NAMESPACE_ID: HeaderName = HeaderName::from_static("x-rivet-namespace-id"); +const X_RIVET_NAMESPACE_NAME: HeaderName = HeaderName::from_static("x-rivet-namespace-name"); const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(5); @@ -109,7 +109,9 @@ async fn tick( // Process each runner config with error handling for (ns_id, runner_name, desired_slots) in &serverless_data { - let runner_config = runner_configs.iter().find(|rc| rc.namespace_id == *ns_id); + let runner_config = runner_configs + .iter() + .find(|rc| rc.namespace_id == *ns_id && &rc.name == runner_name); let Some(runner_config) = runner_config else { tracing::debug!( @@ -217,13 +219,15 @@ async fn tick_runner_config( let drain_count = curr.len().saturating_sub(desired_count); let start_count = desired_count.saturating_sub(curr.len()); + tracing::debug!(%namespace_name, %runner_name, %desired_count, %drain_count, %start_count, "scaling"); + if drain_count != 0 { // TODO: Implement smart logic of draining runners with the lowest allocated actors let draining_connections = curr.split_off(desired_count); for conn in draining_connections { if conn.shutdown_tx.send(()).is_err() { - tracing::warn!( + tracing::debug!( "serverless connection shutdown channel dropped, likely already stopped" ); } @@ -337,12 +341,21 @@ async fn outbound_handler( HeaderValue::try_from(slots_per_runner)?, ), (X_RIVET_RUNNER_NAME, HeaderValue::try_from(runner_name)?), - (X_RIVET_NAMESPACE_ID, HeaderValue::try_from(namespace_name)?), + ( + X_RIVET_NAMESPACE_NAME, + HeaderValue::try_from(namespace_name.clone())?, + ), + // Deprecated + ( + HeaderName::from_static("x-rivet-namespace-id"), + HeaderValue::try_from(namespace_name)?, + ), ]) .chain(token) .collect(); let endpoint_url = format!("{}/start", url.trim_end_matches('/')); + tracing::debug!(%endpoint_url, "sending outbound req"); let req = client.get(endpoint_url).headers(headers); let mut source = sse::EventSource::new(req).context("failed creating event source")?; diff --git a/engine/packages/pegboard/src/workflows/actor/destroy.rs b/engine/packages/pegboard/src/workflows/actor/destroy.rs index ea55760ba6..a04126f4dd 100644 --- a/engine/packages/pegboard/src/workflows/actor/destroy.rs +++ b/engine/packages/pegboard/src/workflows/actor/destroy.rs @@ -35,7 +35,15 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input) // Destroy actor if let (Some(runner_workflow_id), true) = (res.runner_workflow_id, &input.kill) { - kill(ctx, input.actor_id, input.generation, runner_workflow_id).await?; + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { + actor_id: input.actor_id.to_string(), + generation: input.generation, + }), + }) + .to_workflow_id(runner_workflow_id) + .send() + .await?; } // If a slot was allocated at the time of actor destruction then bump the serverless autoscaler so it can scale down @@ -274,22 +282,3 @@ pub(crate) async fn clear_slot( Ok(()) } - -pub(crate) async fn kill( - ctx: &mut WorkflowCtx, - actor_id: Id, - generation: u32, - runner_workflow_id: Id, -) -> Result<()> { - ctx.signal(crate::workflows::runner::Command { - inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { - actor_id: actor_id.to_string(), - generation, - }), - }) - .to_workflow_id(runner_workflow_id) - .send() - .await?; - - Ok(()) -} diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index 415fb94f38..cae0baae00 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -295,13 +295,18 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> return Ok(Loop::Continue); } + let (Some(runner_id), Some(runner_workflow_id)) = (state.runner_id, state.runner_workflow_id) else { + tracing::warn!("actor not allocated, ignoring event"); + return Ok(Loop::Continue); + }; + match sig.inner { protocol::Event::EventActorIntent(protocol::EventActorIntent { intent, .. }) => match intent { protocol::ActorIntent::ActorIntentSleep => { - if let Some(runner_workflow_id) = state.runner_workflow_id { + if !state.sleeping { state.gc_timeout_ts = Some(util::timestamp::now() + ACTOR_STOP_THRESHOLD_MS); state.sleeping = true; @@ -312,37 +317,35 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .await?; // Send signal to kill actor now that we know it will be sleeping - destroy::kill( - ctx, - input.actor_id, - state.generation, - runner_workflow_id, - ) + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { + actor_id: input.actor_id.to_string(), + generation: state.generation, + }), + }) + .to_workflow_id(runner_workflow_id) + .send() .await?; - } else { - tracing::warn!("actor not allocated, ignoring sleep intent"); } } protocol::ActorIntent::ActorIntentStop => { - if let Some(runner_workflow_id) = state.runner_workflow_id { - state.gc_timeout_ts = - Some(util::timestamp::now() + ACTOR_STOP_THRESHOLD_MS); + state.gc_timeout_ts = + Some(util::timestamp::now() + ACTOR_STOP_THRESHOLD_MS); - ctx.activity(runtime::SetNotConnectableInput { - actor_id: input.actor_id, - }) - .await?; + ctx.activity(runtime::SetNotConnectableInput { + actor_id: input.actor_id, + }) + .await?; - destroy::kill( - ctx, - input.actor_id, - state.generation, - runner_workflow_id, - ) - .await?; - } else { - tracing::warn!("actor not allocated, ignoring stop intent"); - } + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { + actor_id: input.actor_id.to_string(), + generation: state.generation, + }), + }) + .to_workflow_id(runner_workflow_id) + .send() + .await?; } }, protocol::Event::EventActorStateUpdate( @@ -351,23 +354,19 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }, ) => match actor_state { protocol::ActorState::ActorStateRunning => { - if let Some(runner_id) = state.runner_id { - state.gc_timeout_ts = None; - - ctx.activity(runtime::SetStartedInput { - actor_id: input.actor_id, - }) - .await?; - - ctx.msg(Ready { - runner_id, - }) - .tag("actor_id", input.actor_id) - .send() - .await?; - } else { - tracing::warn!("actor not allocated, ignoring running event"); - } + state.gc_timeout_ts = None; + + ctx.activity(runtime::SetStartedInput { + actor_id: input.actor_id, + }) + .await?; + + ctx.msg(Ready { + runner_id, + }) + .tag("actor_id", input.actor_id) + .send() + .await?; } protocol::ActorState::ActorStateStopped( protocol::ActorStateStopped { code, .. }, @@ -541,6 +540,19 @@ async fn handle_stopped( } } + // Kill old actor if lost (just in case it ended up allocating) + if let (true, Some(old_runner_workflow_id)) = (lost, old_runner_workflow_id) { + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { + actor_id: input.actor_id.to_string(), + generation: state.generation, + }), + }) + .to_workflow_id(old_runner_workflow_id) + .send() + .await?; + } + // Reschedule no matter what if force_reschedule { match runtime::reschedule_actor(ctx, &input, state, true).await? { @@ -566,18 +578,6 @@ async fn handle_stopped( match (input.crash_policy, failed) { (CrashPolicy::Restart, true) => { - // Kill old actor immediately if lost - if lost { - destroy::kill( - ctx, - input.actor_id, - state.generation, - old_runner_workflow_id - .context("should have runner_workflow_id set if not sleeping")?, - ) - .await?; - } - match runtime::reschedule_actor(ctx, &input, state, false).await? { runtime::SpawnActorOutput::Allocated { .. } => {} // NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash @@ -637,6 +637,11 @@ async fn handle_stopped( state.wake_for_alarm = false; state.will_wake = false; + ctx.msg(Stopped {}) + .tag("actor_id", input.actor_id) + .send() + .await?; + Ok(None) } @@ -653,6 +658,9 @@ pub struct Ready { pub runner_id: Id, } +#[message("pegboard_actor_stopped")] +pub struct Stopped {} + #[signal("pegboard_actor_allocate")] #[derive(Debug)] pub struct Allocate { diff --git a/engine/packages/pegboard/src/workflows/runner.rs b/engine/packages/pegboard/src/workflows/runner.rs index 5ad69f42bb..610b9ac370 100644 --- a/engine/packages/pegboard/src/workflows/runner.rs +++ b/engine/packages/pegboard/src/workflows/runner.rs @@ -247,9 +247,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .await?; - // Set all remaining actors to lost immediately and send stop commands to the - // runner. We do both so that the actor's reschedule immediately and the runner is - // informed that the actors should be stopped (if it is still connected) + // Set all remaining actors to lost immediately if !actors.is_empty() { for (actor_id, generation) in &actors { ctx.signal(crate::workflows::actor::Lost { @@ -261,39 +259,6 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .send() .await?; } - - let commands = actors - .into_iter() - .map(|(actor_id, generation)| { - protocol::Command::CommandStopActor( - protocol::CommandStopActor { - actor_id: actor_id.to_string(), - generation, - }, - ) - }) - .collect::>(); - - let index = ctx - .activity(InsertCommandsInput { - commands: commands.clone(), - }) - .await?; - - ctx.activity(SendMessageToRunnerInput { - runner_id: input.runner_id, - message: protocol::ToClient::ToClientCommands( - commands - .into_iter() - .enumerate() - .map(|(i, cmd)| protocol::CommandWrapper { - index: index + i as i64, - inner: cmd, - }) - .collect(), - ), - }) - .await?; } } } diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index ea1c564bf0..c451e2c530 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -12,6 +12,7 @@ const PROTOCOL_VERSION: number = 1; /** Warn once the backlog significantly exceeds the server's ack batch size. */ const EVENT_BACKLOG_WARN_THRESHOLD = 10_000; +const SIGNAL_HANDLERS: (() => void)[] = []; export interface ActorInstance { actorId: string; @@ -44,8 +45,8 @@ export interface RunnerConfig { onConnected: () => void; onDisconnected: () => void; onShutdown: () => void; - fetch: (actorId: string, request: Request) => Promise; - websocket?: (actorId: string, ws: any, request: Request) => Promise; + fetch: (runner: Runner, actorId: string, request: Request) => Promise; + websocket?: (runner: Runner, actorId: string, ws: any, request: Request) => Promise; onActorStart: ( actorId: string, generation: number, @@ -107,14 +108,12 @@ export class Runner { #kvCleanupInterval?: NodeJS.Timeout; // Tunnel for HTTP/WebSocket forwarding - #tunnel: Tunnel; + #tunnel: Tunnel | undefined; constructor(config: RunnerConfig) { this.#config = config; if (this.#config.logger) setLogger(this.#config.logger); - this.#tunnel = new Tunnel(this); - // Start cleaning up old unsent KV requests every 15 seconds this.#kvCleanupInterval = setInterval(() => { this.#cleanupOldKvRequests(); @@ -134,11 +133,21 @@ export class Runner { } async stopActor(actorId: string, generation?: number) { + const actor = this.getActor(actorId, generation); + if (!actor) return; + + this.#sendActorIntent(actorId, actor.generation, "stop"); + + // NOTE: We do NOT remove the actor from this.#actors here + // The server will send a StopActor command if it wants to fully stop + } + + async forceStopActor(actorId: string, generation?: number) { const actor = this.#removeActor(actorId, generation); if (!actor) return; // Unregister actor from tunnel - this.#tunnel.unregisterActor(actor); + this.#tunnel?.unregisterActor(actor); // If onActorStop times out, Pegboard will handle this timeout with ACTOR_STOP_THRESHOLD_DURATION_MS try { @@ -152,6 +161,7 @@ export class Runner { this.#config.onActorStop(actorId, actor.generation).catch((err) => { logger()?.error({ msg: "error in onactorstop for actor", + runnerId: this.runnerId, actorId, err, }); @@ -159,25 +169,27 @@ export class Runner { } #stopAllActors() { - logger()?.info( - "stopping all actors due to runner lost threshold exceeded", - ); + logger()?.info({ + msg: "stopping all actors due to runner lost threshold exceeded", + runnerId: this.runnerId, + }); const actorIds = Array.from(this.#actors.keys()); for (const actorId of actorIds) { - this.stopActor(actorId); + this.forceStopActor(actorId); } } getActor(actorId: string, generation?: number): ActorInstance | undefined { const actor = this.#actors.get(actorId); if (!actor) { - logger()?.error({ msg: "actor not found", actorId }); + logger()?.error({ msg: "actor not found", runnerId: this.runnerId, actorId }); return undefined; } if (generation !== undefined && actor.generation !== generation) { logger()?.error({ msg: "actor generation mismatch", + runnerId: this.runnerId, actorId, generation, }); @@ -202,12 +214,13 @@ export class Runner { ): ActorInstance | undefined { const actor = this.#actors.get(actorId); if (!actor) { - logger()?.error({ msg: "actor not found for removal", actorId }); + logger()?.error({ msg: "actor not found for removal", runnerId: this.runnerId, actorId }); return undefined; } if (generation !== undefined && actor.generation !== generation) { logger()?.error({ msg: "actor generation mismatch", + runnerId: this.runnerId, actorId, generation, }); @@ -225,6 +238,7 @@ export class Runner { } catch (err) { logger()?.error({ msg: "error closing websocket for actor", + runnerId: this.runnerId, actorId, err, }); @@ -241,8 +255,9 @@ export class Runner { if (this.#started) throw new Error("Cannot call runner.start twice"); this.#started = true; - logger()?.info("starting runner"); + logger()?.info({ msg: "starting runner" }); + this.#tunnel = new Tunnel(this); this.#tunnel.start(); try { @@ -253,14 +268,41 @@ export class Runner { } if (!this.#config.noAutoShutdown) { - process.on("SIGTERM", this.shutdown.bind(this, false, true)); - process.on("SIGINT", this.shutdown.bind(this, false, true)); + if (!SIGNAL_HANDLERS.length) { + process.on("SIGTERM", () => { + logger()?.debug("received SIGTERM"); + + for (let handler of SIGNAL_HANDLERS) { + handler(); + } + + process.exit(0); + }); + process.on("SIGINT", () => { + logger()?.debug("received SIGINT"); + + for (let handler of SIGNAL_HANDLERS) { + handler(); + } + + process.exit(0); + }); + + logger()?.debug({ + msg: "added SIGTERM listeners", + }); + } + + SIGNAL_HANDLERS.push(() => { + let weak = new WeakRef(this); + weak.deref()?.shutdown(false, false) + }); } } // MARK: Shutdown async shutdown(immediate: boolean, exit: boolean = false) { - logger()?.info({ msg: "starting shutdown...", immediate }); + logger()?.info({ msg: "starting shutdown", runnerId: this.runnerId, immediate, exit }); this.#shutdown = true; // Clear reconnect timeout @@ -315,6 +357,7 @@ export class Runner { try { logger()?.info({ msg: "sending stopping message", + runnerId: this.runnerId, readyState: pegboardWebSocket.readyState, }); @@ -342,6 +385,7 @@ export class Runner { pegboardWebSocket.addEventListener("close", (ev) => { logger()?.info({ msg: "connection closed", + runnerId: this.runnerId, code: ev.code, reason: ev.reason.toString(), }); @@ -351,28 +395,33 @@ export class Runner { // TODO: Wait for all actors to stop before closing ws - logger()?.info("closing WebSocket"); + logger()?.info({ msg: "closing WebSocket", runnerId: this.runnerId }); pegboardWebSocket.close(1000, "Stopping"); await closePromise; - logger()?.info("websocket shutdown completed"); + logger()?.info({ msg: "websocket shutdown completed", runnerId: this.runnerId }); } catch (error) { logger()?.error({ msg: "error during websocket shutdown:", + runnerId: this.runnerId, error, }); pegboardWebSocket.close(); } } } else { - logger()?.warn("no runner WebSocket to shutdown or already closed"); + logger()?.warn({ + msg: "no runner WebSocket to shutdown or already closed", + runnerId: this.runnerId, + readyState: this.#pegboardWebSocket?.readyState, + }); } // Close tunnel if (this.#tunnel) { this.#tunnel.shutdown(); - logger()?.info("tunnel shutdown completed"); + this.#tunnel = undefined; } if (exit) process.exit(0); @@ -400,7 +449,7 @@ export class Runner { this.#pegboardWebSocket = ws; ws.addEventListener("open", () => { - logger()?.info("Connected"); + logger()?.info({ msg: "Connected" }); // Reset reconnect attempt counter on successful connection this.#reconnectAttempt = 0; @@ -457,7 +506,7 @@ export class Runner { }); } else { clearInterval(pingLoop); - logger()?.info("WebSocket not open, stopping ping loop"); + logger()?.info({ msg: "WebSocket not open, stopping ping loop", runnerId: this.runnerId }); } }, pingInterval); this.#pingLoop = pingLoop; @@ -469,7 +518,7 @@ export class Runner { this.#sendCommandAcknowledgment(); } else { clearInterval(ackLoop); - logger()?.info("WebSocket not open, stopping ack loop"); + logger()?.info({ msg: "WebSocket not open, stopping ack loop", runnerId: this.runnerId }); } }, ackInterval); this.#ackInterval = ackLoop; @@ -526,7 +575,7 @@ export class Runner { } else if (message.tag === "ToClientTunnelMessage") { this.#tunnel?.handleTunnelMessage(message.val); } else if (message.tag === "ToClientClose") { - this.#tunnel.shutdown(); + this.#tunnel?.shutdown(); ws.close(1000, "manual closure"); } else { unreachable(message); @@ -534,7 +583,7 @@ export class Runner { }); ws.addEventListener("error", (ev) => { - logger()?.error(`WebSocket error: ${ev.error}`); + logger()?.error({ msg: `WebSocket error: ${ev.error}`, runnerId: this.runnerId }); if (!this.#shutdown) { // Start runner lost timeout if we have a threshold and are not shutting down @@ -545,6 +594,7 @@ export class Runner { ) { logger()?.info({ msg: "starting runner lost timeout", + runnerId: this.runnerId, seconds: this.#runnerLostThreshold / 1000, }); this.#runnerLostTimeout = setTimeout(() => { @@ -560,6 +610,7 @@ export class Runner { ws.addEventListener("close", async (ev) => { logger()?.info({ msg: "connection closed", + runnerId: this.runnerId, code: ev.code, reason: ev.reason.toString(), }); @@ -567,7 +618,7 @@ export class Runner { this.#config.onDisconnected(); if (ev.reason.toString().startsWith("ws.eviction")) { - logger()?.info("runner evicted"); + logger()?.info({ msg: "runner evicted", runnerId: this.runnerId }); await this.shutdown(true); } @@ -593,6 +644,7 @@ export class Runner { ) { logger()?.info({ msg: "starting runner lost timeout", + runnerId: this.runnerId, seconds: this.#runnerLostThreshold / 1000, }); this.#runnerLostTimeout = setTimeout(() => { @@ -609,11 +661,12 @@ export class Runner { #handleCommands(commands: protocol.ToClientCommands) { logger()?.info({ msg: "received commands", + runnerId: this.runnerId, commandCount: commands.length, }); for (const commandWrapper of commands) { - logger()?.info({ msg: "received command", commandWrapper }); + logger()?.info({ msg: "received command", runnerId: this.runnerId, commandWrapper }); if (commandWrapper.inner.tag === "CommandStartActor") { this.#handleCommandStartActor(commandWrapper); } else if (commandWrapper.inner.tag === "CommandStopActor") { @@ -638,6 +691,7 @@ export class Runner { if (prunedCount > 0) { logger()?.info({ msg: "pruned acknowledged events", + runnerId: this.runnerId, lastAckedIdx: lastAckedIdx.toString(), prunedCount, }); @@ -659,6 +713,7 @@ export class Runner { this.#eventBacklogWarned = true; logger()?.warn({ msg: "unacknowledged event backlog exceeds threshold", + runnerId: this.runnerId, backlogSize: this.#eventHistory.length, threshold: EVENT_BACKLOG_WARN_THRESHOLD, }); @@ -699,13 +754,14 @@ export class Runner { .catch((err) => { logger()?.error({ msg: "error in onactorstart for actor", + runnerId: this.runnerId, actorId, err, }); // TODO: Mark as crashed // Send stopped state update if start failed - this.stopActor(actorId, generation); + this.forceStopActor(actorId, generation); }); } @@ -716,7 +772,7 @@ export class Runner { const actorId = stopCommand.actorId; const generation = stopCommand.generation; - this.stopActor(actorId, generation); + this.forceStopActor(actorId, generation); } #sendActorIntent( @@ -725,7 +781,7 @@ export class Runner { intentType: "sleep" | "stop", ) { if (this.#shutdown) { - logger()?.warn("Runner is shut down, cannot send actor intent"); + logger()?.warn({ msg: "Runner is shut down, cannot send actor intent", runnerId: this.runnerId }); return; } let actorIntent: protocol.ActorIntent; @@ -760,6 +816,7 @@ export class Runner { logger()?.info({ msg: "sending event to server", + runnerId: this.runnerId, index: eventWrapper.index, tag: eventWrapper.inner.tag, val: eventWrapper.inner.val, @@ -777,9 +834,10 @@ export class Runner { stateType: "running" | "stopped", ) { if (this.#shutdown) { - logger()?.warn( - "Runner is shut down, cannot send actor state update", - ); + logger()?.warn({ + msg: "Runner is shut down, cannot send actor state update", + runnerId: this.runnerId, + }); return; } let actorState: protocol.ActorState; @@ -817,6 +875,7 @@ export class Runner { logger()?.info({ msg: "sending event to server", + runnerId: this.runnerId, index: eventWrapper.index, tag: eventWrapper.inner.tag, val: eventWrapper.inner.val, @@ -830,9 +889,10 @@ export class Runner { #sendCommandAcknowledgment() { if (this.#shutdown) { - logger()?.warn( - "Runner is shut down, cannot send command acknowledgment", - ); + logger()?.warn({ + msg: "Runner is shut down, cannot send command acknowledgment", + runnerId: this.runnerId, + }); return; } @@ -857,11 +917,7 @@ export class Runner { if (!request) { const msg = "received kv response for unknown request id"; - if (logger()) { - logger()?.error({ msg, requestId }); - } else { - logger()?.error({ msg, requestId }); - } + logger()?.error({ msg, runnerId: this.runnerId, requestId }); return; } @@ -1266,9 +1322,10 @@ export class Runner { __sendToServer(message: protocol.ToServer) { if (this.#shutdown) { - logger()?.warn( - "Runner is shut down, cannot send message to server", - ); + logger()?.warn({ + msg: "Runner is shut down, cannot send message to server", + runnerId: this.runnerId, + }); return; } @@ -1279,9 +1336,10 @@ export class Runner { ) { this.#pegboardWebSocket.send(encoded); } else { - logger()?.error( - "WebSocket not available or not open for sending data", - ); + logger()?.error({ + msg: "WebSocket not available or not open for sending data", + runnerId: this.runnerId, + }); } } @@ -1305,7 +1363,7 @@ export class Runner { #scheduleReconnect() { if (this.#shutdown) { - logger()?.debug("Runner is shut down, not attempting reconnect"); + logger()?.debug({ msg: "Runner is shut down, not attempting reconnect", runnerId: this.runnerId }); return; } @@ -1316,16 +1374,18 @@ export class Runner { jitter: true, }); - logger()?.debug( - `Scheduling reconnect attempt ${this.#reconnectAttempt + 1} in ${delay}ms`, - ); + logger()?.debug({ + msg: `Scheduling reconnect attempt ${this.#reconnectAttempt + 1} in ${delay}ms`, + runnerId: this.runnerId, + }); this.#reconnectTimeout = setTimeout(async () => { if (!this.#shutdown) { this.#reconnectAttempt++; - logger()?.debug( - `Attempting to reconnect (attempt ${this.#reconnectAttempt})...`, - ); + logger()?.debug({ + msg: `Attempting to reconnect (attempt ${this.#reconnectAttempt})...`, + runnerId: this.runnerId, + }); await this.#openPegboardWebSocket(); } }, delay); diff --git a/engine/sdks/typescript/runner/src/tunnel.ts b/engine/sdks/typescript/runner/src/tunnel.ts index b5faf2e189..c41a231680 100644 --- a/engine/sdks/typescript/runner/src/tunnel.ts +++ b/engine/sdks/typescript/runner/src/tunnel.ts @@ -48,8 +48,6 @@ export class Tunnel { this.#gcInterval = undefined; } - // TODO: Should we use unregisterActor instead - // Reject all pending requests for (const [_, request] of this.#actorPendingRequests) { request.reject(new Error("Tunnel shutting down")); @@ -212,7 +210,7 @@ export class Tunnel { return new Response("Actor not found", { status: 404 }); } - const fetchHandler = this.#runner.config.fetch(actorId, request); + const fetchHandler = this.#runner.config.fetch(this.#runner, actorId, request); if (!fetchHandler) { return new Response("Not Implemented", { status: 501 }); @@ -309,8 +307,8 @@ export class Tunnel { existing.actorId = req.actorId; } else { this.#actorPendingRequests.set(requestIdStr, { - resolve: () => {}, - reject: () => {}, + resolve: () => { }, + reject: () => { }, streamController: controller, actorId: req.actorId, }); @@ -477,7 +475,7 @@ export class Tunnel { const dataBuffer = typeof data === "string" ? (new TextEncoder().encode(data) - .buffer as ArrayBuffer) + .buffer as ArrayBuffer) : data; this.#sendMessage(requestId, { @@ -541,7 +539,7 @@ export class Tunnel { }); // Call websocket handler - await websocketHandler(open.actorId, adapter, request); + await websocketHandler(this.#runner, open.actorId, adapter, request); } catch (error) { logger()?.error({ msg: "error handling websocket open", error }); // Send close on error diff --git a/engine/sdks/typescript/test-runner/package.json b/engine/sdks/typescript/test-runner/package.json index 2facb29aed..1ba7437674 100644 --- a/engine/sdks/typescript/test-runner/package.json +++ b/engine/sdks/typescript/test-runner/package.json @@ -11,7 +11,7 @@ "@hono/node-server": "^1.19.1", "@rivetkit/engine-runner": "workspace:*", "@rivetkit/engine-runner-protocol": "workspace:*", - "hono": "^4.0.0", + "hono": "^4.7.0", "pino": "^9.9.5", "ws": "^8.18.3" }, @@ -24,4 +24,4 @@ "typescript": "^5.9.2", "vitest": "^1.6.1" } -} +} \ No newline at end of file diff --git a/engine/sdks/typescript/test-runner/src/index.ts b/engine/sdks/typescript/test-runner/src/index.ts index 0215e03afe..d30868f232 100644 --- a/engine/sdks/typescript/test-runner/src/index.ts +++ b/engine/sdks/typescript/test-runner/src/index.ts @@ -1,9 +1,10 @@ import { serve } from "@hono/node-server"; import type { ActorConfig, RunnerConfig } from "@rivetkit/engine-runner"; import { Runner } from "@rivetkit/engine-runner"; -import { Hono } from "hono"; +import { Hono, type Context as HonoContext, Next } from "hono"; import { streamSSE } from "hono/streaming"; -import pino from "pino"; +import { getLogger } from "./log" +import { type Logger } from "pino"; import type WebSocket from "ws"; const INTERNAL_SERVER_PORT = process.env.INTERNAL_SERVER_PORT @@ -24,16 +25,37 @@ const RIVET_TOKEN = process.env.RIVET_TOKEN ?? "dev"; const AUTOSTART_SERVER = process.env.NO_AUTOSTART_SERVER === undefined; const AUTOSTART_RUNNER = process.env.NO_AUTOSTART_RUNNER === undefined; -const runnerStarted = Promise.withResolvers(); -const runnerStopped = Promise.withResolvers(); -const websocketOpen = Promise.withResolvers(); -const websocketClosed = Promise.withResolvers(); +let runnerStarted = Promise.withResolvers(); +let runnerStopped = Promise.withResolvers(); let runner: Runner | null = null; const actorWebSockets = new Map(); // Create internal server const app = new Hono(); +function loggerMiddleware(logger: Logger) { + return async (c: HonoContext, next: Next) => { + const method = c.req.method; + const path = c.req.path; + const startTime = Date.now(); + + await next(); + + const duration = Date.now() - startTime; + logger.debug({ + msg: "http request", + method, + path, + status: c.res.status, + dt: `${duration}ms`, + reqSize: c.req.header("content-length"), + resSize: c.res.headers.get("content-length"), + userAgent: c.req.header("user-agent"), + }); + }; +} +app.use("*", loggerMiddleware(getLogger())); + app.get("/wait-ready", async (c) => { await runnerStarted.promise; return c.json(runner?.runnerId); @@ -57,31 +79,69 @@ app.get("/shutdown", async (c) => { app.get("/start", async (c) => { return streamSSE(c, async (stream) => { - if (runner == null) runner = await startRunner(); + const [runner, runnerStarted, runnerStopped] = await startRunner(); + + c.req.raw.signal.addEventListener("abort", () => { + getLogger().debug("SSE aborted, shutting down runner"); + runner.shutdown(true); + }); + + await runnerStarted.promise; - stream.writeSSE({ data: runner.runnerId! }); + stream.writeSSE({ data: runner.getServerlessInitPacket()! }); await runnerStopped.promise; }); }); +await autoConfigureServerless(); + if (AUTOSTART_SERVER) { serve({ fetch: app.fetch, port: INTERNAL_SERVER_PORT, }); - console.log( + getLogger().info( `Internal HTTP server listening on port ${INTERNAL_SERVER_PORT}`, ); } -if (AUTOSTART_RUNNER) runner = await startRunner(); +if (AUTOSTART_RUNNER) [runner, runnerStarted, runnerStopped] = await startRunner(); + +async function autoConfigureServerless() { + let res = await fetch(`http://127.0.0.1:6420/runner-configs/${RIVET_RUNNER_NAME}?namespace=${RIVET_NAMESPACE}`, { + method: "PUT", + headers: { + "Authorization": `Bearer ${RIVET_TOKEN}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + datacenters: { + default: { + serverless: { + url: `http://localhost:${INTERNAL_SERVER_PORT}`, + max_runners: 10, + slots_per_runner: 1, + request_lifespan: 15, + } + } + } + }), + }); + + if (!res.ok) { + throw new Error(`request failed: ${res.statusText} (${res.status}):\n${await res.text()}`); + } +} + +async function startRunner(): Promise<[Runner, PromiseWithResolvers, PromiseWithResolvers]> { + getLogger().info("Starting runner"); -async function startRunner(): Promise { - console.log("Starting runner"); + const runnerStarted = Promise.withResolvers(); + const runnerStopped = Promise.withResolvers(); const config: RunnerConfig = { - logger: pino(), + logger: getLogger(), version: RIVET_RUNNER_VERSION, endpoint: RIVET_ENDPOINT, token: RIVET_TOKEN, @@ -93,13 +153,13 @@ async function startRunner(): Promise { onConnected: () => { runnerStarted.resolve(undefined); }, - onDisconnected: () => {}, + onDisconnected: () => { }, onShutdown: () => { runnerStopped.resolve(undefined); }, - fetch: async (actorId: string, request: Request) => { - console.log( - `[TEST-RUNNER] Fetch called for actor ${actorId}, URL: ${request.url}`, + fetch: async (runner: Runner, actorId: string, request: Request) => { + getLogger().info( + `Fetch called for actor ${actorId}, URL: ${request.url}`, ); const url = new URL(request.url); if (url.pathname === "/ping") { @@ -109,14 +169,18 @@ async function startRunner(): Promise { status: "ok", timestamp: Date.now(), }; - console.log( - `[TEST-RUNNER] Returning ping response:`, - responseData, - ); + return new Response(JSON.stringify(responseData), { status: 200, headers: { "Content-Type": "application/json" }, }); + } else if (url.pathname === "/sleep") { + runner.sleepActor(actorId); + + return new Response('ok', { + status: 200, + headers: { "Content-Type": "application/json" }, + }); } return new Response("ok", { status: 200 }); @@ -126,35 +190,35 @@ async function startRunner(): Promise { _generation: number, _config: ActorConfig, ) => { - console.log( + getLogger().info( `Actor ${_actorId} started (generation ${_generation})`, ); }, onActorStop: async (_actorId: string, _generation: number) => { - console.log( + getLogger().info( `Actor ${_actorId} stopped (generation ${_generation})`, ); }, - websocket: async (actorId: string, ws: WebSocket, request: Request) => { - console.log(`WebSocket connected for actor ${actorId}`); - websocketOpen.resolve(undefined); + websocket: async (_runner: Runner, actorId: string, ws: WebSocket, request: Request) => { + getLogger().info(`WebSocket connected for actor ${actorId}`); actorWebSockets.set(actorId, ws); // Echo server - send back any messages received ws.addEventListener("message", (event) => { const data = event.data; - console.log(`WebSocket message from actor ${actorId}:`, data); + getLogger().info({ + msg: `WebSocket message from actor ${actorId}`, data + }); ws.send(`Echo: ${data}`); }); ws.addEventListener("close", () => { - console.log(`WebSocket closed for actor ${actorId}`); + getLogger().info(`WebSocket closed for actor ${actorId}`); actorWebSockets.delete(actorId); - websocketClosed.resolve(undefined); }); ws.addEventListener("error", (error) => { - console.error(`WebSocket error for actor ${actorId}:`, error); + getLogger().error({ msg: `WebSocket error for actor ${actorId}:`, error }); }); }, }; @@ -165,12 +229,12 @@ async function startRunner(): Promise { await runner.start(); // Wait for runner to be ready - console.log("Waiting runner start..."); + getLogger().info("Waiting runner start..."); await runnerStarted.promise; - console.log("Runner started"); + getLogger().info("Runner started"); - return runner; + return [runner, runnerStarted, runnerStopped]; } export default app; diff --git a/engine/sdks/typescript/test-runner/src/log.ts b/engine/sdks/typescript/test-runner/src/log.ts new file mode 100644 index 0000000000..4e035c5a39 --- /dev/null +++ b/engine/sdks/typescript/test-runner/src/log.ts @@ -0,0 +1,194 @@ +import { + type Level, + type LevelWithSilent, + type Logger, + pino, + stdTimeFunctions, +} from "pino"; +import { inspect } from "util"; + +export type { Logger } from "pino"; + +let baseLogger: Logger | undefined; +let configuredLogLevel: Level | undefined; + +/** Cache of child loggers by logger name. */ +const loggerCache = new Map(); + +export function getPinoLevel(logLevel?: Level): LevelWithSilent { + // Priority: provided > configured > env > default + if (logLevel) { + return logLevel; + } + + if (configuredLogLevel) { + return configuredLogLevel; + } + + return (process.env["LOG_LEVEL"] || "warn").toString().toLowerCase() as LevelWithSilent; +} + +export function getIncludeTarget(): boolean { + return process.env["LOG_TARGET"] === "1"; +} + +/** + * Configure a custom base logger. + */ +export function configureBaseLogger(logger: Logger): void { + baseLogger = logger; + loggerCache.clear(); +} + +// TODO: This can be simplified in logfmt.ts +function customWrite(level: string, o: any) { + const entries: any = {}; + + // Add timestamp if enabled + if (process.env["LOG_TIMESTAMP"] === "1" && o.time) { + const date = typeof o.time === "number" ? new Date(o.time) : new Date(); + entries.ts = date; + } + + // Add level + entries.level = level.toUpperCase(); + + // Add target if present + if (o.target) { + entries.target = o.target; + } + + // Add message + if (o.msg) { + entries.msg = o.msg; + } + + // Add other properties + for (const [key, value] of Object.entries(o)) { + if ( + key !== "time" && + key !== "level" && + key !== "target" && + key !== "msg" && + key !== "pid" && + key !== "hostname" + ) { + entries[key] = value; + } + } + + const output = inspect(entries, { + compact: true, + breakLength: Infinity, + colors: true, + }); + console.log(output); +} + +/** + * Configure the default logger with optional log level. + */ +export async function configureDefaultLogger( + logLevel?: Level, +): Promise { + // Store the configured log level + if (logLevel) { + configuredLogLevel = logLevel; + } + + baseLogger = pino({ + level: getPinoLevel(logLevel), + messageKey: "msg", + // Do not include pid/hostname in output + base: {}, + // Keep a string level in the output + formatters: { + level(_label: string, number: number) { + return { level: number }; + }, + }, + timestamp: + process.env["LOG_TIMESTAMP"] === "1" + ? stdTimeFunctions.epochTime + : false, + browser: { + write: { + fatal: customWrite.bind(null, "fatal"), + error: customWrite.bind(null, "error"), + warn: customWrite.bind(null, "warn"), + info: customWrite.bind(null, "info"), + debug: customWrite.bind(null, "debug"), + trace: customWrite.bind(null, "trace"), + }, + }, + hooks: { + logMethod(inputArgs, _method, level) { + // TODO: This is a hack to not implement our own transport target. We can get better perf if we have our own transport target. + + const levelMap: Record = { + 10: "trace", + 20: "debug", + 30: "info", + 40: "warn", + 50: "error", + 60: "fatal", + }; + const levelName = levelMap[level] || "info"; + const time = + process.env["LOG_TIMESTAMP"] === "1" + ? Date.now() + : undefined; + // TODO: This can be simplified in logfmt.ts + if (inputArgs.length >= 2) { + const [objOrMsg, msg] = inputArgs; + if (typeof objOrMsg === "object" && objOrMsg !== null) { + customWrite(levelName, { ...objOrMsg, msg, time }); + } else { + customWrite(levelName, { msg: String(objOrMsg), time }); + } + } else if (inputArgs.length === 1) { + const [objOrMsg] = inputArgs; + if (typeof objOrMsg === "object" && objOrMsg !== null) { + customWrite(levelName, { ...objOrMsg, time }); + } else { + customWrite(levelName, { msg: String(objOrMsg), time }); + } + } + }, + }, + }); + + loggerCache.clear(); +} + +/** + * Get or initialize the base logger. + */ +export function getBaseLogger(): Logger { + if (!baseLogger) { + configureDefaultLogger(); + } + return baseLogger!; +} + +/** + * Returns a child logger with `target` bound for the given name. + */ +export function getLogger(name = "default"): Logger { + // Check cache first + const cached = loggerCache.get(name); + if (cached) { + return cached; + } + + // Create + const base = getBaseLogger(); + + // Add target to log if enabled + const child = getIncludeTarget() ? base.child({ target: name }) : base; + + // Cache the logger + loggerCache.set(name, child); + + return child; +} diff --git a/examples/hono/package.json b/examples/hono/package.json index 9cf1894582..d474f094c1 100644 --- a/examples/hono/package.json +++ b/examples/hono/package.json @@ -9,6 +9,7 @@ "client": "tsx scripts/client.ts" }, "devDependencies": { + "@hono/node-server": "^1.19.1", "@types/node": "^22.13.9", "rivetkit": "workspace:*", "tsx": "^3.12.7", diff --git a/frontend/package.json b/frontend/package.json index d43b08d2ec..63607f398f 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -136,4 +136,4 @@ "vite-tsconfig-paths": "^5.1.4", "zod": "^3.25.76" } -} +} \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1244ccfe9d..144080fb6f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -165,7 +165,7 @@ importers: specifier: workspace:* version: link:../runner-protocol hono: - specifier: ^4.0.0 + specifier: ^4.7.0 version: 4.9.8 pino: specifier: ^9.9.5 @@ -787,6 +787,9 @@ importers: specifier: ^4.7.0 version: 4.9.8 devDependencies: + '@hono/node-server': + specifier: ^1.19.1 + version: 1.19.1(hono@4.9.8) '@types/node': specifier: ^22.13.9 version: 22.18.1 diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts index 0d4152458b..a226028fd2 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts @@ -1842,6 +1842,11 @@ export class ActorInstance { this.#rLog.info({ msg: "actor stopping" }); + if (this.#sleepTimeout) { + clearTimeout(this.#sleepTimeout); + this.#sleepTimeout = undefined; + } + // Abort any listeners waiting for shutdown try { this.#abortController.abort(); diff --git a/rivetkit-typescript/packages/rivetkit/src/client/utils.ts b/rivetkit-typescript/packages/rivetkit/src/client/utils.ts index d9899ff5b5..4de17e909a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/utils.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/utils.ts @@ -118,9 +118,18 @@ export async function sendHttpRequest< const textResponse = new TextDecoder("utf-8", { fatal: false, }).decode(bufferResponse); - throw new HttpRequestError( - `${response.statusText} (${response.status}):\n${textResponse}`, - ); + + const rayId = response.headers.get("x-rivet-ray-id"); + + if (rayId) { + throw new HttpRequestError( + `${response.statusText} (${response.status}) (Ray ID: ${rayId}):\n${textResponse}`, + ); + } else { + throw new HttpRequestError( + `${response.statusText} (${response.status}):\n${textResponse}`, + ); + } } // Decode metadata based on encoding - only binary encodings have CBOR-encoded metadata diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index 333a523058..219fa37a0f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -422,17 +422,22 @@ export class EngineActorDriver implements ActorDriver { } async serverlessHandleStart(c: HonoContext): Promise { - await this.#runnerStarted.promise; - return streamSSE(c, async (stream) => { - stream.onAbort(() => this.shutdown(true)); + // NOTE: onAbort does not work reliably + stream.onAbort(() => {}); + c.req.raw.signal.addEventListener("abort", () => { + logger().debug("SSE aborted, shutting down runner"); + this.shutdown(true); + }); + + await this.#runnerStarted.promise; // Runner id should be set if the runner started const payload = this.#runner.getServerlessInitPacket(); invariant(payload, "runnerId not set"); await stream.writeSSE({ data: payload }); - return this.#runnerStopped.promise; + await this.#runnerStopped.promise; }); } } diff --git a/rivetkit-typescript/packages/rivetkit/src/manager/router-schema.ts b/rivetkit-typescript/packages/rivetkit/src/manager/router-schema.ts index 4f3613be80..c2b006d3b9 100644 --- a/rivetkit-typescript/packages/rivetkit/src/manager/router-schema.ts +++ b/rivetkit-typescript/packages/rivetkit/src/manager/router-schema.ts @@ -17,6 +17,6 @@ export const ServerlessStartHeadersSchema = z.object({ required_error: "x-rivet-runner-name header is required", }), namespace: z.string({ - required_error: "x-rivet-namespace-id header is required", + required_error: "x-rivet-namespace-name header is required", }), }); diff --git a/rivetkit-typescript/packages/rivetkit/src/manager/router.ts b/rivetkit-typescript/packages/rivetkit/src/manager/router.ts index 5dbc0e209f..3954219b1e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/manager/router.ts +++ b/rivetkit-typescript/packages/rivetkit/src/manager/router.ts @@ -166,7 +166,7 @@ function addServerlessRoutes( token: c.req.header("x-rivet-token") ?? undefined, totalSlots: c.req.header("x-rivet-total-slots"), runnerName: c.req.header("x-rivet-runner-name"), - namespace: c.req.header("x-rivet-namespace-id"), + namespace: c.req.header("x-rivet-namespace-name"), }); if (!parseResult.success) { throw new InvalidRequest( diff --git a/scripts/openapi/gen_rust.ts b/scripts/openapi/gen_rust.ts index 261c8e0043..fb449156bd 100755 --- a/scripts/openapi/gen_rust.ts +++ b/scripts/openapi/gen_rust.ts @@ -11,7 +11,7 @@ async function generateRustSdk() { console.log("Running OpenAPI generator"); // Delete existing directories - await Deno.remove(GEN_PATH_RUST, { recursive: true }).catch(() => {}); + await Deno.remove(GEN_PATH_RUST, { recursive: true }).catch(() => { }); const dockerCmd = new Deno.Command("docker", { args: [ diff --git a/scripts/tests/actor_sleep.ts b/scripts/tests/actor_sleep.ts new file mode 100755 index 0000000000..e39765b47a --- /dev/null +++ b/scripts/tests/actor_sleep.ts @@ -0,0 +1,149 @@ +#!/usr/bin/env tsx + +import { + getOrCreateActor, + RIVET_ENDPOINT, + RIVET_NAMESPACE, + RIVET_TOKEN, +} from "./utils"; + +async function main() { + try { + console.log("Starting actor E2E test..."); + + // Create an actor + console.log("Creating actor..."); + const actorResponse = await getOrCreateActor(RIVET_NAMESPACE, "test-runner", "key"); + console.log("Actor created:", actorResponse.actor); + + for (let i = 0; i < 10; i++) { + console.log("Sleeping actor..."); + const actorSleepResponse = await fetch(`${RIVET_ENDPOINT}/sleep`, { + method: "GET", + headers: { + "X-Rivet-Token": RIVET_TOKEN, + "X-Rivet-Target": "actor", + "X-Rivet-Actor": actorResponse.actor.actor_id, + }, + }); + + if (!actorSleepResponse.ok) { + const sleepResult = await actorSleepResponse.text(); + throw new Error( + `Failed to sleep actor: ${actorSleepResponse.status} ${actorSleepResponse.statusText}\n${sleepResult}`, + ); + } + + // console.log("Waiting..."); + // await new Promise(resolve => setTimeout(resolve, 2000)); + } + + + // Make a request to the actor + console.log("Making request to actor..."); + const actorPingResponse = await fetch(`${RIVET_ENDPOINT}/ping`, { + method: "GET", + headers: { + "X-Rivet-Token": RIVET_TOKEN, + "X-Rivet-Target": "actor", + "X-Rivet-Actor": actorResponse.actor.actor_id, + }, + }); + + const pingResult = await actorPingResponse.text(); + + if (!actorPingResponse.ok) { + throw new Error( + `Failed to ping actor: ${actorPingResponse.status} ${actorPingResponse.statusText}\n${pingResult}`, + ); + } + + console.log("Actor ping response:", pingResult); + + // await testWebSocket(actorResponse.actor.actor_id); + } catch (error) { + console.error(`Actor test failed:`, error); + } +} + +function testWebSocket(actorId: string): Promise { + console.log("Testing WebSocket connection to actor..."); + + return new Promise((resolve, reject) => { + // Parse the RIVET_ENDPOINT to get WebSocket URL + const wsEndpoint = RIVET_ENDPOINT.replace("http://", "ws://").replace( + "https://", + "wss://", + ); + const wsUrl = `${wsEndpoint}/ws`; + + console.log(`Connecting WebSocket to: ${wsUrl}`); + + const protocols = [ + "rivet", + "rivet_target.actor", + `rivet_actor.${actorId}`, + `rivet_token.${RIVET_TOKEN}`, + ]; + const ws = new WebSocket(wsUrl, protocols); + + let pingReceived = false; + let echoReceived = false; + const timeout = setTimeout(() => { + console.log( + "No response received within timeout, but connection was established", + ); + // Connection was established, that's enough for the test + ws.close(); + resolve(); + }, 2000); + + ws.addEventListener("open", () => { + console.log("WebSocket connected"); + + // Test ping-pong + console.log("Sending 'ping' message..."); + ws.send("ping"); + }); + + ws.addEventListener("message", (ev) => { + const message = ev.data.toString(); + console.log(`WebSocket received raw data:`, ev.data); + console.log(`WebSocket received message: "${message}"`); + + if ( + (message === "Echo: ping" || message === "pong") && + !pingReceived + ) { + pingReceived = true; + console.log("Ping test successful!"); + + // Test echo + console.log("Sending 'hello' message..."); + ws.send("hello"); + } else if (message === "Echo: hello" && !echoReceived) { + echoReceived = true; + console.log("Echo test successful!"); + + // All tests passed + clearTimeout(timeout); + ws.close(); + resolve(); + } + }); + + ws.addEventListener("error", (error) => { + clearTimeout(timeout); + reject(new Error(`WebSocket error: ${error.message}`)); + }); + + ws.addEventListener("close", () => { + clearTimeout(timeout); + if (!pingReceived || !echoReceived) { + reject(new Error("WebSocket closed before completing tests")); + } + }); + }); +} + +main(); diff --git a/scripts/tests/spam_actors.ts b/scripts/tests/actor_spam.ts similarity index 100% rename from scripts/tests/spam_actors.ts rename to scripts/tests/actor_spam.ts diff --git a/scripts/tests/utils.ts b/scripts/tests/utils.ts index 8a93194913..aa9f4b8434 100644 --- a/scripts/tests/utils.ts +++ b/scripts/tests/utils.ts @@ -34,6 +34,38 @@ export async function createActor( return response.json(); } +export async function getOrCreateActor( + namespaceName: string, + runnerNameSelector: string, + key?: string, +): Promise { + const response = await fetch( + `${RIVET_ENDPOINT}/actors?namespace=${namespaceName}`, + { + method: "PUT", + headers: { + Authorization: `Bearer ${RIVET_TOKEN}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + name: "thingy", + key: key ?? crypto.randomUUID(), + input: btoa("hello"), + runner_name_selector: runnerNameSelector, + crash_policy: "sleep", + }), + }, + ); + + if (!response.ok) { + throw new Error( + `Failed to create actor: ${response.status} ${response.statusText}\n${await response.text()}`, + ); + } + + return response.json(); +} + export async function destroyActor( namespaceName: string, actorId: string,