Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

flushing out tests. trying to figure out what is not working

  • Loading branch information...
commit b9159e8036be24b699457d52169981d18e39ba82 1 parent 79ebc4f
Jeffrey Hulten authored
Showing with 141 additions and 19 deletions.
  1. +59 −16 event.go
  2. +82 −3 event_test.go
View
75 event.go
@@ -14,6 +14,7 @@ import (
"net"
"net/http"
"net/http/httputil"
+ "os"
"sync"
"time"
)
@@ -38,6 +39,8 @@ var eventMonitor EventMonitoringState
var ErrNoListeners = errors.New("No listeners to send event to...")
func (c *Client) AddEventListener(listener chan *APIEvents) error {
+ fmt.Println("enter AddEventListener")
+ defer fmt.Println("exit AddEventListener")
err := eventMonitor.enableEventMonitoring(c)
if err != nil {
return err
@@ -50,6 +53,8 @@ func (c *Client) AddEventListener(listener chan *APIEvents) error {
}
func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
+ fmt.Println("enter RemoveEventListener")
+ defer fmt.Println("exit RemoveEventListener")
err := eventMonitor.removeListener(listener)
if err != nil {
return err
@@ -66,6 +71,9 @@ func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
}
func (eventState *EventMonitoringState) addListener(listener chan *APIEvents) error {
+ fmt.Println("enter addListener")
+ defer fmt.Println("exit addListener")
+
eventState.Lock()
defer eventState.Unlock()
if listenerExists(listener, &eventState.listeners) {
@@ -76,6 +84,8 @@ func (eventState *EventMonitoringState) addListener(listener chan *APIEvents) er
}
func (eventState *EventMonitoringState) removeListener(listener chan *APIEvents) error {
+ fmt.Println("enter removeListener")
+ defer fmt.Println("exit removeListener")
eventState.Lock()
defer eventState.Unlock()
var newListeners []chan *APIEvents
@@ -91,6 +101,8 @@ func (eventState *EventMonitoringState) removeListener(listener chan *APIEvents)
}
func listenerExists(a chan *APIEvents, list *[]chan *APIEvents) bool {
+ fmt.Println("enter listenerExists")
+ defer fmt.Println("exit listenerExists")
for _, b := range *list {
if b == a {
return true
@@ -100,6 +112,8 @@ func listenerExists(a chan *APIEvents, list *[]chan *APIEvents) bool {
}
func (eventState *EventMonitoringState) enableEventMonitoring(c *Client) error {
+ fmt.Println("enter enableEventMonitoring")
+ defer fmt.Println("exit enableEventMonitoring")
eventState.Lock()
defer eventState.Unlock()
if !eventState.enabled {
@@ -112,6 +126,8 @@ func (eventState *EventMonitoringState) enableEventMonitoring(c *Client) error {
}
func (eventState *EventMonitoringState) disableEventMonitoring() error {
+ fmt.Println("enter disableEventMonitoring")
+ defer fmt.Println("exit disableEventMonitoring")
eventState.Lock()
defer eventState.Unlock()
if !eventState.enabled {
@@ -123,6 +139,8 @@ func (eventState *EventMonitoringState) disableEventMonitoring() error {
}
func (eventState *EventMonitoringState) monitorEvents(c *Client) {
+ fmt.Println("enter monitorEvents")
+ defer fmt.Println("exit monitorEvents")
var retries int
var err error
@@ -132,9 +150,10 @@ func (eventState *EventMonitoringState) monitorEvents(c *Client) {
}
for err = c.eventHijack(uint32(eventState.lastSeen), eventState.C, eventState.errC); err != nil && retries < 5; retries++ {
- waitTime := float64(time.Duration(100*time.Millisecond)) * math.Pow(2, float64(retries))
- eventState.errC <- fmt.Errorf("connection to event stream failed, retrying in %n: %s", waitTime, err)
- time.Sleep(time.Duration(int64(waitTime)))
+ fmt.Printf("eventHijack retry: %s\n", err)
+ waitTime := int64(float64(10) * math.Pow(2, float64(retries)))
+ fmt.Printf("connection to event stream failed, retrying in %n ms: %s", waitTime, err)
+ time.Sleep(time.Duration(waitTime) * time.Millisecond)
err = c.eventHijack(uint32(eventState.lastSeen), eventState.C, eventState.errC)
}
@@ -146,6 +165,7 @@ func (eventState *EventMonitoringState) monitorEvents(c *Client) {
timeout := time.After(100 * time.Millisecond)
select {
case ev := <-eventState.C:
+ fmt.Println("monitorEvents.C recieved")
// send the event
go eventState.sendEvent(ev)
@@ -159,6 +179,7 @@ func (eventState *EventMonitoringState) monitorEvents(c *Client) {
}(ev)
case err = <-eventState.errC:
+ fmt.Println("monitorEvents errC recieved")
if err == ErrNoListeners {
// if there are no listeners, exit normally
eventState.terminate(nil)
@@ -169,14 +190,19 @@ func (eventState *EventMonitoringState) monitorEvents(c *Client) {
return
}
case <-timeout:
+ fmt.Println("monitorEvents timeout")
continue
}
}
}
func (eventState *EventMonitoringState) sendEvent(event *APIEvents) {
+ fmt.Println("enter sendEvent")
+ defer fmt.Println("exit sendEvent")
+
eventState.RLock()
defer eventState.RUnlock()
+ fmt.Printf("sending to %n listeners\n", len(eventState.listeners))
if len(eventState.listeners) == 0 {
eventState.errC <- ErrNoListeners
}
@@ -186,6 +212,8 @@ func (eventState *EventMonitoringState) sendEvent(event *APIEvents) {
}
func (eventState *EventMonitoringState) terminate(err error) {
+ fmt.Println("enter terminate")
+ defer fmt.Println("exit terminate")
if err != nil {
fmt.Printf("terminating montoring", err)
}
@@ -193,7 +221,16 @@ func (eventState *EventMonitoringState) terminate(err error) {
}
func (c *Client) eventHijack(startTime uint32, eventChan chan *APIEvents, errChan chan error) error {
- req, err := http.NewRequest("GET", c.getURL(fmt.Sprintf("/events?since=%d", startTime)), nil)
+ fmt.Println("enter eventHijack")
+ defer fmt.Println("exit eventHijack")
+
+ uri := "/events"
+
+ if startTime != 0 {
+ uri += fmt.Sprintf("?since=%d", startTime)
+ }
+
+ req, err := http.NewRequest("GET", c.getURL(uri), nil)
if err != nil {
return err
}
@@ -211,26 +248,32 @@ func (c *Client) eventHijack(startTime uint32, eventChan chan *APIEvents, errCha
clientconn := httputil.NewClientConn(dial, nil)
clientconn.Do(req)
- defer clientconn.Close()
rwc, _ := clientconn.Hijack()
- defer rwc.Close()
+
+ fmt.Printf("remote: %s\n", rwc.LocalAddr().String())
go func(rwc io.ReadWriteCloser) {
- buf := bufio.NewReader(rwc)
- for {
- line, err := buf.ReadBytes('\n')
- if err != nil {
- errChan <- err
- return
- }
+ fmt.Println("enter eventHijack goroutine")
+ defer fmt.Println("exit eventHijack goroutine")
+
+ defer clientconn.Close()
+ defer rwc.Close()
+
+ scanner := bufio.NewScanner(rwc)
+ for scanner.Scan() {
+ line := scanner.Text()
+ fmt.Printf("rwc.RCV: %s\n", line)
+
var e APIEvents
- err = json.Unmarshal(line, &e)
+ err = json.Unmarshal([]byte(line), &e)
if err != nil {
errChan <- err
- return
}
- eventChan <- &e
+
+ }
+ if err := scanner.Err(); err != nil {
+ fmt.Fprintln(os.Stderr, "reading from network:", err)
}
}(rwc)
View
85 event_test.go
@@ -4,6 +4,85 @@
package docker
-// import (
-// "testing"
-// )
+import (
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+)
+
+func TestEventListeners(t *testing.T) {
+ response := `{"status":"create","id":"dfdf82bd3881","from":"base:latest","time":1374067924}
+{"status":"start","id":"dfdf82bd3881","from":"base:latest","time":1374067924}
+{"status":"stop","id":"dfdf82bd3881","from":"base:latest","time":1374067966}
+{"status":"destroy","id":"dfdf82bd3881","from":"base:latest","time":1374067970}
+`
+
+ var req http.Request
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ fmt.Printf("test server recieved %s %s\n", r.Method, r.URL.String())
+ w.Write([]byte(response))
+ req = *r
+ }))
+ defer server.Close()
+
+ t.Logf("created test server: %s", server.URL)
+
+ client, err := NewClient(server.URL)
+ if err != nil {
+ t.Errorf("Failed to create client: %s", err)
+ }
+
+ listener := make(chan *APIEvents, 10)
+ defer client.RemoveEventListener(listener)
+
+ err = client.AddEventListener(listener)
+ if err != nil {
+ t.Errorf("Failed to add event listener: %s", err)
+ }
+
+ timeout := time.After(1 * time.Second)
+ var count int
+
+ for {
+ select {
+ case msg := <-listener:
+ t.Logf("Recieved: %s", *msg)
+ count++
+ err = checkEvent(count, msg)
+ if err != nil {
+ t.Fatalf("Check event failed: %s", err)
+ }
+ if count == 4 {
+ break
+ }
+ case <-timeout:
+ t.Fatal("TestAddEventListener timed out waiting on events")
+ }
+ }
+}
+
+func checkEvent(index int, event *APIEvents) error {
+ if event.ID != "dfdf82bd3881" {
+ return fmt.Errorf("event ID did not match. Expected dfdf82bd3881 got %s", event.ID)
+ }
+ if event.From != "base:latest" {
+ return fmt.Errorf("event from did not match. Expected base:latest got %s", event.From)
+ }
+ var status string
+ switch index {
+ case 1:
+ status = "create"
+ case 2:
+ status = "start"
+ case 3:
+ status = "stop"
+ case 4:
+ status = "destroy"
+ }
+ if event.Status != status {
+ return fmt.Errorf("event status did not match. Expected %s got %s", status, event.Status)
+ }
+ return nil
+}
Please sign in to comment.
Something went wrong with that request. Please try again.