Skip to content

Commit

Permalink
fix elapsed calculation for rescheduled jobs (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Oct 11, 2023
1 parent 696ab26 commit 670aeb8
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 16 deletions.
10 changes: 5 additions & 5 deletions emit/event.go
Expand Up @@ -51,13 +51,13 @@ func (e fault) Type() uint32 {

var nextTimerID uint32 = 1 << 30

// timer represents a timer event
type timer struct {
// Timer represents a Timer event
type Timer struct {
ID uint32
}

// Type returns the type of the event
func (e timer) Type() uint32 {
func (e Timer) Type() uint32 {
return e.ID
}

Expand Down Expand Up @@ -97,12 +97,12 @@ func OnEvery(handler func(now time.Time, elapsed time.Duration) error, interval
}

// Subscribe to the timer event
cancel := OnType[timer](id, func(_ timer, now time.Time, elapsed time.Duration) error {
cancel := OnType[Timer](id, func(_ Timer, now time.Time, elapsed time.Duration) error {
return handler(now, elapsed)
})

// Start the timer
Every(timer{ID: id}, interval)
Every(Timer{ID: id}, interval)
return cancel
}

Expand Down
8 changes: 4 additions & 4 deletions example/event/main.go
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"time"

event "github.com/kelindar/timeline/emit"
"github.com/kelindar/timeline/emit"
)

// Custom event type
Expand All @@ -20,13 +20,13 @@ func (Message) Type() uint32 {
func main() {

// Emit the event immediately
event.Next(Message{Text: "Hello, World!"})
emit.Next(Message{Text: "Hello, World!"})

// Emit the event every second
event.Every(Message{Text: "Are we there yet?"}, 1*time.Second)
emit.Every(Message{Text: "Are we there yet?"}, 500*time.Millisecond)

// Subscribe and Handle the Event
cancel := event.On[Message](func(ev Message, now time.Time, elapsed time.Duration) error {
cancel := emit.On[Message](func(ev Message, now time.Time, elapsed time.Duration) error {
fmt.Printf("Received '%s' at %02d.%03d, elapsed=%v\n",
ev.Text,
now.Second(), now.UnixMilli()%1000, elapsed)
Expand Down
21 changes: 14 additions & 7 deletions timeline.go
Expand Up @@ -88,19 +88,21 @@ func (s *Scheduler) RunEveryAfter(task Task, interval, delay time.Duration) {
s.schedule(task, s.after(delay), durationOf(interval))
}

// ScheduleFunc schedules an event to be processed at a given time.
// schedule schedules an event to be processed at a given time.
func (s *Scheduler) schedule(event Task, when tick, repeat span) {
evt := job{
s.enqueueJob(job{
Task: event,
RunAt: when,
Since: span(when - s.now()),
Every: repeat,
}

bucket := s.bucketOf(evt.RunAt)
})
}

// enqueueJob adds a job to the queue.
func (s *Scheduler) enqueueJob(job job) {
bucket := s.bucketOf(job.RunAt)
bucket.mu.Lock()
bucket.queue = append(bucket.queue, evt)
bucket.queue = append(bucket.queue, job)
bucket.mu.Unlock()
}

Expand Down Expand Up @@ -139,7 +141,12 @@ func (s *Scheduler) Tick() time.Time {
bucket.queue[offset] = task
offset++
default: // different bucket
s.schedule(task.Task, nextTick, task.Every)
s.enqueueJob(job{
Task: task.Task,
RunAt: nextTick,
Since: task.Every,
Every: task.Every,
})
}
}
}
Expand Down
21 changes: 21 additions & 0 deletions timeline_test.go
Expand Up @@ -6,6 +6,7 @@ package timeline
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -191,6 +192,26 @@ func TestRun(t *testing.T) {
assert.Equal(t, 2, count.Value())
}

func TestElapsed(t *testing.T) {
s := New()

var wg sync.WaitGroup
wg.Add(3)
s.RunEvery(func(now time.Time, elapsed time.Duration) bool {
fmt.Printf("Tick at %02d.%03d, elapsed=%v\n",
now.Second(), now.UnixMilli()%1000, elapsed)
assert.Equal(t, 10*time.Millisecond, elapsed)
wg.Done()
return true
}, 10*time.Millisecond)

s.Tick()
s.Tick()
s.Tick()
s.Tick()
wg.Wait()
}

func TestTickOf(t *testing.T) {
tc := map[tick]time.Duration{
0: 0,
Expand Down

0 comments on commit 670aeb8

Please sign in to comment.