Skip to content

Commit

Permalink
feat(bus): Subscribe to all, or by ID. "Type" -> "Topic"
Browse files Browse the repository at this point in the history
  • Loading branch information
dustmop committed Jan 19, 2021
1 parent 783b1d6 commit cc139bc
Show file tree
Hide file tree
Showing 20 changed files with 344 additions and 148 deletions.
8 changes: 4 additions & 4 deletions base/dsfs/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,9 @@ func TestDatasetSaveEvents(t *testing.T) {
privKey := testPeers.GetTestPeerInfo(10).PrivKey
bus := event.NewBus(ctx)

fired := map[event.Type]int{}
bus.Subscribe(func(ctx context.Context, t event.Type, payload interface{}) error {
fired[t]++
fired := map[event.Topic]int{}
bus.SubscribeTopics(func(ctx context.Context, e event.Event) error {
fired[e.Topic]++
return nil
},
event.ETDatasetSaveStarted,
Expand All @@ -356,7 +356,7 @@ func TestDatasetSaveEvents(t *testing.T) {
t.Fatal(err)
}

expect := map[event.Type]int{
expect := map[event.Topic]int{
event.ETDatasetSaveStarted: 1,
event.ETDatasetSaveProgress: 2,
event.ETDatasetSaveCompleted: 1,
Expand Down
10 changes: 5 additions & 5 deletions cmd/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,17 @@ func PrintProgressBarsOnEvents(w io.Writer, bus event.Bus) {
progress := map[string]*mpb.Bar{}

// wire up a subscription to print download progress to streams
bus.Subscribe(func(_ context.Context, typ event.Type, payload interface{}) error {
bus.SubscribeTopics(func(_ context.Context, e event.Event) error {
lock.Lock()
defer lock.Unlock()
log.Debugw("handle event", "type", typ, "payload", payload)
log.Debugw("handle event", "type", e.Topic, "payload", e.Payload)

switch evt := payload.(type) {
switch evt := e.Payload.(type) {
case event.DsSaveEvent:
evtID := fmt.Sprintf("%s/%s", evt.Username, evt.Name)
cpl := int64(math.Ceil(evt.Completion * 100))

switch typ {
switch e.Topic {
case event.ETDatasetSaveStarted:
bar, exists := progress[evtID]
if !exists {
Expand All @@ -277,7 +277,7 @@ func PrintProgressBarsOnEvents(w io.Writer, bus event.Bus) {
}
}
case event.RemoteEvent:
switch typ {
switch e.Topic {
case event.ETRemoteClientPushVersionProgress:
bar, exists := progress[evt.Ref.String()]
if !exists {
Expand Down
2 changes: 1 addition & 1 deletion cmd/print_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestProgressBars(t *testing.T) {
ref := dsref.MustParse("c/d")

events := []struct {
t event.Type
t event.Topic
p interface{}
}{
{event.ETDatasetSaveStarted, event.DsSaveEvent{Username: "a", Name: "b", Completion: 0.1}},
Expand Down
10 changes: 5 additions & 5 deletions dscache/dscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewDscache(ctx context.Context, fsys qfs.Filesystem, bus event.Bus, usernam
}
}
cache.DefaultUsername = username
bus.Subscribe(cache.handler,
bus.SubscribeTopics(cache.handler,
event.ETDatasetNameInit,
event.ETDatasetCommitChange,
event.ETDatasetDeleteAll,
Expand Down Expand Up @@ -237,14 +237,14 @@ func (d *Dscache) validateProfileID(profileID string) bool {
return len(profileID) == lengthOfProfileID
}

func (d *Dscache) handler(_ context.Context, t event.Type, payload interface{}) error {
act, ok := payload.(event.DsChange)
func (d *Dscache) handler(_ context.Context, e event.Event) error {
act, ok := e.Payload.(event.DsChange)
if !ok {
log.Error("dscache got an event with a payload that isn't a event.DsChange type: %v", payload)
log.Error("dscache got an event with a payload that isn't a event.DsChange type: %v", e.Payload)
return nil
}

switch t {
switch e.Topic {
case event.ETDatasetNameInit:
if err := d.updateInitDataset(act); err != nil && err != ErrNoDscache {
log.Error(err)
Expand Down
16 changes: 8 additions & 8 deletions event/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,32 @@ import (
const (
// ETDatasetNameInit is when a dataset is initialized
// payload is a DsChange
ETDatasetNameInit = Type("dataset:Init")
ETDatasetNameInit = Topic("dataset:Init")
// ETDatasetCommitChange is when a dataset changes its newest commit
// payload is a DsChange
ETDatasetCommitChange = Type("dataset:CommitChange")
ETDatasetCommitChange = Topic("dataset:CommitChange")
// ETDatasetDeleteAll is when a dataset is entirely deleted
// payload is a DsChange
ETDatasetDeleteAll = Type("dataset:DeleteAll")
ETDatasetDeleteAll = Topic("dataset:DeleteAll")
// ETDatasetRename is when a dataset is renamed
// payload is a DsChange
ETDatasetRename = Type("dataset:Rename")
ETDatasetRename = Topic("dataset:Rename")
// ETDatasetCreateLink is when a dataset is linked to a working directory
// payload is a DsChange
ETDatasetCreateLink = Type("dataset:CreateLink")
ETDatasetCreateLink = Topic("dataset:CreateLink")

// ETDatasetSaveStarted fires when saving a dataset starts
// subscriptions do not block the publisher
// payload will be a DsSaveEvent
ETDatasetSaveStarted = Type("dataset:SaveStarted")
ETDatasetSaveStarted = Topic("dataset:SaveStarted")
// ETDatasetSaveProgress indicates a change in progress of dataset version
// creation.
// subscriptions do not block the publisher
// payload will be a DsSaveEvent
ETDatasetSaveProgress = Type("dataset:SaveProgress")
ETDatasetSaveProgress = Topic("dataset:SaveProgress")
// ETDatasetSaveCompleted indicates creating a dataset version finished
// payload will be a DsSaveEvent
ETDatasetSaveCompleted = Type("dataset:SaveCompleted")
ETDatasetSaveCompleted = Topic("dataset:SaveCompleted")
)

// DsChange represents the result of a change to a dataset
Expand Down
126 changes: 104 additions & 22 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"sync"
"time"

golog "github.com/ipfs/go-log"
)
Expand All @@ -17,39 +18,56 @@ var (
// ErrBusClosed indicates the event bus is no longer coordinating events
// because it's parent context has closed
ErrBusClosed = fmt.Errorf("event bus is closed")
// NowFunc is the function that generates timestamps (tests may override)
NowFunc = time.Now
)

// Type is the set of all kinds of events emitted by the bus. Use the "Type"
// Topic is the set of all kinds of events emitted by the bus. Use the "Topic"
// type to distinguish between different events. Event emitters should
// declare Types as constants and document the expected payload type.
type Type string
// declare Topics as constants and document the expected payload type.
type Topic string

// Event represents an event that subscribers will receive from the bus
type Event struct {
Topic Topic
Timestamp int64
SessionID string
Payload interface{}
}

// Handler is a function that will be called by the event bus whenever a
// subscribed topic is published. Handler calls are blocking, called in order
// matching event is published. Handler calls are blocking, called in order
// of subscription. Any error returned by a handler is passed back to the
// event publisher.
// The handler context originates from the publisher, and in practice will often
// be scoped to a "request context" like an HTTP request or CLI command
// invocation.
// Generally, even handlers should aim to return quickly, and only delegate to
// goroutines when the publishing event is firing on a long-running process
type Handler func(ctx context.Context, t Type, payload interface{}) error
type Handler func(ctx context.Context, e Event) error

// Publisher is an interface that can only publish an event
type Publisher interface {
Publish(ctx context.Context, t Type, payload interface{}) error
Publish(ctx context.Context, t Topic, payload interface{}) error
PublishID(ctx context.Context, t Topic, sessionID string, payload interface{}) error
}

// Bus is a central coordination point for event publication and subscription
// zero or more subscribers register eventTypes to be notified of, a publisher
// zero or more subscribers register eventTopics to be notified of, a publisher
// writes a topic event to the bus, which broadcasts to all subscribers of that
// topic
type Bus interface {
// Publish an event to the bus
Publish(ctx context.Context, t Type, data interface{}) error
// Subscribe to one or more eventTypes with a handler function that will be called
Publish(ctx context.Context, t Topic, data interface{}) error
// PublishID publishes an event with an arbitrary session id
PublishID(ctx context.Context, t Topic, sessionID string, data interface{}) error
// Subscribe to one or more eventTopics with a handler function that will be called
// whenever the event topic is published
Subscribe(handler Handler, eventTypes ...Type)
SubscribeTopics(handler Handler, eventTopics ...Topic)
// SubscribeID subscribes to only events that have a matching session id
SubscribeID(handler Handler, sessionID string)
// SubscribeAll subscribes to all events
SubscribeAll(handler Handler)
// NumSubscriptions returns the number of subscribers to the bus's events
NumSubscribers() int
}
Expand All @@ -64,20 +82,32 @@ type nilBus struct{}
var _ Bus = (*nilBus)(nil)

// Publish does nothing with the event
func (nilBus) Publish(_ context.Context, _ Type, _ interface{}) error {
func (nilBus) Publish(_ context.Context, _ Topic, _ interface{}) error {
return nil
}

// PublishID does nothing with the event
func (nilBus) PublishID(_ context.Context, _ Topic, _ string, _ interface{}) error {
return nil
}

func (nilBus) Subscribe(handler Handler, eventTypes ...Type) {}
// SubscribeTopics does nothing
func (nilBus) SubscribeTopics(handler Handler, eventTopics ...Topic) {}

func (nilBus) SubscribeID(handler Handler, id string) {}

func (nilBus) SubscribeAll(handler Handler) {}

func (nilBus) NumSubscribers() int {
return 0
}

type bus struct {
lk sync.RWMutex
closed bool
subs map[Type][]Handler
lk sync.RWMutex
closed bool
subs map[Topic][]Handler
allSubs []Handler
idSubs map[string][]Handler
}

// assert at compile time that bus implements the Bus interface
Expand All @@ -90,7 +120,9 @@ var _ Bus = (*bus)(nil)
// TODO (b5) - finish context-closing cleanup
func NewBus(ctx context.Context) Bus {
b := &bus{
subs: map[Type][]Handler{},
subs: map[Topic][]Handler{},
idSubs: map[string][]Handler{},
allSubs: []Handler{},
}

go func(b *bus) {
Expand All @@ -105,17 +137,47 @@ func NewBus(ctx context.Context) Bus {
}

// Publish sends an event to the bus
func (b *bus) Publish(ctx context.Context, topic Type, data interface{}) error {
func (b *bus) Publish(ctx context.Context, topic Topic, payload interface{}) error {
return b.publish(ctx, topic, "", payload)
}

// Publish sends an event with a given sessionID to the bus
func (b *bus) PublishID(ctx context.Context, topic Topic, sessionID string, payload interface{}) error {
return b.publish(ctx, topic, sessionID, payload)
}

func (b *bus) publish(ctx context.Context, topic Topic, sessionID string, payload interface{}) error {
b.lk.RLock()
defer b.lk.RUnlock()
log.Debugw("publish", "topic", topic, "payload", data)
log.Debugw("publish", "topic", topic, "payload", payload)

if b.closed {
return ErrBusClosed
}

e := Event{
Topic: topic,
Timestamp: NowFunc().UnixNano(),
SessionID: sessionID,
Payload: payload,
}

for _, handler := range b.subs[topic] {
if err := handler(ctx, topic, data); err != nil {
if err := handler(ctx, e); err != nil {
return err
}
}

if sessionID != "" {
for _, handler := range b.idSubs[sessionID] {
if err := handler(ctx, e); err != nil {
return err
}
}
}

for _, handler := range b.allSubs {
if err := handler(ctx, e); err != nil {
return err
}
}
Expand All @@ -124,16 +186,32 @@ func (b *bus) Publish(ctx context.Context, topic Type, data interface{}) error {
}

// Subscribe requests events from the given topic, returning a channel of those events
func (b *bus) Subscribe(handler Handler, eventTypes ...Type) {
func (b *bus) SubscribeTopics(handler Handler, eventTopics ...Topic) {
b.lk.Lock()
defer b.lk.Unlock()
log.Debugf("Subscribe: %v", eventTypes)
log.Debugf("Subscribe to topics: %v", eventTopics)

for _, topic := range eventTypes {
for _, topic := range eventTopics {
b.subs[topic] = append(b.subs[topic], handler)
}
}

// SubscribeID requests events that match the given sessionID
func (b *bus) SubscribeID(handler Handler, sessionID string) {
b.lk.Lock()
defer b.lk.Unlock()
log.Debugf("Subscribe to ID: %v", sessionID)
b.idSubs[sessionID] = append(b.idSubs[sessionID], handler)
}

// SubscribeAll requets all events from the bus
func (b *bus) SubscribeAll(handler Handler) {
b.lk.Lock()
defer b.lk.Unlock()
log.Debugf("Subscribe All")
b.allSubs = append(b.allSubs, handler)
}

// NumSubscribers returns the number of subscribers to the bus's events
func (b *bus) NumSubscribers() int {
b.lk.Lock()
Expand All @@ -142,5 +220,9 @@ func (b *bus) NumSubscribers() int {
for _, handlers := range b.subs {
total += len(handlers)
}
for _, handlers := range b.idSubs {
total += len(handlers)
}
total += len(b.allSubs)
return total
}

0 comments on commit cc139bc

Please sign in to comment.