diff --git a/tasks.go b/tasks.go index a99c334..0d385e9 100644 --- a/tasks.go +++ b/tasks.go @@ -1,11 +1,17 @@ /* -Package tasks is an easy to use in-process task scheduler for Go. +Package tasks is an easy to use in-process task scheduler for Go. Focused on interval based execution, Tasks +provides both recurring and one-time task scheduling. -This package provides both recurring and one-time task execution. Tasks are run within their own goroutine which -improves time accuracy for execution. This package also allows users to specify error handling though custom error -functions. +Tasks is focused on accuracy of task execution. To do this each task is called within it's own goroutine. +This ensures that long execution of a single invocation does not throw the schedule as a whole off track. -Below is a simple example of starting the scheduler and registering a new task. +As usage of this scheduler scales, it is expected to have a larger number of sleeping goroutines. As it is +designed to leverage Go's ability to optimize goroutine CPU scheduling. + +For simplicity this task scheduler uses the time.Duration type to specify intervals. This allows for a simple +interface and flexible control over when tasks are executed. + +Below is an example of starting the scheduler and registering a new task that runs every 30 seconds. // Start the Scheduler scheduler := tasks.New() @@ -17,31 +23,52 @@ Below is a simple example of starting the scheduler and registering a new task. TaskFunc: func() error { // Put your logic here }(), - ErrFunc: func(err error) { - // Put custom error handling here - }(), }) if err != nil { // Do Stuff } -For simplicity this task scheduler uses the time.Duration type to specify intervals. This allows for a simple interface -and flexible control over when tasks are executed. -The below example shows scheduling a task to run only once 30 days from now. +Sometimes schedules need to started at a later time. This package provides the ability to start a task only after +a certain time. The below example shows this in practice. + + // Add a recurring task for every 30 days, starting 30 days from now + id, err := scheduler.Add(&tasks.Task{ + Interval: time.Duration(30 * (24 * time.Hour)), + StartAfter: time.Now().Add(30 * (24 * time.Hour)), + TaskFunc: func() error { + // Put your logic here + }(), + }) + if err != nil { + // Do Stuff - // Define time to execute - t := time.Now().Add(30 * (24 * time.Hour)) +It is also common for applications to run a task only once. The below example shows scheduling a task to run only once after +waiting for 60 seconds. - // Add a one time only task for 30 days from now + // Add a one time only task for 60 seconds from now id, err := scheduler.Add(&tasks.Task{ - Interval: time.Until(t), + Interval: time.Duration(60 * time.Second) RunOnce: true, TaskFunc: func() error { // Put your logic here }(), - ErrFunc: func(err error) { - // Put custom error handling here + }) + if err != nil { + // Do Stuff + +One powerful feature of Tasks is that it allows users to specify custom error handling. This is done by allowing users to +define a function that is called when a task returns an error. The below example shows scheduling a task that logs when an +error occurs. + + // Add a task with custom error handling + id, err := scheduler.Add(&tasks.Task{ + Interval: time.Duration(30 * time.Second), + TaskFunc: func() error { + // Put your logic here + }(), + ErrFunc: func(e error) { + log.Printf("An error occured when executing task %s - %s", id, e) }(), }) if err != nil { @@ -86,6 +113,10 @@ type Task struct { // the task self deleting. RunOnce bool + // StartAfter is used to specify a start time for the scheduler. When set, tasks will wait for the specified + // time to start the schedule ticker. + StartAfter time.Time + // TaskFunc is the user defined function to execute as part of this task. TaskFunc func() error @@ -147,7 +178,6 @@ func (schd *Scheduler) Add(t *Task) (string, error) { if t.Interval <= time.Duration(0) { return "", fmt.Errorf("task interval must be defined") } - t.ticker = time.NewTicker(t.Interval) // Create Context used to cancel downstream Goroutines t.ctx, t.cancel = context.WithCancel(context.Background()) @@ -163,9 +193,11 @@ func (schd *Scheduler) Add(t *Task) (string, error) { } t.id = id.String() schd.tasks[t.id] = t - go schd.execTask(t) - return t.id, nil + break } + + go schd.scheduleTask(t) + return t.id, nil } // Del will unschedule the specified task and remove it from the task list. Deletion will prevent future invocations of @@ -179,7 +211,9 @@ func (schd *Scheduler) Del(name string) { // Stop the task defer t.cancel() - defer t.ticker.Stop() + if t.ticker != nil { + defer t.ticker.Stop() + } // Remove from task list schd.Lock() @@ -187,7 +221,6 @@ func (schd *Scheduler) Del(name string) { if _, ok := schd.tasks[name]; ok { delete(schd.tasks, name) } - return } // Lookup will find the specified task from the internal task list using the task ID provided. @@ -222,6 +255,19 @@ func (schd *Scheduler) Stop() { } } +// scheduleTask creates the underlying scheduled task. If StartAfter is set, this routine will wait until the +// time specified. +func (schd *Scheduler) scheduleTask(t *Task) { + select { + case <-time.After(time.Until(t.StartAfter)): + t.ticker = time.NewTicker(t.Interval) + go schd.execTask(t) + return + case <-t.ctx.Done(): + return + } +} + // execTask is the underlying scheduler, it is used to trigger and execute tasks. func (schd *Scheduler) execTask(t *Task) { for { diff --git a/tasks_test.go b/tasks_test.go index cc298bd..1ca5635 100644 --- a/tasks_test.go +++ b/tasks_test.go @@ -15,7 +15,7 @@ func TestAdd(t *testing.T) { id, err := scheduler.Add(&Task{ Interval: time.Duration(1 * time.Minute), TaskFunc: func() error { return nil }, - ErrFunc: func(e error) { return }, + ErrFunc: func(e error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -39,7 +39,7 @@ func TestAdd(t *testing.T) { t.Run("Check for nil callback", func(t *testing.T) { _, err := scheduler.Add(&Task{ Interval: time.Duration(1 * time.Minute), - ErrFunc: func(e error) { return }, + ErrFunc: func(e error) {}, }) if err == nil { t.Errorf("Unexpected success when scheduling an invalid task - %s", err) @@ -49,7 +49,7 @@ func TestAdd(t *testing.T) { t.Run("Check for nil interval", func(t *testing.T) { _, err := scheduler.Add(&Task{ TaskFunc: func() error { return nil }, - ErrFunc: func(e error) { return }, + ErrFunc: func(e error) {}, }) if err == nil { t.Errorf("Unexpected success when scheduling an invalid task - %s", err) @@ -72,7 +72,7 @@ func TestScheduler(t *testing.T) { doneCh <- struct{}{} return nil }, - ErrFunc: func(e error) { return }, + ErrFunc: func(e error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -90,6 +90,40 @@ func TestScheduler(t *testing.T) { } }) + t.Run("Verify StartAfter works as expected", func(t *testing.T) { + // Channel for orchestrating when the task ran + doneCh := make(chan struct{}) + + // Create a Start time + sa := time.Now().Add(10 * time.Second) + + // Setup A task + id, err := scheduler.Add(&Task{ + Interval: time.Duration(1 * time.Second), + StartAfter: sa, + TaskFunc: func() error { + doneCh <- struct{}{} + return nil + }, + ErrFunc: func(e error) {}, + }) + if err != nil { + t.Errorf("Unexpected errors when scheduling a valid task - %s", err) + } + defer scheduler.Del(id) + + // Make sure it runs especially when we want it too + select { + case <-doneCh: + if time.Now().Before(sa) { + t.Errorf("Task executed before the defined start time now %s, supposed to be %s", time.Now().String(), sa.String()) + } + return + case <-time.After(15 * time.Second): + t.Errorf("Scheduler failed to execute the scheduled tasks within 15 seconds") + } + }) + t.Run("Verify Tasks Dont run when Deleted", func(t *testing.T) { // Channel for orchestrating when the task ran doneCh := make(chan struct{}) @@ -101,7 +135,7 @@ func TestScheduler(t *testing.T) { doneCh <- struct{}{} return nil }, - ErrFunc: func(e error) { return }, + ErrFunc: func(e error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err) @@ -140,7 +174,7 @@ func TestScheduler(t *testing.T) { doneCh <- struct{}{} return nil }, - ErrFunc: func(e error) { return }, + ErrFunc: func(e error) {}, }) if err != nil { t.Errorf("Unexpected errors when scheduling a valid task - %s", err)