1 change: 1 addition & 0 deletions Dockerfile
@@ -24,6 +24,7 @@ COPY internal ./internal
COPY apix ./apix
COPY api ./api
COPY version ./version
COPY sidecars ./sidecars
WORKDIR /src/cmd/epp
RUN go build -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/version.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/version.BuildRef=${BUILD_REF}" -o /epp

82 changes: 79 additions & 3 deletions cmd/epp/runner/runner.go
@@ -19,6 +19,7 @@ package runner
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
@@ -69,13 +70,15 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
testfilter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test/filter"
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
latencypredictor "sigs.k8s.io/gateway-api-inference-extension/sidecars/latencypredictorasync"
"sigs.k8s.io/gateway-api-inference-extension/version"
)

@@ -126,6 +129,7 @@ var (
"then a self-signed certificate is used.")
// metric flags
totalQueuedRequestsMetric = flag.String("total-queued-requests-metric", runserver.DefaultTotalQueuedRequestsMetric, "Prometheus metric for the number of queued requests.")
totalRunningRequestsMetric = flag.String("total-running-requests-metric", runserver.DefaultTotalRunningRequestsMetric, "Prometheus metric for the number of running requests.")
kvCacheUsagePercentageMetric = flag.String("kv-cache-usage-percentage-metric", runserver.DefaultKvCacheUsagePercentageMetric, "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
// LoRA metrics
loraInfoMetric = flag.String("lora-info-metric", runserver.DefaultLoraInfoMetric, "Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
@@ -145,7 +149,10 @@ var (
modelServerMetricsScheme = flag.String("model-server-metrics-scheme", "http", "Scheme to scrape metrics from pods")
modelServerMetricsHttpsInsecureSkipVerify = flag.Bool("model-server-metrics-https-insecure-skip-verify", true, "When using 'https' scheme for 'model-server-metrics-scheme', configure 'InsecureSkipVerify' (default to true)")
haEnableLeaderElection = flag.Bool("ha-enable-leader-election", false, "Enables leader election for high availability. When enabled, readiness probes will only pass on the leader.")
tracing = flag.Bool("tracing", true, "Enables emitting traces")

// Latency Predictor Flag
enableLatencyPredictor = flag.Bool("enable-latency-predictor", false, "Enable the regression-based latency predictor and scheduler scorer.")
tracing = flag.Bool("tracing", true, "Enables emitting traces")

setupLog = ctrl.Log.WithName("setup")
)
@@ -227,7 +234,20 @@ func (r *Runner) Run(ctx context.Context) error {
return err
}

rawConfig, err := r.parseConfigurationPhaseOne(ctx)
// ===================================================================
// == Latency Predictor Integration
// ===================================================================
var predictor latencypredictor.PredictorInterface // Use the interface type
if *enableLatencyPredictor {
setupLog.Info("Latency predictor is enabled. Initializing...")
predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))
} else {
setupLog.Info("Latency predictor is disabled.")
predictor = nil // This will be a true nil interface
}
// ===================================================================

rawConfig, err := r.parseConfigurationPhaseOne(ctx, predictor)
if err != nil {
setupLog.Error(err, "Failed to parse configuration")
return err
@@ -366,6 +386,7 @@ func (r *Runner) Run(ctx context.Context) error {
Director: director,
SaturationDetector: saturationDetector,
UseExperimentalDatalayerV2: r.featureGates[datalayer.FeatureGate], // pluggable data layer feature flag
LatencyPredictor: predictor,
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "Failed to setup EPP controllers")
@@ -383,6 +404,12 @@ func (r *Runner) Run(ctx context.Context) error {
return err
}

if *enableLatencyPredictor && predictor != nil {
if err := registerLatencyPredictorServer(mgr, predictor); err != nil {
return err
}
}

// --- Start Manager ---
// This blocks until a signal is received.
setupLog.Info("Controller manager starting")
@@ -432,7 +459,14 @@ func (r *Runner) registerInTreePlugins() {
plugins.Register(testresponsereceived.DestinationEndpointServedVerifierType, testresponsereceived.DestinationEndpointServedVerifierFactory)
}

func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.EndpointPickerConfig, error) {
func (r *Runner) registerLatencyPredictorPlugins(predictor latencypredictor.PredictorInterface) {
plugins.Register(slo_aware_router.SLOAwareRouterPluginType, func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return slo_aware_router.NewSLOAwareRouter(predictor, slo_aware_router.HeadroomSelectionStrategy).WithName(name), nil
})
plugins.Register(profile.SLOAwareProfileHandlerType, profile.SLOAwareProfileHandlerFactory)
}

func (r *Runner) parseConfigurationPhaseOne(ctx context.Context, predictor latencypredictor.PredictorInterface) (*configapi.EndpointPickerConfig, error) {
if *configText == "" && *configFile == "" {
return nil, nil // configuring through code, not through file
}
@@ -454,6 +488,12 @@ func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.End
loader.RegisterFeatureGate(flowcontrol.FeatureGate)

r.registerInTreePlugins()
// If the latency predictor is enabled and the predictor is non-nil, register
// the latency predictor plugins (currently the SLO-aware router and its profile handler).
if *enableLatencyPredictor && predictor != nil {
setupLog.Info("Registering latency predictor plugins")
r.registerLatencyPredictorPlugins(predictor)
}

rawConfig, featureGates, err := loader.LoadConfigPhaseOne(configBytes, logger)
if err != nil {
@@ -538,6 +578,7 @@ func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDat
func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
mapping, err := backendmetrics.NewMetricMapping(
*totalQueuedRequestsMetric,
*totalRunningRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric,
*cacheInfoMetric,
@@ -586,6 +627,7 @@ func setupDatalayer(logger logr.Logger) (datalayer.EndpointFactory, error) {
*modelServerMetricsHttpsInsecureSkipVerify,
nil)
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
*totalRunningRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric, *cacheInfoMetric)

@@ -653,6 +695,18 @@ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.
return nil
}

// registerLatencyPredictorServer adds the Latency Predictor server as a Runnable to the given manager.
func registerLatencyPredictorServer(mgr manager.Manager, predictor latencypredictor.PredictorInterface) error {
// The runnable needs the concrete type, so assert back from the interface.
concretePredictor, ok := predictor.(*latencypredictor.Predictor)
if !ok {
return fmt.Errorf("latency predictor has unexpected concrete type %T", predictor)
}
if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
setupLog.Error(err, "Failed to register latency predictor runnable")
return err
}
setupLog.Info("Latency predictor runnable added to manager.")
return nil
}

func validateFlags() error {
if (*poolName != "" && *endpointSelector != "") || (*poolName == "" && *endpointSelector == "") {
return errors.New("either pool-name or endpoint-selector must be set")
@@ -799,3 +853,25 @@ func resolvePoolNamespace(poolNamespace string) string {
}
return runserver.DefaultPoolNamespace
}

// ===================================================================
// == Latency Predictor Plugin and Helpers
// ===================================================================

// predictorRunnable implements controller-runtime's Runnable interface to manage the predictor's lifecycle.
type predictorRunnable struct {
predictor *latencypredictor.Predictor
}

func (p *predictorRunnable) Start(ctx context.Context) error {
setupLog.Info("Starting latency predictor...")
if err := p.predictor.Start(ctx); err != nil {
setupLog.Error(err, "Failed to start latency predictor")
return err
}
setupLog.Info("Latency predictor started.")
<-ctx.Done()
setupLog.Info("Stopping latency predictor...")
p.predictor.Stop()
return nil
}
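
One subtlety in the runner change above: the `else` branch assigns an untyped `nil` so that later `predictor != nil` checks behave as expected. Below is a minimal, self-contained sketch of the typed-nil pitfall this avoids — toy types, not the real `latencypredictor` package:

```go
package main

import "fmt"

// Stand-ins for the real types; only the nil-interface semantics matter here.
type PredictorInterface interface{ Predict() float64 }

type Predictor struct{}

func (p *Predictor) Predict() float64 { return 0 }

func main() {
	var typedNil *Predictor                 // a nil *Predictor
	var iface PredictorInterface = typedNil // interface holds (type=*Predictor, value=nil)
	fmt.Println(iface == nil)               // false — a typed nil makes the interface non-nil

	var trueNil PredictorInterface // never assigned a concrete type
	fmt.Println(trueNil == nil)    // true — what the `predictor != nil` guards rely on
}
```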
21 changes: 21 additions & 0 deletions config/charts/inferencepool/README.md
@@ -121,6 +121,27 @@ $ helm install triton-llama3-8b-instruct \
oci://us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/charts/inferencepool --version v0
```

### Install with SLO-Aware Routing

For full details, see the dedicated [SLO-Aware Routing Guide](../../../site-src/guides/slo-aware-routing.md).

#### SLO-Aware Router Environment Variables

The behavior of the SLO-aware router can be fine-tuned using the following environment variables in the Endpoint Picker deployment. These can be set under `inferenceExtension.env` in your `values.yaml` file.

| Environment Variable | Description | Default |
| -------------------------------- | ------------------------------------------------------------------------------------------------------- | ----------- |
| `SAMPLING_MEAN` | The sampling mean (lambda) for the Poisson distribution of token sampling. | `100.0` |
| `MAX_SAMPLED_TOKENS` | The maximum number of tokens to sample for TPOT prediction. | `20` |
| `SLO_BUFFER_FACTOR` | A buffer to apply to the SLO to make it more or less strict. | `1.0` |
| `NEG_HEADROOM_TTFT_WEIGHT` | The weight to give to the TTFT when a pod has negative headroom. | `0.8` |
| `NEG_HEADROOM_TPOT_WEIGHT` | The weight to give to the TPOT when a pod has negative headroom. | `0.2` |
| `HEADROOM_TTFT_WEIGHT` | The weight to give to the TTFT when a pod has positive headroom. | `0.8` |
| `HEADROOM_TPOT_WEIGHT` | The weight to give to the TPOT when a pod has positive headroom. | `0.2` |
| `HEADROOM_SELECTION_STRATEGY` | The strategy to use for selecting a pod based on headroom. Options: `least`, `most`, `composite-least`, `composite-most`, `composite-only`. | `least` |

**Note:** Enabling SLO-aware routing also exposes a number of Prometheus metrics for monitoring the feature, including actual vs. predicted latency, SLO violations, and more.
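
As a sketch, assuming `inferenceExtension.env` accepts standard Kubernetes `name`/`value` pairs (the key path comes from the text above; the values are illustrative), a `values.yaml` override might look like:

```yaml
inferenceExtension:
  env:
    # Loosen the SLO by 10% before comparing predicted latency against it.
    - name: SLO_BUFFER_FACTOR
      value: "1.1"
    # Among pods with positive headroom, prefer the least headroom (the default).
    - name: HEADROOM_SELECTION_STRATEGY
      value: "least"
```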

### Install with High Availability (HA)

To deploy the EndpointPicker in a high-availability (HA) active-passive configuration, set replicas to greater than one. In such a setup, only one "leader" replica will be active and ready to process traffic at any given time. If the leader pod fails, another pod will be elected as the new leader, ensuring service continuity.
45 changes: 44 additions & 1 deletion config/charts/inferencepool/templates/epp-config.yaml
@@ -11,7 +11,28 @@ data:
- type: queue-scorer
- type: kv-cache-utilization-scorer
- type: prefix-cache-scorer
{{- if .Values.inferenceExtension.latencyPredictor.enabled }}
- type: slo-aware-routing
- type: slo-aware-profile-handler
- type: max-score-picker
{{- end }}
schedulingProfiles:
{{- if .Values.inferenceExtension.latencyPredictor.enabled }}
- name: prefix
plugins:
- pluginRef: prefix-cache-scorer
- name: default
plugins:
- pluginRef: slo-aware-routing
weight: 0
- pluginRef: queue-scorer
- pluginRef: kv-cache-utilization-scorer
- pluginRef: max-score-picker
- name: slo
plugins:
- pluginRef: slo-aware-routing
- pluginRef: max-score-picker
{{- else }}
- name: default
plugins:
- pluginRef: queue-scorer
@@ -20,10 +41,10 @@ data:
weight: 2
- pluginRef: prefix-cache-scorer
weight: 3
{{- end }}
{{- if (hasKey .Values.inferenceExtension "pluginsCustomConfig") }}
{{- .Values.inferenceExtension.pluginsCustomConfig | toYaml | nindent 2 }}
{{- end }}

---
{{- if .Values.inferenceExtension.sidecar.enabled }}
apiVersion: v1
@@ -34,3 +55,25 @@ metadata:
data:
{{- .Values.inferenceExtension.sidecar.configMap.data | toYaml | nindent 2 }}
{{- end }}
---
{{- if .Values.inferenceExtension.latencyPredictor.enabled }}
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "gateway-api-inference-extension.name" . }}-latency-predictor-training
namespace: {{ .Release.Namespace }}
data:
{{- range $key, $value := .Values.inferenceExtension.latencyPredictor.trainingServer.config }}
{{ $key }}: {{ $value | quote }}
{{- end }}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "gateway-api-inference-extension.name" . }}-latency-predictor-prediction
namespace: {{ .Release.Namespace }}
data:
{{- range $key, $value := .Values.inferenceExtension.latencyPredictor.predictionServers.config }}
{{ $key }}: {{ $value | quote }}
{{- end }}
{{- end }}
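
For reference, the `.Values` paths this template consumes imply a `values.yaml` shape along these lines — a sketch only; the key names are taken from the template above, and the config entries are illustrative placeholders rendered verbatim into the two ConfigMaps:

```yaml
inferenceExtension:
  latencyPredictor:
    enabled: true
    trainingServer:
      config:
        EXAMPLE_TRAINING_SETTING: "value"    # -> <name>-latency-predictor-training
    predictionServers:
      config:
        EXAMPLE_PREDICTION_SETTING: "value"  # -> <name>-latency-predictor-prediction
```

Setting `enabled: true` also switches the scheduling profiles earlier in this template to the SLO-aware set (`prefix`, `default`, and `slo`).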