Skip to content

Commit

Permalink
Add a subpackage for scheduling pub/sub events (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Oct 7, 2023
1 parent d61a525 commit d7f2809
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 4 deletions.
50 changes: 50 additions & 0 deletions event/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
## Event Package for Timeline

The event package seamlessly integrates the timeline scheduler with event-driven programming. It allows you to emit and subscribe to events with precise timing, making it ideal for applications that require both event-driven architectures and time-based scheduling.

## Quick Start

Let's dive right in with a simple example to get you started with the event package.

```go
// Custom event type
type Message struct {
Text string
}

// Type returns the type of the event for the dispatcher
func (Message) Type() uint32 {
return 0x1
}

func main() {

// Emit the event immediately
event.Emit(Message{Text: "Hello, World!"})

// Emit the event every second
event.EmitEvery(Message{Text: "Are we there yet?"}, 1*time.Second)

// Subscribe and Handle the Event
cancel := event.On[Message](func(ev event.Event[Message]) {
fmt.Printf("Received '%s' at %02d.%03d, elapsed=%v\n",
ev.Data.Text,
ev.Time.Second(), ev.Time.UnixMilli()%1000, ev.Elapsed)
})
defer cancel() // Remember to unsubscribe when done

// Let the program run for a while to receive events
time.Sleep(5 * time.Second)
}
```

You will see similar output, with 'Are we there yet?' being emitted every second, and 'Hello, World!' being emitted immediately.

```
Received 'Hello, World!' at 21.060, elapsed=0s
Received 'Are we there yet?' at 22.000, elapsed=940ms
Received 'Are we there yet?' at 23.000, elapsed=1s
Received 'Are we there yet?' at 24.000, elapsed=1s
Received 'Are we there yet?' at 25.000, elapsed=1s
Received 'Are we there yet?' at 26.000, elapsed=1s
```
86 changes: 86 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root

package event

import (
"context"
"time"

"github.com/kelindar/event"
"github.com/kelindar/timeline"
)

// Scheduler is the default scheduler used to emit events.
var Scheduler = func() *timeline.Scheduler {
s := timeline.New()
s.Start(context.Background())
return s
}()

// signal represents a signal event
type Event[T event.Event] struct {
Time time.Time // The time at which the event was emitted
Elapsed time.Duration // The time elapsed since the last event
Data T
}

// Type returns the type of the event
func (e Event[T]) Type() uint32 {
return e.Data.Type()
}

// emit writes an event into the dispatcher
func emit[T event.Event](ev T) func(now time.Time, elapsed time.Duration) bool {
return func(now time.Time, elapsed time.Duration) bool {
event.Publish(event.Default, Event[T]{
Data: ev,
Time: now,
Elapsed: elapsed,
})
return true
}
}

// On subscribes to an event, the type of the event will be automatically
// inferred from the provided type. Must be constant for this to work. This
// functions same way as Subscribe() but uses the default dispatcher instead.
func On[T event.Event](handler func(Event[T])) context.CancelFunc {
return event.Subscribe[Event[T]](event.Default, handler)
}

// OnType subscribes to an event with the specified event type. This functions
// same way as SubscribeTo() but uses the default dispatcher instead.
func OnType[T event.Event](eventType uint32, handler func(Event[T])) context.CancelFunc {
return event.SubscribeTo[Event[T]](event.Default, eventType, handler)
}

// Emit writes an event during the next tick.
func Emit[T event.Event](ev T) {
Scheduler.Run(emit(ev))
}

// EmitAt writes an event at specific 'at' time.
func EmitAt[T event.Event](ev T, at time.Time) {
Scheduler.RunAt(emit(ev), at)
}

// EmitAfter writes an event after a 'delay'.
func EmitAfter[T event.Event](ev T, after time.Duration) {
Scheduler.RunAfter(emit(ev), after)
}

// EmitEvery writes an event at 'interval' intervals, starting at the next boundary tick.
func EmitEvery[T event.Event](ev T, interval time.Duration) {
Scheduler.RunEvery(emit(ev), interval)
}

// EmitEveryAt writes an event at 'interval' intervals, starting at 'startTime'.
func EmitEveryAt[T event.Event](ev T, interval time.Duration, startTime time.Time) {
Scheduler.RunEveryAt(emit(ev), interval, startTime)
}

// EmitEveryAfter writes an event at 'interval' intervals after a 'delay'.
func EmitEveryAfter[T event.Event](ev T, interval time.Duration, delay time.Duration) {
Scheduler.RunEveryAfter(emit(ev), interval, delay)
}
115 changes: 115 additions & 0 deletions event/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package event

import (
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

/*
cpu: 13th Gen Intel(R) Core(TM) i7-13700K
BenchmarkEvent/1x1-24 17777803 60.10 ns/op 16.45 million/s 170 B/op 1 allocs/op
BenchmarkEvent/1x10-24 16062572 162.3 ns/op 67.38 million/s 220 B/op 1 allocs/op
BenchmarkEvent/1x100-24 22221769 106.9 ns/op 56.41 million/s 205 B/op 1 allocs/op
BenchmarkEvent/10x1-24 3183024 459.8 ns/op 23.68 million/s 920 B/op 10 allocs/op
BenchmarkEvent/10x10-24 1465628 1136 ns/op 55.92 million/s 1981 B/op 10 allocs/op
BenchmarkEvent/10x100-24 1432830 1780 ns/op 61.69 million/s 2130 B/op 10 allocs/op
*/
func BenchmarkEvent(b *testing.B) {
for _, topics := range []int{1, 10} {
for _, subs := range []int{1, 10, 100} {
b.Run(fmt.Sprintf("%dx%d", topics, subs), func(b *testing.B) {
var count atomic.Int64
for i := 0; i < subs; i++ {
for id := 10; id < 10+topics; id++ {
defer OnType(uint32(id), func(ev Event[Dynamic]) {
count.Add(1)
})()
}
}

start := time.Now()
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for id := 10; id < 10+topics; id++ {
Emit(Dynamic{ID: id})
}
}

elapsed := time.Since(start)
rate := float64(count.Load()) / 1e6 / elapsed.Seconds()
b.ReportMetric(rate, "million/s")
})
}
}
}

func TestEmit(t *testing.T) {
events := make(chan MyEvent2)
defer On(func(ev Event[MyEvent2]) {
assert.Equal(t, "Hello", ev.Data.Text)
events <- ev.Data
})()

// Emit the event
Emit(MyEvent2{Text: "Hello"})
<-events

EmitAt(MyEvent2{Text: "Hello"}, time.Now().Add(40*time.Millisecond))
<-events

EmitAfter(MyEvent2{Text: "Hello"}, 20*time.Millisecond)
<-events

EmitEveryAt(MyEvent2{Text: "Hello"}, 50*time.Millisecond, time.Now().Add(10*time.Millisecond))
<-events

EmitEveryAfter(MyEvent2{Text: "Hello"}, 30*time.Millisecond, 10*time.Millisecond)
<-events

EmitEvery(MyEvent2{Text: "Hello"}, 10*time.Millisecond)
<-events
}

func TestOnType(t *testing.T) {
events := make(chan Dynamic)
defer OnType(42, func(ev Event[Dynamic]) {
assert.Equal(t, 42, ev.Data.ID)
events <- ev.Data
})()

// Emit the event
Emit(Dynamic{ID: 42})
<-events
}

// ------------------------------------- Test Events -------------------------------------

const (
TypeEvent1 = 0x1
TypeEvent2 = 0x2
)

type MyEvent1 struct {
Number int
}

func (t MyEvent1) Type() uint32 { return TypeEvent1 }

type MyEvent2 struct {
Text string
}

func (t MyEvent2) Type() uint32 { return TypeEvent2 }

type Dynamic struct {
ID int
}

func (t Dynamic) Type() uint32 {
return uint32(t.ID)
}
38 changes: 38 additions & 0 deletions example/event/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"fmt"
"time"

"github.com/kelindar/timeline/event"
)

