Skip to content

Commit

Permalink
feat: add contexts to interfaces (#41)
Browse files Browse the repository at this point in the history
* feat: add contexts to interfaces

* add tests

* Update quartz/scheduler.go

Co-authored-by: Eugene R. <reugpro@gmail.com>

* fix lint

* fix test!

Co-authored-by: Eugene R. <reugpro@gmail.com>
  • Loading branch information
tychoish and reugn committed Dec 22, 2022
1 parent 56c412a commit d16e8d9
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 80 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Expand Up @@ -36,3 +36,4 @@ issues:
- errcheck
- unparam
- prealloc
- funlen
19 changes: 14 additions & 5 deletions README.md
Expand Up @@ -13,8 +13,10 @@ Inspired by the [Quartz](https://github.com/quartz-scheduler/quartz) Java schedu
Scheduler interface
```go
type Scheduler interface {
// Start starts the scheduler.
Start()
// Start starts the scheduler. The scheduler will run until
// the Stop method is called or the context is canceled. Use
// the Wait method to block until all running jobs have completed.
Start(context.Context)
// IsStarted determines whether the scheduler has been started.
IsStarted() bool
// ScheduleJob schedules a job using a specified trigger.
Expand All @@ -29,6 +31,11 @@ type Scheduler interface {
Clear()
// Stop shutdowns the scheduler.
Stop()
// Wait blocks until the scheduler stops running and all jobs
// have returned. Wait will return when the context passed to
// it has expired. Until the context passed to start is
// cancelled or Stop is called directly.
Wait(context.Context)
}
```
Implemented Schedulers
Expand All @@ -52,7 +59,7 @@ Job interface. Any type that implements it can be scheduled.
```go
type Job interface {
// Execute is called by a Scheduler when the Trigger associated with this job fires.
Execute()
Execute(context.Context)
// Description returns the description of the Job.
Description() string
// Key returns the unique key for the Job.
Expand All @@ -77,16 +84,18 @@ Implemented Jobs

## Examples
```go
ctx := context.Background()
sched := quartz.NewStdScheduler()
sched.Start()
sched.Start(ctx)
cronTrigger, _ := quartz.NewCronTrigger("1/5 * * * * *")
shellJob := quartz.NewShellJob("ls -la")
curlJob, _ := quartz.NewCurlJob(http.MethodGet, "http://worldclockapi.com/api/json/est/now", "", nil)
functionJob := quartz.NewFunctionJob(func() (int, error) { return 42, nil })
functionJob := quartz.NewFunctionJob(func(_ context.Context) (int, error) { return 42, nil })
sched.ScheduleJob(shellJob, cronTrigger)
sched.ScheduleJob(curlJob, quartz.NewSimpleTrigger(time.Second*7))
sched.ScheduleJob(functionJob, quartz.NewSimpleTrigger(time.Second*5))
sched.Stop()
sched.Wait(ctx)
```
More code samples can be found in the examples directory.

Expand Down
25 changes: 16 additions & 9 deletions examples/main.go
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"net/http"
"sync"
Expand All @@ -10,16 +11,20 @@ import (
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := new(sync.WaitGroup)
wg.Add(2)

go sampleJobs(wg)
go sampleScheduler(wg)
go sampleJobs(ctx, wg)
go sampleScheduler(ctx, wg)

wg.Wait()
}

func sampleScheduler(wg *sync.WaitGroup) {
func sampleScheduler(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

sched := quartz.NewStdScheduler()
cronTrigger, err := quartz.NewCronTrigger("1/3 * * * * *")
if err != nil {
Expand All @@ -28,7 +33,8 @@ func sampleScheduler(wg *sync.WaitGroup) {
}

cronJob := PrintJob{"Cron job"}
sched.Start()
sched.Start(ctx)

sched.ScheduleJob(&PrintJob{"Ad hoc Job"}, quartz.NewRunOnceTrigger(time.Second*5))
sched.ScheduleJob(&PrintJob{"First job"}, quartz.NewSimpleTrigger(time.Second*12))
sched.ScheduleJob(&PrintJob{"Second job"}, quartz.NewSimpleTrigger(time.Second*6))
Expand All @@ -50,12 +56,13 @@ func sampleScheduler(wg *sync.WaitGroup) {

time.Sleep(time.Second * 2)
sched.Stop()
wg.Done()
sched.Wait(ctx)
}

func sampleJobs(wg *sync.WaitGroup) {
func sampleJobs(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
sched := quartz.NewStdScheduler()
sched.Start()
sched.Start(ctx)

cronTrigger, err := quartz.NewCronTrigger("1/5 * * * * *")
if err != nil {
Expand All @@ -69,7 +76,7 @@ func sampleJobs(wg *sync.WaitGroup) {
fmt.Println(err)
return
}
functionJob := quartz.NewFunctionJobWithDesc("42", func() (int, error) { return 42, nil })
functionJob := quartz.NewFunctionJobWithDesc("42", func(_ context.Context) (int, error) { return 42, nil })

sched.ScheduleJob(shellJob, cronTrigger)
sched.ScheduleJob(curlJob, quartz.NewSimpleTrigger(time.Second*7))
Expand All @@ -84,5 +91,5 @@ func sampleJobs(wg *sync.WaitGroup) {

time.Sleep(time.Second * 2)
sched.Stop()
wg.Done()
sched.Wait(ctx)
}
3 changes: 2 additions & 1 deletion examples/print_job.go
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"

"github.com/reugn/go-quartz/quartz"
Expand All @@ -22,6 +23,6 @@ func (pj *PrintJob) Key() int {
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (pj *PrintJob) Execute() {
func (pj *PrintJob) Execute(_ context.Context) {
fmt.Println("Executing " + pj.Description())
}
7 changes: 4 additions & 3 deletions quartz/function_job.go
@@ -1,11 +1,12 @@
package quartz

import (
"context"
"fmt"
)

// Function represents an argument-less function which returns a generic type R and a possible error.
type Function[R any] func() (R, error)
type Function[R any] func(context.Context) (R, error)

// FunctionJob represents a Job that invokes the passed Function, implements the quartz.Job interface.
type FunctionJob[R any] struct {
Expand Down Expand Up @@ -50,8 +51,8 @@ func (f *FunctionJob[R]) Key() int {

// Execute is called by a Scheduler when the Trigger associated with this job fires.
// It invokes the held function, setting the results in Result and Error members.
func (f *FunctionJob[R]) Execute() {
result, err := (*f.function)()
func (f *FunctionJob[R]) Execute(ctx context.Context) {
result, err := (*f.function)(ctx)
if err != nil {
f.JobStatus = FAILURE
f.Result = nil
Expand Down
48 changes: 45 additions & 3 deletions quartz/function_job_test.go
@@ -1,26 +1,31 @@
package quartz_test

import (
"context"
"errors"
"testing"
"time"

"github.com/reugn/go-quartz/quartz"
)

func TestFunctionJob(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var n = 2
funcJob1 := quartz.NewFunctionJob(func() (string, error) {
funcJob1 := quartz.NewFunctionJob(func(_ context.Context) (string, error) {
n += 2
return "fired1", nil
})

funcJob2 := quartz.NewFunctionJob(func() (int, error) {
funcJob2 := quartz.NewFunctionJob(func(_ context.Context) (int, error) {
n += 2
return 42, nil
})

sched := quartz.NewStdScheduler()
sched.Start()
sched.Start(ctx)
sched.ScheduleJob(funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300))
sched.ScheduleJob(funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800))
time.Sleep(time.Second)
Expand All @@ -37,3 +42,40 @@ func TestFunctionJob(t *testing.T) {

assertEqual(t, n, 6)
}

func TestFunctionJobRespectsContext(t *testing.T) {
var n int
funcJob2 := quartz.NewFunctionJob(func(ctx context.Context) (bool, error) {
timer := time.NewTimer(time.Hour)
defer timer.Stop()
select {
case <-ctx.Done():
n--
return false, ctx.Err()
case <-timer.C:
n++
return true, nil
}
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sig := make(chan struct{})
go func() { defer close(sig); funcJob2.Execute(ctx) }()

if n != 0 {
t.Fatal("job should not have run yet")
}
cancel()
<-sig

if n != -1 {
t.Fatal("job side effect should have reflected cancelation:", n)
}
if !errors.Is(funcJob2.Error, context.Canceled) {
t.Fatal("unexpected error function", funcJob2.Error)
}
if funcJob2.Result != nil {
t.Fatal("errored jobs should not return values")
}
}
10 changes: 6 additions & 4 deletions quartz/job.go
Expand Up @@ -2,6 +2,7 @@ package quartz

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
Expand All @@ -12,7 +13,7 @@ import (
// to be performed.
type Job interface {
// Execute is called by a Scheduler when the Trigger associated with this job fires.
Execute()
Execute(context.Context)

// Description returns the description of the Job.
Description() string
Expand Down Expand Up @@ -63,8 +64,8 @@ func (sh *ShellJob) Key() int {
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (sh *ShellJob) Execute() {
out, err := exec.Command("sh", "-c", sh.Cmd).Output()
func (sh *ShellJob) Execute(ctx context.Context) {
out, err := exec.CommandContext(ctx, "sh", "-c", sh.Cmd).Output()
if err != nil {
sh.JobStatus = FAILURE
sh.Result = err.Error()
Expand Down Expand Up @@ -128,8 +129,9 @@ func (cu *CurlJob) Key() int {
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (cu *CurlJob) Execute() {
func (cu *CurlJob) Execute(ctx context.Context) {
client := &http.Client{}
cu.request = cu.request.WithContext(ctx)
resp, err := client.Do(cu.request)
if err != nil {
cu.JobStatus = FAILURE
Expand Down

0 comments on commit d16e8d9

Please sign in to comment.