Skip to content

Commit

Permalink
feat: internal batch endpoint (#4394)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Feb 27, 2024
1 parent 3501578 commit 3f87930
Show file tree
Hide file tree
Showing 9 changed files with 804 additions and 13 deletions.
1 change: 1 addition & 0 deletions gateway/gateway.go
Expand Up @@ -34,6 +34,7 @@ var (
// Sentinel errors used by the gateway to classify requests and events that
// are not processed.
var (
	// errEventSuppressed carries the message "event suppressed"; that message
	// is used as the reason tag when a single event is skipped for a
	// suppressed user.
	errEventSuppressed = errors.New("event suppressed")
	// errRequestDropped carries the message "request dropped".
	errRequestDropped = errors.New("request dropped")
	// errRequestSuppressed carries the message "request suppressed".
	errRequestSuppressed = errors.New("request suppressed")
)

//go:embed openapi/index.html
Expand Down
6 changes: 4 additions & 2 deletions gateway/gateway_test.go
Expand Up @@ -477,9 +477,10 @@ var _ = Describe("Gateway", func() {
}
Expect(err).To(BeNil())
req.Header.Set("Content-Type", "application/json")
if ep == "/internal/v1/replay" || ep == "/internal/v1/retl" {
switch ep {
case "/internal/v1/replay", "/internal/v1/retl", "/internal/v1/batch":
req.Header.Set("X-Rudder-Source-Id", ReplaySourceID)
} else {
default:
req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(WriteKeyEnabled+":")))
}
resp, err := client.Do(req)
Expand Down Expand Up @@ -1708,6 +1709,7 @@ func endpointsToVerify() ([]string, []string, []string) {
"/v1/warehouse/pending-events",
"/v1/warehouse/trigger-upload",
"/v1/warehouse/jobs",
"/internal/v1/batch",
}

deleteEndpoints := []string{
Expand Down
234 changes: 234 additions & 0 deletions gateway/handle.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/app"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/gateway/internal/bot"
Expand Down Expand Up @@ -612,3 +613,236 @@ func (gw *Handle) addToWebRequestQ(_ *http.ResponseWriter, req *http.Request, do
}
userWebRequestWorker.webRequestQ <- &webReq
}

// internalBatchHandlerFunc returns the HTTP handler that ingests a batch of
// events and stores them as gateway jobs.
//
// The handler reads the call type and the authenticated source context from
// the request context. Both type assertions are unchecked, so the handler
// panics if those values are absent; it is presumably always mounted behind
// the wrappers that set them (verify against the route registration).
//
// On success it writes the OK status body (implicit 200). On failure it
// answers with the status code and body derived from the error message, and
// records a failed-request stat tagged with that message as the reason.
func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// All locals are declared up front: the goto statements below must not
		// jump over variable declarations.
		var (
			ctx          = r.Context()
			reqType      = ctx.Value(gwtypes.CtxParamCallType).(string)
			arctx        = ctx.Value(gwtypes.CtxParamAuthRequestContext).(*gwtypes.AuthRequestContext)
			jobs         []*jobsdb.JobT
			body         []byte
			err          error
			status       int
			errorMessage string
			responseBody string
		)

		// TODO: add tracing
		gw.logger.LogRequest(r)
		body, err = gw.getPayload(arctx, r, reqType)
		if err != nil {
			goto requestError
		}
		jobs, err = gw.extractJobsFromInternalBatchPayload(arctx, reqType, body)
		if err != nil {
			goto requestError
		}

		// An empty job list (e.g. every event was suppressed) is still a
		// successful request; there is simply nothing to store.
		if len(jobs) > 0 {
			// Assign to the outer err (no ":=") so that the requestError
			// section sees the store failure. Shadowing it here would leave
			// the outer err nil and make err.Error() below panic.
			if err = gw.storeJobs(ctx, jobs); err != nil {
				gw.stats.NewTaggedStat(
					"gateway.write_key_failed_events",
					stats.CountType,
					gw.newSourceStatTagsWithReason(arctx, reqType, "storeFailed"),
				).Count(len(jobs))
				goto requestError
			}
			gw.stats.NewTaggedStat(
				"gateway.write_key_successful_events",
				stats.CountType,
				gw.newSourceStatTagsWithReason(arctx, reqType, ""),
			).Count(len(jobs))
		}

		status = http.StatusOK
		responseBody = response.GetStatus(response.Ok)
		gw.stats.NewTaggedStat(
			"gateway.write_key_successful_requests",
			stats.CountType,
			gw.newSourceStatTagsWithReason(arctx, reqType, ""),
		).Increment()
		gw.logger.Debugn("response",
			logger.NewStringField("ip", misc.GetIPFromReq(r)),
			logger.NewStringField("path", r.URL.Path),
			logger.NewIntField("status", int64(status)),
			logger.NewStringField("body", responseBody),
		)
		// A write failure cannot be reported to the client anymore, so the
		// result of Write is deliberately ignored.
		_, _ = w.Write([]byte(responseBody))
		return

	requestError:
		// err is guaranteed to be non-nil here: every goto above is guarded by
		// an err != nil check on this very variable.
		errorMessage = err.Error()
		status = response.GetErrorStatusCode(errorMessage)
		responseBody = response.GetStatus(errorMessage)
		gw.stats.NewTaggedStat(
			"gateway.write_key_failed_requests",
			stats.CountType,
			gw.newSourceStatTagsWithReason(arctx, reqType, errorMessage),
		).Increment()
		gw.logger.Infon("response",
			logger.NewStringField("ip", misc.GetIPFromReq(r)),
			logger.NewStringField("path", r.URL.Path),
			logger.NewIntField("status", int64(status)),
			logger.NewStringField("body", responseBody),
		)
		http.Error(w, responseBody, status)
	}
}

