/
containerlist.go
142 lines (122 loc) · 3.88 KB
/
containerlist.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package docker
import (
"context"
"sync"
"time"
dtypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
docker "github.com/docker/docker/client"
"github.com/signalfx/signalfx-agent/internal/utils"
"github.com/signalfx/signalfx-agent/internal/utils/filter"
log "github.com/sirupsen/logrus"
)
// ContainerChangeHandler is the callback invoked whenever a Docker container
// is first recognized or subsequently changes. old carries the previously
// known state and is nil when no previous state is known; new carries the
// current state and is nil when the container is being destroyed.
type ContainerChangeHandler func(old, new *dtypes.ContainerJSON)
// ListAndWatchContainers accepts a changeHandler that gets called as
// containers come and go.
//
// It starts a background goroutine that subscribes to Docker container events
// and waits until that subscription has been requested before listing the
// currently running containers. Each listed container that can be inspected
// and is not excluded by imageFilter is reported to changeHandler. Subsequent
// events cause the affected container to be re-inspected and reported with
// its previous and new state; a "destroy" event reports the previous state
// with a nil new state.
//
// changeHandler is always invoked while an internal mutex is held, so calls
// are serialized. A nil imageFilter disables filtering; otherwise containers
// whose image matches the filter are ignored.
//
// The returned error is non-nil only if the initial container listing fails.
// In every case the event-watching goroutine keeps running until ctx is
// cancelled.
func ListAndWatchContainers(ctx context.Context, client *docker.Client, changeHandler ContainerChangeHandler, imageFilter filter.StringFilter, logger log.FieldLogger) error {
	lock := sync.Mutex{}
	containers := make(map[string]*dtypes.ContainerJSON)

	// updateContainer re-inspects the container and stores the result in
	// containers. It reports whether the map entry was refreshed; on inspect
	// failure or when the image is filtered out the map is left untouched.
	// Make sure you hold the lock before calling this.
	updateContainer := func(id string) bool {
		inspectCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		c, err := client.ContainerInspect(inspectCtx, id)
		if err != nil {
			logger.WithError(err).Errorf("Could not inspect updated container %s", id)
			return false
		}
		// Guard against a missing Config so a filter lookup cannot panic;
		// a container without Config is treated as not filtered.
		if imageFilter != nil && c.Config != nil && imageFilter.Matches(c.Config.Image) {
			return false
		}

		logger.Debugf("Updated Docker container %s", id)
		containers[id] = &c
		return true
	}

	watchStarted := make(chan struct{})

	go func() {
		// This pattern is taken from
		// https://github.com/docker/cli/blob/master/cli/command/container/stats.go
		f := filters.NewArgs()
		f.Add("type", "container")
		f.Add("event", "destroy")
		f.Add("event", "die")
		f.Add("event", "pause")
		f.Add("event", "stop")
		f.Add("event", "start")
		f.Add("event", "unpause")
		f.Add("event", "update")

		lastTime := time.Now()

	stream:
		for {
			// Resume from the timestamp of the last event seen so that a
			// reconnect asks Docker for events since that point.
			since := lastTime.Format(time.RFC3339Nano)
			options := dtypes.EventsOptions{
				Filters: f,
				Since:   since,
			}

			logger.Infof("Watching for Docker events since %s", since)
			eventCh, errCh := client.Events(ctx, options)

			// Signal the caller only once, even across stream restarts.
			if !utils.IsSignalChanClosed(watchStarted) {
				close(watchStarted)
			}

			for {
				select {
				case event := <-eventCh:
					lock.Lock()

					switch event.Action {
					// This assumes that all deleted containers get a "destroy"
					// event associated with them, otherwise memory usage could
					// be unbounded.
					case "destroy":
						logger.Debugf("Docker container was destroyed: %s", event.ID)
						// Capture the old state before deleting it so the
						// handler actually receives the destroyed container.
						if old, ok := containers[event.ID]; ok {
							delete(containers, event.ID)
							changeHandler(old, nil)
						}
					default:
						oldContainer := containers[event.ID]
						if updateContainer(event.ID) {
							changeHandler(oldContainer, containers[event.ID])
						}
					}

					lock.Unlock()

					lastTime = time.Unix(0, event.TimeNano)

				case err := <-errCh:
					// A cancelled context also surfaces here; that is a
					// normal shutdown, not an error worth logging/retrying.
					if ctx.Err() != nil {
						return
					}
					logger.WithError(err).Error("Error watching docker container events")

					// Back off before reconnecting, but stop waiting as soon
					// as the context is cancelled.
					select {
					case <-time.After(3 * time.Second):
					case <-ctx.Done():
						return
					}
					continue stream

				case <-ctx.Done():
					// Event stream is tied to the same context and will quit
					// also.
					return
				}
			}
		}
	}()

	// Block until the event subscription has been requested before listing.
	// NOTE(review): presumably so that changes occurring during the listing
	// are not missed — confirm.
	<-watchStarted

	f := filters.NewArgs()
	f.Add("status", "running")
	options := dtypes.ContainerListOptions{
		Filters: f,
	}
	containerList, err := client.ContainerList(ctx, options)
	if err != nil {
		return err
	}

	wg := sync.WaitGroup{}
	for i := range containerList {
		wg.Add(1)
		// The Docker API has a different return type for list vs. inspect, and
		// no way to get the return type of list for individual containers,
		// which makes this harder than it should be.
		go func(id string) {
			defer wg.Done()

			lock.Lock()
			defer lock.Unlock()

			// The event watcher may already have recorded this container;
			// hand that state over as "old" instead of claiming none exists.
			previous := containers[id]
			// Only notify when the inspect succeeded and the container was
			// not filtered out; otherwise there is no new state to report.
			if updateContainer(id) {
				changeHandler(previous, containers[id])
			}
		}(containerList[i].ID)
	}

	wg.Wait()

	return nil
}