-
Notifications
You must be signed in to change notification settings - Fork 185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Google Cloud Pubsub support for data records #209
Conversation
Codecov Report
@@ Coverage Diff @@
## master #209 +/- ##
==========================================
- Coverage 86.73% 85.62% -1.12%
==========================================
Files 23 24 +1
Lines 1342 1384 +42
==========================================
+ Hits 1164 1185 +21
- Misses 127 144 +17
- Partials 51 55 +4
Continue to review full report at Codecov.
|
Awesome. We are also looking to use Pub/Sub as our data pipeline. |
pkg/handler/data_recorder_pubsub.go
Outdated
client, err := pubsub.NewClient( | ||
context.Background(), | ||
config.Config.RecorderPubsubProjectID, | ||
option.WithServiceAccountFile(config.Config.RecorderPubsubKeyFile), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is deprecated, it is recommended to use WithCredentialsFile
instead. Just starting to look into this to check their equivalency, but they seem to operate on the same idea.
As this is an option, does setting it to an empty string affect the use of Application Default Credentials? Would want to make sure that is not the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry, you are right! this option is deprecated I just used a chunk of code I am already using for the past 2 years in many go services here... I am pushing a fix now (thanks I'm also fixing my own services)
Regarding the empty string, yes it will work with default creds, matter effect this is exactly how we develop on our machines using our own accounts with proper IAM
} | ||
} | ||
|
||
type pubsubEvalResult struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I totally agree, I looked at the code and tried to follow your style just to make sure everything will pass and be ok on your end... But sure I think there's a lot of place to improve some of the things.
I will take a look at those relevant issues and will update ASAP (this is night over here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks! I think you can use
type pubsubMessageFrame struct {
Payload string `json:"payload"`
Encrypted bool `json:"encrypted"`
}
Consolidating 2 structs is the same amout of work for 3. We can fix #203 later.
Also, make sure the final payload is a JSON struct looks like
{
"payload": "<json marshal of EvalResult>",
"encrypted": false
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and test coverage of this file :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhouzhuojie regarding the pubsubMessageFrame, I added it on my end but it looks weird to marshal into json string because pubsub accepts []byte anyways so the conversion happens for no reason... I'm pushing the code to share this with you, LMK what you think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I knew the json string payload at the first look is weird. The reason I want to standardize the log format is that
- Extensibility. If people want to end-to-end encryption, compression, or other meta things related to the message frame, they can do it regardless of the choice of the recorder type. They can add more fields into the message frame struct. Payload as a string fits into this decision.
- Portability. The data analytics pipeline requires no changes if one wants to switch between providers.
Need to check further to see if this is a larger-scale thing, but it seems that if the pubsub connection fails then an EvaluationResult fails to be returned. @zhouzhuojie would this be expected behavior? It seems to me that we'd want to return a result anyways given that the data recording should be an async process. |
Why do you think "if the pubsub connection fails then an EvaluationResult fails to be returned"? The connection won't affect online evaluation. AsyncRecord function should just log errors if there's any failure, and it shouldn't panic or exit as well. |
Here is what I'm seeing:
Server logs:
I'd expect the result to be returned immediately, but it's never returned at all. In the OpenAPI client for Go, this ends up setting the error value of the '''PostEvaluationResult''' method. It looks like it's hanging on the |
pkg/handler/data_recorder_pubsub.go
Outdated
option.WithCredentialsFile(config.Config.RecorderPubsubKeyFile), | ||
) | ||
if err != nil { | ||
logrus.WithField("pubsub_error", err).Error("error getting pubsub client") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's Fatal
here instead of Error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the issue is caused by get
method from pubsub which is a blocking operation, see pubsub doc here
we should pass context either with a deadline or timeout to get
method instead of just background context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can confirm this is the issue. When I disable verbose logging, everything works as expected.
pkg/handler/data_recorder_pubsub.go
Outdated
res := p.topic.Publish(ctx, &pubsub.Message{Data: payload}) | ||
if config.Config.RecorderPubsubVerbose { | ||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about combining this with running the Get within a new goroutine?
Tried this and it seems to work well:
if config.Config.RecorderPubsubVerbose {
go func() {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
id, err := res.Get(ctx)
if err != nil {
logrus.WithFields(logrus.Fields{"pubsub_error": err, "id": id}).Error("error pushing to pubsub")
}
}()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for another goroutine. Also, 5s can be moved into env config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, I added both
I need some help regarding the tests, I added test coverage to pubsub but now the I can see that we use in all data recorders with their production functions without mocking, meaning connection should fail on all of them, unless I am missing something that runs as a mock server on CI |
Still figuring out the best way to handle this, but I know what is wrong currently: What is failing is the data_recorder_pubsub_test. The data_recorder_test.go file passes still and that is because we are stubbing out the When the following code is called: client, err := pubsub.NewClient(
context.Background(),
config.Config.RecorderPubsubProjectID,
option.WithCredentialsFile(config.Config.RecorderPubsubKeyFile),
) the library uses the following rules to create a new client:
On our machines, step 2 is happening, likely because we have run a NewClient signature is as follows: func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) { and the return value in case of an error is this: return nil, fmt.Errorf("pubsub: %v", err) so when the rest of the code executes: if err != nil {
// TODO: use Fatal again after fixing the test expecting to not panic.
// logrus.WithField("pubsub_error", err).Fatal("error getting pubsub client")
logrus.WithField("pubsub_error", err).Error("error getting pubsub client")
}
return &pubsubRecorder{
producer: client,
topic: client.Topic(config.Config.RecorderPubsubTopicName),
enabled: config.Config.RecorderEnabled,
} client is a null pointer, and you get a null pointer dereference. So that's the issue. Having the Fatal call will prevent us from dereferencing a nil pointer which is good. It is also the same way we handle error cases in the kafka client so it would follow the standards there. So to fix the issue in the test we need to either a) is not great because you would need to provide some sort of real credentials. |
pkg/config/env.go
Outdated
RecorderPubsubTopicName string `env:"FLAGR_RECORDER_PUBSUB_TOPIC_NAME" envDefault:"flagr-records"` | ||
RecorderPubsubKeyFile string `env:"FLAGR_RECORDER_PUBSUB_KEYFILE" envDefault:""` | ||
RecorderPubsubVerbose bool `env:"FLAGR_RECORDER_PUBSUB_VERBOSE" envDefault:"false"` | ||
RecorderPubsubVerboseCancel time.Duration `env:"FLAGR_RECORDER_PUBSUB_VERBOSE_CANCEL" envDefault:"5s"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about RecorderPubsubVerboseCancelTimeout
? Otherwise, it's not clear to me that cancel
can be a time.Duration.
@vic3lord I've created a PR on your fork which fixes the above issues |
fix issues raised in checkr/flagr PR openflagr#209
This looks good to me pending my question about handling potential marshal errors. |
Cool. @zhouzhuojie how are you feeling about everything at this point? |
pkg/handler/data_recorder_pubsub.go
Outdated
"github.com/checkr/flagr/pkg/config" | ||
"github.com/checkr/flagr/swagger_gen/models" | ||
"google.golang.org/api/option" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, no need a new line here
pkg/handler/data_recorder_pubsub.go
Outdated
) | ||
|
||
type pubsubRecorder struct { | ||
enabled bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this enabled
necessary here? eval.go checks it here https://github.com/checkr/flagr/blob/master/pkg/handler/eval.go#L178
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh yeah... this could probably be removed from the kafka and kinesis versions as they do the check as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
didn't realize that it was also used in kafka and kinesis, we can probably clean them up in other PRs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, I think I left 1 or 2 nit comments, nothing major
Also, I think you can document how to authenticate with pubsub, for example, setting GOOGLE_APPLICATION_CREDENTIALS
Thank you so much for helping, I fixed the little nits. Where would it be best to document the pubsub addition and its auth docs? |
I think you can put it after the Kinesis section in https://github.com/checkr/flagr/blob/master/docs/flagr_env.md |
Description
Adding Google Cloud Pubsub support for data records, currently just a minimal implementation
Motivation and Context
We are using Google Cloud Pubsub and needed something native for data records
How Has This Been Tested?
running
make
with all tasks, passed all tests and started flagr serverTypes of changes
Checklist: