Skip to content

Conversation

@NathanFlurry
Copy link
Member

Changes

@cloudflare-workers-and-pages
Copy link

cloudflare-workers-and-pages bot commented Jun 3, 2025

Deploying rivet-studio with  Cloudflare Pages  Cloudflare Pages

Latest commit: 526f37e
Status: ✅  Deploy successful!
Preview URL: https://124bf5a3.rivet-studio.pages.dev
Branch Preview URL: https://06-02-chore-vector-add-vecto.rivet-studio.pages.dev

View logs

Copy link
Member Author

NathanFlurry commented Jun 3, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@NathanFlurry NathanFlurry force-pushed the 06-02-chore_vector_add_vector_pipeline_to_ship_clickhouse_events_from_the_edge branch from b9cc460 to 8d1f2c3 Compare June 3, 2025 06:34
@cloudflare-workers-and-pages
Copy link

cloudflare-workers-and-pages bot commented Jun 3, 2025

Deploying rivet with  Cloudflare Pages  Cloudflare Pages

Latest commit: 526f37e
Status: ✅  Deploy successful!
Preview URL: https://4f91f438.rivet.pages.dev
Branch Preview URL: https://06-02-chore-vector-add-vecto.rivet.pages.dev

View logs

@cloudflare-workers-and-pages
Copy link

cloudflare-workers-and-pages bot commented Jun 3, 2025

Deploying rivet-hub with  Cloudflare Pages  Cloudflare Pages

Latest commit: 526f37e
Status: ✅  Deploy successful!
Preview URL: https://04f65c11.rivet-hub-7jb.pages.dev
Branch Preview URL: https://06-02-chore-vector-add-vecto.rivet-hub-7jb.pages.dev

View logs

@NathanFlurry NathanFlurry force-pushed the 06-02-chore_vector_add_vector_pipeline_to_ship_clickhouse_events_from_the_edge branch 2 times, most recently from 71023be to 58a5eca Compare June 4, 2025 01:42
@NathanFlurry NathanFlurry requested a review from MasterPtato June 4, 2025 01:47
let clickhouse = crate::db::clickhouse::setup(config.clone())?;
let sqlite = SqlitePoolManager::new(fdb.clone()).await?;

// Create the ClickHouse inserter if ClickHouse is enabled
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Create the ClickHouse inserter if ClickHouse is enabled
// Create the ClickHouse inserter if vector is enabled

