Skip to content

Commit

Permalink
minimal end to end line-graph watermark integration (#43)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
vigith authored and whynowy committed Jun 13, 2022
1 parent bb9be80 commit f189ba3
Show file tree
Hide file tree
Showing 31 changed files with 446 additions and 85 deletions.
46 changes: 37 additions & 9 deletions pkg/isb/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/progress"
"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
Expand All @@ -29,6 +31,7 @@ type InterStepDataForward struct {
toBuffers map[string]isb.BufferWriter
FSD ToWhichStepDecider
UDF udfapplier.Applier
wmProgressor progress.Progressor
opts options
vertexName string
pipelineName string
Expand All @@ -39,8 +42,8 @@ type InterStepDataForward struct {
func NewInterStepDataForward(vertex *dfv1.Vertex,
fromStep isb.BufferReader,
toSteps map[string]isb.BufferWriter,
fsd ToWhichStepDecider,
applyUDF udfapplier.Applier,
fsd ToWhichStepDecider, applyUDF udfapplier.Applier,
wmProgressor progress.Progressor,
opts ...Option) (*InterStepDataForward, error) {

options := &options{
Expand All @@ -58,12 +61,13 @@ func NewInterStepDataForward(vertex *dfv1.Vertex,
ctx, cancel := context.WithCancel(context.Background())

isdf := InterStepDataForward{
ctx: ctx,
cancelFn: cancel,
fromBuffer: fromStep,
toBuffers: toSteps,
FSD: fsd,
UDF: applyUDF,
ctx: ctx,
cancelFn: cancel,
fromBuffer: fromStep,
toBuffers: toSteps,
FSD: fsd,
UDF: applyUDF,
wmProgressor: wmProgressor,
// should we do a check here for the values not being nil?
vertexName: vertex.Spec.Name,
pipelineName: vertex.Spec.PipelineName,
Expand Down Expand Up @@ -125,6 +129,11 @@ func (isdf *InterStepDataForward) Start() <-chan struct{} {
log.Infow("Closed buffer writer", zap.String("bufferTo", v.GetName()))
}
}

// stop watermark publisher if watermarking is enabled
if isdf.wmProgressor != nil {
isdf.wmProgressor.StopPublisher()
}
close(stopped)
}()

Expand Down Expand Up @@ -159,6 +168,15 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
if len(readMessages) == 0 {
return
}

// fetch watermark if available
// TODO: make it async (concurrent and wait later)
var processorWM processor.Watermark
if isdf.wmProgressor != nil {
// let's track only the last element's watermark
processorWM = isdf.wmProgressor.GetWatermark(readMessages[len(readMessages)-1].ReadOffset)
}

// create space for writeMessages specific to each step as we could forward to all the steps too.
var messageToStep = make(map[string][]isb.Message)
var toBuffers string
Expand Down Expand Up @@ -216,12 +234,22 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
}
}
// forward the message to the edge buffer (could be multiple edges)
_, err = isdf.writeToBuffers(ctx, messageToStep)
writeOffsets, err := isdf.writeToBuffers(ctx, messageToStep)
if err != nil {
isdf.opts.logger.Errorw("failed to write to toBuffers", zap.Error(err))
return
}

// forward the highest watermark to all the edges to avoid the idle edge problem
if isdf.wmProgressor != nil {
// TODO: sort and get the highest value
for _, offsets := range writeOffsets {
if len(offsets) > 0 {
isdf.wmProgressor.PublishWatermark(processorWM, offsets[len(offsets)-1])
}
}
}

// let us ack only if we have successfully forwarded all the messages.
// we need the readOffsets to acknowledge later
var readOffsets = make([]isb.Offset, len(readMessages))
Expand Down
16 changes: 8 additions & 8 deletions pkg/isb/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestNewInterStepDataForward(t *testing.T) {

writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime)

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, WithReadBatchSize(5))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, nil, WithReadBatchSize(5))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestNewInterStepDataForward_drop(t *testing.T) {
},
}}

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, WithReadBatchSize(2))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, nil, WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestNewInterStepDataForward_WithInternalError(t *testing.T) {
},
}}

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyErrTest{}, myForwardApplyErrTest{}, WithReadBatchSize(2))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyErrTest{}, myForwardApplyErrTest{}, nil, WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestNewInterStepDataForward_WhereToError(t *testing.T) {
},
}}

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyWhereToErrTest{}, myForwardApplyWhereToErrTest{}, WithReadBatchSize(2))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyWhereToErrTest{}, myForwardApplyWhereToErrTest{}, nil, WithReadBatchSize(2))
assert.NoError(t, err)
assert.True(t, to1.IsEmpty())

