Merge pull request #34 from apstndb/processing-units
Support Processing Units
tkuchiki committed Sep 10, 2021
2 parents ab62d5b + 4d615a9 commit 766c015
Showing 8 changed files with 342 additions and 70 deletions.
30 changes: 17 additions & 13 deletions README.md
@@ -12,30 +12,30 @@ Spanner Autoscaler is a [Kubernetes Operator](https://coreos.com/operators/) to

1. Spanner Autoscaler has not been tested in our production environment yet.
2. Spanner Autoscaler watches `High Priority` CPU utilization only. It doesn't watch `Low Priority` CPU utilization or `Rolling average 24 hour` utilization.
It doesn't check the storage size either. You must monitor these metrics yourself.
It doesn't check [the storage size and the number of databases](https://cloud.google.com/spanner/quotas?hl=en#database_limits) either. You must monitor these metrics yourself.

## Overview

[Cloud Spanner](https://cloud.google.com/spanner) is scalable.
When CPU utilization gets high, we can [reduce CPU utilization by adding new nodes](https://cloud.google.com/spanner/docs/cpu-utilization#reduce).
When CPU utilization gets high, we can [reduce CPU utilization by increasing compute capacity](https://cloud.google.com/spanner/docs/cpu-utilization?hl=en#add-compute-capacity).

Spanner Autoscaler was created to reconcile Cloud Spanner nodes, like the [Horizontal Pod Autoscaler](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/), by configuring `minNodes`, `maxNodes`, and `targetCPUUtilization`.
Spanner Autoscaler was created to reconcile Cloud Spanner compute capacity, like the [Horizontal Pod Autoscaler](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/), by configuring `minNodes`, `maxNodes`, and `targetCPUUtilization`.

![spanner autoscaler overview diagram](./docs/assets/overview.jpg)

When CPU Utilization(High Priority) is above `taregetCPUUtilization`, Spanner Autoscaler calcurates desired nodes count and increase nodes.
When CPU Utilization (High Priority) is above `targetCPUUtilization`, Spanner Autoscaler calculates the desired compute capacity and increases it.
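
In other words, the desired capacity is the current capacity scaled by the ratio of observed to target utilization, rounded up to a valid instance size. A minimal sketch, using `nextValidProcessingUnits` from the controller change below:

```go
// Sketch: scale current capacity by observed/target utilization, then
// round up to a valid Spanner size. For example, 80% CPU at 2000 PUs
// with a 60% target gives 80*2000/60 = 2666, rounded up to 3000 PUs.
func desiredCapacity(currentCPU, currentPU, targetCPU int32) int32 {
	return nextValidProcessingUnits(currentCPU * currentPU / targetCPU)
}
```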

![spanner cpu utilization](./docs/assets/cpu_utilization.png)

![spanner node scale up](./docs/assets/node_scaleup.png)
![spanner scale up](./docs/assets/node_scaleup.png)

After CPU Utilization gets low, Spanner Autoscaler *doesn't* decrease nodes count immediately.
After CPU Utilization gets low, Spanner Autoscaler *doesn't* decrease compute capacity immediately.

![spanner node scale down](./docs/assets/node_scaledown.png)
![spanner scale down](./docs/assets/node_scaledown.png)

Spanner Autoscaler has `Scale Down Interval` (default: 55min) and `Max Scale Down Nodes` (default: 2) to scale down nodes.
The [pricing of Cloud Spanner](https://cloud.google.com/spanner/pricing) says any nodes that you provision will be billed for a minimum of one hour, so it keeps nodes up for around one hour.
And if Spanner Autoscaler reduced many nodes at once, like 10 -> 1, it would cause a latency increase, so it reduces nodes gradually with `maxScaleDownNodes`.
Spanner Autoscaler has `Scale Down Interval` (default: 55min) and `Max Scale Down Nodes` (default: 2) to scale down compute capacity.
The [pricing of Cloud Spanner](https://cloud.google.com/spanner/pricing) says any compute capacity that you provision will be billed for a minimum of one hour, so it keeps compute capacity up for around one hour.
And if Spanner Autoscaler reduced a lot of compute capacity at once, like 10000 PU -> 1000 PU, it would cause a latency increase, so reductions are limited by `maxScaleDownNodes`.
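
A minimal sketch of that scale-down clamp, assuming 1 node = 1000 processing units as in the controller change below:

```go
// If the utilization-based target would drop capacity by more than
// maxScaleDownNodes worth of processing units, step down gradually instead.
func clampScaleDown(currentPU, desiredPU, maxScaleDownNodes int32) int32 {
	if currentPU-desiredPU > maxScaleDownNodes*1000 {
		return currentPU - maxScaleDownNodes*1000 // e.g. 10000 PU -> 8000 PU, not 1000 PU
	}
	return desiredPU
}
```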

## Prerequisite

@@ -232,9 +232,13 @@ You need to configure the following items
* `instanceId`: Cloud Spanner Instance ID
* `serviceAccountSecretRef`: Secret which you created on step 1b.
* `instanceConfig.targetServiceAccount`: Target service account email which you created on step 1c.
* `minNodes`: Minimum number of Cloud Spanner nodes.
* `maxNodes`: Maximum number of Cloud Spanner nodes. It should be higher than `minNodes` and not over [quota](https://cloud.google.com/spanner/quotas).
* `maxScaleDownNodes`(optional): Maximum number of nodes scale down at once. Default is 2.
* `minProcessingUnits`: Minimum number of Cloud Spanner processing units of the instance.
* `minNodes`: Alternative to `minProcessingUnits`; it is interpreted as `minProcessingUnits = minNodes * 1000` (see the sketch after this list).
* `maxProcessingUnits`: Maximum number of Cloud Spanner processing units of the instance.
* It should be higher than `minProcessingUnits` and not over [quota](https://cloud.google.com/spanner/quotas).
* `maxNodes`: Alternative to `maxProcessingUnits`; it is interpreted as `maxProcessingUnits = maxNodes * 1000`.
* `maxScaleDownNodes` (optional): Maximum number of nodes (1 node equals 1000 processing units) to scale down at once. Default is 2.
* Note: `maxScaleDownNodes * 1000` is the maximum number of processing units scaled down at once.
* `targetCPUUtilization`: Spanner Autoscaler watches `High Priority` CPU utilization for now. Please read [CPU utilization metrics \| Cloud Spanner](https://cloud.google.com/spanner/docs/cpu-utilization) and configure the target CPU utilization.
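
The node-based fields are a 1000x shorthand for processing units. A minimal sketch of the interpretation (the helper name here is illustrative; the controller implements this as `normalizeProcessingUnitsOrNodes`):

```go
// 1 node equals 1000 processing units, so e.g. minNodes: 2 is
// equivalent to minProcessingUnits: 2000.
func nodesToProcessingUnits(nodes int32) int32 {
	return nodes * 1000
}
```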

#### Example yaml of 1a. Prepare single Service Account for the controller:
28 changes: 26 additions & 2 deletions config/crd/bases/spanner.mercari.com_spannerautoscalers.yaml
@@ -21,6 +21,12 @@ spec:
- JSONPath: .spec.maxNodes
name: Max Nodes
type: integer
- JSONPath: .spec.minProcessingUnits
name: Min PUs
type: integer
- JSONPath: .spec.maxProcessingUnits
name: Max PUs
type: integer
- JSONPath: .spec.targetCPUUtilization.highPriority
name: Target CPU
type: integer
@@ -72,6 +78,12 @@ spec:
format: int32
minimum: 1
type: integer
maxProcessingUnits:
description: upper limit for the number of processing units that can be
set by the autoscaler. It cannot be smaller than minProcessingUnits.
format: int32
minimum: 100
type: integer
maxScaleDownNodes:
description: upper limit for the number of nodes when the autoscaler scales down.
format: int32
@@ -83,6 +95,12 @@ spec:
format: int32
minimum: 1
type: integer
minProcessingUnits:
description: lower limit for the number of processing units that can be
set by the autoscaler.
format: int32
minimum: 100
type: integer
scaleTargetRef:
description: target reference for scaling.
properties:
@@ -126,8 +144,6 @@ spec:
- highPriority
type: object
required:
- maxNodes
- minNodes
- scaleTargetRef
- targetCPUUtilization
type: object
@@ -143,10 +159,18 @@ spec:
description: current number of nodes of Spanner managed by this autoscaler.
format: int32
type: integer
currentProcessingUnits:
description: current number of processing units of Spanner managed by this autoscaler.
format: int32
type: integer
desiredNodes:
description: desired number of nodes of Spanner managed by this autoscaler.
format: int32
type: integer
desiredProcessingUnits:
description: desired number of processing units of Spanner managed by this autoscaler.
format: int32
type: integer
instanceState:
type: string
lastScaleTime:
27 changes: 25 additions & 2 deletions pkg/api/v1alpha1/spannerautoscaler_types.go
@@ -68,13 +68,28 @@ type SpannerAutoscalerSpec struct {
ImpersonateConfig *ImpersonateConfig `json:"impersonateConfig,omitempty"`

// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Optional
// lower limit for the number of nodes that can be set by the autoscaler.
MinNodes *int32 `json:"minNodes"`
MinNodes *int32 `json:"minNodes,omitempty"`

// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Optional
// upper limit for the number of nodes that can be set by the autoscaler.
// It cannot be smaller than MinNodes.
MaxNodes *int32 `json:"maxNodes"`
MaxNodes *int32 `json:"maxNodes,omitempty"`

// +kubebuilder:validation:Minimum=100
// +kubebuilder:validation:MultipleOf=100
// +kubebuilder:validation:Optional
// lower limit for the number of processing units that can be set by the autoscaler.
MinProcessingUnits *int32 `json:"minProcessingUnits,omitempty"`

// +kubebuilder:validation:Minimum=100
// +kubebuilder:validation:MultipleOf=100
// +kubebuilder:validation:Optional
// upper limit for the number of processing units that can be set by the autoscaler.
// It cannot be smaller than MinProcessingUnits.
MaxProcessingUnits *int32 `json:"maxProcessingUnits,omitempty"`
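// Note: these markers admit any multiple of 100 (e.g. 1100), while Spanner
// itself only accepts multiples of 1000 at or above 1000 (see
// nextValidProcessingUnits in the controller).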

// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Optional
@@ -110,9 +125,15 @@ type SpannerAutoscalerStatus struct {
// current number of nodes of Spanner managed by this autoscaler.
CurrentNodes *int32 `json:"currentNodes,omitempty"`

// current number of processing units of Spanner managed by this autoscaler.
CurrentProcessingUnits *int32 `json:"currentProcessingUnits,omitempty"`

// desired number of nodes of Spanner managed by this autoscaler.
DesiredNodes *int32 `json:"desiredNodes,omitempty"`

// desired number of processing units of Spanner managed by this autoscaler.
DesiredProcessingUnits *int32 `json:"desiredProcessingUnits,omitempty"`

// +kubebuilder:validation:Type=string
InstanceState InstanceState `json:"instanceState"`

Expand All @@ -127,6 +148,8 @@ type SpannerAutoscalerStatus struct {
// +kubebuilder:printcolumn:name="Instance Id",type="string",JSONPath=".spec.scaleTargetRef.instanceId"
// +kubebuilder:printcolumn:name="Min Nodes",type="integer",JSONPath=".spec.minNodes"
// +kubebuilder:printcolumn:name="Max Nodes",type="integer",JSONPath=".spec.maxNodes"
// +kubebuilder:printcolumn:name="Min PUs",type="integer",JSONPath=".spec.minProcessingUnits"
// +kubebuilder:printcolumn:name="Max PUs",type="integer",JSONPath=".spec.maxProcessingUnits"
// +kubebuilder:printcolumn:name="Target CPU",type="integer",JSONPath=".spec.targetCPUUtilization.highPriority"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

95 changes: 67 additions & 28 deletions pkg/controllers/spannerautoscaler_controller.go
@@ -205,37 +205,39 @@ func (r *SpannerAutoscalerReconciler) Reconcile(req ctrlreconcile.Request) (ctrl
sa.Status.LastScaleTime = &metav1.Time{}
}

if !r.needCalcNodes(&sa) {
if !r.needCalcProcessingUnits(&sa) {
return ctrlreconcile.Result{}, nil
}

desiredNodes := calcDesiredNodes(
desiredProcessingUnits := calcDesiredProcessingUnits(
*sa.Status.CurrentHighPriorityCPUUtilization,
*sa.Status.CurrentNodes,
normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes),
*sa.Spec.TargetCPUUtilization.HighPriority,
*sa.Spec.MinNodes,
*sa.Spec.MaxNodes,
normalizeProcessingUnitsOrNodes(sa.Spec.MinProcessingUnits, sa.Spec.MinNodes),
normalizeProcessingUnitsOrNodes(sa.Spec.MaxProcessingUnits, sa.Spec.MaxNodes),
*sa.Spec.MaxScaleDownNodes,
)

now := r.clock.Now()

if !r.needUpdateNodes(&sa, desiredNodes, now) {
if !r.needUpdateProcessingUnits(&sa, desiredProcessingUnits, now) {
return ctrlreconcile.Result{}, nil
}

if err := s.UpdateInstance(ctx, desiredNodes); err != nil {
if err := s.UpdateInstance(ctx, desiredProcessingUnits); err != nil {
r.recorder.Event(&sa, corev1.EventTypeWarning, "FailedUpdateInstance", err.Error())
log.Error(err, "failed to update spanner instance status")
return ctrlreconcile.Result{}, err
}

r.recorder.Eventf(&sa, corev1.EventTypeNormal, "Updated", "Updated number of %s/%s nodes from %d to %d", *sa.Spec.ScaleTargetRef.ProjectID, *sa.Spec.ScaleTargetRef.InstanceID, *sa.Status.CurrentNodes, desiredNodes)
r.recorder.Eventf(&sa, corev1.EventTypeNormal, "Updated", "Updated processing units of %s/%s from %d to %d", *sa.Spec.ScaleTargetRef.ProjectID, *sa.Spec.ScaleTargetRef.InstanceID,
normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes), desiredProcessingUnits)

log.Info("updated nodes via google cloud api", "before", *sa.Status.CurrentNodes, "after", desiredNodes)
log.Info("updated nodes via google cloud api", "before", normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes), "after", desiredProcessingUnits)

saCopy := sa.DeepCopy()
saCopy.Status.DesiredNodes = &desiredNodes
saCopy.Status.DesiredProcessingUnits = &desiredProcessingUnits
saCopy.Status.DesiredNodes = pointer.Int32(desiredProcessingUnits / 1000)
saCopy.Status.LastScaleTime = &metav1.Time{Time: now}

if err = r.ctrlClient.Status().Update(ctx, saCopy); err != nil {
@@ -247,6 +249,16 @@ func (r *SpannerAutoscalerReconciler) Reconcile(req ctrlreconcile.Request) (ctrl
return ctrlreconcile.Result{}, nil
}

func normalizeProcessingUnitsOrNodes(processingUnits, nodes *int32) int32 {
if processingUnits != nil {
return *processingUnits
}
if nodes != nil {
return *nodes * 1000
}
return 0
}
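
// Usage: an explicit processing-units value takes precedence; a bare node
// count is converted at 1 node = 1000 processing units, e.g.
// normalizeProcessingUnitsOrNodes(nil, pointer.Int32(3)) == 3000.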

// SetupWithManager sets up the controller with ctrlmanager.Manager.
func (r *SpannerAutoscalerReconciler) SetupWithManager(mgr ctrlmanager.Manager) error {
opts := ctrlcontroller.Options{
@@ -277,12 +289,12 @@ func (r *SpannerAutoscalerReconciler) startSyncer(ctx context.Context, nn types.
return nil
}

func (r *SpannerAutoscalerReconciler) needCalcNodes(sa *spannerv1alpha1.SpannerAutoscaler) bool {
func (r *SpannerAutoscalerReconciler) needCalcProcessingUnits(sa *spannerv1alpha1.SpannerAutoscaler) bool {
log := r.log

switch {
case sa.Status.CurrentNodes == nil:
log.Info("current nodes have not fetched yet")
case sa.Status.CurrentProcessingUnits == nil && sa.Status.CurrentNodes == nil:
log.Info("current processing units have not fetched yet")
return false

case sa.Status.InstanceState != spanner.StateReady:
@@ -293,23 +305,25 @@ func (r *SpannerAutoscalerReconciler) needUpdateNodes(sa *spannerv1alpha1.SpannerA
}
}

func (r *SpannerAutoscalerReconciler) needUpdateNodes(sa *spannerv1alpha1.SpannerAutoscaler, desiredNodes int32, now time.Time) bool {
func (r *SpannerAutoscalerReconciler) needUpdateProcessingUnits(sa *spannerv1alpha1.SpannerAutoscaler, desiredProcessingUnits int32, now time.Time) bool {
log := r.log

currentProcessingUnits := normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes)

switch {
case desiredNodes == *sa.Status.CurrentNodes:
log.V(0).Info("the desired number of nodes is equal to that of the current; no need to scale nodes")
case desiredProcessingUnits == currentProcessingUnits:
log.V(0).Info("the desired number of processing units is equal to that of the current; no need to scale")
return false

case desiredNodes > *sa.Status.CurrentNodes && r.clock.Now().Before(sa.Status.LastScaleTime.Time.Add(10*time.Second)):
log.Info("too short to scale up since instance scaled nodes last",
case desiredProcessingUnits > currentProcessingUnits && r.clock.Now().Before(sa.Status.LastScaleTime.Time.Add(10*time.Second)):
log.Info("too short to scale up since instance scaled last",
"now", now.String(),
"last scale time", sa.Status.LastScaleTime,
)

return false

case desiredNodes < *sa.Status.CurrentNodes && r.clock.Now().Before(sa.Status.LastScaleTime.Time.Add(r.scaleDownInterval)):
case desiredProcessingUnits < currentProcessingUnits && r.clock.Now().Before(sa.Status.LastScaleTime.Time.Add(r.scaleDownInterval)):
log.Info("too short to scale down since instance scaled nodes last",
"now", now.String(),
"last scale time", sa.Status.LastScaleTime,
@@ -322,23 +336,48 @@ func (r *SpannerAutoscalerReconciler) needUpdateNodes(sa *spannerv1alpha1.Spanne
}
}

// For testing purposes only
func calcDesiredNodes(currentCPU, currentNodes, targetCPU, minNodes, maxNodes, maxScaleDownNodes int32) int32 {
totalCPU := currentCPU * currentNodes
desiredNodes := totalCPU/targetCPU + 1 // roundup
return calcDesiredProcessingUnits(currentCPU, currentNodes*1000, targetCPU, minNodes*1000, maxNodes*1000, maxScaleDownNodes) / 1000
}

if (currentNodes - desiredNodes) > maxScaleDownNodes {
desiredNodes = currentNodes - maxScaleDownNodes
// nextValidProcessingUnits finds the next valid value in processing units.
// https://cloud.google.com/spanner/docs/compute-capacity?hl=en
// Valid values are:
//   if processingUnits < 1000, multiples of 100;
//   if processingUnits >= 1000, multiples of 1000.
func nextValidProcessingUnits(processingUnits int32) int32 {
if processingUnits < 1000 {
return ((processingUnits / 100) + 1) * 100
}
return ((processingUnits / 1000) + 1) * 1000
}
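
// For example:
//   nextValidProcessingUnits(150)  == 200
//   nextValidProcessingUnits(999)  == 1000
//   nextValidProcessingUnits(1000) == 2000 (exact multiples also advance,
//   preserving the "+1" roundup behavior of the old calcDesiredNodes)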

func maxInt32(first int32, rest ...int32) int32 {
result := first
for _, v := range rest {
if result < v {
result = v
}
}
return result
}

// calcDesiredProcessingUnits calculates the number of processing units needed to keep CPU utilization below targetCPU.
func calcDesiredProcessingUnits(currentCPU, currentProcessingUnits, targetCPU, minProcessingUnits, maxProcessingUnits, maxScaleDownNodes int32) int32 {
totalCPUProduct1000 := currentCPU * currentProcessingUnits

desiredProcessingUnits := maxInt32(nextValidProcessingUnits(totalCPUProduct1000 / targetCPU), currentProcessingUnits - maxScaleDownNodes*1000)

switch {
case desiredNodes < minNodes:
return minNodes
case desiredProcessingUnits < minProcessingUnits:
return minProcessingUnits

case desiredNodes > maxNodes:
return maxNodes
case desiredProcessingUnits > maxProcessingUnits:
return maxProcessingUnits

default:
return desiredNodes
return desiredProcessingUnits
}
}
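
// Worked example: currentCPU=80, currentProcessingUnits=2000, targetCPU=60,
// minProcessingUnits=1000, maxProcessingUnits=10000, maxScaleDownNodes=2:
//   totalCPUProduct1000 = 80*2000 = 160000; 160000/60 = 2666;
//   nextValidProcessingUnits(2666) == 3000, within [1000, 10000] -> 3000.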
