-
Notifications
You must be signed in to change notification settings - Fork 317
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: common flusher for reporting with mtu handler #4823
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #4823 +/- ##
==========================================
- Coverage 74.37% 74.35% -0.03%
==========================================
Files 420 427 +7
Lines 49232 49610 +378
==========================================
+ Hits 36617 36886 +269
- Misses 10214 10284 +70
- Partials 2401 2440 +39 ☔ View full report in Codecov by Sentry. |
}, | ||
} | ||
|
||
for _, item := range items { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can also use https://pkg.go.dev/github.com/stretchr/testify/assert#JSONEq
r := TrackedUsersReport{} | ||
err := rows.Scan(&r.ReportedAt, &r.WorkspaceID, &r.SourceID, &r.InstanceID, &r.UserIDHLLHex, &r.AnonymousIDHLLHex, &r.IdentifiedAnonymousIDHLLHex) | ||
if err != nil { | ||
return nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return nil, err | |
return nil, fmt.Errorf("reading from db %w",err) |
same for other places as well, wrapping the errors would help in debugging
t.IdentifiedAnonymousIDHLLHex = hex.EncodeToString(t.IdentifiedAnonymousIDHLL.ToBytes()) | ||
|
||
type Alias TrackedUsersReport | ||
return json.Marshal(&struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is Alias needed? can't we directly use TrackedUsersReport?
|
||
// TODO: Should we panic here ? | ||
if !errors.Is(err, context.Canceled) { | ||
panic(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not panic here, instead we should log the error. panic will shut the application down, i.e event processing will also stop. we can setup alerts on the backlog of tracked users.
|
||
func (c *CronRunner) Stop() { | ||
c.cancel() | ||
_ = c.g.Wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should handle errors here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
g.SetLimit(concurrency) | ||
f.concurrentRequests.Gauge(float64(concurrency)) | ||
|
||
for i := 0; i < len(aggReports); i += batchSize { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Description
Motivation for common flusher
Flusher
Flush steps
start
,end
) to flush.start
will be from db.end
will be min(now() - 30s, end-of-current-hour). We don't want to flush reports where inserts are happening. We always try to flush full window durations which is configured viaflushWindow
unless window is crossing multiple hour boundariesFeatures
Linear Ticket
https://linear.app/rudderstack/issue/OBS-504/reporting-client-for-tracked-users
completes OBS-504
Security