Skip to content

Commit

Permalink
fix: bug in late message handling for sliding window (#1471)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>
  • Loading branch information
yhl25 and vigith committed Apr 3, 2024
1 parent 1735370 commit 3e6cd33
Showing 1 changed file with 44 additions and 32 deletions.
76 changes: 44 additions & 32 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,15 +527,14 @@ func (df *DataForward) writeMessagesToWindows(ctx context.Context, messages []*i
var failedMessages = make([]*isb.ReadMessage, 0)

for _, message := range messages {
if df.shouldDropMessage(message) {
writtenMessages = append(writtenMessages, message)
continue
var windowOperations []*window.TimedWindowRequest
if message.IsLate {
windowOperations = df.handleLateMessage(message)
} else {
windowOperations = df.handleOnTimeMessage(message)
}

var failed bool
// identify and add window for the message
windowOperations := df.windower.AssignWindows(message)

// for each window we will have a PBQ. A message could belong to multiple windows (e.g., sliding).
// We need to write the messages to these PBQs
for _, winOp := range windowOperations {
Expand All @@ -557,40 +556,53 @@ func (df *DataForward) writeMessagesToWindows(ctx context.Context, messages []*i
return writtenMessages, failedMessages, err
}

func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool {
if message.IsLate {
// we should be able to get the late message in as long as there is an open window
nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message
if nextWinAsSeenByWriter == nil || df.windower.Type() == window.Unaligned {
df.log.Infow("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
return true
} else if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) { // if the message doesn't fall in the next window that is about to be closed drop it.
df.log.Infow("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
metrics.ReduceDroppedMessagesCount.With(map[string]string{
metrics.LabelVertex: df.vertexName,
metrics.LabelPipeline: df.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
metrics.LabelReason: "late"}).Inc()
return true
// handleLateMessage handles the late message and returns the timed window requests to be written to PBQ.
// if the message is dropped, it returns an empty slice.
func (df *DataForward) handleLateMessage(message *isb.ReadMessage) []*window.TimedWindowRequest {
lateMessageWindowRequests := make([]*window.TimedWindowRequest, 0)
// we should be able to get the late message in as long as there is an open fixed window
// for unaligned windows, we never consider late messages since it will expand the existing window, which is not the ideal behavior
// (for aligned, the windows start and end are fixed).
// for sliding windows, we cannot accept late messages because a message can be part of multiple windows
// and some windows might have already been closed.
if df.windower.Strategy() != window.Fixed {
return lateMessageWindowRequests
}

// mark it as a successfully written message as the message will be acked to avoid subsequent retries
} else { // if the message falls in the next window that is about to be closed, keep it
df.log.Debugw("Keeping the late message for next condition check because COB has not happened yet", zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Int64("nextWindowToBeClosed.startTime", nextWinAsSeenByWriter.StartTime().UnixMilli()))
}
nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message.
if nextWinAsSeenByWriter == nil {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
return lateMessageWindowRequests
}
// if the message doesn't fall in the next window that is about to be closed drop it.
if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
return lateMessageWindowRequests
}

// We will accept data as long as window is open. If a straggler (late data) makes in before the window is closed,
// it is accepted.
// since the window is not closed yet, we can send an append request.
lateMessageWindowRequests = append(lateMessageWindowRequests, &window.TimedWindowRequest{
Operation: window.Append,
ReadMessage: message,
ID: nextWinAsSeenByWriter.Partition(),
Windows: []window.TimedWindow{nextWinAsSeenByWriter},
})

return lateMessageWindowRequests
}

// handleOnTimeMessage handles the on-time message and returns the timed window requests to be written to PBQ.
func (df *DataForward) handleOnTimeMessage(message *isb.ReadMessage) []*window.TimedWindowRequest {
// NOTE(potential bug): if we get a message where the event-time is < (watermark-allowedLateness), skip processing the message.
// This could be due to a couple of problem, eg. ack was not registered, etc.
// Please do not confuse this with late data! This is a platform related problem causing the watermark inequality
// to be violated.
// df.currentWatermark cannot be -1 except for the first time till it gets a valid watermark (wm > -1)
if !message.IsLate && message.EventTime.Before(df.currentWatermark.Add(-1*df.opts.allowedLateness)) {
// TODO: track as a counter metric
df.log.Errorw("An old message just popped up", zap.Any("msgOffSet", message.ReadOffset.String()), zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Any("message", message.Message))
if message.EventTime.Before(df.currentWatermark.Add(-1 * df.opts.allowedLateness)) {
df.log.Errorw("An old message just popped up", zap.Any("msgOffSet", message.ReadOffset.String()), zap.Int64("eventTime", message.EventTime.UnixMilli()),
zap.Int64("msg_watermark", message.Watermark.UnixMilli()), zap.Any("message", message.Message), zap.Int64("currentWatermark", df.currentWatermark.UnixMilli()))
// mark it as a successfully written message as the message will be acked to avoid subsequent retries
// let's not continue processing this message, most likely the window has already been closed and the message
// won't be processed anyways.
Expand All @@ -599,10 +611,10 @@ func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool {
metrics.LabelPipeline: df.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
metrics.LabelReason: "watermark_issue"}).Inc()
return true
return []*window.TimedWindowRequest{}
}

return false
return df.windower.AssignWindows(message)
}

// writeToPBQ writes to the PBQ. It will return error only if it is not failing to write to PBQ and is in a continuous
Expand Down

0 comments on commit 3e6cd33

Please sign in to comment.