Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cmd/thv-operator/api/v1alpha1/mcpserver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,28 @@ const (
ConditionReasonExternalAuthConfigMultiUpstream = "MultiUpstreamNotSupported"
)

// ConditionStdioReplicaCapped indicates spec.replicas was capped at 1 for stdio transport.
const ConditionStdioReplicaCapped = "StdioReplicaCapped"

// Reasons for the StdioReplicaCapped condition.
const (
	// ConditionReasonStdioReplicaCapped is set when spec.replicas > 1 for a stdio transport.
	ConditionReasonStdioReplicaCapped = "StdioTransportCapAt1"
	// ConditionReasonStdioReplicaCapNotActive is set when the stdio replica cap does not apply.
	ConditionReasonStdioReplicaCapNotActive = "StdioReplicaCapNotActive"
)

// ConditionSessionStorageWarning indicates replicas > 1 but no Redis session storage is configured.
const ConditionSessionStorageWarning = "SessionStorageWarning"

// Reasons for the SessionStorageWarning condition.
const (
	// ConditionReasonSessionStorageMissing is set when replicas > 1 and no Redis session storage is configured.
	ConditionReasonSessionStorageMissing = "SessionStorageMissingForReplicas"
	// ConditionReasonSessionStorageConfigured is set when replicas > 1 and Redis session storage is configured.
	ConditionReasonSessionStorageConfigured = "SessionStorageConfigured"
	// ConditionReasonSessionStorageNotApplicable is set when replicas is nil or <= 1 and the warning is not active.
	ConditionReasonSessionStorageNotApplicable = "SessionStorageWarningNotApplicable"
)

