-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement Nats user-defined source #1
Conversation
Signed-off-by: Keran Yang <yangkr920208@gmail.com>
Signed-off-by: Keran Yang <yangkr920208@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - once the signature for Pending()
is updated I'll approve
// NewLogger returns a new zap.SugaredLogger | ||
func NewLogger() *zap.SugaredLogger { | ||
var config zap.Config | ||
debugMode, ok := os.LookupEnv("NUMAFLOW_DEBUG") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was this environment variable something you imagined the user would specify in their container spec? If so, I wonder if we need a way for them to be able to accomplish the same thing if they use the user-friendly spec method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the env var "NUMAFLOW_DEBUG" is already supported on the numaflow side. https://numaflow.numaproj.io/development/debugging/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, here it says that it applies to the "numa" container. That implies it's not being passed to the udf container - is it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming it's not passed to the UDF container, then we need to figure out a way to pass it.
I am thinking of the following approach. How a UDSource determines whether to log at DEBUG mode is completely up to the UDSource implementer. Hence the following approach makes sense in a way that we can add to README.me to tell user, if you want to enable debug logging for nats-source-go, add this env var.
udsource:
container:
image: my-udsource:latest
env:
- name: NUMAFLOW_DEBUG
value: "true"
For the user-friendly spec method, we need a way to pass it. I will dive deeper into how as I make changes to the controller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, sounds good
pkg/nats/nats.go
Outdated
// Pending returns the number of pending records. | ||
func (n *natsSource) Pending(_ context.Context) int64 { | ||
// The nats source always returns zero to indicate no pending records. | ||
return 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is probably how it was in the original code, right? It's interesting that it's 0 and not isb.PendingNotAvailable
. Just want to verify that's right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! The original code doesn't support Pending
, meaning the NatsSource cannot be converted to a lagReader. Hence, when exposing pending metrics, nats source reader is skipped, making auto-scaling disabled. In this case, I think returning a negative number better represents the contract. Will update.
Add the implementation of Nats user-defined source for numaflow. Majority of the changes are referenced to the built-in Nats implementation under the numaflow repository.