Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions engine/packages/api-builder/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,7 @@ pub async fn http_logging_middleware(
);

// Update metrics
metrics::API_REQUEST_PENDING.add(
-1,
&[
KeyValue::new("method", method_clone.to_string()),
KeyValue::new("path", path_clone.clone()),
],
);
metrics::API_REQUEST_PENDING.add(-1, &[KeyValue::new("method", method_clone.to_string()), KeyValue::new("path", path_clone.clone())]);

let error_code: String = if status.is_success() {
String::new()
Expand Down
10 changes: 0 additions & 10 deletions engine/packages/engine/src/commands/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ pub enum SubCommand {
pub enum DatabaseType {
#[clap(alias = "ch")]
Clickhouse,
#[clap(alias = "wfd")]
WorkflowData,
#[clap(alias = "wfi")]
WorkflowInternal,
}

impl SubCommand {
Expand All @@ -48,12 +44,6 @@ impl SubCommand {
DatabaseType::Clickhouse => {
crate::util::db::clickhouse_shell(config, shell_ctx).await?
}
DatabaseType::WorkflowData => {
crate::util::db::wf_sqlite_shell(config, shell_ctx, false).await?
}
DatabaseType::WorkflowInternal => {
crate::util::db::wf_sqlite_shell(config, shell_ctx, true).await?
}
}

Ok(())
Expand Down
16 changes: 8 additions & 8 deletions engine/packages/engine/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,23 @@ async fn verify_engine_version(
pools
.udb()?
.run(|tx| async move {
let current_version = semver::Version::parse(env!("CARGO_PKG_VERSION"))
.context("failed to parse cargo pkg version as semver")?;
let current_version = semver::Version::parse(env!("CARGO_PKG_VERSION")).context("failed to parse cargo pkg version as semver")?;

if let Some(existing_version) =
tx.read_opt(&keys::EngineVersionKey {}, Serializable).await?
{
if let Some(existing_version) = tx.read_opt(&keys::EngineVersionKey {}, Serializable).await? {
if current_version < existing_version {
return Ok(Err(anyhow!("{}", formatdoc!(
"
return Ok(Err(anyhow!(
"{}",
formatdoc!(
"
Rivet Engine has been rolled back to a previous version:
- Last Used Version: {existing_version}
- Current Version: {current_version}
Cannot proceed without potential data corruption.

(If you know what you're doing, this error can be disabled in the Rivet config via `allow_version_rollback: true`)
"
))));
)
)));
}
}

Expand Down
26 changes: 1 addition & 25 deletions engine/packages/engine/src/util/db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{path::Path, result::Result::Ok, str::FromStr};
use std::{path::Path, result::Result::Ok};

use anyhow::*;
use rivet_util::Id;
use serde_json::json;

pub struct ShellQuery {
Expand Down Expand Up @@ -74,26 +73,3 @@ pub async fn clickhouse_shell(

Ok(())
}

pub async fn wf_sqlite_shell(
config: rivet_config::Config,
shell_ctx: ShellContext<'_>,
_internal: bool,
) -> Result<()> {
let ShellContext { queries, .. } = shell_ctx;

let _pools = rivet_pools::Pools::new(config.clone()).await?;

// Combine all queries into one command
for ShellQuery {
svc: workflow_id,
query: _query,
} in queries
{
let _workflow_id = Id::from_str(workflow_id).context("could not parse input as Id")?;

todo!();
}

Ok(())
}
6 changes: 3 additions & 3 deletions engine/packages/guard-core/src/websocket_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::*;
use futures_util::{SinkExt, StreamExt, stream::Peekable};
use hyper::upgrade::Upgraded;
use hyper_tungstenite::HyperWebsocket;
use hyper_tungstenite::tungstenite::Message as WsMessage;
use hyper_tungstenite::tungstenite::Message;
use hyper_util::rt::TokioIo;
use std::sync::Arc;
use tokio::sync::Mutex;
Expand All @@ -12,7 +12,7 @@ pub type WebSocketReceiver =
Peekable<futures_util::stream::SplitStream<WebSocketStream<TokioIo<Upgraded>>>>;

pub type WebSocketSender =
futures_util::stream::SplitSink<WebSocketStream<TokioIo<Upgraded>>, WsMessage>;
futures_util::stream::SplitSink<WebSocketStream<TokioIo<Upgraded>>, Message>;

#[derive(Clone)]
pub struct WebSocketHandle {
Expand All @@ -31,7 +31,7 @@ impl WebSocketHandle {
})
}

pub async fn send(&self, message: WsMessage) -> Result<()> {
pub async fn send(&self, message: Message) -> Result<()> {
self.ws_tx.lock().await.send(message).await?;
Ok(())
}
Expand Down
38 changes: 19 additions & 19 deletions engine/packages/guard-core/tests/simple_websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,25 @@ async fn start_websocket_server() -> SocketAddr {
match message_result {
Ok(msg) => {
match &msg {
hyper_tungstenite::tungstenite::Message::Text(text) => {
println!("Server: Received text message: {}", text);
},
hyper_tungstenite::tungstenite::Message::Binary(data) => {
println!("Server: Received binary message of {} bytes", data.len());
},
hyper_tungstenite::tungstenite::Message::Ping(_) => {
println!("Server: Received ping");
},
hyper_tungstenite::tungstenite::Message::Pong(_) => {
println!("Server: Received pong");
},
hyper_tungstenite::tungstenite::Message::Close(_) => {
println!("Server: Received close message");
},
_ => {
println!("Server: Received unknown message type");
}
}
hyper_tungstenite::tungstenite::Message::Text(text) => {
println!("Server: Received text message: {}", text);
}
hyper_tungstenite::tungstenite::Message::Binary(data) => {
println!("Server: Received binary message of {} bytes", data.len());
}
hyper_tungstenite::tungstenite::Message::Ping(_) => {
println!("Server: Received ping");
}
hyper_tungstenite::tungstenite::Message::Pong(_) => {
println!("Server: Received pong");
}
hyper_tungstenite::tungstenite::Message::Close(_) => {
println!("Server: Received close message");
}
_ => {
println!("Server: Received unknown message type");
}
}

println!("Server: Echoing message back");
match write.send(msg).await {
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/guard/src/routing/pegboard_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ async fn route_request_inner(
.to_workflow_id(actor.workflow_id)
.graceful_not_found()
.send()
.await;
.await?;

if res.is_none() {
tracing::warn!(
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/metrics/src/buckets.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub const BUCKETS: &[f64] = &[
// For otel
0.0, // Added
0.001, 0.0025,
// Copied from https://docs.rs/prometheus/latest/src/prometheus/histogram.rs.html#25-27
0.001,
0.0025, // Copied from https://docs.rs/prometheus/latest/src/prometheus/histogram.rs.html#25-27
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, // Added
25.0, 50.0, 100.0, 250.0, 500.0,
];
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway/src/keepalive_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn task(
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
actor_id,
gateway_id,
request_id,
request_id
}),
// Keep alive in flight req during hibernation
shared_state.keepalive_hws(request_id),
Expand Down
8 changes: 3 additions & 5 deletions engine/packages/pegboard-gateway/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,10 @@ impl SharedState {
let reason = 'reason: {
if let Some(hs) = &req.hibernation_state {
if let Some(earliest_pending_ws_msg) = hs.pending_ws_msgs.first() {
if now.duration_since(earliest_pending_ws_msg.send_instant)
> HWS_MESSAGE_ACK_TIMEOUT
{
if now.duration_since(earliest_pending_ws_msg.send_instant) > HWS_MESSAGE_ACK_TIMEOUT {
break 'reason Some(MsgGcReason::WebSocketMessageNotAcked {
first_msg_index: earliest_pending_ws_msg.message_index,
last_msg_index: req.message_index
last_msg_index: req.message_index,
});
}
}
Expand All @@ -543,7 +541,7 @@ impl SharedState {
timeout=%hibernation_timeout.as_secs_f64(),
"checking hibernating state elapsed time"
);
if hs_elapsed> hibernation_timeout {
if hs_elapsed > hibernation_timeout {
break 'reason Some(MsgGcReason::HibernationTimeout);
}
} else if req.msg_tx.is_closed() {
Expand Down
10 changes: 6 additions & 4 deletions engine/packages/pegboard-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ rivet-guard-core.workspace = true
rivet-metrics.workspace = true
rivet-runner-protocol.workspace = true
rivet-runtime.workspace = true
serde.workspace = true
serde_json.workspace = true
rivet-data.workspace = true
serde_bare.workspace = true
tokio.workspace = true
serde_json.workspace = true
serde.workspace = true
tokio-tungstenite.workspace = true
tokio.workspace = true
tracing.workspace = true
universaldb.workspace = true
universalpubsub.workspace = true
url.workspace = true
vbare.workspace = true
universalpubsub.workspace = true

pegboard.workspace = true
pegboard-actor-kv.workspace = true
Expand Down
16 changes: 10 additions & 6 deletions engine/packages/pegboard-runner/src/actor_event_demuxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,38 @@ const GC_INTERVAL: Duration = Duration::from_secs(30);
const MAX_LAST_SEEN: Duration = Duration::from_secs(30);

struct Channel {
tx: mpsc::UnboundedSender<protocol::mk2::Event>,
tx: mpsc::UnboundedSender<protocol::mk2::EventWrapper>,
handle: JoinHandle<()>,
last_seen: Instant,
}

pub struct ActorEventDemuxer {
ctx: StandaloneCtx,
runner_id: Id,
channels: HashMap<Id, Channel>,
last_gc: Instant,
}

impl ActorEventDemuxer {
pub fn new(ctx: StandaloneCtx) -> Self {
pub fn new(ctx: StandaloneCtx, runner_id: Id) -> Self {
Self {
ctx,
runner_id,
channels: HashMap::new(),
last_gc: Instant::now(),
}
}

/// Process an event by routing it to the appropriate actor's queue
#[tracing::instrument(skip_all)]
pub fn ingest(&mut self, actor_id: Id, event: protocol::mk2::Event) {
pub fn ingest(&mut self, actor_id: Id, event: protocol::mk2::EventWrapper) {
if let Some(channel) = self.channels.get(&actor_id) {
let _ = channel.tx.send(event);
} else {
let (tx, mut rx) = mpsc::unbounded_channel();

let ctx = self.ctx.clone();
let runner_id = self.runner_id;
let handle = tokio::spawn(async move {
loop {
let mut buffer = Vec::new();
Expand All @@ -49,7 +52,7 @@ impl ActorEventDemuxer {
break;
}

if let Err(err) = dispatch_events(&ctx, actor_id, buffer).await {
if let Err(err) = dispatch_events(&ctx, runner_id, actor_id, buffer).await {
tracing::error!(?err, "actor event processor failed");
break;
}
Expand Down Expand Up @@ -106,11 +109,12 @@ impl ActorEventDemuxer {

async fn dispatch_events(
ctx: &StandaloneCtx,
runner_id: Id,
actor_id: Id,
events: Vec<protocol::mk2::Event>,
events: Vec<protocol::mk2::EventWrapper>,
) -> Result<()> {
let res = ctx
.signal(pegboard::workflows::actor::Events { inner: events })
.signal(pegboard::workflows::actor::Events { runner_id, events })
.tag("actor_id", actor_id)
.graceful_not_found()
.send()
Expand Down
Loading
Loading