Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Mar 1, 2024
1 parent 7468c95 commit 04c92d1
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
8 changes: 4 additions & 4 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ type processorStats struct {
statDBReadRequests func(partition string) stats.Measurement
statDBReadEvents func(partition string) stats.Measurement
statDBReadPayloadBytes func(partition string) stats.Measurement
StatDBReadOutOfOrder func(partition string) stats.Measurement
StatDBReadOutOfSequence func(partition string) stats.Measurement
statDBReadOutOfOrder func(partition string) stats.Measurement
statDBReadOutOfSequence func(partition string) stats.Measurement
statMarkExecuting func(partition string) stats.Measurement
statDBWriteStatusTime func(partition string) stats.Measurement
statDBWriteJobsTime func(partition string) stats.Measurement
Expand Down Expand Up @@ -506,12 +506,12 @@ func (proc *Handle) Setup(
"partition": partition,
})
}
proc.stats.StatDBReadOutOfOrder = func(partition string) stats.Measurement {
proc.stats.statDBReadOutOfOrder = func(partition string) stats.Measurement {
return proc.statsFactory.NewTaggedStat("processor_db_read_out_of_order", stats.CountType, stats.Tags{
"partition": partition,
})
}
proc.stats.StatDBReadOutOfSequence = func(partition string) stats.Measurement {
proc.stats.statDBReadOutOfSequence = func(partition string) stats.Measurement {
return proc.statsFactory.NewTaggedStat("processor_db_read_out_of_sequence", stats.CountType, stats.Tags{
"partition": partition,
})
Expand Down
16 changes: 12 additions & 4 deletions processor/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,19 @@ func (w *worker) Work() (worked bool) {
worked = true
for _, job := range jobs.Jobs {
if job.JobID <= w.lastJobID {
w.logger.Debugf("Out of order job_id: prev: %d cur: %d", w.lastJobID, job.JobID)
w.handle.stats().StatDBReadOutOfOrder(w.partition).Count(1)
w.logger.Debugn(
"Out of order job_id",
logger.NewIntField("prev", w.lastJobID),
logger.NewIntField("cur", job.JobID),
)
w.handle.stats().statDBReadOutOfOrder(w.partition).Count(1)
} else if w.lastJobID != 0 && job.JobID != w.lastJobID+1 {
w.logger.Debugf("Out of sequence job_id: prev: %d cur: %d", w.lastJobID, job.JobID)
w.handle.stats().StatDBReadOutOfSequence(w.partition).Count(1)
w.logger.Debugn(
"Out of sequence job_id",
logger.NewIntField("prev", w.lastJobID),
logger.NewIntField("cur", job.JobID),
)
w.handle.stats().statDBReadOutOfSequence(w.partition).Count(1)
}
w.lastJobID = job.JobID
}
Expand Down
8 changes: 4 additions & 4 deletions processor/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,11 @@ func (*mockWorkerHandle) stats() *processorStats {
DBReadThroughput: func(partition string) stats.Measurement {
return stats.Default.NewStat("db_read_throughput", stats.CountType)
},
StatDBReadOutOfOrder: func(partition string) stats.Measurement {
return stats.Default.NewStat("db_read_out_of_order", stats.CountType)
statDBReadOutOfOrder: func(partition string) stats.Measurement {
return stats.NOP.NewStat("db_read_out_of_order", stats.CountType)
},
StatDBReadOutOfSequence: func(partition string) stats.Measurement {
return stats.Default.NewStat("db_read_out_of_sequence", stats.CountType)
statDBReadOutOfSequence: func(partition string) stats.Measurement {
return stats.NOP.NewStat("db_read_out_of_sequence", stats.CountType)
},
}
}
Expand Down

0 comments on commit 04c92d1

Please sign in to comment.