Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion engine/packages/epoxy/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,3 @@ pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// This keeps learner range reads bounded while still making steady progress through the
/// immutable per-key commit history.
pub const CHANGELOG_READ_COUNT: u64 = 5_000;

9 changes: 7 additions & 2 deletions engine/packages/epoxy/src/http_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ pub fn mount_routes(
) -> axum::Router<rivet_api_builder::GlobalApiCtx> {
router
.route("/v{version}/epoxy/message", bin::post(message))
.route("/v{version}/epoxy/changelog-read", bin::post(changelog_read))
.route(
"/v{version}/epoxy/changelog-read",
bin::post(changelog_read),
)
}

pub async fn message(ctx: ApiCtx, path: ProtocolPath, _query: (), body: Bytes) -> Result<Vec<u8>> {
Expand Down Expand Up @@ -66,7 +69,9 @@ fn request_kind_label(kind: &protocol::RequestKind) -> &'static str {
protocol::RequestKind::CommitRequest(_) => "commit",
protocol::RequestKind::ChangelogReadRequest(_) => "changelog_read",
protocol::RequestKind::HealthCheckRequest => "health_check",
protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(_) => "coordinator_update_replica_status",
protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(_) => {
"coordinator_update_replica_status"
}
protocol::RequestKind::BeginLearningRequest(_) => "begin_learning",
protocol::RequestKind::KvGetRequest(_) => "kv_get",
protocol::RequestKind::KvPurgeCacheRequest(_) => "kv_purge_cache",
Expand Down
4 changes: 1 addition & 3 deletions engine/packages/epoxy/src/keys/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,7 @@ impl TuplePack for ChangelogKey {

impl<'de> TupleUnpack<'de> for ChangelogKey {
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
let (input, (root, versionstamp)) =
<(usize, Versionstamp)>::unpack(input, tuple_depth)?;
let (input, (root, versionstamp)) = <(usize, Versionstamp)>::unpack(input, tuple_depth)?;
if root != CHANGELOG {
return Err(PackError::Message("expected CHANGELOG root".into()));
}
Expand All @@ -347,4 +346,3 @@ impl<'de> TupleUnpack<'de> for ChangelogKey {
Ok((input, v))
}
}

4 changes: 3 additions & 1 deletion engine/packages/epoxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ pub fn record_changelog_append() {
}

pub fn record_request(request_type: &str, result: &str, duration: std::time::Duration) {
REQUEST_TOTAL.with_label_values(&[request_type, result]).inc();
REQUEST_TOTAL
.with_label_values(&[request_type, result])
.inc();
REQUEST_DURATION
.with_label_values(&[request_type])
.observe(duration.as_secs_f64());
Expand Down
12 changes: 4 additions & 8 deletions engine/packages/epoxy/src/ops/kv/get_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@ pub struct Output {

#[operation]
pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result<Output> {
let committed_value = read_value::read_local_value(
ctx,
input.replica_id,
input.key.clone(),
false,
)
.await?
.value;
let committed_value =
read_value::read_local_value(ctx, input.replica_id, input.key.clone(), false)
.await?
.value;

Ok(Output {
value: committed_value.as_ref().map(|value| value.value.clone()),
Expand Down
18 changes: 6 additions & 12 deletions engine/packages/epoxy/src/ops/kv/get_optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,7 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
};

if input.caching_behavior == protocol::CachingBehavior::Optimistic {
cache_fanout_value(
ctx,
input.replica_id,
input.key.clone(),
value.clone(),
)
.await?;
cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?;
}

return Ok(Output {
Expand Down Expand Up @@ -162,7 +156,10 @@ async fn cache_fanout_value(
// This covers the race where a commit lands between the fanout read and
// the cache write.
if let Some(committed_value) = tx
.read_opt(&committed_key, universaldb::utils::IsolationLevel::Serializable)
.read_opt(
&committed_key,
universaldb::utils::IsolationLevel::Serializable,
)
.await?
{
if committed_value.version >= value_to_cache.version {
Expand All @@ -174,10 +171,7 @@ async fn cache_fanout_value(
// prevents a slow fanout response from overwriting a fresher cache entry
// written by a concurrent request.
if let Some(existing_cache) = tx
.read_opt(
&cache_key,
universaldb::utils::IsolationLevel::Serializable,
)
.read_opt(&cache_key, universaldb::utils::IsolationLevel::Serializable)
.await?
{
if existing_cache.version > value_to_cache.version {
Expand Down
4 changes: 3 additions & 1 deletion engine/packages/epoxy/src/ops/kv/read_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use epoxy_protocol::protocol::ReplicaId;
use gas::prelude::*;
use universaldb::utils::{FormalKey, IsolationLevel::Serializable};

use crate::keys::{self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey};
use crate::keys::{
self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey,
};

#[derive(Debug)]
pub(crate) struct LocalValueRead {
Expand Down
170 changes: 81 additions & 89 deletions engine/packages/epoxy/src/ops/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use std::collections::{BTreeSet, HashSet};
use std::time::{Duration, Instant};

use crate::{
http_client, metrics,
http_client,
keys::CommittedValue,
metrics,
replica::{
ballot::{self, Ballot, BallotSelection},
commit_kv::{self, CommitKvOutcome},
Expand Down Expand Up @@ -101,13 +102,11 @@ impl SetProposal {
key,
expect_one_of,
new_value: Some(value),
}) if expect_one_of.len() == 1 && matches!(expect_one_of.first(), Some(None)) => {
Ok(Self {
key: key.clone(),
value: value.clone(),
mutable,
})
}
}) if expect_one_of.len() == 1 && matches!(expect_one_of.first(), Some(None)) => Ok(Self {
key: key.clone(),
value: value.clone(),
mutable,
}),
_ => bail!(
"epoxy v2 only supports single-key set-if-absent proposals with a concrete value"
),
Expand Down Expand Up @@ -314,14 +313,14 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
replica_id,
&quorum_members,
proposal.key.clone(),
CommittedValue {
value: proposal.value.clone(),
version: 1,
mutable: proposal.mutable,
},
ballot,
)
.await?
CommittedValue {
value: proposal.value.clone(),
version: 1,
mutable: proposal.mutable,
},
ballot,
)
.await?
{
PreparePhaseOutcome::Prepared { ballot, value } => {
run_slow_path(
Expand Down Expand Up @@ -444,16 +443,7 @@ async fn run_accept_path(
version,
mutable,
} = value;
commit_kv::commit_kv(
&tx,
replica_id,
key,
value,
ballot,
mutable,
version,
)
.await
commit_kv::commit_kv(&tx, replica_id, key, value, ballot, mutable, version).await
}
})
.custom_instrument(tracing::info_span!("commit_kv_tx"))
Expand All @@ -472,16 +462,15 @@ async fn run_accept_path(
let ballot = ballot;
let purge_cache = purge_cache && proposal.mutable;
async move {
if let Err(err) =
broadcast_commits(
&ctx,
&config,
replica_id,
key.clone(),
chosen_value.clone(),
ballot,
)
.await
if let Err(err) = broadcast_commits(
&ctx,
&config,
replica_id,
key.clone(),
chosen_value.clone(),
ballot,
)
.await
{
tracing::warn!(?err, "commit broadcast failed after local commit");
}
Expand Down Expand Up @@ -550,7 +539,8 @@ async fn run_prepare_phase(
}
PrepareRoundOutcome::Retry { next_ballot } => {
store_prepare_ballot(ctx, replica_id, key.clone(), next_ballot).await?;
let Some(retry_delay) = next_prepare_retry_delay(retry_count, &mut rand::thread_rng())
let Some(retry_delay) =
next_prepare_retry_delay(retry_count, &mut rand::thread_rng())
else {
tracing::warn!(
%replica_id,
Expand Down Expand Up @@ -592,31 +582,32 @@ async fn send_prepare_round(
ballot: protocol::Ballot,
) -> Result<PrepareRoundOutcome> {
let target = utils::calculate_quorum(replica_ids.len(), utils::QuorumType::Slow);
let mut pending = futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| {
let key = key.clone();
let proposed_value = proposed_value.clone();
let ballot = ballot.clone();
async move {
(
to_replica_id,
tokio::time::timeout(
crate::consts::REQUEST_TIMEOUT,
send_prepare_request(
ctx,
config,
from_replica_id,
to_replica_id,
key,
proposed_value,
ballot,
),
let mut pending =
futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| {
let key = key.clone();
let proposed_value = proposed_value.clone();
let ballot = ballot.clone();
async move {
(
to_replica_id,
tokio::time::timeout(
crate::consts::REQUEST_TIMEOUT,
send_prepare_request(
ctx,
config,
from_replica_id,
to_replica_id,
key,
proposed_value,
ballot,
),
)
.await,
)
.await,
)
}
}))
.collect::<FuturesUnordered<_>>()
.await;
}
}))
.collect::<FuturesUnordered<_>>()
.await;

let mut ok_responses = 0;
let mut remaining = replica_ids.len();
Expand Down Expand Up @@ -655,7 +646,9 @@ async fn send_prepare_round(
}
(None, None) => {}
_ => {
bail!("prepare response from replica {to_replica_id} returned partial accepted state");
bail!(
"prepare response from replica {to_replica_id} returned partial accepted state"
);
}
}

Expand Down Expand Up @@ -720,31 +713,32 @@ async fn send_accept_round(
accept_quorum: utils::QuorumType,
) -> Result<AcceptPhaseOutcome> {
let target = utils::calculate_quorum(replica_ids.len(), accept_quorum);
let mut pending = futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| {
let key = key.clone();
let value = value.clone();
let ballot = ballot.clone();
async move {
(
to_replica_id,
tokio::time::timeout(
crate::consts::REQUEST_TIMEOUT,
send_accept_request(
ctx,
config,
from_replica_id,
to_replica_id,
key,
value,
ballot,
),
let mut pending =
futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| {
let key = key.clone();
let value = value.clone();
let ballot = ballot.clone();
async move {
(
to_replica_id,
tokio::time::timeout(
crate::consts::REQUEST_TIMEOUT,
send_accept_request(
ctx,
config,
from_replica_id,
to_replica_id,
key,
value,
ballot,
),
)
.await,
)
.await,
)
}
}))
.collect::<FuturesUnordered<_>>()
.await;
}
}))
.collect::<FuturesUnordered<_>>()
.await;

let mut state = AcceptRoundState {
target,
Expand All @@ -754,9 +748,7 @@ async fn send_accept_round(

while let Some((to_replica_id, response)) = pending.next().await {
let observation = match response {
Ok(Ok(protocol::AcceptResponse::AcceptResponseOk(_))) => {
AcceptObservation::Ok
}
Ok(Ok(protocol::AcceptResponse::AcceptResponseOk(_))) => AcceptObservation::Ok,
Ok(Ok(protocol::AcceptResponse::AcceptResponseAlreadyCommitted(committed))) => {
AcceptObservation::AlreadyCommitted(committed.value)
}
Expand Down
Loading
Loading