-
Notifications
You must be signed in to change notification settings - Fork 288
/
podmonitor.go
172 lines (143 loc) · 4.28 KB
/
podmonitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package k8srollout
import (
"context"
"fmt"
"strings"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"github.com/windmilleng/tilt/internal/k8s"
"github.com/windmilleng/tilt/internal/store"
"github.com/windmilleng/tilt/pkg/logger"
"github.com/windmilleng/tilt/pkg/model"
"github.com/windmilleng/tilt/pkg/model/logstore"
)
// PodMonitor watches the most recent pod of each target in the store and logs
// how its Scheduled, Initialized, and Ready conditions progress.
type PodMonitor struct {
	// pods holds the last status recorded for each pod, used by diff to
	// detect changes between calls.
	pods map[k8s.PodID]podStatus
	// trackingStarted records the pods for which the "Tracking new pod
	// rollout" header has already been logged.
	trackingStarted map[k8s.PodID]bool
}
// NewPodMonitor returns a PodMonitor whose bookkeeping maps are allocated and
// empty, ready to be written to.
func NewPodMonitor() *PodMonitor {
	m := new(PodMonitor)
	m.pods = map[k8s.PodID]podStatus{}
	m.trackingStarted = map[k8s.PodID]bool{}
	return m
}
// diff compares the most recent pod of every target in the store against the
// status recorded on the previous call and returns the statuses that changed.
// The returned slice is non-nil even when nothing changed.
//
// The store's read lock is held for the duration of the call. Targets whose
// most recent pod has an empty ID are skipped. Pods that are no longer the most
// recent pod of any target are forgotten.
func (m *PodMonitor) diff(st store.RStore) []podStatus {
	state := st.RLockState()
	defer st.RUnlockState()

	updates := make([]podStatus, 0)
	active := make(map[k8s.PodID]bool)

	for _, mt := range state.Targets() {
		ms := mt.State
		manifest := mt.Manifest
		pod := ms.MostRecentPod()
		podID := pod.PodID
		if podID == "" {
			continue
		}

		active[podID] = true

		currentStatus := newPodStatus(pod, manifest.Name)
		if !podStatusesEqual(currentStatus, m.pods[podID]) {
			updates = append(updates, currentStatus)
			m.pods[podID] = currentStatus
		}
	}

	for key := range m.pods {
		if !active[key] {
			delete(m.pods, key)
			// Drop the header flag together with the status. Otherwise
			// trackingStarted grows for every pod ever seen, and a pod that
			// reappears would have its full status re-logged without the
			// "Tracking new pod rollout" header. Every trackingStarted key is
			// set by print for a pod that is in m.pods, so pruning inside
			// this loop covers all entries.
			delete(m.trackingStarted, key)
		}
	}

	return updates
}
// OnChange logs the rollout progress of every pod whose status changed since
// the previous call. Each pod is printed through a logger whose handler is a
// podStatusWriter bound to that pod and its manifest.
func (m *PodMonitor) OnChange(ctx context.Context, st store.RStore) {
	for _, status := range m.diff(st) {
		writer := podStatusWriter{
			store:        st,
			manifestName: status.manifestName,
			podID:        status.podID,
		}
		m.print(logger.CtxWithLogHandler(ctx, writer), status)
	}
}
// print logs one pod status update. The first time a pod is seen it logs a
// "Tracking new pod rollout" header; it then logs the Scheduled, Initialized,
// and Ready conditions in that order. Each condition is timed from the
// previous milestone: the pod start time for Scheduled, the Scheduled
// transition for Initialized, and the Initialized transition for Ready.
func (m *PodMonitor) print(ctx context.Context, update podStatus) {
	id := update.podID
	if started := m.trackingStarted[id]; !started {
		logger.Get(ctx).Infof("\nTracking new pod rollout (%s):", id)
		m.trackingStarted[id] = true
	}

	scheduledAt := update.scheduled.LastTransitionTime.Time
	initializedAt := update.initialized.LastTransitionTime.Time

	m.printCondition(ctx, "Scheduled", update.scheduled, update.startTime)
	m.printCondition(ctx, "Initialized", update.initialized, scheduledAt)
	m.printCondition(ctx, "Ready", update.ready, initializedAt)
}
// printCondition logs a single line describing one pod condition.
//
// name is the label shown for the condition and is also attached to the log
// line as its progress ID (using the untruncated value). cond is the condition
// to report. startTime is the moment the elapsed duration is measured from.
//
// Three forms are produced:
//   - the condition is true: the label followed by the elapsed duration, which
//     is left blank when either timestamp is zero;
//   - the status, reason, or message is empty: the label followed by
//     "(…) Pending";
//   - otherwise: "Not <label>" followed by the reason and message.
//
// Labels longer than spacerMax bytes are cut and suffixed with "…"; shorter
// ones are padded with spaces so the "-" separators line up.
func (m *PodMonitor) printCondition(ctx context.Context, name string, cond v1.PodCondition, startTime time.Time) {
	l := logger.Get(ctx).WithFields(logger.Fields{logger.FieldNameProgressID: name})

	indent := " "
	duration := ""

	spacerMax := 16
	spacer := ""
	if len(name) > spacerMax {
		name = name[:spacerMax-1] + "…"
	} else {
		spacer = strings.Repeat(" ", spacerMax-len(name))
	}

	dur := cond.LastTransitionTime.Sub(startTime)
	if !startTime.IsZero() && !cond.LastTransitionTime.IsZero() {
		if dur == 0 {
			duration = "<1s"
		} else {
			duration = fmt.Sprint(dur.Truncate(time.Millisecond))
		}
	}

	if cond.Status == v1.ConditionTrue {
		l.Infof("%s┊ %s%s- %s", indent, name, spacer, duration)
		return
	}

	message := cond.Message
	reason := cond.Reason
	if cond.Status == "" || reason == "" || message == "" {
		l.Infof("%s┊ %s%s- (…) Pending", indent, name, spacer)
		return
	}

	prefix := "Not "
	// The prefix eats into the padding budget. For labels longer than
	// spacerMax-len(prefix) bytes (including every truncated label) the
	// remainder is negative, and strings.Repeat panics on a negative count, so
	// clamp it to zero.
	pad := spacerMax - len(name) - len(prefix)
	if pad < 0 {
		pad = 0
	}
	spacer = strings.Repeat(" ", pad)
	l.Infof("%s┃ %s%s%s- (%s): %s", indent, prefix, name, spacer, reason, message)
}
// podStatus is a snapshot of the rollout-relevant state of one pod: its
// identity, its start time, and the three conditions the monitor reports.
type podStatus struct {
	podID        k8s.PodID
	manifestName model.ManifestName // name of the manifest the pod was found under
	startTime    time.Time          // copied from the pod's StartedAt
	// Conditions copied from the pod by newPodStatus; each is the zero value
	// when the pod has no condition of that type.
	scheduled   v1.PodCondition
	initialized v1.PodCondition
	ready       v1.PodCondition
}
// newPodStatus builds a podStatus for pod under manifestName, copying the
// pod's ID and start time and picking out its PodScheduled, PodInitialized,
// and PodReady conditions. Other condition types are ignored; if a type
// occurs more than once, the last occurrence is kept.
func newPodStatus(pod store.Pod, manifestName model.ManifestName) podStatus {
	status := podStatus{
		podID:        pod.PodID,
		manifestName: manifestName,
		startTime:    pod.StartedAt,
	}

	for _, c := range pod.Conditions {
		if c.Type == v1.PodScheduled {
			status.scheduled = c
		} else if c.Type == v1.PodInitialized {
			status.initialized = c
		} else if c.Type == v1.PodReady {
			status.ready = c
		}
	}

	return status
}
// podStatusAllowUnexported lets cmp look inside podStatus, whose fields are
// all unexported.
var podStatusAllowUnexported = cmp.AllowUnexported(podStatus{})

