diff --git a/README.md b/README.md
index c4ef06bc..d2a07ef9 100644
--- a/README.md
+++ b/README.md
@@ -12,30 +12,30 @@ Spanner Autoscaler is a [Kubernetes Operator](https://coreos.com/operators/) to
 1. Spanner Autoscaler is not tested on our production yet.
 2. Spanner Autoscaler watches `High Priority` CPU utilization only. It doesn't watch `Low Priority` CPU utilization and Rolling average 24 hour utilization.
-It doesn't check the storage size as well. You must take care of these metrics by yourself.
+It doesn't check [the storage size and the number of databases](https://cloud.google.com/spanner/quotas?hl=en#database_limits) either. You must take care of these metrics yourself.

 ## Overview

 [Cloud Spanner](https://cloud.google.com/spanner) is scalable.
-When CPU utilization gets high, we can [reduce CPU utilization by adding new nodes](https://cloud.google.com/spanner/docs/cpu-utilization#reduce).
+When CPU utilization gets high, we can [reduce CPU utilization by increasing compute capacity](https://cloud.google.com/spanner/docs/cpu-utilization?hl=en#add-compute-capacity).

-Spanner Autoscaler is created to reconcile Cloud Spanner nodes like [Horizontal Pod Autoscaler](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/) by configuring `minNodes`, `maxNodes`, and `targetCPUUtilization`.
+Spanner Autoscaler is created to reconcile Cloud Spanner compute capacity like [Horizontal Pod Autoscaler](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/) by configuring `minProcessingUnits` (or `minNodes`), `maxProcessingUnits` (or `maxNodes`), and `targetCPUUtilization`.

 ![spanner autoscaler overview diagram](./docs/assets/overview.jpg)

-When CPU Utilization(High Priority) is above `taregetCPUUtilization`, Spanner Autoscaler calcurates desired nodes count and increase nodes.
+When CPU Utilization (High Priority) is above `targetCPUUtilization`, Spanner Autoscaler calculates the desired compute capacity and increases it.

 ![spanner cpu utilization](./docs/assets/cpu_utilization.png)
-![spanner node scale up](./docs/assets/node_scaleup.png)
+![spanner scale up](./docs/assets/node_scaleup.png)

-After CPU Utilization gets low, Spanner Autoscaler *doesn't* decrease nodes count immediately.
+After CPU Utilization gets low, Spanner Autoscaler *doesn't* decrease compute capacity immediately.

-![spanner node scale down](./docs/assets/node_scaledown.png)
+![spanner scale down](./docs/assets/node_scaledown.png)

-Spanner Autoscaler has `Scale Down Interval`(default: 55min) and `Max Scale Down Nodes`(default: 2) to scale down nodes.
-The [pricing of Cloud Spanner](https://cloud.google.com/spanner/pricing) says any nodes that you provision will be billed for a minimum of one hour, so it keep nodes up around 1 hour.
-And if Spanner Autoscaler reduces a lot of nodes at once like 10 -> 1, it will cause a latency increase. It reduces nodes with `maxScaleDownNodes`.
+Spanner Autoscaler has `Scale Down Interval` (default: 55min) and `Max Scale Down Nodes` (default: 2) to control scale-down.
+The [pricing of Cloud Spanner](https://cloud.google.com/spanner/pricing) says any compute capacity that you provision will be billed for a minimum of one hour, so it keeps compute capacity up for around 1 hour.
+If Spanner Autoscaler reduced a lot of compute capacity at once, like 10000 PU -> 1000 PU, it would cause a latency increase, so each reduction is limited by `maxScaleDownNodes`.
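+
+For example, with the default `Scale Down Interval` (55min) and `Max Scale Down Nodes` (2), an idle instance at 10000 PU steps down to 8000 PU first, then to 6000 PU roughly an hour later, and so on, instead of dropping straight to the minimum.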

 ## Prerequisite
@@ -232,9 +232,13 @@ You need to configure following items
 * `instanceId`: Cloud Spanner Instance ID
 * `serviceAccountSecretRef`: Secret which you created on step 1b.
 * `instanceConfig.targetServiceAccount`: Target service account email which you created on step 1c.
-* `minNodes`: Minimum number of Cloud Spanner nodes.
-* `maxNodes`: Maximum number of Cloud Spanner nodes. It should be higher than `minNodes` and not over [quota](https://cloud.google.com/spanner/quotas).
-* `maxScaleDownNodes`(optional): Maximum number of nodes scale down at once. Default is 2.
+* `minProcessingUnits`: Minimum number of Cloud Spanner processing units of the instance (see the example below).
+* `minNodes`: Interpreted as `minProcessingUnits = minNodes * 1000`.
+* `maxProcessingUnits`: Maximum number of Cloud Spanner processing units of the instance.
+  * It should be higher than `minProcessingUnits` and not over [quota](https://cloud.google.com/spanner/quotas).
+* `maxNodes`: Interpreted as `maxProcessingUnits = maxNodes * 1000`.
+* `maxScaleDownNodes`(optional): Maximum compute capacity, in nodes (1 node equals 1000 processing units), that can be scaled down at once. Default is 2.
+  * Note: `maxScaleDownNodes * 1000` is the maximum number of processing units scaled down at once.
 * `targetCPUUtilization`: Spanner Autoscaler watches `High Priority` CPU utilization for now. Please read [CPU utilization metrics  \|  Cloud Spanner](https://cloud.google.com/spanner/docs/cpu-utilization) and configure target CPU utilization.
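+
+For illustration, a spec that scales between 1000 and 4000 processing units might look like the following (only the scaling-related fields are shown; the values are just an example):
+
+```yaml
+apiVersion: spanner.mercari.com/v1alpha1
+kind: SpannerAutoscaler
+spec:
+  minProcessingUnits: 1000
+  maxProcessingUnits: 4000
+  maxScaleDownNodes: 1
+  targetCPUUtilization:
+    highPriority: 60
+```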

 #### Example yaml of 1a. Prepare single Service Account for the controller:

diff --git a/config/crd/bases/spanner.mercari.com_spannerautoscalers.yaml b/config/crd/bases/spanner.mercari.com_spannerautoscalers.yaml
index c59e9d52..b50ce44e 100644
--- a/config/crd/bases/spanner.mercari.com_spannerautoscalers.yaml
+++ b/config/crd/bases/spanner.mercari.com_spannerautoscalers.yaml
@@ -21,6 +21,12 @@ spec:
   - JSONPath: .spec.maxNodes
     name: Max Nodes
     type: integer
+  - JSONPath: .spec.minProcessingUnits
+    name: Min PUs
+    type: integer
+  - JSONPath: .spec.maxProcessingUnits
+    name: Max PUs
+    type: integer
   - JSONPath: .spec.targetCPUUtilization.highPriority
     name: Target CPU
     type: integer
@@ -72,6 +78,12 @@ spec:
             format: int32
             minimum: 1
             type: integer
+          maxProcessingUnits:
+            description: upper limit for the number of processing units that can
+              be set by the autoscaler. It cannot be smaller than minProcessingUnits.
+            format: int32
+            minimum: 100
+            type: integer
           maxScaleDownNodes:
             description: upper limit for the number of nodes when autoscaler scaledown.
             format: int32
@@ -83,6 +95,12 @@ spec:
             format: int32
             minimum: 1
             type: integer
+          minProcessingUnits:
+            description: lower limit for the number of processing units that can
+              be set by the autoscaler.
+            format: int32
+            minimum: 100
+            type: integer
           scaleTargetRef:
             description: target reference for scaling.
             properties:
@@ -126,8 +144,6 @@ spec:
             - highPriority
             type: object
           required:
-          - maxNodes
-          - minNodes
           - scaleTargetRef
           - targetCPUUtilization
           type: object
@@ -143,10 +159,18 @@ spec:
             description: current number of nodes of Spanner managed by this autoscaler.
             format: int32
             type: integer
+          currentProcessingUnits:
+            description: current number of processing units of Spanner managed by
+              this autoscaler.
+            format: int32
+            type: integer
           desiredNodes:
             description: desired number of nodes of Spanner managed by this autoscaler.
             format: int32
             type: integer
+          desiredProcessingUnits:
+            description: desired number of processing units of Spanner managed by
+              this autoscaler.
+            format: int32
+            type: integer
           instanceState:
             type: string
           lastScaleTime:
diff --git a/pkg/api/v1alpha1/spannerautoscaler_types.go b/pkg/api/v1alpha1/spannerautoscaler_types.go
index 958b38c8..67797c75 100644
--- a/pkg/api/v1alpha1/spannerautoscaler_types.go
+++ b/pkg/api/v1alpha1/spannerautoscaler_types.go
@@ -68,13 +68,28 @@ type SpannerAutoscalerSpec struct {
 	ImpersonateConfig *ImpersonateConfig `json:"impersonateConfig,omitempty"`

 	// +kubebuilder:validation:Minimum=1
+	// +kubebuilder:validation:Optional
 	// lower limit for the number of nodes that can be set by the autoscaler.
-	MinNodes *int32 `json:"minNodes"`
+	MinNodes *int32 `json:"minNodes,omitempty"`

 	// +kubebuilder:validation:Minimum=1
+	// +kubebuilder:validation:Optional
 	// upper limit for the number of nodes that can be set by the autoscaler.
 	// It cannot be smaller than MinNodes.
-	MaxNodes *int32 `json:"maxNodes"`
+	MaxNodes *int32 `json:"maxNodes,omitempty"`
+
+	// +kubebuilder:validation:Minimum=100
+	// +kubebuilder:validation:MultipleOf=100
+	// +kubebuilder:validation:Optional
+	// lower limit for the number of processing units that can be set by the autoscaler.
+	MinProcessingUnits *int32 `json:"minProcessingUnits,omitempty"`
+
+	// +kubebuilder:validation:Minimum=100
+	// +kubebuilder:validation:MultipleOf=100
+	// +kubebuilder:validation:Optional
+	// upper limit for the number of processing units that can be set by the autoscaler.
+	// It cannot be smaller than minProcessingUnits.
+	MaxProcessingUnits *int32 `json:"maxProcessingUnits,omitempty"`

 	// +kubebuilder:validation:Minimum=1
 	// +kubebuilder:validation:Optional
@@ -110,9 +125,15 @@ type SpannerAutoscalerStatus struct {
 	// current number of nodes of Spanner managed by this autoscaler.
 	CurrentNodes *int32 `json:"currentNodes,omitempty"`

+	// current number of processing units of Spanner managed by this autoscaler.
+	CurrentProcessingUnits *int32 `json:"currentProcessingUnits,omitempty"`
+
 	// desired number of nodes of Spanner managed by this autoscaler.
 	DesiredNodes *int32 `json:"desiredNodes,omitempty"`

+	// desired number of processing units of Spanner managed by this autoscaler.
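+	// 1000 processing units equal 1 node; the controller keeps DesiredNodes in
+	// sync with this value as desiredProcessingUnits / 1000.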
+	DesiredProcessingUnits *int32 `json:"desiredProcessingUnits,omitempty"`
+
 	// +kubebuilder:validation:Type=string
 	InstanceState InstanceState `json:"instanceState"`

@@ -127,6 +148,8 @@ type SpannerAutoscalerStatus struct {
 // +kubebuilder:printcolumn:name="Instance Id",type="string",JSONPath=".spec.scaleTargetRef.instanceId"
 // +kubebuilder:printcolumn:name="Min Nodes",type="integer",JSONPath=".spec.minNodes"
 // +kubebuilder:printcolumn:name="Max Nodes",type="integer",JSONPath=".spec.maxNodes"
+// +kubebuilder:printcolumn:name="Min PUs",type="integer",JSONPath=".spec.minProcessingUnits"
+// +kubebuilder:printcolumn:name="Max PUs",type="integer",JSONPath=".spec.maxProcessingUnits"
 // +kubebuilder:printcolumn:name="Target CPU",type="integer",JSONPath=".spec.targetCPUUtilization.highPriority"
 // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
diff --git a/pkg/controllers/spannerautoscaler_controller.go b/pkg/controllers/spannerautoscaler_controller.go
index 70c5f1ea..8f0ed77a 100644
--- a/pkg/controllers/spannerautoscaler_controller.go
+++ b/pkg/controllers/spannerautoscaler_controller.go
@@ -205,37 +205,39 @@ func (r *SpannerAutoscalerReconciler) Reconcile(req ctrlreconcile.Request) (ctrl
 		sa.Status.LastScaleTime = &metav1.Time{}
 	}

-	if !r.needCalcNodes(&sa) {
+	if !r.needCalcProcessingUnits(&sa) {
 		return ctrlreconcile.Result{}, nil
 	}

-	desiredNodes := calcDesiredNodes(
+	desiredProcessingUnits := calcDesiredProcessingUnits(
 		*sa.Status.CurrentHighPriorityCPUUtilization,
-		*sa.Status.CurrentNodes,
+		normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes),
 		*sa.Spec.TargetCPUUtilization.HighPriority,
-		*sa.Spec.MinNodes,
-		*sa.Spec.MaxNodes,
+		normalizeProcessingUnitsOrNodes(sa.Spec.MinProcessingUnits, sa.Spec.MinNodes),
+		normalizeProcessingUnitsOrNodes(sa.Spec.MaxProcessingUnits, sa.Spec.MaxNodes),
 		*sa.Spec.MaxScaleDownNodes,
 	)

 	now := r.clock.Now()

-	if !r.needUpdateNodes(&sa, desiredNodes, now) {
+	if !r.needUpdateProcessingUnits(&sa, desiredProcessingUnits, now) {
 		return ctrlreconcile.Result{}, nil
 	}

-	if err := s.UpdateInstance(ctx, desiredNodes); err != nil {
+	if err := s.UpdateInstance(ctx, desiredProcessingUnits); err != nil {
 		r.recorder.Event(&sa, corev1.EventTypeWarning, "FailedUpdateInstance", err.Error())
 		log.Error(err, "failed to update spanner instance status")
 		return ctrlreconcile.Result{}, err
 	}

-	r.recorder.Eventf(&sa, corev1.EventTypeNormal, "Updated", "Updated number of %s/%s nodes from %d to %d", *sa.Spec.ScaleTargetRef.ProjectID, *sa.Spec.ScaleTargetRef.InstanceID, *sa.Status.CurrentNodes, desiredNodes)
+	r.recorder.Eventf(&sa, corev1.EventTypeNormal, "Updated", "Updated processing units of %s/%s from %d to %d", *sa.Spec.ScaleTargetRef.ProjectID, *sa.Spec.ScaleTargetRef.InstanceID,
+		normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes), desiredProcessingUnits)

-	log.Info("updated nodes via google cloud api", "before", *sa.Status.CurrentNodes, "after", desiredNodes)
+	log.Info("updated processing units via google cloud api", "before", normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes), "after", desiredProcessingUnits)

 	saCopy := sa.DeepCopy()
-	saCopy.Status.DesiredNodes = &desiredNodes
+	saCopy.Status.DesiredProcessingUnits = &desiredProcessingUnits
+	saCopy.Status.DesiredNodes = pointer.Int32(desiredProcessingUnits / 1000)
 	saCopy.Status.LastScaleTime = &metav1.Time{Time: now}

 	if err = r.ctrlClient.Status().Update(ctx, saCopy); err != nil {
@@ -247,6 +249,16 @@ func (r *SpannerAutoscalerReconciler) Reconcile(req ctrlreconcile.Request) (ctrl
 	return ctrlreconcile.Result{}, nil
 }

+// normalizeProcessingUnitsOrNodes returns the configured compute capacity in
+// processing units, converting a node count (1 node = 1000 processing units)
+// when only the node-based field is set.
+func normalizeProcessingUnitsOrNodes(processingUnits, nodes *int32) int32 {
+	if processingUnits != nil {
+		return *processingUnits
+	}
+	if nodes != nil {
+		return *nodes * 1000
+	}
+	return 0
+}
+
 // SetupWithManager sets up the controller with ctrlmanager.Manager.
 func (r *SpannerAutoscalerReconciler) SetupWithManager(mgr ctrlmanager.Manager) error {
 	opts := ctrlcontroller.Options{
@@ -277,12 +289,12 @@ func (r *SpannerAutoscalerReconciler) startSyncer(ctx context.Context, nn types.
 	return nil
 }

-func (r *SpannerAutoscalerReconciler) needCalcNodes(sa *spannerv1alpha1.SpannerAutoscaler) bool {
+func (r *SpannerAutoscalerReconciler) needCalcProcessingUnits(sa *spannerv1alpha1.SpannerAutoscaler) bool {
 	log := r.log

 	switch {
-	case sa.Status.CurrentNodes == nil:
-		log.Info("current nodes have not fetched yet")
+	case sa.Status.CurrentProcessingUnits == nil && sa.Status.CurrentNodes == nil:
+		log.Info("current processing units have not been fetched yet")
 		return false

 	case sa.Status.InstanceState != spanner.StateReady:
@@ -293,23 +305,25 @@ func (r *SpannerAutoscalerReconciler) needCalcNodes(sa *spannerv1alpha1.SpannerA
 	}
 }

-func (r *SpannerAutoscalerReconciler) needUpdateNodes(sa *spannerv1alpha1.SpannerAutoscaler, desiredNodes int32, now time.Time) bool {
+func (r *SpannerAutoscalerReconciler) needUpdateProcessingUnits(sa *spannerv1alpha1.SpannerAutoscaler, desiredProcessingUnits int32, now time.Time) bool {
 	log := r.log

+	currentProcessingUnits := normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes)
+
 	switch {
-	case desiredNodes == *sa.Status.CurrentNodes:
-		log.V(0).Info("the desired number of nodes is equal to that of the current; no need to scale nodes")
+	case desiredProcessingUnits == currentProcessingUnits:
+		log.V(0).Info("the desired number of processing units is equal to the current number; no need to scale")
 		return false

-	case desiredNodes > *sa.Status.CurrentNodes && r.clock.Now().Before(sa.Status.LastScaleTime.Time.Add(10*time.Second)):
-		log.Info("too short to scale up since instance scaled nodes last",
+	case desiredProcessingUnits > currentProcessingUnits && r.clock.Now().Before(sa.Status.LastScaleTime.Time.Add(10*time.Second)):
+		log.Info("too short to scale up since instance scaled last",
 			"now", now.String(),
 			"last scale time", sa.Status.LastScaleTime,
 		)
 		return false

-	case desiredNodes < *sa.Status.CurrentNodes && r.clock.Now().Before(sa.Status.LastScaleTime.Time.Add(r.scaleDownInterval)):
+	case desiredProcessingUnits < currentProcessingUnits && r.clock.Now().Before(sa.Status.LastScaleTime.Time.Add(r.scaleDownInterval)):
 		log.Info("too short to scale down since instance scaled nodes last",
 			"now", now.String(),
 			"last scale time", sa.Status.LastScaleTime,
@@ -322,23 +336,48 @@ func (r *SpannerAutoscalerReconciler) needUpdateNodes(sa *spannerv1alpha1.Spanne
 	}
 }

+// For testing purposes only.
 func calcDesiredNodes(currentCPU, currentNodes, targetCPU, minNodes, maxNodes, maxScaleDownNodes int32) int32 {
-	totalCPU := currentCPU * currentNodes
-	desiredNodes := totalCPU/targetCPU + 1 // roundup
+	return calcDesiredProcessingUnits(currentCPU, currentNodes*1000, targetCPU, minNodes*1000, maxNodes*1000, maxScaleDownNodes) / 1000
+}

-	if (currentNodes - desiredNodes) > maxScaleDownNodes {
-		desiredNodes = currentNodes - maxScaleDownNodes
+// nextValidProcessingUnits finds the next valid value in processing units.
+// https://cloud.google.com/spanner/docs/compute-capacity?hl=en
+// Valid values are:
+// If processingUnits < 1000, processing units must be multiples of 100.
+// If processingUnits >= 1000, processing units must be multiples of 1000.
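+// The result is always strictly greater than the input, e.g.
+// nextValidProcessingUnits(250) == 300 and nextValidProcessingUnits(1000) == 2000.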
+func nextValidProcessingUnits(processingUnits int32) int32 {
+	if processingUnits < 1000 {
+		return ((processingUnits / 100) + 1) * 100
 	}
+	return ((processingUnits / 1000) + 1) * 1000
+}
+
+func maxInt32(first int32, rest ...int32) int32 {
+	result := first
+	for _, v := range rest {
+		if result < v {
+			result = v
+		}
+	}
+	return result
+}
+
+// calcDesiredProcessingUnits calculates the number of processing units needed to keep CPU utilization below targetCPU.
+func calcDesiredProcessingUnits(currentCPU, currentProcessingUnits, targetCPU, minProcessingUnits, maxProcessingUnits, maxScaleDownNodes int32) int32 {
+	// Equivalent to the old totalCPU (currentCPU * currentNodes) scaled by 1000,
+	// since 1000 processing units correspond to 1 node.
+	totalCPUProduct1000 := currentCPU * currentProcessingUnits
+
+	desiredProcessingUnits := maxInt32(nextValidProcessingUnits(totalCPUProduct1000/targetCPU), currentProcessingUnits-maxScaleDownNodes*1000)

 	switch {
-	case desiredNodes < minNodes:
-		return minNodes
+	case desiredProcessingUnits < minProcessingUnits:
+		return minProcessingUnits

-	case desiredNodes > maxNodes:
-		return maxNodes
+	case desiredProcessingUnits > maxProcessingUnits:
+		return maxProcessingUnits

 	default:
-		return desiredNodes
+		return desiredProcessingUnits
 	}
 }
diff --git a/pkg/controllers/spannerautoscaler_controller_test.go b/pkg/controllers/spannerautoscaler_controller_test.go
index 03a26e83..254071e4 100644
--- a/pkg/controllers/spannerautoscaler_controller_test.go
+++ b/pkg/controllers/spannerautoscaler_controller_test.go
@@ -134,6 +134,7 @@ func TestSpannerAutoscalerReconciler_Reconcile(t *testing.T) {
 			want: func() *spannerv1alpha1.SpannerAutoscaler {
 				o := baseObj.DeepCopy()
 				o.Status.DesiredNodes = pointer.Int32(6)
+				o.Status.DesiredProcessingUnits = pointer.Int32(6000)
 				o.Status.InstanceState = spannerv1alpha1.InstanceStateReady
 				o.Status.LastScaleTime = &metav1.Time{Time: fakeTime}
 				return o
@@ -202,7 +203,7 @@
 	}
 }

-func TestSpannerAutoscalerReconciler_needCalcNodes(t *testing.T) {
+func TestSpannerAutoscalerReconciler_needCalcProcessingUnits(t *testing.T) {
 	fakeTime := time.Date(2020, 4, 1, 0, 0, 0, 0, time.Local)

 	type args struct {
@@ -254,9 +255,9 @@
 				clock: clock.NewFakeClock(fakeTime),
 				log:   zapr.NewLogger(zap.NewNop()),
 			}
-			got := r.needCalcNodes(tt.args.sa)
+			got := r.needCalcProcessingUnits(tt.args.sa)
 			if got != tt.want {
-				t.Errorf("needCalcNodes() got = %v, want %v", got, tt.want)
+				t.Errorf("needCalcProcessingUnits() got = %v, want %v", got, tt.want)
 			}
 		})
 	}
@@ -310,9 +311,9 @@ func TestSpannerAutoscalerReconciler_needUpdateNodes(t *testing.T) {
 				clock: clock.NewFakeClock(fakeTime),
 				log:   zapr.NewLogger(zap.NewNop()),
 			}
-			got := r.needUpdateNodes(tt.args.sa, *tt.args.sa.Status.DesiredNodes, fakeTime)
+			got := r.needUpdateProcessingUnits(tt.args.sa, normalizeProcessingUnitsOrNodes(tt.args.sa.Status.DesiredProcessingUnits, tt.args.sa.Status.DesiredNodes), fakeTime)
 			if got != tt.want {
-				t.Errorf("needUpdateNodes() got = %v, want %v", got, tt.want)
+				t.Errorf("needUpdateProcessingUnits() got = %v, want %v", got, tt.want)
 			}
 		})
 	}
@@ -409,12 +410,190 @@ func Test_calcDesiredNodes(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			if got := calcDesiredNodes(tt.args.currentCPU, tt.args.currentNodes, tt.args.targetCPU, tt.args.minNodes, tt.args.maxNodes, tt.args.maxScaleDownNodes); got != tt.want {
 				t.Errorf("calcDesiredNodes() = %v, want %v", got, tt.want)
 			}
 		})
 	}
 }

+func Test_calcDesiredProcessingUnits(t *testing.T) {
+	type args struct {
+		currentCPU             int32
+		currentProcessingUnits int32
+		targetCPU              int32
+		minProcessingUnits     int32
+		maxProcessingUnits     int32
+		maxScaleDownNodes      int32
+	}
+	tests := []struct {
+		name string
+		args args
+		want int32
+	}{
+		{
+			name: "scale up",
+			args: args{
+				currentCPU:             50,
+				currentProcessingUnits: 300,
+				targetCPU:              30,
+				minProcessingUnits:     100,
+				maxProcessingUnits:     1000,
+				maxScaleDownNodes:      2,
+			},
+			want: 600,
+		},
+		{
+			name: "scale up 2",
+			args: args{
+				currentCPU:             50,
+				currentProcessingUnits: 3000,
+				targetCPU:              30,
+				minProcessingUnits:     1000,
+				maxProcessingUnits:     10000,
+				maxScaleDownNodes:      2,
+			},
+			want: 6000,
+		},
+		{
+			name: "scale up 3",
+			args: args{
+				currentCPU:             50,
+				currentProcessingUnits: 900,
+				targetCPU:              40,
+				minProcessingUnits:     100,
+				maxProcessingUnits:     5000,
+				maxScaleDownNodes:      2,
+			},
+			want: 2000,
+		},
+		{
+			name: "scale down",
+			args: args{
+				currentCPU:             30,
+				currentProcessingUnits: 500,
+				targetCPU:              50,
+				minProcessingUnits:     100,
+				maxProcessingUnits:     1000,
+				maxScaleDownNodes:      2,
+			},
+			want: 400,
+		},
+		{
+			name: "scale down 2",
+			args: args{
+				currentCPU:             30,
+				currentProcessingUnits: 5000,
+				targetCPU:              50,
+				minProcessingUnits:     1000,
+				maxProcessingUnits:     10000,
+				maxScaleDownNodes:      2,
+			},
+			want: 4000,
+		},
+		{
+			name: "scale up to max PUs",
+			args: args{
+				currentCPU:             50,
+				currentProcessingUnits: 300,
+				targetCPU:              30,
+				minProcessingUnits:     100,
+				maxProcessingUnits:     400,
+				maxScaleDownNodes:      2,
+			},
+			want: 400,
+		},
+		{
+			name: "scale up to max PUs 2",
+			args: args{
+				currentCPU:             50,
+				currentProcessingUnits: 3000,
+				targetCPU:              30,
+				minProcessingUnits:     1000,
+				maxProcessingUnits:     4000,
+				maxScaleDownNodes:      2,
+			},
+			want: 4000,
+		},
+		{
+			name: "scale down to min PUs",
+			args: args{
+				currentCPU:             0,
+				currentProcessingUnits: 500,
+				targetCPU:              50,
+				minProcessingUnits:     100,
+				maxProcessingUnits:     1000,
+				maxScaleDownNodes:      2,
+			},
+			want: 100,
+		},
+		{
+			name: "scale down to min PUs 2",
+			args: args{
+				currentCPU:             0,
+				currentProcessingUnits: 5000,
+				targetCPU:              50,
+				minProcessingUnits:     1000,
+				maxProcessingUnits:     10000,
+				maxScaleDownNodes:      5,
+			},
+			want: 1000,
+		},
+		{
+			name: "scale down to min PUs 3",
+			args: args{
+				currentCPU:             0,
+				currentProcessingUnits: 5000,
+				targetCPU:              50,
+				minProcessingUnits:     100,
+				maxProcessingUnits:     10000,
+				maxScaleDownNodes:      5,
+			},
+			want: 100,
+		},
+		{
+			name: "scale down with max scale down nodes",
+			args: args{
+				currentCPU:             30,
+				currentProcessingUnits: 10000,
+				targetCPU:              50,
+				minProcessingUnits:     5000,
+				maxProcessingUnits:     10000,
+				maxScaleDownNodes:      2,
+			},
+			want: 8000,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := calcDesiredProcessingUnits(tt.args.currentCPU, tt.args.currentProcessingUnits, tt.args.targetCPU, tt.args.minProcessingUnits, tt.args.maxProcessingUnits, tt.args.maxScaleDownNodes); got != tt.want {
+				t.Errorf("calcDesiredProcessingUnits() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestNextValidProcessingUnits(t *testing.T) {
+	tests := []struct {
+		input int32
+		want  int32
+	}{
+		{input: 0, want: 100},
+		{input: 99, want: 100},
+		{input: 100, want: 200},
+		{input: 900, want: 1000},
+		{input: 1000, want: 2000},
+		{input: 1999, want: 2000},
+		{input: 2000, want: 3000},
+	}
+	for _, tt := range tests {
+		got := nextValidProcessingUnits(tt.input)
+		if got != tt.want {
+			t.Errorf("nextValidProcessingUnits(%v) = %v, want %v", tt.input, got, tt.want)
+		}
+	}
+}
+
 func TestSpannerAutoscalerReconciler_fetchCredentials(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
diff --git a/pkg/spanner/spanner.go b/pkg/spanner/spanner.go
index 86671a85..af81285b 100644
--- a/pkg/spanner/spanner.go
+++ b/pkg/spanner/spanner.go
@@ -29,8 +29,8 @@ const (

 // Instance represents Spanner Instance.
 type Instance struct {
-	NodeCount     *int32
-	InstanceState State
+	ProcessingUnits *int32
+	InstanceState   State
 }

 // Client is a client for manipulation of Instance.
@@ -106,8 +106,8 @@ func (c *client) GetInstance(ctx context.Context, instanceID string) (*Instance,
 	}

 	return &Instance{
-		NodeCount:     pointer.Int32(i.NodeCount),
-		InstanceState: instanceState(i.State),
+		ProcessingUnits: pointer.Int32(i.ProcessingUnits),
+		InstanceState:   instanceState(i.State),
 	}, nil
 }

@@ -123,14 +123,14 @@ func (c *client) UpdateInstance(ctx context.Context, instanceID string, instance
 		return err
 	}

-	if instance.NodeCount != nil {
-		i.NodeCount = *instance.NodeCount
+	if instance.ProcessingUnits != nil {
+		i.ProcessingUnits = *instance.ProcessingUnits
 	}

 	_, err = c.spannerInstanceAdminClient.UpdateInstance(ctx, &instancepb.UpdateInstanceRequest{
 		Instance: i,
 		FieldMask: &field_mask.FieldMask{
-			Paths: []string{"node_count"},
+			Paths: []string{"processing_units"},
 		},
 	})
 	if err != nil {
diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go
index 17497317..0e6bbe57 100644
--- a/pkg/syncer/syncer.go
+++ b/pkg/syncer/syncer.go
@@ -9,6 +9,7 @@ import (

 	"github.com/go-logr/logr"
 	"github.com/go-logr/zapr"
+	"github.com/mercari/spanner-autoscaler/pkg/pointer"
 	"go.uber.org/zap"
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2/google"
@@ -36,7 +37,7 @@ type Syncer interface {

 	UpdateTarget(projectID, instanceID string, credentials *Credentials) bool

-	UpdateInstance(ctx context.Context, desiredNodes int32) error
+	UpdateInstance(ctx context.Context, desiredProcessingUnits int32) error
 }

 type CredentialsType int
@@ -271,9 +272,9 @@ func (s *syncer) UpdateTarget(projectID, instanceID string, credentials *Credent
 	return updated
 }

-func (s *syncer) UpdateInstance(ctx context.Context, desiredNodes int32) error {
+func (s *syncer) UpdateInstance(ctx context.Context, desiredProcessingUnits int32) error {
 	err := s.spannerClient.UpdateInstance(ctx, s.instanceID, &spanner.Instance{
-		NodeCount: &desiredNodes,
+		ProcessingUnits: &desiredProcessingUnits,
 	})
 	if err != nil {
 		return err
@@ -312,13 +313,14 @@ func (s *syncer) syncResource(ctx context.Context) error {
 	}

 	log.V(1).Info("spanner instance status",
-		"current nodes", instance.NodeCount,
+		"current processing units", instance.ProcessingUnits,
 		"instance state", instance.InstanceState,
 		"high priority cpu utilization", instanceMetrics.CurrentHighPriorityCPUUtilization,
 	)

 	saCopy := sa.DeepCopy()
-	saCopy.Status.CurrentNodes = instance.NodeCount
+	saCopy.Status.CurrentProcessingUnits = instance.ProcessingUnits
+	// CurrentNodes is derived with integer division; below 1000 PUs it reports 0.
+	saCopy.Status.CurrentNodes = pointer.Int32(*instance.ProcessingUnits / 1000)
 	saCopy.Status.InstanceState = instance.InstanceState
 	saCopy.Status.CurrentHighPriorityCPUUtilization = instanceMetrics.CurrentHighPriorityCPUUtilization
 	saCopy.Status.LastSyncTime = &metav1.Time{Time: s.clock.Now()}
@@ -351,7 +353,7 @@ func (s *syncer) getInstanceInfo(ctx context.Context, instanceID string) (*spann
 		return err
 	}
 	log.V(1).Info("successfully got spanner instance with spanner client",
-		"current nodes", instance.NodeCount,
+		"current processing units", instance.ProcessingUnits,
 		"instance state", instance.InstanceState,
 	)
 	return nil
diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go
index 067e16d7..98084ca9 100644
--- a/pkg/syncer/syncer_test.go
+++ b/pkg/syncer/syncer_test.go
@@ -87,8 +87,8 @@ func Test_syncer_syncResource(t *testing.T) {
 			name: "sync and update instance",
 			fakeInstances: map[string]*spanner.Instance{
 				fakeInstanceID: {
-					NodeCount:     pointer.Int32(3),
-					InstanceState: spanner.StateReady,
+					ProcessingUnits: pointer.Int32(3000),
+					InstanceState:   spanner.StateReady,
 				},
 			},
 			fakeMetrics: map[string]*metrics.InstanceMetrics{
@@ -103,6 +103,7 @@
 			want: func() *spannerv1alpha1.SpannerAutoscaler {
 				o := fakeSpannerAutoscaler.DeepCopy()
 				o.Status.CurrentNodes = pointer.Int32(3)
+				o.Status.CurrentProcessingUnits = pointer.Int32(3000)
 				o.Status.InstanceState = spannerv1alpha1.InstanceStateReady
 				o.Status.CurrentHighPriorityCPUUtilization = pointer.Int32(30)
 				o.Status.LastSyncTime = &metav1.Time{Time: fakeTime}
@@ -194,8 +195,8 @@ func Test_syncer_getInstanceInfo(t *testing.T) {
 			name: "get instance info",
 			fakeInstances: map[string]*spanner.Instance{
 				fakeInstanceID: {
-					NodeCount:     pointer.Int32(1),
-					InstanceState: spanner.StateReady,
+					ProcessingUnits: pointer.Int32(1000),
+					InstanceState:   spanner.StateReady,
 				},
 			},
 			fakeMetrics: map[string]*metrics.InstanceMetrics{
 				fakeInstanceID: {
 				},
 			},
 			wantInstance: &spanner.Instance{
-				NodeCount:     pointer.Int32(1),
-				InstanceState: spanner.StateReady,
+				ProcessingUnits: pointer.Int32(1000),
+				InstanceState:   spanner.StateReady,
 			},
 			wantInstanceMetrics: &metrics.InstanceMetrics{
 				CurrentHighPriorityCPUUtilization: pointer.Int32(50),
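As a sanity check, the sizing logic introduced in `pkg/controllers/spannerautoscaler_controller.go` can be exercised standalone. The sketch below copies `nextValidProcessingUnits`, `maxInt32`, and `calcDesiredProcessingUnits` from the patch and runs two of the new test cases; the `package main` scaffolding is added here only for illustration.

```go
package main

import "fmt"

// nextValidProcessingUnits finds the next valid value in processing units:
// below 1000 PUs values are multiples of 100, from 1000 PUs they are
// multiples of 1000. The result is always strictly greater than the input.
func nextValidProcessingUnits(processingUnits int32) int32 {
	if processingUnits < 1000 {
		return ((processingUnits / 100) + 1) * 100
	}
	return ((processingUnits / 1000) + 1) * 1000
}

func maxInt32(first int32, rest ...int32) int32 {
	result := first
	for _, v := range rest {
		if result < v {
			result = v
		}
	}
	return result
}

// calcDesiredProcessingUnits calculates the processing units needed to keep
// CPU utilization below targetCPU, never dropping more than
// maxScaleDownNodes*1000 PUs in one step, clamped to [min, max].
func calcDesiredProcessingUnits(currentCPU, currentProcessingUnits, targetCPU, minProcessingUnits, maxProcessingUnits, maxScaleDownNodes int32) int32 {
	totalCPUProduct1000 := currentCPU * currentProcessingUnits

	desiredProcessingUnits := maxInt32(nextValidProcessingUnits(totalCPUProduct1000/targetCPU), currentProcessingUnits-maxScaleDownNodes*1000)

	switch {
	case desiredProcessingUnits < minProcessingUnits:
		return minProcessingUnits
	case desiredProcessingUnits > maxProcessingUnits:
		return maxProcessingUnits
	default:
		return desiredProcessingUnits
	}
}

func main() {
	// "scale up": 50% CPU at 300 PUs against a 30% target -> 50*300/30 = 500,
	// rounded up to the next valid value, 600 PUs.
	fmt.Println(calcDesiredProcessingUnits(50, 300, 30, 100, 1000, 2)) // 600

	// "scale down with max scale down nodes": the utilization target alone
	// would allow 7000 PUs, but the step is capped at 10000 - 2*1000 = 8000 PUs.
	fmt.Println(calcDesiredProcessingUnits(30, 10000, 50, 5000, 10000, 2)) // 8000
}
```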