From bdc8fc3fb1bba68eddc69215dca089dbabff4526 Mon Sep 17 00:00:00 2001 From: Kacy Fortner Date: Wed, 25 Feb 2026 07:10:19 -0500 Subject: [PATCH] perf: move version field from entry to lazy side table the version field (u64, 8 bytes) existed on every entry but was only read during WATCH/EXEC, which is <1% of production workloads. every mutation unconditionally bumped it. moved version tracking to a lazily-populated AHashMap on keyspace: - key_version() inserts into the side table on first call (like WATCH) - bump_version() only bumps if the key is tracked (almost never) - version entries are cleaned up on delete/expire/evict/flush this shrinks Entry from 52 to 44 bytes and reduces ENTRY_OVERHEAD from 128 to 120, saving ~8 bytes per key in sharded mode. --- crates/ember-core/src/dropper.rs | 1 - crates/ember-core/src/keyspace/hash.rs | 10 +- crates/ember-core/src/keyspace/list.rs | 85 ++++++++-------- crates/ember-core/src/keyspace/mod.rs | 112 ++++++++++++++------- crates/ember-core/src/keyspace/proto.rs | 7 +- crates/ember-core/src/keyspace/set.rs | 10 +- crates/ember-core/src/keyspace/string.rs | 6 +- crates/ember-core/src/keyspace/vector.rs | 121 +++++++++++------------ crates/ember-core/src/keyspace/zset.rs | 20 ++-- crates/ember-core/src/memory.rs | 12 ++- 10 files changed, 213 insertions(+), 171 deletions(-) diff --git a/crates/ember-core/src/dropper.rs b/crates/ember-core/src/dropper.rs index 5ea5431f..fd4f1091 100644 --- a/crates/ember-core/src/dropper.rs +++ b/crates/ember-core/src/dropper.rs @@ -139,7 +139,6 @@ mod tests { expires_at_ms: 0, cached_value_size: 0, last_access_secs: 0, - version: 0, }, ); } diff --git a/crates/ember-core/src/keyspace/hash.rs b/crates/ember-core/src/keyspace/hash.rs index beb49025..11707eab 100644 --- a/crates/ember-core/src/keyspace/hash.rs +++ b/crates/ember-core/src/keyspace/hash.rs @@ -90,7 +90,6 @@ impl Keyspace { return Ok(vec![]); } - let ver = self.next_ver(); let Some(entry) = self.entries.get_mut(key) else { return Ok(vec![]); }; @@ -113,7 +112,7 @@ impl Keyspace { false }; if !removed.is_empty() { - entry.version = ver; + self.bump_version(key); } self.cleanup_after_remove(key, old_entry_size, is_empty, removed_bytes); @@ -178,9 +177,9 @@ impl Keyspace { if is_new { let value = Value::Hash(Box::default()); self.memory.add(key, &value); - let mut entry = Entry::new(value, None); - entry.version = self.next_ver(); + let entry = Entry::new(value, None); self.entries.insert(CompactString::from(key), entry); + self.bump_version(key); } // safe: key was either just inserted above or verified to exist @@ -202,13 +201,12 @@ impl Keyspace { let new_val = current_val.checked_add(delta).ok_or(IncrError::Overflow)?; hash.insert(field.into(), Bytes::from(new_val.to_string())); entry.touch(); - self.next_version += 1; - entry.version = self.next_version; let new_value_size = memory::value_size(&entry.value); entry.cached_value_size = new_value_size; let new_entry_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD; self.memory.adjust(old_entry_size, new_entry_size); + self.bump_version(key); Ok(new_val) } diff --git a/crates/ember-core/src/keyspace/list.rs b/crates/ember-core/src/keyspace/list.rs index d23cb9ff..b2c52256 100644 --- a/crates/ember-core/src/keyspace/list.rs +++ b/crates/ember-core/src/keyspace/list.rs @@ -141,18 +141,17 @@ impl Keyspace { (old_len, new_len) }; - let ver = self.next_ver(); let delta = new_len as isize - old_len as isize; { let entry = self.entries.get_mut(key).expect("key confirmed to exist"); entry.touch(); - entry.version = ver; if delta > 0 { entry.cached_value_size += delta as usize; } else if delta < 0 { entry.cached_value_size = entry.cached_value_size.saturating_sub((-delta) as usize); } } + self.bump_version(key); if delta > 0 { self.memory.grow_by(delta as usize); } else if delta < 0 { @@ -182,7 +181,6 @@ impl Keyspace { let (s, e) = normalize_range(start, stop, len as i64); let (is_empty, new_value_size) = { - let ver = self.next_ver(); let entry = self.entries.get_mut(key).expect("verified above"); { let Value::List(ref mut deque) = entry.value else { @@ -200,7 +198,6 @@ impl Keyspace { let empty = matches!(&entry.value, Value::List(d) if d.is_empty()); entry.touch(); - entry.version = ver; let nvs = if !empty { let size = memory::value_size(&entry.value); entry.cached_value_size = size; @@ -212,6 +209,8 @@ impl Keyspace { (empty, nvs) }; + self.bump_version(key); + if is_empty { if let Some(removed) = self.entries.remove(key) { self.decrement_expiry_if_set(&removed); @@ -265,7 +264,6 @@ impl Keyspace { return Err(WriteError::OutOfMemory); } - let ver = self.next_ver(); let new_len = { let entry = match self.entries.get_mut(key) { Some(e) => e, @@ -283,11 +281,11 @@ impl Keyspace { deque.insert(insert_pos, value); let len = deque.len() as i64; entry.touch(); - entry.version = ver; entry.cached_value_size += element_cost; len }; + self.bump_version(key); self.memory.grow_by(element_cost); Ok(new_len) } @@ -311,7 +309,6 @@ impl Keyspace { } let old_entry_size = self.entries.get(key).expect("exists").entry_size(key); - let ver = self.next_ver(); let (removed_count, removed_bytes, is_empty) = { let entry = self.entries.get_mut(key).expect("exists"); @@ -363,7 +360,6 @@ impl Keyspace { let empty = deque.is_empty(); entry.touch(); - entry.version = ver; if !empty { entry.cached_value_size = entry.cached_value_size.saturating_sub(bytes); @@ -372,6 +368,8 @@ impl Keyspace { (n, bytes, empty) }; + self.bump_version(key); + if is_empty { if let Some(removed) = self.entries.remove(key) { self.decrement_expiry_if_set(&removed); @@ -478,23 +476,25 @@ impl Keyspace { } // safe: key was just inserted or confirmed to exist - let ver = self.next_ver(); - let entry = self.entries.get_mut(key).unwrap(); - let Value::List(ref mut deque) = entry.value else { - unreachable!("type verified by ensure_collection_type"); - }; - for val in values { - if left { - deque.push_front(val.clone()); - } else { - deque.push_back(val.clone()); + let len = { + let entry = self.entries.get_mut(key).unwrap(); + let Value::List(ref mut deque) = entry.value else { + unreachable!("type verified by ensure_collection_type"); + }; + for val in values { + if left { + deque.push_front(val.clone()); + } else { + deque.push_back(val.clone()); + } } - } - let len = deque.len(); - entry.touch(); - entry.version = ver; - entry.cached_value_size += element_increase; + let len = deque.len(); + entry.touch(); + entry.cached_value_size += element_increase; + len + }; + self.bump_version(key); // apply the known delta — no need to rescan the entire list self.memory.grow_by(element_increase); @@ -507,27 +507,30 @@ impl Keyspace { return Ok(None); } - let ver = self.next_ver(); - let Some(entry) = self.entries.get_mut(key) else { - return Ok(None); - }; - if !matches!(entry.value, Value::List(_)) { - return Err(WrongType); - } + let (popped, is_empty) = { + let Some(entry) = self.entries.get_mut(key) else { + return Ok(None); + }; + if !matches!(entry.value, Value::List(_)) { + return Err(WrongType); + } - let Value::List(ref mut deque) = entry.value else { - // checked above - return Err(WrongType); - }; - let popped = if left { - deque.pop_front() - } else { - deque.pop_back() + let Value::List(ref mut deque) = entry.value else { + // checked above + return Err(WrongType); + }; + let popped = if left { + deque.pop_front() + } else { + deque.pop_back() + }; + entry.touch(); + + let is_empty = matches!(&entry.value, Value::List(d) if d.is_empty()); + (popped, is_empty) }; - entry.touch(); - entry.version = ver; - let is_empty = matches!(&entry.value, Value::List(d) if d.is_empty()); + self.bump_version(key); if let Some(ref elem) = popped { let element_size = elem.len() + memory::VECDEQUE_ELEMENT_OVERHEAD; let old_size = if is_empty { diff --git a/crates/ember-core/src/keyspace/mod.rs b/crates/ember-core/src/keyspace/mod.rs index e56bf658..c865b84a 100644 --- a/crates/ember-core/src/keyspace/mod.rs +++ b/crates/ember-core/src/keyspace/mod.rs @@ -266,8 +266,11 @@ pub enum SetResult { /// `expires_at_ms` (the hot-path read fields) sit at the front so /// they share the first L1 cache line with the HashMap key pointer. /// `cached_value_size` is warm (used on writes). `last_access_secs` -/// is cold (only used during eviction sampling). `version` is cold -/// (only read on WATCH/EXEC). +/// is cold (only used during eviction sampling). +/// +/// Version tracking for WATCH/EXEC lives in a separate side table on +/// Keyspace (`versions` map), not on every entry. This saves 8 bytes +/// per entry since <1% of keys are ever WATCHed. #[derive(Debug, Clone)] pub(crate) struct Entry { pub(crate) value: Value, @@ -280,9 +283,6 @@ pub(crate) struct Entry { /// Monotonic last access time in seconds since process start (for LRU). /// Using u32 saves 4 bytes per entry; wraps at ~136 years. pub(crate) last_access_secs: u32, - /// Monotonic version counter for optimistic locking (WATCH/EXEC). - /// Bumped on every mutation so that EXEC can detect concurrent writes. - pub(crate) version: u64, } impl Entry { @@ -293,7 +293,6 @@ impl Entry { expires_at_ms: time::expiry_from_duration(ttl), cached_value_size, last_access_secs: time::now_secs(), - version: 0, } } @@ -378,6 +377,11 @@ pub struct Keyspace { /// Monotonic counter for entry versions. Each mutation gets the next /// value, giving WATCH a cheap way to detect concurrent writes. next_version: u64, + /// Side table for WATCH/EXEC version tracking. Only populated for + /// keys that have been WATCHed. On mutation, we bump the version + /// only if the key exists in this map — which is almost never, + /// so the hot path pays only a fast hash-miss lookup. + versions: AHashMap, } impl Keyspace { @@ -398,6 +402,7 @@ impl Keyspace { oom_rejections: 0, drop_handle: None, next_version: 0, + versions: AHashMap::new(), } } @@ -408,21 +413,43 @@ impl Keyspace { self.drop_handle = Some(handle); } - /// Advances the version counter and returns the new value. - /// Called on every write so that WATCH can detect modifications. - fn next_ver(&mut self) -> u64 { - self.next_version += 1; - self.next_version + /// Bumps the version for a key in the side table, but only if the + /// key is being tracked (i.e. someone WATCHed it). When no keys + /// are watched, the versions map is empty and this is a fast miss. + fn bump_version(&mut self, key: &str) { + if let Some(ver) = self.versions.get_mut(key) { + self.next_version += 1; + *ver = self.next_version; + } } /// Returns the current version of a key, or `None` if the key is /// missing or expired. Used by WATCH/EXEC — cold path only. - pub fn key_version(&self, key: &str) -> Option { + /// + /// If the key hasn't been WATCHed yet, inserts the current + /// `next_version` so that future mutations will be detected. + pub fn key_version(&mut self, key: &str) -> Option { let entry = self.entries.get(key)?; if entry.is_expired() { return None; } - Some(entry.version) + let ver = *self + .versions + .entry(CompactString::from(key)) + .or_insert(self.next_version); + Some(ver) + } + + /// Removes version tracking for a key. Called on key deletion, + /// expiration, and eviction so the side table doesn't leak. + fn remove_version(&mut self, key: &str) { + self.versions.remove(key); + } + + /// Removes all version tracking entries. Called on UNWATCH/DISCARD + /// or FLUSHDB. + pub fn clear_versions(&mut self) { + self.versions.clear(); } /// Decrements the expiry count if the entry had a TTL set. @@ -500,9 +527,9 @@ impl Keyspace { /// collection-write methods after type-checking and memory reservation. fn insert_empty(&mut self, key: &str, value: Value) { self.memory.add(key, &value); - let mut entry = Entry::new(value, None); - entry.version = self.next_ver(); + let entry = Entry::new(value, None); self.entries.insert(CompactString::from(key), entry); + self.bump_version(key); } /// Measures entry size before and after a mutation, adjusting the @@ -521,9 +548,7 @@ impl Keyspace { entry.cached_value_size = new_value_size; let new_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD; self.memory.adjust(old_size, new_size); - // bump version for WATCH optimistic locking - self.next_version += 1; - entry.version = self.next_version; + self.bump_version(key); Some(result) } @@ -587,6 +612,7 @@ impl Keyspace { self.memory.remove(&victim, &entry.value); self.decrement_expiry_if_set(&entry); self.evicted_total += 1; + self.remove_version(&victim); self.defer_drop(entry.value); return true; } @@ -654,6 +680,7 @@ impl Keyspace { if let Some(entry) = self.entries.remove(key) { self.memory.remove(key, &entry.value); self.decrement_expiry_if_set(&entry); + self.remove_version(key); self.defer_drop(entry.value); true } else { @@ -673,6 +700,7 @@ impl Keyspace { if let Some(entry) = self.entries.remove(key) { self.memory.remove(key, &entry.value); self.decrement_expiry_if_set(&entry); + self.remove_version(key); // always defer for UNLINK, regardless of value size if let Some(ref handle) = self.drop_handle { handle.defer_value(entry.value); @@ -690,6 +718,7 @@ impl Keyspace { let old = std::mem::take(&mut self.entries); self.memory.reset(); self.expiry_count = 0; + self.versions.clear(); old } @@ -828,8 +857,7 @@ impl Keyspace { self.expiry_count += 1; } entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000)); - self.next_version += 1; - entry.version = self.next_version; + self.bump_version(key); true } None => false, @@ -866,8 +894,7 @@ impl Keyspace { if entry.expires_at_ms != 0 { entry.expires_at_ms = 0; self.expiry_count = self.expiry_count.saturating_sub(1); - self.next_version += 1; - entry.version = self.next_version; + self.bump_version(key); true } else { false @@ -908,8 +935,7 @@ impl Keyspace { self.expiry_count += 1; } entry.expires_at_ms = time::now_ms().saturating_add(millis); - self.next_version += 1; - entry.version = self.next_version; + self.bump_version(key); true } None => false, @@ -987,10 +1013,9 @@ impl Keyspace { if entry.expires_at_ms != 0 { self.expiry_count += 1; } - let ver = self.next_ver(); - let mut entry = entry; - entry.version = ver; + self.remove_version(key); self.entries.insert(CompactString::from(newkey), entry); + self.bump_version(newkey); Ok(()) } @@ -1046,14 +1071,13 @@ impl Keyspace { if has_expiry { self.expiry_count += 1; } - let ver = self.next_ver(); let mut entry = Entry::new(cloned_value, None); // preserve the source's absolute expiry timestamp if let Some(ts) = cloned_expire { entry.expires_at_ms = ts; } - entry.version = ver; self.entries.insert(CompactString::from(dest), entry); + self.bump_version(dest); Ok(true) } @@ -1081,6 +1105,7 @@ impl Keyspace { self.entries.clear(); self.memory.reset(); self.expiry_count = 0; + self.versions.clear(); } /// Returns `true` if the keyspace has no entries. @@ -1189,9 +1214,9 @@ impl Keyspace { } } - let mut entry = Entry::new(value, ttl); - entry.version = self.next_ver(); - self.entries.insert(CompactString::from(key), entry); + let entry = Entry::new(value, ttl); + self.entries.insert(CompactString::from(key.clone()), entry); + self.bump_version(&key); } /// Randomly samples up to `count` keys and removes any that have expired. @@ -1230,6 +1255,7 @@ impl Keyspace { self.memory.remove(key, &entry.value); self.decrement_expiry_if_set(&entry); self.expired_total += 1; + self.remove_version(key); self.defer_drop(entry.value); } } @@ -1248,6 +1274,7 @@ impl Keyspace { self.memory.remove(key, &entry.value); self.decrement_expiry_if_set(&entry); self.expired_total += 1; + self.remove_version(key); self.defer_drop(entry.value); } } @@ -2300,10 +2327,14 @@ mod tests { } // --- key_version (WATCH support) --- + // + // Version tracking uses a lazily-populated side table. `key_version()` + // inserts the key into the table on first call (simulating WATCH). + // Subsequent mutations only bump the version if the key is tracked. #[test] fn key_version_returns_none_for_missing() { - let ks = Keyspace::new(); + let mut ks = Keyspace::new(); assert_eq!(ks.key_version("nope"), None); } @@ -2311,6 +2342,7 @@ mod tests { fn key_version_changes_on_set() { let mut ks = Keyspace::new(); ks.set("k".into(), Bytes::from("v1"), None, false, false); + // first call registers the key in the version table (like WATCH) let v1 = ks.key_version("k").expect("key should exist"); ks.set("k".into(), Bytes::from("v2"), None, false, false); let v2 = ks.key_version("k").expect("key should exist"); @@ -2357,13 +2389,17 @@ mod tests { } #[test] - fn key_version_unique_across_keys() { + fn key_version_stable_without_watch() { + // if key_version is never called, mutations don't create + // version entries — the side table stays empty let mut ks = Keyspace::new(); ks.set("a".into(), Bytes::from("1"), None, false, false); - ks.set("b".into(), Bytes::from("2"), None, false, false); - let va = ks.key_version("a").unwrap(); - let vb = ks.key_version("b").unwrap(); - assert_ne!(va, vb, "different keys should have different versions"); + ks.set("a".into(), Bytes::from("2"), None, false, false); + // first call to key_version returns a snapshot + let v1 = ks.key_version("a").unwrap(); + // no mutation between calls — version is stable + let v2 = ks.key_version("a").unwrap(); + assert_eq!(v1, v2, "version should be stable without mutations"); } // --- copy tests --- diff --git a/crates/ember-core/src/keyspace/proto.rs b/crates/ember-core/src/keyspace/proto.rs index b1ef264c..722c4c3a 100644 --- a/crates/ember-core/src/keyspace/proto.rs +++ b/crates/ember-core/src/keyspace/proto.rs @@ -42,9 +42,10 @@ impl Keyspace { } } - let mut entry = Entry::new(new_value, expire); - entry.version = self.next_ver(); - self.entries.insert(CompactString::from(key), entry); + let entry = Entry::new(new_value, expire); + self.entries + .insert(CompactString::from(key.as_str()), entry); + self.bump_version(&key); SetResult::Ok } diff --git a/crates/ember-core/src/keyspace/set.rs b/crates/ember-core/src/keyspace/set.rs index bb5afe82..d594ed53 100644 --- a/crates/ember-core/src/keyspace/set.rs +++ b/crates/ember-core/src/keyspace/set.rs @@ -51,7 +51,6 @@ impl Keyspace { return Ok(0); } - let ver = self.next_ver(); let Some(entry) = self.entries.get_mut(key) else { return Ok(0); }; @@ -75,7 +74,7 @@ impl Keyspace { false }; if removed > 0 { - entry.version = ver; + self.bump_version(key); } self.cleanup_after_remove(key, old_entry_size, is_empty, removed_bytes); @@ -348,9 +347,9 @@ impl Keyspace { let set: std::collections::HashSet = members.into_iter().collect(); let value = Value::Set(Box::new(set)); self.memory.add(dest, &value); - let mut entry = Entry::new(value, None); - entry.version = self.next_ver(); + let entry = Entry::new(value, None); self.entries.insert(CompactString::from(dest), entry); + self.bump_version(dest); Ok((count, stored)) } @@ -402,7 +401,6 @@ impl Keyspace { return Ok(vec![]); } - let ver = self.next_ver(); let Some(entry) = self.entries.get_mut(key) else { return Ok(vec![]); }; @@ -437,7 +435,7 @@ impl Keyspace { let is_empty = set.is_empty(); if !chosen.is_empty() { - entry.version = ver; + self.bump_version(key); } self.cleanup_after_remove(key, old_entry_size, is_empty, removed_bytes); diff --git a/crates/ember-core/src/keyspace/string.rs b/crates/ember-core/src/keyspace/string.rs index b44d2b69..9d856a39 100644 --- a/crates/ember-core/src/keyspace/string.rs +++ b/crates/ember-core/src/keyspace/string.rs @@ -138,9 +138,9 @@ impl Keyspace { } } - let mut entry = Entry::new(new_value, expire); - entry.version = self.next_ver(); - self.entries.insert(CompactString::from(key), entry); + let entry = Entry::new(new_value, expire); + self.entries.insert(CompactString::from(key.clone()), entry); + self.bump_version(&key); SetResult::Ok } diff --git a/crates/ember-core/src/keyspace/vector.rs b/crates/ember-core/src/keyspace/vector.rs index 6275938f..f9ed43f3 100644 --- a/crates/ember-core/src/keyspace/vector.rs +++ b/crates/ember-core/src/keyspace/vector.rs @@ -53,9 +53,9 @@ impl Keyspace { .map_err(|e| VectorWriteError::IndexError(e.to_string()))?; let value = Value::Vector(vs); self.memory.add(key, &value); - let mut entry = Entry::new(value, None); - entry.version = self.next_ver(); + let entry = Entry::new(value, None); self.entries.insert(CompactString::from(key), entry); + self.bump_version(key); } let entry = match self.entries.get_mut(key) { @@ -71,13 +71,12 @@ impl Keyspace { _ => return Err(VectorWriteError::WrongType), }; entry.touch(); - self.next_version += 1; - entry.version = self.next_version; let new_value_size = memory::value_size(&entry.value); entry.cached_value_size = new_value_size; let new_entry_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD; self.memory.adjust(old_entry_size, new_entry_size); + self.bump_version(key); Ok(VAddResult { element, @@ -164,72 +163,73 @@ impl Keyspace { .map_err(|e| VectorWriteError::IndexError(e.to_string()))?; let value = Value::Vector(vs); self.memory.add(key, &value); - let mut new_entry = Entry::new(value, None); - new_entry.version = self.next_ver(); + let new_entry = Entry::new(value, None); self.entries.insert(CompactString::from(key), new_entry); + self.bump_version(key); } - let entry = match self.entries.get_mut(key) { - Some(e) => e, - None => return Err(VectorWriteError::IndexError("entry missing".into())), - }; + // batch result: Ok((added_count, bytes_added)) or Err with partial info + let batch_outcome = { + let entry = match self.entries.get_mut(key) { + Some(e) => e, + None => return Err(VectorWriteError::IndexError("entry missing".into())), + }; + + match entry.value { + Value::Vector(ref mut vs) => { + let per_elem = vs.per_element_bytes(); + + // use parallel HNSW construction for large batches + let result = vs + .add_batch_parallel(&entries) + .map_err(|e| VectorWriteError::IndexError(e.to_string()))?; + + // compute memory delta from what was actually added + let bytes_added = if result.added_count > 0 { + // sum up name lengths of new elements. since we don't know + // exactly which were new vs updates, estimate using the + // per-element cost * added_count + average name length + let total_name_bytes: usize = entries.iter().map(|(e, _)| e.len()).sum(); + let avg_name = total_name_bytes / entries.len(); + result.added_count.saturating_mul(per_elem + avg_name) + } else { + 0 + }; - let batch_result = match entry.value { - Value::Vector(ref mut vs) => { - let per_elem = vs.per_element_bytes(); - - // use parallel HNSW construction for large batches - let result = vs - .add_batch_parallel(&entries) - .map_err(|e| VectorWriteError::IndexError(e.to_string()))?; - - // compute memory delta from what was actually added - let bytes_added = if result.added_count > 0 { - // sum up name lengths of new elements. since we don't know - // exactly which were new vs updates, estimate using the - // per-element cost * added_count + average name length - let total_name_bytes: usize = entries.iter().map(|(e, _)| e.len()).sum(); - let avg_name = total_name_bytes / entries.len(); - result.added_count.saturating_mul(per_elem + avg_name) - } else { - 0 - }; - - if let Some(ref err) = result.error { - // partial success — track what was applied before returning error entry.touch(); - self.next_version += 1; - entry.version = self.next_version; entry.cached_value_size = entry.cached_value_size.saturating_add(bytes_added); self.memory.grow_by(bytes_added); - // return the partial results for AOF persistence - return Err(VectorWriteError::PartialBatch { - message: format!( - "batch error: {err} ({} vectors added before failure)", - result.added_count, - ), - applied: entries, - }); + if let Some(err) = result.error { + // partial success — track what was applied before returning error + Err((err, result.added_count)) + } else { + Ok((result.added_count, bytes_added)) + } } - - (result.added_count, bytes_added) + _ => return Err(VectorWriteError::WrongType), } - _ => return Err(VectorWriteError::WrongType), }; - - let (added_count, bytes_added) = batch_result; - entry.touch(); - self.next_version += 1; - entry.version = self.next_version; - // incremental tracking — no full-set rescan via memory::value_size() - entry.cached_value_size = entry.cached_value_size.saturating_add(bytes_added); - self.memory.grow_by(bytes_added); - - Ok(VAddBatchResult { - added_count, - applied: entries, - }) + // entry borrow released — safe to call bump_version + + match batch_outcome { + Err((err, added_count)) => { + self.bump_version(key); + Err(VectorWriteError::PartialBatch { + message: format!( + "batch error: {err} ({added_count} vectors added before failure)", + ), + applied: entries, + }) + } + Ok((added_count, _bytes_added)) => { + self.bump_version(key); + Ok(VAddBatchResult { + added_count, + applied: entries, + }) + } + } } /// Searches for the k nearest neighbors in a vector set. @@ -282,13 +282,12 @@ impl Keyspace { if removed { entry.touch(); - self.next_version += 1; - entry.version = self.next_version; let is_empty = matches!(entry.value, Value::Vector(ref vs) if vs.is_empty()); let new_vs = memory::value_size(&entry.value); entry.cached_value_size = new_vs; let new_size = key.len() + new_vs + memory::ENTRY_OVERHEAD; self.memory.adjust(old_size, new_size); + self.bump_version(key); if is_empty { self.memory.remove_with_size(new_size); diff --git a/crates/ember-core/src/keyspace/zset.rs b/crates/ember-core/src/keyspace/zset.rs index 4ae9ae8a..15c1c76f 100644 --- a/crates/ember-core/src/keyspace/zset.rs +++ b/crates/ember-core/src/keyspace/zset.rs @@ -82,7 +82,6 @@ impl Keyspace { return Err(WrongType); } - let ver = self.next_ver(); let Some(entry) = self.entries.get_mut(key) else { return Ok(vec![]); }; @@ -98,13 +97,14 @@ impl Keyspace { } } entry.touch(); - if !removed.is_empty() { - entry.version = ver; - } let is_empty = matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty()); self.cleanup_after_remove(key, old_entry_size, is_empty, removed_bytes); + if !removed.is_empty() { + self.bump_version(key); + } + Ok(removed) } @@ -346,7 +346,6 @@ impl Keyspace { return Err(WrongType); } - let ver = self.next_ver(); let Some(entry) = self.entries.get_mut(key) else { return Ok(vec![]); }; @@ -363,13 +362,16 @@ impl Keyspace { }; if !popped.is_empty() { - entry.version = ver; entry.touch(); } let is_empty = matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty()); self.cleanup_after_remove(key, old_entry_size, is_empty, removed_bytes); + if !popped.is_empty() { + self.bump_version(key); + } + Ok(popped) } @@ -386,7 +388,6 @@ impl Keyspace { return Err(WrongType); } - let ver = self.next_ver(); let Some(entry) = self.entries.get_mut(key) else { return Ok(vec![]); }; @@ -403,13 +404,16 @@ impl Keyspace { }; if !popped.is_empty() { - entry.version = ver; entry.touch(); } let is_empty = matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty()); self.cleanup_after_remove(key, old_entry_size, is_empty, removed_bytes); + if !popped.is_empty() { + self.bump_version(key); + } + Ok(popped) } diff --git a/crates/ember-core/src/memory.rs b/crates/ember-core/src/memory.rs index 7ec7e1b1..c5bdb714 100644 --- a/crates/ember-core/src/memory.rs +++ b/crates/ember-core/src/memory.rs @@ -46,9 +46,13 @@ pub fn effective_limit(max_bytes: usize) -> usize { /// Estimated overhead per entry in the HashMap. /// /// Accounts for: the CompactString key (24 bytes inline on 64-bit), -/// Entry struct fields (Value enum tag + Bytes/collection inline storage -/// + expires_at_ms + cached_value_size + last_access_secs), plus hashbrown per-entry bookkeeping -/// (1 control byte + empty slot waste at ~87.5% load factor). +/// Entry struct fields (Value enum tag + Bytes/collection inline storage, +/// expires_at_ms, cached_value_size, last_access_secs), plus hashbrown +/// per-entry bookkeeping (1 control byte + empty slot waste at ~87.5% +/// load factor). +/// +/// Reduced from 128 to 120 after moving the 8-byte `version` field out of +/// Entry and into a lazily-populated side table on Keyspace. /// /// This is calibrated from `std::mem::size_of` on 64-bit platforms. The /// exact value varies by compiler version, but precision isn't critical — @@ -58,7 +62,7 @@ pub fn effective_limit(max_bytes: usize) -> usize { /// /// The `entry_overhead_not_too_small` test validates this constant against /// the actual struct sizes on each platform. -pub(crate) const ENTRY_OVERHEAD: usize = 128; +pub(crate) const ENTRY_OVERHEAD: usize = 120; /// Tracks memory usage for a single keyspace. ///