Skip to content

Commit

Permalink
Added a way to set a TTL to key values. (#137)
Browse files Browse the repository at this point in the history
* Added a way to set a TTL to key values.

After the GC grace period, the key value will be subject to the GC
just like regularly deleted KVs.

* Removed reference
  • Loading branch information
fulmicoton committed Mar 12, 2024
1 parent 78f8aff commit de1edcf
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 122 deletions.
3 changes: 1 addition & 2 deletions chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ impl Api {
let mut chitchat_guard = self.chitchat.lock().await;

let cc_state = chitchat_guard.self_node_state();
cc_state.mark_for_deletion(key.as_str());

cc_state.delete(key.as_str());
Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap())
}
}
Expand Down
45 changes: 25 additions & 20 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashSet;

use crate::serialize::*;
use crate::types::{KeyValueMutation, KeyValueMutationRef};
use crate::types::{DeletionStatusMutation, KeyValueMutation, KeyValueMutationRef};
use crate::{ChitchatId, Version, VersionedValue};

/// A delta is the message we send to another node to update it.
Expand Down Expand Up @@ -117,12 +117,12 @@ impl Deserializable for DeltaOp {
let key = String::deserialize(buf)?;
let value = String::deserialize(buf)?;
let version = u64::deserialize(buf)?;
let deleted = bool::deserialize(buf)?;
let deleted = DeletionStatusMutation::deserialize(buf)?;
Ok(DeltaOp::KeyValue(KeyValueMutation {
key,
value,
version,
tombstone: deleted,
status: deleted,
}))
}
DeltaOpTag::SetMaxVersion => {
Expand Down Expand Up @@ -280,7 +280,11 @@ impl Delta {
key: key.to_string(),
value: value.to_string(),
version,
tombstone: deleted,
status: if deleted {
DeletionStatusMutation::Delete
} else {
DeletionStatusMutation::Set
},
});
}

Expand Down Expand Up @@ -440,7 +444,7 @@ impl DeltaSerializer {
key: key.to_string(),
value: versioned_value.value,
version: versioned_value.version,
tombstone: versioned_value.tombstone.is_some(),
status: versioned_value.status.into(),
};
let key_value_op = DeltaOp::KeyValue(key_value_mutation);
self.try_add_op(key_value_op)
Expand Down Expand Up @@ -472,6 +476,7 @@ mod tests {
use tokio::time::Instant;

use super::*;
use crate::types::DeletionStatus;

#[test]
fn test_delta_serialization_default() {
Expand Down Expand Up @@ -512,7 +517,7 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
},
));
// +26 bytes: 2 bytes (key length) + 5 bytes (key) + 8 bytes (version) +
Expand All @@ -522,7 +527,7 @@ mod tests {
VersionedValue {
value: "".to_string(),
version: 2,
tombstone: Some(Instant::now()),
status: DeletionStatus::Deleted(Instant::now()),
},
));

Expand All @@ -536,7 +541,7 @@ mod tests {
VersionedValue {
value: "val21".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
},
));
// +23 bytes.
Expand All @@ -545,7 +550,7 @@ mod tests {
VersionedValue {
value: "val22".to_string(),
version: 3,
tombstone: None,
status: DeletionStatus::Set,
},
));
test_aux_delta_writer(delta_writer, 98);
Expand All @@ -567,7 +572,7 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
}
));

Expand All @@ -577,7 +582,7 @@ mod tests {
VersionedValue {
value: "val12".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
}
));

Expand Down Expand Up @@ -614,7 +619,7 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
}
));
// +23 bytes (kv) + 1 (op tag)
Expand All @@ -624,7 +629,7 @@ mod tests {
VersionedValue {
value: "val12".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
}
));

Expand All @@ -651,7 +656,7 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
}
));
// +23 bytes.
Expand All @@ -660,7 +665,7 @@ mod tests {
VersionedValue {
value: "val12".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
}
));

Expand Down Expand Up @@ -690,7 +695,7 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
}
));

Expand All @@ -701,7 +706,7 @@ mod tests {
VersionedValue {
value: "val12aaaaaaaaaabcc".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
}
));
test_aux_delta_writer(delta_writer, 72);
Expand All @@ -720,23 +725,23 @@ mod tests {
VersionedValue {
value: "val11".to_string(),
version: 1,
tombstone: None,
status: DeletionStatus::Set,
}
));
assert!(!delta_writer.try_add_kv(
"key12",
VersionedValue {
value: "val12".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
}
));
delta_writer.try_add_kv(
"key13",
VersionedValue {
value: "val12".to_string(),
version: 2,
tombstone: None,
status: DeletionStatus::Set,
},
);
}
Expand Down
10 changes: 5 additions & 5 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::digest::Digest;
pub use crate::message::ChitchatMessage;
pub use crate::server::{spawn_chitchat, ChitchatHandle};
use crate::state::ClusterState;
pub use crate::types::{ChitchatId, Heartbeat, Version, VersionedValue};
pub use crate::types::{ChitchatId, DeletionStatus, Heartbeat, Version, VersionedValue};

/// Maximum UDP datagram payload size (in bytes).
///
Expand Down Expand Up @@ -608,7 +608,7 @@ mod tests {

assert_nodes_sync(&[&node1, &node2]);

node1.self_node_state().mark_for_deletion("k1");
node1.self_node_state().delete("k1");

// Advance time before triggering the GC of that deleted key
tokio::time::advance(Duration::from_secs(3_600 * 3)).await;
Expand Down Expand Up @@ -718,7 +718,7 @@ mod tests {
.lock()
.await
.self_node_state()
.mark_for_deletion("READY");
.delete("READY");

let live_members = loop {
let live_nodes = live_nodes_stream.next().await.unwrap();
Expand Down Expand Up @@ -1085,8 +1085,8 @@ mod tests {
node1.self_node_state().set("self1:suffix1", "updated");
assert_eq!(counter_self_key.load(Ordering::SeqCst), 2);

node1.self_node_state().mark_for_deletion("self1:suffix1");
node2.self_node_state().mark_for_deletion("other:suffix");
node1.self_node_state().delete("self1:suffix1");
node2.self_node_state().delete("other:suffix");

run_chitchat_handshake(&mut node1, &mut node2);

Expand Down
Loading

0 comments on commit de1edcf

Please sign in to comment.