Skip to content

Commit

Permalink
chore: bypass transformer call if UDTransformer is not defined (#1709)
Browse files Browse the repository at this point in the history
  • Loading branch information
ayildirim21 committed May 10, 2024
1 parent bfd73e8 commit 3225b73
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 98 deletions.
9 changes: 0 additions & 9 deletions pkg/sources/forward/applier/sourcetransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,3 @@ type ApplySourceTransformFunc func(ctx context.Context, message *isb.ReadMessage
// ApplyTransform invokes the underlying function f with the given context and
// read message, and returns whatever write messages and error f produces,
// without modifying either.
func (f ApplySourceTransformFunc) ApplyTransform(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) {
	writeMessages, err := f(ctx, message)
	return writeMessages, err
}

// Terminal is an applier that does not make any change to the message: it
// wraps the Message of the incoming read message in a single write message
// and never returns an error.
var Terminal = ApplySourceTransformFunc(func(_ context.Context, msg *isb.ReadMessage) ([]*isb.WriteMessage, error) {
	passThrough := &isb.WriteMessage{Message: msg.Message}
	return []*isb.WriteMessage{passThrough}, nil
})
277 changes: 210 additions & 67 deletions pkg/sources/forward/data_forward.go

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions pkg/sources/forward/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestNewDataForward(t *testing.T) {
fetchWatermark, _ := generic.BuildNoOpSourceWatermarkProgressorsFromBufferMap(toSteps)
noOpStores := buildNoOpToVertexStores(toSteps)
idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, myForwardTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, noOpStores, idleManager, WithReadBatchSize(batchSize))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, fetchWatermark, TestSourceWatermarkPublisher{}, noOpStores, idleManager, WithReadBatchSize(batchSize), WithTransformer(myForwardTest{}))

assert.NoError(t, err)
assert.False(t, to11.IsFull())
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestNewDataForward(t *testing.T) {
toVertexStores := buildToVertexWatermarkStores(toSteps)

idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, &myForwardToAllTest{}, &myForwardToAllTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, &myForwardToAllTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize), WithTransformer(&myForwardToAllTest{}))

assert.NoError(t, err)
assert.False(t, to11.IsFull())
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestNewDataForward(t *testing.T) {
toVertexStores := buildToVertexWatermarkStores(toSteps)

idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myForwardDropTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize), WithTransformer(myForwardDropTest{}))

assert.NoError(t, err)
assert.False(t, to11.IsFull())
Expand Down Expand Up @@ -583,7 +583,7 @@ func TestNewDataForward(t *testing.T) {
toVertexStores := buildToVertexWatermarkStores(toSteps)

idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, myForwardTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize), WithTransformer(myForwardTest{}))

assert.NoError(t, err)
assert.False(t, to11.IsFull())
Expand Down Expand Up @@ -720,7 +720,7 @@ func TestNewDataForward(t *testing.T) {
toVertexStores := buildNoOpToVertexStores(toSteps)

idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myForwardApplyTransformerErrTest{}, myForwardApplyTransformerErrTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myForwardApplyTransformerErrTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize), WithTransformer(myForwardApplyTransformerErrTest{}))

assert.NoError(t, err)
assert.False(t, to1.IsFull())
Expand Down Expand Up @@ -766,7 +766,7 @@ func TestNewDataForward(t *testing.T) {
toVertexStores := buildNoOpToVertexStores(toSteps)

idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myForwardApplyWhereToErrTest{}, myForwardApplyWhereToErrTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myForwardApplyWhereToErrTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize), WithTransformer(myForwardApplyWhereToErrTest{}))

assert.NoError(t, err)
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -810,7 +810,7 @@ func TestNewDataForward(t *testing.T) {
toVertexStores := buildNoOpToVertexStores(toSteps)

idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myForwardInternalErrTest{}, myForwardInternalErrTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myForwardInternalErrTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(batchSize), WithTransformer(myForwardInternalErrTest{}))

