Skip to content

Commit

Permalink
fix: set default property values for minimal CRD installation (#264)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Oct 27, 2022
1 parent 17a9956 commit 5ffcadc
Show file tree
Hide file tree
Showing 15 changed files with 447 additions and 348 deletions.
2 changes: 1 addition & 1 deletion config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ spec:
by each vertex's settings
properties:
bufferMaxLength:
default: 30000
default: 50000
description: BufferMaxLength is used to define the max length
of a buffer Only applies to UDF and Source vertice as only they
do buffer write. It can be overridden by the settings in vertex
Expand Down
2 changes: 1 addition & 1 deletion config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4847,7 +4847,7 @@ spec:
by each vertex's settings
properties:
bufferMaxLength:
default: 30000
default: 50000
description: BufferMaxLength is used to define the max length
of a buffer Only applies to UDF and Source vertice as only they
do buffer write. It can be overridden by the settings in vertex
Expand Down
2 changes: 1 addition & 1 deletion config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4847,7 +4847,7 @@ spec:
by each vertex's settings
properties:
bufferMaxLength:
default: 30000
default: 50000
description: BufferMaxLength is used to define the max length
of a buffer Only applies to UDF and Source vertice as only they
do buffer write. It can be overridden by the settings in vertex
Expand Down
5 changes: 2 additions & 3 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ const (

DefaultRequeueAfter = 10 * time.Second

// ISB
DefaultBufferLength = 50000
DefaultBufferUsageLimit = 0.8
DefaultReadBatchSize = 500

// Auto scaling
DefaultLookbackSeconds = 180 // Default lookback seconds for calculating avg rate and pending
Expand All @@ -124,9 +126,6 @@ const (
// Default window options
DefaultWindowType = FixedType
DefaultWindowDuration = 0

// Default reduce forward options
DefaultReadBatchSize = 100
)

// PBQ store's backend type.
Expand Down
605 changes: 305 additions & 300 deletions pkg/apis/numaflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 55 additions & 3 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"os"
"strings"
"time"

appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -259,18 +260,61 @@ func (p Pipeline) GetDaemonServiceObj() *corev1.Service {
}
}

// GetPipelineLimits returns the pipeline limits with default values
func (p Pipeline) GetPipelineLimits() PipelineLimits {
defaultReadBatchSize := uint64(DefaultReadBatchSize)
defaultBufferMaxLength := uint64(DefaultBufferLength)
defaultBufferUsageLimit := uint32(100 * DefaultBufferUsageLimit)
defaultReadTimeout := time.Second
limits := PipelineLimits{
ReadBatchSize: &defaultReadBatchSize,
BufferMaxLength: &defaultBufferMaxLength,
BufferUsageLimit: &defaultBufferUsageLimit,
ReadTimeout: &metav1.Duration{Duration: defaultReadTimeout},
}
if x := p.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
limits.ReadBatchSize = x.ReadBatchSize
}
if x.BufferMaxLength != nil {
limits.BufferMaxLength = x.BufferMaxLength
}
if x.BufferUsageLimit != nil {
limits.BufferUsageLimit = x.BufferUsageLimit
}
if x.ReadTimeout != nil {
limits.ReadTimeout = x.ReadTimeout
}
}
return limits
}

type Lifecycle struct {
// DeleteGracePeriodSeconds used to delete pipeline gracefully
// +kubebuilder:default=30
// +optional
DeleteGracePeriodSeconds int32 `json:"deleteGracePeriodSeconds,omitempty" protobuf:"varint,1,opt,name=deleteGracePeriodSeconds"`

DeleteGracePeriodSeconds *int32 `json:"deleteGracePeriodSeconds,omitempty" protobuf:"varint,1,opt,name=deleteGracePeriodSeconds"`
// DesiredPhase used to bring the pipeline from current phase to desired phase
// +kubebuilder:default=Running
// +optional
DesiredPhase PipelinePhase `json:"desiredPhase,omitempty" protobuf:"bytes,2,opt,name=desiredPhase"`
}