// podStatusesEqual reports whether a and b are equal according to cmp.Equal,
// with access to podStatus's unexported fields enabled.
func podStatusesEqual(a, b podStatus) bool {
	same := cmp.Equal(a, b, podStatusAllowUnexported)
	return same
}
// podStatusWriter is a log handler that forwards every write to the store as
// a log action attributed to one pod and manifest.
type podStatusWriter struct {
	store        store.RStore       // store the log actions are dispatched to
	podID        k8s.PodID          // pod whose span ID the log lines are filed under
	manifestName model.ManifestName // manifest the log lines are attributed to
}
// Write dispatches p to the store as a log action carrying the writer's
// manifest name, the span ID derived from its pod, and the given level and
// fields. It always returns nil.
func (w podStatusWriter) Write(level logger.Level, fields logger.Fields, p []byte) error {
	spanID := SpanIDForPod(w.podID)
	action := store.NewLogAction(w.manifestName, spanID, level, fields, p)
	w.store.Dispatch(action)
	return nil
}
// SpanIDForPod returns the log span ID used for the monitor's output about
// podID: the pod ID prefixed with "monitor:".
func SpanIDForPod(podID k8s.PodID) logstore.SpanID {
	id := fmt.Sprintf("monitor:%s", podID)
	return logstore.SpanID(id)
}
// Compile-time check that *PodMonitor satisfies store.Subscriber.
var _ store.Subscriber = (*PodMonitor)(nil)