Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,18 @@ adaptive-timeout = { version = "0.0.1-alpha.4" }
ahash = "0.8.12"
anyhow = "1.0.100"
arc-swap = "1.8"
arrayvec = { version = "0.7" }
arrow = { version = "57.1.0", default-features = false }
assert2 = "0.3.16"
async-channel = "2.5.0"
async-trait = "0.1.89"
axum = { version = "0.8.8", default-features = false }
aws-config = "1.8.12"
aws-sdk-dynamodb = { version = "1.101.0", default-features = false, features = ["behavior-version-latest", "rt-tokio", "default-https-client"] }
aws-credential-types = "1.2.11"
aws-sdk-dynamodb = { version = "1.101.0", default-features = false, features = ["behavior-version-latest", "rt-tokio", "default-https-client"] }
aws-smithy-async = { version = "1.2.7", default-features = false }
aws-smithy-runtime-api = "1.9.3"
aws-smithy-types = "1.3.5"
axum = { version = "0.8.8", default-features = false }
base64 = "0.22"
bilrost = { version = "0.1014", default-features = false, features = ["std", "auto-optimize", "derive"] }
bincode = { version = "2.0.1", default-features = false }
Expand Down
9 changes: 5 additions & 4 deletions crates/limiter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ mod key;
mod rule;
mod rule_store;

// Re-exports
pub use key::LimitKey;
pub use rule::{Pattern, RuleHandle, RulePattern};
pub use rule_store::{Limit, Rules, StructuredLimits};

/// Represents the hierarchy level of counters or rules
///
/// This enum is used throughout the limiter to identify which level
Expand Down Expand Up @@ -91,7 +96,3 @@ impl std::fmt::Display for Level {
}
}
}

pub use key::LimitKey;
pub use rule::RulePattern;
pub use rule_store::{Limit, Rules, StructuredLimits};
5 changes: 5 additions & 0 deletions crates/limiter/src/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ impl<S: StringLike> Pattern<S> {
}
}

/// A pattern is never empty (wildcards have len 1, exact values are non-empty).
pub fn is_empty(&self) -> bool {
false
}

/// Matches the input and returns a ranking value's most signifcant bit
fn rank(&self, input: &str, level: Level) -> Option<u8> {
let msb = Level::COUNT - level as usize;
Expand Down
128 changes: 128 additions & 0 deletions crates/limiter/src/rule_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ impl<V> StructuredLimits<V> {
l2: Limit::Undefined,
}
}

pub fn limit_at(&self, level: Level) -> &Limit<V> {
match level {
Level::Scope => &self.scope,
Level::Level1 => &self.l1,
Level::Level2 => &self.l2,
}
}
}

impl<V> Default for StructuredLimits<V> {
Expand Down Expand Up @@ -149,6 +157,7 @@ impl<S: StringLike, V> Rules<S, V> {
}
store
}

/// Add a rule to the store.
pub fn add_rule(&mut self, pattern: RulePattern<S>, limit: V) {
let level = pattern.level();
Expand All @@ -162,6 +171,58 @@ impl<S: StringLike, V> Rules<S, V> {
}
}

/// Finds the rule with the matching pattern and updates its limit in-place
/// without invalidating rule handles. If no matching rule exists, inserts a
/// new one.
pub fn upsert_rule(&mut self, pattern: RulePattern<S>, limit: V) -> RuleHandle {
if let Some(handle) = self.find_rule(&pattern) {
self.limits.insert(handle, limit);
handle
} else {
let level = pattern.level();
let handle = self.rules.insert(pattern);
self.limits.insert(handle, limit);
match level {
Level::Scope => self.scope_rules.push(handle),
Level::Level1 => self.l1_rules.push(handle),
Level::Level2 => self.l2_rules.push(handle),
}
handle
}
}

/// Removes the rule matching the given pattern.
/// Returns the handle and old limit if found.
pub fn remove_rule(&mut self, pattern: &RulePattern<S>) -> Option<(RuleHandle, V)> {
let handle = self.find_rule(pattern)?;

let level_rules = match pattern.level() {
Level::Scope => &mut self.scope_rules,
Level::Level1 => &mut self.l1_rules,
Level::Level2 => &mut self.l2_rules,
};
if let Some(pos) = level_rules.iter().position(|&h| h == handle) {
level_rules.swap_remove(pos);
}

self.rules.remove(handle);
let limit = self.limits.remove(handle)?;
Some((handle, limit))
}

/// Finds an existing rule handle by matching its pattern.
fn find_rule(&self, pattern: &RulePattern<S>) -> Option<RuleHandle> {
let level_rules = match pattern.level() {
Level::Scope => &self.scope_rules,
Level::Level1 => &self.l1_rules,
Level::Level2 => &self.l2_rules,
};
level_rules
.iter()
.copied()
.find(|&h| self.rules.get(h).is_some_and(|p| *p == *pattern))
}

