Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion out/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/common/api-builder/src/error_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl IntoResponse for ApiError {
if let Some(rivet_err) = self.0.chain().find_map(|x| x.downcast_ref::<RivetError>()) {
let status = match (rivet_err.group(), rivet_err.code()) {
("api", "not_found") => StatusCode::NOT_FOUND,
("api", "invalid_token") | ("api", "unauthorized") => StatusCode::UNAUTHORIZED,
("api", "unauthorized") => StatusCode::UNAUTHORIZED,
("api", "forbidden") => StatusCode::FORBIDDEN,
_ => StatusCode::BAD_REQUEST,
};
Expand Down
4 changes: 0 additions & 4 deletions packages/common/api-builder/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ use serde::Serialize;
#[error("api", "not_found", "The requested resource was not found")]
pub struct ApiNotFound;

#[derive(RivetError)]
#[error("api", "invalid_token", "The provided authentication token is invalid")]
pub struct ApiInvalidToken;

#[derive(RivetError)]
#[error("api", "unauthorized", "Authentication required")]
pub struct ApiUnauthorized;
Expand Down
4 changes: 1 addition & 3 deletions packages/common/api-builder/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
pub use crate::{ApiCtx, ApiError, GlobalApiCtx};

// Error types
pub use crate::errors::{
ApiForbidden, ApiInternalError, ApiInvalidToken, ApiNotFound, ApiUnauthorized,
};
pub use crate::errors::{ApiForbidden, ApiInternalError, ApiNotFound, ApiUnauthorized};

// HTTP method handlers
pub use crate::router::ApiRouter;
Expand Down
16 changes: 0 additions & 16 deletions packages/common/api-builder/tests/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ enum TestErrorWrapper {
WrapError(#[source] anyhow::Error),
}

// Handler that returns invalid token error
async fn handle_invalid_token(_ctx: ApiCtx, _path: (), _query: ()) -> Result<()> {
Err(rivet_api_builder::errors::ApiInvalidToken.build())
}

// Handler that returns unauthorized error
async fn handle_unauthorized(_ctx: ApiCtx, _path: (), _query: ()) -> Result<()> {
Err(rivet_api_builder::errors::ApiUnauthorized.build())
Expand Down Expand Up @@ -56,7 +51,6 @@ async fn test_error_responses() {
// Create router with error routes
let app = create_router("test", config, pools, |router| {
router
.route("/invalid-token", get(handle_invalid_token))
.route("/unauthorized", get(handle_unauthorized))
.route("/forbidden", get(handle_forbidden))
.route("/internal-error", get(handle_internal_error))
Expand All @@ -68,16 +62,6 @@ async fn test_error_responses() {

let server = TestServer::new(app).unwrap();

// Test invalid token error
let res = server.get("/invalid-token").await;
res.assert_status(axum::http::StatusCode::UNAUTHORIZED);
let error_response: ErrorResponse = res.json();
assert_eq!(error_response.group, "api");
assert_eq!(
error_response.message,
"The provided authentication token is invalid"
);

// Test unauthorized error
let res = server.get("/unauthorized").await;
res.assert_status(axum::http::StatusCode::UNAUTHORIZED);
Expand Down
19 changes: 17 additions & 2 deletions packages/core/api-peer/src/actors/delete.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Result;
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -61,11 +62,25 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
}
}

ctx.signal(pegboard::workflows::actor::Destroy {})
let res = ctx
.signal(pegboard::workflows::actor::Destroy {})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", path.actor_id)
.send()
.await?;
.await;

if let Some(WorkflowError::WorkflowNotFound) = res
.as_ref()
.err()
.and_then(|x| x.chain().find_map(|x| x.downcast_ref::<WorkflowError>()))
{
tracing::warn!(
actor_id=?path.actor_id,
"actor workflow not found, likely already stopped"
);
} else {
res?;
}

Ok(DeleteResponse {})
}
2 changes: 1 addition & 1 deletion packages/core/guard/server/src/routing/pegboard_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async fn find_actor(
let runner_id = if let Some(actor) = actor {
actor.runner_id
} else {
tracing::info!(?actor_id, "waiting for actor to become ready");
tracing::debug!(?actor_id, "waiting for actor to become ready");

// Wait for ready, fail, or destroy
tokio::select! {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/pegboard-serverless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ async fn outbound_handler(
req = req.header(X_RIVET_TOKEN, auth.admin_token.read());
}

let mut source = sse::EventSource::new(req)?;
let mut source = sse::EventSource::new(req).context("failed creating event source")?;
let mut runner_id = None;

let stream_handler = async {
Expand Down
44 changes: 26 additions & 18 deletions packages/services/pegboard/src/workflows/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,15 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
}
}
protocol::ToServer::ToServerEvents(events) => {
let last_event_idx = events.last().map(|event| event.index);

// NOTE: This should not be parallelized because signals should be sent in order
// Forward to actor workflows
for event in events {
for event in &events {
let actor_id =
crate::utils::event_actor_id(&event.inner).to_string();
let res = ctx
.signal(crate::workflows::actor::Event { inner: event.inner })
.signal(crate::workflows::actor::Event {
inner: event.inner.clone(),
})
.to_workflow::<crate::workflows::actor::Workflow>()
.tag("actor_id", &actor_id)
.send()
Expand All @@ -186,20 +186,29 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
}
}

// Ack every 500 events
if let Some(last_event_idx) = last_event_idx {
if last_event_idx > state.last_event_ack_idx.saturating_add(500) {
state.last_event_ack_idx = last_event_idx;
if !events.is_empty() {
ctx.activity(InsertEventsInput {
events: events.clone(),
})
.await?;

ctx.activity(SendMessageToRunnerInput {
runner_id: input.runner_id,
message: protocol::ToClient::ToClientAckEvents(
protocol::ToClientAckEvents {
last_event_idx: state.last_event_ack_idx,
},
),
})
.await?;
// Ack every 500 events
let last_event_idx = events.last().map(|event| event.index);
if let Some(last_event_idx) = last_event_idx {
if last_event_idx > state.last_event_ack_idx.saturating_add(500)
{
state.last_event_ack_idx = last_event_idx;

ctx.activity(SendMessageToRunnerInput {
runner_id: input.runner_id,
message: protocol::ToClient::ToClientAckEvents(
protocol::ToClientAckEvents {
last_event_idx: state.last_event_ack_idx,
},
),
})
.await?;
}
}
}
}
Expand Down Expand Up @@ -822,7 +831,6 @@ async fn ack_commands(ctx: &ActivityCtx, input: &AckCommandsInput) -> Result<()>

#[derive(Debug, Serialize, Deserialize, Hash)]
struct InsertEventsInput {
runner_id: Id,
events: Vec<protocol::EventWrapper>,
}

Expand Down
Loading