Skip to content

Commit

Permalink
improve event emit package (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Oct 8, 2023
1 parent 2b30349 commit ae75898
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 232 deletions.
37 changes: 19 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,23 @@ time.Sleep(10 * time.Second)
It outputs:

```
Task executed at 22:04.400, elapsed=0s
Task executed at 22:05.000, elapsed=600ms
Task executed at 22:06.000, elapsed=1s
Task executed at 22:07.000, elapsed=1s
Task executed at 22:08.000, elapsed=1s
Task executed at 22:09.000, elapsed=1s
Task executed at 22:09.400, elapsed=5s
Task executed at 22:10.000, elapsed=1s
Task executed at 22:11.000, elapsed=1s
Task executed at 22:12.000, elapsed=1s
Task executed at 22:13.000, elapsed=1s
Task executed at 22:14.000, elapsed=1s
Task executed at 04.400, elapsed=0s
Task executed at 05.000, elapsed=600ms
Task executed at 06.000, elapsed=1s
Task executed at 07.000, elapsed=1s
Task executed at 08.000, elapsed=1s
Task executed at 09.000, elapsed=1s
Task executed at 09.400, elapsed=5s
Task executed at 10.000, elapsed=1s
Task executed at 11.000, elapsed=1s
Task executed at 12.000, elapsed=1s
Task executed at 13.000, elapsed=1s
Task executed at 14.000, elapsed=1s
```

## Event Scheduling (Integration)

The [github.com/kelindar/timeline/event](https://github.com/kelindar/timeline/tree/main/event) sub-package seamlessly integrates the timeline scheduler with event-driven programming. It allows you to emit and subscribe to events with precise timing, making it ideal for applications that require both event-driven architectures and time-based scheduling.
The [github.com/kelindar/timeline/emit](https://github.com/kelindar/timeline/tree/main/emit) sub-package seamlessly integrates the timeline scheduler with event-driven programming. It allows you to emit and subscribe to events with precise timing, making it ideal for applications that require both event-driven architectures and time-based scheduling.

```go
// Custom event type
Expand All @@ -91,16 +91,17 @@ func (Message) Type() uint32 {
func main() {

// Emit the event immediately
event.Emit(Message{Text: "Hello, World!"})
event.Next(Message{Text: "Hello, World!"})

// Emit the event every second
event.EmitEvery(Message{Text: "Are we there yet?"}, 1*time.Second)
event.Every(Message{Text: "Are we there yet?"}, 1*time.Second)

// Subscribe and Handle the Event
cancel := event.On[Message](func(ev event.Event[Message]) {
cancel := event.On[Message](func(ev Message, now time.Time, elapsed time.Duration) error {
fmt.Printf("Received '%s' at %02d.%03d, elapsed=%v\n",
ev.Data.Text,
ev.Time.Second(), ev.Time.UnixMilli()%1000, ev.Elapsed)
ev.Text,
now.Second(), now.UnixMilli()%1000, elapsed)
return nil
})
defer cancel() // Remember to unsubscribe when done

Expand Down
12 changes: 7 additions & 5 deletions event/README.md → emit/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@ func (Message) Type() uint32 {
func main() {

// Emit the event immediately
event.Emit(Message{Text: "Hello, World!"})
event.Next(Message{Text: "Hello, World!"})

// Emit the event every second
event.EmitEvery(Message{Text: "Are we there yet?"}, 1*time.Second)
event.Every(Message{Text: "Are we there yet?"}, 1*time.Second)

// Subscribe and Handle the Event
cancel := event.On[Message](func(ev event.Event[Message]) {
cancel := event.On[Message](func(ev Message, now time.Time, elapsed time.Duration) error {
fmt.Printf("Received '%s' at %02d.%03d, elapsed=%v\n",
ev.Data.Text,
ev.Time.Second(), ev.Time.UnixMilli()%1000, ev.Elapsed)
ev.Text,
now.Second(), now.UnixMilli()%1000, elapsed)
return nil
})
defer cancel() // Remember to unsubscribe when done

// Let the program run for a while to receive events
time.Sleep(5 * time.Second)
}

```

You will see similar output, with 'Are we there yet?' being emitted every second, and 'Hello, World!' being emitted immediately.
Expand Down
127 changes: 127 additions & 0 deletions emit/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root

package emit

import (
"context"
"math"
"time"

"github.com/kelindar/event"
"github.com/kelindar/timeline"
)

// Scheduler is the default scheduler used to emit events.
var Scheduler = func() *timeline.Scheduler {
s := timeline.New()
s.Start(context.Background())
return s
}()

// ----------------------------------------- Forward Event -----------------------------------------

// signal represents a forwarded event
type signal[T event.Event] struct {
Time time.Time // The time at which the event was emitted
Elapsed time.Duration // The time elapsed since the last event
Data T
}

// Type returns the type of the event
func (e signal[T]) Type() uint32 {
return e.Data.Type()
}

// ----------------------------------------- Error Event -----------------------------------------

// fault represents an error event
type fault struct {
error
About any // The context of the error
}

// Type returns the type of the event
func (e fault) Type() uint32 {
return math.MaxUint32
}

// ----------------------------------------- Subscribe -----------------------------------------

// On subscribes to an event, the type of the event will be automatically
// inferred from the provided type. Must be constant for this to work.
func On[T event.Event](handler func(event T, now time.Time, elapsed time.Duration) error) context.CancelFunc {
return event.Subscribe[signal[T]](event.Default, func(m signal[T]) {
if err := handler(m.Data, m.Time, m.Elapsed); err != nil {
Error(err, m.Data)
}
})
}

// OnType subscribes to an event with the specified event type.
func OnType[T event.Event](eventType uint32, handler func(event T, now time.Time, elapsed time.Duration) error) context.CancelFunc {
return event.SubscribeTo[signal[T]](event.Default, eventType, func(m signal[T]) {
if err := handler(m.Data, m.Time, m.Elapsed); err != nil {
Error(err, m.Data)
}
})
}

// OnError subscribes to an error event.
func OnError(handler func(err error, about any)) context.CancelFunc {
return event.Subscribe[fault](event.Default, func(m fault) {
handler(m.error, m.About)
})
}

// ----------------------------------------- Publish -----------------------------------------

// Next writes an event during the next tick.
func Next[T event.Event](ev T) {
Scheduler.Run(emit(ev))
}

// At writes an event at specific 'at' time.
func At[T event.Event](ev T, at time.Time) {
Scheduler.RunAt(emit(ev), at)
}

// After writes an event after a 'delay'.
func After[T event.Event](ev T, after time.Duration) {
Scheduler.RunAfter(emit(ev), after)
}

// Every writes an event at 'interval' intervals, starting at the next boundary tick.
func Every[T event.Event](ev T, interval time.Duration) {
Scheduler.RunEvery(emit(ev), interval)
}

// EveryAt writes an event at 'interval' intervals, starting at 'startTime'.
func EveryAt[T event.Event](ev T, interval time.Duration, startTime time.Time) {
Scheduler.RunEveryAt(emit(ev), interval, startTime)
}

// EveryAfter writes an event at 'interval' intervals after a 'delay'.
func EveryAfter[T event.Event](ev T, interval time.Duration, delay time.Duration) {
Scheduler.RunEveryAfter(emit(ev), interval, delay)
}

// Error writes an error event.
func Error(err error, about any) {
event.Publish(event.Default, fault{
error: err,
About: about,
})
}

// emit writes an event into the dispatcher
func emit[T event.Event](ev T) func(now time.Time, elapsed time.Duration) bool {
return func(now time.Time, elapsed time.Duration) bool {
event.Publish(event.Default, signal[T]{
Data: ev,
Time: now,
Elapsed: elapsed,
})
return true
}
}
153 changes: 153 additions & 0 deletions emit/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package emit

import (
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

/*
cpu: 13th Gen Intel(R) Core(TM) i7-13700K
BenchmarkEvent/1x1-24 13259682 84.58 ns/op 11.73 million/s 169 B/op 1 allocs/op
BenchmarkEvent/1x10-24 16216171 104.8 ns/op 74.95 million/s 249 B/op 1 allocs/op
BenchmarkEvent/1x100-24 26087012 669.5 ns/op 70.51 million/s 228 B/op 1 allocs/op
BenchmarkEvent/10x1-24 2721086 510.1 ns/op 18.33 million/s 953 B/op 10 allocs/op
BenchmarkEvent/10x10-24 1000000 1095 ns/op 50.99 million/s 2100 B/op 10 allocs/op
BenchmarkEvent/10x100-24 1000000 1294 ns/op 57.49 million/s 2151 B/op 10 allocs/op
*/
func BenchmarkEvent(b *testing.B) {
for _, topics := range []int{1, 10} {
for _, subs := range []int{1, 10, 100} {
b.Run(fmt.Sprintf("%dx%d", topics, subs), func(b *testing.B) {
var count atomic.Int64
for i := 0; i < subs; i++ {
for id := 10; id < 10+topics; id++ {
defer OnType(uint32(id), func(ev Dynamic, now time.Time, elapsed time.Duration) error {
count.Add(1)
return nil
})()
}
}

start := time.Now()
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for id := 10; id < 10+topics; id++ {
Next(Dynamic{ID: id})
}
}

elapsed := time.Since(start)
rate := float64(count.Load()) / 1e6 / elapsed.Seconds()
b.ReportMetric(rate, "million/s")
})
}
}
}

func TestEmit(t *testing.T) {
events := make(chan MyEvent2)
defer On(func(ev MyEvent2, now time.Time, elapsed time.Duration) error {
assert.Equal(t, "Hello", ev.Text)
events <- ev
return nil
})()

// Emit the event
Next(MyEvent2{Text: "Hello"})
<-events

At(MyEvent2{Text: "Hello"}, time.Now().Add(40*time.Millisecond))
<-events

After(MyEvent2{Text: "Hello"}, 20*time.Millisecond)
<-events

EveryAt(MyEvent2{Text: "Hello"}, 50*time.Millisecond, time.Now().Add(10*time.Millisecond))
<-events

EveryAfter(MyEvent2{Text: "Hello"}, 30*time.Millisecond, 10*time.Millisecond)
<-events

Every(MyEvent2{Text: "Hello"}, 10*time.Millisecond)
<-events
}

func TestOnType(t *testing.T) {
events := make(chan Dynamic)
defer OnType(42, func(ev Dynamic, now time.Time, elapsed time.Duration) error {
assert.Equal(t, 42, ev.ID)
events <- ev
return nil
})()

// Emit the event
Next(Dynamic{ID: 42})
<-events
}

func TestOnError(t *testing.T) {
errors := make(chan error)
defer OnError(func(err error, about any) {
errors <- err
})()

defer On(func(ev MyEvent2, now time.Time, elapsed time.Duration) error {
return fmt.Errorf("On()")
})()

// Emit the event
Error(fmt.Errorf("Err"), nil)
assert.Equal(t, "Err", (<-errors).Error())

// Fail in the handler
Next(MyEvent2{})
assert.Equal(t, "On()", (<-errors).Error())

}

func TestOnTypeError(t *testing.T) {
errors := make(chan error)
defer OnError(func(err error, about any) {
errors <- err
})()

defer OnType(42, func(ev Dynamic, now time.Time, elapsed time.Duration) error {
return fmt.Errorf("OnType()")
})()

// Fail in dynamic event handler
Next(Dynamic{ID: 42})
assert.Equal(t, "OnType()", (<-errors).Error())
}

// ------------------------------------- Test Events -------------------------------------

const (
TypeEvent1 = 0x1
TypeEvent2 = 0x2
)

type MyEvent1 struct {
Number int
}

func (t MyEvent1) Type() uint32 { return TypeEvent1 }

type MyEvent2 struct {
Text string
}

func (t MyEvent2) Type() uint32 { return TypeEvent2 }

type Dynamic struct {
ID int
}

func (t Dynamic) Type() uint32 {
return uint32(t.ID)
}
Loading

0 comments on commit ae75898

Please sign in to comment.