Skip to content
Browse files

fixed some race conditions

  • Loading branch information...
1 parent 2b95a9f commit 243989aea21deef1682f4a8838c2c4250b82f353 Jeffrey Hulten committed Feb 24, 2014
Showing with 109 additions and 40 deletions.
  1. +108 −39 event.go
  2. +1 −1 event_test.go
View
147 event.go
@@ -16,6 +16,7 @@ import (
"net/http/httputil"
"strings"
"sync"
+ "sync/atomic"
"time"
)
@@ -26,22 +27,32 @@ type APIEvents struct {
Time int64
}
-type EventMonitoringState struct {
+type eventMonitoringState struct {
sync.RWMutex
+ sync.WaitGroup
enabled bool
- lastSeen int64
+ lastSeen *int64
C chan *APIEvents
errC chan error
listeners []chan *APIEvents
}
-var eventMonitor EventMonitoringState
-var ErrNoListeners = errors.New("No listeners to send event to...")
+// event monitoring state is singleton
+var eventMonitor eventMonitoringState
+
+var maxMonitorConnRetries = 5
+var retryInitialWaitTime = float64(10)
+
+var ErrNoListeners = errors.New("No listeners present to recieve event")
+var ErrListenerExists = errors.New("Listener already exists for docker events")
func (c *Client) AddEventListener(listener chan *APIEvents) error {
- err := eventMonitor.enableEventMonitoring(c)
- if err != nil {
- return err
+ var err error
+ if !eventMonitor.isEnabled() {
+ err = eventMonitor.enableEventMonitoring(c)
+ if err != nil {
+ return err
+ }
}
err = eventMonitor.addListener(listener)
if err != nil {
@@ -66,27 +77,46 @@ func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
return nil
}
-func (eventState *EventMonitoringState) addListener(listener chan *APIEvents) error {
+func (eventState *eventMonitoringState) addListener(listener chan *APIEvents) error {
+
+ // lock to mutate internal state
eventState.Lock()
defer eventState.Unlock()
+
+ // return error if listener is in pool
if listenerExists(listener, &eventState.listeners) {
- return fmt.Errorf("Listener already exists")
+ return ErrListenerExists
}
+
+ // add to waitgroup for listeners
+ eventState.Add(1)
+ // add listener to list
eventState.listeners = append(eventState.listeners, listener)
return nil
}
-func (eventState *EventMonitoringState) removeListener(listener chan *APIEvents) error {
+func (eventState *eventMonitoringState) removeListener(listener chan *APIEvents) error {
+
+ // lock to mutate internal state
eventState.Lock()
defer eventState.Unlock()
- var newListeners []chan *APIEvents
+
if listenerExists(listener, &eventState.listeners) {
+ // placeholder for new listener list
+ var newListeners []chan *APIEvents
+
+ // iterate on existing listeners, only adding non-matching listeners to new list
for _, l := range eventState.listeners {
if l != listener {
newListeners = append(newListeners, l)
}
}
+
+ // update listener list
eventState.listeners = newListeners
+
+ // release listener from waitgroup
+ eventState.Add(-1)
}
return nil
}
@@ -100,21 +130,33 @@ func listenerExists(a chan *APIEvents, list *[]chan *APIEvents) bool {
return false
}
-func (eventState *EventMonitoringState) enableEventMonitoring(c *Client) error {
+func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
+ // lock to mutate internal state
eventState.Lock()
defer eventState.Unlock()
+
+ // if event monitoring is disabled, initialize it and start monitoring
if !eventState.enabled {
eventState.enabled = true
+ var lastSeenDefault = int64(0)
+ eventState.lastSeen = &lastSeenDefault
eventState.C = make(chan *APIEvents, 100)
eventState.errC = make(chan error, 1)
go eventState.monitorEvents(c)
}
return nil
}
-func (eventState *EventMonitoringState) disableEventMonitoring() error {
+func (eventState *eventMonitoringState) disableEventMonitoring() error {
+
+ // Wait until all sendEvents are finished
+ eventState.Wait()
+
+ // lock to mutate internal state
eventState.Lock()
defer eventState.Unlock()
+
+ // if event monitoring is enables, close the channels
if eventState.enabled {
eventState.enabled = false
close(eventState.C)
@@ -123,40 +165,28 @@ func (eventState *EventMonitoringState) disableEventMonitoring() error {
return nil
}
-func (eventState *EventMonitoringState) monitorEvents(c *Client) {
- var retries int
+func (eventState *eventMonitoringState) monitorEvents(c *Client) {
+
var err error
// wait for first listener
- for len(eventState.listeners) == 0 {
+ for eventState.noListeners() {
time.Sleep(10 * time.Millisecond)
}
- for err = c.eventHijack(uint32(eventState.lastSeen), eventState.C, eventState.errC); err != nil && retries < 5; retries++ {
- waitTime := int64(float64(10) * math.Pow(2, float64(retries)))
- time.Sleep(time.Duration(waitTime) * time.Millisecond)
- err = c.eventHijack(uint32(eventState.lastSeen), eventState.C, eventState.errC)
- }
-
- if err != nil {
+ if err = eventState.connectWithRetry(c); err != nil {
eventState.terminate(err)
}
- for eventState.enabled {
+ for eventState.isEnabled() {
timeout := time.After(100 * time.Millisecond)
select {
case ev := <-eventState.C:
// send the event
go eventState.sendEvent(ev)
- // update lastSeen if appropriate
- go func(e *APIEvents) {
- eventState.Lock()
- defer eventState.Unlock()
- if eventState.lastSeen < e.Time {
- eventState.lastSeen = e.Time
- }
- }(ev)
+ // update lastSeen
+ go eventState.updateLastSeen(ev)
case err = <-eventState.errC:
if err == ErrNoListeners {
@@ -174,23 +204,62 @@ func (eventState *EventMonitoringState) monitorEvents(c *Client) {
}
}
-func (eventState *EventMonitoringState) sendEvent(event *APIEvents) {
+func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
+ var retries int
+ var err error
+ for err = c.eventHijack(atomic.LoadInt64(eventState.lastSeen), eventState.C, eventState.errC); err != nil && retries < maxMonitorConnRetries; retries++ {
+ waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
+ time.Sleep(time.Duration(waitTime) * time.Millisecond)
+ err = c.eventHijack(atomic.LoadInt64(eventState.lastSeen), eventState.C, eventState.errC)
+ }
+ return err
+}
+
+func (eventState *eventMonitoringState) noListeners() bool {
+ eventState.RLock()
+ defer eventState.RUnlock()
+ return len(eventState.listeners) == 0
+}
+
+func (eventState *eventMonitoringState) isEnabled() bool {
+ eventState.RLock()
+ defer eventState.RUnlock()
+ return eventState.enabled
+}
+func (eventState *eventMonitoringState) sendEvent(event *APIEvents) {
+
+ // ensure the listener list doesn't change out from under us
eventState.RLock()
defer eventState.RUnlock()
- if len(eventState.listeners) == 0 {
- eventState.errC <- ErrNoListeners
+
+ // add to waitgroup to make sure we don't close prematurely
+ eventState.Add(1)
+ defer eventState.Done()
+
+ if eventState.isEnabled() {
+ if eventState.noListeners() {
+ eventState.errC <- ErrNoListeners
+ }
+ for _, listener := range eventState.listeners {
+ listener <- event
+ }
}
- for _, listener := range eventState.listeners {
- listener <- event
+}
+
+func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
+ eventState.Lock()
+ defer eventState.Unlock()
+ if atomic.LoadInt64(eventState.lastSeen) < e.Time {
+ atomic.StoreInt64(eventState.lastSeen, e.Time)
}
}
-func (eventState *EventMonitoringState) terminate(err error) {
+func (eventState *eventMonitoringState) terminate(err error) {
eventState.disableEventMonitoring()
}
-func (c *Client) eventHijack(startTime uint32, eventChan chan *APIEvents, errChan chan error) error {
+func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error {
uri := "/events"
View
2 event_test.go
@@ -39,7 +39,7 @@ func TestEventListeners(t *testing.T) {
}
listener := make(chan *APIEvents, 10)
- defer client.RemoveEventListener(listener)
+ defer func() { time.Sleep(10 * time.Millisecond); client.RemoveEventListener(listener) }()
err = client.AddEventListener(listener)
if err != nil {

0 comments on commit 243989a

Please sign in to comment.
Something went wrong with that request. Please try again.