From de780b95da57437ceb4fc5d7bc77619c7e9deb2d Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Sat, 20 Jan 2024 22:56:09 +0530 Subject: [PATCH] fix: bug in late message handling for sliding window (#1472) Signed-off-by: Yashash H L Signed-off-by: Vigith Maurice Co-authored-by: Vigith Maurice --- pkg/reduce/data_forward.go | 64 +++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/pkg/reduce/data_forward.go b/pkg/reduce/data_forward.go index e87c812c0..79e6731c8 100644 --- a/pkg/reduce/data_forward.go +++ b/pkg/reduce/data_forward.go @@ -402,14 +402,14 @@ func (df *DataForward) writeMessagesToWindows(ctx context.Context, messages []*i readLoop: for i, message := range messages { - if df.shouldDropMessage(message) { - writtenMessages = append(writtenMessages, message) - continue + var windows []window.AlignedKeyedWindower + // identify the window for the messages + if message.IsLate { + windows = df.handleLateMessage(message) + } else { + windows = df.handleOnTimeMessage(message) } - // identify and add window for the message - windows := df.upsertWindowsAndKeys(message) - // for each window we will have a PBQ. A message could belong to multiple windows (e.g., sliding). // We need to write the messages to these PBQs for _, kw := range windows { @@ -425,43 +425,48 @@ readLoop: } } + // even if the message is late and dropped, we still consider it written + // so that the messages will be acked. Because we don't have to persist + // the late messages. writtenMessages = append(writtenMessages, message) } return writtenMessages, failedMessages, err } -func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool { - if message.IsLate { - // we should be able to get the late message in as long as there is an open window - nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed() - // if there is no window open, drop the message - if nextWinAsSeenByWriter == nil { - df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark)) - return true - } else if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) { // if the message doesn't fall in the next window that is about to be closed drop it. - df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime())) - metrics.ReduceDroppedMessagesCount.With(map[string]string{ - metrics.LabelVertex: df.vertexName, - metrics.LabelPipeline: df.pipelineName, - metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), - metrics.LabelReason: "late"}).Inc() - return true +// handleLateMessage handles the late message and returns an array of window.AlignedKeyedWindower to which the message belongs. +func (df *DataForward) handleLateMessage(message *isb.ReadMessage) []window.AlignedKeyedWindower { + lateMessageWindows := make([]window.AlignedKeyedWindower, 0) + // we should be able to get the late message in as long as there is an open window - // mark it as a successfully written message as the message will be acked to avoid subsequent retries - } else { // if the message falls in the next window that is about to be closed, keep it - df.log.Debugw("Keeping the late message for next condition check because COB has not happened yet", zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Int64("nextWindowToBeClosed.startTime", nextWinAsSeenByWriter.StartTime().UnixMilli())) - } + nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed() + // if there is no window open, drop the message + if nextWinAsSeenByWriter == nil { + df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark)) + return lateMessageWindows + } + // if the message doesn't fall in the next window that is about to be closed drop it. + if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) { + df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime())) + return lateMessageWindows } // We will accept data as long as window is open. If a straggler (late data) makes in before the window is closed, // it is accepted. + // since the window is not closed yet, we will add it to the lateMessageWindows + lateMessageWindows = append(lateMessageWindows, nextWinAsSeenByWriter) + + return lateMessageWindows +} +// handleOnTimeMessage handles the on time message and returns an array of window.AlignedKeyedWindower to which the message belongs. +// FIXME: this code works only fir FIXED window and not for Sliding window. +func (df *DataForward) handleOnTimeMessage(message *isb.ReadMessage) []window.AlignedKeyedWindower { // NOTE(potential bug): if we get a message where the event-time is < (watermark-allowedLateness), skip processing the message. // This could be due to a couple of problem, eg. ack was not registered, etc. // Please do not confuse this with late data! This is a platform related problem causing the watermark inequality // to be violated. - if !message.IsLate && message.EventTime.Before(message.Watermark.Add(-1*df.opts.allowedLateness)) { + if message.EventTime.Before(message.Watermark.Add(-1 * df.opts.allowedLateness)) { // TODO: track as a counter metric df.log.Errorw("An old message just popped up", zap.Any("msgOffSet", message.ReadOffset.String()), zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Any("message", message.Message)) // mark it as a successfully written message as the message will be acked to avoid subsequent retries @@ -472,10 +477,11 @@ func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool { metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelReason: "watermark_issue"}).Inc() - return true + return []window.AlignedKeyedWindower{} } - return false + // for on time messages, we can invoke the upsertWindowsAndKeys + return df.upsertWindowsAndKeys(message) } // writeToPBQ writes to the PBQ. It will return error only if it is not failing to write to PBQ and is in a continuous