// MCPServerSpec defines the desired state of MCPServer
type MCPServerSpec struct {
// Image is the container image for the MCP server
Expand Down
118 changes: 111 additions & 7 deletions cmd/thv-operator/controllers/mcpserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ const (
authzLabelValueInline = "inline"
)

// defaultTerminationGracePeriodSeconds is the pod termination grace period
// applied to proxy-runner deployments built by this controller.
const defaultTerminationGracePeriodSeconds = int64(30)

// stdioTransport is the spec.transport value whose 1:1 proxy-to-backend
// connection model requires the deployment to run exactly one replica.
const stdioTransport = "stdio"

// detectPlatform detects the Kubernetes platform type (Kubernetes vs OpenShift)
// It uses the shared platform detector to ensure detection is only performed once and cached
func (r *MCPServerReconciler) detectPlatform(ctx context.Context) (kubernetes.Platform, error) {
Expand Down Expand Up @@ -189,6 +193,10 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// Validate CABundleRef if specified
r.validateCABundleRef(ctx, mcpServer)

// Validate stdio replica cap and session storage requirements
r.validateStdioReplicaCap(ctx, mcpServer)
r.validateSessionStorageForReplicas(ctx, mcpServer)

// Validate PodTemplateSpec early - before other validations
// This ensures we fail fast if the spec is invalid
if !r.validateAndUpdatePodTemplateStatus(ctx, mcpServer) {
Expand Down Expand Up @@ -392,7 +400,7 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// Enforce stdio transport replica cap: stdio requires 1:1 proxy-to-backend
// connections and cannot scale beyond 1. Other transports are hands-off
// to allow HPAs, KEDA, or manual kubectl scale to manage replicas freely.
if mcpServer.Spec.Transport == "stdio" &&
if mcpServer.Spec.Transport == stdioTransport &&
deployment.Spec.Replicas != nil && *deployment.Spec.Replicas > 1 {
deployment.Spec.Replicas = int32Ptr(1)
err = r.Update(ctx, deployment)
Expand Down Expand Up @@ -450,13 +458,18 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// Check if the deployment spec changed
if r.deploymentNeedsUpdate(ctx, deployment, mcpServer, runConfigChecksum) {
// Update template and metadata only — preserve Spec.Replicas so that
// HPAs, KEDA, and manual scaling are not overwritten by the controller.
// Update template and metadata. Also sync Spec.Replicas when spec.replicas is
// explicitly set — this makes the operator authoritative for spec-driven scaling.
// When spec.replicas is nil, preserve the live count so HPAs, KEDA, and manual
// kubectl scale remain in control.
newDeployment := r.deploymentForMCPServer(ctx, mcpServer, runConfigChecksum)
deployment.Spec.Template = newDeployment.Spec.Template
deployment.Spec.Selector = newDeployment.Spec.Selector
deployment.Labels = newDeployment.Labels
deployment.Annotations = ctrlutil.MergeAnnotations(newDeployment.Annotations, deployment.Annotations)
if newDeployment.Spec.Replicas != nil {
deployment.Spec.Replicas = newDeployment.Spec.Replicas
}
err = r.Update(ctx, deployment)
if err != nil {
ctxLogger.Error(err, "Failed to update Deployment",
Expand Down Expand Up @@ -931,7 +944,6 @@ func (r *MCPServerReconciler) deploymentForMCPServer(
ctx context.Context, m *mcpv1alpha1.MCPServer, runConfigChecksum string,
) *appsv1.Deployment {
ls := labelsForMCPServer(m.Name)
replicas := int32(1)

// Prepare container args
args := []string{"run"}
Expand Down Expand Up @@ -1196,7 +1208,7 @@ func (r *MCPServerReconciler) deploymentForMCPServer(
Annotations: deploymentAnnotations,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Replicas: resolveDeploymentReplicas(m.Spec.Transport, m.Spec.Replicas),
Selector: &metav1.LabelSelector{
MatchLabels: ls, // Keep original labels for selector
},
Expand All @@ -1206,8 +1218,9 @@ func (r *MCPServerReconciler) deploymentForMCPServer(
Annotations: deploymentTemplateAnnotations,
},
Spec: corev1.PodSpec{
ServiceAccountName: ctrlutil.ProxyRunnerServiceAccountName(m.Name),
ImagePullSecrets: imagePullSecrets,
ServiceAccountName: ctrlutil.ProxyRunnerServiceAccountName(m.Name),
ImagePullSecrets: imagePullSecrets,
TerminationGracePeriodSeconds: int64Ptr(defaultTerminationGracePeriodSeconds),
Containers: []corev1.Container{{
Image: getToolhiveRunnerImage(),
Name: "toolhive",
Expand Down Expand Up @@ -1694,6 +1707,15 @@ func (r *MCPServerReconciler) deploymentNeedsUpdate(
return true
}

// Check if spec.replicas has changed. Only compare when spec.replicas is non-nil;
// nil means hands-off mode (HPA/KEDA manages replicas) and the live count is authoritative.
expectedReplicas := resolveDeploymentReplicas(mcpServer.Spec.Transport, mcpServer.Spec.Replicas)
if expectedReplicas != nil {
if deployment.Spec.Replicas == nil || *deployment.Spec.Replicas != *expectedReplicas {
return true
}
}

return false
}

Expand Down Expand Up @@ -1879,6 +1901,88 @@ func int32Ptr(i int32) *int32 {
return &i
}

// int64Ptr returns a pointer to a copy of the given int64 value.
func int64Ptr(i int64) *int64 {
	v := i
	return &v
}

// resolveDeploymentReplicas returns the replica count to set on Deployment.Spec.Replicas.
//
// It returns nil when specReplicas is nil (hands-off mode: HPA/KEDA or manual
// scaling owns the live replica count). For stdio transport it enforces the
// cap at 1 as defense-in-depth (the reconciler also enforces this via a
// status condition). The result is always a freshly allocated pointer so that
// callers cannot mutate the MCPServer CR's spec through the Deployment object.
func resolveDeploymentReplicas(mcpTransport string, specReplicas *int32) *int32 {
	if specReplicas == nil {
		return nil
	}
	if mcpTransport == stdioTransport && *specReplicas > 1 {
		capped := int32(1)
		return &capped
	}
	// Copy rather than returning specReplicas directly: the built Deployment
	// must not alias pointers into the CR, or a later write to
	// deployment.Spec.Replicas would silently mutate mcpServer.Spec.Replicas.
	replicas := *specReplicas
	return &replicas
}

// setStdioReplicaCappedCondition records the StdioReplicaCapped condition on
// the MCPServer status with the supplied status, reason, and message, stamping
// the current generation as ObservedGeneration.
func setStdioReplicaCappedCondition(mcpServer *mcpv1alpha1.MCPServer, status metav1.ConditionStatus, reason, message string) {
	cond := metav1.Condition{
		Type:               mcpv1alpha1.ConditionStdioReplicaCapped,
		Status:             status,
		Reason:             reason,
		Message:            message,
		ObservedGeneration: mcpServer.Generation,
	}
	meta.SetStatusCondition(&mcpServer.Status.Conditions, cond)
}

// validateStdioReplicaCap checks if spec.replicas > 1 for stdio transport and sets a warning condition.
// The deployment builder enforces the cap at 1 as defense-in-depth.
// Clears the condition when transport or replica count no longer violates the cap.
// The status write is skipped when the condition is unchanged, so a steady-state
// reconcile does not issue a no-op API update on every pass.
func (r *MCPServerReconciler) validateStdioReplicaCap(ctx context.Context, mcpServer *mcpv1alpha1.MCPServer) {
	// Default to the cleared state; flip to the warning only when the cap applies.
	status := metav1.ConditionFalse
	reason := mcpv1alpha1.ConditionReasonStdioReplicaCapNotActive
	message := "stdio replica cap is not active"
	if mcpServer.Spec.Transport == stdioTransport && mcpServer.Spec.Replicas != nil && *mcpServer.Spec.Replicas > 1 {
		status = metav1.ConditionTrue
		reason = mcpv1alpha1.ConditionReasonStdioReplicaCapped
		message = "stdio transport requires exactly 1 replica; deployment will use 1 regardless of spec.replicas"
	}
	// SetStatusCondition reports whether anything (status, reason, message, or
	// observed generation) actually changed; only then is a status write needed.
	changed := meta.SetStatusCondition(&mcpServer.Status.Conditions, metav1.Condition{
		Type:               mcpv1alpha1.ConditionStdioReplicaCapped,
		Status:             status,
		Reason:             reason,
		Message:            message,
		ObservedGeneration: mcpServer.Generation,
	})
	if !changed {
		return
	}
	if err := r.Status().Update(ctx, mcpServer); err != nil {
		log.FromContext(ctx).Error(err, "Failed to update MCPServer status after stdio replica cap validation")
	}
}

// setSessionStorageCondition records the SessionStorageWarning condition on
// the MCPServer status with the supplied status, reason, and message, stamping
// the current generation as ObservedGeneration.
func setSessionStorageCondition(mcpServer *mcpv1alpha1.MCPServer, status metav1.ConditionStatus, reason, message string) {
	cond := metav1.Condition{
		Type:               mcpv1alpha1.ConditionSessionStorageWarning,
		Status:             status,
		Reason:             reason,
		Message:            message,
		ObservedGeneration: mcpServer.Generation,
	}
	meta.SetStatusCondition(&mcpServer.Status.Conditions, cond)
}

// validateSessionStorageForReplicas emits a Warning condition when replicas > 1 but session storage
// is not configured with a Redis backend. The deployment still proceeds; this is advisory only.
// Clears the condition when replicas drop back to nil or <= 1.
// The status write is skipped when the condition is unchanged, so a steady-state
// reconcile does not issue a no-op API update on every pass.
func (r *MCPServerReconciler) validateSessionStorageForReplicas(ctx context.Context, mcpServer *mcpv1alpha1.MCPServer) {
	// Default to the not-applicable state (replicas nil or <= 1).
	status := metav1.ConditionFalse
	reason := mcpv1alpha1.ConditionReasonSessionStorageNotApplicable
	message := "session storage warning is not active"
	if mcpServer.Spec.Replicas != nil && *mcpServer.Spec.Replicas > 1 {
		if mcpServer.Spec.SessionStorage != nil && mcpServer.Spec.SessionStorage.Provider == "redis" {
			reason = mcpv1alpha1.ConditionReasonSessionStorageConfigured
			message = "Redis session storage is configured"
		} else {
			status = metav1.ConditionTrue
			reason = mcpv1alpha1.ConditionReasonSessionStorageMissing
			message = "replicas > 1 but sessionStorage.provider is not redis; sessions are not shared across replicas"
		}
	}
	// SetStatusCondition reports whether anything (status, reason, message, or
	// observed generation) actually changed; only then is a status write needed.
	changed := meta.SetStatusCondition(&mcpServer.Status.Conditions, metav1.Condition{
		Type:               mcpv1alpha1.ConditionSessionStorageWarning,
		Status:             status,
		Reason:             reason,
		Message:            message,
		ObservedGeneration: mcpServer.Generation,
	})
	if !changed {
		return
	}
	if err := r.Status().Update(ctx, mcpServer); err != nil {
		log.FromContext(ctx).Error(err, "Failed to update MCPServer status after session storage validation")
	}
}

// SetupWithManager sets up the controller with the Manager.
func (r *MCPServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Create a handler that maps MCPExternalAuthConfig changes to MCPServer reconciliation requests
Expand Down
4 changes: 0 additions & 4 deletions cmd/thv-operator/controllers/mcpserver_pod_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,3 @@ func TestProxyRunnerStructuredLogsEnvVar(t *testing.T) {
// boolPtr returns a pointer to a copy of the given bool value.
func boolPtr(b bool) *bool {
	val := b
	return &val
}

func int64Ptr(i int64) *int64 {
return &i
}
Loading
Loading