Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wrong processor out of order stats #4426

Merged
merged 2 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 4 additions & 14 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type Handle struct {
tracer stats.Tracer
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
lastJobID int64

gatewayDB jobsdb.JobsDB
routerDB jobsdb.JobsDB
Expand Down Expand Up @@ -176,8 +175,8 @@ type processorStats struct {
statDBReadRequests func(partition string) stats.Measurement
statDBReadEvents func(partition string) stats.Measurement
statDBReadPayloadBytes func(partition string) stats.Measurement
statDBReadOutOfOrder func(partition string) stats.Measurement
statDBReadOutOfSequence func(partition string) stats.Measurement
StatDBReadOutOfOrder func(partition string) stats.Measurement
StatDBReadOutOfSequence func(partition string) stats.Measurement
Sidddddarth marked this conversation as resolved.
Show resolved Hide resolved
statMarkExecuting func(partition string) stats.Measurement
statDBWriteStatusTime func(partition string) stats.Measurement
statDBWriteJobsTime func(partition string) stats.Measurement
Expand Down Expand Up @@ -507,12 +506,12 @@ func (proc *Handle) Setup(
"partition": partition,
})
}
proc.stats.statDBReadOutOfOrder = func(partition string) stats.Measurement {
proc.stats.StatDBReadOutOfOrder = func(partition string) stats.Measurement {
return proc.statsFactory.NewTaggedStat("processor_db_read_out_of_order", stats.CountType, stats.Tags{
"partition": partition,
})
}
proc.stats.statDBReadOutOfSequence = func(partition string) stats.Measurement {
proc.stats.StatDBReadOutOfSequence = func(partition string) stats.Measurement {
return proc.statsFactory.NewTaggedStat("processor_db_read_out_of_sequence", stats.CountType, stats.Tags{
"partition": partition,
})
Expand Down Expand Up @@ -2922,15 +2921,6 @@ func (proc *Handle) getJobs(partition string) jobsdb.JobsResult {
totalPayloadBytes := 0
for _, job := range unprocessedList.Jobs {
totalPayloadBytes += len(job.EventPayload)

if job.JobID <= proc.lastJobID {
proc.logger.Debugf("Out of order job_id: prev: %d cur: %d", proc.lastJobID, job.JobID)
proc.stats.statDBReadOutOfOrder(partition).Count(1)
} else if proc.lastJobID != 0 && job.JobID != proc.lastJobID+1 {
proc.logger.Debugf("Out of sequence job_id: prev: %d cur: %d", proc.lastJobID, job.JobID)
proc.stats.statDBReadOutOfSequence(partition).Count(1)
}
proc.lastJobID = job.JobID
}
dbReadTime := time.Since(s)
defer proc.stats.statDBR(partition).SendTiming(dbReadTime)
Expand Down
12 changes: 12 additions & 0 deletions processor/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type worker struct {
handle workerHandle
logger logger.Logger

lastJobID int64

lifecycle struct { // worker lifecycle related fields
ctx context.Context // worker context
cancel context.CancelFunc // worker context cancel function
Expand Down Expand Up @@ -132,6 +134,16 @@ func (w *worker) Work() (worked bool) {
return
}
worked = true
for _, job := range jobs.Jobs {
if job.JobID <= w.lastJobID {
w.logger.Debugf("Out of order job_id: prev: %d cur: %d", w.lastJobID, job.JobID)
w.handle.stats().StatDBReadOutOfOrder(w.partition).Count(1)
} else if w.lastJobID != 0 && job.JobID != w.lastJobID+1 {
w.logger.Debugf("Out of sequence job_id: prev: %d cur: %d", w.lastJobID, job.JobID)
w.handle.stats().StatDBReadOutOfSequence(w.partition).Count(1)
}
Sidddddarth marked this conversation as resolved.
Show resolved Hide resolved
w.lastJobID = job.JobID
}

if err := w.handle.markExecuting(w.partition, jobs.Jobs); err != nil {
w.logger.Error(err)
Expand Down
6 changes: 6 additions & 0 deletions processor/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ func (*mockWorkerHandle) stats() *processorStats {
DBReadThroughput: func(partition string) stats.Measurement {
return stats.Default.NewStat("db_read_throughput", stats.CountType)
},
StatDBReadOutOfOrder: func(partition string) stats.Measurement {
return stats.Default.NewStat("db_read_out_of_order", stats.CountType)
},
StatDBReadOutOfSequence: func(partition string) stats.Measurement {
return stats.Default.NewStat("db_read_out_of_sequence", stats.CountType)
},
Sidddddarth marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down