// extractJobsFromInternalBatchPayload converts the body of an internal batch
// request into gateway jobs, one job per non-suppressed event.
//
// body must be valid JSON; the events are read from its "batch" array and
// every entry must be a JSON object. The source job run id and task run id
// default to the values in arctx and are overridden by the first event's
// context.sources.job_run_id / context.sources.task_run_id when those are
// non-empty strings. Events of suppressed users are skipped and counted.
//
// It returns an error whose message is response.InvalidJSON when body is not
// valid JSON, or response.NotRudderEvent when a batch entry is not an object.
// It returns (nil, nil) when no event is left to store, which callers treat
// as success.
func (gw *Handle) extractJobsFromInternalBatchPayload(
	arctx *gwtypes.AuthRequestContext,
	reqType string,
	body []byte,
) ([]*jobsdb.JobT, error) {
	if !gjson.ValidBytes(body) {
		return nil, fmt.Errorf("%s", response.InvalidJSON)
	}
	gw.requestSizeStat.Observe(float64(len(body)))

	// jobObject holds what is needed to build one job from one event.
	type jobObject struct {
		userID     string
		events     []map[string]interface{}
		receivedAt string
	}
	var (
		sourcesJobRunID  = arctx.SourceJobRunID
		sourcesTaskRunID = arctx.SourceTaskRunID
		sourceID         = arctx.SourceID
		workspaceID      = arctx.WorkspaceID
		eventsBatch      = gjson.GetBytes(body, "batch").Array()
		isUserSuppressed = gw.memoizedIsUserSuppressed()
		out              = make([]jobObject, 0, len(eventsBatch))
	)

	for idx, v := range eventsBatch {
		toSet, ok := v.Value().(map[string]interface{})
		if !ok {
			// A single malformed entry fails the whole request.
			gw.stats.NewTaggedStat(
				"gateway.write_key_failed_events",
				stats.CountType,
				gw.newSourceStatTagsWithReason(arctx, reqType, response.NotRudderEvent),
			).Increment()
			return nil, fmt.Errorf("%s", response.NotRudderEvent)
		}
		// Missing or non-string identifiers degrade to the empty string.
		anonIDFromReq, _ := toSet["anonymousId"].(string)
		userIDFromReq, _ := toSet["userId"].(string)

		// Only the first event may override the run ids taken from arctx, so
		// the context lookup is skipped for every other event.
		if idx == 0 {
			if eventContext, ok := misc.MapLookup(toSet, "context").(map[string]interface{}); ok {
				if runID, _ := misc.MapLookup(eventContext, "sources", "job_run_id").(string); runID != "" {
					sourcesJobRunID = runID
				}
				if runID, _ := misc.MapLookup(eventContext, "sources", "task_run_id").(string); runID != "" {
					sourcesTaskRunID = runID
				}
			}
		}

		if isUserSuppressed(workspaceID, userIDFromReq, sourceID) {
			gw.logger.Infon("suppressed event",
				logger.NewStringField("sourceID", sourceID),
				logger.NewStringField("workspaceID", workspaceID),
				logger.NewStringField("userIDFromReq", userIDFromReq),
			)
			gw.stats.NewTaggedStat(
				"gateway.write_key_suppressed_events",
				stats.CountType,
				gw.newSourceStatTagsWithReason(arctx, reqType, errEventSuppressed.Error()),
			).Increment()
			continue
		}

		// NOTE(review): the error from GetMD5UUID is ignored, as in the
		// original code; confirm that is intended.
		rudderID, _ := misc.GetMD5UUID(userIDFromReq + ":" + anonIDFromReq)
		toSet["rudderId"] = rudderID
		userID := buildUserID("", anonIDFromReq, userIDFromReq)
		// Keep the receivedAt supplied with the event; fall back to now when
		// it is absent, empty or not a string.
		receivedAt, _ := toSet["receivedAt"].(string)
		if receivedAt == "" {
			receivedAt = time.Now().Format(misc.RFC3339Milli)
		}
		out = append(out, jobObject{
			userID: userID, events: []map[string]interface{}{toSet}, receivedAt: receivedAt,
		})
	}
	if len(out) == 0 { // events suppressed - but return success
		return nil, nil
	}

	// The same parameters are attached to every job of this request.
	var params struct {
		SourceID        string `json:"source_id"`
		SourceJobRunID  string `json:"source_job_run_id"`
		SourceTaskRunID string `json:"source_task_run_id"`
	}
	params.SourceID = sourceID
	params.SourceJobRunID = sourcesJobRunID
	params.SourceTaskRunID = sourcesTaskRunID
	marshalledParams, err := json.Marshal(params)
	if err != nil {
		// Errorn is a structured logger: the message is emitted verbatim, so
		// it must not contain printf verbs. The parameters travel as a field.
		gw.logger.Errorn(
			"[Gateway] Failed to marshal parameters map",
			logger.NewField("params", params),
			obskit.Error(err),
		)
		marshalledParams = []byte(
			`{"error": "rudder-server gateway failed to marshal params"}`,
		)
	}

	jobs := make([]*jobsdb.JobT, 0, len(out))
	// singularEventBatch is the payload stored with each job.
	type singularEventBatch struct {
		Batch      []map[string]interface{} `json:"batch"`
		RequestIP  string                   `json:"requestIP"` // update processor accordingly
		WriteKey   string                   `json:"writeKey"`
		ReceivedAt string                   `json:"receivedAt"`
	}
	for _, userEvent := range out {
		var (
			payload    json.RawMessage
			eventCount int
		)
		eventBatch := singularEventBatch{
			Batch:      userEvent.events,
			WriteKey:   arctx.WriteKey,
			ReceivedAt: userEvent.receivedAt,
		}
		payload, err = json.Marshal(eventBatch)
		if err != nil {
			// The batch was decoded from valid JSON just above, so failing to
			// re-encode it is treated as a broken invariant.
			panic(err)
		}
		eventCount = len(userEvent.events)

		jobs = append(jobs, &jobsdb.JobT{
			UUID:         uuid.New(),
			UserID:       userEvent.userID,
			Parameters:   marshalledParams,
			CustomVal:    customVal,
			EventPayload: payload,
			EventCount:   eventCount,
			WorkspaceId:  workspaceID,
		})
	}
	return jobs, nil
}