// GetDeleteGracePeriodSeconds returns the value DeleteGracePeriodSeconds.
func (lc Lifecycle) GetDeleteGracePeriodSeconds() int32 {
if lc.DeleteGracePeriodSeconds != nil {
return *lc.DeleteGracePeriodSeconds
}
return 30
}

func (lc Lifecycle) GetDesiredPhase() PipelinePhase {
if string(lc.DesiredPhase) != "" {
return lc.DesiredPhase
}
return PipelinePhaseRunning
}

type PipelineSpec struct {
// +optional
InterStepBufferServiceName string `json:"interStepBufferServiceName,omitempty" protobuf:"bytes,1,opt,name=interStepBufferServiceName"`
Expand Down Expand Up @@ -306,6 +350,14 @@ type Watermark struct {
MaxDelay *metav1.Duration `json:"maxDelay,omitempty" protobuf:"bytes,2,opt,name=maxDelay"`
}

// GetMaxDelay returns the configured max delay with a default value
func (wm Watermark) GetMaxDelay() time.Duration {
if wm.MaxDelay != nil {
return wm.MaxDelay.Duration
}
return time.Duration(0)
}

type PipelineLimits struct {
// Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings
// +kubebuilder:default=500
Expand All @@ -314,7 +366,7 @@ type PipelineLimits struct {
// BufferMaxLength is used to define the max length of a buffer
// Only applies to UDF and Source vertice as only they do buffer write.
// It can be overridden by the settings in vertex limits.
// +kubebuilder:default=30000
// +kubebuilder:default=50000
// +optional
BufferMaxLength *uint64 `json:"bufferMaxLength,omitempty" protobuf:"varint,2,opt,name=bufferMaxLength"`
// BufferUsageLimit is used to define the pencentage of the buffer usage limit, a valid value should be less than 100, for example, 85.
Expand Down
49 changes: 49 additions & 0 deletions pkg/apis/numaflow/v1alpha1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package v1alpha1

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
)

var (
Expand Down Expand Up @@ -218,3 +220,50 @@ func Test_GetDownstreamEdges(t *testing.T) {
edges = pl.GetDownstreamEdges("notexisting")
assert.Equal(t, 0, len(edges))
}

func Test_GetWatermarkMaxDelay(t *testing.T) {
wm := Watermark{}
assert.Equal(t, "0s", wm.GetMaxDelay().String())
wm.MaxDelay = &metav1.Duration{Duration: time.Duration(2 * time.Second)}
assert.Equal(t, "2s", wm.GetMaxDelay().String())
}

func Test_GetDeleteGracePeriodSeconds(t *testing.T) {
lc := Lifecycle{}
assert.Equal(t, int32(30), lc.GetDeleteGracePeriodSeconds())
lc.DeleteGracePeriodSeconds = pointer.Int32(50)
assert.Equal(t, int32(50), lc.GetDeleteGracePeriodSeconds())
}

func Test_GetDesiredPhase(t *testing.T) {
lc := Lifecycle{}
assert.Equal(t, PipelinePhaseRunning, lc.GetDesiredPhase())
lc.DesiredPhase = PipelinePhasePaused
assert.Equal(t, PipelinePhasePaused, lc.GetDesiredPhase())
}

func Test_GetPipelineLimits(t *testing.T) {
pl := Pipeline{
Spec: PipelineSpec{},
}
l := pl.GetPipelineLimits()
assert.Equal(t, int64(DefaultBufferLength), int64(*l.BufferMaxLength))
assert.Equal(t, float64(DefaultBufferUsageLimit), float64(*l.BufferUsageLimit)/100)
assert.Equal(t, int64(DefaultReadBatchSize), int64(*l.ReadBatchSize))
assert.Equal(t, "1s", l.ReadTimeout.Duration.String())

length := uint64(2000)
usuageLimit := uint32(40)
readBatch := uint64(321)
pl.Spec.Limits = &PipelineLimits{
BufferMaxLength: &length,
BufferUsageLimit: &usuageLimit,
ReadBatchSize: &readBatch,
ReadTimeout: &metav1.Duration{Duration: time.Duration(5 * time.Second)},
}
l = pl.GetPipelineLimits()
assert.Equal(t, length, *l.BufferMaxLength)
assert.Equal(t, float64(40)/100, float64(*l.BufferUsageLimit)/100)
assert.Equal(t, readBatch, *l.ReadBatchSize)
assert.Equal(t, "5s", l.ReadTimeout.Duration.String())
}
7 changes: 6 additions & 1 deletion pkg/apis/numaflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 3 additions & 10 deletions pkg/daemon/server/service/pipeline_metrics_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,9 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
}

func getBufferLimits(pl *v1alpha1.Pipeline, edge v1alpha1.Edge) (bufferLength int64, bufferUsageLimit float64) {
bufferLength = int64(v1alpha1.DefaultBufferLength)
bufferUsageLimit = v1alpha1.DefaultBufferUsageLimit
if x := pl.Spec.Limits; x != nil {
if x.BufferMaxLength != nil {
bufferLength = int64(*x.BufferMaxLength)
}
if x.BufferUsageLimit != nil {
bufferUsageLimit = float64(*x.BufferUsageLimit) / 100
}
}
plLimits := pl.GetPipelineLimits()
bufferLength = int64(*plLimits.BufferMaxLength)
bufferUsageLimit = float64(*plLimits.BufferUsageLimit) / 100
if x := edge.Limits; x != nil {
if x.BufferMaxLength != nil {
bufferLength = int64(*x.BufferMaxLength)
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func NewInterStepDataForward(vertex *dfv1.Vertex,

options := &options{
retryInterval: time.Millisecond,
readBatchSize: 1,
udfConcurrency: 1,
readBatchSize: dfv1.DefaultReadBatchSize,
udfConcurrency: dfv1.DefaultReadBatchSize,
logger: logging.NewLogger(),
}
for _, o := range opts {
Expand Down
24 changes: 10 additions & 14 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
if !pl.DeletionTimestamp.IsZero() {
log.Info("Deleting pipeline")
if controllerutil.ContainsFinalizer(pl, finalizerName) {
if time.Now().Before(pl.DeletionTimestamp.Add(time.Duration(pl.Spec.Lifecycle.DeleteGracePeriodSeconds) * time.Second)) {
if time.Now().Before(pl.DeletionTimestamp.Add(time.Duration(pl.Spec.Lifecycle.GetDeleteGracePeriodSeconds()) * time.Second)) {
safeToDelete, err := r.safeToDelete(ctx, pl)
if err != nil {
log.Errorw("Failed to check if it's safe to delete the pipeline", zap.Error(err))
Expand Down Expand Up @@ -112,7 +112,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
return r.reconcileNonLifecycleChanges(ctx, pl)
}

if oldPhase := pl.Status.Phase; oldPhase != pl.Spec.Lifecycle.DesiredPhase {
if oldPhase := pl.Status.Phase; oldPhase != pl.Spec.Lifecycle.GetDesiredPhase() {
requeue, err := r.updateDesiredState(ctx, pl)
if err != nil {
log.Errorw("Updated desired pipeline phase failed", zap.Error(err))
Expand Down Expand Up @@ -261,7 +261,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p
}

pl.Status.MarkDeployed()
pl.Status.SetPhase(pl.Spec.Lifecycle.DesiredPhase, "")
pl.Status.SetPhase(pl.Spec.Lifecycle.GetDesiredPhase(), "")
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -456,34 +456,30 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
}

func copyVertexLimits(pl *dfv1.Pipeline, v *dfv1.AbstractVertex) {
if pl.Spec.Limits == nil {
return
}
plLimits := pl.GetPipelineLimits()
if v.Limits == nil {
v.Limits = &dfv1.VertexLimits{}
}
if v.Limits.ReadBatchSize == nil {
v.Limits.ReadBatchSize = pl.Spec.Limits.ReadBatchSize
v.Limits.ReadBatchSize = plLimits.ReadBatchSize
}
if v.Limits.ReadTimeout == nil {
v.Limits.ReadTimeout = pl.Spec.Limits.ReadTimeout
v.Limits.ReadTimeout = plLimits.ReadTimeout
}
}

func copyEdgeLimits(pl *dfv1.Pipeline, edges []dfv1.Edge) []dfv1.Edge {
if pl.Spec.Limits == nil {
return edges
}
plLimits := pl.GetPipelineLimits()
result := []dfv1.Edge{}
for _, e := range edges {
if e.Limits == nil {
e.Limits = &dfv1.EdgeLimits{}
}
if e.Limits.BufferMaxLength == nil {
e.Limits.BufferMaxLength = pl.Spec.Limits.BufferMaxLength
e.Limits.BufferMaxLength = plLimits.BufferMaxLength
}
if e.Limits.BufferUsageLimit == nil {
e.Limits.BufferUsageLimit = pl.Spec.Limits.BufferUsageLimit
e.Limits.BufferUsageLimit = plLimits.BufferUsageLimit
}
result = append(result, e)
}
Expand Down Expand Up @@ -535,7 +531,7 @@ var allVertexFilter vertexFilterFunc = func(v dfv1.Vertex) bool { return true }
var sourceVertexFilter vertexFilterFunc = func(v dfv1.Vertex) bool { return v.IsASource() }

func (r *pipelineReconciler) updateDesiredState(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
switch pl.Spec.Lifecycle.DesiredPhase {
switch pl.Spec.Lifecycle.GetDesiredPhase() {
case dfv1.PipelinePhasePaused:
return r.pausePipeline(ctx, pl)
case dfv1.PipelinePhaseRunning, dfv1.PipelinePhaseUnknown:
Expand Down
15 changes: 9 additions & 6 deletions pkg/reconciler/pipeline/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,20 @@ func Test_copyVertexLimits(t *testing.T) {
pl := testPipeline.DeepCopy()
v := pl.Spec.Vertices[0].DeepCopy()
copyVertexLimits(pl, v)
assert.Nil(t, v.Limits)
assert.NotNil(t, v.Limits)
assert.Equal(t, int64(dfv1.DefaultReadBatchSize), int64(*v.Limits.ReadBatchSize))
one := uint64(1)
limitJson := `{"readTimeout": "2s"}`
var pipelineLimit dfv1.PipelineLimits
err := json.Unmarshal([]byte(limitJson), &pipelineLimit)
assert.NoError(t, err)
pipelineLimit.ReadBatchSize = &one
pl.Spec.Limits = &pipelineLimit
copyVertexLimits(pl, v)
assert.NotNil(t, v.Limits)
assert.Equal(t, one, *v.Limits.ReadBatchSize)
assert.Equal(t, "2s", v.Limits.ReadTimeout.Duration.String())
v1 := new(dfv1.AbstractVertex)
copyVertexLimits(pl, v1)
assert.NotNil(t, v1.Limits)
assert.Equal(t, int64(one), int64(*v1.Limits.ReadBatchSize))
assert.Equal(t, "2s", v1.Limits.ReadTimeout.Duration.String())
two := uint64(2)
vertexLimitJson := `{"readTimeout": "3s"}`
var vertexLimit dfv1.VertexLimits
Expand All @@ -167,7 +169,8 @@ func Test_copyEdgeLimits(t *testing.T) {
edges := []dfv1.Edge{{From: "in", To: "out"}}
result := copyEdgeLimits(pl, edges)
for _, e := range result {
assert.Nil(t, e.Limits)
assert.NotNil(t, e.Limits)
assert.Equal(t, int64(dfv1.DefaultBufferLength), int64(*e.Limits.BufferMaxLength))
}
onethouand := uint64(1000)
eighty := uint32(80)
Expand Down
5 changes: 1 addition & 4 deletions pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,7 @@ func (r *vertexReconciler) buildPodSpec(vertex *dfv1.Vertex, pl *dfv1.Pipeline,
Value: fmt.Sprintf("%t", pl.Spec.Watermark.Disabled),
})

maxDelay := "0s"
if x := pl.Spec.Watermark.MaxDelay; x != nil {
maxDelay = x.Duration.String()
}
maxDelay := pl.Spec.Watermark.GetMaxDelay().String()
podSpec.Containers[0].Env = append(podSpec.Containers[0].Env, corev1.EnvVar{
Name: dfv1.EnvWatermarkMaxDelay,
Value: maxDelay,
Expand Down
Loading

0 comments on commit 5ffcadc

Please sign in to comment.