/// Clear all rules.
pub fn clear(&mut self) {
self.scope_rules.clear();
Expand Down Expand Up @@ -320,6 +381,73 @@ mod tests {
assert!(matches!(result.l1, Limit::Undefined));
}

#[test]
fn upsert_inserts_new_and_updates_existing() {
let mut store = store_with_rules(&[("*", 1000), ("scope1/*", 100)]);
assert_eq!(store.scope_rules.len(), 1);
assert_eq!(store.l1_rules.len(), 1);

// Upsert existing scope rule: updates limit, same handle
let pattern: RulePattern<String> = "*".parse().unwrap();
let h1 = store.find_rule(&pattern).unwrap();
let h2 = store.upsert_rule(pattern, 2000);
assert_eq!(h1, h2);
assert_eq!(store.scope_rules.len(), 1);
assert_eq!(*store.get_limit(h2).unwrap(), 2000);

// Upsert new L2 rule: inserts
let pattern: RulePattern<String> = "scope1/*/tenant1".parse().unwrap();
let h3 = store.upsert_rule(pattern, 10);
assert_eq!(store.l2_rules.len(), 1);
assert_eq!(*store.get_limit(h3).unwrap(), 10);

// Lookups reflect the updated and new rules
let result = store.lookup("scope1", &LimitKey::None);
assert!(matches!(result.scope, Limit::Defined(_, 2000)));
let key: LimitKey<String> = "foo/tenant1".parse().unwrap();
let result = store.lookup("scope1", &key);
assert!(matches!(result.l2, Limit::Defined(_, 10)));
}

#[test]
fn remove_rule_removes_and_returns_old_limit() {
let mut store = store_with_rules(&[
("*", 1000),
("scope1", 500),
("scope1/*", 100),
("scope1/tenant1", 50),
]);
assert_eq!(store.scope_rules.len(), 2);
assert_eq!(store.l1_rules.len(), 2);

// Remove an existing scope rule
let pattern: RulePattern<String> = "scope1".parse().unwrap();
let (handle, limit) = store.remove_rule(&pattern).unwrap();
assert_eq!(limit, 500);
assert_eq!(store.scope_rules.len(), 1);
assert!(store.rules.get(handle).is_none());
assert!(store.limits.get(handle).is_none());

// Lookups fall back to wildcard now
let result = store.lookup("scope1", &LimitKey::None);
assert!(matches!(result.scope, Limit::Defined(_, 1000)));

// Remove a non-existent rule returns None
let pattern: RulePattern<String> = "nonexistent".parse().unwrap();
assert!(store.remove_rule(&pattern).is_none());

// Remove an L1 rule
let pattern: RulePattern<String> = "scope1/tenant1".parse().unwrap();
let (_, limit) = store.remove_rule(&pattern).unwrap();
assert_eq!(limit, 50);
assert_eq!(store.l1_rules.len(), 1);

// Lookups fall back to wildcard L1
let key: LimitKey<String> = "tenant1".parse().unwrap();
let result = store.lookup("scope1", &key);
assert!(matches!(result.l1, Limit::Defined(_, 100)));
}

#[test]
fn lookup_only_populates_matching_depth() {
let store = store_with_rules(&[("*", 1000), ("*/*", 100), ("*/*/*", 10)]);
Expand Down
7 changes: 7 additions & 0 deletions crates/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@ impl Scope {
}
}

// Needed for hashbrown's entry_ref API to lazily convert the key reference on insert.
impl From<&Scope> for Scope {
fn from(value: &Scope) -> Self {
value.clone()
}
}

impl AsRef<str> for Scope {
#[inline]
fn as_ref(&self) -> &str {
Expand Down
1 change: 1 addition & 0 deletions crates/vqueues/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ restate-storage-api = { workspace = true }
restate-types = { workspace = true }
restate-util-string = { workspace = true }

arrayvec = { workspace = true }
bilrost = { workspace = true, features = ["smallvec"] }
derive_more = { workspace = true, features = ["debug", "is_variant"] }
gardal = { workspace = true, features = ["tokio"] }
Expand Down
6 changes: 6 additions & 0 deletions crates/vqueues/src/scheduler/eligible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ impl EligibilityTracker {
}
}

pub fn wake_up_queues(&mut self, vqueues: impl IntoIterator<Item = VQueueHandle>) -> bool {
vqueues.into_iter().fold(false, |wake_up, vqueue| {
wake_up | self.wake_up_queue(vqueue)
})
}

/// returns true if the scheduler should be woken up
pub fn wake_up_queue(&mut self, vqueue: VQueueHandle) -> bool {
if let Some(state) = self.states.entry(vqueue) {
Expand Down
Loading
Loading