Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
409 changes: 19 additions & 390 deletions Cargo.lock

Large diffs are not rendered by default.

2,082 changes: 2,082 additions & 0 deletions docker/dev-full/grafana/dashboards/pegboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions examples/system-test-actor/src/managerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ function encodeFrame(payload: any): Buffer {
payloadLength.writeUInt32BE(json.length, 0);

const header = Buffer.alloc(4); // All zeros for now

return Buffer.concat([payloadLength, header, Buffer.from(json)]);
}

Expand Down
12 changes: 8 additions & 4 deletions packages/common/fdb-util/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,19 @@ pub const ENV: usize = 39;
pub const PORT: usize = 40;
pub const INGRESS: usize = 41;
pub const PROXIED: usize = 42;
pub const CLIENTS_BY_REMAINING_MEM: usize = 43;
pub const CLIENT_BY_REMAINING_MEM: usize = 43;
pub const SQLITE: usize = 44;
pub const INTERNAL: usize = 45;
pub const METADATA: usize = 46;
pub const COMPRESSED_DATA: usize = 47;
pub const RUNNER: usize = 48;
pub const RUNNERS_BY_REMAINING_SLOTS: usize = 49;
pub const RUNNER_BY_REMAINING_SLOTS: usize = 49;
pub const REMAINING_SLOTS: usize = 50;
pub const TOTAL_SLOTS: usize = 51;
pub const IMAGE_ID: usize = 52;
pub const ACTOR2: usize = 53;
pub const PENDING_ACTOR: usize = 54;
pub const PENDING_ACTOR_BY_IMAGE_ID: usize = 55;

