Skip to content

Commit

Permalink
fix: unnecessary router pending event counts (#2849)
Browse files Browse the repository at this point in the history
Fixes unnecessary statistics captured at the router level, where pending-event gauges were being initialized for every destination of every workspace served by the server instance, rather than only for those with actual pending events.

Co-authored-by: Aris Tzoumas <atzoumas@rudderstack.com>
  • Loading branch information
Sidddddarth and atzoum committed Jan 9, 2023
1 parent 0459aff commit 6c5d928
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 104 deletions.
130 changes: 73 additions & 57 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -1550,85 +1551,84 @@ func sendQueryRetryStats(attempt int) {

func (proc *HandleT) Store(in *storeMessage) {
statusList, destJobs, batchDestJobs := in.statusList, in.destJobs, in.batchDestJobs
processorLoopStats := make(map[string]map[string]map[string]int)
beforeStoreStatus := time.Now()
// XX: Need to do this in a transaction
if len(batchDestJobs) > 0 {
proc.logger.Debug("[Processor] Total jobs written to batch router : ", len(batchDestJobs))

err := misc.RetryWithNotify(context.Background(), proc.jobsDBCommandTimeout, proc.jobdDBMaxRetries, func(ctx context.Context) error {
return proc.batchRouterDB.WithStoreSafeTx(ctx, func(tx jobsdb.StoreSafeTx) error {
err := proc.batchRouterDB.StoreInTx(ctx, tx, batchDestJobs)
if err != nil {
return fmt.Errorf("storing batch router jobs: %w", err)
}

// rsources stats
err = proc.updateRudderSourcesStats(ctx, tx, batchDestJobs)
if err != nil {
return fmt.Errorf("publishing rsources stats for batch router: %w", err)
}
return nil
})
}, sendRetryStoreStats)
err := misc.RetryWithNotify(
context.Background(),
proc.jobsDBCommandTimeout,
proc.jobdDBMaxRetries,
func(ctx context.Context) error {
return proc.batchRouterDB.WithStoreSafeTx(
ctx,
func(tx jobsdb.StoreSafeTx) error {
err := proc.batchRouterDB.StoreInTx(ctx, tx, batchDestJobs)
if err != nil {
return fmt.Errorf("storing batch router jobs: %w", err)
}

// rsources stats
err = proc.updateRudderSourcesStats(ctx, tx, batchDestJobs)
if err != nil {
return fmt.Errorf("publishing rsources stats for batch router: %w", err)
}
return nil
})
}, sendRetryStoreStats)
if err != nil {
panic(err)
}

totalPayloadBatchBytes := 0
processorLoopStats["batch_router"] = make(map[string]map[string]int)
for i := range batchDestJobs {
_, ok := processorLoopStats["batch_router"][batchDestJobs[i].WorkspaceId]
if !ok {
processorLoopStats["batch_router"][batchDestJobs[i].WorkspaceId] = make(map[string]int)
}
processorLoopStats["batch_router"][batchDestJobs[i].WorkspaceId][batchDestJobs[i].CustomVal] += 1
totalPayloadBatchBytes += len(batchDestJobs[i].EventPayload)
}
proc.multitenantI.ReportProcLoopAddStats(processorLoopStats["batch_router"], "batch_rt")

proc.multitenantI.ReportProcLoopAddStats(
getJobCountsByWorkspaceDestType(batchDestJobs),
"batch_rt",
)
proc.stats.statBatchDestNumOutputEvents.Count(len(batchDestJobs))
proc.stats.statDBWriteBatchEvents.Observe(float64(len(batchDestJobs)))
proc.stats.statDBWriteBatchPayloadBytes.Observe(float64(totalPayloadBatchBytes))
proc.stats.statDBWriteBatchPayloadBytes.Observe(
float64(lo.SumBy(destJobs, func(j *jobsdb.JobT) int { return len(j.EventPayload) })),
)
}

if len(destJobs) > 0 {
proc.logger.Debug("[Processor] Total jobs written to router : ", len(destJobs))

err := misc.RetryWithNotify(context.Background(), proc.jobsDBCommandTimeout, proc.jobdDBMaxRetries, func(ctx context.Context) error {
return proc.routerDB.WithStoreSafeTx(ctx, func(tx jobsdb.StoreSafeTx) error {
err := proc.routerDB.StoreInTx(ctx, tx, destJobs)
if err != nil {
return fmt.Errorf("storing router jobs: %w", err)
}

// rsources stats
err = proc.updateRudderSourcesStats(ctx, tx, destJobs)
if err != nil {
return fmt.Errorf("publishing rsources stats for router: %w", err)
}
return nil
})
}, sendRetryStoreStats)
err := misc.RetryWithNotify(
context.Background(),
proc.jobsDBCommandTimeout,
proc.jobdDBMaxRetries,
func(ctx context.Context) error {
return proc.routerDB.WithStoreSafeTx(
ctx,
func(tx jobsdb.StoreSafeTx) error {
err := proc.routerDB.StoreInTx(ctx, tx, destJobs)
if err != nil {
return fmt.Errorf("storing router jobs: %w", err)
}

// rsources stats
err = proc.updateRudderSourcesStats(ctx, tx, destJobs)
if err != nil {
return fmt.Errorf("publishing rsources stats for router: %w", err)
}
return nil
})
}, sendRetryStoreStats)
if err != nil {
panic(err)
}

totalPayloadRouterBytes := 0
processorLoopStats["router"] = make(map[string]map[string]int)
for i := range destJobs {
_, ok := processorLoopStats["router"][destJobs[i].WorkspaceId]
if !ok {
processorLoopStats["router"][destJobs[i].WorkspaceId] = make(map[string]int)
}
processorLoopStats["router"][destJobs[i].WorkspaceId][destJobs[i].CustomVal] += 1
totalPayloadRouterBytes += len(destJobs[i].EventPayload)
}
proc.multitenantI.ReportProcLoopAddStats(processorLoopStats["router"], "rt")

proc.multitenantI.ReportProcLoopAddStats(
getJobCountsByWorkspaceDestType(destJobs),
"rt",
)
proc.stats.statDestNumOutputEvents.Count(len(destJobs))
proc.stats.statDBWriteRouterEvents.Observe(float64(len(destJobs)))
proc.stats.statDBWriteRouterPayloadBytes.Observe(float64(totalPayloadRouterBytes))
proc.stats.statDBWriteRouterPayloadBytes.Observe(
float64(lo.SumBy(destJobs, func(j *jobsdb.JobT) int { return len(j.EventPayload) })),
)
}

for _, jobs := range in.procErrorJobsByDestID {
Expand Down Expand Up @@ -1702,6 +1702,22 @@ func (proc *HandleT) Store(in *storeMessage) {
proc.stats.statProcErrDBW.Count(len(in.procErrorJobs))
}

// getJobCountsByWorkspaceDestType aggregates the given jobs into a nested
// count map keyed first by workspace ID and then by destination type:
//
// map[workspaceID]map[destType]count
func getJobCountsByWorkspaceDestType(jobs []*jobsdb.JobT) map[string]map[string]int {
	counts := make(map[string]map[string]int)
	for _, j := range jobs {
		perDest, ok := counts[j.WorkspaceId]
		if !ok {
			// first job seen for this workspace: lazily create its inner map
			perDest = make(map[string]int)
			counts[j.WorkspaceId] = perDest
		}
		perDest[j.CustomVal]++
	}
	return counts
}

type transformSrcDestOutput struct {
reportMetrics []*types.PUReportedMetric
destJobs []*jobsdb.JobT
Expand Down
29 changes: 16 additions & 13 deletions router/batchrouter/batchrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc
batchJobState string
errorResp []byte
)
batchRouterWorkspaceJobStatusCount := make(map[string]map[string]int)
batchRouterWorkspaceJobStatusCount := make(map[string]int)
var abortedEvents []*jobsdb.JobT
var batchReqMetric batchRequestMetric
if errOccurred != nil {
Expand Down Expand Up @@ -1274,13 +1274,8 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc
errorCode := getBRTErrorCode(jobState)
var cd *types.ConnectionDetails
workspaceID := job.WorkspaceId
_, ok := batchRouterWorkspaceJobStatusCount[workspaceID]
if !ok {
batchRouterWorkspaceJobStatusCount[workspaceID] = make(map[string]int)
}
key := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s", parameters.SourceID, parameters.DestinationID, parameters.SourceBatchID, jobState, strconv.Itoa(errorCode), parameters.EventName, parameters.EventType)
_, ok = connectionDetailsMap[key]
if !ok {
if _, ok := connectionDetailsMap[key]; !ok {
cd = types.CreateConnectionDetail(parameters.SourceID, parameters.DestinationID, parameters.SourceBatchID, parameters.SourceTaskID, parameters.SourceTaskRunID, parameters.SourceJobID, parameters.SourceJobRunID, parameters.SourceDefinitionID, parameters.DestinationDefinitionID, parameters.SourceCategory)
connectionDetailsMap[key] = cd
transformedAtMap[key] = parameters.TransformAt
Expand All @@ -1299,18 +1294,21 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc
}
if status.JobState != jobsdb.Failed.State {
if status.JobState == jobsdb.Succeeded.State || status.JobState == jobsdb.Aborted.State {
batchRouterWorkspaceJobStatusCount[workspaceID][parameters.DestinationID] += 1
batchRouterWorkspaceJobStatusCount[workspaceID] += 1
}
sd.Count++
}
}
// REPORTING - END
}

for workspace := range batchRouterWorkspaceJobStatusCount {
for destID := range batchRouterWorkspaceJobStatusCount[workspace] {
metric.DecreasePendingEvents("batch_rt", workspace, brt.destType, float64(batchRouterWorkspaceJobStatusCount[workspace][destID]))
}
for workspace, jobCount := range batchRouterWorkspaceJobStatusCount {
metric.DecreasePendingEvents(
"batch_rt",
workspace,
brt.destType,
float64(jobCount),
)
}
// tracking batch router errors
if diagnostics.EnableDestinationFailuresMetric {
Expand Down Expand Up @@ -1766,7 +1764,12 @@ func (worker *workerT) workerProcess() {
"workspaceId": destDrainStat.Workspace,
})
brt.drainedJobsStat.Count(destDrainStat.Count)
metric.DecreasePendingEvents("batch_rt", destDrainStat.Workspace, brt.destType, float64(drainStatsbyDest[destID].Count))
metric.DecreasePendingEvents(
"batch_rt",
destDrainStat.Workspace,
brt.destType,
float64(destDrainStat.Count),
)
}
}
// Mark the jobs as executing
Expand Down
62 changes: 31 additions & 31 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,12 +1405,6 @@ func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) {
}
// REPORTING - ROUTER - END

defer func() {
for workspace := range routerWorkspaceJobStatusCount {
metric.DecreasePendingEvents("rt", workspace, rt.destName, float64(routerWorkspaceJobStatusCount[workspace]))
}
}()

if len(statusList) > 0 {
rt.logger.Debugf("[%v Router] :: flushing batch of %v status", rt.destName, updateStatusBatchSize)

Expand Down Expand Up @@ -1447,6 +1441,14 @@ func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) {
panic(err)
}
rt.updateProcessedEventsMetrics(statusList)
for workspace, jobCount := range routerWorkspaceJobStatusCount {
metric.DecreasePendingEvents(
"rt",
workspace,
rt.destName,
float64(jobCount),
)
}
}

if rt.guaranteeUserEventOrder {
Expand Down Expand Up @@ -1967,34 +1969,32 @@ func (rt *HandleT) backendConfigSubscriber() {
for i := range wConfig.Sources {
source := &wConfig.Sources[i]
rt.sourceIDWorkspaceMap[source.ID] = workspaceID
if _, ok := rt.workspaceSet[workspaceID]; !ok {
rt.workspaceSet[workspaceID] = struct{}{}
rt.MultitenantI.UpdateWorkspaceLatencyMap(rt.destName, workspaceID, 0)
}
if len(source.Destinations) > 0 {
for i := range source.Destinations {
destination := &source.Destinations[i]
if destination.DestinationDefinition.Name == rt.destName {
if _, ok := rt.destinationsMap[destination.ID]; !ok {
rt.destinationsMap[destination.ID] = &routerutils.BatchDestinationT{
Destination: *destination,
Sources: []backendconfig.SourceT{},
}
for i := range source.Destinations {
destination := &source.Destinations[i]
if destination.DestinationDefinition.Name == rt.destName {
if _, ok := rt.destinationsMap[destination.ID]; !ok {
rt.destinationsMap[destination.ID] = &routerutils.BatchDestinationT{
Destination: *destination,
Sources: []backendconfig.SourceT{},
}
rt.destinationsMap[destination.ID].Sources = append(rt.destinationsMap[destination.ID].Sources, *source)
}
if _, ok := rt.workspaceSet[workspaceID]; !ok {
rt.workspaceSet[workspaceID] = struct{}{}
rt.MultitenantI.UpdateWorkspaceLatencyMap(rt.destName, workspaceID, 0)
}
rt.destinationsMap[destination.ID].Sources = append(rt.destinationsMap[destination.ID].Sources, *source)

rt.destinationResponseHandler = New(destination.DestinationDefinition.ResponseRules)
if value, ok := destination.DestinationDefinition.Config["saveDestinationResponse"].(bool); ok {
rt.saveDestinationResponse = value
}
rt.destinationResponseHandler = New(destination.DestinationDefinition.ResponseRules)
if value, ok := destination.DestinationDefinition.Config["saveDestinationResponse"].(bool); ok {
rt.saveDestinationResponse = value
}

// Config key "throttlingCost" is expected to have the eventType as the first key and the call type
// as the second key (e.g. track, identify, etc...) or default to apply the cost to all call types:
// dDT["config"]["throttlingCost"] = `{"eventType":{"default":1,"track":2,"identify":3}}`
if value, ok := destination.DestinationDefinition.Config["throttlingCost"].(map[string]interface{}); ok {
m := types.NewEventTypeThrottlingCost(value)
rt.throttlingCosts.Store(&m)
}
// Config key "throttlingCost" is expected to have the eventType as the first key and the call type
// as the second key (e.g. track, identify, etc...) or default to apply the cost to all call types:
// dDT["config"]["throttlingCost"] = `{"eventType":{"default":1,"track":2,"identify":3}}`
if value, ok := destination.DestinationDefinition.Config["throttlingCost"].(map[string]interface{}); ok {
m := types.NewEventTypeThrottlingCost(value)
rt.throttlingCosts.Store(&m)
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions services/multitenant/tenantstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,13 @@ func (t *Stats) Start() error {
}

for workspace := range pileUpStatMap {
for destType := range pileUpStatMap[workspace] {
metric.IncreasePendingEvents(dbPrefix, workspace, destType, float64(pileUpStatMap[workspace][destType]))
for destType, jobCount := range pileUpStatMap[workspace] {
metric.IncreasePendingEvents(
dbPrefix,
workspace,
destType,
float64(jobCount),
)
}
}
}
Expand All @@ -96,7 +101,7 @@ func sendQueryRetryStats(attempt int) {
}

func NewStats(routerDBs map[string]jobsdb.MultiTenantJobsDB) *Stats {
t := Stats{}
var t Stats
config.RegisterDurationConfigVariable(60, &t.jobdDBQueryRequestTimeout, true, time.Second, []string{"JobsDB.Multitenant.QueryRequestTimeout", "JobsDB.QueryRequestTimeout"}...)
config.RegisterIntConfigVariable(3, &t.jobdDBMaxRetries, true, 1, []string{"JobsDB." + "Router." + "MaxRetries", "JobsDB." + "MaxRetries"}...)
t.RouterDBs = routerDBs
Expand Down

0 comments on commit 6c5d928

Please sign in to comment.