Expand Down Expand Up @@ -264,7 +264,7 @@ func TestNewInterStepDataForward_UDFError(t *testing.T) {
},
}}

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyUDFErrTest{}, myForwardApplyUDFErrTest{}, WithReadBatchSize(2))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyUDFErrTest{}, myForwardApplyUDFErrTest{}, nil, WithReadBatchSize(2))
assert.NoError(t, err)
assert.True(t, to1.IsEmpty())

Expand Down Expand Up @@ -312,7 +312,7 @@ func TestNewInterStepData_forwardToAll(t *testing.T) {
},
}}

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardToAllTest{}, myForwardToAllTest{}, WithReadBatchSize(2))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardToAllTest{}, myForwardToAllTest{}, nil, WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestNewInterStepDataForwardToOneStep(t *testing.T) {
},
}}

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, WithReadBatchSize(2))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, nil, WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -405,7 +405,7 @@ func TestWriteToBufferError(t *testing.T) {
},
}}

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, WithReadBatchSize(10))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, nil, WithReadBatchSize(10))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/forward/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestInterStepDataForward_Stop(t *testing.T) {
},
}}

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myShutdownTest{}, myShutdownTest{}, WithReadBatchSize(2))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myShutdownTest{}, myShutdownTest{}, nil, WithReadBatchSize(2))
assert.NoError(t, err)
stopped := f.Start()
// write some data but buffer is not full even though we are not reading
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestInterStepDataForward_ForceStop(t *testing.T) {
},
}}

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myShutdownTest{}, myShutdownTest{}, WithReadBatchSize(2))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myShutdownTest{}, myShutdownTest{}, nil, WithReadBatchSize(2))
assert.NoError(t, err)
stopped := f.Start()
// write some data such that the fromBuffer can be empty, that is toBuffer gets full
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/jetstream/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestForwarderJetStreamBuffer(t *testing.T) {
"to1": to1,
}

f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, myForwardJetStreamTest{}, myForwardJetStreamTest{})
f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, myForwardJetStreamTest{}, myForwardJetStreamTest{}, nil)
assert.NoError(t, err)

stopped := f.Start()
Expand Down
6 changes: 3 additions & 3 deletions pkg/isb/redis/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestRedisCheckBacklog(t *testing.T) {

f, err := forward.NewInterStepDataForward(vertex, rqr, map[string]isb.BufferWriter{
"to1": rqw,
}, forwardReadWritePerformance{}, forwardReadWritePerformance{}, forward.WithReadBatchSize(10))
}, forwardReadWritePerformance{}, forwardReadWritePerformance{}, nil, forward.WithReadBatchSize(10))

stopped := f.Start()
// validate the length of the toStep stream.
Expand Down Expand Up @@ -305,7 +305,7 @@ func (suite *ReadWritePerformance) SetupSuite() {
},
}}

isdf, _ := forward.NewInterStepDataForward(vertex, rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{})
isdf, _ := forward.NewInterStepDataForward(vertex, rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{}, nil)

