Skip to content

Commit

Permalink
consensus/consul: Use CAS for read_modify_write
Browse files Browse the repository at this point in the history
Use the CAS parameter when setting the key in Consul in
ConsulAuthority::read_modify_write, so that multiple concurrent
operations on the same key don't overwrite each other.

Change-Id: I54358b2da0b7ba07c229cb3fa1f9265c4ceb1e5b
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5217
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke.o@readyset.io>
  • Loading branch information
glittershark committed Jun 23, 2023
1 parent c944d47 commit cba739a
Showing 1 changed file with 55 additions and 5 deletions.
60 changes: 55 additions & 5 deletions readyset-client/src/consensus/consul.rs
Expand Up @@ -123,7 +123,7 @@ use std::time::Duration;

use async_trait::async_trait;
use consulrs::api::kv::common::KVPair;
use consulrs::api::kv::requests as kv_requests;
use consulrs::api::kv::requests::{self as kv_requests, SetKeyRequestBuilder};
use consulrs::api::session::requests as session_requests;
use consulrs::api::ApiResponse;
use consulrs::client::{ConsulClient, ConsulClientSettingsBuilder};
Expand Down Expand Up @@ -757,17 +757,35 @@ impl AuthorityControl for ConsulAuthority {
E: Send,
{
loop {
// TODO(justin): Use cas parameter to only modify if we have the same
// ModifyIndex when we write.
let current_val = self.try_read(path).await?;
let (modify_index, current_val) =
match kv::read(&self.consul, &self.prefix_with_deployment(path), None).await {
Ok(resp) => {
if let Some(pair) = resp.response.into_iter().next() {
(
pair.modify_index,
pair.value
.map(|bytes| -> ReadySetResult<_> {
let bytes: Vec<u8> = bytes
.try_into()
.map_err(|e| internal_err!("Consul error: {e}"))?;
Ok(serde_json::from_slice(&bytes)?)
})
.transpose()?,
)
} else {
(0, None)
}
}
_ => (0, None),
};

if let Ok(modified) = f(current_val) {
let bytes = serde_json::to_vec(&modified)?;
let r = kv::set(
&self.consul,
&self.prefix_with_deployment(path),
&bytes,
None,
Some(SetKeyRequestBuilder::default().cas(modify_index)),
)
.await?;

Expand Down Expand Up @@ -940,6 +958,8 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;

use futures::stream::FuturesUnordered;
use futures::StreamExt;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use reqwest::Url;
Expand Down Expand Up @@ -979,6 +999,36 @@ mod tests {
);
}

#[tokio::test]
#[serial]
async fn read_modify_write_in_parallel() {
let authority_address = test_authority_address("read_modify_write_in_parallel");
let authority = Arc::new(ConsulAuthority::new(&authority_address).unwrap());
authority.init().await.unwrap();
authority.delete_all_keys().await;

let mut futs = (0..10)
.map(|_| {
let authority = authority.clone();
tokio::spawn(async move {
authority
.read_modify_write("counter", |val: Option<u64>| -> Result<u64, ()> {
Ok(val.unwrap_or_default() + 1)
})
.await
.unwrap()
})
})
.collect::<FuturesUnordered<_>>();

while let Some(res) = futs.next().await {
res.unwrap().unwrap();
}

let val = authority.try_read::<u64>("counter").await.unwrap().unwrap();
assert_eq!(val, 10);
}

#[tokio::test]
#[serial]
async fn overwrite_controller_state() {
Expand Down

0 comments on commit cba739a

Please sign in to comment.