Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

support listeners for events

Change adds two methods to client:

- AddEventListener adds a channel to receive *APIEvents
  If this is the first listener, event monitoring is started.
- RemoveEventListener causes a channel to no longer receive *APIEvents
  If this is the last listener, event monitoring is stopped.
  • Loading branch information...
commit 479c4e292f885ca68ddd0093620e661323f1d6b3 1 parent 58b2278
Jeffrey Hulten authored
View
1  AUTHORS
@@ -11,3 +11,4 @@ Philippe Lafoucrière <philippe.lafoucriere@tech-angels.com>
Sridhar Ratnakumar <sridharr@activestate.com>
Tim Schindler <tim@catalyst-zero.com>
Wiliam Souza <wiliamsouza83@gmail.com>
+Jeffrey Hulten <jhulten@gmail.com>
View
253 event.go
@@ -0,0 +1,253 @@
+// Copyright 2013 go-dockerclient authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package docker
+
+import (
+ "bufio"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "net"
+ "net/http"
+ "net/http/httputil"
+ "strings"
+ "sync"
+ "time"
+)
+
+type APIEvents struct {
+ Status string
+ ID string
+ From string
+ Time int64
+}
+
+type EventMonitoringState struct {
+ sync.RWMutex
+ enabled bool
+ lastSeen int64
+ C chan *APIEvents
+ errC chan error
+ listeners []chan *APIEvents
+}
+
+var eventMonitor EventMonitoringState
+var ErrNoListeners = errors.New("No listeners to send event to...")
+
+func (c *Client) AddEventListener(listener chan *APIEvents) error {
+ err := eventMonitor.enableEventMonitoring(c)
+ if err != nil {
+ return err
+ }
+ err = eventMonitor.addListener(listener)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
+ err := eventMonitor.removeListener(listener)
+ if err != nil {
+ return err
+ }
+
+ if len(eventMonitor.listeners) == 0 {
+ err = eventMonitor.disableEventMonitoring()
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (eventState *EventMonitoringState) addListener(listener chan *APIEvents) error {
+ eventState.Lock()
+ defer eventState.Unlock()
+ if listenerExists(listener, &eventState.listeners) {
+ return fmt.Errorf("Listener already exists")
+ }
+ eventState.listeners = append(eventState.listeners, listener)
+ return nil
+}
+
+func (eventState *EventMonitoringState) removeListener(listener chan *APIEvents) error {
+ eventState.Lock()
+ defer eventState.Unlock()
+ var newListeners []chan *APIEvents
+ if listenerExists(listener, &eventState.listeners) {
+ for _, l := range eventState.listeners {
+ if l != listener {
+ newListeners = append(newListeners, l)
+ }
+ }
+ eventState.listeners = newListeners
+ }
+ return nil
+}
+
+func listenerExists(a chan *APIEvents, list *[]chan *APIEvents) bool {
+ for _, b := range *list {
+ if b == a {
+ return true
+ }
+ }
+ return false
+}
+
+func (eventState *EventMonitoringState) enableEventMonitoring(c *Client) error {
+ eventState.Lock()
+ defer eventState.Unlock()
+ if !eventState.enabled {
+ eventState.enabled = true
+ eventState.C = make(chan *APIEvents, 100)
+ eventState.errC = make(chan error, 1)
+ go eventState.monitorEvents(c)
+ }
+ return nil
+}
+
+func (eventState *EventMonitoringState) disableEventMonitoring() error {
+ eventState.Lock()
+ defer eventState.Unlock()
+ if !eventState.enabled {
+ eventState.enabled = false
+ close(eventState.C)
+ close(eventState.errC)
+ }
+ return nil
+}
+
+func (eventState *EventMonitoringState) monitorEvents(c *Client) {
+ var retries int
+ var err error
+
+ // wait for first listener
+ for len(eventState.listeners) == 0 {
+ time.Sleep(10 * time.Millisecond)
+ }
+
+ for err = c.eventHijack(uint32(eventState.lastSeen), eventState.C, eventState.errC); err != nil && retries < 5; retries++ {
+ waitTime := int64(float64(10) * math.Pow(2, float64(retries)))
+ time.Sleep(time.Duration(waitTime) * time.Millisecond)
+ err = c.eventHijack(uint32(eventState.lastSeen), eventState.C, eventState.errC)
+ }
+
+ if err != nil {
+ eventState.terminate(err)
+ }
+
+ for eventState.enabled {
+ timeout := time.After(100 * time.Millisecond)
+ select {
+ case ev := <-eventState.C:
+ // send the event
+ go eventState.sendEvent(ev)
+
+ // update lastSeen if appropriate
+ go func(e *APIEvents) {
+ eventState.Lock()
+ defer eventState.Unlock()
+ if eventState.lastSeen < e.Time {
+ eventState.lastSeen = e.Time
+ }
+ }(ev)
+
+ case err = <-eventState.errC:
+ if err == ErrNoListeners {
+ // if there are no listeners, exit normally
+ eventState.terminate(nil)
+ return
+ } else if err != nil {
+ // otherwise, trigger a restart via the error channel
+ defer func() { go eventState.monitorEvents(c) }()
+ return
+ }
+ case <-timeout:
+ continue
+ }
+ }
+}
+
+func (eventState *EventMonitoringState) sendEvent(event *APIEvents) {
+
+ eventState.RLock()
+ defer eventState.RUnlock()
+ if len(eventState.listeners) == 0 {
+ eventState.errC <- ErrNoListeners
+ }
+ for _, listener := range eventState.listeners {
+ listener <- event
+ }
+}
+
+func (eventState *EventMonitoringState) terminate(err error) {
+ eventState.disableEventMonitoring()
+}
+
+func (c *Client) eventHijack(startTime uint32, eventChan chan *APIEvents, errChan chan error) error {
+
+ uri := "/events"
+
+ if startTime != 0 {
+ uri += fmt.Sprintf("?since=%d", startTime)
+ }
+
+ req, err := http.NewRequest("GET", c.getURL(uri), nil)
+ if err != nil {
+ return err
+ }
+
+ req.Header.Set("Content-Type", "plain/text")
+ protocol := c.endpointURL.Scheme
+ address := c.endpointURL.Path
+ if protocol != "unix" {
+ protocol = "tcp"
+ address = c.endpointURL.Host
+ }
+
+ dial, err := net.Dial(protocol, address)
+ if err != nil {
+ return err
+ }
+
+ clientconn := httputil.NewClientConn(dial, nil)
+ clientconn.Do(req)
+
+ conn, rwc := clientconn.Hijack()
+ if err != nil {
+ return err
+ }
+
+ go func(rwc io.Reader) {
+
+ defer clientconn.Close()
+ defer conn.Close()
+
+ scanner := bufio.NewScanner(rwc)
+ for scanner.Scan() {
+ line := scanner.Text()
+
+ // Only pay attention to lines that start as json objects
+ if strings.HasPrefix(line, "{") {
+ var e APIEvents
+ err = json.Unmarshal([]byte(line), &e)
+ if err != nil {
+ errChan <- err
+ }
+ eventChan <- &e
+ }
+
+ }
+ if err := scanner.Err(); err != nil {
+ errChan <- err
+ }
+ }(rwc)
+
+ return nil
+}
View
92 event_test.go
@@ -0,0 +1,92 @@
+// Copyright 2013 go-dockerclient authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package docker
+
+import (
+ "bufio"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+)
+
+func TestEventListeners(t *testing.T) {
+ response := `{"status":"create","id":"dfdf82bd3881","from":"base:latest","time":1374067924}
+{"status":"start","id":"dfdf82bd3881","from":"base:latest","time":1374067924}
+{"status":"stop","id":"dfdf82bd3881","from":"base:latest","time":1374067966}
+{"status":"destroy","id":"dfdf82bd3881","from":"base:latest","time":1374067970}
+`
+
+ var req http.Request
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ rsc := bufio.NewScanner(strings.NewReader(response))
+ for rsc.Scan() {
+ w.Write([]byte(rsc.Text()))
+ w.(http.Flusher).Flush()
+ time.Sleep(10 * time.Millisecond)
+ }
+ req = *r
+ }))
+ defer server.Close()
+
+ client, err := NewClient(server.URL)
+ if err != nil {
+ t.Errorf("Failed to create client: %s", err)
+ }
+
+ listener := make(chan *APIEvents, 10)
+ defer client.RemoveEventListener(listener)
+
+ err = client.AddEventListener(listener)
+ if err != nil {
+ t.Errorf("Failed to add event listener: %s", err)
+ }
+
+ timeout := time.After(1 * time.Second)
+ var count int
+
+ for {
+ select {
+ case msg := <-listener:
+ t.Logf("Recieved: %s", *msg)
+ count++
+ err = checkEvent(count, msg)
+ if err != nil {
+ t.Fatalf("Check event failed: %s", err)
+ }
+ if count == 4 {
+ return
+ }
+ case <-timeout:
+ t.Fatal("TestAddEventListener timed out waiting on events")
+ }
+ }
+}
+
+func checkEvent(index int, event *APIEvents) error {
+ if event.ID != "dfdf82bd3881" {
+ return fmt.Errorf("event ID did not match. Expected dfdf82bd3881 got %s", event.ID)
+ }
+ if event.From != "base:latest" {
+ return fmt.Errorf("event from did not match. Expected base:latest got %s", event.From)
+ }
+ var status string
+ switch index {
+ case 1:
+ status = "create"
+ case 2:
+ status = "start"
+ case 3:
+ status = "stop"
+ case 4:
+ status = "destroy"
+ }
+ if event.Status != status {
+ return fmt.Errorf("event status did not match. Expected %s got %s", status, event.Status)
+ }
+ return nil
+}
View
38 example_test.go
@@ -7,10 +7,11 @@ package docker_test
import (
"archive/tar"
"bytes"
- "github.com/fsouza/go-dockerclient"
"io"
"log"
"time"
+
+ "github.com/fsouza/go-dockerclient"
)
func ExampleClient_AttachToContainer() {
@@ -80,6 +81,7 @@ func ExampleClient_BuildImage() {
if err != nil {
log.Fatal(err)
}
+
t := time.Now()
inputbuf, outputbuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
tr := tar.NewWriter(inputbuf)
@@ -97,3 +99,37 @@ func ExampleClient_BuildImage() {
log.Println("build image success, imageid:", imageid)
}
}
+
+func ExampleClient_ListenEvents() {
+ client, err := docker.NewClient("http://localhost:4243")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ listener := make(chan *docker.APIEvents)
+ err = client.AddEventListener(listener)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ defer func() {
+
+ err = client.RemoveEventListener(listener)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ }()
+
+ timeout := time.After(1 * time.Second)
+
+ for {
+ select {
+ case msg := <-listener:
+ log.Println(msg)
+ case <-timeout:
+ break
+ }
+ }
+
+}
View
51 testing/server.go
@@ -12,9 +12,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "github.com/fsouza/go-dockerclient"
- "github.com/fsouza/go-dockerclient/utils"
- "github.com/gorilla/mux"
mathrand "math/rand"
"net"
"net/http"
@@ -22,6 +19,10 @@ import (
"strings"
"sync"
"time"
+
+ "github.com/fsouza/go-dockerclient"
+ "github.com/fsouza/go-dockerclient/utils"
+ "github.com/gorilla/mux"
)
// DockerServer represents a programmable, concurrent (not much), HTTP server
@@ -73,6 +74,7 @@ func (s *DockerServer) buildMuxer() {
s.mux.Path("/images/json").Methods("GET").HandlerFunc(s.listImages)
s.mux.Path("/images/{id:.*}").Methods("DELETE").HandlerFunc(s.removeImage)
s.mux.Path("/images/{name:.*}/push").Methods("POST").HandlerFunc(s.pushImage)
+ s.mux.Path("/events").Methods("GET").HandlerFunc(s.listEvents)
}
// Stop stops the server.
@@ -442,3 +444,46 @@ func (s *DockerServer) removeImage(w http.ResponseWriter, r *http.Request) {
s.images[index] = s.images[len(s.images)-1]
s.images = s.images[:len(s.images)-1]
}
+
+func (s *DockerServer) listEvents(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+
+ var events [][]byte
+ count := mathrand.Intn(20)
+ for i := 0; i < count; i++ {
+ data, err := json.Marshal(s.generateEvent())
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+ events = append(events, data)
+ }
+
+ w.WriteHeader(http.StatusOK)
+
+ for _, d := range events {
+ fmt.Fprintln(w, d)
+ time.Sleep(time.Duration(mathrand.Intn(200)) * time.Millisecond)
+ }
+}
+
+func (s *DockerServer) generateEvent() *docker.APIEvents {
+ var eventType string
+
+ switch mathrand.Intn(4) {
+ case 0:
+ eventType = "create"
+ case 1:
+ eventType = "start"
+ case 2:
+ eventType = "stop"
+ case 3:
+ eventType = "destroy"
+ }
+
+ return &docker.APIEvents{
+ ID: s.generateID(),
+ Status: eventType,
+ From: "mybase:latest",
+ Time: time.Now().Unix()}
+}
Please sign in to comment.
Something went wrong with that request. Please try again.