Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 44 additions & 24 deletions pkg/mcp/prompts.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ const (
sncloudClusterTypeKafka = "KafkaCluster"
)

type sncloudClusterListEntry struct {
// SNCloudClusterListEntry describes one StreamNative Cloud cluster in list responses.
type SNCloudClusterListEntry struct {
ClusterType string
InstanceName string
ClusterName string
Expand Down Expand Up @@ -118,14 +119,14 @@ func getSNCloudAPIClientAndOrganization(ctx context.Context) (*sncloud.APIClient
return apiClient, session.Ctx.Organization, nil
}

func listSNCloudPulsarClusterEntries(ctx context.Context, apiClient *sncloud.APIClient, organization string) ([]sncloudClusterListEntry, error) {
func listSNCloudPulsarClusterEntries(ctx context.Context, apiClient *sncloud.APIClient, organization string) ([]SNCloudClusterListEntry, error) {
clusters, clustersBody, err := apiClient.CloudStreamnativeIoV1alpha1Api.ListCloudStreamnativeIoV1alpha1NamespacedPulsarCluster(ctx, organization).Execute()
if err != nil {
return nil, fmt.Errorf("failed to list pulsar clusters: %v", err)
}
defer func() { _ = clustersBody.Body.Close() }()

entries := make([]sncloudClusterListEntry, 0, len(clusters.Items))
entries := make([]SNCloudClusterListEntry, 0, len(clusters.Items))
for _, cluster := range clusters.Items {
displayName := cluster.Spec.DisplayName
if displayName == nil || *displayName == "" {
Expand All @@ -142,7 +143,7 @@ func listSNCloudPulsarClusterEntries(ctx context.Context, apiClient *sncloud.API
clusterName = *cluster.Metadata.Name
}

entries = append(entries, sncloudClusterListEntry{
entries = append(entries, SNCloudClusterListEntry{
ClusterType: sncloudClusterTypePulsar,
InstanceName: cluster.Spec.InstanceName,
ClusterName: clusterName,
Expand All @@ -155,14 +156,14 @@ func listSNCloudPulsarClusterEntries(ctx context.Context, apiClient *sncloud.API
return entries, nil
}

func listSNCloudKafkaClusterEntries(ctx context.Context, apiClient *sncloud.APIClient, organization string) ([]sncloudClusterListEntry, error) {
func listSNCloudKafkaClusterEntries(ctx context.Context, apiClient *sncloud.APIClient, organization string) ([]SNCloudClusterListEntry, error) {
clusters, clustersBody, err := apiClient.CloudStreamnativeIoV1alpha1Api.ListCloudStreamnativeIoV1alpha1NamespacedKafkaCluster(ctx, organization).Execute()
if err != nil {
return nil, fmt.Errorf("failed to list kafka clusters: %v", err)
}
defer func() { _ = clustersBody.Body.Close() }()

entries := make([]sncloudClusterListEntry, 0, len(clusters.Items))
entries := make([]SNCloudClusterListEntry, 0, len(clusters.Items))
for _, cluster := range clusters.Items {
displayName := cluster.Spec.DisplayName
clusterName := ""
Expand All @@ -173,7 +174,7 @@ func listSNCloudKafkaClusterEntries(ctx context.Context, apiClient *sncloud.APIC
displayName = cluster.Metadata.Name
}

entries = append(entries, sncloudClusterListEntry{
entries = append(entries, SNCloudClusterListEntry{
ClusterType: sncloudClusterTypeKafka,
InstanceName: cluster.Spec.InstanceName,
ClusterName: clusterName,
Expand All @@ -197,7 +198,7 @@ func sncloudClusterReadinessStatus(status *sncloud.ComGithubStreamnativeCloudApi
return "Not Ready"
}

func buildSNCloudClusterPromptMessages(summary string, entries []sncloudClusterListEntry) []mcp.PromptMessage {
func buildSNCloudClusterPromptMessages(summary string, entries []SNCloudClusterListEntry) []mcp.PromptMessage {
messages := make([]mcp.PromptMessage, 0, len(entries)+1)
messages = append(messages, mcp.PromptMessage{
Content: mcp.TextContent{
Expand Down Expand Up @@ -231,6 +232,29 @@ func buildSNCloudClusterPromptMessages(summary string, entries []sncloudClusterL
return messages
}

// ListSNCloudClusterEntries lists all StreamNative Cloud clusters visible in the current session.
func ListSNCloudClusterEntries(ctx context.Context) ([]SNCloudClusterListEntry, string, error) {
apiClient, organization, err := getSNCloudAPIClientAndOrganization(ctx)
if err != nil {
return nil, "", err
}

pulsarEntries, err := listSNCloudPulsarClusterEntries(ctx, apiClient, organization)
if err != nil {
return nil, "", err
}

kafkaEntries, err := listSNCloudKafkaClusterEntries(ctx, apiClient, organization)
if err != nil {
return nil, "", err
}

entries := make([]SNCloudClusterListEntry, 0, len(pulsarEntries)+len(kafkaEntries))
entries = append(entries, pulsarEntries...)
entries = append(entries, kafkaEntries...)
return entries, organization, nil
}

// NewBuildSNCloudServerlessClusterPrompt creates the reusable serverless cluster build prompt definition.
func NewBuildSNCloudServerlessClusterPrompt() mcp.Prompt {
return mcp.NewPrompt("build-sncloud-serverless-cluster",
Expand All @@ -243,31 +267,27 @@ func NewBuildSNCloudServerlessClusterPrompt() mcp.Prompt {

// HandleListSNCloudClusters handles listing StreamNative Cloud clusters.
func HandleListSNCloudClusters(ctx context.Context, _ mcp.GetPromptRequest) (*mcp.GetPromptResult, error) {
apiClient, organization, err := getSNCloudAPIClientAndOrganization(ctx)
if err != nil {
return nil, err
}

pulsarEntries, err := listSNCloudPulsarClusterEntries(ctx, apiClient, organization)
entries, organization, err := ListSNCloudClusterEntries(ctx)
if err != nil {
return nil, err
}

kafkaEntries, err := listSNCloudKafkaClusterEntries(ctx, apiClient, organization)
if err != nil {
return nil, err
pulsarCount := 0
kafkaCount := 0
for _, entry := range entries {
switch entry.ClusterType {
case sncloudClusterTypePulsar:
pulsarCount++
case sncloudClusterTypeKafka:
kafkaCount++
}
}

entries := make([]sncloudClusterListEntry, 0, len(pulsarEntries)+len(kafkaEntries))
entries = append(entries, pulsarEntries...)
entries = append(entries, kafkaEntries...)
messages := buildSNCloudClusterPromptMessages(
fmt.Sprintf(
"There are %d StreamNative Cloud clusters in organization %s (%d PulsarCluster, %d KafkaCluster):",
len(entries),
organization,
len(pulsarEntries),
len(kafkaEntries),
pulsarCount,
kafkaCount,
),
entries,
)
Expand Down
45 changes: 45 additions & 0 deletions pkg/mcp/streamnative_cloud_primitives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,51 @@ func TestHandleListSNCloudClustersIncludesClusterTypes(t *testing.T) {
}
}

func TestListSNCloudClusterEntriesIncludesPulsarAndKafkaClusters(t *testing.T) {
t.Parallel()

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/apis/cloud.streamnative.io/v1alpha1/namespaces/session-org/pulsarclusters":
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"items":[{"metadata":{"name":"pc-test","annotations":{"cloud.streamnative.io/engine":"ursa"}},"spec":{"instanceName":"inst-p","displayName":"Pulsar Display"},"status":{"broker":{"readyReplicas":1},"conditions":[{"type":"Ready","status":"True"}]}}]}`))
case "/apis/cloud.streamnative.io/v1alpha1/namespaces/session-org/kafkaclusters":
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"items":[{"metadata":{"name":"kc-test"},"spec":{"instanceName":"inst-k","displayName":"Kafka Display","location":"use1"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`))
default:
t.Fatalf("unexpected path %s", r.URL.Path)
}
}))
defer server.Close()

session, err := config.NewSNCloudSession(config.SNCloudContext{
JWTToken: "token",
APIURL: server.URL,
LogAPIURL: server.URL,
Organization: "session-org",
})
if err != nil {
t.Fatalf("failed to create session: %v", err)
}

ctx := context.Background()
ctx = WithSNCloudSession(ctx, session)

entries, organization, err := ListSNCloudClusterEntries(ctx)
if err != nil {
t.Fatalf("expected no list error, got %v", err)
}
if organization != "session-org" {
t.Fatalf("expected organization session-org, got %q", organization)
}
if len(entries) != 2 {
t.Fatalf("expected two cluster entries, got %#v", entries)
}
if entries[0].ClusterType != "PulsarCluster" || entries[1].ClusterType != "KafkaCluster" {
t.Fatalf("expected typed pulsar and kafka entries, got %#v", entries)
}
}

func TestBuildSNCloudContextClusterPromptResultUsesPulsarClustersOnly(t *testing.T) {
t.Parallel()

Expand Down
Loading