Skip to content

Commit

Permalink
feat: Expose ReadTimeoutSeconds on Vertex (#110)
Browse files Browse the repository at this point in the history
* feat: Expose ReadTimeoutSeconds on Vertex

Signed-off-by: Saravanan Balasubramanian <sarabala1979@gmail.com>

* fix: comments

Signed-off-by: Saravanan Balasubramanian <sarabala1979@gmail.com>

* fix: refactor the seconds to duration

Signed-off-by: Saravanan Balasubramanian <sarabala1979@gmail.com>

* fix: metav1.duration

Signed-off-by: Saravanan Balasubramanian <sarabala1979@gmail.com>

* fix: comments

Signed-off-by: Saravanan Balasubramanian <sarabala1979@gmail.com>
  • Loading branch information
sarabala1979 authored and whynowy committed Aug 6, 2022
1 parent d95d41b commit 8e612b1
Show file tree
Hide file tree
Showing 16 changed files with 522 additions and 294 deletions.
8 changes: 8 additions & 0 deletions config/base/crds/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ spec:
can be overridden by the vertex's limit settings
format: int64
type: integer
readTimeout:
default: 1s
description: Read timeout seconds for all the vertices in the
pipeline, can be overridden by the vertex's limit settings
type: string
type: object
vertices:
items:
Expand Down Expand Up @@ -1384,6 +1389,9 @@ spec:
description: Read batch size
format: int64
type: integer
readTimeout:
description: Read timeout duration
type: string
type: object
metadata:
description: Metadata sets the pods's metadata, i.e. annotations
Expand Down
3 changes: 3 additions & 0 deletions config/base/crds/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,9 @@ spec:
description: Read batch size
format: int64
type: integer
readTimeout:
description: Read timeout duration
type: string
type: object
metadata:
description: Metadata sets the pods's metadata, i.e. annotations and
Expand Down
11 changes: 11 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4872,6 +4872,11 @@ spec:
can be overridden by the vertex's limit settings
format: int64
type: integer
readTimeout:
default: 1s
description: Read timeout seconds for all the vertices in the
pipeline, can be overridden by the vertex's limit settings
type: string
type: object
vertices:
items:
Expand Down Expand Up @@ -6110,6 +6115,9 @@ spec:
description: Read batch size
format: int64
type: integer
readTimeout:
description: Read timeout duration
type: string
type: object
metadata:
description: Metadata sets the pods's metadata, i.e. annotations
Expand Down Expand Up @@ -9993,6 +10001,9 @@ spec:
description: Read batch size
format: int64
type: integer
readTimeout:
description: Read timeout duration
type: string
type: object
metadata:
description: Metadata sets the pods's metadata, i.e. annotations and
Expand Down
3 changes: 3 additions & 0 deletions controllers/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,9 @@ func copyVertexLimits(pl *dfv1.Pipeline, v *dfv1.AbstractVertex) {
if v.Limits.ReadBatchSize == nil {
v.Limits.ReadBatchSize = pl.Spec.Limits.ReadBatchSize
}
if v.Limits.ReadTimeout == nil {
v.Limits.ReadTimeout = pl.Spec.Limits.ReadTimeout
}
}

func copyEdgeLimits(pl *dfv1.Pipeline, edges []dfv1.Edge) []dfv1.Edge {
Expand Down
16 changes: 15 additions & 1 deletion controllers/pipeline/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/goccy/go-json"
"github.com/numaproj/numaflow/controllers"
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -138,14 +139,27 @@ func Test_copyVertexLimits(t *testing.T) {
copyVertexLimits(pl, v)
assert.Nil(t, v.Limits)
one := uint64(1)
pl.Spec.Limits = &dfv1.PipelineLimits{ReadBatchSize: &one}
limitJson := `{"readTimeout": "2s"}`
var pipelineLimit dfv1.PipelineLimits
err := json.Unmarshal([]byte(limitJson), &pipelineLimit)
assert.NoError(t, err)
pipelineLimit.ReadBatchSize = &one
pl.Spec.Limits = &pipelineLimit
copyVertexLimits(pl, v)
assert.NotNil(t, v.Limits)
assert.Equal(t, one, *v.Limits.ReadBatchSize)
assert.Equal(t, "2s", v.Limits.ReadTimeout.Duration.String())
two := uint64(2)
vertexLimitJson := `{"readTimeout": "3s"}`
var vertexLimit dfv1.VertexLimits
err = json.Unmarshal([]byte(vertexLimitJson), &vertexLimit)
assert.NoError(t, err)
v.Limits = &vertexLimit
v.Limits.ReadBatchSize = &two
copyVertexLimits(pl, v)
assert.Equal(t, two, *v.Limits.ReadBatchSize)
assert.Equal(t, "3s", v.Limits.ReadTimeout.Duration.String())

}

func Test_copyEdgeLimits(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion controllers/pipeline/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pipeline

import (
"fmt"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

Expand All @@ -25,6 +24,7 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
return fmt.Errorf("duplicate vertex name %q", v.Name)
}
names[v.Name] = true

if v.Source == nil && v.Sink == nil && v.UDF == nil {
return fmt.Errorf("invalid vertex %q, it could only be either a source, or a sink, or a UDF", v.Name)
}
Expand Down
27 changes: 27 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2550,6 +2550,20 @@ be overridden by the settings in vertex limits.
</p>
</td>
</tr>
<tr>
<td>
<code>readTimeout</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<em>(Optional)</em>
<p>
Read timeout seconds for all the vertices in the pipeline, can be
overridden by the vertex’s limit settings
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.PipelinePhase">
Expand Down Expand Up @@ -3455,6 +3469,19 @@ Read batch size
</p>
</td>
</tr>
<tr>
<td>
<code>readTimeout</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<em>(Optional)</em>
<p>
Read timeout duration
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.VertexPhase">
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/gin-gonic/gin v1.8.1
github.com/go-redis/redis/v8 v8.11.4
github.com/go-swagger/go-swagger v0.28.0
github.com/goccy/go-json v0.9.7
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.2.0
Expand Down Expand Up @@ -87,7 +88,6 @@ require (
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/gobuffalo/flect v0.2.3 // indirect
github.com/goccy/go-json v0.9.7 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down

0 comments on commit 8e612b1

Please sign in to comment.