Skip to content

Commit

Permalink
Add cancellation possibilities
Browse files Browse the repository at this point in the history
Also service event like `listener:new` removed. Because there is
no guarantee that event will delivered synchronously. In other
case cancellation should be added as well.
  • Loading branch information
olebedev committed Nov 16, 2015
1 parent 98da4ef commit 985ed7b
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 92 deletions.
161 changes: 96 additions & 65 deletions emitter.go
Expand Up @@ -21,15 +21,21 @@ const (
// Reset only to clear previously defined flags.
// Example:
// ee.Use("*", Reset) // clears flags for this pattern
Reset Flag = 0
FlagReset Flag = 0
// Once indicates to remove the listener after first sending.
Once Flag = 1 << iota
FlagOnce Flag = 1 << iota
// Void indicates to skip sending.
Void
FlagVoid
// Skip indicates to skip sending if channel is blocked.
Skip
FlagSkip
// Close indicates to drop listener if channel is blocked.
Close
FlagClose
)

type Error int

const (
ErrorPattern Error = -1 << iota
)

// New returns just created Emitter interface. Capacity argument
Expand All @@ -38,11 +44,7 @@ func New(capacity uint) Emitter {
return &emitter{
listeners: make(map[string][]listener),
capacity: capacity,
// Predefined flags for service events.
// They can be changed as any other flags
// via `Use` method. It need to avoid dead
// lock when unbuffered channel was created.
flags: map[string]Flag{"listener:*": Reset | Skip},
flags: map[string]Flag{},
}
}