// storeJobs persists jobs in the gateway jobs database and publishes the
// matching rsources statistics in the same transaction.
//
// The whole operation is bounded by gw.conf.WriteTimeout, and the db-writes
// stat is incremented once per call whether or not the store succeeds.
// It returns the store error or, if storing succeeded, the result of
// publishing the statistics.
func (gw *Handle) storeJobs(ctx context.Context, jobs []*jobsdb.JobT) error {
	ctx, cancel := context.WithTimeout(ctx, gw.conf.WriteTimeout)
	defer cancel()
	defer gw.dbWritesStat.Count(1)

	// storeAndPublish runs inside the store-safe transaction.
	storeAndPublish := func(tx jobsdb.StoreSafeTx) error {
		storeErr := gw.jobsDB.StoreInTx(ctx, tx, jobs)
		if storeErr != nil {
			gw.logger.Errorn(
				"Store into gateway db failed with error",
				obskit.Error(storeErr),
				logger.NewField("jobs", jobs),
			)
			return storeErr
		}

		// rsources stats: every job is recorded as stored, with no per-job
		// errors (nil error map).
		collector := rsources.NewStatsCollector(
			gw.rsourcesService,
			rsources.IgnoreDestinationID(),
		)
		collector.JobsStoredWithErrors(jobs, nil)
		return collector.Publish(ctx, tx.SqlTx())
	}

	return gw.jobsDB.WithStoreSafeTx(ctx, storeAndPublish)
}
4 changes: 4 additions & 0 deletions gateway/handle_http.go
Expand Up @@ -26,6 +26,10 @@ func (gw *Handle) webBatchHandler() http.HandlerFunc {
return gw.callType("batch", gw.writeKeyAuth(gw.webHandler()))
}

