Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: internal batch endpoint #4394

1 change: 1 addition & 0 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
var (
errRequestDropped = errors.New("request dropped")
errRequestSuppressed = errors.New("request suppressed")
errEventSuppressed = errors.New("event suppressed")
)

//go:embed openapi/index.html
Expand Down
6 changes: 4 additions & 2 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,10 @@ var _ = Describe("Gateway", func() {
}
Expect(err).To(BeNil())
req.Header.Set("Content-Type", "application/json")
if ep == "/internal/v1/replay" || ep == "/internal/v1/retl" {
switch ep {
case "/internal/v1/replay", "/internal/v1/retl", "/internal/v1/batch":
req.Header.Set("X-Rudder-Source-Id", ReplaySourceID)
} else {
default:
req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(WriteKeyEnabled+":")))
}
resp, err := client.Do(req)
Expand Down Expand Up @@ -1708,6 +1709,7 @@ func endpointsToVerify() ([]string, []string, []string) {
"/v1/warehouse/pending-events",
"/v1/warehouse/trigger-upload",
"/v1/warehouse/jobs",
"/internal/v1/batch",
}

deleteEndpoints := []string{
Expand Down
234 changes: 234 additions & 0 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/app"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/gateway/internal/bot"
Expand Down Expand Up @@ -612,3 +613,236 @@
}
userWebRequestWorker.webRequestQ <- &webReq
}

func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var (
ctx = r.Context()
reqType = ctx.Value(gwtypes.CtxParamCallType).(string)
arctx = ctx.Value(gwtypes.CtxParamAuthRequestContext).(*gwtypes.AuthRequestContext)
jobs []*jobsdb.JobT
body []byte
err error
status int
errorMessage string
responseBody string
)

// TODO: add tracing
gw.logger.LogRequest(r)
body, err = gw.getPayload(arctx, r, reqType)
if err != nil {
goto requestError

Check warning on line 635 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L635

Added line #L635 was not covered by tests
}
jobs, err = gw.extractJobsFromInternalBatchPayload(arctx, reqType, body)
if err != nil {
goto requestError

Check warning on line 639 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L639

Added line #L639 was not covered by tests
}

if len(jobs) > 0 {
if err := gw.storeJobs(ctx, jobs); err != nil {
gw.stats.NewTaggedStat(
"gateway.write_key_failed_events",
stats.CountType,
gw.newSourceStatTagsWithReason(arctx, reqType, "storeFailed"),
).Count(len(jobs))
goto requestError

Check warning on line 649 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L643-L649

Added lines #L643 - L649 were not covered by tests
}
gw.stats.NewTaggedStat(
"gateway.write_key_successful_events",
stats.CountType,
gw.newSourceStatTagsWithReason(arctx, reqType, ""),
).Count(len(jobs))

Check warning on line 655 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L651-L655

Added lines #L651 - L655 were not covered by tests
}

status = http.StatusOK
responseBody = response.GetStatus(response.Ok)
gw.stats.NewTaggedStat(
"gateway.write_key_successful_requests",
stats.CountType,
gw.newSourceStatTagsWithReason(arctx, reqType, ""),
).Increment()
gw.logger.Debugn("response",
logger.NewStringField("ip", misc.GetIPFromReq(r)),
logger.NewStringField("path", r.URL.Path),
logger.NewIntField("status", int64(status)),
logger.NewStringField("body", responseBody),
)
_, _ = w.Write([]byte(responseBody))
return

requestError:
errorMessage = err.Error()
status = response.GetErrorStatusCode(errorMessage)
responseBody = response.GetStatus(errorMessage)
gw.stats.NewTaggedStat(
"gateway.write_key_failed_requests",
stats.CountType,
gw.newSourceStatTagsWithReason(arctx, reqType, errorMessage),
).Increment()
gw.logger.Infon("response",
logger.NewStringField("ip", misc.GetIPFromReq(r)),
logger.NewStringField("path", r.URL.Path),
logger.NewIntField("status", int64(status)),
logger.NewStringField("body", responseBody),
)
http.Error(w, responseBody, status)

Check warning on line 689 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L675-L689

Added lines #L675 - L689 were not covered by tests
}
}