mut target: RouteTarget,
request_context: &mut RequestContext,
) -> GlobalResult<Response<Full<Bytes>>> {
let _request_start = Instant::now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused?

Comment on lines 1665 to 1667
let request_id = Uuid::new_v4();
let mut request_context =
RequestContext::new_with_request_id(request_id, self.state.clickhouse_inserter.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uses new_with_request_id with a new uuid, same as new

Suggested change
let request_id = Uuid::new_v4();
let mut request_context =
RequestContext::new_with_request_id(request_id, self.state.clickhouse_inserter.clone());
let mut request_context =
RequestContext::new(self.state.clickhouse_inserter.clone());

network_ports_ingress: HashMap<String, ActorClickHouseRowPortIngress>,
network_ports_host: HashMap<String, ActorClickHouseRowPortHost>,
network_ports_proxied: HashMap<String, ActorClickHouseRowPortProxied>,
client_id: Uuid,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add runner_id now or later?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now

if let Some(actor_input) = &input.actor_input {
// Get metadata
let meta = ctx
.activity(GetMetaInput {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this also be .v(2)?


ctx.v(2).activity(InsertClickHouseInput {
input: input.clone(),
meta,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This metadata includes build data which changes if the actor's build is changed. the metadata is re-fetched on reschedule in reschedule_actor as actor_setup.meta

@MasterPtato MasterPtato marked this pull request as ready for review June 5, 2025 02:11
Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Summary

This PR adds a Vector pipeline to ship ClickHouse events from edge services, with significant changes across the codebase. Here are the key points:

  • Introduces new clickhouse-inserter package for batched event insertion with configurable batch sizes and intervals
  • Adds http_requests table in ClickHouse for guard analytics with 30-day TTL and comprehensive request metadata
  • Creates new actor_logs2 table with 14-day TTL and materialized view actor_logs2_with_metadata for enhanced actor logging
  • Implements request context tracking in guard service with detailed metrics collection and WebSocket support
  • Adds non-interactive route management capabilities to CLI with auto-create and auto-sync options

58 file(s) reviewed, 14 comment(s)
Edit PR Review Bot Settings | Greptile

Comment on lines +14 to +17
dynamic_events_http:
type: http_server
address: 0.0.0.0:5022
encoding: ndjson
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: No transform is defined for dynamic_events_http before sending to sink. Consider adding a transform to ensure consistent metadata tagging like other sources.

Comment on lines +89 to +94
if events.len() >= BATCH_SIZE {
if let Err(e) = self.send_events(&events).await {
tracing::error!(?e, "failed to send events to Vector");
}
events.clear();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Events are cleared even if send_events fails. This could result in data loss. Consider only clearing after successful send or implementing a retry mechanism.

Suggested change
if events.len() >= BATCH_SIZE {
if let Err(e) = self.send_events(&events).await {
tracing::error!(?e, "failed to send events to Vector");
}
events.clear();
}
if events.len() >= BATCH_SIZE {
if let Err(e) = self.send_events(&events).await {
tracing::error!(?e, "failed to send events to Vector");
} else {
events.clear();
}
}

use global_error::GlobalResult;
use lazy_static::lazy_static;
use std::{net::IpAddr, time::SystemTime};
use tracing::warn;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: tracing::warn is imported but never used in the code

Suggested change
use tracing::warn;
use global_error::GlobalResult;
use lazy_static::lazy_static;
use std::{net::IpAddr, time::SystemTime};
use uuid::Uuid;

Comment on lines +893 to 894
let request_send_start = Instant::now();
match timeout(timeout_duration, self.client.request(proxied_req)).await {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: request_send_start is captured but response_receive_time is calculated and never used. Consider either removing the unused timing or using it for metrics/logging.

ts DateTime64 (9),
stream_type UInt8, -- pegboard::types::LogsStreamType
message String
) ENGINE = ReplicatedMergeTree ()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: ReplicatedMergeTree() missing required parameters for zookeeper path and replica name

Comment on lines +93 to +96
ALTER TABLE state ADD project_id BLOB DEFAULT X'00000000000000000000000000000000'; -- UUID
ALTER TABLE state ADD root_user_enabled INT DEFAULT false;
ALTER TABLE state ADD build_kind INT DEFAULT -1;
ALTER TABLE state ADD build_compression INT DEFAULT -1;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: The default values for build_kind and build_compression are set to -1, which may be invalid enum values. Consider using 0 or NULL if these are meant to represent unset states.

Comment on lines +133 to +164
let state_row = sql_fetch_one!(
[ctx, StateRow, &pool]
"
SELECT
project_id,
env_id,
json(tags) AS tags,
resources_cpu_millicores,
resources_memory_mib,
selected_resources_cpu_millicores,
selected_resources_memory_mib,
client_id,
client_workflow_id,
client_wan_hostname,
lifecycle_kill_timeout_ms,
lifecycle_durable,
create_ts,
start_ts,
connectable_ts,
finish_ts,
destroy_ts,
image_id,
build_kind,
build_compression,
root_user_enabled,
json(args) AS args,
network_mode,
json(environment) AS environment
FROM state
",
)
.await?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: SQL query lacks a WHERE clause to filter by actor_id, which could return incorrect data if multiple actors exist.

Suggested change
let state_row = sql_fetch_one!(
[ctx, StateRow, &pool]
"
SELECT
project_id,
env_id,
json(tags) AS tags,
resources_cpu_millicores,
resources_memory_mib,
selected_resources_cpu_millicores,
selected_resources_memory_mib,
client_id,
client_workflow_id,
client_wan_hostname,
lifecycle_kill_timeout_ms,
lifecycle_durable,
create_ts,
start_ts,
connectable_ts,
finish_ts,
destroy_ts,
image_id,
build_kind,
build_compression,
root_user_enabled,
json(args) AS args,
network_mode,
json(environment) AS environment
FROM state
",
)
.await?;
let state_row = sql_fetch_one!(
[ctx, StateRow, &pool]
"
SELECT
project_id,
env_id,
json(tags) AS tags,
resources_cpu_millicores,
resources_memory_mib,
selected_resources_cpu_millicores,
selected_resources_memory_mib,
client_id,
client_workflow_id,
client_wan_hostname,
lifecycle_kill_timeout_ms,
lifecycle_durable,
create_ts,
start_ts,
connectable_ts,
finish_ts,
destroy_ts,
image_id,
build_kind,
build_compression,
root_user_enabled,
json(args) AS args,
network_mode,
json(environment) AS environment
FROM state
WHERE actor_id = ?
",
input.actor_id
)
.await?;

Comment on lines +415 to +430
sql_execute!(
[ctx, pool]
"
UPDATE state
SET
project_id = ?,
build_kind = ?,
build_compression = ?,
root_user_enabled = ?
",
input.meta.project_id,
input.meta.build_kind as i64,
input.meta.build_compression as i64,
input.root_user_enabled,
)
.await?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: SQL update lacks a WHERE clause - could update all state records unintentionally

let result = apis::routes_api::routes_update(
&ctx.openapi_config_cloud,
&self.name,
update_route_body.clone(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Unnecessary clone() of update_route_body since it's not used after the API call

Suggested change
update_route_body.clone(),
update_route_body,

let matching_route = routes_response
.routes
.iter()
.find(|route| route.id == *route_id)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Route lookup uses ID equality but name field may not be an ID. Consider documenting the expected format or validating input.

@MasterPtato MasterPtato force-pushed the 06-04-increase-actor-log-ttl_ branch from 21b6b12 to ec7ede8 Compare June 6, 2025 18:29
@MasterPtato MasterPtato force-pushed the 06-02-chore_vector_add_vector_pipeline_to_ship_clickhouse_events_from_the_edge branch from a32e202 to 526f37e Compare June 6, 2025 18:29
@graphite-app
Copy link
Contributor

graphite-app bot commented Jun 9, 2025

Merge activity

  • Jun 9, 4:47 PM UTC: MasterPtato added this pull request to the Graphite merge queue.
  • Jun 9, 4:48 PM UTC: CI is running for this pull request on a draft pull request (#2574) due to your merge queue CI optimization settings.
  • Jun 9, 4:49 PM UTC: Merged by the Graphite merge queue via draft PR: #2574.

graphite-app bot pushed a commit that referenced this pull request Jun 9, 2025
… edge (#2526)

<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
@graphite-app graphite-app bot closed this Jun 9, 2025
@graphite-app graphite-app bot deleted the 06-02-chore_vector_add_vector_pipeline_to_ship_clickhouse_events_from_the_edge branch June 9, 2025 16:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants