docker_compose_log_manager.go
package engine

import (
	"bytes"
	"context"
	"io"
	"time"

	"github.com/windmilleng/tilt/internal/dockercompose"
	"github.com/windmilleng/tilt/internal/logger"
	"github.com/windmilleng/tilt/internal/model"
	"github.com/windmilleng/tilt/internal/store"
)

// Collects logs from running docker-compose services.
type DockerComposeLogManager struct {
	watches map[model.ManifestName]dockerComposeLogWatch
	dcc     dockercompose.DockerComposeClient
}
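
// NewDockerComposeLogManager returns a DockerComposeLogManager that streams
// service logs through the given docker-compose client.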
func NewDockerComposeLogManager(dcc dockercompose.DockerComposeClient) *DockerComposeLogManager {
	return &DockerComposeLogManager{
		watches: make(map[model.ManifestName]dockerComposeLogWatch),
		dcc:     dcc,
	}
}

// Diff the current watches against the set of current docker-compose services,
// i.e. what we SHOULD be watching, returning the changes we need to make.
func (m *DockerComposeLogManager) diff(ctx context.Context, st store.RStore) (setup []dockerComposeLogWatch, teardown []dockerComposeLogWatch) {
	state := st.RLockState()
	defer st.RUnlockState()

	// If we're not watching the FS for changes, then don't bother watching logs.
	if !state.WatchFiles {
		return nil, nil
	}

	for _, mt := range state.ManifestTargets {
		manifest := mt.Manifest
		if !manifest.IsDC() {
			continue
		}

		// If the build hasn't started yet, don't start watching.
		//
		// TODO(nick): This points to a larger synchronization bug between DC
		// LogManager and BuildController. Starting a build will delete all the logs
		// that have been recorded so far. This creates race conditions: if the logs
		// come in before the StartBuild event is recorded, those logs will get
		// deleted. This affects tests and fast builds more than normal builds.
		// But we should have a better way to associate logs with a particular build.
		ms := mt.State
		if ms.CurrentBuild.StartTime.IsZero() && ms.LastBuild().StartTime.IsZero() {
			continue
		}

		existing, isActive := m.watches[manifest.Name]
		startWatchTime := time.Unix(0, 0)
		if isActive {
			select {
			case termTime := <-existing.terminationTime:
				// If we're receiving on this channel, it's because the previous watcher ended or
				// died somehow; we need to create a new one that picks up where it left off.
				startWatchTime = termTime
			default:
				// Watcher is still active, no action needed.
				continue
			}
		}

		ctx, cancel := context.WithCancel(ctx)
		w := dockerComposeLogWatch{
			ctx:             ctx,
			cancel:          cancel,
			name:            manifest.Name,
			dc:              manifest.DockerComposeTarget(),
			startWatchTime:  startWatchTime,
			terminationTime: make(chan time.Time, 1),
		}
		m.watches[manifest.Name] = w
		setup = append(setup, w)
	}

	for key, value := range m.watches {
		_, inState := state.ManifestTargets[key]
		if !inState {
			delete(m.watches, key)
			teardown = append(teardown, value)
		}
	}

	return setup, teardown
}
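
// OnChange reconciles our log watches with the current store state: it cancels
// watches for services that went away and starts log consumers for new ones.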
func (m *DockerComposeLogManager) OnChange(ctx context.Context, st store.RStore) {
	setup, teardown := m.diff(ctx, st)
	for _, watch := range teardown {
		watch.cancel()
	}

	for _, watch := range setup {
		go m.consumeLogs(watch, st)
	}
}
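
// consumeLogs streams logs for a single docker-compose service, forwarding them
// to the global log and the store until the watch is canceled or the stream ends.
// On exit it records the termination time so a replacement watch can resume from there.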
func (m *DockerComposeLogManager) consumeLogs(watch dockerComposeLogWatch, st store.RStore) {
	defer func() {
		watch.terminationTime <- time.Now()
	}()

	name := watch.name
	readCloser, err := m.dcc.StreamLogs(watch.ctx, watch.dc.ConfigPath, watch.dc.Name)
	if err != nil {
		logger.Get(watch.ctx).Debugf("Error streaming %s logs: %v", name, err)
		return
	}
	defer func() {
		_ = readCloser.Close()
	}()

	// TODO(maia): docker-compose already prefixes logs, but maybe we want to roll
	// our own (as in PodWatchManager) cuz it's prettier?
	globalLogWriter := DockerComposeGlobalLogWriter{
		writer: logger.Get(watch.ctx).Writer(logger.InfoLvl),
	}
	actionWriter := DockerComposeLogActionWriter{
		store:        st,
		manifestName: name,
	}
	multiWriter := io.MultiWriter(globalLogWriter, actionWriter)

	_, err = io.Copy(multiWriter, NewHardCancelReader(watch.ctx, readCloser))
	if err != nil && watch.ctx.Err() == nil {
		logger.Get(watch.ctx).Debugf("Error streaming %s logs: %v", name, err)
		return
	}
}
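
// dockerComposeLogWatch tracks a single active log stream for a docker-compose service.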
type dockerComposeLogWatch struct {
	ctx             context.Context
	cancel          func()
	name            model.ManifestName
	dc              model.DockerComposeTarget
	startWatchTime  time.Time
	terminationTime chan time.Time

	// TODO(maia): do we need to track these? (maybe if we implement with `docker logs <cID>`...)
	// cID container.ID
	// cName container.Name
}
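
// DockerComposeLogActionWriter dispatches each chunk of log output to the store
// as a DockerComposeLogAction for the associated manifest.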
type DockerComposeLogActionWriter struct {
	store        store.RStore
	manifestName model.ManifestName
}

func (w DockerComposeLogActionWriter) Write(p []byte) (n int, err error) {
	if shouldFilterDCLog(p) {
		return len(p), nil
	}
	w.store.Dispatch(DockerComposeLogAction{
		ManifestName: w.manifestName,
		logEvent:     newLogEvent(append([]byte{}, p...)),
	})
	return len(p), nil
}

var _ store.Subscriber = &DockerComposeLogManager{}
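
// shouldFilterDCLog reports whether this chunk of log output is docker-compose
// "Attaching to ..." noise that we drop rather than surface.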
func shouldFilterDCLog(p []byte) bool {
	return bytes.HasPrefix(p, []byte("Attaching to "))
}
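
// DockerComposeGlobalLogWriter writes docker-compose log output to the global
// logger, skipping filtered lines.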
type DockerComposeGlobalLogWriter struct {
	writer io.Writer
}

func (w DockerComposeGlobalLogWriter) Write(p []byte) (n int, err error) {
	if shouldFilterDCLog(p) {
		return len(p), nil
	}
	return w.writer.Write(p)
}