Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

[workspace]
resolver = "2"
members = ["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"]
members = ["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pools","engine/packages/runtime","engine/packages/serverless-backfill","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/tracing-utils","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"]

[workspace.package]
version = "2.0.25"
Expand Down Expand Up @@ -83,10 +83,13 @@ tracing = "0.1.40"
tracing-core = "0.1"
tracing-opentelemetry = "0.29"
tracing-slog = "0.2"
vergen = { version = "9.0.4", features = ["build", "cargo", "rustc"] }
vergen-gitcl = "1.0.0"
reqwest-eventsource = "0.6.0"

[workspace.dependencies.vergen]
version = "9.0.4"
features = ["build","cargo","rustc"]

[workspace.dependencies.sentry]
version = "0.45.0"
default-features = false
Expand Down Expand Up @@ -148,7 +151,7 @@ features = ["now"]

[workspace.dependencies.clap]
version = "4.3"
features = ["derive", "cargo"]
features = ["derive","cargo"]

[workspace.dependencies.rivet-term]
git = "https://github.com/rivet-dev/rivet-term"
Expand Down Expand Up @@ -357,6 +360,9 @@ path = "engine/packages/pools"
[workspace.dependencies.rivet-runtime]
path = "engine/packages/runtime"

[workspace.dependencies.rivet-serverless-backfill]
path = "engine/packages/serverless-backfill"

[workspace.dependencies.rivet-service-manager]
path = "engine/packages/service-manager"

Expand Down Expand Up @@ -425,8 +431,3 @@ debug = false
lto = "fat"
codegen-units = 1
opt-level = 3