// internalBatchHandler - handler for internal batch requests. It wraps the
// internal batch handler func with source ID authentication and registers
// the call type "internalBatch".
func (gw *Handle) internalBatchHandler() http.HandlerFunc {
	authenticated := gw.sourceIDAuth(gw.internalBatchHandlerFunc())
	return gw.callType("internalBatch", authenticated)
}

// webIdentifyHandler - handler for identify requests
func (gw *Handle) webIdentifyHandler() http.HandlerFunc {
return gw.callType("identify", gw.writeKeyAuth(gw.webHandler()))
Expand Down
1 change: 1 addition & 0 deletions gateway/handle_lifecycle.go
Expand Up @@ -385,6 +385,7 @@ func (gw *Handle) StartWebHandler(ctx context.Context) error {
r.Get("/v1/warehouse/fetch-tables", gw.whProxy.ServeHTTP)
r.Post("/v1/audiencelist", gw.webAudienceListHandler())
r.Post("/v1/replay", gw.webReplayHandler())
r.Post("/v1/batch", gw.internalBatchHandler())

// TODO: delete this handler once we are ready to remove support for the v1 api
r.Mount("/v1/job-status", withContentType("application/json; charset=utf-8", rsourcesHandlerV1.ServeHTTP))
Expand Down
16 changes: 16 additions & 0 deletions gateway/handle_observability.go
@@ -1,6 +1,7 @@
package gateway

import (
"github.com/rudderlabs/rudder-go-kit/stats"
gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
)
Expand All @@ -16,3 +17,18 @@ func (gw *Handle) NewSourceStat(arctx *gwtypes.AuthRequestContext, reqType strin
SourceType: arctx.SourceCategory,
}
}

// newSourceStatTagsWithReason builds the stat tags that identify the source
// of a request: source tag, source id, write key, request type, workspace id
// and source type. A "reason" tag is added only when reason is non-empty.
func (gw *Handle) newSourceStatTagsWithReason(arctx *gwtypes.AuthRequestContext, reqType, reason string) stats.Tags {
	sourceTags := stats.Tags{
		"source":       arctx.SourceTag(),
		"source_id":    arctx.SourceID,
		"write_key":    arctx.WriteKey,
		"req_type":     reqType,
		"workspace_id": arctx.WorkspaceID,
		"source_type":  arctx.SourceCategory,
	}
	if reason == "" {
		return sourceTags
	}
	sourceTags["reason"] = reason
	return sourceTags
}

0 comments on commit 3f87930

Please sign in to comment.