Skip to content

Commit

Permalink
chore: gateway stores singular event batches (#3256)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed May 29, 2023
1 parent 3f88f50 commit 1ccec6e
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 232 deletions.
1 change: 1 addition & 0 deletions gateway/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func loadConfig() {
// Enables accepting requests without user id and anonymous id. This is added to prevent client 4xx retries.
config.RegisterBoolConfigVariable(false, &allowReqsWithoutUserIDAndAnonymousID, true, "Gateway.allowReqsWithoutUserIDAndAnonymousID")
config.RegisterBoolConfigVariable(true, &gwAllowPartialWriteWithErrors, true, "Gateway.allowPartialWriteWithErrors")
config.RegisterBoolConfigVariable(true, &allowBatchSplitting, true, "Gateway.allowBatchSplitting")
config.RegisterDurationConfigVariable(0, &ReadTimeout, false, time.Second, []string{"ReadTimeout", "ReadTimeOutInSec"}...)
config.RegisterDurationConfigVariable(0, &ReadHeaderTimeout, false, time.Second, []string{"ReadHeaderTimeout", "ReadHeaderTimeoutInSec"}...)
config.RegisterDurationConfigVariable(10, &WriteTimeout, false, time.Second, []string{"WriteTimeout", "WriteTimeOutInSec"}...)
Expand Down
147 changes: 101 additions & 46 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type webRequestT struct {
writeKey string
ipAddr string
userIDHeader string
errors []string
}

type batchWebRequestT struct {
Expand All @@ -98,6 +99,7 @@ var (
IdleTimeout time.Duration
allowReqsWithoutUserIDAndAnonymousID bool
gwAllowPartialWriteWithErrors bool
allowBatchSplitting bool
pkgLogger logger.Logger
Diagnostics diagnostics.DiagnosticsI
semverRegexp = regexp.MustCompile(`^v?([0-9]+)(\.[0-9]+)?(\.[0-9]+)?(-([0-9A-Za-z\-]+(\.[0-9A-Za-z\-]+)*))?(\+([0-9A-Za-z\-]+(\.[0-9A-Za-z\-]+)*))?$`)
Expand Down Expand Up @@ -126,7 +128,7 @@ func Init() {
}

type userWorkerBatchRequestT struct {
jobList []*jobsdb.JobT
jobBatches [][]*jobsdb.JobT
respChannel chan map[uuid.UUID]string
}

Expand Down Expand Up @@ -297,34 +299,34 @@ func (gateway *HandleT) userWorkerRequestBatcher() {
func (gateway *HandleT) dbWriterWorkerProcess() {
for breq := range gateway.batchUserWorkerBatchRequestQ {
var (
jobList = make([]*jobsdb.JobT, 0)
jobBatches = make([][]*jobsdb.JobT, 0)
errorMessagesMap map[uuid.UUID]string
)

for _, userWorkerBatchRequest := range breq.batchUserWorkerBatchRequest {
jobList = append(jobList, userWorkerBatchRequest.jobList...)
jobBatches = append(jobBatches, userWorkerBatchRequest.jobBatches...)
}

ctx, cancel := context.WithTimeout(context.Background(), WriteTimeout)
err := gateway.jobsDB.WithStoreSafeTx(ctx, func(tx jobsdb.StoreSafeTx) error {
if gwAllowPartialWriteWithErrors {
var err error
errorMessagesMap, err = gateway.jobsDB.StoreWithRetryEachInTx(ctx, tx, jobList)
errorMessagesMap, err = gateway.jobsDB.StoreEachBatchRetryInTx(ctx, tx, jobBatches)
if err != nil {
return err
}
} else {
err := gateway.jobsDB.StoreInTx(ctx, tx, jobList)
err := gateway.jobsDB.StoreInTx(ctx, tx, lo.Flatten(jobBatches))
if err != nil {
gateway.logger.Errorf("Store into gateway db failed with error: %v", err)
gateway.logger.Errorf("JobList: %+v", jobList)
gateway.logger.Errorf("JobList: %+v", jobBatches)
return err
}
}

// rsources stats
rsourcesStats := rsources.NewStatsCollector(gateway.rsourcesService)
rsourcesStats.JobsStoredWithErrors(jobList, errorMessagesMap)
rsourcesStats.JobsStoredWithErrors(lo.Flatten(jobBatches), errorMessagesMap)
return rsourcesStats.Publish(ctx, tx.SqlTx())
})
if err != nil {
Expand All @@ -333,10 +335,10 @@ func (gateway *HandleT) dbWriterWorkerProcess() {
errorMessage = ctx.Err().Error()
}
if errorMessagesMap == nil {
errorMessagesMap = make(map[uuid.UUID]string, len(jobList))
errorMessagesMap = make(map[uuid.UUID]string, len(jobBatches))
}
for _, job := range jobList {
errorMessagesMap[job.UUID] = errorMessage
for _, batch := range jobBatches {
errorMessagesMap[batch[0].UUID] = errorMessage
}
}
cancel()
Expand Down Expand Up @@ -418,7 +420,7 @@ func (gateway *HandleT) NewSourceStat(writeKey, reqType string) *gwstats.SourceS
// Finally sends responses(error) if any back to the webRequests over their `done` channels
func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWebRequestWorkerT) {
for breq := range userWebRequestWorker.batchRequestQ {
var jobList []*jobsdb.JobT
var jobBatches [][]*jobsdb.JobT
jobIDReqMap := make(map[uuid.UUID]*webRequestT)
jobSourceTagMap := make(map[uuid.UUID]string)
sourceStats := make(map[string]*gwstats.SourceStat)
Expand Down Expand Up @@ -448,30 +450,44 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
}
continue
}
jobList = append(jobList, jobData.job)
jobIDReqMap[jobData.job.UUID] = req
jobSourceTagMap[jobData.job.UUID] = sourceTag
eventBatchesToRecord = append(eventBatchesToRecord, sourceDebugger{data: jobData.job.EventPayload, writeKey: writeKey})
if len(jobData.jobs) > 0 {
jobBatches = append(jobBatches, jobData.jobs)
jobIDReqMap[jobData.jobs[0].UUID] = req
jobSourceTagMap[jobData.jobs[0].UUID] = sourceTag
for _, job := range jobData.jobs {
eventBatchesToRecord = append(
eventBatchesToRecord,
sourceDebugger{
data: job.EventPayload,
writeKey: writeKey,
},
)
}
} else {
req.done <- response.InvalidJSON
sourceStats[sourceTag].RequestFailed(response.InvalidJSON)
}
}

errorMessagesMap := make(map[uuid.UUID]string)
if len(jobList) > 0 {
if len(jobBatches) > 0 {
gateway.userWorkerBatchRequestQ <- &userWorkerBatchRequestT{
jobList: jobList,
jobBatches: jobBatches,
respChannel: userWebRequestWorker.reponseQ,
}
errorMessagesMap = <-userWebRequestWorker.reponseQ
}

for _, job := range jobList {
err, found := errorMessagesMap[job.UUID]
sourceTag := jobSourceTagMap[job.UUID]
for _, batch := range jobBatches {
err, found := errorMessagesMap[batch[0].UUID]
sourceTag := jobSourceTagMap[batch[0].UUID]
if found {
sourceStats[sourceTag].RequestEventsFailed(job.EventCount, "storeFailed")
sourceStats[sourceTag].RequestEventsFailed(len(batch), "storeFailed")
jobIDReqMap[batch[0].UUID].errors = append(jobIDReqMap[batch[0].UUID].errors, err)
} else {
sourceStats[sourceTag].RequestEventsSucceeded(job.EventCount)
sourceStats[sourceTag].RequestEventsSucceeded(len(batch))
}
jobIDReqMap[job.UUID].done <- err
jobIDReqMap[batch[0].UUID].done <- err
}
// Sending events to config backend
for _, eventBatch := range eventBatchesToRecord {
Expand All @@ -493,7 +509,7 @@ var (
)

type jobFromReq struct {
job *jobsdb.JobT
jobs []*jobsdb.JobT
numEvents int
version string
}
Expand Down Expand Up @@ -550,12 +566,17 @@ func (gateway *HandleT) getJobDataFromRequest(req *webRequestT) (jobData *jobFro
}
}

type jobObject struct {
userID string
events []map[string]interface{}
}

var (
// map to hold modified/filtered events of the batch
out []map[string]interface{}
out []jobObject

marshalledParams []byte
// values retrieved from first event in batch
firstUserID string
firstSourcesJobRunID, firstSourcesTaskRunID string

// facts about the batch populated as we iterate over events
Expand Down Expand Up @@ -583,7 +604,6 @@ func (gateway *HandleT) getJobDataFromRequest(req *webRequestT) (jobData *jobFro
}

if idx == 0 {
firstUserID = buildUserID(userIDHeader, anonIDFromReq, userIDFromReq)
firstSourcesJobRunID, _ = misc.MapLookup(
toSet,
"context",
Expand Down Expand Up @@ -636,7 +656,11 @@ func (gateway *HandleT) getJobDataFromRequest(req *webRequestT) (jobData *jobFro
containsAudienceList = true
}

out = append(out, toSet)
userID := buildUserID(userIDHeader, anonIDFromReq, userIDFromReq)
out = append(out, jobObject{
userID: userID,
events: []map[string]interface{}{toSet},
})
}

if len(out) == 0 && suppressed {
Expand All @@ -649,19 +673,12 @@ func (gateway *HandleT) getJobDataFromRequest(req *webRequestT) (jobData *jobFro
return
}

body, _ = sjson.SetBytes(body, "batch", out)
body, _ = sjson.SetBytes(body, "requestIP", ipAddr)
body, _ = sjson.SetBytes(body, "writeKey", writeKey)
body, _ = sjson.SetBytes(body, "receivedAt", time.Now().Format(misc.RFC3339Milli))

id := uuid.New()

params := map[string]interface{}{
"source_id": sourceID,
"source_job_run_id": firstSourcesJobRunID,
"source_task_run_id": firstSourcesTaskRunID,
}
marshalledParams, err := json.Marshal(params)
marshalledParams, err = json.Marshal(params)
if err != nil {
gateway.logger.Errorf(
"[Gateway] Failed to marshal parameters map. Parameters: %+v",
Expand All @@ -671,17 +688,55 @@ func (gateway *HandleT) getJobDataFromRequest(req *webRequestT) (jobData *jobFro
`{"error": "rudder-server gateway failed to marshal params"}`,
)
}
if !allowBatchSplitting {
// instead of multiple jobs with one event, create one job with all events
out = []jobObject{
{
userID: out[0].userID,
events: lo.Map(out, func(userEvent jobObject, _ int) map[string]interface{} {
return userEvent.events[0]
}),
},
}
}
jobs := make([]*jobsdb.JobT, 0)
for _, userEvent := range out {
var (
payload json.RawMessage
eventCount int
)
{
type SingularEventBatch struct {
Batch []map[string]interface{} `json:"batch"`
RequestIP string `json:"requestIP"`
WriteKey string `json:"writeKey"`
ReceivedAt string `json:"receivedAt"`
}
singularEventBatch := SingularEventBatch{
Batch: userEvent.events,
RequestIP: ipAddr,
WriteKey: writeKey,
ReceivedAt: time.Now().Format(misc.RFC3339Milli),
}
payload, err = json.Marshal(singularEventBatch)
if err != nil {
panic(err)
}
eventCount = len(userEvent.events)
}

jobs = append(jobs, &jobsdb.JobT{
UUID: uuid.New(),
UserID: userEvent.userID,
Parameters: marshalledParams,
CustomVal: CustomVal,
EventPayload: payload,
EventCount: eventCount,
WorkspaceId: workspaceId,
})
}
err = nil
job := &jobsdb.JobT{
UUID: id,
UserID: firstUserID,
Parameters: marshalledParams,
CustomVal: CustomVal,
EventPayload: body,
EventCount: jobData.numEvents,
WorkspaceId: workspaceId,
}
jobData.job = job
jobData.jobs = jobs
return
}

Expand Down

0 comments on commit 1ccec6e

Please sign in to comment.