From 6c5d9285bf464c52db79efd616de1694030f7ec2 Mon Sep 17 00:00:00 2001
From: Sidddddarth <82795818+Sidddddarth@users.noreply.github.com>
Date: Mon, 9 Jan 2023 16:05:24 +0530
Subject: [PATCH] fix: unnecessary router pending event counts (#2849)

Fixes unnecessary statistics captured at the router level, where
pending-event gauges were initialized for every destination of every
workspace served by the server instance. Gauges are now touched only
for workspace and destination-type combinations that actually have
jobs.

Co-authored-by: Aris Tzoumas
---
 processor/processor.go              | 130 ++++++++++++++++------
 router/batchrouter/batchrouter.go   |  29 ++++---
 router/router.go                    |  62 ++++++-------
 services/multitenant/tenantstats.go |  11 ++-
 4 files changed, 128 insertions(+), 104 deletions(-)

diff --git a/processor/processor.go b/processor/processor.go
index e5d0e3340e..4b0446814a 100644
--- a/processor/processor.go
+++ b/processor/processor.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	jsoniter "github.com/json-iterator/go"
+	"github.com/samber/lo"
 	"github.com/tidwall/gjson"
 	"golang.org/x/sync/errgroup"
 
@@ -1550,85 +1551,84 @@ func sendQueryRetryStats(attempt int) {
 func (proc *HandleT) Store(in *storeMessage) {
 	statusList, destJobs, batchDestJobs := in.statusList, in.destJobs, in.batchDestJobs
-	processorLoopStats := make(map[string]map[string]map[string]int)
 	beforeStoreStatus := time.Now()
 	// XX: Need to do this in a transaction
 	if len(batchDestJobs) > 0 {
 		proc.logger.Debug("[Processor] Total jobs written to batch router : ", len(batchDestJobs))
-		err := misc.RetryWithNotify(context.Background(), proc.jobsDBCommandTimeout, proc.jobdDBMaxRetries, func(ctx context.Context) error {
-			return proc.batchRouterDB.WithStoreSafeTx(ctx, func(tx jobsdb.StoreSafeTx) error {
-				err := proc.batchRouterDB.StoreInTx(ctx, tx, batchDestJobs)
-				if err != nil {
-					return fmt.Errorf("storing batch router jobs: %w", err)
-				}
-
-				// rsources stats
-				err = proc.updateRudderSourcesStats(ctx, tx, batchDestJobs)
-				if err != nil {
-					return fmt.Errorf("publishing rsources stats for batch router: %w", err)
-				}
-				return nil
-			})
-		}, sendRetryStoreStats)
+		err := misc.RetryWithNotify(
+			context.Background(),
+			proc.jobsDBCommandTimeout,
+			proc.jobdDBMaxRetries,
+			func(ctx context.Context) error {
+				return proc.batchRouterDB.WithStoreSafeTx(
+					ctx,
+					func(tx jobsdb.StoreSafeTx) error {
+						err := proc.batchRouterDB.StoreInTx(ctx, tx, batchDestJobs)
+						if err != nil {
+							return fmt.Errorf("storing batch router jobs: %w", err)
+						}
+
+						// rsources stats
+						err = proc.updateRudderSourcesStats(ctx, tx, batchDestJobs)
+						if err != nil {
+							return fmt.Errorf("publishing rsources stats for batch router: %w", err)
+						}
+						return nil
+					})
+			}, sendRetryStoreStats)
 		if err != nil {
 			panic(err)
 		}
-		totalPayloadBatchBytes := 0
-		processorLoopStats["batch_router"] = make(map[string]map[string]int)
-		for i := range batchDestJobs {
-			_, ok := processorLoopStats["batch_router"][batchDestJobs[i].WorkspaceId]
-			if !ok {
-				processorLoopStats["batch_router"][batchDestJobs[i].WorkspaceId] = make(map[string]int)
-			}
-			processorLoopStats["batch_router"][batchDestJobs[i].WorkspaceId][batchDestJobs[i].CustomVal] += 1
-			totalPayloadBatchBytes += len(batchDestJobs[i].EventPayload)
-		}
-		proc.multitenantI.ReportProcLoopAddStats(processorLoopStats["batch_router"], "batch_rt")
-
+		proc.multitenantI.ReportProcLoopAddStats(
+			getJobCountsByWorkspaceDestType(batchDestJobs),
+			"batch_rt",
+		)
 		proc.stats.statBatchDestNumOutputEvents.Count(len(batchDestJobs))
 		proc.stats.statDBWriteBatchEvents.Observe(float64(len(batchDestJobs)))
-		proc.stats.statDBWriteBatchPayloadBytes.Observe(float64(totalPayloadBatchBytes))
+		proc.stats.statDBWriteBatchPayloadBytes.Observe(
+			float64(lo.SumBy(batchDestJobs, func(j *jobsdb.JobT) int { return len(j.EventPayload) })),
+		)
 	}
 
 	if len(destJobs) > 0 {
 		proc.logger.Debug("[Processor] Total jobs written to router : ", len(destJobs))
-		err := misc.RetryWithNotify(context.Background(), proc.jobsDBCommandTimeout, proc.jobdDBMaxRetries, func(ctx context.Context) error {
-			return proc.routerDB.WithStoreSafeTx(ctx, func(tx jobsdb.StoreSafeTx) error {
-				err := proc.routerDB.StoreInTx(ctx, tx, destJobs)
-				if err != nil {
-					return fmt.Errorf("storing router jobs: %w", err)
-				}
-
-				// rsources stats
-				err = proc.updateRudderSourcesStats(ctx, tx, destJobs)
-				if err != nil {
-					return fmt.Errorf("publishing rsources stats for router: %w", err)
-				}
-				return nil
-			})
-		}, sendRetryStoreStats)
+		err := misc.RetryWithNotify(
+			context.Background(),
+			proc.jobsDBCommandTimeout,
+			proc.jobdDBMaxRetries,
+			func(ctx context.Context) error {
+				return proc.routerDB.WithStoreSafeTx(
+					ctx,
+					func(tx jobsdb.StoreSafeTx) error {
+						err := proc.routerDB.StoreInTx(ctx, tx, destJobs)
+						if err != nil {
+							return fmt.Errorf("storing router jobs: %w", err)
+						}
+
+						// rsources stats
+						err = proc.updateRudderSourcesStats(ctx, tx, destJobs)
+						if err != nil {
+							return fmt.Errorf("publishing rsources stats for router: %w", err)
+						}
+						return nil
+					})
+			}, sendRetryStoreStats)
 		if err != nil {
 			panic(err)
 		}
-		totalPayloadRouterBytes := 0
-		processorLoopStats["router"] = make(map[string]map[string]int)
-		for i := range destJobs {
-			_, ok := processorLoopStats["router"][destJobs[i].WorkspaceId]
-			if !ok {
-				processorLoopStats["router"][destJobs[i].WorkspaceId] = make(map[string]int)
-			}
-			processorLoopStats["router"][destJobs[i].WorkspaceId][destJobs[i].CustomVal] += 1
-			totalPayloadRouterBytes += len(destJobs[i].EventPayload)
-		}
-		proc.multitenantI.ReportProcLoopAddStats(processorLoopStats["router"], "rt")
-
+		proc.multitenantI.ReportProcLoopAddStats(
+			getJobCountsByWorkspaceDestType(destJobs),
+			"rt",
+		)
 		proc.stats.statDestNumOutputEvents.Count(len(destJobs))
 		proc.stats.statDBWriteRouterEvents.Observe(float64(len(destJobs)))
-		proc.stats.statDBWriteRouterPayloadBytes.Observe(float64(totalPayloadRouterBytes))
+		proc.stats.statDBWriteRouterPayloadBytes.Observe(
+			float64(lo.SumBy(destJobs, func(j *jobsdb.JobT) int { return len(j.EventPayload) })),
+		)
 	}
 
 	for _, jobs := range in.procErrorJobsByDestID {
@@ -1702,6 +1702,22 @@ func (proc *HandleT) Store(in *storeMessage) {
 	proc.stats.statProcErrDBW.Count(len(in.procErrorJobs))
 }
 
+// getJobCountsByWorkspaceDestType returns the number of jobs per workspace and destination type
+//
+// map[workspaceID]map[destType]count
+func getJobCountsByWorkspaceDestType(jobs []*jobsdb.JobT) map[string]map[string]int {
+	jobCounts := make(map[string]map[string]int)
+	for _, job := range jobs {
+		workspace := job.WorkspaceId
+		destType := job.CustomVal
+		if _, ok := jobCounts[workspace]; !ok {
+			jobCounts[workspace] = make(map[string]int)
+		}
+		jobCounts[workspace][destType] += 1
+	}
+	return jobCounts
+}
+
 type transformSrcDestOutput struct {
 	reportMetrics []*types.PUReportedMetric
 	destJobs      []*jobsdb.JobT
diff --git a/router/batchrouter/batchrouter.go b/router/batchrouter/batchrouter.go
index 56d17c488b..30d1aa9473 100644
--- a/router/batchrouter/batchrouter.go
+++ b/router/batchrouter/batchrouter.go
@@ -1149,7 +1149,7 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc
 		batchJobState string
 		errorResp     []byte
 	)
-	batchRouterWorkspaceJobStatusCount := make(map[string]map[string]int)
+	batchRouterWorkspaceJobStatusCount := make(map[string]int)
 	var abortedEvents []*jobsdb.JobT
 	var batchReqMetric batchRequestMetric
 	if errOccurred != nil {
@@ -1274,13 +1274,8 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc
 			errorCode := getBRTErrorCode(jobState)
 			var cd *types.ConnectionDetails
 			workspaceID := job.WorkspaceId
-			_, ok := batchRouterWorkspaceJobStatusCount[workspaceID]
-			if !ok {
-				batchRouterWorkspaceJobStatusCount[workspaceID] = make(map[string]int)
-			}
 			key := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s", parameters.SourceID, parameters.DestinationID, parameters.SourceBatchID, jobState, strconv.Itoa(errorCode), parameters.EventName, parameters.EventType)
-			_, ok = connectionDetailsMap[key]
-			if !ok {
+			if _, ok := connectionDetailsMap[key]; !ok {
 				cd = types.CreateConnectionDetail(parameters.SourceID, parameters.DestinationID, parameters.SourceBatchID, parameters.SourceTaskID, parameters.SourceTaskRunID, parameters.SourceJobID, parameters.SourceJobRunID, parameters.SourceDefinitionID, parameters.DestinationDefinitionID, parameters.SourceCategory)
 				connectionDetailsMap[key] = cd
 				transformedAtMap[key] = parameters.TransformAt
@@ -1299,7 +1294,7 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc
 			}
 			if status.JobState != jobsdb.Failed.State {
 				if status.JobState == jobsdb.Succeeded.State || status.JobState == jobsdb.Aborted.State {
-					batchRouterWorkspaceJobStatusCount[workspaceID][parameters.DestinationID] += 1
+					batchRouterWorkspaceJobStatusCount[workspaceID] += 1
 				}
 				sd.Count++
 			}
@@ -1307,10 +1302,13 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc
 		// REPORTING - END
 	}
 
-	for workspace := range batchRouterWorkspaceJobStatusCount {
-		for destID := range batchRouterWorkspaceJobStatusCount[workspace] {
-			metric.DecreasePendingEvents("batch_rt", workspace, brt.destType, float64(batchRouterWorkspaceJobStatusCount[workspace][destID]))
-		}
+	for workspace, jobCount := range batchRouterWorkspaceJobStatusCount {
+		metric.DecreasePendingEvents(
+			"batch_rt",
+			workspace,
+			brt.destType,
+			float64(jobCount),
+		)
 	}
 	// tracking batch router errors
 	if diagnostics.EnableDestinationFailuresMetric {
@@ -1766,7 +1764,12 @@ func (worker *workerT) workerProcess() {
 				"workspaceId": destDrainStat.Workspace,
 			})
 			brt.drainedJobsStat.Count(destDrainStat.Count)
-			metric.DecreasePendingEvents("batch_rt", destDrainStat.Workspace, brt.destType, float64(drainStatsbyDest[destID].Count))
+			metric.DecreasePendingEvents(
+				"batch_rt",
+				destDrainStat.Workspace,
+				brt.destType,
+				float64(destDrainStat.Count),
+			)
 		}
 	}
 	// Mark the jobs as executing
diff --git a/router/router.go b/router/router.go
index 293b419422..7a15959945 100644
--- a/router/router.go
+++ b/router/router.go
@@ -1405,12 +1405,6 @@ func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) {
 	}
 	// REPORTING - ROUTER - END
 
-	defer func() {
-		for workspace := range routerWorkspaceJobStatusCount {
-			metric.DecreasePendingEvents("rt", workspace, rt.destName, float64(routerWorkspaceJobStatusCount[workspace]))
-		}
-	}()
-
 	if len(statusList) > 0 {
 		rt.logger.Debugf("[%v Router] :: flushing batch of %v status", rt.destName, updateStatusBatchSize)
 
@@ -1447,6 +1441,14 @@ func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) {
 			panic(err)
 		}
 		rt.updateProcessedEventsMetrics(statusList)
+		for workspace, jobCount := range routerWorkspaceJobStatusCount {
+			metric.DecreasePendingEvents(
+				"rt",
+				workspace,
+				rt.destName,
+				float64(jobCount),
+			)
+		}
 	}
 
 	if rt.guaranteeUserEventOrder {
@@ -1967,34 +1969,32 @@ func (rt *HandleT) backendConfigSubscriber() {
 			for i := range wConfig.Sources {
 				source := &wConfig.Sources[i]
 				rt.sourceIDWorkspaceMap[source.ID] = workspaceID
-				if _, ok := rt.workspaceSet[workspaceID]; !ok {
-					rt.workspaceSet[workspaceID] = struct{}{}
-					rt.MultitenantI.UpdateWorkspaceLatencyMap(rt.destName, workspaceID, 0)
-				}
-				if len(source.Destinations) > 0 {
-					for i := range source.Destinations {
-						destination := &source.Destinations[i]
-						if destination.DestinationDefinition.Name == rt.destName {
-							if _, ok := rt.destinationsMap[destination.ID]; !ok {
-								rt.destinationsMap[destination.ID] = &routerutils.BatchDestinationT{
-									Destination: *destination,
-									Sources:     []backendconfig.SourceT{},
-								}
+				for i := range source.Destinations {
+					destination := &source.Destinations[i]
+					if destination.DestinationDefinition.Name == rt.destName {
+						if _, ok := rt.destinationsMap[destination.ID]; !ok {
+							rt.destinationsMap[destination.ID] = &routerutils.BatchDestinationT{
+								Destination: *destination,
+								Sources:     []backendconfig.SourceT{},
 							}
-							rt.destinationsMap[destination.ID].Sources = append(rt.destinationsMap[destination.ID].Sources, *source)
+						}
+						if _, ok := rt.workspaceSet[workspaceID]; !ok {
+							rt.workspaceSet[workspaceID] = struct{}{}
+							rt.MultitenantI.UpdateWorkspaceLatencyMap(rt.destName, workspaceID, 0)
+						}
+						rt.destinationsMap[destination.ID].Sources = append(rt.destinationsMap[destination.ID].Sources, *source)
 
-							rt.destinationResponseHandler = New(destination.DestinationDefinition.ResponseRules)
-							if value, ok := destination.DestinationDefinition.Config["saveDestinationResponse"].(bool); ok {
-								rt.saveDestinationResponse = value
-							}
+						rt.destinationResponseHandler = New(destination.DestinationDefinition.ResponseRules)
+						if value, ok := destination.DestinationDefinition.Config["saveDestinationResponse"].(bool); ok {
+							rt.saveDestinationResponse = value
+						}
 
-							// Config key "throttlingCost" is expected to have the eventType as the first key and the call type
-							// as the second key (e.g. track, identify, etc...) or default to apply the cost to all call types:
-							// dDT["config"]["throttlingCost"] = `{"eventType":{"default":1,"track":2,"identify":3}}`
-							if value, ok := destination.DestinationDefinition.Config["throttlingCost"].(map[string]interface{}); ok {
-								m := types.NewEventTypeThrottlingCost(value)
-								rt.throttlingCosts.Store(&m)
-							}
+						// Config key "throttlingCost" is expected to have the eventType as the first key and the call type
+						// as the second key (e.g. track, identify, etc...) or default to apply the cost to all call types:
+						// dDT["config"]["throttlingCost"] = `{"eventType":{"default":1,"track":2,"identify":3}}`
+						if value, ok := destination.DestinationDefinition.Config["throttlingCost"].(map[string]interface{}); ok {
+							m := types.NewEventTypeThrottlingCost(value)
+							rt.throttlingCosts.Store(&m)
 						}
 					}
 				}
diff --git a/services/multitenant/tenantstats.go b/services/multitenant/tenantstats.go
index 5b0adf9bc8..b3cede2d6a 100644
--- a/services/multitenant/tenantstats.go
+++ b/services/multitenant/tenantstats.go
@@ -72,8 +72,13 @@ func (t *Stats) Start() error {
 	}
 
 	for workspace := range pileUpStatMap {
-		for destType := range pileUpStatMap[workspace] {
-			metric.IncreasePendingEvents(dbPrefix, workspace, destType, float64(pileUpStatMap[workspace][destType]))
+		for destType, jobCount := range pileUpStatMap[workspace] {
+			metric.IncreasePendingEvents(
+				dbPrefix,
+				workspace,
+				destType,
+				float64(jobCount),
+			)
 		}
 	}
 }
@@ -96,7 +101,7 @@ func sendQueryRetryStats(attempt int) {
 }
 
 func NewStats(routerDBs map[string]jobsdb.MultiTenantJobsDB) *Stats {
-	t := Stats{}
+	var t Stats
 	config.RegisterDurationConfigVariable(60, &t.jobdDBQueryRequestTimeout, true, time.Second, []string{"JobsDB.Multitenant.QueryRequestTimeout", "JobsDB.QueryRequestTimeout"}...)
 	config.RegisterIntConfigVariable(3, &t.jobdDBMaxRetries, true, 1, []string{"JobsDB." + "Router." + "MaxRetries", "JobsDB." + "MaxRetries"}...)
 	t.RouterDBs = routerDBs
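
For illustration, a minimal runnable sketch of what the new
getJobCountsByWorkspaceDestType helper computes. JobT below is a trimmed
stand-in for jobsdb.JobT (only the two fields the helper reads), the gauge
update is stubbed with a print, and the sample workspace and
destination-type values are made up:

	package main

	import "fmt"

	// JobT mirrors only the two jobsdb.JobT fields the helper reads.
	type JobT struct {
		WorkspaceId string // workspace the job belongs to
		CustomVal   string // destination type, e.g. "S3"
	}

	// getJobCountsByWorkspaceDestType returns map[workspaceID]map[destType]count,
	// as in the helper added to processor/processor.go above.
	func getJobCountsByWorkspaceDestType(jobs []*JobT) map[string]map[string]int {
		jobCounts := make(map[string]map[string]int)
		for _, job := range jobs {
			if _, ok := jobCounts[job.WorkspaceId]; !ok {
				jobCounts[job.WorkspaceId] = make(map[string]int)
			}
			jobCounts[job.WorkspaceId][job.CustomVal]++
		}
		return jobCounts
	}

	func main() {
		jobs := []*JobT{
			{WorkspaceId: "ws-1", CustomVal: "S3"},      // hypothetical sample data
			{WorkspaceId: "ws-1", CustomVal: "S3"},
			{WorkspaceId: "ws-2", CustomVal: "WEBHOOK"},
		}
		// Only workspace/destination-type pairs that actually have jobs show
		// up in the result, so idle pairs no longer get a gauge initialized.
		for workspace, byDestType := range getJobCountsByWorkspaceDestType(jobs) {
			for destType, count := range byDestType {
				fmt.Printf("pending_events gauge += %d for %s/%s\n", count, workspace, destType)
			}
		}
	}

Because the returned map contains only pairs observed in the stored batch,
the downstream IncreasePendingEvents/DecreasePendingEvents calls shown in
the patch touch gauges only for those pairs.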