func (gw *Handle) extractJobsFromInternalBatchPayload(
arctx *gwtypes.AuthRequestContext,
reqType string,
body []byte,
) ([]*jobsdb.JobT, error) {
if !gjson.ValidBytes(body) {
return nil, fmt.Errorf("%s", response.InvalidJSON)

Check warning on line 699 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L699

Added line #L699 was not covered by tests
}
gw.requestSizeStat.Observe(float64(len(body)))

type jobObject struct {
userID string
events []map[string]interface{}
receivedAt string
}
var (
sourcesJobRunID = arctx.SourceJobRunID
sourcesTaskRunID = arctx.SourceTaskRunID
sourceID = arctx.SourceID
workspaceID = arctx.WorkspaceID
eventsBatch = gjson.GetBytes(body, "batch").Array()
isUserSuppressed = gw.memoizedIsUserSuppressed()
out = make([]jobObject, 0, len(eventsBatch))
)

for idx, v := range eventsBatch {
toSet, ok := v.Value().(map[string]interface{})
if !ok {
gw.stats.NewTaggedStat(
"gateway.write_key_failed_events",
stats.CountType,
gw.newSourceStatTagsWithReason(arctx, reqType, response.NotRudderEvent),
).Increment()
return nil, fmt.Errorf("%s", response.NotRudderEvent)

Check warning on line 726 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L719-L726

Added lines #L719 - L726 were not covered by tests
}
anonIDFromReq, _ := toSet["anonymousId"].(string)
userIDFromReq, _ := toSet["userId"].(string)
eventContext, ok := misc.MapLookup(toSet, "context").(map[string]interface{})
if ok {
if idx == 0 {
if v, _ := misc.MapLookup(eventContext, "sources", "job_run_id").(string); v != "" {
sourcesJobRunID = v

Check warning on line 734 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L728-L734

Added lines #L728 - L734 were not covered by tests
}
if v, _ := misc.MapLookup(eventContext, "sources", "task_run_id").(string); v != "" {
sourcesTaskRunID = v

Check warning on line 737 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L736-L737

Added lines #L736 - L737 were not covered by tests
}
}
}

if isUserSuppressed(workspaceID, userIDFromReq, sourceID) {
gw.logger.Infon("suppressed event",
logger.NewStringField("sourceID", sourceID),
logger.NewStringField("workspaceID", workspaceID),
logger.NewStringField("userIDFromReq", userIDFromReq),
)
gw.stats.NewTaggedStat(
"gateway.write_key_suppressed_events",
stats.CountType,
gw.newSourceStatTagsWithReason(arctx, reqType, errEventSuppressed.Error()),
).Increment()
continue

Check warning on line 753 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L742-L753

Added lines #L742 - L753 were not covered by tests
}

rudderID, _ := misc.GetMD5UUID(userIDFromReq + ":" + anonIDFromReq)
toSet["rudderId"] = rudderID
userID := buildUserID("", anonIDFromReq, userIDFromReq)
receivedAt, _ := toSet["receivedAt"].(string)
if receivedAt == "" {
receivedAt = time.Now().Format(misc.RFC3339Milli)

Check warning on line 761 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L756-L761

Added lines #L756 - L761 were not covered by tests
}
out = append(out, jobObject{
userID: userID, events: []map[string]interface{}{toSet}, receivedAt: receivedAt,
})

Check warning on line 765 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L763-L765

Added lines #L763 - L765 were not covered by tests
}
if len(out) == 0 { // events suppressed - but return success
return nil, nil
}
var params struct {
SourceID string `json:"source_id"`
SourceJobRunID string `json:"source_job_run_id"`
SourceTaskRunID string `json:"source_task_run_id"`

Check warning on line 773 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L770-L773

Added lines #L770 - L773 were not covered by tests
}
params.SourceID = sourceID
params.SourceJobRunID = sourcesJobRunID
params.SourceTaskRunID = sourcesTaskRunID
marshalledParams, err := json.Marshal(params)
if err != nil {
gw.logger.Errorn(
"[Gateway] Failed to marshal parameters map. Parameters: %+v",
logger.NewField("params", params),
obskit.Error(err),
)
marshalledParams = []byte(
`{"error": "rudder-server gateway failed to marshal params"}`,
)

Check warning on line 787 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L775-L787

Added lines #L775 - L787 were not covered by tests
}

jobs := make([]*jobsdb.JobT, 0, len(out))
type singularEventBatch struct {
Batch []map[string]interface{} `json:"batch"`
RequestIP string `json:"requestIP"` // update processor accordingly
WriteKey string `json:"writeKey"`
ReceivedAt string `json:"receivedAt"`

Check warning on line 795 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L790-L795

Added lines #L790 - L795 were not covered by tests
}
for _, userEvent := range out {
var (
payload json.RawMessage
eventCount int
)
eventBatch := singularEventBatch{
Batch: userEvent.events,
WriteKey: arctx.WriteKey,
ReceivedAt: userEvent.receivedAt,

Check warning on line 805 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L797-L805

Added lines #L797 - L805 were not covered by tests
}
payload, err = json.Marshal(eventBatch)
if err != nil {
panic(err)

Check warning on line 809 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L807-L809

Added lines #L807 - L809 were not covered by tests
}
eventCount = len(userEvent.events)

Check warning on line 811 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L811

Added line #L811 was not covered by tests

jobs = append(jobs, &jobsdb.JobT{
UUID: uuid.New(),
UserID: userEvent.userID,
Parameters: marshalledParams,
CustomVal: customVal,
EventPayload: payload,
EventCount: eventCount,
WorkspaceId: workspaceID,
})

Check warning on line 821 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L813-L821

Added lines #L813 - L821 were not covered by tests
}
return jobs, nil

Check warning on line 823 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L823

Added line #L823 was not covered by tests
}

