Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions examples/system-test-actor/rivet.jsonc
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
{
"builds": {
"ws-isolate": {
"script": "src/isolate/main.ts"
},
// "ws-isolate": {
// "script": "src/isolate/main.ts"
// },
"ws-container": {
"dockerfile": "Dockerfile"
"dockerfile": "Dockerfile",
// "unstable": {
// "compression": "none"
// }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ use std::{
use tracing::Instrument;
use uuid::Uuid;

use super::{
keys,
sqlite::SqlStub,
DatabaseFdbSqliteNats,
};
use super::{keys, sqlite::SqlStub, DatabaseFdbSqliteNats};
use crate::{
db::debug::{
ActivityError, ActivityEvent, DatabaseDebug, Event, EventData, HistoryData, LoopEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,10 @@ impl Database for DatabaseFdbSqliteNats {
async move {
let pool = &self
.pools
.sqlite(crate::db::sqlite_db_name_internal(partial.workflow_id), false)
.sqlite(
crate::db::sqlite_db_name_internal(partial.workflow_id),
false,
)
.await?;

// Handle error during sqlite init
Expand Down
3 changes: 1 addition & 2 deletions packages/common/pools/src/db/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,8 +907,7 @@ impl SqlitePoolInner {

#[tracing::instrument(name = "sqlite_pool_evict", skip_all)]
pub async fn evict(&self) -> GlobalResult<()> {
self
.manager
self.manager
.evict_with_key(&[self.key_packed.clone()])
.await
}
Expand Down
17 changes: 6 additions & 11 deletions packages/core/api/actor/src/route/builds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,13 +520,7 @@ pub async fn complete_build(
edge_intercom_pegboard_prewarm_image(
&config,
&build_id.to_string(),
models::EdgeIntercomPegboardPrewarmImageRequest {
image_artifact_url_stub: pegboard::util::image_artifact_url_stub(
ctx.config(),
build.upload_id,
&build::utils::file_name(build.kind, build.compression),
)?,
},
json!({}),
)
.await
.map_err(Into::<GlobalError>::into)
Expand All @@ -542,10 +536,11 @@ pub async fn complete_build(
}
}

// Error only if all prewarm requests failed
if !results.is_empty() && results.iter().all(|res| res.is_err()) {
return Err(unwrap!(unwrap!(results.into_iter().next()).err()));
}
// TODO: Disabled until deploy
// // Error only if all prewarm requests failed
// if !results.is_empty() && results.iter().all(|res| res.is_err()) {
// return Err(unwrap!(unwrap!(results.into_iter().next()).err()));
// }
}

Ok(json!({}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ ExecStart=/usr/local/bin/rivet-client -c /etc/rivet-client/config.json
Restart=always
RestartSec=2

# High scheduling priority
Nice=-15
# Real time service
CPUSchedulingPolicy=fifo
# High CPU priority
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ ExecStart=/usr/local/bin/rivet-guard
Restart=always
RestartSec=2

# High scheduling priority
Nice=-15
# Real time service
CPUSchedulingPolicy=fifo
# High CPU priority
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ ExecStart=/usr/local/bin/rivet-edge-server start --skip-provision
Restart=always
RestartSec=2

# High scheduling priority
Nice=-15
# Real time service
CPUSchedulingPolicy=fifo
# High CPU priority
Expand Down
3 changes: 3 additions & 0 deletions packages/edge/api/intercom/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ rivet-env.workspace = true
rivet-health-checks.workspace = true
rivet-operation.workspace = true
rivet-pools.workspace = true
s3-util.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
Expand All @@ -35,4 +36,6 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["f
url = "2.2.2"
uuid = { version = "1", features = ["v4"] }

build.workspace = true
cluster.workspace = true
pegboard.workspace = true
2 changes: 1 addition & 1 deletion packages/edge/api/intercom/src/route/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ define_router! {
POST: pegboard::prewarm_image(
internal_endpoint: true,
opt_auth: true,
body: models::EdgeIntercomPegboardPrewarmImageRequest,
body: serde_json::Value,
),
},

Expand Down
78 changes: 71 additions & 7 deletions packages/edge/api/intercom/src/route/pegboard.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use api_helper::ctx::Ctx;
use chirp_workflow::prelude::*;
use cluster::types::BuildDeliveryMethod;
use fdb_util::SERIALIZABLE;
use foundationdb::{self as fdb, options::StreamingMode};
use futures_util::TryStreamExt;
use pegboard::protocol;
use rivet_api::models;
use serde_json::json;

Expand All @@ -12,13 +14,10 @@ use crate::auth::Auth;
pub async fn prewarm_image(
ctx: Ctx<Auth>,
image_id: Uuid,
body: models::EdgeIntercomPegboardPrewarmImageRequest,
body: serde_json::Value,
) -> GlobalResult<serde_json::Value> {
ctx.auth().bypass()?;

// TODO: If we replicate the algorithm for choosing the correct ATS node from the pb manager, we can
// remove prewarming from the pb protocol entirely and just prewarm the image here since this api service
// is in the same dc
let client_id = ctx
.fdb()
.await?
Expand Down Expand Up @@ -59,10 +58,35 @@ pub async fn prewarm_image(
return Ok(json!({}));
};

let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id;
let (dc_res, builds_res) = tokio::try_join!(
ctx.op(cluster::ops::datacenter::get::Input {
datacenter_ids: vec![dc_id],
}),
ctx.op(build::ops::get::Input {
build_ids: vec![image_id],
}),
)?;

let dc = unwrap!(dc_res.datacenters.first());
let build = unwrap!(builds_res.builds.first());

let fallback_artifact_url =
resolve_image_fallback_artifact_url(&ctx, dc.build_delivery_method, &build).await?;

let res = ctx
.signal(pegboard::workflows::client::PrewarmImage {
image_id,
image_artifact_url_stub: body.image_artifact_url_stub,
.signal(pegboard::workflows::client::PrewarmImage2 {
image: protocol::Image {
id: image_id,
artifact_url_stub: pegboard::util::image_artifact_url_stub(
ctx.config(),
build.upload_id,
&build::utils::file_name(build.kind, build.compression),
)?,
fallback_artifact_url,
kind: build.kind.into(),
compression: build.compression.into(),
},
})
.to_workflow::<pegboard::workflows::client::Workflow>()
.tag("client_id", client_id)
Expand Down Expand Up @@ -131,3 +155,43 @@ pub async fn toggle_drain_client(

Ok(json!({}))
}

async fn resolve_image_fallback_artifact_url(
ctx: &Ctx<Auth>,
dc_build_delivery_method: BuildDeliveryMethod,
build: &build::types::Build,
) -> GlobalResult<Option<String>> {
if let BuildDeliveryMethod::S3Direct = dc_build_delivery_method {
tracing::debug!("using s3 direct delivery");

// Build client
let s3_client = s3_util::Client::with_bucket_and_endpoint(
ctx.config(),
"bucket-build",
s3_util::EndpointKind::EdgeInternal,
)
.await?;

let presigned_req = s3_client
.get_object()
.bucket(s3_client.bucket())
.key(format!(
"{upload_id}/{file_name}",
upload_id = build.upload_id,
file_name = build::utils::file_name(build.kind, build.compression),
))
.presigned(
s3_util::aws_sdk_s3::presigning::PresigningConfig::builder()
.expires_in(std::time::Duration::from_secs(15 * 60))
.build()?,
)
.await?;

let addr_str = presigned_req.uri().to_string();
tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request");

Ok(Some(addr_str))
} else {
Ok(None)
}
}
7 changes: 7 additions & 0 deletions packages/edge/infra/client/config/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ impl Runner {
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Images {
pub pull_addresses: Option<Addresses>,
/// Bytes. Defaults to 64 GiB.
pub max_cache_size: Option<u64>,
}

impl Images {
Expand All @@ -129,6 +131,11 @@ impl Images {
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned(Addresses::Static(Vec::new())))
}

pub fn max_cache_size(&self) -> u64 {
// 64 GiB
self.max_cache_size.unwrap_or(1024 * 1024 * 1024 * 64)
}
}

#[derive(Clone, Deserialize, JsonSchema)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn run(
.context("empty `actor_path`")?
.to_string_lossy()
.to_string();
let fs_path = actor_path.join("fs");
let fs_path = actor_path.join("fs").join("upper");
let oci_bundle_config_json = fs_path.join("config.json");

// Validate OCI bundle
Expand Down
9 changes: 6 additions & 3 deletions packages/edge/infra/client/echo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ use tiny_http::{Response, Server, StatusCode};

// TODO: This can't pick up SIGTERM
fn main() {
let port = std::env::var("PORT")
.ok()
.unwrap_or_else(|| "8080".to_string());
println!("Env:");
for (key, value) in std::env::vars() {
println!(" {}: {}", key, value);
}

let port = std::env::var("PORT_MAIN").expect("no PORT_MAIN");
let addr = format!("0.0.0.0:{port}");
let server = Server::http(&addr).unwrap();
println!("Listening on {addr}");
Expand Down
5 changes: 3 additions & 2 deletions packages/edge/infra/client/isolate-v8-runner/src/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub async fn run_inner(
tracing::info!(?actor_id, ?generation, "isolate kv initialized");

// Should match the path from `Actor::make_fs` in manager/src/actor/setup.rs
let index = actor_path.join("fs").join("index.js");
let index = actor_path.join("fs").join("upper").join("index.js");

// Load index.js
let index_script_content = match fs::read_to_string(&index).await {
Expand Down Expand Up @@ -560,7 +560,8 @@ mod tests {

let fs_path = actors_path
.join(format!("{actor_id}-{generation}"))
.join("fs");
.join("fs")
.join("upper");
std::fs::create_dir_all(&fs_path)?;

std::fs::copy(
Expand Down
1 change: 1 addition & 0 deletions packages/edge/infra/client/manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ rand = "0.8"
rand_chacha = "0.3.1"
reqwest = { version = "0.12", default-features = false, features = ["stream", "rustls-tls", "json"] }
rivet-logs.workspace = true
scc = "2.3.4"
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
serde_yaml = "0.9.34"
Expand Down
Loading
Loading