Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/dev-full/vector-server/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ sinks:
compression: gzip
database: db_pegboard_actor_log
endpoint: http://clickhouse:8123
table: actor_logs2
table: actor_logs3
auth:
strategy: basic
user: vector
Expand Down
2 changes: 1 addition & 1 deletion docker/monolith/vector-server/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ sinks:
compression: gzip
endpoint: http://clickhouse:9300
database: db_pegboard_actor_log
table: actor_logs2
table: actor_logs3
auth:
strategy: basic
user: vector
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api/actor/src/route/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub async fn get_logs(
// frequently and should not return a significant amount of logs.
let logs_res = ctx
.op(pegboard::ops::actor::log::read::Input {
env_id,
actor_ids: actor_ids_clone.clone(),
stream_types: stream_types_clone.clone(),
count: 64,
Expand Down Expand Up @@ -136,6 +137,7 @@ pub async fn get_logs(
// Read most recent logs

ctx.op(pegboard::ops::actor::log::read::Input {
env_id,
actor_ids: actor_ids.clone(),
stream_types: stream_types.clone(),
count: 256,
Expand Down
2 changes: 1 addition & 1 deletion packages/edge/infra/client/container-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ rivet-logs.workspace = true
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
signal-hook = "0.3.17"
uuid = { version = "1.6.1", features = ["v4"] }

[dev-dependencies]
portpicker = "0.1.1"
tempfile = "3.9.0"
uuid = { version = "1.6.1", features = ["v4"] }
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{io::Write, net::TcpStream, sync::mpsc, thread::JoinHandle};
use anyhow::*;
use serde::Serialize;
use serde_json;
use uuid::Uuid;

#[derive(Copy, Clone, Debug)]
#[repr(u8)]
Expand Down Expand Up @@ -37,6 +38,8 @@ pub struct LogShipper {
pub vector_socket_addr: String,

pub actor_id: String,

pub env_id: Uuid,
}

impl LogShipper {
Expand Down Expand Up @@ -91,7 +94,7 @@ impl LogShipper {
while let Result::Ok(message) = self.msg_rx.recv() {
let vector_message = VectorMessage::Actors {
actor_id: self.actor_id.as_str(),
task: "main", // Backwards compatibility with logs
env_id: self.env_id,
stream_type: message.stream_type as u8,
ts: message.ts,
message: message.message.as_str(),
Expand All @@ -114,7 +117,7 @@ enum VectorMessage<'a> {
#[serde(rename = "actors")]
Actors {
actor_id: &'a str,
task: &'a str,
env_id: Uuid,
stream_type: u8,
ts: u64,
message: &'a str,
Expand Down
3 changes: 3 additions & 0 deletions packages/edge/infra/client/container-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{fs, path::Path, sync::mpsc, time::Duration};

use anyhow::*;
use utils::var;
use uuid::Uuid;

mod container;
mod log_shipper;
Expand Down Expand Up @@ -36,6 +37,7 @@ fn main() -> Result<()> {
.transpose()
.context("failed to parse vector socket addr")?;
let actor_id = var("ACTOR_ID")?;
let env_id = Uuid::parse_str(&var("ENVIRONMENT_ID")?)?;

let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);

Expand All @@ -48,6 +50,7 @@ fn main() -> Result<()> {
msg_rx,
vector_socket_addr,
actor_id,
env_id,
};
let log_shipper_thread = log_shipper.spawn();
(Some(msg_tx), Some(log_shipper_thread))
Expand Down
15 changes: 14 additions & 1 deletion packages/edge/infra/client/manager/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,24 @@ pub struct Actor {
actor_id: Uuid,
generation: u32,
config: protocol::ActorConfig,
metadata: protocol::ActorMetadata,

runner: Mutex<Option<runner::Handle>>,
exited: Mutex<bool>,
}

impl Actor {
pub fn new(actor_id: Uuid, generation: u32, config: protocol::ActorConfig) -> Arc<Self> {
pub fn new(
actor_id: Uuid,
generation: u32,
config: protocol::ActorConfig,
metadata: protocol::ActorMetadata,
) -> Arc<Self> {
Arc::new(Actor {
actor_id,
generation,
config,
metadata,

runner: Mutex::new(None),
exited: Mutex::new(false),
Expand All @@ -50,12 +57,14 @@ impl Actor {
actor_id: Uuid,
generation: u32,
config: protocol::ActorConfig,
metadata: protocol::ActorMetadata,
runner: runner::Handle,
) -> Arc<Self> {
Arc::new(Actor {
actor_id,
generation,
config,
metadata,

runner: Mutex::new(Some(runner)),
exited: Mutex::new(false),
Expand Down Expand Up @@ -209,6 +218,10 @@ impl Actor {
.to_string(),
),
("ACTOR_ID", self.actor_id.to_string()),
(
"ENVIRONMENT_ID",
self.metadata.environment.env_id.to_string(),
),
];
if let Some(vector) = &ctx.config().vector {
runner_env.push(("VECTOR_SOCKET_ADDR", vector.address.to_string()));
Expand Down
10 changes: 7 additions & 3 deletions packages/edge/infra/client/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ impl Ctx {
generation,
config,
} => {
let metadata = config.metadata.deserialize()?;

let mut actors = self.actors.write().await;

if actors.contains_key(&(actor_id, generation)) {
Expand All @@ -430,7 +432,7 @@ impl Ctx {
"actor with this actor id + generation already exists, ignoring start command",
);
} else {
let actor = Actor::new(actor_id, generation, *config);
let actor = Actor::new(actor_id, generation, *config, metadata);

// Insert actor
actors.insert((actor_id, generation), actor);
Expand Down Expand Up @@ -718,6 +720,7 @@ impl Ctx {

let config = serde_json::from_slice::<protocol::ActorConfig>(&row.config)?;
let generation = row.generation.try_into()?;
let metadata = config.metadata.deserialize()?;

match &isolate_runner {
Some(isolate_runner) if pid == isolate_runner.pid().as_raw() => {}
Expand All @@ -736,7 +739,7 @@ impl Ctx {
}

// Clean up actor. We run `cleanup_setup` instead of `cleanup` because `cleanup` publishes events.
let actor = Actor::new(row.actor_id, generation, config);
let actor = Actor::new(row.actor_id, generation, config, metadata);
actor.cleanup_setup(self).await;
}

Expand Down Expand Up @@ -878,6 +881,7 @@ impl Ctx {

let config = serde_json::from_slice::<protocol::ActorConfig>(&row.config)?;
let generation = row.generation.try_into()?;
let metadata = config.metadata.deserialize()?;

let runner = match &isolate_runner {
// We have to clone the existing isolate runner handle instead of creating a new one so it
Expand All @@ -901,7 +905,7 @@ impl Ctx {
},
};

let actor = Actor::with_runner(row.actor_id, generation, config, runner);
let actor = Actor::with_runner(row.actor_id, generation, config, metadata, runner);
let actor = actors_guard
.entry((row.actor_id, generation))
.or_insert(actor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP VIEW IF EXISTS actor_logs2_with_metadata;
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
CREATE TABLE IF NOT EXISTS actor_logs3 (
namespace LowCardinality(String),
actor_id String,
env_id UUID,
ts DateTime64 (9),
stream_type UInt8, -- pegboard::types::LogsStreamType
message String
) ENGINE = ReplicatedMergeTree ()
PARTITION BY
toStartOfHour (ts)
ORDER BY (
namespace,
env_id,
actor_id,
toUnixTimestamp (ts),
stream_type
)
TTL toDate (ts + toIntervalDay(14))
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
CREATE MATERIALIZED VIEW IF NOT EXISTS actor_logs3_with_metadata
(
namespace LowCardinality(String),
actor_id String,
ts DateTime64(9),
stream_type UInt8, -- pegboard::types::LogsStreamType
message String,
project_id UUID,
env_id UUID,
datacenter_id UUID,
tags Map(String, String),
build_id UUID,
client_id UUID,
durable Bool
)
ENGINE = ReplicatedMergeTree()
PARTITION BY (env_id, toStartOfHour(ts))
ORDER BY (env_id, toUnixTimestamp(ts), actor_id, stream_type)
TTL toDate(ts + toIntervalDay(14))
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1
AS SELECT
l.namespace,
l.actor_id,
l.ts,
l.stream_type,
l.message,
a.project_id,
a.env_id,
a.datacenter_id,
a.tags,
a.build_id,
a.client_id,
a.durable
FROM actor_logs3 l
LEFT JOIN db_pegboard_analytics.actors a ON l.namespace = a.namespace AND l.env_id = a.env_id AND l.actor_id = a.actor_id;

109 changes: 0 additions & 109 deletions packages/edge/services/pegboard/src/ops/actor/log/export.rs

This file was deleted.

1 change: 0 additions & 1 deletion packages/edge/services/pegboard/src/ops/actor/log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod export;
pub mod read;
Loading
Loading