-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a subpackage for scheduling pub/sub events
- Loading branch information
Showing
8 changed files
with
298 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
## Event Package for Timeline | ||
|
||
The event package seamlessly integrates the timeline scheduler with event-driven programming. It allows you to emit and subscribe to events with precise timing, making it ideal for applications that require both event-driven architectures and time-based scheduling. | ||
|
||
## Quick Start | ||
|
||
Let's dive right in with a simple example to get you started with the event package. | ||
|
||
```go | ||
// Custom event type | ||
type Message struct { | ||
Text string | ||
} | ||
|
||
// Type returns the type of the event for the dispatcher | ||
func (Message) Type() uint32 { | ||
return 0x1 | ||
} | ||
|
||
func main() { | ||
|
||
// Emit the event immediately | ||
event.Emit(Message{Text: "Hello, World!"}) | ||
|
||
// Emit the event every second | ||
event.EmitEvery(Message{Text: "Are we there yet?"}, 1*time.Second) | ||
|
||
// Subscribe and Handle the Event | ||
cancel := event.On[Message](func(ev event.Event[Message]) { | ||
fmt.Printf("Received '%s' at %02d.%03d, elapsed=%v\n", | ||
ev.Data.Text, | ||
ev.Time.Second(), ev.Time.UnixMilli()%1000, ev.Elapsed) | ||
}) | ||
defer cancel() // Remember to unsubscribe when done | ||
|
||
// Let the program run for a while to receive events | ||
time.Sleep(5 * time.Second) | ||
} | ||
``` | ||
|
||
You will see similar output, with 'Are we there yet?' being emitted every second, and 'Hello, World!' being emitted immediately. | ||
|
||
``` | ||
Received 'Hello, World!' at 21.060, elapsed=0s | ||
Received 'Are we there yet?' at 22.000, elapsed=940ms | ||
Received 'Are we there yet?' at 23.000, elapsed=1s | ||
Received 'Are we there yet?' at 24.000, elapsed=1s | ||
Received 'Are we there yet?' at 25.000, elapsed=1s | ||
Received 'Are we there yet?' at 26.000, elapsed=1s | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
// Copyright (c) Roman Atachiants and contributors. All rights reserved. | ||
// Licensed under the MIT license. See LICENSE file in the project root | ||
|
||
package event | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/kelindar/event" | ||
"github.com/kelindar/timeline" | ||
) | ||
|
||
// Scheduler is the default scheduler used to emit events. | ||
var Scheduler = func() *timeline.Scheduler { | ||
s := timeline.New() | ||
s.Start(context.Background()) | ||
return s | ||
}() | ||
|
||
// signal represents a signal event | ||
type Event[T event.Event] struct { | ||
Time time.Time // The time at which the event was emitted | ||
Elapsed time.Duration // The time elapsed since the last event | ||
Data T | ||
} | ||
|
||
// Type returns the type of the event | ||
func (e Event[T]) Type() uint32 { | ||
return e.Data.Type() | ||
} | ||
|
||
// emit writes an event into the dispatcher | ||
func emit[T event.Event](ev T) func(now time.Time, elapsed time.Duration) bool { | ||
return func(now time.Time, elapsed time.Duration) bool { | ||
event.Publish(event.Default, Event[T]{ | ||
Data: ev, | ||
Time: now, | ||
Elapsed: elapsed, | ||
}) | ||
return true | ||
} | ||
} | ||
|
||
// On subscribes to an event, the type of the event will be automatically | ||
// inferred from the provided type. Must be constant for this to work. This | ||
// functions same way as Subscribe() but uses the default dispatcher instead. | ||
func On[T event.Event](handler func(Event[T])) context.CancelFunc { | ||
return event.Subscribe[Event[T]](event.Default, handler) | ||
} | ||
|
||
// OnType subscribes to an event with the specified event type. This functions | ||
// same way as SubscribeTo() but uses the default dispatcher instead. | ||
func OnType[T event.Event](eventType uint32, handler func(Event[T])) context.CancelFunc { | ||
return event.SubscribeTo[Event[T]](event.Default, eventType, handler) | ||
} | ||
|
||
// Emit writes an event during the next tick. | ||
func Emit[T event.Event](ev T) { | ||
Scheduler.Run(emit(ev)) | ||
} | ||
|
||
// EmitAt writes an event at specific 'at' time. | ||
func EmitAt[T event.Event](ev T, at time.Time) { | ||
Scheduler.RunAt(emit(ev), at) | ||
} | ||
|
||
// EmitAfter writes an event after a 'delay'. | ||
func EmitAfter[T event.Event](ev T, after time.Duration) { | ||
Scheduler.RunAfter(emit(ev), after) | ||
} | ||
|
||
// EmitEvery writes an event at 'interval' intervals, starting at the next boundary tick. | ||
func EmitEvery[T event.Event](ev T, interval time.Duration) { | ||
Scheduler.RunEvery(emit(ev), interval) | ||
} | ||
|
||
// EmitEveryAt writes an event at 'interval' intervals, starting at 'startTime'. | ||
func EmitEveryAt[T event.Event](ev T, interval time.Duration, startTime time.Time) { | ||
Scheduler.RunEveryAt(emit(ev), interval, startTime) | ||
} | ||
|
||
// EmitEveryAfter writes an event at 'interval' intervals after a 'delay'. | ||
func EmitEveryAfter[T event.Event](ev T, interval time.Duration, delay time.Duration) { | ||
Scheduler.RunEveryAfter(emit(ev), interval, delay) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
package event | ||
|
||
import ( | ||
"fmt" | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
/* | ||
cpu: 13th Gen Intel(R) Core(TM) i7-13700K | ||
BenchmarkEvent/1x1-24 17777803 60.10 ns/op 16.45 million/s 170 B/op 1 allocs/op | ||
BenchmarkEvent/1x10-24 16062572 162.3 ns/op 67.38 million/s 220 B/op 1 allocs/op | ||
BenchmarkEvent/1x100-24 22221769 106.9 ns/op 56.41 million/s 205 B/op 1 allocs/op | ||
BenchmarkEvent/10x1-24 3183024 459.8 ns/op 23.68 million/s 920 B/op 10 allocs/op | ||
BenchmarkEvent/10x10-24 1465628 1136 ns/op 55.92 million/s 1981 B/op 10 allocs/op | ||
BenchmarkEvent/10x100-24 1432830 1780 ns/op 61.69 million/s 2130 B/op 10 allocs/op | ||
*/ | ||
func BenchmarkEvent(b *testing.B) { | ||
for _, topics := range []int{1, 10} { | ||
for _, subs := range []int{1, 10, 100} { | ||
b.Run(fmt.Sprintf("%dx%d", topics, subs), func(b *testing.B) { | ||
var count atomic.Int64 | ||
for i := 0; i < subs; i++ { | ||
for id := 10; id < 10+topics; id++ { | ||
defer OnType(uint32(id), func(ev Event[Dynamic]) { | ||
count.Add(1) | ||
})() | ||
} | ||
} | ||
|
||
start := time.Now() | ||
b.ReportAllocs() | ||
b.ResetTimer() | ||
for n := 0; n < b.N; n++ { | ||
for id := 10; id < 10+topics; id++ { | ||
Emit(Dynamic{ID: id}) | ||
} | ||
} | ||
|
||
elapsed := time.Since(start) | ||
rate := float64(count.Load()) / 1e6 / elapsed.Seconds() | ||
b.ReportMetric(rate, "million/s") | ||
}) | ||
} | ||
} | ||
} | ||
|
||
func TestEmit(t *testing.T) { | ||
events := make(chan MyEvent2) | ||
defer On(func(ev Event[MyEvent2]) { | ||
assert.Equal(t, "Hello", ev.Data.Text) | ||
events <- ev.Data | ||
})() | ||
|
||
// Emit the event | ||
Emit(MyEvent2{Text: "Hello"}) | ||
<-events | ||
|
||
EmitAt(MyEvent2{Text: "Hello"}, time.Now().Add(40*time.Millisecond)) | ||
<-events | ||
|
||
EmitAfter(MyEvent2{Text: "Hello"}, 20*time.Millisecond) | ||
<-events | ||
|
||
EmitEveryAt(MyEvent2{Text: "Hello"}, 50*time.Millisecond, time.Now().Add(10*time.Millisecond)) | ||
<-events | ||
|
||
EmitEveryAfter(MyEvent2{Text: "Hello"}, 30*time.Millisecond, 10*time.Millisecond) | ||
<-events | ||
|
||
EmitEvery(MyEvent2{Text: "Hello"}, 10*time.Millisecond) | ||
<-events | ||
} | ||
|
||
func TestOnType(t *testing.T) { | ||
events := make(chan Dynamic) | ||
defer OnType(42, func(ev Event[Dynamic]) { | ||
assert.Equal(t, 42, ev.Data.ID) | ||
events <- ev.Data | ||
})() | ||
|
||
// Emit the event | ||
Emit(Dynamic{ID: 42}) | ||
<-events | ||
} | ||
|
||
// ------------------------------------- Test Events ------------------------------------- | ||
|
||
const ( | ||
TypeEvent1 = 0x1 | ||
TypeEvent2 = 0x2 | ||
) | ||
|
||
type MyEvent1 struct { | ||
Number int | ||
} | ||
|
||
func (t MyEvent1) Type() uint32 { return TypeEvent1 } | ||
|
||
type MyEvent2 struct { | ||
Text string | ||
} | ||
|
||
func (t MyEvent2) Type() uint32 { return TypeEvent2 } | ||
|
||
type Dynamic struct { | ||
ID int | ||
} | ||
|
||
func (t Dynamic) Type() uint32 { | ||
return uint32(t.ID) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
"github.com/kelindar/timeline/event" | ||
) | ||
|
||
// Custom event type | ||
type Message struct { | ||
Text string | ||
} | ||
|
||
// Type returns the type of the event for the dispatcher | ||
func (Message) Type() uint32 { | ||
return 0x1 | ||
} | ||
|
||
func main() { | ||
|
||
// Emit the event immediately | ||
event.Emit(Message{Text: "Hello, World!"}) | ||
|
||
// Emit the event every second | ||
event.EmitEvery(Message{Text: "Are we there yet?"}, 1*time.Second) | ||
|
||
// Subscribe and Handle the Event | ||
cancel := event.On[Message](func(ev event.Event[Message]) { | ||
fmt.Printf("Received '%s' at %02d.%03d, elapsed=%v\n", | ||
ev.Data.Text, | ||
ev.Time.Second(), ev.Time.UnixMilli()%1000, ev.Elapsed) | ||
}) | ||
defer cancel() // Remember to unsubscribe when done | ||
|
||
// Let the program run for a while to receive events | ||
time.Sleep(5 * time.Second) | ||
} |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters