Skip to content

Commit

Permalink
fix: bug in late message handling for sliding window (#1472)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>
  • Loading branch information
yhl25 and vigith committed Jan 20, 2024
1 parent 0b96acf commit de780b9
Showing 1 changed file with 35 additions and 29 deletions.
64 changes: 35 additions & 29 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,14 +402,14 @@ func (df *DataForward) writeMessagesToWindows(ctx context.Context, messages []*i

readLoop:
for i, message := range messages {
if df.shouldDropMessage(message) {
writtenMessages = append(writtenMessages, message)
continue
var windows []window.AlignedKeyedWindower
// identify the window for the messages
if message.IsLate {
windows = df.handleLateMessage(message)
} else {
windows = df.handleOnTimeMessage(message)
}

// identify and add window for the message
windows := df.upsertWindowsAndKeys(message)

// for each window we will have a PBQ. A message could belong to multiple windows (e.g., sliding).
// We need to write the messages to these PBQs
for _, kw := range windows {
Expand All @@ -425,43 +425,48 @@ readLoop:
}
}

// even if the message is late and dropped, we still consider it written
// so that the messages will be acked. Because we don't have to persist
// the late messages.
writtenMessages = append(writtenMessages, message)
}

return writtenMessages, failedMessages, err
}

func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool {
if message.IsLate {
// we should be able to get the late message in as long as there is an open window
nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message
if nextWinAsSeenByWriter == nil {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
return true
} else if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) { // if the message doesn't fall in the next window that is about to be closed drop it.
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
metrics.ReduceDroppedMessagesCount.With(map[string]string{
metrics.LabelVertex: df.vertexName,
metrics.LabelPipeline: df.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
metrics.LabelReason: "late"}).Inc()
return true
// handleLateMessage handles the late message and returns an array of window.AlignedKeyedWindower to which the message belongs.
func (df *DataForward) handleLateMessage(message *isb.ReadMessage) []window.AlignedKeyedWindower {
lateMessageWindows := make([]window.AlignedKeyedWindower, 0)
// we should be able to get the late message in as long as there is an open window

// mark it as a successfully written message as the message will be acked to avoid subsequent retries
} else { // if the message falls in the next window that is about to be closed, keep it
df.log.Debugw("Keeping the late message for next condition check because COB has not happened yet", zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Int64("nextWindowToBeClosed.startTime", nextWinAsSeenByWriter.StartTime().UnixMilli()))
}
nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message
if nextWinAsSeenByWriter == nil {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
return lateMessageWindows
}
// if the message doesn't fall in the next window that is about to be closed drop it.
if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
return lateMessageWindows
}

// We will accept data as long as window is open. If a straggler (late data) makes in before the window is closed,
// it is accepted.
// since the window is not closed yet, we will add it to the lateMessageWindows
lateMessageWindows = append(lateMessageWindows, nextWinAsSeenByWriter)

return lateMessageWindows
}

// handleOnTimeMessage handles the on time message and returns an array of window.AlignedKeyedWindower to which the message belongs.
// FIXME: this code works only fir FIXED window and not for Sliding window.
func (df *DataForward) handleOnTimeMessage(message *isb.ReadMessage) []window.AlignedKeyedWindower {
// NOTE(potential bug): if we get a message where the event-time is < (watermark-allowedLateness), skip processing the message.
// This could be due to a couple of problem, eg. ack was not registered, etc.
// Please do not confuse this with late data! This is a platform related problem causing the watermark inequality
// to be violated.
if !message.IsLate && message.EventTime.Before(message.Watermark.Add(-1*df.opts.allowedLateness)) {
if message.EventTime.Before(message.Watermark.Add(-1 * df.opts.allowedLateness)) {
// TODO: track as a counter metric
df.log.Errorw("An old message just popped up", zap.Any("msgOffSet", message.ReadOffset.String()), zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Any("message", message.Message))
// mark it as a successfully written message as the message will be acked to avoid subsequent retries
Expand All @@ -472,10 +477,11 @@ func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool {
metrics.LabelPipeline: df.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
metrics.LabelReason: "watermark_issue"}).Inc()
return true
return []window.AlignedKeyedWindower{}
}

return false
// for on time messages, we can invoke the upsertWindowsAndKeys
return df.upsertWindowsAndKeys(message)
}

// writeToPBQ writes to the PBQ. It will return error only if it is not failing to write to PBQ and is in a continuous
Expand Down

0 comments on commit de780b9

Please sign in to comment.