assert.NoError(t, err)
assert.False(t, to1.IsFull())
Expand Down Expand Up @@ -921,7 +921,7 @@ func TestDataForwardSinglePartition(t *testing.T) {
toVertexStores := buildNoOpToVertexStores(toSteps)

idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, mySourceForwardTest{}, mySourceForwardTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(5))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, mySourceForwardTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(5), WithTransformer(mySourceForwardTest{}))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -976,7 +976,7 @@ func TestDataForwardMultiplePartition(t *testing.T) {
toVertexStores := buildNoOpToVertexStores(toSteps)

idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, mySourceForwardTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(5))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, &mySourceForwardTestRoundRobin{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(5), WithTransformer(mySourceForwardTest{}))
assert.NoError(t, err)
assert.False(t, to11.IsFull())
assert.False(t, to12.IsFull())
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func TestWriteToBuffer(t *testing.T) {
toVertexStores := buildNoOpToVertexStores(toSteps)

idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myForwardTest{}, myForwardTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(value.batchSize))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myForwardTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexStores, idleManager, WithReadBatchSize(value.batchSize), WithTransformer(myForwardTest{}))
assert.NoError(t, err)
assert.False(t, buffer.IsFull())
assert.True(t, buffer.IsEmpty())
Expand Down
12 changes: 12 additions & 0 deletions pkg/sources/forward/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import (

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/sources/forward/applier"
)

// options for forwarding the message
type options struct {
// readBatchSize is the default batch size
readBatchSize int64
// transformer defines the way to transform data at source
transformer applier.SourceTransformApplier
// transformerConcurrency sets the concurrency for concurrent transformer processing
transformerConcurrency int
// retryInterval is the time.Duration to sleep before retrying
Expand All @@ -42,6 +45,7 @@ type Option func(*options) error
func defaultOptions() *options {
return &options{
readBatchSize: dfv1.DefaultReadBatchSize,
transformer: nil,
transformerConcurrency: dfv1.DefaultReadBatchSize,
retryInterval: time.Millisecond,
logger: logging.NewLogger(),
Expand Down Expand Up @@ -79,3 +83,11 @@ func WithLogger(l *zap.SugaredLogger) Option {
return nil
}
}

// WithTransformer returns an Option that stores f as the transformer in the
// forwarder options. The returned Option never fails.
//
// When this option is not supplied, defaultOptions leaves the transformer nil.
func WithTransformer(f applier.SourceTransformApplier) Option {
	setTransformer := func(opts *options) error {
		opts.transformer = f
		return nil
	}
	return setTransformer
}
4 changes: 2 additions & 2 deletions pkg/sources/forward/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestInterStepDataForward(t *testing.T) {
fetchWatermark, _ := generic.BuildNoOpSourceWatermarkProgressorsFromBufferMap(toSteps)
toVertexWmStores := buildNoOpToVertexStores(toSteps)
idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myShutdownTest{}, myShutdownTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexWmStores, idleManager, WithReadBatchSize(batchSize))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myShutdownTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexWmStores, idleManager, WithReadBatchSize(batchSize), WithTransformer(myShutdownTest{}))
assert.NoError(t, err)
stopped := f.Start()
// write some data but buffer is not full even though we are not reading
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestInterStepDataForward(t *testing.T) {
toVertexWmStores := buildNoOpToVertexStores(toSteps)

idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myShutdownTest{}, myShutdownTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexWmStores, idleManager, WithReadBatchSize(batchSize))
f, err := NewDataForward(vertexInstance, fromStep, toSteps, myShutdownTest{}, fetchWatermark, TestSourceWatermarkPublisher{}, toVertexWmStores, idleManager, WithReadBatchSize(batchSize), WithTransformer(myShutdownTest{}))
assert.NoError(t, err)
stopped := f.Start()
// write some data such that the reader can be empty, that is toBuffer gets full
Expand Down
1 change: 1 addition & 0 deletions pkg/sources/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func Test_NewHTTP(t *testing.T) {
Hostname: "test-host",
Replica: 0,
}

h, err := NewHttpSource(ctx, vi)
assert.NoError(t, err)
assert.NotNil(t, h.(*httpSource).shutdown)
Expand Down
22 changes: 12 additions & 10 deletions pkg/sources/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
"github.com/numaproj/numaflow/pkg/shuffle"
sourceforward "github.com/numaproj/numaflow/pkg/sources/forward"
"github.com/numaproj/numaflow/pkg/sources/forward/applier"
"github.com/numaproj/numaflow/pkg/sources/generator"
"github.com/numaproj/numaflow/pkg/sources/http"
"github.com/numaproj/numaflow/pkg/sources/kafka"
Expand Down Expand Up @@ -147,7 +146,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
}

