From 97d4ee444047938b44480ca8d62cf2d0064f2aba Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 22 Apr 2026 11:04:08 +0800 Subject: [PATCH] mcp: export SNCloud cluster list primitive --- pkg/mcp/prompts.go | 68 ++++++++++++------- pkg/mcp/streamnative_cloud_primitives_test.go | 45 ++++++++++++ 2 files changed, 89 insertions(+), 24 deletions(-) diff --git a/pkg/mcp/prompts.go b/pkg/mcp/prompts.go index 09e2049..4fbfa24 100644 --- a/pkg/mcp/prompts.go +++ b/pkg/mcp/prompts.go @@ -69,7 +69,8 @@ const ( sncloudClusterTypeKafka = "KafkaCluster" ) -type sncloudClusterListEntry struct { +// SNCloudClusterListEntry describes one StreamNative Cloud cluster in list responses. +type SNCloudClusterListEntry struct { ClusterType string InstanceName string ClusterName string @@ -118,14 +119,14 @@ func getSNCloudAPIClientAndOrganization(ctx context.Context) (*sncloud.APIClient return apiClient, session.Ctx.Organization, nil } -func listSNCloudPulsarClusterEntries(ctx context.Context, apiClient *sncloud.APIClient, organization string) ([]sncloudClusterListEntry, error) { +func listSNCloudPulsarClusterEntries(ctx context.Context, apiClient *sncloud.APIClient, organization string) ([]SNCloudClusterListEntry, error) { clusters, clustersBody, err := apiClient.CloudStreamnativeIoV1alpha1Api.ListCloudStreamnativeIoV1alpha1NamespacedPulsarCluster(ctx, organization).Execute() if err != nil { return nil, fmt.Errorf("failed to list pulsar clusters: %v", err) } defer func() { _ = clustersBody.Body.Close() }() - entries := make([]sncloudClusterListEntry, 0, len(clusters.Items)) + entries := make([]SNCloudClusterListEntry, 0, len(clusters.Items)) for _, cluster := range clusters.Items { displayName := cluster.Spec.DisplayName if displayName == nil || *displayName == "" { @@ -142,7 +143,7 @@ func listSNCloudPulsarClusterEntries(ctx context.Context, apiClient *sncloud.API clusterName = *cluster.Metadata.Name } - entries = append(entries, sncloudClusterListEntry{ + entries = append(entries, SNCloudClusterListEntry{ ClusterType: sncloudClusterTypePulsar, InstanceName: cluster.Spec.InstanceName, ClusterName: clusterName, @@ -155,14 +156,14 @@ func listSNCloudPulsarClusterEntries(ctx context.Context, apiClient *sncloud.API return entries, nil } -func listSNCloudKafkaClusterEntries(ctx context.Context, apiClient *sncloud.APIClient, organization string) ([]sncloudClusterListEntry, error) { +func listSNCloudKafkaClusterEntries(ctx context.Context, apiClient *sncloud.APIClient, organization string) ([]SNCloudClusterListEntry, error) { clusters, clustersBody, err := apiClient.CloudStreamnativeIoV1alpha1Api.ListCloudStreamnativeIoV1alpha1NamespacedKafkaCluster(ctx, organization).Execute() if err != nil { return nil, fmt.Errorf("failed to list kafka clusters: %v", err) } defer func() { _ = clustersBody.Body.Close() }() - entries := make([]sncloudClusterListEntry, 0, len(clusters.Items)) + entries := make([]SNCloudClusterListEntry, 0, len(clusters.Items)) for _, cluster := range clusters.Items { displayName := cluster.Spec.DisplayName clusterName := "" @@ -173,7 +174,7 @@ func listSNCloudKafkaClusterEntries(ctx context.Context, apiClient *sncloud.APIC displayName = cluster.Metadata.Name } - entries = append(entries, sncloudClusterListEntry{ + entries = append(entries, SNCloudClusterListEntry{ ClusterType: sncloudClusterTypeKafka, InstanceName: cluster.Spec.InstanceName, ClusterName: clusterName, @@ -197,7 +198,7 @@ func sncloudClusterReadinessStatus(status *sncloud.ComGithubStreamnativeCloudApi return "Not Ready" } -func buildSNCloudClusterPromptMessages(summary string, entries []sncloudClusterListEntry) []mcp.PromptMessage { +func buildSNCloudClusterPromptMessages(summary string, entries []SNCloudClusterListEntry) []mcp.PromptMessage { messages := make([]mcp.PromptMessage, 0, len(entries)+1) messages = append(messages, mcp.PromptMessage{ Content: mcp.TextContent{ @@ -231,6 +232,29 @@ func buildSNCloudClusterPromptMessages(summary string, entries []sncloudClusterL return messages } +// ListSNCloudClusterEntries lists all StreamNative Cloud clusters visible in the current session. +func ListSNCloudClusterEntries(ctx context.Context) ([]SNCloudClusterListEntry, string, error) { + apiClient, organization, err := getSNCloudAPIClientAndOrganization(ctx) + if err != nil { + return nil, "", err + } + + pulsarEntries, err := listSNCloudPulsarClusterEntries(ctx, apiClient, organization) + if err != nil { + return nil, "", err + } + + kafkaEntries, err := listSNCloudKafkaClusterEntries(ctx, apiClient, organization) + if err != nil { + return nil, "", err + } + + entries := make([]SNCloudClusterListEntry, 0, len(pulsarEntries)+len(kafkaEntries)) + entries = append(entries, pulsarEntries...) + entries = append(entries, kafkaEntries...) + return entries, organization, nil +} + // NewBuildSNCloudServerlessClusterPrompt creates the reusable serverless cluster build prompt definition. func NewBuildSNCloudServerlessClusterPrompt() mcp.Prompt { return mcp.NewPrompt("build-sncloud-serverless-cluster", @@ -243,31 +267,27 @@ func NewBuildSNCloudServerlessClusterPrompt() mcp.Prompt { // HandleListSNCloudClusters handles listing StreamNative Cloud clusters. func HandleListSNCloudClusters(ctx context.Context, _ mcp.GetPromptRequest) (*mcp.GetPromptResult, error) { - apiClient, organization, err := getSNCloudAPIClientAndOrganization(ctx) - if err != nil { - return nil, err - } - - pulsarEntries, err := listSNCloudPulsarClusterEntries(ctx, apiClient, organization) + entries, organization, err := ListSNCloudClusterEntries(ctx) if err != nil { return nil, err } - - kafkaEntries, err := listSNCloudKafkaClusterEntries(ctx, apiClient, organization) - if err != nil { - return nil, err + pulsarCount := 0 + kafkaCount := 0 + for _, entry := range entries { + switch entry.ClusterType { + case sncloudClusterTypePulsar: + pulsarCount++ + case sncloudClusterTypeKafka: + kafkaCount++ + } } - - entries := make([]sncloudClusterListEntry, 0, len(pulsarEntries)+len(kafkaEntries)) - entries = append(entries, pulsarEntries...) - entries = append(entries, kafkaEntries...) messages := buildSNCloudClusterPromptMessages( fmt.Sprintf( "There are %d StreamNative Cloud clusters in organization %s (%d PulsarCluster, %d KafkaCluster):", len(entries), organization, - len(pulsarEntries), - len(kafkaEntries), + pulsarCount, + kafkaCount, ), entries, ) diff --git a/pkg/mcp/streamnative_cloud_primitives_test.go b/pkg/mcp/streamnative_cloud_primitives_test.go index 5c03e4a..904c1ac 100644 --- a/pkg/mcp/streamnative_cloud_primitives_test.go +++ b/pkg/mcp/streamnative_cloud_primitives_test.go @@ -826,6 +826,51 @@ func TestHandleListSNCloudClustersIncludesClusterTypes(t *testing.T) { } } +func TestListSNCloudClusterEntriesIncludesPulsarAndKafkaClusters(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/apis/cloud.streamnative.io/v1alpha1/namespaces/session-org/pulsarclusters": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"items":[{"metadata":{"name":"pc-test","annotations":{"cloud.streamnative.io/engine":"ursa"}},"spec":{"instanceName":"inst-p","displayName":"Pulsar Display"},"status":{"broker":{"readyReplicas":1},"conditions":[{"type":"Ready","status":"True"}]}}]}`)) + case "/apis/cloud.streamnative.io/v1alpha1/namespaces/session-org/kafkaclusters": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"items":[{"metadata":{"name":"kc-test"},"spec":{"instanceName":"inst-k","displayName":"Kafka Display","location":"use1"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`)) + default: + t.Fatalf("unexpected path %s", r.URL.Path) + } + })) + defer server.Close() + + session, err := config.NewSNCloudSession(config.SNCloudContext{ + JWTToken: "token", + APIURL: server.URL, + LogAPIURL: server.URL, + Organization: "session-org", + }) + if err != nil { + t.Fatalf("failed to create session: %v", err) + } + + ctx := context.Background() + ctx = WithSNCloudSession(ctx, session) + + entries, organization, err := ListSNCloudClusterEntries(ctx) + if err != nil { + t.Fatalf("expected no list error, got %v", err) + } + if organization != "session-org" { + t.Fatalf("expected organization session-org, got %q", organization) + } + if len(entries) != 2 { + t.Fatalf("expected two cluster entries, got %#v", entries) + } + if entries[0].ClusterType != "PulsarCluster" || entries[1].ClusterType != "KafkaCluster" { + t.Fatalf("expected typed pulsar and kafka entries, got %#v", entries) + } +} + func TestBuildSNCloudContextClusterPromptResultUsesPulsarClustersOnly(t *testing.T) { t.Parallel()