Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ rstest = "0.26.1"
rustls-pemfile = "2.2.0"
rustyline = "15.0.0"
scc = "3.3.2"
semver = "1.0.27"
serde_bare = "0.5.0"
serde_html_form = "0.2.7"
serde_yaml = "0.9.34"
Expand Down
4 changes: 4 additions & 0 deletions engine/packages/config/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ pub struct Root {

#[serde(default)]
pub telemetry: Telemetry,

#[serde(default)]
pub allow_version_rollback: bool,
}

impl Default for Root {
Expand All @@ -112,6 +115,7 @@ impl Default for Root {
clickhouse: None,
vector_http: None,
telemetry: Default::default(),
allow_version_rollback: false,
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions engine/packages/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,28 @@ futures-util.workspace = true
gas.workspace = true
hex.workspace = true
include_dir.workspace = true
indoc.workspace = true
lz4_flex.workspace = true
pegboard-serverless.workspace = true
pegboard-runner.workspace = true
pegboard-serverless.workspace = true
reqwest.workspace = true
rivet-api-peer.workspace = true
rivet-bootstrap.workspace = true
rivet-cache.workspace = true
rivet-cache-purge.workspace = true
rivet-cache.workspace = true
rivet-config.workspace = true
rivet-guard.workspace = true
rivet-tracing-reconfigure.workspace = true
rivet-logs.workspace = true
rivet-pools.workspace = true
rivet-runtime.workspace = true
rivet-service-manager.workspace = true
rivet-telemetry.workspace = true
rivet-term.workspace = true
rivet-tracing-reconfigure.workspace = true
rivet-util.workspace = true
rivet-workflow-worker.workspace = true
rustyline.workspace = true
semver.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
serde.workspace = true
Expand Down
50 changes: 48 additions & 2 deletions engine/packages/engine/src/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::time::Duration;

use anyhow::*;
use anyhow::{Context, Result, anyhow};
use clap::Parser;
use indoc::formatdoc;
use rivet_service_manager::{CronConfig, RunConfig};
use universaldb::utils::IsolationLevel::*;

use crate::keys;

// 7 day logs retention
const LOGS_RETENTION: Duration = Duration::from_secs(7 * 24 * 60 * 60);
Expand Down Expand Up @@ -88,10 +92,52 @@ impl Opts {
.collect::<Vec<_>>()
};

// Start server
let pools = rivet_pools::Pools::new(config.clone()).await?;

verify_engine_version(&config, &pools).await?;

// Start server
rivet_service_manager::start(config, pools, services).await?;

Ok(())
}
}

/// Verifies that no rollback has occurred (if allowing rollback is disabled).
async fn verify_engine_version(
config: &rivet_config::Config,
pools: &rivet_pools::Pools,
) -> Result<()> {
if config.allow_version_rollback {
return Ok(());
}

pools
.udb()?
.run(|tx| async move {
let current_version = semver::Version::parse(env!("CARGO_PKG_VERSION"))
.context("failed to parse cargo pkg version as semver")?;

if let Some(existing_version) =
tx.read_opt(&keys::EngineVersionKey {}, Serializable).await?
{
if current_version < existing_version {
return Ok(Err(anyhow!("{}", formatdoc!(
"
Rivet Engine has been rolled back to a previous version:
- Last Used Version: {existing_version}
- Current Version: {current_version}
Cannot proceed without potential data corruption.

(If you know what you're doing, this error can be disabled in the Rivet config via `allow_version_rollback: true`)
"
))));
}
}

tx.write(&keys::EngineVersionKey {}, current_version)?;

Ok(Ok(()))
})
.await?
}
47 changes: 47 additions & 0 deletions engine/packages/engine/src/keys.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use anyhow::Result;
use universaldb::prelude::*;

#[derive(Debug)]
pub struct EngineVersionKey {}

impl EngineVersionKey {
pub fn new() -> Self {
EngineVersionKey {}
}
}

impl FormalKey for EngineVersionKey {
type Value = semver::Version;

fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
semver::Version::parse(str::from_utf8(raw)?).map_err(Into::into)
}

fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
Ok(value.to_string().into_bytes())
}
}

impl TuplePack for EngineVersionKey {
fn pack<W: std::io::Write>(
&self,
w: &mut W,
tuple_depth: TupleDepth,
) -> std::io::Result<VersionstampOffset> {
let t = (RIVET, VERSION);
t.pack(w, tuple_depth)
}
}

impl<'de> TupleUnpack<'de> for EngineVersionKey {
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
let (input, (_, data)) = <(usize, usize)>::unpack(input, tuple_depth)?;
if data != VERSION {
return Err(PackError::Message("expected VERSION data".into()));
}

let v = EngineVersionKey {};

Ok((input, v))
}
}
1 change: 1 addition & 0 deletions engine/packages/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use commands::*;
use rivet_service_manager::RunConfig;

pub mod commands;
pub mod keys;
pub mod run_config;
pub mod util;

Expand Down
Loading