suite.ctx = ctx
suite.rclient = client
Expand Down Expand Up @@ -392,7 +392,7 @@ func (suite *ReadWritePerformance) TestReadWriteLatencyPipelining() {

suite.isdf, _ = forward.NewInterStepDataForward(vertex, suite.rqr, map[string]isb.BufferWriter{
"to1": suite.rqw,
}, forwardReadWritePerformance{}, forwardReadWritePerformance{})
}, forwardReadWritePerformance{}, forwardReadWritePerformance{}, nil)

suite.False(suite.rqw.IsFull())
var writeMessages = make([]isb.Message, 0, suite.count)
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/redis/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestNewInterStepDataForwardRedis(t *testing.T) {
},
}}

f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, myForwardRedisTest{}, myForwardRedisTest{})
f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, myForwardRedisTest{}, myForwardRedisTest{}, nil)
assert.NoError(t, err)
assert.False(t, to1.IsFull())

Expand Down Expand Up @@ -380,7 +380,7 @@ func TestReadTimeout(t *testing.T) {
},
}}

f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, myForwardRedisTest{}, myForwardRedisTest{})
f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, myForwardRedisTest{}, myForwardRedisTest{}, nil)
assert.NoError(t, err)
stopped := f.Start()
// Call stop to end the test as we have a blocking read. The forwarder is up and running with no messages written
Expand Down
2 changes: 1 addition & 1 deletion pkg/sinks/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}
f, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.Name: toKafka}, forward.All, applier.Terminal, forwardOpts...)
f, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.Name: toKafka}, forward.All, applier.Terminal, nil, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sinks/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestWriteSuccessToKafka(t *testing.T) {
},
}}

toKafka.isdf, err = forward.NewInterStepDataForward(vertex, fromStep, map[string]isb.BufferWriter{"name": toKafka}, forward.All, applier.Terminal)
toKafka.isdf, err = forward.NewInterStepDataForward(vertex, fromStep, map[string]isb.BufferWriter{"name": toKafka}, forward.All, applier.Terminal, nil)
assert.NoError(t, err)
toKafka.name = "Test"
toKafka.topic = "topic-1"
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestWriteFailureToKafka(t *testing.T) {
},
}}

toKafka.isdf, err = forward.NewInterStepDataForward(vertex, fromStep, map[string]isb.BufferWriter{"name": toKafka}, forward.All, applier.Terminal)
toKafka.isdf, err = forward.NewInterStepDataForward(vertex, fromStep, map[string]isb.BufferWriter{"name": toKafka}, forward.All, applier.Terminal, nil)
assert.NoError(t, err)
toKafka.name = "Test"
toKafka.topic = "topic-1"
Expand Down
2 changes: 1 addition & 1 deletion pkg/sinks/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option)
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{name: toLog}, forward.All, applier.Terminal, forwardOpts...)
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{name: toLog}, forward.All, applier.Terminal, nil, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sinks/logger/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestToLog_ForwardToTwoVertex(t *testing.T) {
},
}}

f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, forward.All, applier.Terminal)
f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, forward.All, applier.Terminal, nil)
assert.NoError(t, err)

stopped := f.Start()
Expand Down
1 change: 1 addition & 0 deletions pkg/sinks/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
// getSinker takes in the logger from the parent context
func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger) (Sinker, error) {
sink := u.Vertex.Spec.Sink
// TODO: add watermark
if x := sink.Log; x != nil {
return logsink.NewToLog(u.Vertex, reader, logsink.WithLogger(logger))
} else if x := sink.Kafka; x != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sinks/udsink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts .
}
contentType := sharedutil.LookupEnvStringOr(dfv1.EnvUDSinkContentType, string(dfv1.MsgPackType))
s.udsink = NewUDSHTTPBasedUDSink(dfv1.PathVarRun+"/udsink.sock", withTimeout(20*time.Second), withContentType(dfv1.ContentType(contentType)))
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{name: s}, forward.All, applier.Terminal, forwardOpts...)
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{name: s}, forward.All, applier.Terminal, nil, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit f189ba3

Please sign in to comment.