Expand All @@ -61,7 +63,7 @@ type Emitter interface {
Off(string, ...<-chan Event) error
// Emit emits an event with the rest arguments to all
// listeners which were covered by topic(it can be pattern).
Emit(string, ...interface{}) error
Emit(string, ...interface{}) chan error
// Listeners returns slice of listeners which were covered by
// topic(it can be pattern) and error if pattern is invalid.
Listeners(string) ([]<-chan Event, error)
Expand Down Expand Up @@ -110,7 +112,7 @@ func (e *emitter) Use(pattern string, flags ...Flag) error {
// reduce the flags
var f Flag
for _, item := range flags {
if item == Reset {
if item == FlagReset {
delete(e.flags, pattern)
return nil
}
Expand Down Expand Up @@ -144,7 +146,6 @@ func (e *emitter) On(topic string, flags ...Flag) <-chan Event {
e.listeners[topic] = []listener{l}
}
e.mu.Unlock()
e.Emit("listener:new", readOnly(l.ch), topic)
return l.ch
}

Expand All @@ -164,7 +165,6 @@ func (e *emitter) Off(topic string, channels ...<-chan Event) error {
if len(channels) == 0 {
for i := len(listeners) - 1; i >= 0; i-- {
close(listeners[i].ch)
defer e.Emit("listener:remove", readOnly(listeners[i].ch), _topic)
listeners = drop(listeners, i)
}

Expand All @@ -174,7 +174,6 @@ func (e *emitter) Off(topic string, channels ...<-chan Event) error {
for i := len(listeners) - 1; i >= 0; i-- {
if curr == listeners[i].ch {
close(listeners[i].ch)
defer e.Emit("listener:remove", readOnly(listeners[i].ch), _topic)
listeners = drop(listeners, i)
}
}
Expand All @@ -186,7 +185,7 @@ func (e *emitter) Off(topic string, channels ...<-chan Event) error {
delete(e.listeners, _topic)
}
}
defer e.mu.Unlock()
e.mu.Unlock()
return nil
}

Expand All @@ -213,6 +212,8 @@ func (e *emitter) Listeners(topic string) ([]<-chan Event, error) {

// Topics returns all existing topics.
func (e *emitter) Topics() []string {
e.mu.Lock()
defer e.mu.Unlock()
acc := make([]string, len(e.listeners))
i := 0
for k := range e.listeners {
Expand All @@ -222,71 +223,95 @@ func (e *emitter) Topics() []string {
return acc
}

func readOnly(ch chan Event) <-chan Event { return ch }

// Emit emits an event with the rest arguments to all
// listeners which were covered by topic(it can be pattern).
func (e *emitter) Emit(topic string, args ...interface{}) error {
func (e *emitter) Emit(topic string, args ...interface{}) chan error {
e.mu.Lock()
done := make(chan error, 1)
var wg sync.WaitGroup

match, err := e.matched(topic)
if err != nil {
done <- err
close(done)
e.mu.Unlock()
return err
return done
}

var closed []chan Event
// fmt.Printf("[debug] 1 %-v\n", match)
for _, _topic := range match {
listeners := e.listeners[_topic]
topicFlags := e.getFlags(_topic)
// whole topic is skipping
if (topicFlags | Void) == topicFlags {
if (topicFlags | FlagVoid) == topicFlags {
continue
}

Loop:
for i := len(listeners) - 1; i >= 0; i-- {
lstnr := listeners[i]
flags := lstnr.flags | topicFlags

// unwind the flags
isOnce := (flags | Once) == flags
isVoid := (flags | Void) == flags
isSkip := (flags | Skip) == flags
isClose := (flags | Close) == flags

if isVoid {
continue Loop
}

if !send(lstnr.ch, Event{Topic: _topic, Args: args}, !(isSkip || isClose)) {
// if not sent
if isClose {
close(lstnr.ch)
closed = append(closed, lstnr.ch)
e.listeners[_topic] = drop(listeners, i)
wg.Add(1)
go func(lstnr listener, topicFlags Flag, _topic string) {
// unwind the flags
isOnce := (flags | FlagOnce) == flags
isVoid := (flags | FlagVoid) == flags
isSkip := (flags | FlagSkip) == flags
isClose := (flags | FlagClose) == flags

if isVoid {
wg.Done()
return
}
event := Event{
Topic: _topic,
OriginalTopic: topic,
Flags: flags,
Args: args,
}
if sent, cancaled := send(
done,
lstnr.ch,
event,
!(isSkip || isClose),
); !sent {
// if not sent
if isClose {
e.Off(_topic, lstnr.ch)
}
} else if cancaled {
// if sending canceled

} else {
// if event was sent successfully
if isOnce {
close(lstnr.ch)
closed = append(closed, lstnr.ch)
e.listeners[_topic] = drop(listeners, i)
} else {
// if event was sent successfully
if isOnce {
e.Off(_topic, lstnr.ch)
}
}
}
wg.Done()
}(lstnr, topicFlags, _topic)

}
if len(e.listeners[_topic]) == 0 {
delete(e.listeners, _topic)
}
}

for _, ch := range closed {
defer e.Emit("listener:remove", readOnly(ch), topic)
go func(done chan error) {
wg.Wait()
done <- nil
tryToClose(done)
}(done)

}

defer e.mu.Unlock()
return nil
e.mu.Unlock()
return done
}

func tryToClose(c chan error) (err error) {
defer func() {
if r := recover(); r != nil {
err = errors.New(r.(string))
}
}()
close(c)
return
}

func (e *emitter) matched(topic string) ([]string, error) {
Expand All @@ -310,16 +335,22 @@ func drop(l []listener, i int) []listener {
return append(l[:i], l[i+1:]...)
}

func send(ch chan Event, e Event, wait bool) bool {
if !wait {
select {
case ch <- e:
return true
default:
return false
func send(done chan error, ch chan Event, e Event, wait bool) (bool, bool) {
select {
case <-done:
break
default:
if !wait {
select {
case ch <- e:
return true, false
default:
return false, false
}
} else {
ch <- e
return true, false
}
} else {
ch <- e
}
return true
return false, true
}

0 comments on commit 985ed7b

Please sign in to comment.