// Custom event type
type Message struct {
Text string
}

// Type returns the type of the event for the dispatcher
func (Message) Type() uint32 {
return 0x1
}

func main() {

// Emit the event immediately
event.Emit(Message{Text: "Hello, World!"})

// Emit the event every second
event.EmitEvery(Message{Text: "Are we there yet?"}, 1*time.Second)

// Subscribe and Handle the Event
cancel := event.On[Message](func(ev event.Event[Message]) {
fmt.Printf("Received '%s' at %02d.%03d, elapsed=%v\n",
ev.Data.Text,
ev.Time.Second(), ev.Time.UnixMilli()%1000, ev.Elapsed)
})
defer cancel() // Remember to unsubscribe when done

// Let the program run for a while to receive events
time.Sleep(5 * time.Second)
}
File renamed without changes.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ module github.com/kelindar/timeline

go 1.20

require github.com/stretchr/testify v1.8.4
require (
github.com/kelindar/event v1.4.1
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kelindar/event v1.4.1 h1:3kAHmGVRBO/TyKFc20UxsxDFgZMs7E/CjMUR2Hvj2+g=
github.com/kelindar/event v1.4.1/go.mod h1:UxWPQjWK8u0o9Z3ponm2mgREimM95hm26/M9z8F488Q=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand Down
6 changes: 3 additions & 3 deletions timeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func (s *Scheduler) Run(task Task) {
s.schedule(task, s.now(), 0)
}

// RunAt schedules a task for a specific 'when' time.
func (s *Scheduler) RunAt(task Task, when time.Time) {
s.schedule(task, tickOf(when), 0)
// RunAt schedules a task for a specific 'at' time.
func (s *Scheduler) RunAt(task Task, at time.Time) {
s.schedule(task, tickOf(at), 0)
}

// RunAfter schedules a task to run after a 'delay'.
Expand Down

0 comments on commit d7f2809

Please sign in to comment.