func (gw *Handle) storeJobs(ctx context.Context, jobs []*jobsdb.JobT) error {
ctx, cancel := context.WithTimeout(ctx, gw.conf.WriteTimeout)
defer cancel()
defer gw.dbWritesStat.Count(1)
return gw.jobsDB.WithStoreSafeTx(ctx, func(tx jobsdb.StoreSafeTx) error {
if err := gw.jobsDB.StoreInTx(ctx, tx, jobs); err != nil {
gw.logger.Errorn(
"Store into gateway db failed with error",
obskit.Error(err),
logger.NewField("jobs", jobs),
)
return err

Check warning on line 837 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L826-L837

Added lines #L826 - L837 were not covered by tests
}

// rsources stats
rsourcesStats := rsources.NewStatsCollector(
gw.rsourcesService,
rsources.IgnoreDestinationID(),
)
rsourcesStats.JobsStoredWithErrors(jobs, nil)
return rsourcesStats.Publish(ctx, tx.SqlTx())

Check warning on line 846 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L841-L846

Added lines #L841 - L846 were not covered by tests
})
}
4 changes: 4 additions & 0 deletions gateway/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func (gw *Handle) webBatchHandler() http.HandlerFunc {
return gw.callType("batch", gw.writeKeyAuth(gw.webHandler()))
}

func (gw *Handle) internalBatchHandler() http.HandlerFunc {
return gw.callType("internalBatch", gw.sourceIDAuth(gw.internalBatchHandlerFunc()))
}

// webIdentifyHandler - handler for identify requests
func (gw *Handle) webIdentifyHandler() http.HandlerFunc {
return gw.callType("identify", gw.writeKeyAuth(gw.webHandler()))
Expand Down
1 change: 1 addition & 0 deletions gateway/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ func (gw *Handle) StartWebHandler(ctx context.Context) error {
r.Get("/v1/warehouse/fetch-tables", gw.whProxy.ServeHTTP)
r.Post("/v1/audiencelist", gw.webAudienceListHandler())
r.Post("/v1/replay", gw.webReplayHandler())
r.Post("/v1/batch", gw.internalBatchHandler())

// TODO: delete this handler once we are ready to remove support for the v1 api
r.Mount("/v1/job-status", withContentType("application/json; charset=utf-8", rsourcesHandlerV1.ServeHTTP))
Expand Down
16 changes: 16 additions & 0 deletions gateway/handle_observability.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gateway

import (
"github.com/rudderlabs/rudder-go-kit/stats"
gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
)
Expand All @@ -16,3 +17,18 @@
SourceType: arctx.SourceCategory,
}
}

func (gw *Handle) newSourceStatTagsWithReason(arctx *gwtypes.AuthRequestContext, reqType, reason string) stats.Tags {
tags := stats.Tags{
"source": arctx.SourceTag(),
"source_id": arctx.SourceID,
"write_key": arctx.WriteKey,
"req_type": reqType,
"workspace_id": arctx.WorkspaceID,
"source_type": arctx.SourceCategory,
}
if reason != "" {
tags["reason"] = reason

Check warning on line 31 in gateway/handle_observability.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle_observability.go#L31

Added line #L31 was not covered by tests
}
return tags
}