From 5504058e431a9f12d7506320f13e00a47730c517 Mon Sep 17 00:00:00 2001 From: Juan Antonio Osorio Date: Mon, 1 Dec 2025 10:41:56 -0600 Subject: [PATCH 1/2] Add vMCP e2e tests for tool overrides and composite workflows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add three new e2e test files for VirtualMCPServer: - virtualmcp_aggregation_overrides_test.go: Tests tool renaming via overrides configuration, verifying renamed tools appear with custom names and descriptions while original names are hidden. - virtualmcp_composite_sequential_test.go: Tests sequential composite tool workflows with template expansion, where step B depends on step A's output using Go template syntax. - virtualmcp_composite_parallel_test.go: Tests parallel composite workflows where independent steps execute concurrently before aggregating results in a final dependent step. All tests use yardstick as a deterministic MCP server backend. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../virtualmcp_aggregation_overrides_test.go | 289 ++++++++++++++ .../virtualmcp_composite_parallel_test.go | 374 ++++++++++++++++++ .../virtualmcp_composite_sequential_test.go | 317 +++++++++++++++ 3 files changed, 980 insertions(+) create mode 100644 test/e2e/thv-operator/virtualmcp/virtualmcp_aggregation_overrides_test.go create mode 100644 test/e2e/thv-operator/virtualmcp/virtualmcp_composite_parallel_test.go create mode 100644 test/e2e/thv-operator/virtualmcp/virtualmcp_composite_sequential_test.go diff --git a/test/e2e/thv-operator/virtualmcp/virtualmcp_aggregation_overrides_test.go b/test/e2e/thv-operator/virtualmcp/virtualmcp_aggregation_overrides_test.go new file mode 100644 index 000000000..f443954a3 --- /dev/null +++ b/test/e2e/thv-operator/virtualmcp/virtualmcp_aggregation_overrides_test.go @@ -0,0 +1,289 @@ +package virtualmcp + +import ( + "fmt" + "time" + + "github.com/mark3labs/mcp-go/mcp" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1" + "github.com/stacklok/toolhive/test/e2e/images" +) + +// Compile-time check to ensure corev1 is used (for Service type) +var _ = corev1.ServiceSpec{} + +var _ = Describe("VirtualMCPServer Tool Overrides", Ordered, func() { + var ( + testNamespace = "default" + mcpGroupName = "test-overrides-group" + vmcpServerName = "test-vmcp-overrides" + backendName = "yardstick-override" + timeout = 5 * time.Minute + pollingInterval = 5 * time.Second + vmcpNodePort int32 + + // The original and renamed tool names + originalToolName = "echo" + renamedToolName = "custom_echo_tool" + newDescription = "A renamed echo tool with custom description" + ) + + vmcpServiceName := func() string { + return fmt.Sprintf("vmcp-%s", vmcpServerName) + } + + BeforeAll(func() { + By("Creating MCPGroup for overrides test") + mcpGroup := &mcpv1alpha1.MCPGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: mcpGroupName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.MCPGroupSpec{ + Description: "Test MCP Group for tool overrides E2E tests", + }, + } + Expect(k8sClient.Create(ctx, mcpGroup)).To(Succeed()) + + By("Waiting for MCPGroup to be ready") + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: mcpGroupName, + Namespace: testNamespace, + }, mcpGroup) + if err != nil { + return false + } + return mcpGroup.Status.Phase == mcpv1alpha1.MCPGroupPhaseReady + }, timeout, pollingInterval).Should(BeTrue()) + + By("Creating yardstick backend MCPServer") + backend := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: backendName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.MCPServerSpec{ + GroupRef: mcpGroupName, + Image: images.YardstickServerImage, + Transport: "streamable-http", + ProxyPort: 8080, + McpPort: 8080, + Env: []mcpv1alpha1.EnvVar{ + {Name: "TRANSPORT", Value: "streamable-http"}, + }, + }, + } + Expect(k8sClient.Create(ctx, backend)).To(Succeed()) + + By("Waiting for backend MCPServer to be ready") + Eventually(func() error { + server := &mcpv1alpha1.MCPServer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: backendName, + Namespace: testNamespace, + }, server) + if err != nil { + return fmt.Errorf("failed to get server: %w", err) + } + if server.Status.Phase == mcpv1alpha1.MCPServerPhaseRunning { + return nil + } + return fmt.Errorf("%s not ready yet, phase: %s", backendName, server.Status.Phase) + }, timeout, pollingInterval).Should(Succeed(), "Backend should be ready") + + By("Creating VirtualMCPServer with tool overrides") + vmcpServer := &mcpv1alpha1.VirtualMCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: vmcpServerName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.VirtualMCPServerSpec{ + GroupRef: mcpv1alpha1.GroupRef{ + Name: mcpGroupName, + }, + IncomingAuth: &mcpv1alpha1.IncomingAuthConfig{ + Type: "anonymous", + }, + Aggregation: &mcpv1alpha1.AggregationConfig{ + ConflictResolution: "prefix", + // Tool overrides: rename echo to custom_echo_tool with new description + Tools: []mcpv1alpha1.WorkloadToolConfig{ + { + Workload: backendName, + Filter: []string{originalToolName}, // Only expose echo + Overrides: map[string]mcpv1alpha1.ToolOverride{ + originalToolName: { + Name: renamedToolName, + Description: newDescription, + }, + }, + }, + }, + }, + ServiceType: "NodePort", + }, + } + Expect(k8sClient.Create(ctx, vmcpServer)).To(Succeed()) + + By("Waiting for VirtualMCPServer to be ready") + WaitForVirtualMCPServerReady(ctx, k8sClient, vmcpServerName, testNamespace, timeout) + + By("Getting NodePort for VirtualMCPServer") + Eventually(func() error { + service := &corev1.Service{} + serviceName := vmcpServiceName() + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: serviceName, + Namespace: testNamespace, + }, service) + if err != nil { + return err + } + if len(service.Spec.Ports) == 0 || service.Spec.Ports[0].NodePort == 0 { + return fmt.Errorf("nodePort not assigned for vmcp") + } + vmcpNodePort = service.Spec.Ports[0].NodePort + return nil + }, timeout, pollingInterval).Should(Succeed()) + + By(fmt.Sprintf("VirtualMCPServer accessible at http://localhost:%d", vmcpNodePort)) + }) + + AfterAll(func() { + By("Cleaning up VirtualMCPServer") + vmcpServer := &mcpv1alpha1.VirtualMCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: vmcpServerName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, vmcpServer) + + By("Cleaning up backend MCPServer") + backend := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: backendName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, backend) + + By("Cleaning up MCPGroup") + mcpGroup := &mcpv1alpha1.MCPGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: mcpGroupName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, mcpGroup) + }) + + Context("when tool overrides are configured", func() { + It("should expose tools with renamed names", func() { + By("Creating and initializing MCP client for VirtualMCPServer") + mcpClient, err := CreateInitializedMCPClient(vmcpNodePort, "toolhive-overrides-test", 30*time.Second) + Expect(err).ToNot(HaveOccurred()) + defer mcpClient.Close() + + By("Listing tools from VirtualMCPServer") + listRequest := mcp.ListToolsRequest{} + tools, err := mcpClient.Client.ListTools(mcpClient.Ctx, listRequest) + Expect(err).ToNot(HaveOccurred()) + + By(fmt.Sprintf("VirtualMCPServer exposes %d tools", len(tools.Tools))) + for _, tool := range tools.Tools { + GinkgoWriter.Printf(" Tool: %s - %s\n", tool.Name, tool.Description) + } + + // Should have the renamed tool + var foundTool *mcp.Tool + for i := range tools.Tools { + tool := &tools.Tools[i] + // Tool name will be prefixed with workload name due to prefix conflict resolution + // Format: {workload}_{original_or_renamed_tool} + if tool.Name == fmt.Sprintf("%s_%s", backendName, renamedToolName) { + foundTool = tool + break + } + } + + Expect(foundTool).ToNot(BeNil(), "Should find renamed tool: %s_%s", backendName, renamedToolName) + Expect(foundTool.Description).To(Equal(newDescription), "Tool should have the custom description") + }) + + It("should NOT expose the original tool name", func() { + By("Creating and initializing MCP client for VirtualMCPServer") + mcpClient, err := CreateInitializedMCPClient(vmcpNodePort, "toolhive-overrides-test", 30*time.Second) + Expect(err).ToNot(HaveOccurred()) + defer mcpClient.Close() + + By("Listing tools from VirtualMCPServer") + listRequest := mcp.ListToolsRequest{} + tools, err := mcpClient.Client.ListTools(mcpClient.Ctx, listRequest) + Expect(err).ToNot(HaveOccurred()) + + // Should NOT have the original tool name + for _, tool := range tools.Tools { + originalWithPrefix := fmt.Sprintf("%s_%s", backendName, originalToolName) + Expect(tool.Name).ToNot(Equal(originalWithPrefix), + "Original tool name should not be exposed when renamed") + } + }) + + It("should allow calling the renamed tool", func() { + By("Creating and initializing MCP client for VirtualMCPServer") + mcpClient, err := CreateInitializedMCPClient(vmcpNodePort, "toolhive-overrides-test", 30*time.Second) + Expect(err).ToNot(HaveOccurred()) + defer mcpClient.Close() + + renamedToolFullName := fmt.Sprintf("%s_%s", backendName, renamedToolName) + By(fmt.Sprintf("Calling renamed tool: %s", renamedToolFullName)) + + testInput := "override_test_123" + callRequest := mcp.CallToolRequest{} + callRequest.Params.Name = renamedToolFullName + callRequest.Params.Arguments = map[string]any{ + "input": testInput, + } + + result, err := mcpClient.Client.CallTool(mcpClient.Ctx, callRequest) + Expect(err).ToNot(HaveOccurred(), "Should be able to call renamed tool") + Expect(result).ToNot(BeNil()) + Expect(result.Content).ToNot(BeEmpty(), "Should have content in response") + + // Yardstick echo tool echoes back the input + GinkgoWriter.Printf("Renamed tool call result: %+v\n", result.Content) + }) + }) + + Context("when verifying override configuration", func() { + It("should have correct aggregation configuration with tool overrides", func() { + vmcpServer := &mcpv1alpha1.VirtualMCPServer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: vmcpServerName, + Namespace: testNamespace, + }, vmcpServer) + Expect(err).ToNot(HaveOccurred()) + + Expect(vmcpServer.Spec.Aggregation).ToNot(BeNil()) + Expect(vmcpServer.Spec.Aggregation.Tools).To(HaveLen(1)) + + // Verify backend config has overrides + backendConfig := vmcpServer.Spec.Aggregation.Tools[0] + Expect(backendConfig.Workload).To(Equal(backendName)) + Expect(backendConfig.Overrides).To(HaveLen(1)) + + override, exists := backendConfig.Overrides[originalToolName] + Expect(exists).To(BeTrue(), "Should have override for original tool name") + Expect(override.Name).To(Equal(renamedToolName)) + Expect(override.Description).To(Equal(newDescription)) + }) + }) +}) diff --git a/test/e2e/thv-operator/virtualmcp/virtualmcp_composite_parallel_test.go b/test/e2e/thv-operator/virtualmcp/virtualmcp_composite_parallel_test.go new file mode 100644 index 000000000..339a433d8 --- /dev/null +++ b/test/e2e/thv-operator/virtualmcp/virtualmcp_composite_parallel_test.go @@ -0,0 +1,374 @@ +package virtualmcp + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/mark3labs/mcp-go/mcp" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + + mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1" + "github.com/stacklok/toolhive/test/e2e/images" +) + +// Compile-time check to ensure corev1 is used (for Service type) +var _ = corev1.ServiceSpec{} + +var _ = Describe("VirtualMCPServer Composite Parallel Workflow", Ordered, func() { + var ( + testNamespace = "default" + mcpGroupName = "test-composite-par-group" + vmcpServerName = "test-vmcp-composite-par" + backend1Name = "yardstick-par-a" + backend2Name = "yardstick-par-b" + timeout = 5 * time.Minute + pollingInterval = 5 * time.Second + vmcpNodePort int32 + + // Composite tool name + compositeToolName = "parallel_echo" + ) + + vmcpServiceName := func() string { + return fmt.Sprintf("vmcp-%s", vmcpServerName) + } + + BeforeAll(func() { + By("Creating MCPGroup for composite parallel test") + mcpGroup := &mcpv1alpha1.MCPGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: mcpGroupName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.MCPGroupSpec{ + Description: "Test MCP Group for composite parallel E2E tests", + }, + } + Expect(k8sClient.Create(ctx, mcpGroup)).To(Succeed()) + + By("Waiting for MCPGroup to be ready") + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: mcpGroupName, + Namespace: testNamespace, + }, mcpGroup) + if err != nil { + return false + } + return mcpGroup.Status.Phase == mcpv1alpha1.MCPGroupPhaseReady + }, timeout, pollingInterval).Should(BeTrue()) + + By("Creating first yardstick backend MCPServer") + backend1 := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: backend1Name, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.MCPServerSpec{ + GroupRef: mcpGroupName, + Image: images.YardstickServerImage, + Transport: "streamable-http", + ProxyPort: 8080, + McpPort: 8080, + Env: []mcpv1alpha1.EnvVar{ + {Name: "TRANSPORT", Value: "streamable-http"}, + }, + }, + } + Expect(k8sClient.Create(ctx, backend1)).To(Succeed()) + + By("Creating second yardstick backend MCPServer") + backend2 := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: backend2Name, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.MCPServerSpec{ + GroupRef: mcpGroupName, + Image: images.YardstickServerImage, + Transport: "streamable-http", + ProxyPort: 8080, + McpPort: 8080, + Env: []mcpv1alpha1.EnvVar{ + {Name: "TRANSPORT", Value: "streamable-http"}, + }, + }, + } + Expect(k8sClient.Create(ctx, backend2)).To(Succeed()) + + By("Waiting for backend MCPServers to be ready") + for _, backendName := range []string{backend1Name, backend2Name} { + Eventually(func() error { + server := &mcpv1alpha1.MCPServer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: backendName, + Namespace: testNamespace, + }, server) + if err != nil { + return fmt.Errorf("failed to get server: %w", err) + } + if server.Status.Phase == mcpv1alpha1.MCPServerPhaseRunning { + return nil + } + return fmt.Errorf("%s not ready yet, phase: %s", backendName, server.Status.Phase) + }, timeout, pollingInterval).Should(Succeed(), fmt.Sprintf("%s should be ready", backendName)) + } + + // JSON Schema for composite tool parameters + parameterSchema := map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "message": map[string]interface{}{ + "type": "string", + "description": "The message to echo in parallel to both backends", + }, + }, + "required": []string{"message"}, + } + paramSchemaBytes, err := json.Marshal(parameterSchema) + Expect(err).ToNot(HaveOccurred()) + + By("Creating VirtualMCPServer with composite parallel workflow") + vmcpServer := &mcpv1alpha1.VirtualMCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: vmcpServerName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.VirtualMCPServerSpec{ + GroupRef: mcpv1alpha1.GroupRef{ + Name: mcpGroupName, + }, + IncomingAuth: &mcpv1alpha1.IncomingAuthConfig{ + Type: "anonymous", + }, + Aggregation: &mcpv1alpha1.AggregationConfig{ + ConflictResolution: "prefix", + }, + // Define a composite tool that echoes to both backends in parallel + // Steps without DependsOn can execute concurrently + CompositeTools: []mcpv1alpha1.CompositeToolSpec{ + { + Name: compositeToolName, + Description: "Echoes message to both backends in parallel, then combines results", + Parameters: &runtime.RawExtension{ + Raw: paramSchemaBytes, + }, + Steps: []mcpv1alpha1.WorkflowStep{ + { + // Step 1: Echo to backend1 (no dependencies - runs in parallel) + ID: "echo_backend1", + Type: "tool", + Tool: fmt.Sprintf("%s.echo", backend1Name), + Arguments: map[string]string{ + "input": "backend1: {{ .params.message }}", + }, + }, + { + // Step 2: Echo to backend2 (no dependencies - runs in parallel with step 1) + ID: "echo_backend2", + Type: "tool", + Tool: fmt.Sprintf("%s.echo", backend2Name), + Arguments: map[string]string{ + "input": "backend2: {{ .params.message }}", + }, + }, + { + // Step 3: Final aggregation - depends on both parallel steps + ID: "combine_results", + Type: "tool", + Tool: fmt.Sprintf("%s.echo", backend1Name), + DependsOn: []string{"echo_backend1", "echo_backend2"}, + Arguments: map[string]string{ + // Combine outputs from both parallel steps + "input": "Combined: [{{ .steps.echo_backend1.result }}] + [{{ .steps.echo_backend2.result }}]", + }, + }, + }, + Timeout: "60s", + }, + }, + ServiceType: "NodePort", + }, + } + Expect(k8sClient.Create(ctx, vmcpServer)).To(Succeed()) + + By("Waiting for VirtualMCPServer to be ready") + WaitForVirtualMCPServerReady(ctx, k8sClient, vmcpServerName, testNamespace, timeout) + + By("Getting NodePort for VirtualMCPServer") + Eventually(func() error { + service := &corev1.Service{} + serviceName := vmcpServiceName() + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: serviceName, + Namespace: testNamespace, + }, service) + if err != nil { + return err + } + if len(service.Spec.Ports) == 0 || service.Spec.Ports[0].NodePort == 0 { + return fmt.Errorf("nodePort not assigned for vmcp") + } + vmcpNodePort = service.Spec.Ports[0].NodePort + return nil + }, timeout, pollingInterval).Should(Succeed()) + + By(fmt.Sprintf("VirtualMCPServer accessible at http://localhost:%d", vmcpNodePort)) + }) + + AfterAll(func() { + By("Cleaning up VirtualMCPServer") + vmcpServer := &mcpv1alpha1.VirtualMCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: vmcpServerName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, vmcpServer) + + By("Cleaning up backend MCPServers") + for _, backendName := range []string{backend1Name, backend2Name} { + backend := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: backendName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, backend) + } + + By("Cleaning up MCPGroup") + mcpGroup := &mcpv1alpha1.MCPGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: mcpGroupName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, mcpGroup) + }) + + Context("when composite tools with parallel steps are configured", func() { + It("should expose the composite tool in tool listing", func() { + By("Creating and initializing MCP client for VirtualMCPServer") + mcpClient, err := CreateInitializedMCPClient(vmcpNodePort, "toolhive-parallel-test", 30*time.Second) + Expect(err).ToNot(HaveOccurred()) + defer mcpClient.Close() + + By("Listing tools from VirtualMCPServer") + listRequest := mcp.ListToolsRequest{} + tools, err := mcpClient.Client.ListTools(mcpClient.Ctx, listRequest) + Expect(err).ToNot(HaveOccurred()) + + By(fmt.Sprintf("VirtualMCPServer exposes %d tools", len(tools.Tools))) + for _, tool := range tools.Tools { + GinkgoWriter.Printf(" Tool: %s - %s\n", tool.Name, tool.Description) + } + + // Should find the composite tool + var foundComposite bool + for _, tool := range tools.Tools { + if tool.Name == compositeToolName { + foundComposite = true + Expect(tool.Description).To(ContainSubstring("parallel")) + break + } + } + Expect(foundComposite).To(BeTrue(), "Should find composite tool: %s", compositeToolName) + + // Should also have both backends' native echo tools (with prefix) + foundBackends := make(map[string]bool) + for _, tool := range tools.Tools { + if tool.Name == fmt.Sprintf("%s_echo", backend1Name) { + foundBackends[backend1Name] = true + } + if tool.Name == fmt.Sprintf("%s_echo", backend2Name) { + foundBackends[backend2Name] = true + } + } + Expect(foundBackends).To(HaveLen(2), "Should find both backend echo tools") + }) + + It("should execute parallel workflow and aggregate results", func() { + By("Creating and initializing MCP client for VirtualMCPServer") + mcpClient, err := CreateInitializedMCPClient(vmcpNodePort, "toolhive-parallel-test", 30*time.Second) + Expect(err).ToNot(HaveOccurred()) + defer mcpClient.Close() + + By("Calling composite tool with test message") + testMessage := "parallel_test_123" + callRequest := mcp.CallToolRequest{} + callRequest.Params.Name = compositeToolName + callRequest.Params.Arguments = map[string]any{ + "message": testMessage, + } + + result, err := mcpClient.Client.CallTool(mcpClient.Ctx, callRequest) + Expect(err).ToNot(HaveOccurred(), "Composite tool call should succeed") + Expect(result).ToNot(BeNil()) + Expect(result.Content).ToNot(BeEmpty(), "Should have content in response") + + // The result should contain combined outputs from both parallel steps + // Final step combines: [backend1 result] + [backend2 result] + GinkgoWriter.Printf("Parallel composite tool result: %+v\n", result.Content) + }) + }) + + Context("when verifying parallel workflow configuration", func() { + It("should have correct composite tool spec with parallel steps", func() { + vmcpServer := &mcpv1alpha1.VirtualMCPServer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: vmcpServerName, + Namespace: testNamespace, + }, vmcpServer) + Expect(err).ToNot(HaveOccurred()) + + Expect(vmcpServer.Spec.CompositeTools).To(HaveLen(1)) + + compositeTool := vmcpServer.Spec.CompositeTools[0] + Expect(compositeTool.Name).To(Equal(compositeToolName)) + Expect(compositeTool.Steps).To(HaveLen(3)) + + // Verify parallel steps (no dependencies) + step1 := compositeTool.Steps[0] + Expect(step1.ID).To(Equal("echo_backend1")) + Expect(step1.DependsOn).To(BeEmpty(), "First step should have no dependencies (parallel)") + + step2 := compositeTool.Steps[1] + Expect(step2.ID).To(Equal("echo_backend2")) + Expect(step2.DependsOn).To(BeEmpty(), "Second step should have no dependencies (parallel)") + + // Verify final aggregation step depends on both parallel steps + step3 := compositeTool.Steps[2] + Expect(step3.ID).To(Equal("combine_results")) + Expect(step3.DependsOn).To(ContainElements("echo_backend1", "echo_backend2")) + + // Verify template usage combines outputs from parallel steps + Expect(step3.Arguments["input"]).To(ContainSubstring(".steps.echo_backend1")) + Expect(step3.Arguments["input"]).To(ContainSubstring(".steps.echo_backend2")) + }) + + It("should target different backends in parallel steps", func() { + vmcpServer := &mcpv1alpha1.VirtualMCPServer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: vmcpServerName, + Namespace: testNamespace, + }, vmcpServer) + Expect(err).ToNot(HaveOccurred()) + + compositeTool := vmcpServer.Spec.CompositeTools[0] + + // Verify steps target different backends + step1 := compositeTool.Steps[0] + step2 := compositeTool.Steps[1] + + Expect(step1.Tool).To(ContainSubstring(backend1Name)) + Expect(step2.Tool).To(ContainSubstring(backend2Name)) + }) + }) +}) diff --git a/test/e2e/thv-operator/virtualmcp/virtualmcp_composite_sequential_test.go b/test/e2e/thv-operator/virtualmcp/virtualmcp_composite_sequential_test.go new file mode 100644 index 000000000..91c00447f --- /dev/null +++ b/test/e2e/thv-operator/virtualmcp/virtualmcp_composite_sequential_test.go @@ -0,0 +1,317 @@ +package virtualmcp + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/mark3labs/mcp-go/mcp" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + + mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1" + "github.com/stacklok/toolhive/test/e2e/images" +) + +// Compile-time check to ensure corev1 is used (for Service type) +var _ = corev1.ServiceSpec{} + +var _ = Describe("VirtualMCPServer Composite Sequential Workflow", Ordered, func() { + var ( + testNamespace = "default" + mcpGroupName = "test-composite-seq-group" + vmcpServerName = "test-vmcp-composite-seq" + backendName = "yardstick-composite-seq" + timeout = 5 * time.Minute + pollingInterval = 5 * time.Second + vmcpNodePort int32 + + // Composite tool names + compositeToolName = "echo_twice" + ) + + vmcpServiceName := func() string { + return fmt.Sprintf("vmcp-%s", vmcpServerName) + } + + BeforeAll(func() { + By("Creating MCPGroup for composite sequential test") + mcpGroup := &mcpv1alpha1.MCPGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: mcpGroupName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.MCPGroupSpec{ + Description: "Test MCP Group for composite sequential E2E tests", + }, + } + Expect(k8sClient.Create(ctx, mcpGroup)).To(Succeed()) + + By("Waiting for MCPGroup to be ready") + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: mcpGroupName, + Namespace: testNamespace, + }, mcpGroup) + if err != nil { + return false + } + return mcpGroup.Status.Phase == mcpv1alpha1.MCPGroupPhaseReady + }, timeout, pollingInterval).Should(BeTrue()) + + By("Creating yardstick backend MCPServer") + backend := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: backendName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.MCPServerSpec{ + GroupRef: mcpGroupName, + Image: images.YardstickServerImage, + Transport: "streamable-http", + ProxyPort: 8080, + McpPort: 8080, + Env: []mcpv1alpha1.EnvVar{ + {Name: "TRANSPORT", Value: "streamable-http"}, + }, + }, + } + Expect(k8sClient.Create(ctx, backend)).To(Succeed()) + + By("Waiting for backend MCPServer to be ready") + Eventually(func() error { + server := &mcpv1alpha1.MCPServer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: backendName, + Namespace: testNamespace, + }, server) + if err != nil { + return fmt.Errorf("failed to get server: %w", err) + } + if server.Status.Phase == mcpv1alpha1.MCPServerPhaseRunning { + return nil + } + return fmt.Errorf("%s not ready yet, phase: %s", backendName, server.Status.Phase) + }, timeout, pollingInterval).Should(Succeed(), "Backend should be ready") + + // JSON Schema for composite tool parameters + // Per MCP spec, inputSchema should be a JSON Schema object + parameterSchema := map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "message": map[string]interface{}{ + "type": "string", + "description": "The message to echo twice", + }, + }, + "required": []string{"message"}, + } + paramSchemaBytes, err := json.Marshal(parameterSchema) + Expect(err).ToNot(HaveOccurred()) + + By("Creating VirtualMCPServer with composite sequential workflow") + vmcpServer := &mcpv1alpha1.VirtualMCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: vmcpServerName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.VirtualMCPServerSpec{ + GroupRef: mcpv1alpha1.GroupRef{ + Name: mcpGroupName, + }, + IncomingAuth: &mcpv1alpha1.IncomingAuthConfig{ + Type: "anonymous", + }, + Aggregation: &mcpv1alpha1.AggregationConfig{ + ConflictResolution: "prefix", + }, + // Define a composite tool that echoes input, then echoes the result again + CompositeTools: []mcpv1alpha1.CompositeToolSpec{ + { + Name: compositeToolName, + Description: "Echoes the input message twice in sequence", + Parameters: &runtime.RawExtension{ + Raw: paramSchemaBytes, + }, + Steps: []mcpv1alpha1.WorkflowStep{ + { + ID: "first_echo", + Type: "tool", + Tool: fmt.Sprintf("%s.echo", backendName), + Arguments: map[string]string{ + // Template expansion: use input parameter + "input": "{{ .params.message }}", + }, + }, + { + ID: "second_echo", + Type: "tool", + Tool: fmt.Sprintf("%s.echo", backendName), + DependsOn: []string{"first_echo"}, + Arguments: map[string]string{ + // Template expansion: use output from previous step + "input": "{{ .steps.first_echo.result }}", + }, + }, + }, + Timeout: "30s", + }, + }, + ServiceType: "NodePort", + }, + } + Expect(k8sClient.Create(ctx, vmcpServer)).To(Succeed()) + + By("Waiting for VirtualMCPServer to be ready") + WaitForVirtualMCPServerReady(ctx, k8sClient, vmcpServerName, testNamespace, timeout) + + By("Getting NodePort for VirtualMCPServer") + Eventually(func() error { + service := &corev1.Service{} + serviceName := vmcpServiceName() + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: serviceName, + Namespace: testNamespace, + }, service) + if err != nil { + return err + } + if len(service.Spec.Ports) == 0 || service.Spec.Ports[0].NodePort == 0 { + return fmt.Errorf("nodePort not assigned for vmcp") + } + vmcpNodePort = service.Spec.Ports[0].NodePort + return nil + }, timeout, pollingInterval).Should(Succeed()) + + By(fmt.Sprintf("VirtualMCPServer accessible at http://localhost:%d", vmcpNodePort)) + }) + + AfterAll(func() { + By("Cleaning up VirtualMCPServer") + vmcpServer := &mcpv1alpha1.VirtualMCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: vmcpServerName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, vmcpServer) + + By("Cleaning up backend MCPServer") + backend := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: backendName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, backend) + + By("Cleaning up MCPGroup") + mcpGroup := &mcpv1alpha1.MCPGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: mcpGroupName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, mcpGroup) + }) + + Context("when composite tools are configured", func() { + It("should expose the composite tool in tool listing", func() { + By("Creating and initializing MCP client for VirtualMCPServer") + mcpClient, err := CreateInitializedMCPClient(vmcpNodePort, "toolhive-composite-test", 30*time.Second) + Expect(err).ToNot(HaveOccurred()) + defer mcpClient.Close() + + By("Listing tools from VirtualMCPServer") + listRequest := mcp.ListToolsRequest{} + tools, err := mcpClient.Client.ListTools(mcpClient.Ctx, listRequest) + Expect(err).ToNot(HaveOccurred()) + + By(fmt.Sprintf("VirtualMCPServer exposes %d tools", len(tools.Tools))) + for _, tool := range tools.Tools { + GinkgoWriter.Printf(" Tool: %s - %s\n", tool.Name, tool.Description) + } + + // Should find the composite tool + var foundComposite bool + for _, tool := range tools.Tools { + if tool.Name == compositeToolName { + foundComposite = true + Expect(tool.Description).To(Equal("Echoes the input message twice in sequence")) + break + } + } + Expect(foundComposite).To(BeTrue(), "Should find composite tool: %s", compositeToolName) + + // Should also have the backend's native echo tool (with prefix) + var foundBackendTool bool + expectedBackendTool := fmt.Sprintf("%s_echo", backendName) + for _, tool := range tools.Tools { + if tool.Name == expectedBackendTool { + foundBackendTool = true + break + } + } + Expect(foundBackendTool).To(BeTrue(), "Should find backend native tool: %s", expectedBackendTool) + }) + + It("should execute sequential workflow with template expansion", func() { + By("Creating and initializing MCP client for VirtualMCPServer") + mcpClient, err := CreateInitializedMCPClient(vmcpNodePort, "toolhive-composite-test", 30*time.Second) + Expect(err).ToNot(HaveOccurred()) + defer mcpClient.Close() + + By("Calling composite tool with test message") + testMessage := "hello_sequential_test" + callRequest := mcp.CallToolRequest{} + callRequest.Params.Name = compositeToolName + callRequest.Params.Arguments = map[string]any{ + "message": testMessage, + } + + result, err := mcpClient.Client.CallTool(mcpClient.Ctx, callRequest) + Expect(err).ToNot(HaveOccurred(), "Composite tool call should succeed") + Expect(result).ToNot(BeNil()) + Expect(result.Content).ToNot(BeEmpty(), "Should have content in response") + + // The result should reflect the sequential execution + // First echo: echoes testMessage + // Second echo: echoes the result of first echo + GinkgoWriter.Printf("Composite tool result: %+v\n", result.Content) + }) + }) + + Context("when verifying composite tool configuration", func() { + It("should have correct composite tool spec stored", func() { + vmcpServer := &mcpv1alpha1.VirtualMCPServer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: vmcpServerName, + Namespace: testNamespace, + }, vmcpServer) + Expect(err).ToNot(HaveOccurred()) + + Expect(vmcpServer.Spec.CompositeTools).To(HaveLen(1)) + + compositeTool := vmcpServer.Spec.CompositeTools[0] + Expect(compositeTool.Name).To(Equal(compositeToolName)) + Expect(compositeTool.Steps).To(HaveLen(2)) + + // Verify step dependencies + step1 := compositeTool.Steps[0] + Expect(step1.ID).To(Equal("first_echo")) + Expect(step1.DependsOn).To(BeEmpty()) + + step2 := compositeTool.Steps[1] + Expect(step2.ID).To(Equal("second_echo")) + Expect(step2.DependsOn).To(ContainElement("first_echo")) + + // Verify template usage in arguments + Expect(step1.Arguments["input"]).To(ContainSubstring(".params.message")) + Expect(step2.Arguments["input"]).To(ContainSubstring(".steps.first_echo")) + }) + }) +}) From e8e8f98504df9d60b347634348f8048438e9767a Mon Sep 17 00:00:00 2001 From: Juan Antonio Osorio Date: Mon, 1 Dec 2025 11:18:26 -0600 Subject: [PATCH 2/2] Fix tool override test to use user-facing name in filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The filtering logic applies after tool overrides, so the filter must specify the renamed tool name (user-facing name) rather than the original name. Updated test to filter by renamedToolName. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../virtualmcp/virtualmcp_aggregation_overrides_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/test/e2e/thv-operator/virtualmcp/virtualmcp_aggregation_overrides_test.go b/test/e2e/thv-operator/virtualmcp/virtualmcp_aggregation_overrides_test.go index f443954a3..ae092f369 100644 --- a/test/e2e/thv-operator/virtualmcp/virtualmcp_aggregation_overrides_test.go +++ b/test/e2e/thv-operator/virtualmcp/virtualmcp_aggregation_overrides_test.go @@ -114,10 +114,12 @@ var _ = Describe("VirtualMCPServer Tool Overrides", Ordered, func() { Aggregation: &mcpv1alpha1.AggregationConfig{ ConflictResolution: "prefix", // Tool overrides: rename echo to custom_echo_tool with new description + // Note: Filter uses the user-facing name (after override), so we filter by + // the renamed tool name, not the original name. Tools: []mcpv1alpha1.WorkloadToolConfig{ { Workload: backendName, - Filter: []string{originalToolName}, // Only expose echo + Filter: []string{renamedToolName}, // Filter by user-facing name (after override) Overrides: map[string]mcpv1alpha1.ToolOverride{ originalToolName: { Name: renamedToolName, @@ -280,6 +282,10 @@ var _ = Describe("VirtualMCPServer Tool Overrides", Ordered, func() { Expect(backendConfig.Workload).To(Equal(backendName)) Expect(backendConfig.Overrides).To(HaveLen(1)) + // Filter should contain the user-facing name (after override) + Expect(backendConfig.Filter).To(ContainElement(renamedToolName), + "Filter should contain the renamed tool name (user-facing name)") + override, exists := backendConfig.Overrides[originalToolName] Expect(exists).To(BeTrue(), "Should have override for original tool name") Expect(override.Name).To(Equal(renamedToolName))