Skip to content

Commit 611a4fd

Browse files
authored
[router] bucket policy (#11719)
1 parent 9ea2c68 commit 611a4fd

File tree

12 files changed

+1435
-9
lines changed

12 files changed

+1435
-9
lines changed

sgl-router/py_src/sglang_router/router.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def policy_from_str(policy_str: Optional[str]) -> PolicyType:
1919
"round_robin": PolicyType.RoundRobin,
2020
"cache_aware": PolicyType.CacheAware,
2121
"power_of_two": PolicyType.PowerOfTwo,
22+
"bucket": PolicyType.Bucket,
2223
}
2324
return policy_map[policy_str]
2425

sgl-router/py_src/sglang_router/router_args.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class RouterArgs:
3434
eviction_interval_secs: int = 120
3535
max_tree_size: int = 2**26
3636
max_payload_size: int = 512 * 1024 * 1024 # 512MB default for large batches
37+
bucket_adjust_interval_secs: int = 5
3738
dp_aware: bool = False
3839
enable_igw: bool = False # Enable IGW (Inter-Gateway) mode for multi-model support
3940
api_key: Optional[str] = None
@@ -167,7 +168,7 @@ def add_cli_args(
167168
f"--{prefix}prefill-policy",
168169
type=str,
169170
default=None,
170-
choices=["random", "round_robin", "cache_aware", "power_of_two"],
171+
choices=["random", "round_robin", "cache_aware", "power_of_two", "bucket"],
171172
help="Specific policy for prefill nodes in PD mode. If not specified, uses the main policy",
172173
)
173174
parser.add_argument(
@@ -234,6 +235,12 @@ def add_cli_args(
234235
default=RouterArgs.balance_rel_threshold,
235236
help="Load balancing is triggered when (max_load - min_load) > abs_threshold AND max_load > min_load * rel_threshold. Otherwise, use cache aware",
236237
)
238+
parser.add_argument(
239+
f"--{prefix}bucket-adjust-interval-secs",
240+
type=int,
241+
default=RouterArgs.bucket_adjust_interval_secs,
242+
help="Interval in seconds between bucket boundary adjustment operations",
243+
)
237244
parser.add_argument(
238245
f"--{prefix}eviction-interval-secs",
239246
type=int,

sgl-router/src/config/types.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,16 @@ pub enum PolicyConfig {
263263

264264
#[serde(rename = "power_of_two")]
265265
PowerOfTwo { load_check_interval_secs: u64 },
266+
267+
#[serde(rename = "bucket")]
268+
Bucket {
269+
/// Absolute load difference threshold for load balancing
270+
balance_abs_threshold: usize,
271+
/// Relative load ratio threshold for load balancing
272+
balance_rel_threshold: f32,
273+
/// Interval between bucket boundary adjustment cycles (seconds)
274+
bucket_adjust_interval_secs: usize,
275+
},
266276
}
267277

268278
impl PolicyConfig {
@@ -272,6 +282,7 @@ impl PolicyConfig {
272282
PolicyConfig::RoundRobin => "round_robin",
273283
PolicyConfig::CacheAware { .. } => "cache_aware",
274284
PolicyConfig::PowerOfTwo { .. } => "power_of_two",
285+
PolicyConfig::Bucket { .. } => "bucket",
275286
}
276287
}
277288
}
@@ -728,6 +739,28 @@ mod tests {
728739
}
729740
}
730741

742+
#[test]
743+
fn test_bucket_parameters() {
744+
let bucket = PolicyConfig::Bucket {
745+
balance_abs_threshold: 20,
746+
balance_rel_threshold: 2.0,
747+
bucket_adjust_interval_secs: 5,
748+
};
749+
750+
match bucket {
751+
PolicyConfig::Bucket {
752+
balance_abs_threshold,
753+
balance_rel_threshold,
754+
bucket_adjust_interval_secs,
755+
} => {
756+
assert_eq!(balance_abs_threshold, 20);
757+
assert!((balance_rel_threshold - 2.0).abs() < 0.0001);
758+
assert_eq!(bucket_adjust_interval_secs, 5);
759+
}
760+
_ => panic!("Expected Bucket"),
761+
}
762+
}
763+
731764
#[test]
732765
fn test_discovery_config_default() {
733766
let config = DiscoveryConfig::default();

sgl-router/src/config/validation.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,34 @@ impl ConfigValidator {
209209
});
210210
}
211211
}
212+
PolicyConfig::Bucket {
213+
balance_abs_threshold: _,
214+
balance_rel_threshold,
215+
bucket_adjust_interval_secs,
216+
} => {
217+
if *balance_rel_threshold < 1.0 {
218+
return Err(ConfigError::InvalidValue {
219+
field: "balance_rel_threshold".to_string(),
220+
value: balance_rel_threshold.to_string(),
221+
reason: "Must be >= 1.0".to_string(),
222+
});
223+
}
224+
225+
if *bucket_adjust_interval_secs < 1 {
226+
return Err(ConfigError::InvalidValue {
227+
field: "bucket_adjust_interval_secs".to_string(),
228+
value: bucket_adjust_interval_secs.to_string(),
229+
reason: "Must be >= 1s".to_string(),
230+
});
231+
}
232+
if *bucket_adjust_interval_secs >= 4294967296 {
233+
return Err(ConfigError::InvalidValue {
234+
field: "bucket_adjust_interval_secs".to_string(),
235+
value: bucket_adjust_interval_secs.to_string(),
236+
reason: "Must be < 4294967296s".to_string(),
237+
});
238+
}
239+
}
212240
}
213241
Ok(())
214242
}
@@ -505,6 +533,13 @@ impl ConfigValidator {
505533
});
506534
}
507535
}
536+
537+
// Check bucket for decode
538+
if let Some(PolicyConfig::Bucket { .. }) = decode_policy {
539+
return Err(ConfigError::IncompatibleConfig {
540+
reason: "Decode policy should not be allowed to be bucket".to_string(),
541+
});
542+
}
508543
}
509544
}
510545

