Skip to content

Commit

Permalink
feat(scheduler): add a channel to handle misfired jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Apr 13, 2024
1 parent e55b059 commit a68b809
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
14 changes: 13 additions & 1 deletion quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ type StdSchedulerOptions struct {
// using a custom implementation of the JobQueue, where operations
// may timeout or fail.
RetryInterval time.Duration

// MisfiredChan allows the creation of event listeners to handle jobs that
// have failed to be executed on time and have been skipped by the scheduler.
//
// Misfires can occur due to insufficient resources or scheduler downtime.
// Adjust OutdatedThreshold to establish an acceptable delay time and
// ensure regular job execution.
MisfiredChan chan ScheduledJob
}

// Verify StdScheduler satisfies the Scheduler interface.
Expand Down Expand Up @@ -516,7 +524,11 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e
now := NowNano()
if job.NextRunTime() < now-sched.opts.OutdatedThreshold.Nanoseconds() {
duration := time.Duration(now - job.NextRunTime())
logger.Debugf("Job %s skipped as outdated %s.", job.JobDetail().jobKey, duration)
logger.Debugf("Job %s is outdated %s.", job.JobDetail().jobKey, duration)
select {
case sched.opts.MisfiredChan <- job:
default:
}
return false, func() (int64, error) { return job.Trigger().NextFireTime(now) }
} else if job.NextRunTime() > now {
logger.Debugf("Job %s is not due to run yet.", job.JobDetail().jobKey)
Expand Down
26 changes: 26 additions & 0 deletions quartz/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,32 @@ func TestScheduler_JobWithRetriesCtxDone(t *testing.T) {
sched.Stop()
}

func TestScheduler_MisfiredJob(t *testing.T) {
funcJob := job.NewFunctionJob(func(_ context.Context) (string, error) {
time.Sleep(20 * time.Millisecond)
return "ok", nil
})

misfiredChan := make(chan quartz.ScheduledJob, 1)
sched := quartz.NewStdSchedulerWithOptions(quartz.StdSchedulerOptions{
BlockingExecution: true,
OutdatedThreshold: time.Millisecond,
RetryInterval: time.Millisecond,
MisfiredChan: misfiredChan,
}, nil)

jobDetail := quartz.NewJobDetail(funcJob, quartz.NewJobKey("funcJob"))
err := sched.ScheduleJob(jobDetail, quartz.NewSimpleTrigger(2*time.Millisecond))
assert.IsNil(t, err)

sched.Start(context.Background())

job := <-misfiredChan
assert.Equal(t, job.JobDetail().JobKey().Name(), "funcJob")

sched.Stop()
}

func TestScheduler_PauseResume(t *testing.T) {
var n int32
funcJob := job.NewFunctionJob(func(_ context.Context) (string, error) {
Expand Down

0 comments on commit a68b809

Please sign in to comment.