// create watermark-related components only if watermark is enabled
// otherwise no op will used
// otherwise no op will be used
if !sp.VertexInstance.Vertex.Spec.Watermark.Disabled {
// build watermark stores for from vertex
sourceWmStores, err = jetstream.BuildFromVertexWatermarkStores(ctx, sp.VertexInstance, natsClientPool.NextAvailableClient())
Expand Down Expand Up @@ -222,6 +221,14 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
healthCheckers = append(healthCheckers, udsGRPCClient)
}

var forwardOpts []sourceforward.Option

if x := sp.VertexInstance.Vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, sourceforward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}

if sp.VertexInstance.Vertex.HasUDTransformer() {
// Wait for server info to be ready
serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.SourceTransformerServerInfoFile))
Expand Down Expand Up @@ -250,6 +257,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
}

healthCheckers = append(healthCheckers, srcTransformerGRPCClient)
forwardOpts = append(forwardOpts, sourceforward.WithTransformer(srcTransformerGRPCClient))
}

sourceReader, err := sp.createSourceReader(ctx, udsGRPCClient)
Expand All @@ -259,19 +267,13 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {

// create a source watermark publisher
sourceWmPublisher := publish.NewSourcePublish(ctx, pipelineName, vertexName, sourcePublisherStores, publish.WithDelay(sp.VertexInstance.Vertex.Spec.Watermark.GetMaxDelay()))
var forwardOpts []sourceforward.Option
if x := sp.VertexInstance.Vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, sourceforward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}

// create source data forwarder
var sourceForwarder *sourceforward.DataForward
if sp.VertexInstance.Vertex.HasUDTransformer() {
sourceForwarder, err = sourceforward.NewDataForward(sp.VertexInstance, sourceReader, writersMap, sp.getTransformerGoWhereDecider(shuffleFuncMap), srcTransformerGRPCClient, fetchWatermark, sourceWmPublisher, toVertexWatermarkStores, idleManager, forwardOpts...)
sourceForwarder, err = sourceforward.NewDataForward(sp.VertexInstance, sourceReader, writersMap, sp.getTransformerGoWhereDecider(shuffleFuncMap), fetchWatermark, sourceWmPublisher, toVertexWatermarkStores, idleManager, forwardOpts...)
} else {
sourceForwarder, err = sourceforward.NewDataForward(sp.VertexInstance, sourceReader, writersMap, sp.getSourceGoWhereDecider(shuffleFuncMap), applier.Terminal, fetchWatermark, sourceWmPublisher, toVertexWatermarkStores, idleManager, forwardOpts...)
sourceForwarder, err = sourceforward.NewDataForward(sp.VertexInstance, sourceReader, writersMap, sp.getSourceGoWhereDecider(shuffleFuncMap), fetchWatermark, sourceWmPublisher, toVertexWatermarkStores, idleManager, forwardOpts...)
}
if err != nil {
return fmt.Errorf("failed to create source forwarder, error: %w", err)
Expand Down

0 comments on commit 3225b73

Please sign in to comment.