Skip to content

Commit d35c57d

Browse files
committed
Add subsetting logic for epp
1 parent 191e710 commit d35c57d

File tree

12 files changed

+455
-23
lines changed

12 files changed

+455
-23
lines changed

cmd/epp/runner/runner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4646
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4747
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
48+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
4849
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
4950
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
5051
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
@@ -275,6 +276,7 @@ func (r *Runner) initializeScheduler(datastore datastore.Datastore) (*scheduling
275276
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
276277

277278
schedulerProfile := framework.NewSchedulerProfile().
279+
WithFilters(filter.NewSubsetFilter()).
278280
WithScorers(framework.NewWeightedScorer(scorer.NewQueueScorer(), queueScorerWeight),
279281
framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight)).
280282
WithPicker(picker.NewMaxScorePicker())

pkg/epp/handlers/server.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,9 @@ type RequestContext struct {
111111
}
112112

113113
type Request struct {
114-
Headers map[string]string
115-
Body map[string]interface{}
114+
Headers map[string]string
115+
Body map[string]interface{}
116+
Metadata map[string]any
116117
}
117118
type Response struct {
118119
Headers map[string]string
@@ -141,8 +142,9 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
141142
reqCtx := &RequestContext{
142143
RequestState: RequestReceived,
143144
Request: &Request{
144-
Headers: make(map[string]string),
145-
Body: make(map[string]interface{}),
145+
Headers: make(map[string]string),
146+
Body: make(map[string]interface{}),
147+
Metadata: make(map[string]any),
146148
},
147149
Response: &Response{
148150
Headers: make(map[string]string),
@@ -185,6 +187,8 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
185187
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
186188
}
187189

190+
reqCtx.Request.Metadata = requtil.ExtractMetadataValues(req)
191+
188192
switch v := req.Request.(type) {
189193
case *extProcPb.ProcessingRequest_RequestHeaders:
190194
if requestId := requtil.ExtractHeaderValue(v, requtil.RequestIdHeaderKey); len(requestId) > 0 {

pkg/epp/requestcontrol/director.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,13 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
111111
}
112112

113113
// Prepare LLMRequest (needed for both saturation detection and Scheduler)
114-
reqCtx.SchedulingRequest = &schedulingtypes.LLMRequest{
115-
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
116-
TargetModel: reqCtx.ResolvedTargetModel,
117-
Prompt: prompt,
118-
Headers: reqCtx.Request.Headers,
119-
}
114+
reqCtx.SchedulingRequest = schedulingtypes.NewLLMRequest(
115+
reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
116+
reqCtx.ResolvedTargetModel,
117+
prompt,
118+
reqCtx.Request.Headers,
119+
reqCtx.Request.Metadata)
120+
120121
logger = logger.WithValues(
121122
"model", reqCtx.Model,
122123
"resolvedTargetModel", reqCtx.ResolvedTargetModel,

pkg/epp/scheduling/framework/plugins/filter/filter_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,3 +247,128 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
247247
actualAvailablePercent, availableLowerBound, availableUpperBound)
248248
}
249249
}
250+
251+
func TestSubsettingFilter(t *testing.T) {
252+
var makeFilterMetadata = func(data []interface{}) map[string]any {
253+
return map[string]any{
254+
"envoy.lb.subset_hint": map[string]any{
255+
"x-gateway-destination-endpoint-subset": data,
256+
},
257+
}
258+
}
259+
260+
tests := []struct {
261+
name string
262+
metadata map[string]any
263+
filter framework.Filter
264+
input []types.Pod
265+
output []types.Pod
266+
}{
267+
{
268+
name: "SubsetFilter, filter not present — return all pods",
269+
filter: &SubsetFilter{},
270+
metadata: map[string]any{},
271+
input: []types.Pod{
272+
&types.PodMetrics{
273+
Pod: &backend.Pod{Address: "10.0.0.1"},
274+
},
275+
&types.PodMetrics{
276+
Pod: &backend.Pod{Address: "10.0.0.2"},
277+
},
278+
},
279+
output: []types.Pod{
280+
&types.PodMetrics{
281+
Pod: &backend.Pod{Address: "10.0.0.1"},
282+
},
283+
&types.PodMetrics{
284+
Pod: &backend.Pod{Address: "10.0.0.2"},
285+
},
286+
},
287+
},
288+
{
289+
name: "SubsetFilter, namespace present filter not present — return all pods",
290+
filter: &SubsetFilter{},
291+
metadata: map[string]any{"envoy.lb.subset_hint": map[string]any{}},
292+
input: []types.Pod{
293+
&types.PodMetrics{
294+
Pod: &backend.Pod{Address: "10.0.0.1"},
295+
},
296+
&types.PodMetrics{
297+
Pod: &backend.Pod{Address: "10.0.0.2"},
298+
},
299+
},
300+
output: []types.Pod{
301+
&types.PodMetrics{
302+
Pod: &backend.Pod{Address: "10.0.0.1"},
303+
},
304+
&types.PodMetrics{
305+
Pod: &backend.Pod{Address: "10.0.0.2"},
306+
},
307+
},
308+
},
309+
{
310+
name: "SubsetFilter, subset with one matching pod",
311+
metadata: makeFilterMetadata([]interface{}{"10.0.0.1"}),
312+
filter: &SubsetFilter{},
313+
input: []types.Pod{
314+
&types.PodMetrics{
315+
Pod: &backend.Pod{Address: "10.0.0.1"},
316+
},
317+
&types.PodMetrics{
318+
Pod: &backend.Pod{Address: "10.0.0.2"},
319+
},
320+
},
321+
output: []types.Pod{
322+
&types.PodMetrics{
323+
Pod: &backend.Pod{Address: "10.0.0.1"},
324+
},
325+
},
326+
},
327+
{
328+
name: "SubsetFilter, subset with multiple matching pods",
329+
metadata: makeFilterMetadata([]interface{}{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
330+
filter: &SubsetFilter{},
331+
input: []types.Pod{
332+
&types.PodMetrics{
333+
Pod: &backend.Pod{Address: "10.0.0.1"},
334+
},
335+
&types.PodMetrics{
336+
Pod: &backend.Pod{Address: "10.0.0.2"},
337+
},
338+
},
339+
output: []types.Pod{
340+
&types.PodMetrics{
341+
Pod: &backend.Pod{Address: "10.0.0.1"},
342+
},
343+
&types.PodMetrics{
344+
Pod: &backend.Pod{Address: "10.0.0.2"},
345+
},
346+
},
347+
},
348+
{
349+
name: "SubsetFilter, subset with no matching pods",
350+
metadata: makeFilterMetadata([]interface{}{"10.0.0.3"}),
351+
filter: &SubsetFilter{},
352+
input: []types.Pod{
353+
&types.PodMetrics{
354+
Pod: &backend.Pod{Address: "10.0.0.1"},
355+
},
356+
&types.PodMetrics{
357+
Pod: &backend.Pod{Address: "10.0.0.2"},
358+
},
359+
},
360+
output: []types.Pod{},
361+
},
362+
}
363+
364+
for _, test := range tests {
365+
t.Run(test.name, func(t *testing.T) {
366+
req := types.NewLLMRequest(uuid.NewString(), "", "", nil, test.metadata)
367+
got := test.filter.Filter(context.Background(), req, types.NewCycleState(), test.input)
368+
369+
if diff := cmp.Diff(test.output, got); diff != "" {
370+
t.Errorf("Unexpected output (-want +got): %v", diff)
371+
}
372+
})
373+
}
374+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filter
18+
19+
import (
20+
"context"
21+
"strings"
22+
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
25+
)
26+
27+
const (
28+
subsetHintKey = "x-gateway-destination-endpoint-subset"
29+
subsetHintNamespace = "envoy.lb.subset_hint"
30+
)
31+
32+
// compile-time type assertion
33+
var _ framework.Filter = &SubsetFilter{}
34+
35+
// SubsetFilter is a scheduling filter that narrows the candidate Pods to the
// subset hinted by the proxy through filterMetadata. It carries no state.
type SubsetFilter struct{}

// NewSubsetFilter returns a ready-to-use SubsetFilter.
func NewSubsetFilter() *SubsetFilter {
	return new(SubsetFilter)
}

// Name reports the identifier of this filter.
func (f *SubsetFilter) Name() string {
	const filterName = "subset-hint"
	return filterName
}
47+
48+
// Filter filters out pods that are not in the subset provided in filterMetadata.
49+
func (f *SubsetFilter) Filter(_ context.Context, request *types.LLMRequest, _ *types.CycleState, pods []types.Pod) []types.Pod {
50+
// Check if subset namespace key is present in the metadata map
51+
subsetMap, found := request.GetMetadata()[subsetHintNamespace].(map[string]any)
52+
if !found {
53+
return pods
54+
}
55+
56+
// Check if endpoint key is present in the subset map and ensure there is at least one value
57+
endpointSubsetList, found := subsetMap[subsetHintKey].([]interface{})
58+
if !found || len(endpointSubsetList) == 0 {
59+
return pods
60+
}
61+
62+
// Create a map of endpoint addys for easy lookup
63+
endpoints := make(map[string]bool)
64+
for _, endpoint := range endpointSubsetList {
65+
epStr := strings.Split(endpoint.(string), ":")[0]
66+
endpoints[epStr] = true
67+
}
68+
69+
// Filter based on address
70+
filteredPods := []types.Pod{}
71+
for _, pod := range pods {
72+
if _, found := endpoints[pod.GetPod().Address]; found {
73+
filteredPods = append(filteredPods, pod)
74+
}
75+
}
76+
77+
return filteredPods
78+
}

pkg/epp/scheduling/scheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func NewScheduler(datastore Datastore) *Scheduler {
3838
// When the scheduler is initialized with NewScheduler function, the below config will be used as default.
3939
// it's possible to call NewSchedulerWithConfig to pass a different scheduler config.
4040
// For build time plugins changes, it's recommended to call in main.go to NewSchedulerWithConfig.
41+
endpointSubsetFilter := filter.NewSubsetFilter()
4142
loraAffinityFilter := filter.NewLoraAffinityFilter()
4243
leastQueueFilter := filter.NewLeastQueueFilter()
4344
leastKvCacheFilter := filter.NewLeastKVCacheFilter()
@@ -65,7 +66,7 @@ func NewScheduler(datastore Datastore) *Scheduler {
6566
}
6667

6768
defaultProfile := framework.NewSchedulerProfile().
68-
WithFilters(lowLatencyFilter).
69+
WithFilters(endpointSubsetFilter, lowLatencyFilter).
6970
WithPicker(&picker.RandomPicker{})
7071

7172
profilePicker := profilepicker.NewSingleProfileHandler()

pkg/epp/scheduling/types/types.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,29 @@ type LLMRequest struct {
3333
Prompt string
3434
// Headers is a map of the request headers.
3535
Headers map[string]string
36+
37+
// metadata is a map of metadata in the request
38+
metadata map[string]any
39+
}
40+
41+
func NewLLMRequest(reqID, targetModel, prompt string, headers map[string]string, metadata map[string]any) *LLMRequest {
42+
return &LLMRequest{
43+
RequestId: reqID,
44+
TargetModel: targetModel,
45+
Prompt: prompt,
46+
Headers: headers,
47+
metadata: metadata,
48+
}
3649
}
3750

3851
func (r *LLMRequest) String() string {
3952
return fmt.Sprintf("RequestID: %s, TargetModel: %s, PromptLength: %d, Headers: %v", r.RequestId, r.TargetModel, len(r.Prompt), r.Headers)
4053
}
4154

55+
func (r *LLMRequest) GetMetadata() map[string]any {
56+
return r.metadata
57+
}
58+
4259
type Pod interface {
4360
GetPod() *backend.Pod
4461
GetMetrics() *backendmetrics.MetricsState

pkg/epp/util/request/metadata.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package request
18+
19+
import (
20+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
21+
)
22+
23+
func ExtractMetadataValues(req *extProcPb.ProcessingRequest) map[string]any {
24+
metadata := make(map[string]any)
25+
if req != nil && req.MetadataContext != nil && req.MetadataContext.FilterMetadata != nil {
26+
for key, val := range req.MetadataContext.FilterMetadata {
27+
metadata[key] = val.AsMap()
28+
}
29+
}
30+
return metadata
31+
}

0 commit comments

Comments
 (0)