# strip = true
# panic = "abort"
# overflow-checks = false
# debug-assertions = false
2 changes: 1 addition & 1 deletion engine/packages/api-builder/src/global_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl GlobalApiCtx {
name: &'static str,
) -> Result<Self> {
let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?;
let db = gas::prelude::db::DatabaseKv::from_pools(pools.clone()).await?;
let db = gas::prelude::db::DatabaseKv::new(config.clone(), pools.clone()).await?;

Ok(Self {
db,
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/bootstrap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use gas::prelude::*;
pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?;
let ctx = StandaloneCtx::new(
db::DatabaseKv::from_pools(pools.clone()).await?,
db::DatabaseKv::new(config.clone(), pools.clone()).await?,
config.clone(),
pools,
cache,
Expand Down
1 change: 1 addition & 0 deletions engine/packages/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ serde_json.workspace = true
serde.workspace = true
url.workspace = true
uuid.workspace = true
tracing.workspace = true
3 changes: 3 additions & 0 deletions engine/packages/config/src/config/pegboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub struct Pegboard {
///
/// **Experimental**
pub serverless_backoff_max_exponent: Option<usize>,

/// Global pool desired max.
pub pool_desired_max_override: Option<u32>,
}

impl Pegboard {
Expand Down
3 changes: 3 additions & 0 deletions engine/packages/config/src/config/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)]
pub struct Runtime {
/// Adjusts worker curve around this value (in millicores, i.e. 1000 = 1 core). Is not a hard limit. When
/// unset, uses /sys/fs/cgroup/cpu.max, and if that is unset uses total host cpu.
pub worker_cpu_max: Option<usize>,
/// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM.
/// Defaults to 30 seconds.
worker_shutdown_duration: Option<u32>,
Expand Down
11 changes: 10 additions & 1 deletion engine/packages/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,15 @@ fn add_source<P: AsRef<Path>>(
let path = path.as_ref();

if !path.exists() {
tracing::debug!(path=%path.display(), "ignoring non-existent config path");

// A non-existent path is not an error: skip it and return the settings unchanged
return Ok(settings);
}

if path.is_dir() {
tracing::debug!(path=%path.display(), "loading config from directory");

for entry in std::fs::read_dir(path)? {
let entry = entry?;
let path = entry.path();
Expand All @@ -96,9 +100,14 @@ fn add_source<P: AsRef<Path>>(
}
}
} else if path.is_file() {
tracing::debug!(path=%path.display(), "loading config from file");

settings = add_file_source(settings, path)?;
} else {
bail!("Invalid path: {}", path.display());
bail!(
"Invalid Rivet config path: {}. Ensure the path exists and is either a directory with config files or a specific config file.",
path.display()
);
}

Ok(settings)
Expand Down
1 change: 1 addition & 0 deletions engine/packages/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rivet-guard.workspace = true
rivet-logs.workspace = true
rivet-pools.workspace = true
rivet-runtime.workspace = true
rivet-serverless-backfill.workspace = true
rivet-service-manager.workspace = true
rivet-telemetry.workspace = true
rivet-term.workspace = true
Expand Down
41 changes: 39 additions & 2 deletions engine/packages/engine/src/commands/wf/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use anyhow::*;
use anyhow::{Result, ensure};
use clap::{Parser, ValueEnum};
use gas::db::{
self, Database,
Expand Down Expand Up @@ -32,6 +32,18 @@ pub enum SubCommand {
Silence { workflow_ids: Vec<Id> },
/// Sets the wake immediate property of a workflow to true.
Wake { workflow_ids: Vec<Id> },
/// Wakes dead workflows that match the name and error queries.
Revive {
#[clap(short = 'n', long)]
name: Vec<String>,
/// Matches via substring (i.e. error = "database" will match workflows that died with error "database transaction failed").
#[clap(short = 'e', long)]
error: Vec<String>,
#[clap(short = 'd', long)]
dry_run: bool,
#[clap(short = 'p', long)]
parallelization: Option<u128>,
},
/// Lists the entire event history of a workflow.
History {
#[clap(index = 1)]
Expand All @@ -58,7 +70,7 @@ pub enum SubCommand {
impl SubCommand {
pub async fn execute(self, config: rivet_config::Config) -> Result<()> {
let pools = rivet_pools::Pools::new(config.clone()).await?;
let db = db::DatabaseKv::from_pools(pools).await? as Arc<dyn DatabaseDebug>;
let db = db::DatabaseKv::new(config.clone(), pools).await? as Arc<dyn DatabaseDebug>;

match self {
Self::Get { workflow_ids } => {
Expand All @@ -85,6 +97,31 @@ impl SubCommand {
}
Self::Silence { workflow_ids } => db.silence_workflows(workflow_ids).await,
Self::Wake { workflow_ids } => db.wake_workflows(workflow_ids).await,
Self::Revive {
name,
error,
dry_run,
parallelization,
} => {
ensure!(!name.is_empty(), "must provide at least one name");

let total = db
.revive_workflows(
&name.iter().map(|x| x.as_str()).collect::<Vec<_>>(),
&error.iter().map(|x| x.as_str()).collect::<Vec<_>>(),
dry_run,
parallelization.unwrap_or(1),
)
.await?;

if dry_run {
rivet_term::status::success("Workflows Matched", total);
} else {
rivet_term::status::success("Workflows Revived", total);
}

Ok(())
}
Self::History {
workflow_id,
exclude_json,
Expand Down
6 changes: 6 additions & 0 deletions engine/packages/engine/src/run_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
|config, pools| Box::pin(rivet_cache_purge::start(config, pools)),
false,
),
Service::new(
"serverless_backfill",
ServiceKind::Oneshot,
|config, pools| Box::pin(rivet_serverless_backfill::start(config, pools)),
false,
),
];

Ok(RunConfigData { services })
Expand Down
15 changes: 10 additions & 5 deletions engine/packages/epoxy/src/ops/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
.udb()?
.run(|tx| async move { utils::read_config(&tx, replica_id).await })
.custom_instrument(tracing::info_span!("read_config_tx"))
.await?;
.await
.context("failed reading config")?;

// Lead consensus
let payload = ctx
Expand All @@ -48,7 +49,8 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
async move { replica::lead_consensus::lead_consensus(&*tx, replica_id, proposal).await }
})
.custom_instrument(tracing::info_span!("lead_consensus_tx"))
.await?;
.await
.context("failed leading consensus")?;

// Get quorum members (only active replicas for voting)
let quorum_members = utils::get_quorum_members(&config);
Expand All @@ -66,7 +68,8 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
async move { replica::decide_path::decide_path(&*tx, pre_accept_oks, &payload) }
})
.custom_instrument(tracing::info_span!("decide_path_tx"))
.await?;
.await
.context("failed deciding path")?;

match path {
Path::PathFast(protocol::PathFast { payload }) => {
Expand Down Expand Up @@ -105,7 +108,8 @@ pub async fn run_paxos_accept(
async move { replica::messages::accepted(&*tx, replica_id, payload).await }
})
.custom_instrument(tracing::info_span!("accept_tx"))
.await?;
.await
.context("failed accepting")?;

// EPaxos Step 17
let quorum = send_accepts(
Expand Down Expand Up @@ -150,7 +154,8 @@ pub async fn commit(
}
})
.custom_instrument(tracing::info_span!("committed_tx"))
.await?
.await
.context("failed committing")?
};

// EPaxos Step 23
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/epoxy/src/workflows/purger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ pub async fn epoxy_purger(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> {
let replica_id = input.replica_id;

async move {
let sig = ctx.listen::<Purge>().await?;
let signals = ctx.listen_n::<Purge>(1024).await?;

ctx.activity(PurgeInput {
replica_id,
keys: sig.keys,
keys: signals.into_iter().flat_map(|sig| sig.keys).collect(),
})
.await?;

Expand Down
5 changes: 3 additions & 2 deletions engine/packages/gasoline/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ impl TestCtx {
let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())
.expect("failed to create cache");

let db = db::DatabaseKv::from_pools(pools.clone()).await?;
let debug_db = db::DatabaseKv::from_pools(pools.clone()).await? as Arc<dyn DatabaseDebug>;
let db = db::DatabaseKv::new(config.clone(), pools.clone()).await?;
let debug_db =
db::DatabaseKv::new(config.clone(), pools.clone()).await? as Arc<dyn DatabaseDebug>;

let service_name = format!("{}-test--{}", rivet_env::service_name(), "gasoline_test");
let ray_id = Id::new_v1(config.dc_label());
Expand Down
21 changes: 12 additions & 9 deletions engine/packages/gasoline/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl WorkflowCtx {

let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input).in_current_span())
.await
.map_err(|_| WorkflowError::ActivityTimeout(0));
.map_err(|_| WorkflowError::ActivityTimeout(A::NAME, 0));

let dt = start_instant.elapsed().as_secs_f64();

Expand Down Expand Up @@ -401,7 +401,7 @@ impl WorkflowCtx {
],
);

Err(WorkflowError::ActivityFailure(err, 0))
Err(WorkflowError::ActivityFailure(A::NAME, err, 0))
}
Err(err) => {
tracing::debug!("activity timeout");
Expand Down Expand Up @@ -604,25 +604,28 @@ impl WorkflowCtx {
// Convert error in the case of max retries exceeded. This will only act on retryable
// errors
let err = match err {
WorkflowError::ActivityFailure(err, _) => {
WorkflowError::ActivityFailure(name, err, _) => {
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(err)
WorkflowError::ActivityMaxFailuresReached(name, err)
} else {
// Add error count to the error for backoff calculation
WorkflowError::ActivityFailure(err, error_count)
WorkflowError::ActivityFailure(name, err, error_count)
}
}
WorkflowError::ActivityTimeout(_) => {
WorkflowError::ActivityTimeout(name, _) => {
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(err.into())
WorkflowError::ActivityMaxFailuresReached(name, err.into())
} else {
// Add error count to the error for backoff calculation
WorkflowError::ActivityTimeout(error_count)
WorkflowError::ActivityTimeout(name, error_count)
}
}
WorkflowError::OperationTimeout(op_name, _) => {
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(err.into())
WorkflowError::ActivityMaxFailuresReached(
I::Activity::NAME,
err.into(),
)
} else {
// Add error count to the error for backoff calculation
WorkflowError::OperationTimeout(op_name, error_count)
Expand Down
Loading
Loading