Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bug in late message handling for sliding window #1471

Merged
merged 8 commits into from
Apr 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 44 additions & 32 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,15 +527,14 @@ func (df *DataForward) writeMessagesToWindows(ctx context.Context, messages []*i
var failedMessages = make([]*isb.ReadMessage, 0)

for _, message := range messages {
if df.shouldDropMessage(message) {
writtenMessages = append(writtenMessages, message)
continue
var windowOperations []*window.TimedWindowRequest
if message.IsLate {
windowOperations = df.handleLateMessage(message)
} else {
windowOperations = df.handleOnTimeMessage(message)
}

var failed bool
// identify and add window for the message
windowOperations := df.windower.AssignWindows(message)

// for each window we will have a PBQ. A message could belong to multiple windows (e.g., sliding).
// We need to write the messages to these PBQs
for _, winOp := range windowOperations {
Expand All @@ -557,40 +556,53 @@ func (df *DataForward) writeMessagesToWindows(ctx context.Context, messages []*i
return writtenMessages, failedMessages, err
}

func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool {
if message.IsLate {
// we should be able to get the late message in as long as there is an open window
nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message
if nextWinAsSeenByWriter == nil || df.windower.Type() == window.Unaligned {
df.log.Infow("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
return true
} else if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) { // if the message doesn't fall in the next window that is about to be closed drop it.
df.log.Infow("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
metrics.ReduceDroppedMessagesCount.With(map[string]string{
metrics.LabelVertex: df.vertexName,
metrics.LabelPipeline: df.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
metrics.LabelReason: "late"}).Inc()
return true
// handleLateMessage handles the late message and returns the timed window requests to be written to PBQ.
// if the message is dropped, it returns an empty slice.
func (df *DataForward) handleLateMessage(message *isb.ReadMessage) []*window.TimedWindowRequest {
lateMessageWindowRequests := make([]*window.TimedWindowRequest, 0)
// we should be able to get the late message in as long as there is an open fixed window
// for unaligned windows, we never consider late messages since it will expand the existing window, which is not the ideal behavior
// (for aligned, the windows start and end are fixed).
// for sliding windows, we cannot accept late messages because a message can be part of multiple windows
// and some windows might have already been closed.
if df.windower.Strategy() != window.Fixed {
return lateMessageWindowRequests
}

// mark it as a successfully written message as the message will be acked to avoid subsequent retries
} else { // if the message falls in the next window that is about to be closed, keep it
df.log.Debugw("Keeping the late message for next condition check because COB has not happened yet", zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Int64("nextWindowToBeClosed.startTime", nextWinAsSeenByWriter.StartTime().UnixMilli()))
}
nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message.
if nextWinAsSeenByWriter == nil {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
return lateMessageWindowRequests
}
// if the message doesn't fall in the next window that is about to be closed drop it.
if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
return lateMessageWindowRequests
}

// We will accept data as long as window is open. If a straggler (late data) makes in before the window is closed,
// it is accepted.
// since the window is not closed yet, we can send an append request.
lateMessageWindowRequests = append(lateMessageWindowRequests, &window.TimedWindowRequest{
Operation: window.Append,
ReadMessage: message,
ID: nextWinAsSeenByWriter.Partition(),
Windows: []window.TimedWindow{nextWinAsSeenByWriter},
})

return lateMessageWindowRequests
}

// handleOnTimeMessage handles the on-time message and returns the timed window requests to be written to PBQ.
func (df *DataForward) handleOnTimeMessage(message *isb.ReadMessage) []*window.TimedWindowRequest {
// NOTE(potential bug): if we get a message where the event-time is < (watermark-allowedLateness), skip processing the message.
// This could be due to a couple of problem, eg. ack was not registered, etc.
// Please do not confuse this with late data! This is a platform related problem causing the watermark inequality
// to be violated.
// df.currentWatermark cannot be -1 except for the first time till it gets a valid watermark (wm > -1)
if !message.IsLate && message.EventTime.Before(df.currentWatermark.Add(-1*df.opts.allowedLateness)) {
// TODO: track as a counter metric
df.log.Errorw("An old message just popped up", zap.Any("msgOffSet", message.ReadOffset.String()), zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Any("message", message.Message))
if message.EventTime.Before(df.currentWatermark.Add(-1 * df.opts.allowedLateness)) {
df.log.Errorw("An old message just popped up", zap.Any("msgOffSet", message.ReadOffset.String()), zap.Int64("eventTime", message.EventTime.UnixMilli()),
zap.Int64("msg_watermark", message.Watermark.UnixMilli()), zap.Any("message", message.Message), zap.Int64("currentWatermark", df.currentWatermark.UnixMilli()))
// mark it as a successfully written message as the message will be acked to avoid subsequent retries
// let's not continue processing this message, most likely the window has already been closed and the message
// won't be processed anyways.
Expand All @@ -599,10 +611,10 @@ func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool {
metrics.LabelPipeline: df.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
metrics.LabelReason: "watermark_issue"}).Inc()
return true
return []*window.TimedWindowRequest{}
}

return false
return df.windower.AssignWindows(message)
}

// writeToPBQ writes to the PBQ. It will return error only if it is not failing to write to PBQ and is in a continuous
Expand Down
Loading