Skip to content

Commit

Permalink
fix: wrong processor out of order stats
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Feb 27, 2024
1 parent 3f87930 commit 7468c95
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 14 deletions.
18 changes: 4 additions & 14 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type Handle struct {
tracer stats.Tracer
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
lastJobID int64

gatewayDB jobsdb.JobsDB
routerDB jobsdb.JobsDB
Expand Down Expand Up @@ -176,8 +175,8 @@ type processorStats struct {
statDBReadRequests func(partition string) stats.Measurement
statDBReadEvents func(partition string) stats.Measurement
statDBReadPayloadBytes func(partition string) stats.Measurement
statDBReadOutOfOrder func(partition string) stats.Measurement
statDBReadOutOfSequence func(partition string) stats.Measurement
StatDBReadOutOfOrder func(partition string) stats.Measurement
StatDBReadOutOfSequence func(partition string) stats.Measurement
statMarkExecuting func(partition string) stats.Measurement
statDBWriteStatusTime func(partition string) stats.Measurement
statDBWriteJobsTime func(partition string) stats.Measurement
Expand Down Expand Up @@ -507,12 +506,12 @@ func (proc *Handle) Setup(
"partition": partition,
})
}
proc.stats.statDBReadOutOfOrder = func(partition string) stats.Measurement {
proc.stats.StatDBReadOutOfOrder = func(partition string) stats.Measurement {
return proc.statsFactory.NewTaggedStat("processor_db_read_out_of_order", stats.CountType, stats.Tags{
"partition": partition,
})
}
proc.stats.statDBReadOutOfSequence = func(partition string) stats.Measurement {
proc.stats.StatDBReadOutOfSequence = func(partition string) stats.Measurement {
return proc.statsFactory.NewTaggedStat("processor_db_read_out_of_sequence", stats.CountType, stats.Tags{
"partition": partition,
})
Expand Down Expand Up @@ -2922,15 +2921,6 @@ func (proc *Handle) getJobs(partition string) jobsdb.JobsResult {
totalPayloadBytes := 0
for _, job := range unprocessedList.Jobs {
totalPayloadBytes += len(job.EventPayload)

if job.JobID <= proc.lastJobID {
proc.logger.Debugf("Out of order job_id: prev: %d cur: %d", proc.lastJobID, job.JobID)
proc.stats.statDBReadOutOfOrder(partition).Count(1)
} else if proc.lastJobID != 0 && job.JobID != proc.lastJobID+1 {
proc.logger.Debugf("Out of sequence job_id: prev: %d cur: %d", proc.lastJobID, job.JobID)
proc.stats.statDBReadOutOfSequence(partition).Count(1)
}
proc.lastJobID = job.JobID
}
dbReadTime := time.Since(s)
defer proc.stats.statDBR(partition).SendTiming(dbReadTime)
Expand Down
12 changes: 12 additions & 0 deletions processor/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type worker struct {
handle workerHandle
logger logger.Logger

lastJobID int64

lifecycle struct { // worker lifecycle related fields
ctx context.Context // worker context
cancel context.CancelFunc // worker context cancel function
Expand Down Expand Up @@ -132,6 +134,16 @@ func (w *worker) Work() (worked bool) {
return
}
worked = true
for _, job := range jobs.Jobs {
if job.JobID <= w.lastJobID {
w.logger.Debugf("Out of order job_id: prev: %d cur: %d", w.lastJobID, job.JobID)
w.handle.stats().StatDBReadOutOfOrder(w.partition).Count(1)
} else if w.lastJobID != 0 && job.JobID != w.lastJobID+1 {
w.logger.Debugf("Out of sequence job_id: prev: %d cur: %d", w.lastJobID, job.JobID)
w.handle.stats().StatDBReadOutOfSequence(w.partition).Count(1)
}
w.lastJobID = job.JobID
}

if err := w.handle.markExecuting(w.partition, jobs.Jobs); err != nil {
w.logger.Error(err)
Expand Down
6 changes: 6 additions & 0 deletions processor/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ func (*mockWorkerHandle) stats() *processorStats {
DBReadThroughput: func(partition string) stats.Measurement {
return stats.Default.NewStat("db_read_throughput", stats.CountType)
},
StatDBReadOutOfOrder: func(partition string) stats.Measurement {
return stats.Default.NewStat("db_read_out_of_order", stats.CountType)
},
StatDBReadOutOfSequence: func(partition string) stats.Measurement {
return stats.Default.NewStat("db_read_out_of_sequence", stats.CountType)
},
}
}

Expand Down

0 comments on commit 7468c95

Please sign in to comment.