diff --git a/.github/workflows/integration-test-k8s.yml b/.github/workflows/integration-test-k8s.yml index 2696c6df1..3e3b932cf 100644 --- a/.github/workflows/integration-test-k8s.yml +++ b/.github/workflows/integration-test-k8s.yml @@ -26,7 +26,7 @@ jobs: strategy: fail-fast: false # Continue testing other profiles even if one fails matrix: - profile: [ai-gateway, aibrix, routing-strategies, llm-d, istio] + profile: [ai-gateway, aibrix, routing-strategies, llm-d, istio, production-stack] steps: - name: Check out the repo diff --git a/e2e/README.md b/e2e/README.md index 504c8684a..34cf1ef89 100644 --- a/e2e/README.md +++ b/e2e/README.md @@ -16,7 +16,7 @@ The framework follows a **separation of concerns** design: - **aibrix**: Tests Semantic Router with vLLM AIBrix integration - **dynamic-config**: Tests Semantic Router with Kubernetes CRD-based configuration (IntelligentRoute/IntelligentPool) - **istio**: Tests Semantic Router with Istio service mesh integration -- **production-stack**: Tests vLLM Production Stack configurations (future) +- **production-stack**: Tests vLLM Production Stack configurations - **llm-d**: Tests Semantic Router with LLM-D distributed inference - **dynamo**: Tests with Nvidia Dynamo (future) @@ -131,6 +131,7 @@ make e2e-test ```bash make e2e-test E2E_PROFILE=ai-gateway +make e2e-test E2E_PROFILE=production-stack ``` ### Run specific test cases diff --git a/e2e/cmd/e2e/main.go b/e2e/cmd/e2e/main.go index a46ea394c..c7c484b1c 100644 --- a/e2e/cmd/e2e/main.go +++ b/e2e/cmd/e2e/main.go @@ -14,6 +14,7 @@ import ( dynamicconfig "github.com/vllm-project/semantic-router/e2e/profiles/dynamic-config" istio "github.com/vllm-project/semantic-router/e2e/profiles/istio" llmd "github.com/vllm-project/semantic-router/e2e/profiles/llm-d" + productionstack "github.com/vllm-project/semantic-router/e2e/profiles/production-stack" routingstrategies "github.com/vllm-project/semantic-router/e2e/profiles/routing-strategies" // Import profiles to register test cases @@ -21,6 +22,7 @@ import ( _ "github.com/vllm-project/semantic-router/e2e/profiles/aibrix" _ "github.com/vllm-project/semantic-router/e2e/profiles/istio" _ "github.com/vllm-project/semantic-router/e2e/profiles/llm-d" + _ "github.com/vllm-project/semantic-router/e2e/profiles/production-stack" _ "github.com/vllm-project/semantic-router/e2e/profiles/routing-strategies" ) @@ -113,6 +115,8 @@ func getProfile(name string) (framework.Profile, error) { return istio.NewProfile(), nil case "llm-d": return llmd.NewProfile(), nil + case "production-stack": + return productionstack.NewProfile(), nil case "routing-strategies": return routingstrategies.NewProfile(), nil default: diff --git a/e2e/profiles/production-stack/profile.go b/e2e/profiles/production-stack/profile.go new file mode 100644 index 000000000..77f2e2edd --- /dev/null +++ b/e2e/profiles/production-stack/profile.go @@ -0,0 +1,480 @@ +package productionstack + +import ( + "context" + "fmt" + "os" + "os/exec" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + "github.com/vllm-project/semantic-router/e2e/pkg/framework" + "github.com/vllm-project/semantic-router/e2e/pkg/helm" + "github.com/vllm-project/semantic-router/e2e/pkg/helpers" + + // Import testcases package to register all test cases via their init() functions + _ "github.com/vllm-project/semantic-router/e2e/testcases" +) + +const ( + // Profile constants + profileName = "production-stack" + + // Namespace constants + namespaceSemanticRouter = "vllm-semantic-router-system" + namespaceEnvoyGateway = "envoy-gateway-system" + namespaceAIGateway = "envoy-ai-gateway-system" + namespaceDefault = "default" + + // Release name constants + releaseSemanticRouter = "semantic-router" + releaseEnvoyGateway = "eg" + releaseAIGatewayCRD = "aieg-crd" + releaseAIGateway = "aieg" + + // Deployment name constants + deploymentSemanticRouter = "semantic-router" + deploymentEnvoyGateway = "envoy-gateway" + deploymentAIGateway = "ai-gateway-controller" + + // Chart and URL constants + chartPathSemanticRouter = "deploy/helm/semantic-router" + chartEnvoyGateway = "oci://docker.io/envoyproxy/gateway-helm" + chartAIGatewayCRD = "oci://docker.io/envoyproxy/ai-gateway-crds-helm" + chartAIGateway = "oci://docker.io/envoyproxy/ai-gateway-helm" + chartVersion = "v0.0.0-latest" + envoyGatewayValuesURL = "https://raw.githubusercontent.com/envoyproxy/ai-gateway/main/manifests/envoy-gateway-values.yaml" + + // File path constants + valuesFile = "e2e/profiles/production-stack/values.yaml" + baseModelManifest = "deploy/kubernetes/ai-gateway/aigw-resources/base-model.yaml" + gatewayAPIManifest = "deploy/kubernetes/ai-gateway/aigw-resources/gwapi-resources.yaml" + prometheusConfigFile = "e2e/profiles/production-stack/prometheus-config.yaml" + + // Timeout constants + timeoutSemanticRouterInstall = "30m" + timeoutHelmInstall = "10m" + timeoutDeploymentWait = 10 * time.Minute + timeoutServiceRetry = 10 * time.Minute + intervalServiceRetry = 5 * time.Second + + // Image constants + imageRepository = "ghcr.io/vllm-project/semantic-router/extproc" + imagePullPolicy = "Never" + + // Label selector constants + labelSelectorGateway = "gateway.envoyproxy.io/owning-gateway-namespace=default,gateway.envoyproxy.io/owning-gateway-name=semantic-router" + + // Port mapping constants + portMapping = "8080:80" +) + +// Profile implements the production-stack test profile +type Profile struct { + verbose bool +} + +// NewProfile creates a new production-stack profile +func NewProfile() *Profile { + return &Profile{} +} + +// Name returns the profile name +func (p *Profile) Name() string { + return profileName +} + +// Description returns the profile description +func (p *Profile) Description() string { + return "Tests Semantic Router with Envoy AI Gateway integration (production-stack)" +} + +// Setup deploys all required components for production-stack testing +func (p *Profile) Setup(ctx context.Context, opts *framework.SetupOptions) error { + p.verbose = opts.Verbose + p.log("Setting up Production Stack test environment") + + deployer := helm.NewDeployer(opts.KubeConfig, opts.Verbose) + + // Step 1: Deploy Semantic Router + p.log("Step 1/7: Deploying Semantic Router") + if err := p.deploySemanticRouter(ctx, deployer, opts); err != nil { + return fmt.Errorf("failed to deploy semantic router: %w", err) + } + + // Step 2: Deploy Envoy Gateway + p.log("Step 2/7: Deploying Envoy Gateway") + if err := p.deployEnvoyGateway(ctx, deployer, opts); err != nil { + return fmt.Errorf("failed to deploy envoy gateway: %w", err) + } + + // Step 3: Deploy Envoy AI Gateway + p.log("Step 3/7: Deploying Envoy AI Gateway") + if err := p.deployEnvoyAIGateway(ctx, deployer, opts); err != nil { + return fmt.Errorf("failed to deploy envoy ai gateway: %w", err) + } + + // Step 4: Deploy Demo LLM and Gateway API Resources + p.log("Step 4/7: Deploying Demo LLM and Gateway API Resources") + if err := p.deployGatewayResources(ctx, opts); err != nil { + return fmt.Errorf("failed to deploy gateway resources: %w", err) + } + + // Step 5: Scale deployments for HA/LB + p.log("Step 5/7: Scaling deployments for high availability") + if err := p.scaleDeployments(ctx, opts); err != nil { + return fmt.Errorf("failed to scale deployments: %w", err) + } + + // Step 6: Deploy Prometheus for monitoring + p.log("Step 6/7: Deploying Prometheus for monitoring") + if err := p.deployPrometheus(ctx, opts); err != nil { + return fmt.Errorf("failed to deploy prometheus: %w", err) + } + + // Step 7: Verify all components are ready + p.log("Step 7/7: Verifying all components are ready") + if err := p.verifyEnvironment(ctx, opts); err != nil { + return fmt.Errorf("failed to verify environment: %w", err) + } + + p.log("Production Stack test environment setup complete") + return nil +} + +// Teardown cleans up all deployed resources +func (p *Profile) Teardown(ctx context.Context, opts *framework.TeardownOptions) error { + p.verbose = opts.Verbose + p.log("Tearing down Production Stack test environment") + + deployer := helm.NewDeployer(opts.KubeConfig, opts.Verbose) + + p.log("Cleaning up Gateway API resources") + p.cleanupGatewayResources(ctx, opts) + + p.log("Cleaning up Prometheus") + p.cleanupPrometheus(ctx, opts) + + p.log("Uninstalling Envoy AI Gateway") + deployer.Uninstall(ctx, releaseAIGatewayCRD, namespaceAIGateway) + deployer.Uninstall(ctx, releaseAIGateway, namespaceAIGateway) + + p.log("Uninstalling Envoy Gateway") + deployer.Uninstall(ctx, releaseEnvoyGateway, namespaceEnvoyGateway) + + p.log("Uninstalling Semantic Router") + deployer.Uninstall(ctx, releaseSemanticRouter, namespaceSemanticRouter) + + p.log("Production Stack test environment teardown complete") + return nil +} + +// GetTestCases returns the list of test cases for this profile +func (p *Profile) GetTestCases() []string { + return []string{ + // Standard functional tests + "chat-completions-request", + "chat-completions-stress-request", + "domain-classify", + "semantic-cache", + "pii-detection", + "jailbreak-detection", + "chat-completions-progressive-stress", + // Production stack specific tests (HA/LB/Observability) + "multi-replica-health", + "load-balancing-verification", + "failover-during-traffic", + "performance-throughput", + "resource-utilization-monitoring", + } +} + +// GetServiceConfig returns the service configuration for accessing the deployed service +func (p *Profile) GetServiceConfig() framework.ServiceConfig { + return framework.ServiceConfig{ + LabelSelector: labelSelectorGateway, + Namespace: namespaceEnvoyGateway, + PortMapping: portMapping, + } +} + +func (p *Profile) deploySemanticRouter(ctx context.Context, deployer *helm.Deployer, opts *framework.SetupOptions) error { + installOpts := helm.InstallOptions{ + ReleaseName: releaseSemanticRouter, + Chart: chartPathSemanticRouter, + Namespace: namespaceSemanticRouter, + ValuesFiles: []string{valuesFile}, + Set: map[string]string{ + "image.repository": imageRepository, + "image.tag": opts.ImageTag, + "image.pullPolicy": imagePullPolicy, + "replicaCount": "1", // Start with 1 replica, scale to 2 later + }, + Wait: true, + Timeout: timeoutSemanticRouterInstall, + } + + if err := deployer.Install(ctx, installOpts); err != nil { + return err + } + + if err := deployer.WaitForDeployment(ctx, namespaceSemanticRouter, deploymentSemanticRouter, timeoutDeploymentWait); err != nil { + return err + } + + return nil +} + +func (p *Profile) deployEnvoyGateway(ctx context.Context, deployer *helm.Deployer, _ *framework.SetupOptions) error { + installOpts := helm.InstallOptions{ + ReleaseName: releaseEnvoyGateway, + Chart: chartEnvoyGateway, + Namespace: namespaceEnvoyGateway, + Version: chartVersion, + ValuesFiles: []string{envoyGatewayValuesURL}, + Wait: true, + Timeout: timeoutHelmInstall, + } + + if err := deployer.Install(ctx, installOpts); err != nil { + return err + } + + return deployer.WaitForDeployment(ctx, namespaceEnvoyGateway, deploymentEnvoyGateway, timeoutDeploymentWait) +} + +func (p *Profile) deployEnvoyAIGateway(ctx context.Context, deployer *helm.Deployer, _ *framework.SetupOptions) error { + crdOpts := helm.InstallOptions{ + ReleaseName: releaseAIGatewayCRD, + Chart: chartAIGatewayCRD, + Namespace: namespaceAIGateway, + Version: chartVersion, + Wait: true, + Timeout: timeoutHelmInstall, + } + + if err := deployer.Install(ctx, crdOpts); err != nil { + return err + } + + installOpts := helm.InstallOptions{ + ReleaseName: releaseAIGateway, + Chart: chartAIGateway, + Namespace: namespaceAIGateway, + Version: chartVersion, + Wait: true, + Timeout: timeoutHelmInstall, + } + + if err := deployer.Install(ctx, installOpts); err != nil { + return err + } + + return deployer.WaitForDeployment(ctx, namespaceAIGateway, deploymentAIGateway, timeoutDeploymentWait) +} + +func (p *Profile) deployGatewayResources(ctx context.Context, opts *framework.SetupOptions) error { + if err := p.kubectlApply(ctx, opts.KubeConfig, baseModelManifest); err != nil { + return fmt.Errorf("failed to apply base model: %w", err) + } + + if err := p.kubectlApply(ctx, opts.KubeConfig, gatewayAPIManifest); err != nil { + return fmt.Errorf("failed to apply gateway API resources: %w", err) + } + + return nil +} + +func (p *Profile) verifyEnvironment(ctx context.Context, opts *framework.SetupOptions) error { + // Create Kubernetes client + config, err := clientcmd.BuildConfigFromFlags("", opts.KubeConfig) + if err != nil { + return fmt.Errorf("failed to build kubeconfig: %w", err) + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("failed to create kube client: %w", err) + } + + startTime := time.Now() + p.log("Waiting for Envoy Gateway service to be ready...") + + var envoyService string + for { + envoyService, err = helpers.GetEnvoyServiceName(ctx, client, labelSelectorGateway, p.verbose) + if err == nil { + podErr := helpers.VerifyServicePodsRunning(ctx, client, namespaceEnvoyGateway, envoyService, p.verbose) + if podErr == nil { + p.log("Envoy Gateway service is ready: %s", envoyService) + break + } + if p.verbose { + p.log("Envoy service found but pods not ready: %v", podErr) + } + err = fmt.Errorf("service pods not ready: %w", podErr) + } + + if time.Since(startTime) >= timeoutServiceRetry { + return fmt.Errorf("failed to get Envoy service with running pods after %v: %w", timeoutServiceRetry, err) + } + + if p.verbose { + p.log("Envoy service not ready, retrying in %v... (elapsed: %v)", + intervalServiceRetry, time.Since(startTime).Round(time.Second)) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(intervalServiceRetry): + } + } + + p.log("Verifying all deployments are healthy...") + + if err := helpers.CheckDeployment(ctx, client, namespaceSemanticRouter, deploymentSemanticRouter, p.verbose); err != nil { + return fmt.Errorf("semantic-router deployment not healthy: %w", err) + } + + if err := helpers.CheckDeployment(ctx, client, namespaceEnvoyGateway, deploymentEnvoyGateway, p.verbose); err != nil { + return fmt.Errorf("envoy-gateway deployment not healthy: %w", err) + } + + if err := helpers.CheckDeployment(ctx, client, namespaceAIGateway, deploymentAIGateway, p.verbose); err != nil { + return fmt.Errorf("ai-gateway-controller deployment not healthy: %w", err) + } + + p.log("All deployments are healthy") + + return nil +} + +func (p *Profile) scaleDeployments(ctx context.Context, opts *framework.SetupOptions) error { + deployer := helm.NewDeployer(opts.KubeConfig, opts.Verbose) + + p.log("Scaling semantic-router deployment to 2 replicas") + if err := p.kubectl(ctx, opts.KubeConfig, "scale", "deployment", deploymentSemanticRouter, "-n", namespaceSemanticRouter, "--replicas=2"); err != nil { + return fmt.Errorf("failed to scale semantic-router deployment: %w", err) + } + + if err := deployer.WaitForDeployment(ctx, namespaceSemanticRouter, deploymentSemanticRouter, timeoutDeploymentWait); err != nil { + return fmt.Errorf("semantic-router deployment not ready after scaling: %w", err) + } + + p.log("Scaling vllm-llama3-8b-instruct deployment to 2 replicas") + if err := p.kubectl(ctx, opts.KubeConfig, "scale", "deployment", "vllm-llama3-8b-instruct", "-n", namespaceDefault, "--replicas=2"); err != nil { + return fmt.Errorf("failed to scale vllm demo deployment: %w", err) + } + + if err := deployer.WaitForDeployment(ctx, namespaceDefault, "vllm-llama3-8b-instruct", timeoutDeploymentWait); err != nil { + return fmt.Errorf("vllm demo deployment not ready after scaling: %w", err) + } + + return nil +} + +func (p *Profile) deployPrometheus(ctx context.Context, opts *framework.SetupOptions) error { + prometheusDir := "deploy/kubernetes/observability/prometheus" + + if err := p.kubectl(ctx, opts.KubeConfig, "create", "serviceaccount", "prometheus", "-n", namespaceDefault); err != nil { + p.log("ServiceAccount prometheus may already exist, continuing...") + } + + if err := p.kubectl(ctx, opts.KubeConfig, "apply", "-f", prometheusDir+"/rbac.yaml", "--server-side"); err != nil { + return fmt.Errorf("failed to apply prometheus RBAC: %w", err) + } + + if err := p.kubectl(ctx, opts.KubeConfig, "patch", "clusterrolebinding", "prometheus", "--type", "json", "-p", `[{"op": "replace", "path": "/subjects/0/namespace", "value": "default"}]`); err != nil { + p.log("Patching ClusterRoleBinding, if it fails we'll continue...") + } + + if err := p.kubectlApplyWithNamespace(ctx, opts.KubeConfig, namespaceDefault, prometheusDir+"/configmap.yaml"); err != nil { + return fmt.Errorf("failed to apply prometheus configmap: %w", err) + } + + updatedConfig, err := os.ReadFile(prometheusConfigFile) + if err != nil { + return fmt.Errorf("failed to read prometheus config file: %w", err) + } + + if err := p.kubectl(ctx, opts.KubeConfig, "patch", "configmap", "prometheus-config", "-n", namespaceDefault, "--type", "merge", "-p", fmt.Sprintf(`{"data":{"prometheus.yml":%q}}`, string(updatedConfig))); err != nil { + p.log("Warning: Could not update prometheus configmap, using default: %v", err) + } else { + p.log("Reloading Prometheus configuration...") + time.Sleep(2 * time.Second) + } + + if err := p.kubectlApplyWithNamespace(ctx, opts.KubeConfig, namespaceDefault, prometheusDir+"/pvc.yaml"); err != nil { + return fmt.Errorf("failed to apply prometheus PVC: %w", err) + } + + if err := p.kubectlApplyWithNamespace(ctx, opts.KubeConfig, namespaceDefault, prometheusDir+"/deployment.yaml"); err != nil { + return fmt.Errorf("failed to apply prometheus deployment: %w", err) + } + + if err := p.kubectlApplyWithNamespace(ctx, opts.KubeConfig, namespaceDefault, prometheusDir+"/service.yaml"); err != nil { + return fmt.Errorf("failed to apply prometheus service: %w", err) + } + + deployer := helm.NewDeployer(opts.KubeConfig, opts.Verbose) + if err := deployer.WaitForDeployment(ctx, namespaceDefault, "prometheus", timeoutDeploymentWait); err != nil { + return fmt.Errorf("prometheus deployment not ready: %w", err) + } + + p.log("Waiting for Prometheus to start scraping metrics...") + time.Sleep(30 * time.Second) + + return nil +} + +func (p *Profile) cleanupPrometheus(ctx context.Context, opts *framework.TeardownOptions) error { + prometheusDir := "deploy/kubernetes/observability/prometheus" + p.kubectl(ctx, opts.KubeConfig, "delete", "-f", prometheusDir+"/service.yaml", "-n", namespaceDefault, "--ignore-not-found=true") + p.kubectl(ctx, opts.KubeConfig, "delete", "-f", prometheusDir+"/deployment.yaml", "-n", namespaceDefault, "--ignore-not-found=true") + p.kubectl(ctx, opts.KubeConfig, "delete", "-f", prometheusDir+"/pvc.yaml", "-n", namespaceDefault, "--ignore-not-found=true") + p.kubectl(ctx, opts.KubeConfig, "delete", "-f", prometheusDir+"/configmap.yaml", "-n", namespaceDefault, "--ignore-not-found=true") + p.kubectl(ctx, opts.KubeConfig, "delete", "-f", prometheusDir+"/rbac.yaml", "--ignore-not-found=true") + p.kubectl(ctx, opts.KubeConfig, "delete", "serviceaccount", "prometheus", "-n", namespaceDefault, "--ignore-not-found=true") + return nil +} + +func (p *Profile) cleanupGatewayResources(ctx context.Context, opts *framework.TeardownOptions) error { + p.kubectlDelete(ctx, opts.KubeConfig, gatewayAPIManifest) + p.kubectlDelete(ctx, opts.KubeConfig, baseModelManifest) + return nil +} + +func (p *Profile) kubectl(ctx context.Context, kubeConfig string, args ...string) error { + return p.runKubectl(ctx, kubeConfig, args...) +} + +func (p *Profile) kubectlApply(ctx context.Context, kubeConfig, manifest string) error { + return p.runKubectl(ctx, kubeConfig, "apply", "-f", manifest) +} + +func (p *Profile) kubectlApplyWithNamespace(ctx context.Context, kubeConfig, namespace, manifest string) error { + return p.runKubectl(ctx, kubeConfig, "apply", "-f", manifest, "-n", namespace) +} + +func (p *Profile) kubectlDelete(ctx context.Context, kubeConfig, manifest string) error { + return p.runKubectl(ctx, kubeConfig, "delete", "-f", manifest) +} + +func (p *Profile) runKubectl(ctx context.Context, kubeConfig string, args ...string) error { + args = append(args, "--kubeconfig", kubeConfig) + cmd := exec.CommandContext(ctx, "kubectl", args...) + if p.verbose { + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + } + return cmd.Run() +} + +func (p *Profile) log(format string, args ...interface{}) { + if p.verbose { + fmt.Printf("[Production-Stack] "+format+"\n", args...) + } +} diff --git a/e2e/profiles/production-stack/prometheus-config.yaml b/e2e/profiles/production-stack/prometheus-config.yaml new file mode 100644 index 000000000..0945384b7 --- /dev/null +++ b/e2e/profiles/production-stack/prometheus-config.yaml @@ -0,0 +1,55 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: prometheus + static_configs: + - targets: + - localhost:9090 + + - job_name: semantic-router + kubernetes_sd_configs: + - role: endpoints + namespaces: + names: + - vllm-semantic-router-system + - default + relabel_configs: + - source_labels: [__meta_kubernetes_service_name] + regex: semantic-router-metrics + action: keep + - source_labels: [__meta_kubernetes_endpoint_port_name] + regex: metrics + action: keep + - source_labels: [__meta_kubernetes_namespace] + target_label: namespace + - source_labels: [__address__] + target_label: instance + + - job_name: kubernetes-pods + kubernetes_sd_configs: + - role: pod + namespaces: + names: + - default + - vllm-semantic-router-system + + - job_name: kubernetes-nodes + kubernetes_sd_configs: + - role: node + scheme: https + tls_config: + ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + insecure_skip_verify: true + bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token + relabel_configs: + - action: labelmap + regex: __meta_kubernetes_node_label_(.+) + - target_label: __address__ + replacement: kubernetes.default.svc:443 + - source_labels: [__meta_kubernetes_node_name] + regex: (.+) + target_label: __metrics_path__ + replacement: /api/v1/nodes/${1}/proxy/metrics/cadvisor + diff --git a/e2e/profiles/production-stack/values.yaml b/e2e/profiles/production-stack/values.yaml new file mode 100644 index 000000000..e796d83a2 --- /dev/null +++ b/e2e/profiles/production-stack/values.yaml @@ -0,0 +1,168 @@ +# Semantic Router Configuration for Production Stack +# Minimal configuration for HA/LB/Monitoring tests +config: + # Allow Envoy to re-run route matching after Semantic Router sets x-selected-model + clear_route_cache: true + default_model: general-expert + + # Model configuration - base model with LoRA adapters + model_config: + "base-model": + reasoning_family: "qwen3" + loras: + - name: "science-expert" + description: "Specialized for science domains" + - name: "social-expert" + description: "Optimized for social sciences" + - name: "math-expert" + description: "Fine-tuned for mathematics" + - name: "law-expert" + description: "Specialized for legal questions" + - name: "humanities-expert" + description: "Optimized for humanities" + - name: "general-expert" + description: "General-purpose adapter" + + # Classifier configuration - required for domain-classify test + classifier: + category_model: + model_id: models/category_classifier_modernbert-base_model + use_modernbert: true + threshold: 0.6 + use_cpu: true + category_mapping_path: models/category_classifier_modernbert-base_model/category_mapping.json + pii_model: + # Required for pii-detection test + model_id: models/lora_pii_detector_bert-base-uncased_model + use_modernbert: false + threshold: 0.7 + use_cpu: true + pii_mapping_path: models/pii_classifier_modernbert-base_presidio_token_model/pii_type_mapping.json + + # Categories required for domain-classify test cases + categories: + - name: business + description: "Business, corporate strategy, management, finance" + - name: philosophy + description: "Philosophical traditions, ethics, logic" + - name: biology + description: "Molecular biology, genetics, cell biology" + - name: health + description: "Anatomy, physiology, diseases, treatments" + - name: computer science + description: "Algorithms, data structures, programming" + - name: engineering + description: "Engineering disciplines, design, problem-solving" + - name: psychology + description: "Cognitive processes, behavioral patterns" + - name: math + description: "Mathematics, algebra, calculus, geometry" + - name: chemistry + description: "Chemical reactions, molecular structures" + - name: physics + description: "Physical laws, mechanics, thermodynamics" + - name: history + description: "Historical events, time periods, cultures" + - name: law + description: "Legal principles, case law, statutory interpretation" + - name: economics + description: "Microeconomics, macroeconomics, financial markets" + - name: other + description: "General knowledge and miscellaneous topics" + + # Simple default decision - routes all requests to base model + decisions: + - name: default_decision + description: "Default route for all requests" + priority: 1 + rules: + operator: "OR" + conditions: + - type: "domain" + name: "other" + modelRefs: + - model: base-model + lora_name: general-expert + use_reasoning: false + plugins: + - type: "pii" + configuration: + enabled: true + pii_types_allowed: [] + - type: "semantic-cache" + configuration: + enabled: true + similarity_threshold: 0.8 + + strategy: "priority" + + # BERT model for embeddings + bert_model: + model_id: models/all-MiniLM-L12-v2 + threshold: 0.6 + use_cpu: true + + # Semantic cache - required for semantic-cache test + semantic_cache: + enabled: true + backend_type: "memory" + similarity_threshold: 0.8 + max_entries: 1000 + ttl_seconds: 3600 + eviction_policy: "fifo" + use_hnsw: true + hnsw_m: 16 + hnsw_ef_construction: 200 + embedding_model: "bert" + + # Tools - disabled (not needed for production-stack tests) + tools: + enabled: false + + # Prompt guard - required for jailbreak-detection test + prompt_guard: + enabled: true + use_modernbert: true + model_id: "models/jailbreak_classifier_modernbert-base_model" + threshold: 0.7 + use_cpu: true + jailbreak_mapping_path: "models/jailbreak_classifier_modernbert-base_model/jailbreak_type_mapping.json" + + # Router configuration - minimal settings + router: + high_confidence_threshold: 0.99 + low_latency_threshold_ms: 2000 + default_confidence_threshold: 0.95 + default_max_latency_ms: 5000 + + # Reasoning family configurations + reasoning_families: + qwen3: + type: "chat_template_kwargs" + parameter: "enable_thinking" + + default_reasoning_effort: high + + # API Configuration + api: + batch_classification: + max_batch_size: 100 + concurrency_threshold: 5 + max_concurrency: 8 + metrics: + enabled: true + detailed_goroutine_tracking: true + high_resolution_timing: false + sample_rate: 1.0 + duration_buckets: + [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30] + size_buckets: [1, 2, 5, 10, 20, 50, 100, 200] + + # Observability Configuration + observability: + tracing: + enabled: false + +# Keep consistent with the default chart: initContainer, model downloads, and PVC use chart defaults +image: + pullPolicy: IfNotPresent diff --git a/e2e/testcases/failover_during_traffic.go b/e2e/testcases/failover_during_traffic.go new file mode 100644 index 000000000..585ddabe1 --- /dev/null +++ b/e2e/testcases/failover_during_traffic.go @@ -0,0 +1,106 @@ +package testcases + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "sync/atomic" + "time" + + pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +func init() { + pkgtestcases.Register("failover-during-traffic", pkgtestcases.TestCase{ + Description: "While sending requests, delete one vLLM pod and verify high availability and recovery", + Tags: []string{"ha", "failover"}, + Fn: testFailoverDuringTraffic, + }) +} + +func testFailoverDuringTraffic(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error { + localPort, stopPF, err := setupServiceConnection(ctx, client, opts) + if err != nil { + return err + } + defer stopPF() + + url := fmt.Sprintf("http://localhost:%s/v1/chat/completions", localPort) + httpClient := &http.Client{Timeout: 15 * time.Second} + + // Start sending requests in background + total := 100 + var errs int32 // Use atomic for thread-safe access + done := make(chan struct{}) + go func() { + for i := 0; i < total; i++ { + if err := sendChat(ctx, httpClient, url, i); err != nil { + atomic.AddInt32(&errs, 1) + } + time.Sleep(100 * time.Millisecond) + } + close(done) + }() + + // Wait a short moment then delete one vLLM pod + time.Sleep(2 * time.Second) + pods, err := client.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "app=vllm-llama3-8b-instruct"}) + if err == nil && len(pods.Items) > 0 { + if err := client.CoreV1().Pods("default").Delete(ctx, pods.Items[0].Name, metav1.DeleteOptions{}); err != nil { + // Log but don't fail - pod might already be deleted + if opts.Verbose { + fmt.Printf("[Test] Warning: failed to delete pod: %v\n", err) + } + } + } + + <-done + + errCount := int(atomic.LoadInt32(&errs)) + successRate := float64(total-errCount) / float64(total) + if opts.SetDetails != nil { + opts.SetDetails(map[string]interface{}{ + "total": total, + "errors": errCount, + "success_rate": successRate, + }) + } + // Expect >= 0.95 success rate despite disruption + if successRate < 0.95 { + return fmt.Errorf("success rate too low during failover: %.2f", successRate) + } + + // Ensure deployment recovers to 2 ready replicas + if err := waitDeploymentReadyReplicas(ctx, client, "default", "vllm-llama3-8b-instruct", 2, 5*time.Minute, opts.Verbose); err != nil { + return fmt.Errorf("vllm demo did not recover: %w", err) + } + return nil +} + +func sendChat(ctx context.Context, httpClient *http.Client, url string, id int) error { + reqBody := map[string]interface{}{ + "model": "MoM", + "messages": []map[string]string{ + {"role": "user", "content": fmt.Sprintf("check %d", id)}, + }, + } + b, _ := json.Marshal(reqBody) + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(b)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("status %d", resp.StatusCode) + } + return nil +} diff --git a/e2e/testcases/load_balancing_verification.go b/e2e/testcases/load_balancing_verification.go new file mode 100644 index 000000000..af1b2d9c5 --- /dev/null +++ b/e2e/testcases/load_balancing_verification.go @@ -0,0 +1,144 @@ +package testcases + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "strings" + "sync" + "time" + + pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +func init() { + pkgtestcases.Register("load-balancing-verification", pkgtestcases.TestCase{ + Description: "Send concurrent requests and verify vLLM service has >=2 endpoints and all replicas respond OK", + Tags: []string{"lb", "ha"}, + Fn: testLoadBalancingVerification, + }) +} + +func testLoadBalancingVerification(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error { + // Precondition: vLLM service should have >= 2 endpoints (implies LB target set) + if err := ensureServiceHasAtLeastNEndpoints(ctx, client, "default", "vllm-llama3-8b-instruct", 2, opts.Verbose); err != nil { + return fmt.Errorf("service endpoints check failed: %w", err) + } + + // Drive concurrent traffic through Envoy -> router -> vLLM + localPort, stopPF, err := setupServiceConnection(ctx, client, opts) + if err != nil { + return err + } + defer stopPF() + + total := 100 + concurrency := 10 + errCount := 0 + mu := sync.Mutex{} + wg := sync.WaitGroup{} + wg.Add(concurrency) + + work := make(chan int, total) + for i := 0; i < total; i++ { + work <- i + 1 + } + close(work) + + clientHTTP := &http.Client{Timeout: 20 * time.Second} + url := fmt.Sprintf("http://localhost:%s/v1/chat/completions", localPort) + + for w := 0; w < concurrency; w++ { + go func() { + defer wg.Done() + for id := range work { + if err := sendBasicChatRequest(ctx, clientHTTP, url, id); err != nil { + mu.Lock() + errCount++ + mu.Unlock() + } + } + }() + } + wg.Wait() + + if opts.SetDetails != nil { + opts.SetDetails(map[string]interface{}{ + "total": total, + "errors": errCount, + "success_count": total - errCount, + }) + } + + // Basic assertion: expect high success rate (>=95%) under LB across replicas + if float64(errCount) > float64(total)*0.05 { + return fmt.Errorf("too many errors under load: %d/%d", errCount, total) + } + return nil +} + +func ensureServiceHasAtLeastNEndpoints(ctx context.Context, client *kubernetes.Clientset, namespace, name string, n int, verbose bool) error { + ep, err := client.CoreV1().Endpoints(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return err + } + count := 0 + for _, subset := range ep.Subsets { + count += len(subset.Addresses) + } + if verbose { + fmt.Printf("[Test] service %s/%s has %d endpoints\n", namespace, name, count) + } + if count < n { + return fmt.Errorf("service %s/%s has %d endpoints, expected at least %d", namespace, name, count, n) + } + // Also verify pods behind the service are running + svc, err := client.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) + if err == nil { + var selector []string + for k, v := range svc.Spec.Selector { + selector = append(selector, fmt.Sprintf("%s=%s", k, v)) + } + pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: strings.Join(selector, ",")}) + if err != nil { + if verbose { + fmt.Printf("[Test] Warning: failed to list pods: %v\n", err) + } + return nil // Continue even if pod listing fails + } + running := 0 + for _, p := range pods.Items { + if p.Status.Phase == corev1.PodRunning { + running++ + } + } + if verbose { + fmt.Printf("[Test] %d/%d pods Running behind service %s/%s\n", running, len(pods.Items), namespace, name) + } + } + return nil +} + +func sendBasicChatRequest(ctx context.Context, httpClient *http.Client, url string, id int) error { + body := []byte(`{"model":"MoM","messages":[{"role":"user","content":"ping ` + fmt.Sprint(id) + `"}]}`) + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + b, _ := io.ReadAll(resp.Body) + return fmt.Errorf("status %d: %s", resp.StatusCode, string(b)) + } + return nil +} diff --git a/e2e/testcases/multi_replica_health.go b/e2e/testcases/multi_replica_health.go new file mode 100644 index 000000000..72a884993 --- /dev/null +++ b/e2e/testcases/multi_replica_health.go @@ -0,0 +1,57 @@ +package testcases + +import ( + "context" + "fmt" + "time" + + pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +func init() { + pkgtestcases.Register("multi-replica-health", pkgtestcases.TestCase{ + Description: "Verify multi-replica Deployments (router and vLLM) have all replicas Ready", + Tags: []string{"ha", "lb", "readiness"}, + Fn: testMultiReplicaHealth, + }) +} + +func testMultiReplicaHealth(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error { + // Verify semantic-router Deployment has >= 2 ready replicas + if err := waitDeploymentReadyReplicas(ctx, client, "vllm-semantic-router-system", "semantic-router", 2, 5*time.Minute, opts.Verbose); err != nil { + return fmt.Errorf("semantic-router not multi-ready: %w", err) + } + // Verify vLLM demo Deployment has >= 2 ready replicas + if err := waitDeploymentReadyReplicas(ctx, client, "default", "vllm-llama3-8b-instruct", 2, 5*time.Minute, opts.Verbose); err != nil { + return fmt.Errorf("vllm demo not multi-ready: %w", err) + } + return nil +} + +func waitDeploymentReadyReplicas(ctx context.Context, client *kubernetes.Clientset, namespace, name string, minReady int32, timeout time.Duration, verbose bool) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + dep, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return err + } + if verbose { + var desiredReplicas int32 + if dep.Spec.Replicas != nil { + desiredReplicas = *dep.Spec.Replicas + } + fmt.Printf("[Test] %s/%s ready=%d desired=%d\n", namespace, name, dep.Status.ReadyReplicas, desiredReplicas) + } + if dep.Status.ReadyReplicas >= minReady { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + } + return fmt.Errorf("timeout waiting for %s/%s to have at least %d ready replicas", namespace, name, minReady) +} diff --git a/e2e/testcases/performance_throughput.go b/e2e/testcases/performance_throughput.go new file mode 100644 index 000000000..c31866b89 --- /dev/null +++ b/e2e/testcases/performance_throughput.go @@ -0,0 +1,101 @@ +package testcases + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "sort" + "time" + + pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases" + "k8s.io/client-go/kubernetes" +) + +func init() { + pkgtestcases.Register("performance-throughput", pkgtestcases.TestCase{ + Description: "Measure throughput and latency under moderate load", + Tags: []string{"performance"}, + Fn: testPerformanceThroughput, + }) +} + +func testPerformanceThroughput(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error { + localPort, stopPF, err := setupServiceConnection(ctx, client, opts) + if err != nil { + return err + } + defer stopPF() + + url := fmt.Sprintf("http://localhost:%s/v1/chat/completions", localPort) + httpClient := &http.Client{Timeout: 20 * time.Second} + + // Fixed-run benchmark: send N requests sequentially and compute metrics + N := 100 + start := time.Now() + var latencies []time.Duration + errors := 0 + for i := 0; i < N; i++ { + t0 := time.Now() + if err := sendSmallChat(ctx, httpClient, url, i); err != nil { + errors++ + } + latencies = append(latencies, time.Since(t0)) + } + elapsed := time.Since(start) + rps := float64(N) / elapsed.Seconds() + p50, p95, p99 := percentile(latencies, 50), percentile(latencies, 95), percentile(latencies, 99) + + if opts.SetDetails != nil { + opts.SetDetails(map[string]interface{}{ + "total": N, + "errors": errors, + "rps": rps, + "p50_ms": p50.Milliseconds(), + "p95_ms": p95.Milliseconds(), + "p99_ms": p99.Milliseconds(), + }) + } + // Simple gates suitable for CI; can be tuned + if errors > 0 { + return fmt.Errorf("observed %d errors during performance run", errors) + } + return nil +} + +func sendSmallChat(ctx context.Context, httpClient *http.Client, url string, id int) error { + body := map[string]interface{}{ + "model": "MoM", + "messages": []map[string]string{ + {"role": "user", "content": "hi"}, + }, + } + b, _ := json.Marshal(body) + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(b)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("status %d", resp.StatusCode) + } + return nil +} + +func percentile(latencies []time.Duration, p int) time.Duration { + if len(latencies) == 0 { + return 0 + } + // Copy and sort using efficient O(n log n) algorithm + tmp := make([]time.Duration, len(latencies)) + copy(tmp, latencies) + sort.Slice(tmp, func(i, j int) bool { return tmp[i] < tmp[j] }) + idx := (p * (len(tmp) - 1)) / 100 + return tmp[idx] +} diff --git a/e2e/testcases/resource_utilization_monitoring.go b/e2e/testcases/resource_utilization_monitoring.go new file mode 100644 index 000000000..3a7d216fe --- /dev/null +++ b/e2e/testcases/resource_utilization_monitoring.go @@ -0,0 +1,77 @@ +package testcases + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/vllm-project/semantic-router/e2e/pkg/helpers" + pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases" + "k8s.io/client-go/kubernetes" +) + +func init() { + pkgtestcases.Register("resource-utilization-monitoring", pkgtestcases.TestCase{ + Description: "Verify Prometheus is scraping metrics for vLLM/router (non-empty series)", + Tags: []string{"observability"}, + Fn: testResourceUtilizationMonitoring, + }) +} + +func testResourceUtilizationMonitoring(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error { + // Port-forward to Prometheus service in default namespace + stop, err := startPrometheusPF(ctx, client, opts) + if err != nil { + return err + } + defer stop() + + // Query a basic metric that should exist for pods (container CPU usage) + // Note: Using a short range vector to confirm series existence + query := `sum(rate(container_cpu_usage_seconds_total{namespace=~"default|vllm-semantic-router-system"}[1m]))` + ok, err := promHasNonEmptyResult(fmt.Sprintf("http://localhost:%d/api/v1/query?query=%s", 9090, url.QueryEscape(query))) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("prometheus returned empty result for CPU usage query") + } + return nil +} + +func startPrometheusPF(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) (func(), error) { + stop, err := helpers.StartPortForward(ctx, client, opts.RestConfig, "default", "prometheus", "9090:9090", opts.Verbose) + if err != nil { + return nil, fmt.Errorf("failed to port-forward to prometheus: %w", err) + } + // settle + time.Sleep(2 * time.Second) + return stop, nil +} + +func promHasNonEmptyResult(urlStr string) (bool, error) { + httpClient := &http.Client{Timeout: 10 * time.Second} + resp, err := httpClient.Get(urlStr) + if err != nil { + return false, err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + b, _ := io.ReadAll(resp.Body) + return false, fmt.Errorf("prometheus query status %d: %s", resp.StatusCode, string(b)) + } + var out struct { + Status string `json:"status"` + Data struct { + Result []interface{} `json:"result"` + } `json:"data"` + } + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return false, err + } + return len(out.Data.Result) > 0, nil +} diff --git a/tools/make/e2e.mk b/tools/make/e2e.mk index d295db3d4..9627a9c5d 100644 --- a/tools/make/e2e.mk +++ b/tools/make/e2e.mk @@ -98,6 +98,7 @@ e2e-help: ## Show help for E2E testing @echo " aibrix - Test Semantic Router with vLLM AIBrix" @echo " istio - Test Semantic Router with Istio service mesh" @echo " llm-d - Test Semantic Router with LLM-D" + @echo " production-stack - Test Semantic Router in production-like stack (HA/LB/Obs)" @echo "" @echo "Environment Variables:" @echo " E2E_PROFILE - Test profile to run (default: ai-gateway)"