From cba739a6e46726df92cb6ed2c0871a8216d89ce5 Mon Sep 17 00:00:00 2001 From: Griffin Smith Date: Thu, 22 Jun 2023 10:19:41 -0400 Subject: [PATCH] consensus/consul: Use CAS for read_modify_write Use the CAS parameter when setting the key in Consul in ConsulAuthority::read_modify_write, so that multiple concurrent operations on the same key don't overwrite each other. Change-Id: I54358b2da0b7ba07c229cb3fa1f9265c4ceb1e5b Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5217 Tested-by: Buildkite CI Reviewed-by: Luke Osborne --- readyset-client/src/consensus/consul.rs | 60 ++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/readyset-client/src/consensus/consul.rs b/readyset-client/src/consensus/consul.rs index 99c7e179e8..b90a255bd7 100644 --- a/readyset-client/src/consensus/consul.rs +++ b/readyset-client/src/consensus/consul.rs @@ -123,7 +123,7 @@ use std::time::Duration; use async_trait::async_trait; use consulrs::api::kv::common::KVPair; -use consulrs::api::kv::requests as kv_requests; +use consulrs::api::kv::requests::{self as kv_requests, SetKeyRequestBuilder}; use consulrs::api::session::requests as session_requests; use consulrs::api::ApiResponse; use consulrs::client::{ConsulClient, ConsulClientSettingsBuilder}; @@ -757,9 +757,27 @@ impl AuthorityControl for ConsulAuthority { E: Send, { loop { - // TODO(justin): Use cas parameter to only modify if we have the same - // ModifyIndex when we write. - let current_val = self.try_read(path).await?; + let (modify_index, current_val) = + match kv::read(&self.consul, &self.prefix_with_deployment(path), None).await { + Ok(resp) => { + if let Some(pair) = resp.response.into_iter().next() { + ( + pair.modify_index, + pair.value + .map(|bytes| -> ReadySetResult<_> { + let bytes: Vec = bytes + .try_into() + .map_err(|e| internal_err!("Consul error: {e}"))?; + Ok(serde_json::from_slice(&bytes)?) + }) + .transpose()?, + ) + } else { + (0, None) + } + } + _ => (0, None), + }; if let Ok(modified) = f(current_val) { let bytes = serde_json::to_vec(&modified)?; @@ -767,7 +785,7 @@ impl AuthorityControl for ConsulAuthority { &self.consul, &self.prefix_with_deployment(path), &bytes, - None, + Some(SetKeyRequestBuilder::default().cas(modify_index)), ) .await?; @@ -940,6 +958,8 @@ mod tests { use std::sync::Arc; use std::time::Duration; + use futures::stream::FuturesUnordered; + use futures::StreamExt; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use reqwest::Url; @@ -979,6 +999,36 @@ mod tests { ); } + #[tokio::test] + #[serial] + async fn read_modify_write_in_parallel() { + let authority_address = test_authority_address("read_modify_write_in_parallel"); + let authority = Arc::new(ConsulAuthority::new(&authority_address).unwrap()); + authority.init().await.unwrap(); + authority.delete_all_keys().await; + + let mut futs = (0..10) + .map(|_| { + let authority = authority.clone(); + tokio::spawn(async move { + authority + .read_modify_write("counter", |val: Option| -> Result { + Ok(val.unwrap_or_default() + 1) + }) + .await + .unwrap() + }) + }) + .collect::>(); + + while let Some(res) = futs.next().await { + res.unwrap().unwrap(); + } + + let val = authority.try_read::("counter").await.unwrap().unwrap(); + assert_eq!(val, 10); + } + #[tokio::test] #[serial] async fn overwrite_controller_state() {