-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Fallback Sink #1669
feat: Fallback Sink #1669
Conversation
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
// forward the message to the edge buffer (could be multiple edges) | ||
writeOffsets, err := df.writeToBuffer(ctx, df.toBuffer, messageToStep) | ||
// write the messages to the sink | ||
writeOffsets, fallbackMessages, err := df.writeToSink(ctx, df.sinkWriter, writeMessages) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment that this error will only happen if there is ctx.Done()
pkg/sinks/udsink/udsink_grpc.go
Outdated
@@ -28,6 +28,8 @@ import ( | |||
"github.com/numaproj/numaflow/pkg/shared/logging" | |||
) | |||
|
|||
var WriteToFallbackErr = "write to fallback sink" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inline it?
pkg/sinks/sink.go
Outdated
} | ||
|
||
// create the fallback sink writer if fallback sink is present | ||
if u.VertexInstance.Vertex.Spec.Sink.Fallback != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This requires a sink must be defined in the field fallback
. We could argue this is a requirement, but usually this should not be enforced - if nothing is defined under fallback
, we should consider there's no fallback specified.
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
// create the fallback sink writer if fallback sink is present | ||
fbSink := u.VertexInstance.Vertex.Spec.Sink.Fallback | ||
if fbSink != nil && fbSink.IsAnySinkSpecified() { | ||
fbSinkWriter, err := u.createSinkWriter(ctx, u.VertexInstance.Vertex.Spec.Sink.Fallback, fbSinkHandler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who handles the close of gbSinkWriter
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of using an option to pass in the fallback writer, is it better to explicitly add it as a property to the forwarder?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good find... The forwarder should close it, I missed that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fb sink writer is not a mandatory field for the forwarder to start, so kept it optional. Since it is optional for users as well
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com> Co-authored-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com> Co-authored-by: Derek Wang <whynowy@gmail.com>
Signed-off-by: Yashash H L <yashashhl25@gmail.com> Co-authored-by: Vigith Maurice <vigith@gmail.com> Co-authored-by: Derek Wang <whynowy@gmail.com>
fixes #1668