Skip to content

Commit

Permalink
Separate events subsystem
Browse files Browse the repository at this point in the history
* Events subsystem merged from `server/events.go` and
  `utils/jsonmessagepublisher.go` and moved to `events/events.go`
* Only public interface for this subsystem is engine jobs
* There is two new engine jobs - `log_event` and `subscribers_count`
* There is auxiliary function `container.LogEvent` for logging events for
  containers

Docker-DCO-1.1-Signed-off-by: Alexandr Morozov <lk4d4math@gmail.com> (github: LK4D4)
[solomon@docker.com: resolve merge conflicts]
Signed-off-by: Solomon Hykes <solomon@docker.com>
  • Loading branch information
LK4D4 authored and Solomon Hykes committed Aug 6, 2014
1 parent af07819 commit 8d05642
Show file tree
Hide file tree
Showing 20 changed files with 374 additions and 339 deletions.
4 changes: 4 additions & 0 deletions builtins/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/docker/docker/daemon/networkdriver/bridge"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/engine"
"github.com/docker/docker/events"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/docker/docker/registry"
"github.com/docker/docker/server"
Expand All @@ -20,6 +21,9 @@ func Register(eng *engine.Engine) error {
if err := remote(eng); err != nil {
return err
}
if err := events.New().Install(eng); err != nil {
return err
}
if err := eng.Register("version", dockerVersion); err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion daemon/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ func (container *Container) WriteHostConfig() error {
return ioutil.WriteFile(pth, data, 0666)
}

func (container *Container) LogEvent(action string) {
d := container.daemon
if err := d.eng.Job("log_event", action, container.ID, d.Repositories().ImageName(container.Image)).Run(); err != nil {
utils.Errorf("Error running container: %s", err)
}
}

func (container *Container) getResourcePath(path string) (string, error) {
cleanPath := filepath.Join("/", path)
return symlink.FollowSymlinkInScope(filepath.Join(container.basefs, cleanPath), container.basefs)
Expand Down Expand Up @@ -508,7 +515,7 @@ func (container *Container) monitor(callback execdriver.StartCallback) error {
container.stdin, container.stdinPipe = io.Pipe()
}
if container.daemon != nil && container.daemon.srv != nil {
container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image))
container.LogEvent("die")
}
if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() {
// FIXME: here is race condition between two RUN instructions in Dockerfile
Expand Down
2 changes: 1 addition & 1 deletion daemon/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (daemon *Daemon) ContainerCreate(job *engine.Job) engine.Status {
if !container.Config.NetworkDisabled && daemon.SystemConfig().IPv4ForwardingDisabled {
job.Errorf("IPv4 forwarding is disabled.\n")
}
job.Eng.Job("log", "create", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("create")
// FIXME: this is necessary because daemon.Create might return a nil container
// with a non-nil error. This should not happen! Once it's fixed we
// can remove this workaround.
Expand Down
2 changes: 1 addition & 1 deletion daemon/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (daemon *Daemon) ContainerDestroy(job *engine.Job) engine.Status {
if err := daemon.Destroy(container); err != nil {
return job.Errorf("Cannot destroy container %s: %s", name, err)
}
job.Eng.Job("log", "destroy", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("destroy")

if removeVolume {
var (
Expand Down
2 changes: 1 addition & 1 deletion daemon/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (daemon *Daemon) ContainerExport(job *engine.Job) engine.Status {
return job.Errorf("%s: %s", name, err)
}
// FIXME: factor job-specific LogEvent to engine.Job.Run()
job.Eng.Job("log", "export", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("export")
return engine.StatusOK
}
return job.Errorf("No such container: %s", name)
Expand Down
4 changes: 2 additions & 2 deletions daemon/image_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, imgs *engine.
out := &engine.Env{}
out.Set("Untagged", repoName+":"+tag)
imgs.Add(out)
eng.Job("log", "untag", img.ID, "").Run()
eng.Job("log_event", "untag", img.ID, "").Run()
}
}
tags = daemon.Repositories().ByID()[img.ID]
Expand All @@ -111,7 +111,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, imgs *engine.
out := &engine.Env{}
out.Set("Deleted", img.ID)
imgs.Add(out)
eng.Job("log", "delete", img.ID, "").Run()
eng.Job("log_event", "delete", img.ID, "").Run()
if img.Parent != "" && !noprune {
err := daemon.DeleteImage(eng, img.Parent, imgs, false, force, noprune)
if first {
Expand Down
2 changes: 1 addition & 1 deletion daemon/kill.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (daemon *Daemon) ContainerKill(job *engine.Job) engine.Status {
if err := container.Kill(); err != nil {
return job.Errorf("Cannot kill container %s: %s", name, err)
}
job.Eng.Job("log", "kill", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("kill")
} else {
// Otherwise, just send the requested signal
if err := container.KillSig(int(sig)); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions daemon/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (daemon *Daemon) ContainerPause(job *engine.Job) engine.Status {
if err := container.Pause(); err != nil {
return job.Errorf("Cannot pause container %s: %s", name, err)
}
job.Eng.Job("log", "pause", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("pause")
return engine.StatusOK
}

Expand All @@ -32,6 +32,6 @@ func (daemon *Daemon) ContainerUnpause(job *engine.Job) engine.Status {
if err := container.Unpause(); err != nil {
return job.Errorf("Cannot unpause container %s: %s", name, err)
}
job.Eng.Job("log", "unpause", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("unpause")
return engine.StatusOK
}
2 changes: 1 addition & 1 deletion daemon/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (daemon *Daemon) ContainerRestart(job *engine.Job) engine.Status {
if err := container.Restart(int(t)); err != nil {
return job.Errorf("Cannot restart container %s: %s\n", name, err)
}
job.Eng.Job("log", "restart", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("restart")
} else {
return job.Errorf("No such container: %s\n", name)
}
Expand Down
5 changes: 0 additions & 5 deletions daemon/server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package daemon

import (
"github.com/docker/docker/utils"
)

type Server interface {
LogEvent(action, id, from string) *utils.JSONMessage
IsRunning() bool // returns true if the server is currently in operation
}
3 changes: 1 addition & 2 deletions daemon/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func (daemon *Daemon) ContainerStart(job *engine.Job) engine.Status {
if err := container.Start(); err != nil {
return job.Errorf("Cannot start container %s: %s", name, err)
}
job.Eng.Job("log", "start", container.ID, daemon.Repositories().ImageName(container.Image)).Run()

container.LogEvent("start")
return engine.StatusOK
}

Expand Down
2 changes: 1 addition & 1 deletion daemon/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (daemon *Daemon) ContainerStop(job *engine.Job) engine.Status {
if err := container.Stop(int(t)); err != nil {
return job.Errorf("Cannot stop container %s: %s\n", name, err)
}
job.Eng.Job("log", "stop", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("stop")
} else {
return job.Errorf("No such container: %s\n", name)
}
Expand Down
176 changes: 176 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package events

import (
"encoding/json"
"sync"
"time"

"github.com/docker/docker/engine"
"github.com/docker/docker/utils"
)

const eventsLimit = 64

type listener chan<- *utils.JSONMessage

type Events struct {
mu sync.RWMutex
events []*utils.JSONMessage
subscribers []listener
}

func New() *Events {
return &Events{
events: make([]*utils.JSONMessage, 0, eventsLimit),
}
}

// Install installs events public api in docker engine
func (e *Events) Install(eng *engine.Engine) error {
// Here you should describe public interface
jobs := map[string]engine.Handler{
"events": e.Get,
"log_event": e.Log,
"subscribers_count": e.SubscribersCount,
}
for name, job := range jobs {
if err := eng.Register(name, job); err != nil {
return err
}
}
return nil
}

func (e *Events) Get(job *engine.Job) engine.Status {
var (
since = job.GetenvInt64("since")
until = job.GetenvInt64("until")
timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now()))
)

// If no until, disable timeout
if until == 0 {
timeout.Stop()
}

listener := make(chan *utils.JSONMessage)
e.subscribe(listener)
defer e.unsubscribe(listener)

job.Stdout.Write(nil)

// Resend every event in the [since, until] time interval.
if since != 0 {
if err := e.writeCurrent(job, since, until); err != nil {
return job.Error(err)
}
}

for {
select {
case event, ok := <-listener:
if !ok {
return engine.StatusOK
}
if err := writeEvent(job, event); err != nil {
return job.Error(err)
}
case <-timeout.C:
return engine.StatusOK
}
}
}

func (e *Events) Log(job *engine.Job) engine.Status {
if len(job.Args) != 3 {
return job.Errorf("usage: %s ACTION ID FROM", job.Name)
}
// not waiting for receivers
go e.log(job.Args[0], job.Args[1], job.Args[2])
return engine.StatusOK
}

func (e *Events) SubscribersCount(job *engine.Job) engine.Status {
ret := &engine.Env{}
ret.SetInt("count", e.subscribersCount())
ret.WriteTo(job.Stdout)
return engine.StatusOK
}

func writeEvent(job *engine.Job, event *utils.JSONMessage) error {
// When sending an event JSON serialization errors are ignored, but all
// other errors lead to the eviction of the listener.
if b, err := json.Marshal(event); err == nil {
if _, err = job.Stdout.Write(b); err != nil {
return err
}
}
return nil
}

func (e *Events) writeCurrent(job *engine.Job, since, until int64) error {
e.mu.RLock()
for _, event := range e.events {
if event.Time >= since && (event.Time <= until || until == 0) {
if err := writeEvent(job, event); err != nil {
e.mu.RUnlock()
return err
}
}
}
e.mu.RUnlock()
return nil
}

func (e *Events) subscribersCount() int {
e.mu.RLock()
c := len(e.subscribers)
e.mu.RUnlock()
return c
}

func (e *Events) log(action, id, from string) {
e.mu.Lock()
now := time.Now().UTC().Unix()
jm := &utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
if len(e.events) == cap(e.events) {
// discard oldest event
copy(e.events, e.events[1:])
e.events[len(e.events)-1] = jm
} else {
e.events = append(e.events, jm)
}
for _, s := range e.subscribers {
// We give each subscriber a 100ms time window to receive the event,
// after which we move to the next.
select {
case s <- jm:
case <-time.After(100 * time.Millisecond):
}
}
e.mu.Unlock()
}

func (e *Events) subscribe(l listener) {
e.mu.Lock()
e.subscribers = append(e.subscribers, l)
e.mu.Unlock()
}

// unsubscribe closes and removes the specified listener from the list of
// previously registed ones.
// It returns a boolean value indicating if the listener was successfully
// found, closed and unregistered.
func (e *Events) unsubscribe(l listener) bool {
e.mu.Lock()
for i, subscriber := range e.subscribers {
if subscriber == l {
close(l)
e.subscribers = append(e.subscribers[:i], e.subscribers[i+1:]...)
e.mu.Unlock()
return true
}
}
e.mu.Unlock()
return false
}

0 comments on commit 8d05642

Please sign in to comment.