@@ -792,6 +827,67 @@ mod tests {
792827
}
793828
}
794829

830+
#[test]
831+
fn test_validate_pd_mode_bucket_policy_restrictions() {
832+
let config = RouterConfig::new(
833+
RoutingMode::PrefillDecode {
834+
prefill_urls: vec![
835+
("http://prefill1:8000".to_string(), None),
836+
("http://prefill2:8000".to_string(), None),
837+
],
838+
decode_urls: vec![
839+
"http://decode1:8000".to_string(),
840+
"http://decode2:8000".to_string(),
841+
],
842+
prefill_policy: Some(PolicyConfig::Bucket {
843+
balance_abs_threshold: 32,
844+
balance_rel_threshold: 1.1,
845+
bucket_adjust_interval_secs: 5,
846+
}),
847+
decode_policy: Some(PolicyConfig::PowerOfTwo {
848+
load_check_interval_secs: 60,
849+
}),
850+
},
851+
PolicyConfig::Random, // Main policy as fallback
852+
);
853+
854+
let result = ConfigValidator::validate(&config);
855+
assert!(
856+
result.is_ok(),
857+
"Prefill policy should be allowed to be bucket"
858+
);
859+
860+
let config = RouterConfig::new(
861+
RoutingMode::PrefillDecode {
862+
prefill_urls: vec![
863+
("http://prefill1:8000".to_string(), None),
864+
("http://prefill2:8000".to_string(), None),
865+
],
866+
decode_urls: vec![
867+
"http://decode1:8000".to_string(),
868+
"http://decode2:8000".to_string(),
869+
],
870+
prefill_policy: Some(PolicyConfig::Bucket {
871+
balance_abs_threshold: 32,
872+
balance_rel_threshold: 1.1,
873+
bucket_adjust_interval_secs: 5,
874+
}),
875+
decode_policy: Some(PolicyConfig::Bucket {
876+
balance_abs_threshold: 32,
877+
balance_rel_threshold: 1.1,
878+
bucket_adjust_interval_secs: 5,
879+
}),
880+
},
881+
PolicyConfig::Random, // Main policy as fallback
882+
);
883+
884+
let result = ConfigValidator::validate(&config);
885+
assert!(
886+
result.is_err(),
887+
"Decode policy should not be allowed to be bucket"
888+
);
889+
}
890+
795891
#[test]
796892
fn test_validate_grpc_requires_tokenizer() {
797893
let mut config = RouterConfig::new(

sgl-router/src/core/workflow/steps/worker_registration.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,13 @@ impl StepExecutor for UpdatePoliciesStep {
756756
.init_cache_aware_policy(&model_id, &all_workers);
757757
}
758758
}
759+
let prefill_workers = app_context.worker_registry.get_prefill_workers();
760+
let policy = app_context.policy_registry.get_prefill_policy();
761+
if policy.name() == "bucket" {
762+
app_context
763+
.policy_registry
764+
.init_pd_bucket_policies(&prefill_workers);
765+
}
759766

760767
debug!(
761768
"Updated policies for worker {} (model: {})",

sgl-router/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub enum PolicyType {
2929
RoundRobin,
3030
CacheAware,
3131
PowerOfTwo,
32+
Bucket,
3233
}
3334

3435
#[pyclass(eq)]
@@ -169,6 +170,8 @@ struct Router {
169170
request_timeout_secs: u64,
170171
request_id_headers: Option<Vec<String>>,
171172
pd_disaggregation: bool,
173+
// Takes effect in PD mode and when policy = bucket
174+
bucket_adjust_interval_secs: usize,
172175
prefill_urls: Option<Vec<(String, Option<u16>)>>,
173176
decode_urls: Option<Vec<String>>,
174177
prefill_policy: Option<PolicyType>,
@@ -244,6 +247,11 @@ impl Router {
244247
PolicyType::PowerOfTwo => ConfigPolicyConfig::PowerOfTwo {
245248
load_check_interval_secs: 5,
246249
},
250+
PolicyType::Bucket => ConfigPolicyConfig::Bucket {
251+
balance_abs_threshold: self.balance_abs_threshold,
252+
balance_rel_threshold: self.balance_rel_threshold,
253+
bucket_adjust_interval_secs: self.bucket_adjust_interval_secs,
254+
},
247255
}
248256
};
249257

@@ -407,6 +415,7 @@ impl Router {
407415
request_timeout_secs = 1800,
408416
request_id_headers = None,
409417
pd_disaggregation = false,
418+
bucket_adjust_interval_secs = 5,
410419
prefill_urls = None,
411420
decode_urls = None,
412421
prefill_policy = None,
@@ -480,6 +489,7 @@ impl Router {
480489
request_timeout_secs: u64,
481490
request_id_headers: Option<Vec<String>>,
482491
pd_disaggregation: bool,
492+
bucket_adjust_interval_secs: usize,
483493
prefill_urls: Option<Vec<(String, Option<u16>)>>,
484494
decode_urls: Option<Vec<String>>,
485495
prefill_policy: Option<PolicyType>,
@@ -566,6 +576,7 @@ impl Router {
566576
request_timeout_secs,
567577
request_id_headers,
568578
pd_disaggregation,
579+
bucket_adjust_interval_secs,
569580
prefill_urls,
570581
decode_urls,
571582
prefill_policy,

0 commit comments

Comments
 (0)