Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dev-full/prometheus/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ scrape_configs:
- job_name: rivet-client
static_configs:
- targets:
- rivet-client:6090
- rivet-client:8091
2 changes: 1 addition & 1 deletion docker/dev-full/vector-client/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ sources:
prometheus_pegboard:
type: prometheus_scrape
endpoints:
- http://rivet-client:6090
- http://rivet-client:8091
scrape_interval_secs: 15

dynamic_events_http:
Expand Down
2 changes: 1 addition & 1 deletion docker/monolith/build-scripts/setup_s6.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ const services: Service[] = [
rootUser: true,
ports: {
runner: 6080,
metrics: 6090,
metrics: 8091,
},
},

Expand Down
2 changes: 1 addition & 1 deletion docker/monolith/vector-client/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ sources:
prometheus_pegboard:
type: prometheus_scrape
endpoints:
- http://rivet-client:6090
- http://rivet-client:8091
scrape_interval_secs: 15

pegboard_manager:
Expand Down
3 changes: 2 additions & 1 deletion packages/common/metrics/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::net::SocketAddr;

use global_error::prelude::*;
use hyper::{
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use prometheus::{Encoder, TextEncoder};
use std::net::SocketAddr;

// TODO: Record extra labels

Expand Down
8 changes: 8 additions & 0 deletions packages/common/util/core/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub enum Id {
V1([u8; 18]),
}



impl Id {
/// Construct V0 from uuid.
pub fn new_v0() -> Self {
Expand Down Expand Up @@ -313,6 +315,12 @@ impl sqlx::postgres::PgHasArrayType for Id {
}
}

impl Default for Id {
fn default() -> Self {
Id::V0(Uuid::new_v4())
}
}

/// Decode a base36 string into a fixed-size byte array.
fn decode_base36<const N: usize>(s: &str) -> Result<[u8; N], IdError> {
let mut data = [0u8; N];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub fn configure(namespace: &str, config: &Config, pool_type: PoolType) -> Globa
}
});

