Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 163 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dev-full/rivet-edge-server/config.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"cluster_id": "11ca8960-acab-4963-909c-99d72af3e1cb",
"datacenter_id": "f288913c-735d-4188-bf9b-2fcf6eac7b9c",
"server_id": "174aca2a-98b7-462c-9ad9-3835094a9a10",
"intercom_endpoint": "http://rivet-server:6421"
"intercom_address": "http://rivet-server:6421"
},
"guard": {
// TLS not configured for local development
Expand Down
2 changes: 1 addition & 1 deletion docker/dev-full/rivet-guard/config.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"cluster_id": "11ca8960-acab-4963-909c-99d72af3e1cb",
"datacenter_id": "f288913c-735d-4188-bf9b-2fcf6eac7b9c",
"server_id": "174aca2a-98b7-462c-9ad9-3835094a9a10",
"intercom_endpoint": "http://rivet-server:6421"
"intercom_address": "http://rivet-server:6421"
},
"guard": {
// TLS not configured for local development
Expand Down
12 changes: 10 additions & 2 deletions docker/universal/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ RUN apt-get update -y && \
redis-tools \
postgresql-client \
gpg \
dirmngr && \
dirmngr \
gcc \
wget && \
curl -fsSL 'https://packages.clickhouse.com/rpm/lts/repodata/repomd.xml.key' | gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg && \
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb stable main" | tee /etc/apt/sources.list.d/clickhouse.list && \
apt-get update -y && \
Expand All @@ -76,7 +78,13 @@ RUN apt-get update -y && \
mv migrate /usr/local/bin/migrate && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* && \
curl -Lf -o /lib/libfdb_c.so "https://github.com/apple/foundationdb/releases/download/7.1.60/libfdb_c.x86_64.so"
curl -Lf -o /lib/libfdb_c.so "https://github.com/apple/foundationdb/releases/download/7.1.60/libfdb_c.x86_64.so" && \
# Install Go and usql
wget https://go.dev/dl/go1.24.4.linux-amd64.tar.gz && \
tar -C /usr/local -xzf go1.24.4.linux-amd64.tar.gz && \
export PATH=$PATH:/usr/local/go/bin && \
export PATH="$PATH:$(go env GOPATH)/bin" && \
go install github.com/xo/usql@latest

# MARK: Server (Full)
FROM server-full-base AS server-full
Expand Down
14 changes: 7 additions & 7 deletions examples/system-test-actor/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ async function run() {
guard: {},
},
},
udp: {
protocol: "udp",
// internalPort: 80,
routing: {
host: {},
},
},
// udp: {
// protocol: "udp",
// // internalPort: 80,
// routing: {
// host: {},
// },
// },
},
},
runtime: {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@
"esbuild": "^0.25.5",
"actor-core": "file:./frontend/packages/actor-core.tgz"
}
}
}
3 changes: 2 additions & 1 deletion packages/common/config/src/config/server/rivet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,8 @@ pub struct Edge {
pub cluster_id: Uuid,
pub datacenter_id: Uuid,
pub server_id: Uuid,
pub intercom_endpoint: Url,
/// URL of the core cluster.
pub intercom_address: Url,
/// This API address will be used if there are no worker servers listed in the cluster package
#[serde(default)]
pub api_lan_address: Option<(String, u16)>,
Expand Down
27 changes: 23 additions & 4 deletions packages/common/util/id/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ pub enum Id {
V1([u8; 18]),
}



impl Id {
/// Construct V0 from uuid.
pub fn new_v0() -> Self {
Expand Down Expand Up @@ -241,20 +239,41 @@ impl TuplePack for Id {
w: &mut W,
tuple_depth: TupleDepth,
) -> std::io::Result<VersionstampOffset> {
let mut size = 1;

w.write_all(&[fdb_util::codes::ID])?;

// IMPORTANT: While the normal bytes representation of a v0 ID doesn't include the version, we write
// it here so that we can unpack without a terminating NIL.
if let Id::V0(_) = self {
w.write_all(&[0])?;
size += 1;
}

let bytes = self.as_bytes();

let len = u32::try_from(bytes.len())
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?;
size += len;

w.write_all(&bytes)?;

Ok(VersionstampOffset::None { size: 1 + len })
Ok(VersionstampOffset::None { size })
}
}

impl<'de> TupleUnpack<'de> for Id {
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
let input = fdb_util::parse_code(input, fdb_util::codes::ID)?;
let (input, slice) = fdb_util::parse_bytes(input, 16)?;
let (input2, version) = fdb_util::parse_byte(input)?;

let (input, slice) = if version == 0 {
// Parse 16 bytes after version
fdb_util::parse_bytes(input2, 16)?
} else {
// Parse 19 bytes including version
fdb_util::parse_bytes(input, 19)?
};

let v = Id::from_bytes(slice)
.map_err(|err| PackError::Message(format!("bad id format: {err}").into()))?;
Expand Down
7 changes: 7 additions & 0 deletions packages/core/infra/server/src/run_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ pub fn config(rivet_config: rivet_config::Config) -> Result<RunConfigData> {
),
db_name: "db_pegboard_runner_log",
},
SqlService {
kind: SqlServiceKind::ClickHouse,
migrations: include_dir!(
"$CARGO_MANIFEST_DIR/../../../edge/services/pegboard/db/runner"
),
db_name: "db_pegboard_runner",
},
SqlService {
kind: SqlServiceKind::CockroachDB,
migrations: include_dir!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn configure(config: &rivet_config::Config) -> GlobalResult<String> {
datacenter_id: Uuid::nil(),
server_id: Uuid::nil(),
api_lan_address: None,
intercom_endpoint: Url::parse(&format!("http://127.0.0.1:{TUNNEL_API_EDGE_PORT}"))?,
intercom_address: Url::parse(&format!("http://127.0.0.1:{TUNNEL_API_EDGE_PORT}"))?,
redirect_logs_dir: Some(PathBuf::from("/var/log/rivet-guard")),
}),
status: server_config.rivet.status.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub fn configure(config: &rivet_config::Config) -> GlobalResult<String> {
datacenter_id: Uuid::nil(),
server_id: Uuid::nil(),
api_lan_address: None,
intercom_endpoint: Url::parse(&format!("http://127.0.0.1:{TUNNEL_API_EDGE_PORT}"))?,
intercom_address: Url::parse(&format!("http://127.0.0.1:{TUNNEL_API_EDGE_PORT}"))?,
redirect_logs_dir: Some(PathBuf::from("/var/log/rivet-edge-server")),
}),
..Default::default()
Expand Down
39 changes: 3 additions & 36 deletions packages/edge/infra/client/container-runner/src/log_shipper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ pub struct LogShipper {
pub vector_socket_addr: String,

pub runner_id: String,
pub actor_id: Option<String>,

pub env_id: Uuid,
}
Expand Down Expand Up @@ -93,39 +92,8 @@ impl LogShipper {
println!("Log shipper connected");

while let Result::Ok(message) = self.msg_rx.recv() {
// // If actor id is not provided, extract from logs
// let actor_id = if self.actor_id.is_some() {
// self.actor_id.as_deref()
// } else {
// if let Some(start_idx) = message.message.find("actor_") {
// let start_idx = start_idx + 6;

// // Look for next non alphanum (end of actor id)
// let end_idx = if let Some(end_idx) =
// message.message[start_idx..].find(|c: char| !c.is_ascii_alphanumeric())
// {
// start_idx + end_idx
// } else {
// message.message.len()
// };

// let actor_id = &message.message[start_idx..end_idx];

// // Check if valid id
// rivet_util::Id::parse(actor_id).is_ok().then_some(actor_id)
// } else {
// None
// }
// };

// // Cannot determine actor id, ignore log
// let Some(actor_id) = actor_id else {
// continue;
// };

let vector_message = VectorMessage::Actors {
let vector_message = VectorMessage::Runners {
runner_id: self.runner_id.as_str(),
actor_id: self.actor_id.as_deref(),
env_id: self.env_id,
stream_type: message.stream_type as u8,
ts: message.ts,
Expand All @@ -146,10 +114,9 @@ impl LogShipper {
#[derive(Serialize)]
#[serde(tag = "source")]
enum VectorMessage<'a> {
#[serde(rename = "actors")]
Actors {
#[serde(rename = "runners")]
Runners {
runner_id: &'a str,
actor_id: Option<&'a str>,
env_id: Uuid,
stream_type: u8,
ts: u64,
Expand Down
7 changes: 2 additions & 5 deletions packages/edge/infra/client/container-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ mod utils;
const MAX_LINE_BYTES: usize = 1024;
/// Maximum number of bytes to buffer before dropping logs
const MAX_BUFFER_BYTES: usize = 1024 * 1024;
// 7 day logs retention
const LOGS_RETENTION: Duration = Duration::from_secs(7 * 24 * 60 * 60);
// 1 day logs retention
const LOGS_RETENTION: Duration = Duration::from_secs(1 * 24 * 60 * 60);

fn main() -> Result<()> {
let mut args = std::env::args().skip(1);
Expand All @@ -37,8 +37,6 @@ fn main() -> Result<()> {
.transpose()
.context("failed to parse vector socket addr")?;
let runner_id = var("RUNNER_ID")?;
// Only set if this is a single allocation runner (one actor running on it)
let actor_id = var("ACTOR_ID").ok();
let env_id = Uuid::parse_str(&var("ENVIRONMENT_ID")?)?;
println!("Starting runner_id={runner_id} env_id={env_id} vector_socket_addr={} root_user_enabled={root_user_enabled}", vector_socket_addr.as_ref().map(|x| x.as_str()).unwrap_or("?"));

Expand All @@ -53,7 +51,6 @@ fn main() -> Result<()> {
msg_rx,
vector_socket_addr,
runner_id,
actor_id,
env_id,
};
let log_shipper_thread = log_shipper.spawn();
Expand Down
8 changes: 1 addition & 7 deletions packages/edge/infra/client/manager/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,9 @@ impl Actor {
.context("should have runner config")?
{
protocol::ActorRunner::New { .. } => {
let actor_id = matches!(
self.runner.config().image.allocation_type,
protocol::ImageAllocationType::Single
)
.then_some(self.actor_id);

// Because the runner is not already started we can get the ports here instead of reading from
// sqlite
let ports = self.runner.start(ctx, actor_id).await?;
let ports = self.runner.start(ctx).await?;

let pid = self.runner.pid().await?;

Expand Down
2 changes: 1 addition & 1 deletion packages/edge/infra/client/manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ async fn run(init: Init, first: bool) -> Result<()> {
let ctx = Ctx::new(init.config, init.system, init.pool, tx);

tokio::try_join!(
async { metrics_task.await.map_err(Into::<anyhow::Error>::into) },
async { metrics_task.await.map_err(Into::<anyhow::Error>::into)? },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ? operator is not compatible with the async block inside tokio::try_join!. This block needs to return a Result type for the outer try_join! to handle, but adding ? would cause early returns within the async block itself. The correct pattern is to use map_err(Into::<anyhow::Error>::into) without the ? operator, allowing try_join! to handle the error propagation.

Suggested change
async { metrics_task.await.map_err(Into::<anyhow::Error>::into)? },
async { metrics_task.await.map_err(Into::<anyhow::Error>::into) },

Spotted by Diamond

Is this helpful? React 👍 or 👎 to let us know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling here is incomplete. When awaiting a JoinHandle, you get a Result<T, JoinError> where T is the task's result (which is already an anyhow::Error). The current ? only handles the JoinError but not the inner error.

Consider either:

async { 
    // Handle both the JoinError and the inner anyhow::Error
    metrics_task.await.map_err(Into::<anyhow::Error>::into)?? 
}

Or a more explicit approach:

async {
    let result = metrics_task.await.map_err(Into::<anyhow::Error>::into)?;
    result
}

This ensures both the task completion error and any error returned by the metrics task itself are properly propagated.

Suggested change
async { metrics_task.await.map_err(Into::<anyhow::Error>::into)? },
async { metrics_task.await.map_err(Into::<anyhow::Error>::into)?? },

Spotted by Diamond

Is this helpful? React 👍 or 👎 to let us know.

ctx.run(rx),
)?;

Expand Down
11 changes: 2 additions & 9 deletions packages/edge/infra/client/manager/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,9 @@ impl Runner {
Ok(())
}

// `actor_id` is set if this runner has a single allocation type which means there is only one actor
// runner on it
pub async fn start(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
actor_id: Option<rivet_util::Id>,
) -> Result<protocol::HashableMap<String, protocol::ProxiedPort>> {
tracing::info!(runner_id=?self.runner_id, "starting");

Expand Down Expand Up @@ -311,7 +308,7 @@ impl Runner {
let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async move {
match self2.run(&ctx2, actor_id).await {
match self2.run(&ctx2).await {
Ok(_) => {
if let Err(err) = self2.observe(&ctx2, false).await {
tracing::error!(runner_id=?self2.runner_id, ?err, "observe failed");
Expand All @@ -331,7 +328,7 @@ impl Runner {
Ok(proxied_ports)
}

async fn run(&self, ctx: &Ctx, actor_id: Option<rivet_util::Id>) -> Result<()> {
async fn run(&self, ctx: &Ctx) -> Result<()> {
// NOTE: This is the env that goes to the container-runner process, NOT the env that is inserted into
// the container.
let mut runner_env = vec![
Expand All @@ -351,10 +348,6 @@ impl Runner {
),
];

if let Some(actor_id) = actor_id {
runner_env.push(("ACTOR_ID", actor_id.to_string()));
}

if let Some(vector) = &ctx.config().vector {
runner_env.push(("VECTOR_SOCKET_ADDR", vector.address.to_string()));
}
Expand Down
8 changes: 4 additions & 4 deletions packages/edge/infra/client/manager/tests/vector.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,27 @@
}
},
"transforms": {
"actors": {
"runners": {
"type": "filter",
"inputs": [
"vector",
"tcp_json"
],
"condition": {
"type": "vrl",
"source": ".source == \"actors\""
"source": ".source == \"runners\""
}
},
"add_prefix": {
"type": "remap",
"inputs": [
"actors"
"runners"
],
"source": ".message, err = \"\u001b[2m\" + \"runner_id=\" + .runner_id + \"\u001b[0m \" + .message"
}
},
"sinks": {
"actor_logs": {
"runner_logs": {
"type": "console",
"inputs": [
"add_prefix"
Expand Down
5 changes: 1 addition & 4 deletions packages/edge/infra/guard/server/src/routing/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ use service_discovery::ServiceDiscovery;
use url::Url;
use uuid::Uuid;

// TODO: Copied from cluster/src/workflows/server/install/install_scripts/components/rivet/mod.rs
const TUNNEL_API_EDGE_PORT: u16 = 5010;

/// Route requests to the API service
#[tracing::instrument(skip_all)]
pub async fn route_api_request(
Expand Down Expand Up @@ -49,7 +46,7 @@ pub async fn route_api_request(
let edge = ctx.config().server()?.rivet.edge()?;
let url = Url::parse(&format!(
"{}provision/datacenters/{dc_id}/servers?pools=worker",
edge.intercom_endpoint
edge.intercom_address
))?;
let sd = ServiceDiscovery::new(url);
let servers = sd.fetch().await?;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE actors
ADD COLUMN runner_id UUID;
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
CREATE TABLE IF NOT EXISTS runner_logs (
namespace LowCardinality(String),
runner_id UUID,
actor_id String,
stream_type UInt8, -- pegboard::types::LogsStreamType
ts DateTime64 (9),
message String
) ENGINE = ReplicatedMergeTree ()
PARTITION BY
toStartOfHour (ts)
ORDER BY (
namespace,
runner_id,
toUnixTimestamp (ts),
stream_type
)
TTL toDate (ts + toIntervalDay (3))
TTL toDate (ts + toIntervalDay (14))
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;
Loading
Loading