Skip to content

Commit

Permalink
feat: enable streaming message to next vertex when batch size is 1 (#709
Browse files Browse the repository at this point in the history
)

Signed-off-by: Hao Hao <xdevxhao@gmail.com>
  • Loading branch information
xdevxy authored and whynowy committed May 30, 2023
1 parent a505884 commit 11cd8e9
Show file tree
Hide file tree
Showing 20 changed files with 1,532 additions and 763 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703
github.com/nats-io/nats.go v1.24.0
github.com/numaproj/numaflow-go v0.4.5
github.com/numaproj/numaflow-go v0.4.6-0.20230509160243-b9e82285daf3
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/common v0.32.1
github.com/redis/go-redis/v9 v9.0.3
Expand All @@ -39,6 +39,7 @@ require (
go.uber.org/goleak v1.2.1
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.19.1
golang.org/x/sync v0.1.0
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f
google.golang.org/grpc v1.54.0
google.golang.org/protobuf v1.28.1
Expand Down Expand Up @@ -174,7 +175,6 @@ require (
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,8 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.4.5 h1:t8oDmP32eABCGBXAOOXSGeTEPzX+gPW0plPoeMNvrcA=
github.com/numaproj/numaflow-go v0.4.5/go.mod h1:6hl5mLd3BFDNJ4gJhrft9vuHHPaog68iog2KkibHnbo=
github.com/numaproj/numaflow-go v0.4.6-0.20230509160243-b9e82285daf3 h1:U1irDJfatdjhOoMEZ0mskVB3bOq11Eej7T0BuRtkQ8c=
github.com/numaproj/numaflow-go v0.4.6-0.20230509160243-b9e82285daf3/go.mod h1:6hl5mLd3BFDNJ4gJhrft9vuHHPaog68iog2KkibHnbo=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ const (

// Default gRPC max message size
DefaultGRPCMaxMessageSize = 20 * 1024 * 1024

// UDF map streaming
MapUdfStreamKey = "numaflow.numaproj.io/map-stream"
)

var (
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"os"
"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -355,6 +356,15 @@ func (v Vertex) GetReplicas() int {
return int(*v.Spec.Replicas)
}

func (v Vertex) MapUdfStreamEnabled() (bool, error) {
if v.Spec.Metadata != nil && v.Spec.Metadata.Annotations != nil {
if mapUdfStream, existing := v.Spec.Metadata.Annotations[MapUdfStreamKey]; existing {
return strconv.ParseBool(mapUdfStream)
}
}
return false, nil
}

type VertexSpec struct {
AbstractVertex `json:",inline" protobuf:"bytes,1,opt,name=abstractVertex"`
PipelineName string `json:"pipelineName" protobuf:"bytes,2,opt,name=pipelineName"`
Expand Down
33 changes: 26 additions & 7 deletions pkg/forward/applier/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,39 @@ import (
// InternalErr can be returned and could be retried by the callee.
type MapApplier interface {
ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error)
ApplyMapStream(ctx context.Context, message *isb.ReadMessage, writeMessageCh chan<- isb.WriteMessage) error
}

// ApplyMapFunc utility function used to create an Applier implementation
type ApplyMapFunc func(context.Context, *isb.ReadMessage) ([]*isb.WriteMessage, error)
type ApplyMapFunc struct {
applyMap func(context.Context, *isb.ReadMessage) ([]*isb.WriteMessage, error)
applyMapStream func(context.Context, *isb.ReadMessage, chan<- isb.WriteMessage) error
}

func (a ApplyMapFunc) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return a(ctx, message)
return a.applyMap(ctx, message)
}

func (a ApplyMapFunc) ApplyMapStream(ctx context.Context, message *isb.ReadMessage, writeMessageCh chan<- isb.WriteMessage) error {
return a.applyMapStream(ctx, message, writeMessageCh)
}

var (
// Terminal Applier do not make any change to the message
Terminal = ApplyMapFunc(func(ctx context.Context, msg *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return []*isb.WriteMessage{{
Message: msg.Message,
}}, nil
})
Terminal = ApplyMapFunc{
applyMap: func(ctx context.Context, msg *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return []*isb.WriteMessage{{
Message: msg.Message,
}}, nil
},
applyMapStream: func(ctx context.Context, msg *isb.ReadMessage, writeMessageCh chan<- isb.WriteMessage) error {
defer close(writeMessageCh)
writeMessage := &isb.WriteMessage{
Message: msg.Message,
}

writeMessageCh <- *writeMessage
return nil
},
}
)

0 comments on commit 11cd8e9

Please sign in to comment.