feat: internal batch endpoint #4394
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4394      +/-   ##
==========================================
- Coverage   75.60%   75.40%   -0.21%
==========================================
  Files         381      381
  Lines       46123    46322     +199
==========================================
+ Hits        34871    34927      +56
- Misses       8972     9107     +135
- Partials     2280     2288       +8
gateway/handle.go
	})
}
// some checks here
if gw.conf.enableRateLimit.Load() {
Should we apply rate limiting to events coming from HA-ingestion? We can consider throttling at the source router instead.
gateway/handle.go
anonIDFromReq := strings.TrimSpace(misc.SanitizeUnicode(misc.GetStringifiedData(toSet["anonymousId"])))
userIDFromReq := strings.TrimSpace(misc.SanitizeUnicode(misc.GetStringifiedData(toSet["userId"])))
We can probably skip the sanitization here; this should take place in the ingestion service.
gateway/handle.go
receivedAt, ok := userEvent.events[0]["receivedAt"].(string)
if !ok || !arctx.ReplaySource {
	receivedAt = time.Now().Format(misc.RFC3339Milli)
Should this logic be placed on ingestion svc?
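Wherever this ends up living, the rule in the snippet above is small enough to isolate as a pure function. A minimal sketch, assuming the layout behind `misc.RFC3339Milli` is RFC 3339 with millisecond precision; `resolveReceivedAt`, `rfc3339Milli`, and `demoReceivedAt` are hypothetical names that are not part of this PR:

```go
package main

import (
	"fmt"
	"time"
)

// rfc3339Milli is an assumption about what misc.RFC3339Milli expands to.
const rfc3339Milli = "2006-01-02T15:04:05.000Z07:00"

// resolveReceivedAt keeps the payload's receivedAt only for replay sources
// that actually carry a string value; otherwise it stamps the current time.
func resolveReceivedAt(event map[string]any, replaySource bool, now time.Time) string {
	receivedAt, ok := event["receivedAt"].(string)
	if !ok || !replaySource {
		return now.Format(rfc3339Milli)
	}
	return receivedAt
}

// demoReceivedAt evaluates the same event as a replay and as a regular source.
func demoReceivedAt() (kept, stamped string) {
	now := time.Date(2024, 2, 16, 9, 56, 29, 0, time.UTC)
	event := map[string]any{"receivedAt": "2020-02-02T00:23:09.544Z"}
	return resolveReceivedAt(event, true, now), resolveReceivedAt(event, false, now)
}

func main() {
	kept, stamped := demoReceivedAt()
	fmt.Println(kept)
	fmt.Println(stamped)
}
```

Passing `now` in as a parameter keeps the function deterministic, so the same check can run in either service without mocking the clock.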
gateway/handle.go
userIDHeader = r.Header.Get("AnonymousId")
ipAddr = misc.GetIPFromReq(r)
Events coming from the source router should not contain "AnonymousId". Also, getting the IP address from the request should happen on the ingestion svc.
True
We have to make sure that the processor properly reads this for every event.
For now I'm reading receivedAt from inside each payload. I think we should do the same for ipAddr as well.
wdyt?
gateway/handle.go
workspaceID = arctx.WorkspaceID
userIDHeader = r.Header.Get("AnonymousId")
ipAddr = misc.GetIPFromReq(r)
eventsBatch = gjson.GetBytes(body, "batch").Array()
We should compare the performance of gjson.GetBytes(body, "batch").Array() against unmarshalling to a struct.
gjson looks fine. Here's the benchmark I used
import (
	"encoding/json"
	"testing"

	"github.com/tidwall/gjson"
)

// eventBatch is a single-event batch payload shared by both benchmarks.
var eventBatch = []byte(`{"batch": [
	{
		"userId": "identified_user_id",
		"anonymousId":"anonymousId_1",
		"messageId":"messageId_1",
		"type": "identify",
		"eventOrderNo":"1",
		"context": {
			"traits": {
				"trait1": "new-val"
			},
			"ip": "14.5.67.21",
			"library": {
				"name": "http"
			}
		},
		"timestamp": "2020-02-02T00:23:09.544Z"
	}]}`)

/*
Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -bench ^BenchmarkGJson$ github.com/rudderlabs/rudder-server/gateway
2024/02/16 09:56:29 maxprocs: Leaving GOMAXPROCS=10: CPU quota undefined
goos: darwin
goarch: arm64
pkg: github.com/rudderlabs/rudder-server/gateway
BenchmarkGJson-10 668709 1572 ns/op 1888 B/op 19 allocs/op
PASS
ok github.com/rudderlabs/rudder-server/gateway 2.700s
*/
// BenchmarkGJson extracts the batch array with gjson and converts the first event to a map.
func BenchmarkGJson(b *testing.B) {
	for i := 0; i < b.N; i++ {
		batch := gjson.GetBytes(eventBatch, "batch").Array()
		_, _ = batch[0].Value().(map[string]interface{})
	}
}

type BatchEvents struct {
	Batch []map[string]interface{} `json:"batch"`
}

/*
Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -bench ^BenchmarkStruct$ github.com/rudderlabs/rudder-server/gateway
2024/02/16 09:54:07 maxprocs: Leaving GOMAXPROCS=10: CPU quota undefined
goos: darwin
goarch: arm64
pkg: github.com/rudderlabs/rudder-server/gateway
BenchmarkStruct-10 297831 3832 ns/op 2376 B/op 61 allocs/op
PASS
ok github.com/rudderlabs/rudder-server/gateway 2.783s
*/
// BenchmarkStruct unmarshals the whole payload into BatchEvents with encoding/json.
func BenchmarkStruct(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var batch BatchEvents
		_ = json.Unmarshal(eventBatch, &batch)
	}
}
gateway/handle.go
	return
}

params := map[string]any{
We could use a struct for params instead of map[string]any.
The benchmarks got me thinking...
I came up with something (didn't include it in the PR, though):
import (
	"bytes"
	"sync"
	"testing"
)

var bufferPool = &sync.Pool{
	New: func() interface{} {
		return &bytes.Buffer{}
	},
}

// paramBytes builds the params JSON by hand using a pooled buffer.
// Caveats: the IDs are written without JSON escaping, and the returned slice
// aliases the pooled buffer's backing array, so a later call that reuses the
// buffer can overwrite it. Copy the bytes before returning if the result must
// outlive the call (that costs one allocation, unlike the numbers below).
func paramBytes(sourceID, jobRunID, taskRunID string) []byte {
	b := bufferPool.Get().(*bytes.Buffer)
	b.WriteString(`{"source_id":"`)
	b.WriteString(sourceID)
	b.WriteString(`", "source_job_run_id":"`)
	b.WriteString(jobRunID)
	b.WriteString(`", "source_task_run_id":"`)
	b.WriteString(taskRunID)
	b.WriteString(`"}`)
	paramsBytes := b.Bytes()
	b.Truncate(0)
	bufferPool.Put(b)
	return paramsBytes
}

/*
Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -bench ^BenchmarkBytesSliceParallel$ github.com/rudderlabs/rudder-server/gateway
2024/02/16 15:07:37 maxprocs: Leaving GOMAXPROCS=10: CPU quota undefined
goos: darwin
goarch: arm64
pkg: github.com/rudderlabs/rudder-server/gateway
BenchmarkBytesSliceParallel-10 321385850 3.682 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/rudderlabs/rudder-server/gateway 3.173s
*/
func BenchmarkBytesSliceParallel(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			_ = paramBytes("sourceID", "jobRunID", "taskRunID")
		}
	})
}

/*
Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -bench ^BenchmarkBytesSlice$ github.com/rudderlabs/rudder-server/gateway
2024/02/16 15:08:08 maxprocs: Leaving GOMAXPROCS=10: CPU quota undefined
goos: darwin
goarch: arm64
pkg: github.com/rudderlabs/rudder-server/gateway
BenchmarkBytesSlice-10 38668513 29.73 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/rudderlabs/rudder-server/gateway 2.857s
*/
func BenchmarkBytesSlice(b *testing.B) {
	for i := 0; i < b.N; i++ {
		_ = paramBytes("sourceID", "jobRunID", "taskRunID")
	}
}
import (
	"encoding/json"
	"testing"
)

// paramBytesUsingStruct builds the same params JSON via encoding/json,
// which escapes the values and returns a freshly allocated slice.
func paramBytesUsingStruct(sourceID, jobRunID, taskRunID string) []byte {
	params := struct {
		SourceID        string `json:"source_id"`
		SourceJobRunID  string `json:"source_job_run_id"`
		SourceTaskRunID string `json:"source_task_run_id"`
	}{
		SourceID:        sourceID,
		SourceJobRunID:  jobRunID,
		SourceTaskRunID: taskRunID,
	}
	b, _ := json.Marshal(params)
	return b
}

/*
Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -bench ^BenchmarkBytesSliceUsingStruct$ github.com/rudderlabs/rudder-server/gateway
2024/02/20 22:32:52 maxprocs: Leaving GOMAXPROCS=10: CPU quota undefined
goos: darwin
goarch: arm64
pkg: github.com/rudderlabs/rudder-server/gateway
BenchmarkBytesSliceUsingStruct-10 7423453 159.7 ns/op 144 B/op 2 allocs/op
PASS
ok github.com/rudderlabs/rudder-server/gateway 2.965s
*/
func BenchmarkBytesSliceUsingStruct(b *testing.B) {
	for i := 0; i < b.N; i++ {
		_ = paramBytesUsingStruct("sourceID", "jobRunID", "taskRunID")
	}
}

/*
Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -bench ^BenchmarkBytesSliceUsingStructParallel$ github.com/rudderlabs/rudder-server/gateway
2024/02/16 15:23:51 maxprocs: Leaving GOMAXPROCS=10: CPU quota undefined
goos: darwin
goarch: arm64
pkg: github.com/rudderlabs/rudder-server/gateway
BenchmarkBytesSliceUsingStructParallel-10 21970879 52.59 ns/op 144 B/op 2 allocs/op
PASS
ok github.com/rudderlabs/rudder-server/gateway 2.832s
*/
func BenchmarkBytesSliceUsingStructParallel(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			_ = paramBytesUsingStruct("sourceID", "jobRunID", "taskRunID")
		}
	})
}
gateway/handle.go
@@ -614,3 +614,228 @@ func (gw *Handle) addToWebRequestQ(_ *http.ResponseWriter, req *http.Request, do
	}
	userWebRequestWorker.webRequestQ <- &webReq
}

func (gw *Handle) writeToJobsDB() http.HandlerFunc {
I believe this function is doing too many things. I would separate the payload preparation logic from the HTTP handler.
HTTP handler:
- exposes func(w http.ResponseWriter, r *http.Request)
- validates the payload
- calls a method to writeToJobsDB
- writes the HTTP response and status code
writeToJobsDB:
- extracts params
- handles suppression
- rsourcesStats
- writes to jobsDB
We can decouple writeToJobsDB from the gateway.Handle struct; it will depend on the suppression service and jobsDB. It should not be aware of HTTP.
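To make the proposed split concrete, here is a rough sketch of an HTTP-unaware writer that depends only on a suppression check and a job store. Every type and name in it (`Event`, `Job`, `Suppressor`, `JobStore`, `Writer`, `demoWrite`) is a hypothetical, simplified stand-in rather than the actual rudder-server API, and rsourcesStats is left out for brevity:

```go
package main

import (
	"context"
	"fmt"
)

// Event and Job are hypothetical, simplified stand-ins for this sketch.
type Event struct {
	WorkspaceID, SourceID, UserID string
	Payload                       []byte
}

type Job struct {
	UserID  string
	Payload []byte
}

// Suppressor and JobStore are the only dependencies of the write path.
type Suppressor interface {
	IsSuppressed(workspaceID, userID, sourceID string) bool
}

type JobStore interface {
	Store(ctx context.Context, jobs []Job) error
}

// Writer knows nothing about HTTP requests or responses.
type Writer struct {
	Suppress Suppressor
	Jobs     JobStore
}

// Write drops suppressed events, stores the rest, and reports how many were stored.
func (w *Writer) Write(ctx context.Context, events []Event) (int, error) {
	jobs := make([]Job, 0, len(events))
	for _, e := range events {
		if w.Suppress.IsSuppressed(e.WorkspaceID, e.UserID, e.SourceID) {
			continue
		}
		jobs = append(jobs, Job{UserID: e.UserID, Payload: e.Payload})
	}
	if len(jobs) == 0 {
		return 0, nil
	}
	if err := w.Jobs.Store(ctx, jobs); err != nil {
		return 0, fmt.Errorf("storing jobs: %w", err)
	}
	return len(jobs), nil
}

// In-memory fakes, enough to exercise Write without a server or a database.
type blockedUsers map[string]bool

func (b blockedUsers) IsSuppressed(_, userID, _ string) bool { return b[userID] }

type memStore struct{ jobs []Job }

func (m *memStore) Store(_ context.Context, jobs []Job) error {
	m.jobs = append(m.jobs, jobs...)
	return nil
}

// demoWrite runs Write over three events, one of which belongs to a suppressed user.
func demoWrite() (int, error) {
	w := &Writer{Suppress: blockedUsers{"u2": true}, Jobs: &memStore{}}
	return w.Write(context.Background(), []Event{
		{UserID: "u1", Payload: []byte(`{}`)},
		{UserID: "u2", Payload: []byte(`{}`)},
		{UserID: "u3", Payload: []byte(`{}`)},
	})
}

func main() {
	n, err := demoWrite()
	fmt.Println(n, err)
}
```

With this shape the HTTP handler shrinks to decoding the request, calling `Write`, and translating the result into a status code, while the suppression and storage behaviour can be tested with plain fakes.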
gateway/handle.go
var errorMessage string
defer func() {
	if errorMessage != "" {
This approach will become hard to maintain given the size of the function:
- It is much easier to test for errors when the method returns an error instead of writing to an HTTP response (similar intention to my comment above).
- We need to remember to set errorMessage when returning with an error.
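One possible shape for the error-returning alternative, sketched with the standard library only: the business logic returns an error, and a single function maps errors to status codes, so there is no `errorMessage` variable to remember to set. `errEmptyBody`, `processBatch`, `statusFor`, `batchHandler`, `serve`, and the `/batch` path are all hypothetical names chosen for illustration:

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// errEmptyBody is a hypothetical sentinel error for this sketch.
var errEmptyBody = errors.New("empty request body")

// processBatch stands in for the business logic; it returns an error
// instead of writing to the HTTP response.
func processBatch(body []byte) error {
	if len(body) == 0 {
		return errEmptyBody
	}
	return nil
}

// statusFor is the single place where errors are mapped to status codes.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errEmptyBody):
		return http.StatusBadRequest
	default:
		return http.StatusInternalServerError
	}
}

// batchHandler only reads the request and translates the outcome.
func batchHandler(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err == nil {
		err = processBatch(body)
	}
	if err != nil {
		http.Error(w, err.Error(), statusFor(err))
		return
	}
	w.WriteHeader(http.StatusOK)
}

// serve runs the handler against an in-memory request and returns the status code.
func serve(body string) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/batch", strings.NewReader(body))
	batchHandler(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(serve(""), serve(`{"batch":[]}`))
}
```

Because `processBatch` and `statusFor` are plain functions, their error cases can be covered by unit tests without constructing a request at all; `serve` is only needed for the thin handler layer.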
I've split the function quite a bit, so this looks a little better. Let me know after you take a look if we still need a little more work on it.
We should try to increase code coverage as much as possible. Breaking up the big handler method would help with creating tests specific to the business logic without needing to involve HTTP tests.
4423044 to c1d9bd3
c1d9bd3 to 36f77d5
07569d0 to c9dded0
c9dded0 to fe84750
We would also need to add the new endpoint to the OpenAPI YAML file and generate the HTML for it.
Description
internal batch endpoint: no buffering, not a lot of validations -> just write to jobsdb.
Linear Ticket
resolves PIPE-765
Security