// Directories with fdbrs must use string paths instead of tuples
pub mod dir {
Expand Down Expand Up @@ -105,17 +107,19 @@ pub fn key_from_str(key: &str) -> Option<usize> {
"port" => Some(PORT),
"ingress" => Some(INGRESS),
"proxied" => Some(PROXIED),
"clients_by_remaining_mem" => Some(CLIENTS_BY_REMAINING_MEM),
"client_by_remaining_mem" => Some(CLIENT_BY_REMAINING_MEM),
"sqlite" => Some(SQLITE),
"internal" => Some(INTERNAL),
"metadata" => Some(METADATA),
"compressed_data" => Some(COMPRESSED_DATA),
"runner" => Some(RUNNER),
"runners_by_remaining_slots" => Some(RUNNERS_BY_REMAINING_SLOTS),
"runner_by_remaining_slots" => Some(RUNNER_BY_REMAINING_SLOTS),
"remaining_slots" => Some(REMAINING_SLOTS),
"total_slots" => Some(TOTAL_SLOTS),
"image_id" => Some(IMAGE_ID),
"actor2" => Some(ACTOR2),
"pending_actor" => Some(PENDING_ACTOR),
"pending_actor_by_image_id" => Some(PENDING_ACTOR_BY_IMAGE_ID),
_ => None,
}
}
2 changes: 2 additions & 0 deletions packages/common/server-cli/src/util/fdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ impl SimpleTupleValue {
SimpleTupleValue::F64(v)
} else if let Ok(v) = Uuid::from_str(value) {
SimpleTupleValue::Uuid(v)
} else if let Ok(v) = rivet_util::Id::from_str(value) {
SimpleTupleValue::Id(v)
} else {
SimpleTupleValue::String(unescape(value))
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/build/src/ops/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct Input {
pub kind: BuildKind,
pub compression: BuildCompression,
pub allocation_type: BuildAllocationType,
pub allocation_total_slots: u64,
pub allocation_total_slots: u32,
pub resources: Option<BuildResources>,
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/build/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct Build {
pub kind: BuildKind,
pub compression: BuildCompression,
pub allocation_type: BuildAllocationType,
pub allocation_total_slots: u64,
pub allocation_total_slots: u32,
pub resources: Option<BuildResources>,
pub tags: HashMap<String, String>,
}
Expand Down
150 changes: 93 additions & 57 deletions packages/edge/api/actor/src/route/actors.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;

use api_helper::{anchor::WatchIndexQuery, ctx::Ctx};
use chirp_workflow::prelude::*;
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use rivet_api::models;
use rivet_convert::{ApiInto, ApiTryInto};
use rivet_operation::prelude::*;
use serde::Deserialize;
use serde_json::json;
use util::serde::AsHashableExt;
Expand Down Expand Up @@ -152,14 +152,14 @@ pub async fn create(
}
};

let allocated_fut = if network.wait_ready.unwrap_or_default() {
let created_fut = if network.wait_ready.unwrap_or_default() {
std::future::pending().boxed()
} else {
let mut allocated_sub = ctx
.subscribe::<pegboard::workflows::actor::Allocated>(("actor_id", actor_id))
let mut created_sub = ctx
.subscribe::<pegboard::workflows::actor::CreateComplete>(("actor_id", actor_id))
.await?;

async move { allocated_sub.next().await }.boxed()
async move { created_sub.next().await }.boxed()
};
let mut ready_sub = ctx
.subscribe::<pegboard::workflows::actor::Ready>(("actor_id", actor_id))
Expand Down Expand Up @@ -239,9 +239,9 @@ pub async fn create(
.tag("actor_id", actor_id)
.dispatch()
.await?;
// Wait for allocated/ready, fail, or destroy
// Wait for create/ready, fail, or destroy
tokio::select! {
res = allocated_fut => { res?; },
res = created_fut => { res?; },
res = ready_sub.next() => { res?; },
res = fail_sub.next() => {
let msg = res?;
Expand All @@ -258,14 +258,14 @@ pub async fn create(
let actor_id = util::Id::new_v1(ctx.config().server()?.rivet.edge()?.datacenter_label());
tracing::info!(?actor_id, ?tags, "creating actor with tags");

let allocated_fut = if network.wait_ready.unwrap_or_default() {
let created_fut = if network.wait_ready.unwrap_or_default() {
std::future::pending().boxed()
} else {
let mut allocated_sub = ctx
.subscribe::<pegboard::workflows::actor2::Allocated>(("actor_id", actor_id))
let mut created_sub = ctx
.subscribe::<pegboard::workflows::actor2::CreateComplete>(("actor_id", actor_id))
.await?;

async move { allocated_sub.next().await }.boxed()
async move { created_sub.next().await }.boxed()
};
let mut ready_sub = ctx
.subscribe::<pegboard::workflows::actor2::Ready>(("actor_id", actor_id))
Expand Down Expand Up @@ -348,7 +348,7 @@ pub async fn create(

// Wait for create/ready, fail, or destroy
tokio::select! {
res = allocated_fut => { res?; },
res = created_fut => { res?; },
res = ready_sub.next() => { res?; },
res = fail_sub.next() => {
let msg = res?;
Expand Down Expand Up @@ -425,6 +425,9 @@ pub async fn destroy(
);

let mut sub = ctx
.subscribe::<pegboard::workflows::actor2::DestroyStarted>(("actor_id", actor_id))
.await?;
let mut old_sub = ctx
.subscribe::<pegboard::workflows::actor::DestroyStarted>(("actor_id", actor_id))
.await?;

Expand All @@ -436,15 +439,33 @@ pub async fn destroy(
return Ok(json!({}));
}

ctx.signal(pegboard::workflows::actor::Destroy {
override_kill_timeout_ms: query.override_kill_timeout,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor_id)
.send()
.await?;
// Try actor2 first
let res = ctx
.signal(pegboard::workflows::actor2::Destroy {
override_kill_timeout_ms: query.override_kill_timeout,
})
.to_workflow::<pegboard::workflows::actor2::Workflow>()
.tag("actor_id", actor_id)
.send()
.await;

if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
// Try old actors
ctx
.signal(pegboard::workflows::actor::Destroy {
override_kill_timeout_ms: query.override_kill_timeout,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor_id)
.send()
.await?;

old_sub.next().await?;
} else {
res?;

sub.next().await?;
sub.next().await?;
}

Ok(json!({}))
}
Expand Down Expand Up @@ -481,21 +502,29 @@ pub async fn upgrade(
)
.await?;

// TODO: Add back once we figure out how to cleanly handle if a wf is already complete when
// upgrading
// let mut sub = ctx
// .subscribe::<pegboard::workflows::actor::UpgradeStarted>(("actor_id", actor_id))
// .await?;

ctx.signal(pegboard::workflows::actor::Upgrade {
image_id: build.build_id,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor_id)
.send()
.await?;

// sub.next().await?;
// Try actor2 first
let res = ctx
.signal(pegboard::workflows::actor2::Upgrade {
image_id: build.build_id,
})
.to_workflow::<pegboard::workflows::actor2::Workflow>()
.tag("actor_id", actor_id)
.send()
.await;

if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
// Try old actors
ctx
.signal(pegboard::workflows::actor::Upgrade {
image_id: build.build_id,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor_id)
.send()
.await?;
} else {
res?;
}

Ok(json!({}))
}
Expand Down Expand Up @@ -589,35 +618,42 @@ pub async fn upgrade_all(
// cursor of [created_at, actor_id] that we pass to the fdb range
created_before = list_res.actors.last().map(|x| x.create_ts - 1);

// TODO: Add back once we figure out how to cleanly handle if a wf is already complete when
// upgrading
// let subs = futures_util::stream::iter(list_res.actor_ids.clone())
// .map(|actor_id| {
// ctx.subscribe::<pegboard::workflows::actor::UpgradeStarted>(("actor_id", actor_id))
// })
// .buffer_unordered(32)
// .try_collect::<Vec<_>>()
// .await?;

let ctx = (*ctx).clone();
futures_util::stream::iter(list_res.actors)
.map(|actor| {
ctx.signal(pegboard::workflows::actor::Upgrade {
image_id: build.build_id,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor.actor_id)
.send()
let ctx = ctx.clone();
async move {
// Try actor2 first
let res = ctx
.signal(pegboard::workflows::actor2::Upgrade {
image_id: build.build_id,
})
.to_workflow::<pegboard::workflows::actor2::Workflow>()
.tag("actor_id", actor.actor_id)
.send()
.await;

if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
// Try old actors
ctx
.signal(pegboard::workflows::actor::Upgrade {
image_id: build.build_id,
})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", actor.actor_id)
.send()
.await?;
} else {
res?;
}

GlobalResult::Ok(())
}
})
.buffer_unordered(32)
.try_collect::<Vec<_>>()
.await?;

// futures_util::stream::iter(subs)
// .map(|mut sub| async move { sub.next().await })
// .buffer_unordered(32)
// .try_collect::<Vec<_>>()
// .await?;

if count < 10_000 {
break;
}
Expand Down
12 changes: 6 additions & 6 deletions packages/edge/infra/client/manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ async fn init() -> Result<Init> {
),
};

// SAFETY: No other task has spawned yet.
// Set client_id env var so it can be read by the prometheus registry
unsafe {
std::env::set_var("CLIENT_ID", config.client.cluster.client_id.to_string());
}

if config.client.logs.redirect_logs() {
rivet_logs::Logs::new(
config.client.data_dir().join("logs"),
Expand All @@ -137,12 +143,6 @@ async fn init() -> Result<Init> {
.await?;
}

// SAFETY: No other task has spawned yet.
// Set client_id env var so it can be read by the prometheus registry
unsafe {
std::env::set_var("CLIENT_ID", config.client.cluster.client_id.to_string());
}

// Read system metrics
let system = crate::system_info::fetch().await?;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE actors
DROP COLUMN runner_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE actors
DROP COLUMN pending_allocation_at;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE actors
ADD COLUMN pending_allocation_at DateTime64(9);
Loading
Loading