Skip to content

Commit

Permalink
Merge pull request #1 from madflojo/start-after
Browse files Browse the repository at this point in the history
Start scheduler after X moment in time
  • Loading branch information
madflojo committed Dec 25, 2019
2 parents f50719e + 104cdef commit badba41
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 28 deletions.
90 changes: 68 additions & 22 deletions tasks.go
@@ -1,11 +1,17 @@
/*
Package tasks is an easy to use in-process task scheduler for Go.
Package tasks is an easy to use in-process task scheduler for Go. Focused on interval based execution, Tasks
provides both recurring and one-time task scheduling.
This package provides both recurring and one-time task execution. Tasks are run within their own goroutine which
improves time accuracy for execution. This package also allows users to specify error handling though custom error
functions.
Tasks is focused on accuracy of task execution. To do this each task is called within it's own goroutine.
This ensures that long execution of a single invocation does not throw the schedule as a whole off track.
Below is a simple example of starting the scheduler and registering a new task.
As usage of this scheduler scales, it is expected to have a larger number of sleeping goroutines. As it is
designed to leverage Go's ability to optimize goroutine CPU scheduling.
For simplicity this task scheduler uses the time.Duration type to specify intervals. This allows for a simple
interface and flexible control over when tasks are executed.
Below is an example of starting the scheduler and registering a new task that runs every 30 seconds.
// Start the Scheduler
scheduler := tasks.New()
Expand All @@ -17,31 +23,52 @@ Below is a simple example of starting the scheduler and registering a new task.
TaskFunc: func() error {
// Put your logic here
}(),
ErrFunc: func(err error) {
// Put custom error handling here
}(),
})
if err != nil {
// Do Stuff
}
For simplicity this task scheduler uses the time.Duration type to specify intervals. This allows for a simple interface
and flexible control over when tasks are executed.
The below example shows scheduling a task to run only once 30 days from now.
Sometimes schedules need to started at a later time. This package provides the ability to start a task only after
a certain time. The below example shows this in practice.
// Add a recurring task for every 30 days, starting 30 days from now
id, err := scheduler.Add(&tasks.Task{
Interval: time.Duration(30 * (24 * time.Hour)),
StartAfter: time.Now().Add(30 * (24 * time.Hour)),
TaskFunc: func() error {
// Put your logic here
}(),
})
if err != nil {
// Do Stuff
// Define time to execute
t := time.Now().Add(30 * (24 * time.Hour))
It is also common for applications to run a task only once. The below example shows scheduling a task to run only once after
waiting for 60 seconds.
// Add a one time only task for 30 days from now
// Add a one time only task for 60 seconds from now
id, err := scheduler.Add(&tasks.Task{
Interval: time.Until(t),
Interval: time.Duration(60 * time.Second)
RunOnce: true,
TaskFunc: func() error {
// Put your logic here
}(),
ErrFunc: func(err error) {
// Put custom error handling here
})
if err != nil {
// Do Stuff
One powerful feature of Tasks is that it allows users to specify custom error handling. This is done by allowing users to
define a function that is called when a task returns an error. The below example shows scheduling a task that logs when an
error occurs.
// Add a task with custom error handling
id, err := scheduler.Add(&tasks.Task{
Interval: time.Duration(30 * time.Second),
TaskFunc: func() error {
// Put your logic here
}(),
ErrFunc: func(e error) {
log.Printf("An error occured when executing task %s - %s", id, e)
}(),
})
if err != nil {
Expand Down Expand Up @@ -86,6 +113,10 @@ type Task struct {
// the task self deleting.
RunOnce bool

// StartAfter is used to specify a start time for the scheduler. When set, tasks will wait for the specified
// time to start the schedule ticker.
StartAfter time.Time

// TaskFunc is the user defined function to execute as part of this task.
TaskFunc func() error

Expand Down Expand Up @@ -147,7 +178,6 @@ func (schd *Scheduler) Add(t *Task) (string, error) {
if t.Interval <= time.Duration(0) {
return "", fmt.Errorf("task interval must be defined")
}
t.ticker = time.NewTicker(t.Interval)

// Create Context used to cancel downstream Goroutines
t.ctx, t.cancel = context.WithCancel(context.Background())
Expand All @@ -163,9 +193,11 @@ func (schd *Scheduler) Add(t *Task) (string, error) {
}
t.id = id.String()
schd.tasks[t.id] = t
go schd.execTask(t)
return t.id, nil
break
}

go schd.scheduleTask(t)
return t.id, nil
}

// Del will unschedule the specified task and remove it from the task list. Deletion will prevent future invocations of
Expand All @@ -179,15 +211,16 @@ func (schd *Scheduler) Del(name string) {

// Stop the task
defer t.cancel()
defer t.ticker.Stop()
if t.ticker != nil {
defer t.ticker.Stop()
}

// Remove from task list
schd.Lock()
defer schd.Unlock()
if _, ok := schd.tasks[name]; ok {
delete(schd.tasks, name)
}
return
}

// Lookup will find the specified task from the internal task list using the task ID provided.
Expand Down Expand Up @@ -222,6 +255,19 @@ func (schd *Scheduler) Stop() {
}
}

// scheduleTask creates the underlying scheduled task. If StartAfter is set, this routine will wait until the
// time specified.
func (schd *Scheduler) scheduleTask(t *Task) {
select {
case <-time.After(time.Until(t.StartAfter)):
t.ticker = time.NewTicker(t.Interval)
go schd.execTask(t)
return
case <-t.ctx.Done():
return
}
}

// execTask is the underlying scheduler, it is used to trigger and execute tasks.
func (schd *Scheduler) execTask(t *Task) {
for {
Expand Down
46 changes: 40 additions & 6 deletions tasks_test.go
Expand Up @@ -15,7 +15,7 @@ func TestAdd(t *testing.T) {
id, err := scheduler.Add(&Task{
Interval: time.Duration(1 * time.Minute),
TaskFunc: func() error { return nil },
ErrFunc: func(e error) { return },
ErrFunc: func(e error) {},
})
if err != nil {
t.Errorf("Unexpected errors when scheduling a valid task - %s", err)
Expand All @@ -39,7 +39,7 @@ func TestAdd(t *testing.T) {
t.Run("Check for nil callback", func(t *testing.T) {
_, err := scheduler.Add(&Task{
Interval: time.Duration(1 * time.Minute),
ErrFunc: func(e error) { return },
ErrFunc: func(e error) {},
})
if err == nil {
t.Errorf("Unexpected success when scheduling an invalid task - %s", err)
Expand All @@ -49,7 +49,7 @@ func TestAdd(t *testing.T) {
t.Run("Check for nil interval", func(t *testing.T) {
_, err := scheduler.Add(&Task{
TaskFunc: func() error { return nil },
ErrFunc: func(e error) { return },
ErrFunc: func(e error) {},
})
if err == nil {
t.Errorf("Unexpected success when scheduling an invalid task - %s", err)
Expand All @@ -72,7 +72,7 @@ func TestScheduler(t *testing.T) {
doneCh <- struct{}{}
return nil
},
ErrFunc: func(e error) { return },
ErrFunc: func(e error) {},
})
if err != nil {
t.Errorf("Unexpected errors when scheduling a valid task - %s", err)
Expand All @@ -90,6 +90,40 @@ func TestScheduler(t *testing.T) {
}
})

t.Run("Verify StartAfter works as expected", func(t *testing.T) {
// Channel for orchestrating when the task ran
doneCh := make(chan struct{})

// Create a Start time
sa := time.Now().Add(10 * time.Second)

// Setup A task
id, err := scheduler.Add(&Task{
Interval: time.Duration(1 * time.Second),
StartAfter: sa,
TaskFunc: func() error {
doneCh <- struct{}{}
return nil
},
ErrFunc: func(e error) {},
})
if err != nil {
t.Errorf("Unexpected errors when scheduling a valid task - %s", err)
}
defer scheduler.Del(id)

// Make sure it runs especially when we want it too
select {
case <-doneCh:
if time.Now().Before(sa) {
t.Errorf("Task executed before the defined start time now %s, supposed to be %s", time.Now().String(), sa.String())
}
return
case <-time.After(15 * time.Second):
t.Errorf("Scheduler failed to execute the scheduled tasks within 15 seconds")
}
})

t.Run("Verify Tasks Dont run when Deleted", func(t *testing.T) {
// Channel for orchestrating when the task ran
doneCh := make(chan struct{})
Expand All @@ -101,7 +135,7 @@ func TestScheduler(t *testing.T) {
doneCh <- struct{}{}
return nil
},
ErrFunc: func(e error) { return },
ErrFunc: func(e error) {},
})
if err != nil {
t.Errorf("Unexpected errors when scheduling a valid task - %s", err)
Expand Down Expand Up @@ -140,7 +174,7 @@ func TestScheduler(t *testing.T) {
doneCh <- struct{}{}
return nil
},
ErrFunc: func(e error) { return },
ErrFunc: func(e error) {},
})
if err != nil {
t.Errorf("Unexpected errors when scheduling a valid task - %s", err)
Expand Down

0 comments on commit badba41

Please sign in to comment.