Skip to content

Commit 0f73b10

Browse files
rlakhtakia authored and shmuelk committed
Add subsetting logic for epp (kubernetes-sigs#981)
1 parent 783f429 commit 0f73b10

File tree

12 files changed

+494
-23
lines changed

12 files changed

+494
-23
lines changed

cmd/epp/runner/runner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4747
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4848
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
49+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
4950
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
5051
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
5152
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
@@ -285,6 +286,7 @@ func (r *Runner) initializeScheduler() (*scheduling.Scheduler, error) {
285286
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
286287

287288
schedulerProfile := framework.NewSchedulerProfile().
289+
WithFilters(filter.NewSubsetFilter()).
288290
WithScorers(framework.NewWeightedScorer(scorer.NewQueueScorer(), queueScorerWeight),
289291
framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight)).
290292
WithPicker(picker.NewMaxScorePicker())

pkg/epp/handlers/server.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,9 @@ type RequestContext struct {
111111
}
112112

113113
type Request struct {
114-
Headers map[string]string
115-
Body map[string]interface{}
114+
Headers map[string]string
115+
Body map[string]interface{}
116+
Metadata map[string]any
116117
}
117118
type Response struct {
118119
Headers map[string]string
@@ -141,8 +142,9 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
141142
reqCtx := &RequestContext{
142143
RequestState: RequestReceived,
143144
Request: &Request{
144-
Headers: make(map[string]string),
145-
Body: make(map[string]interface{}),
145+
Headers: make(map[string]string),
146+
Body: make(map[string]interface{}),
147+
Metadata: make(map[string]any),
146148
},
147149
Response: &Response{
148150
Headers: make(map[string]string),
@@ -185,6 +187,8 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
185187
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
186188
}
187189

190+
reqCtx.Request.Metadata = requtil.ExtractMetadataValues(req)
191+
188192
switch v := req.Request.(type) {
189193
case *extProcPb.ProcessingRequest_RequestHeaders:
190194
if requestId := requtil.ExtractHeaderValue(v, requtil.RequestIdHeaderKey); len(requestId) > 0 {

pkg/epp/requestcontrol/director.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,15 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
118118
}
119119

120120
// Prepare LLMRequest (needed for both saturation detection and Scheduler)
121-
reqCtx.SchedulingRequest = &schedulingtypes.LLMRequest{
122-
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
123-
TargetModel: reqCtx.ResolvedTargetModel,
124-
Prompt: prompt,
125-
Headers: reqCtx.Request.Headers,
126-
}
121+
reqCtx.SchedulingRequest = schedulingtypes.NewLLMRequest(
122+
reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
123+
reqCtx.ResolvedTargetModel,
124+
prompt,
125+
reqCtx.Request.Headers,
126+
reqCtx.Request.Metadata)
127127

128128
logger = logger.WithValues("model", reqCtx.Model, "resolvedTargetModel", reqCtx.ResolvedTargetModel, "criticality", requestCriticality)
129+
129130
ctx = log.IntoContext(ctx, logger)
130131
logger.V(logutil.DEBUG).Info("LLM request assembled")
131132

pkg/epp/scheduling/framework/plugins/filter/filter_test.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,3 +533,142 @@ const decisionTreeParametersErrorBadNextOnSuccessOrFailure = `
533533
}
534534
}
535535
`
536+
537+
func TestSubsettingFilter(t *testing.T) {
538+
var makeFilterMetadata = func(data []interface{}) map[string]any {
539+
return map[string]any{
540+
"envoy.lb.subset_hint": map[string]any{
541+
"x-gateway-destination-endpoint-subset": data,
542+
},
543+
}
544+
}
545+
546+
tests := []struct {
547+
name string
548+
metadata map[string]any
549+
filter framework.Filter
550+
input []types.Pod
551+
output []types.Pod
552+
}{
553+
{
554+
name: "SubsetFilter, filter not present — return all pods",
555+
filter: &SubsetFilter{},
556+
metadata: map[string]any{},
557+
input: []types.Pod{
558+
&types.PodMetrics{
559+
Pod: &backend.Pod{Address: "10.0.0.1"},
560+
},
561+
&types.PodMetrics{
562+
Pod: &backend.Pod{Address: "10.0.0.2"},
563+
},
564+
},
565+
output: []types.Pod{
566+
&types.PodMetrics{
567+
Pod: &backend.Pod{Address: "10.0.0.1"},
568+
},
569+
&types.PodMetrics{
570+
Pod: &backend.Pod{Address: "10.0.0.2"},
571+
},
572+
},
573+
},
574+
{
575+
name: "SubsetFilter, namespace present filter not present — return all pods",
576+
filter: &SubsetFilter{},
577+
metadata: map[string]any{"envoy.lb.subset_hint": map[string]any{}},
578+
input: []types.Pod{
579+
&types.PodMetrics{
580+
Pod: &backend.Pod{Address: "10.0.0.1"},
581+
},
582+
&types.PodMetrics{
583+
Pod: &backend.Pod{Address: "10.0.0.2"},
584+
},
585+
},
586+
output: []types.Pod{
587+
&types.PodMetrics{
588+
Pod: &backend.Pod{Address: "10.0.0.1"},
589+
},
590+
&types.PodMetrics{
591+
Pod: &backend.Pod{Address: "10.0.0.2"},
592+
},
593+
},
594+
},
595+
{
596+
name: "SubsetFilter, filter present with empty list — return no pods",
597+
filter: &SubsetFilter{},
598+
metadata: makeFilterMetadata([]interface{}{}),
599+
input: []types.Pod{
600+
&types.PodMetrics{
601+
Pod: &backend.Pod{Address: "10.0.0.1"},
602+
},
603+
&types.PodMetrics{
604+
Pod: &backend.Pod{Address: "10.0.0.2"},
605+
},
606+
},
607+
output: []types.Pod{},
608+
},
609+
{
610+
name: "SubsetFilter, subset with one matching pod",
611+
metadata: makeFilterMetadata([]interface{}{"10.0.0.1"}),
612+
filter: &SubsetFilter{},
613+
input: []types.Pod{
614+
&types.PodMetrics{
615+
Pod: &backend.Pod{Address: "10.0.0.1"},
616+
},
617+
&types.PodMetrics{
618+
Pod: &backend.Pod{Address: "10.0.0.2"},
619+
},
620+
},
621+
output: []types.Pod{
622+
&types.PodMetrics{
623+
Pod: &backend.Pod{Address: "10.0.0.1"},
624+
},
625+
},
626+
},
627+
{
628+
name: "SubsetFilter, subset with multiple matching pods",
629+
metadata: makeFilterMetadata([]interface{}{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
630+
filter: &SubsetFilter{},
631+
input: []types.Pod{
632+
&types.PodMetrics{
633+
Pod: &backend.Pod{Address: "10.0.0.1"},
634+
},
635+
&types.PodMetrics{
636+
Pod: &backend.Pod{Address: "10.0.0.2"},
637+
},
638+
},
639+
output: []types.Pod{
640+
&types.PodMetrics{
641+
Pod: &backend.Pod{Address: "10.0.0.1"},
642+
},
643+
&types.PodMetrics{
644+
Pod: &backend.Pod{Address: "10.0.0.2"},
645+
},
646+
},
647+
},
648+
{
649+
name: "SubsetFilter, subset with no matching pods",
650+
metadata: makeFilterMetadata([]interface{}{"10.0.0.3"}),
651+
filter: &SubsetFilter{},
652+
input: []types.Pod{
653+
&types.PodMetrics{
654+
Pod: &backend.Pod{Address: "10.0.0.1"},
655+
},
656+
&types.PodMetrics{
657+
Pod: &backend.Pod{Address: "10.0.0.2"},
658+
},
659+
},
660+
output: []types.Pod{},
661+
},
662+
}
663+
664+
for _, test := range tests {
665+
t.Run(test.name, func(t *testing.T) {
666+
req := types.NewLLMRequest(uuid.NewString(), "", "", nil, test.metadata)
667+
got := test.filter.Filter(context.Background(), types.NewCycleState(), req, test.input)
668+
669+
if diff := cmp.Diff(test.output, got); diff != "" {
670+
t.Errorf("Unexpected output (-want +got): %v", diff)
671+
}
672+
})
673+
}
674+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filter
18+
19+
import (
	"context"
	"net"
	"strings"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)
26+
27+
const (
28+
SubsetFilterType = "subset"
29+
30+
subsetHintKey = "x-gateway-destination-endpoint-subset"
31+
subsetHintNamespace = "envoy.lb.subset_hint"
32+
)
33+
34+
// compile-time type assertion
35+
var _ framework.Filter = &SubsetFilter{}
36+
37+
// NewSubsetFilter initializes a new SubsetFilter.
38+
func NewSubsetFilter() *SubsetFilter {
39+
return &SubsetFilter{}
40+
}
41+
42+
// SubsetFilter filters Pods based on the subset hint provided by the proxy via filterMetadata.
43+
type SubsetFilter struct{}
44+
45+
// Name returns the name of the filter.
46+
func (f *SubsetFilter) Name() string {
47+
return "subset-hint"
48+
}
49+
50+
// Type returns the type of the filter.
51+
func (f *SubsetFilter) Type() string {
52+
return SubsetFilterType
53+
}
54+
55+
// Filter filters out pods that are not in the subset provided in filterMetadata.
56+
func (f *SubsetFilter) Filter(_ context.Context, _ *types.CycleState, request *types.LLMRequest, pods []types.Pod) []types.Pod {
57+
// Check if subset namespace key is present in the metadata map
58+
subsetMap, found := request.GetMetadata()[subsetHintNamespace].(map[string]any)
59+
if !found {
60+
return pods
61+
}
62+
63+
// Check if endpoint key is present in the subset map and ensure there is at least one value
64+
endpointSubsetList, found := subsetMap[subsetHintKey].([]interface{})
65+
if !found {
66+
return pods
67+
} else if len(endpointSubsetList) == 0 {
68+
return []types.Pod{}
69+
}
70+
71+
// Create a map of endpoint addrs for easy lookup
72+
endpoints := make(map[string]bool)
73+
for _, endpoint := range endpointSubsetList {
74+
// Extract address from endpoint
75+
// The endpoint is formatted as "<address>:<port>" (ex. "10.0.1.0:8080")
76+
epStr := strings.Split(endpoint.(string), ":")[0]
77+
endpoints[epStr] = true
78+
}
79+
80+
// Filter based on address
81+
filteredPods := []types.Pod{}
82+
for _, pod := range pods {
83+
if _, found := endpoints[pod.GetPod().Address]; found {
84+
filteredPods = append(filteredPods, pod)
85+
}
86+
}
87+
88+
return filteredPods
89+
}

pkg/epp/scheduling/scheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func NewScheduler() *Scheduler {
4444
// it's possible to call NewSchedulerWithConfig to pass a different scheduler config.
4545
// For build time plugins changes, it's recommended to call in main.go to NewSchedulerWithConfig.
4646
loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)
47+
endpointSubsetFilter := filter.NewSubsetFilter()
4748
leastQueueFilter := filter.NewLeastQueueFilter()
4849
leastKvCacheFilter := filter.NewLeastKVCacheFilter()
4950

@@ -70,7 +71,7 @@ func NewScheduler() *Scheduler {
7071
}
7172

7273
defaultProfile := framework.NewSchedulerProfile().
73-
WithFilters(lowLatencyFilter).
74+
WithFilters(endpointSubsetFilter, lowLatencyFilter).
7475
WithPicker(&picker.RandomPicker{})
7576

7677
profileHandler := profile.NewSingleProfileHandler()

pkg/epp/scheduling/types/types.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,29 @@ type LLMRequest struct {
3333
Prompt string
3434
// Headers is a map of the request headers.
3535
Headers map[string]string
36+
37+
// metadata is a map of metadata in the request
38+
metadata map[string]any
39+
}
40+
41+
func NewLLMRequest(reqID, targetModel, prompt string, headers map[string]string, metadata map[string]any) *LLMRequest {
42+
return &LLMRequest{
43+
RequestId: reqID,
44+
TargetModel: targetModel,
45+
Prompt: prompt,
46+
Headers: headers,
47+
metadata: metadata,
48+
}
3649
}
3750

3851
// String returns a one-line summary of the request: its ID, target model, the length of
// the prompt (not the prompt text itself) and the headers. Metadata is not included.
func (r *LLMRequest) String() string {
	promptLength := len(r.Prompt)
	return fmt.Sprintf(
		"RequestID: %s, TargetModel: %s, PromptLength: %d, Headers: %v",
		r.RequestId, r.TargetModel, promptLength, r.Headers,
	)
}
4154

55+
func (r *LLMRequest) GetMetadata() map[string]any {
56+
return r.metadata
57+
}
58+
4259
type Pod interface {
4360
GetPod() *backend.Pod
4461
GetMetrics() *backendmetrics.MetricsState

pkg/epp/util/request/metadata.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package request
18+
19+
import (
20+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
21+
)
22+
23+
func ExtractMetadataValues(req *extProcPb.ProcessingRequest) map[string]any {
24+
metadata := make(map[string]any)
25+
if req != nil && req.MetadataContext != nil && req.MetadataContext.FilterMetadata != nil {
26+
for key, val := range req.MetadataContext.FilterMetadata {
27+
metadata[key] = val.AsMap()
28+
}
29+
}
30+
return metadata
31+
}

0 commit comments

Comments
 (0)