Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions out/errors/guard.service_unavailable.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion out/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions packages/core/api-peer/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,13 @@ pub async fn epoxy_replica_reconfigure(
_query: (),
_body: ReplicaReconfigureRequest,
) -> Result<ReplicaReconfigureResponse> {
ctx.signal(epoxy::workflows::coordinator::ReplicaReconfigure {})
.send()
.await?;
if ctx.config().is_leader() {
ctx.signal(epoxy::workflows::coordinator::ReplicaReconfigure {})
.to_workflow::<epoxy::workflows::coordinator::Workflow>()
.tag("replica", ctx.config().epoxy_replica_id())
.send()
.await?;
}

Ok(ReplicaReconfigureResponse {})
}
7 changes: 6 additions & 1 deletion packages/core/api-public/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use axum::Json;
use axum::response::IntoResponse;
use rivet_api_builder::{ApiError, extract::Extension};
use serde_json::json;

use crate::ctx::ApiCtx;

/// Returns metadata about the API including runtime and version
#[tracing::instrument(skip_all)]
pub async fn get_metadata() -> impl IntoResponse {
pub async fn get_metadata(Extension(ctx): Extension<ApiCtx>) -> impl IntoResponse {
ctx.skip_auth();

Json(json!({
"runtime": "engine",
"version": env!("CARGO_PKG_VERSION")
Expand Down
4 changes: 4 additions & 0 deletions packages/core/guard/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ pub struct ConnectionError {
pub remote_addr: String,
}

#[derive(RivetError, Serialize, Deserialize)]
#[error("guard", "service_unavailable", "Service unavailable.")]
pub struct ServiceUnavailable;

#[derive(RivetError, Serialize, Deserialize)]
#[error(
"guard",
Expand Down
13 changes: 8 additions & 5 deletions packages/core/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use gas::prelude::*;
use http_body_util::{BodyExt, Full};
use hyper::{Request, Response, StatusCode, header::HeaderName};
use rivet_guard_core::{
WebSocketHandle, custom_serve::CustomServeTrait, errors::WebSocketServiceUnavailable,
proxy_service::ResponseBody, request_context::RequestContext,
WebSocketHandle,
custom_serve::CustomServeTrait,
errors::{ServiceUnavailable, WebSocketServiceUnavailable},
proxy_service::ResponseBody,
request_context::RequestContext,
};
use rivet_runner_protocol as protocol;
use rivet_util::serde::HashableMap;
Expand Down Expand Up @@ -122,20 +125,20 @@ impl CustomServeTrait for PegboardGateway {
},
TunnelMessageData::Timeout => {
tracing::warn!("tunnel message timeout");
return Err(WebSocketServiceUnavailable.build());
return Err(ServiceUnavailable.build());
}
}
}

tracing::warn!("received no message response");
Err(WebSocketServiceUnavailable.build())
Err(ServiceUnavailable.build())
};
let response_start = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, fut)
.await
.map_err(|_| {
tracing::warn!("timed out waiting for tunnel ack");

WebSocketServiceUnavailable.build()
ServiceUnavailable.build()
})??;
tracing::debug!("response handler task ended");

Expand Down
2 changes: 1 addition & 1 deletion packages/services/epoxy/src/workflows/replica/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::types;

#[tracing::instrument(skip_all)]
pub async fn setup_replica(ctx: &mut WorkflowCtx, _input: &super::Input) -> Result<()> {
// Wait for cooridinator to send begin learning signal
// Wait for coordinator to send begin learning signal
let begin_learning = ctx.listen::<super::BeginLearning>().await?;

// TODO: Paralellize replicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures_util::{FutureExt, StreamExt, TryFutureExt, stream::FuturesUnordered}
use gas::prelude::*;
use rivet_api_types::{runner_configs::list as runner_configs_list, runners::list as runners_list};
use rivet_api_util::{Method, request_remote_datacenter};
use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind};
use rivet_types::runner_configs::RunnerConfigKind;
use serde::de::DeserializeOwned;

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
5 changes: 2 additions & 3 deletions packages/services/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use gas::prelude::*;
use rivet_metrics::KeyValue;
use rivet_runner_protocol as protocol;
use rivet_types::{
actors::CrashPolicy,
keys::namespace::runner_config::RunnerConfigVariant,
runner_configs::{RunnerConfig, RunnerConfigKind},
actors::CrashPolicy, keys::namespace::runner_config::RunnerConfigVariant,
runner_configs::RunnerConfigKind,
};
use std::time::Instant;
use universaldb::options::{ConflictRangeType, MutationType, StreamingMode};
Expand Down
Loading