// Add pegboard manager and runner logs
// Add pegboard manager and actor logs
match pool_type {
PoolType::Pegboard | PoolType::PegboardIsolate => {
config_json["sources"]["pegboard_manager"] = json!({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ pub async fn gen_initialize(
"pegboard".into(),
components::vector::PrometheusTarget {
// Should match port from pb manager config
endpoint: "http://127.0.0.1:6090".into(),
endpoint: "http://127.0.0.1:8091".into(),
scrape_interval: 15,
},
);
Expand All @@ -195,7 +195,7 @@ pub async fn gen_initialize(
"pegboard".into(),
components::vector::PrometheusTarget {
// Should match port from pb manager config
endpoint: "http://127.0.0.1:6090".into(),
endpoint: "http://127.0.0.1:8091".into(),
scrape_interval: 15,
},
);
Expand Down
9 changes: 6 additions & 3 deletions packages/edge/infra/client/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ license.workspace = true
edition.workspace = true

[dependencies]
anyhow = "1.0"
ipnet = { version = "2.10.1", features = ["serde"] }
pegboard.workspace = true
rivet-util.workspace = true
schemars = { version = "0.8.21", features = ["url", "uuid1"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0"
tokio-util = { version = "0.7", features = ["codec"] }
url = "2.2.2"
uuid = { version = "1.6.1", features = ["v4"] }
pegboard.workspace = true
rivet-util.workspace = true
uuid = { version = "1.6.1", features = ["v4"] }
9 changes: 1 addition & 8 deletions packages/edge/infra/client/config/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ pub struct Runner {
/// ````
pub use_resource_constraints: Option<bool>,

/// WebSocket Port for runners on this machine to connect to.
pub port: Option<u16>,

pub container_runner_binary_path: Option<PathBuf>,

/// Custom host entries to append to /etc/hosts in actor containers.
Expand All @@ -113,10 +110,6 @@ impl Runner {
self.use_resource_constraints.unwrap_or(true)
}

pub fn port(&self) -> u16 {
self.port.unwrap_or(6080)
}

pub fn container_runner_binary_path(&self) -> PathBuf {
self.container_runner_binary_path
.clone()
Expand Down Expand Up @@ -267,7 +260,7 @@ pub struct Metrics {

impl Metrics {
pub fn port(&self) -> u16 {
self.port.unwrap_or(6090)
self.port.unwrap_or(8091)
}
}

Expand Down
54 changes: 49 additions & 5 deletions packages/edge/infra/client/config/src/runner_protocol.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
use std::io::{Write, Cursor};

use anyhow::*;
use pegboard::protocol;
use serde::{Deserialize, Serialize};
use tokio_util::codec::LengthDelimitedCodec;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum ToManager {
Init {
// See `packages/edge/infra/client/manager/src/claims.rs`
access_token: String,
},
ActorStateUpdate {
actor_id: rivet_util::Id,
generation: u32,
state: ActorState,
},
Ping,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum ToRunner {
Pong,
Close {
reason: Option<String>,
},

StartActor {
actor_id: rivet_util::Id,
generation: u32,
Expand All @@ -38,3 +44,41 @@ pub enum ActorState {
Running,
Exited { exit_code: Option<i32> },
}

pub fn codec() -> LengthDelimitedCodec {
LengthDelimitedCodec::builder()
.length_field_type::<u32>()
.length_field_length(4)
// No offset
.length_field_offset(0)
// Header length is not included in the length calculation
.length_adjustment(4)
// Skip length, but header is included in the returned bytes
.num_skip(4)
.new_codec()
}

pub fn encode_frame<T: Serialize>(payload: &T) -> Result<Vec<u8>> {
let mut buf = Vec::with_capacity(4);
let mut cursor = Cursor::new(&mut buf);

cursor.write(&[0u8; 4])?; // header (currently unused)

serde_json::to_writer(&mut cursor, payload)?;

cursor.flush()?;

Ok(buf)
}

pub fn decode_frame<T: DeserializeOwned>(frame: &[u8]) -> Result<([u8; 4], T)> {
ensure!(frame.len() >= 4, "Frame too short");

// Extract the header (first 4 bytes)
let header = [frame[0], frame[1], frame[2], frame[3]];

// Deserialize the rest of the frame (payload after the header)
let payload = serde_json::from_slice(&frame[4..])?;

Ok((header, payload))
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn run(
.context("empty `runner_path`")?
.to_string_lossy()
.to_string();
let fs_path = actor_path.join("fs").join("upper");
let fs_path = runner_path.join("fs").join("upper");
let oci_bundle_config_json = fs_path.join("config.json");

// Validate OCI bundle
Expand Down
10 changes: 5 additions & 5 deletions packages/edge/infra/client/container-runner/src/log_shipper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl LogShipper {
println!("Log shipper connected");

while let Result::Ok(message) = self.msg_rx.recv() {
let vector_message = VectorMessage::Runners {
runner_id: self.runner_id.as_str(),
let vector_message = VectorMessage::Actors {
// runner_id: self.runner_id.as_str(),
actor_id: self.actor_id.as_ref().map(|x| x.as_str()),
env_id: self.env_id,
stream_type: message.stream_type as u8,
Expand All @@ -116,9 +116,9 @@ impl LogShipper {
#[derive(Serialize)]
#[serde(tag = "source")]
enum VectorMessage<'a> {
#[serde(rename = "runners")]
Runners {
runner_id: &'a str,
#[serde(rename = "actors")]
Actors {
// runner_id: &'a str,
actor_id: Option<&'a str>,
env_id: Uuid,
stream_type: u8,
Expand Down
22 changes: 13 additions & 9 deletions packages/edge/infra/client/echo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ RUN useradd -m -d /home/nonroot -s /bin/sh nonroot
RUN grep nonroot /etc/passwd > /passwd && \
grep nonroot /etc/group > /group

# Create an empty image and copy binaries into it to minimize the size of the image
FROM scratch
USER nonroot

# Copy passwd and group files
COPY --from=rust /passwd /etc/passwd
COPY --from=rust /group /etc/group
CMD ["/app/dist/pegboard-echo-server"]

COPY --from=rust /app/dist/ /
# # Create an empty image and copy binaries into it to minimize the size of the image
# FROM scratch

# Switch to the non-root user
USER nonroot
# # Copy passwd and group files
# COPY --from=rust /passwd /etc/passwd
# COPY --from=rust /group /etc/group

# COPY --from=rust /app/dist/ /

# # Switch to the non-root user
# USER nonroot

CMD ["/pegboard-echo-server"]
# CMD ["/pegboard-echo-server"]
35 changes: 19 additions & 16 deletions packages/edge/infra/client/echo/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{env, io::Cursor, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use std::{env, io::{Write, Cursor}, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use anyhow::*;
use bytes::Bytes;
Expand Down Expand Up @@ -69,8 +69,8 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> {
.length_field_offset(0)
// Header length is not included in the length calculation
.length_adjustment(4)
// header is included in the returned bytes
.num_skip(0)
// Skip length, but header is included in the returned bytes
.num_skip(4)
.new_codec();

let framed = Framed::new(stream, codec);
Expand All @@ -86,7 +86,7 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> {
tokio::time::sleep(PING_INTERVAL).await;

let payload = json!({
"ping": {}
"ping": null
});

if write2
Expand All @@ -103,7 +103,7 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> {

// Process incoming messages
while let Some(frame) = read.next().await.transpose()? {
let (_, packet) = decode_frame::<serde_json::Value>(&frame.freeze())?;
let (_, packet) = decode_frame::<serde_json::Value>(&frame.freeze()).context("failed to decode frame")?;
println!("Received packet: {packet:?}");

if let Some(packet) = packet.get("start_actor") {
Expand All @@ -117,7 +117,7 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> {
},
});

write.lock().await.send(encode_frame(&payload)?).await?;
write.lock().await.send(encode_frame(&payload).context("failed to encode frame")?).await?;
} else if let Some(packet) = packet.get("signal_actor") {
let payload = json!({
"actor_state_update": {
Expand All @@ -139,6 +139,19 @@ async fn run_socket_client(socket_path: PathBuf) -> Result<()> {
Ok(())
}

fn encode_frame<T: Serialize>(payload: &T) -> Result<Bytes> {
let mut buf = Vec::with_capacity(4);
let mut cursor = Cursor::new(&mut buf);

cursor.write(&[0u8; 4]); // header (currently unused)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The write() method returns a Result that should be checked for errors. Consider updating this line to cursor.write(&[0u8; 4])?; to properly propagate any IO errors that might occur during the write operation.

Suggested change
cursor.write(&[0u8; 4]); // header (currently unused)
cursor.write(&[0u8; 4])?; // header (currently unused)

Spotted by Diamond

Is this helpful? React 👍 or 👎 to let us know.


serde_json::to_writer(&mut cursor, payload)?;

cursor.flush()?;

Ok(buf.into())
}

fn decode_frame<T: DeserializeOwned>(frame: &Bytes) -> Result<([u8; 4], T)> {
ensure!(frame.len() >= 4, "Frame too short");

Expand All @@ -150,13 +163,3 @@ fn decode_frame<T: DeserializeOwned>(frame: &Bytes) -> Result<([u8; 4], T)> {

Ok((header, payload))
}

fn encode_frame<T: Serialize>(payload: &T) -> Result<Bytes> {
let mut buf = Vec::with_capacity(4);
buf.extend_from_slice(&[0u8; 4]); // header (currently unused)

let mut cursor = Cursor::new(&mut buf);
serde_json::to_writer(&mut cursor, payload)?;

Ok(buf.into())
}
3 changes: 0 additions & 3 deletions packages/edge/infra/client/manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ test = []

[dependencies]
anyhow.workspace = true
base64 = "0.22"
bytes = "1.0"
futures-util = { version = "0.3" }
hyper = { version = "0.14", features = ["server", "http1", "tcp", "stream"] }
indoc = "2.0"
json5.workspace = true
jsonwebtoken = "9.3.1"
lazy_static = "1.4"
nix.workspace = true
notify = { version = "6.1.1", default-features = false, features = ["serde", "fsevent-sys"] }
Expand All @@ -31,7 +29,6 @@ prometheus = "0.13"
rand = "0.8"
rand_chacha = "0.3.1"
reqwest = { version = "0.12", default-features = false, features = ["stream", "rustls-tls", "json"] }
ring = "0.17"
rivet-logs.workspace = true
rivet-util.workspace = true
serde = { version = "1.0.195", features = ["derive"] }
Expand Down
Loading
Loading