rest.go

package buildlog

import (
	"fmt"
	"io"
	"time"

	"github.com/golang/glog"

	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
	genericrest "k8s.io/apiserver/pkg/registry/generic/rest"
	"k8s.io/apiserver/pkg/registry/rest"
	kapi "k8s.io/kubernetes/pkg/api"
	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
	kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
	"k8s.io/kubernetes/pkg/registry/core/pod"

	buildapi "github.com/openshift/origin/pkg/build/apis/build"
	"github.com/openshift/origin/pkg/build/apis/build/validation"
	buildtypedclient "github.com/openshift/origin/pkg/build/generated/internalclientset/typed/build/internalversion"
	"github.com/openshift/origin/pkg/build/registry"
	buildutil "github.com/openshift/origin/pkg/build/util"
)

// REST is an implementation of RESTStorage for the api server.
type REST struct {
	BuildClient    buildtypedclient.BuildsGetter
	PodGetter      pod.ResourceGetter
	ConnectionInfo kubeletclient.ConnectionInfoGetter
	Timeout        time.Duration
}

// TODO: these wrappers should be removed
type podGetter struct {
	kcoreclient.PodsGetter
}
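
// Get returns the pod with the given name, using the namespace taken from the request context.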
func (g *podGetter) Get(ctx apirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	ns, ok := apirequest.NamespaceFrom(ctx)
	if !ok {
		return nil, errors.NewBadRequest("namespace parameter required.")
	}
	return g.Pods(ns).Get(name, *options)
}
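
// defaultTimeout is how long Get waits, by default, for a new or pending build to start
// running before giving up on streaming its logs (see REST.Timeout and WaitForRunningBuild).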
const defaultTimeout time.Duration = 10 * time.Second

// NewREST creates a new REST for BuildLog.
// It takes a build client and a pod client, which provide the attributes needed to assemble
// the URL to which the request is redirected in order to retrieve the build logs.
func NewREST(buildClient buildtypedclient.BuildsGetter, pn kcoreclient.PodsGetter, connectionInfo kubeletclient.ConnectionInfoGetter) *REST {
	return &REST{
		BuildClient:    buildClient,
		PodGetter:      &podGetter{pn},
		ConnectionInfo: connectionInfo,
		Timeout:        defaultTimeout,
	}
}
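
// A minimal sketch of how this storage might be wired into the API server's storage map.
// It is illustrative only: buildClient, kubeInternalClient, connectionInfoGetter, and the
// "builds/log" key are assumptions; the real wiring lives in the API server setup code.
//
//	buildLogREST := buildlog.NewREST(buildClient, kubeInternalClient.Core(), connectionInfoGetter)
//	storage["builds/log"] = buildLogREST

// Compile-time assertion that REST implements rest.GetterWithOptions, so the API server
// decodes the request's query parameters into BuildLogOptions and passes them to Get.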
var _ = rest.GetterWithOptions(&REST{})

// Get returns a streamer resource with the contents of the build log
func (r *REST) Get(ctx apirequest.Context, name string, opts runtime.Object) (runtime.Object, error) {
	buildLogOpts, ok := opts.(*buildapi.BuildLogOptions)
	if !ok {
		return nil, errors.NewBadRequest("did not get an expected options.")
	}
	if errs := validation.ValidateBuildLogOptions(buildLogOpts); len(errs) > 0 {
		return nil, errors.NewInvalid(buildapi.Kind("BuildLogOptions"), "", errs)
	}
	build, err := r.BuildClient.Builds(apirequest.NamespaceValue(ctx)).Get(name, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	if buildLogOpts.Previous {
		version := buildutil.VersionForBuild(build)
		// Use the previous version
		version--
		previousBuildName := buildutil.BuildNameForConfigVersion(buildutil.ConfigNameForBuild(build), version)
		previous, err := r.BuildClient.Builds(apirequest.NamespaceValue(ctx)).Get(previousBuildName, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		build = previous
	}
	switch build.Status.Phase {
	// Build has not launched, wait until it runs
	case buildapi.BuildPhaseNew, buildapi.BuildPhasePending:
		if buildLogOpts.NoWait {
			glog.V(4).Infof("Build %s/%s is in %s state. No logs to retrieve yet.", build.Namespace, build.Name, build.Status.Phase)
			// return empty content if not waiting for build
			return &genericrest.LocationStreamer{}, nil
		}
		glog.V(4).Infof("Build %s/%s is in %s state, waiting for Build to start", build.Namespace, build.Name, build.Status.Phase)
		latest, ok, err := registry.WaitForRunningBuild(r.BuildClient, build, r.Timeout)
		if err != nil {
			return nil, errors.NewBadRequest(fmt.Sprintf("unable to wait for build %s to run: %v", build.Name, err))
		}
		switch latest.Status.Phase {
		case buildapi.BuildPhaseError:
			return nil, errors.NewBadRequest(fmt.Sprintf("build %s encountered an error: %s", build.Name, buildutil.NoBuildLogsMessage))
		case buildapi.BuildPhaseCancelled:
			return nil, errors.NewBadRequest(fmt.Sprintf("build %s was cancelled: %s", build.Name, buildutil.NoBuildLogsMessage))
		}
		if !ok {
			return nil, errors.NewTimeoutError(fmt.Sprintf("timed out waiting for build %s to start after %s", build.Name, r.Timeout), 1)
		}
	// The build was cancelled
	case buildapi.BuildPhaseCancelled:
		return nil, errors.NewBadRequest(fmt.Sprintf("build %s was cancelled. %s", build.Name, buildutil.NoBuildLogsMessage))
	// An error occurred launching the build, return an error
	case buildapi.BuildPhaseError:
		return nil, errors.NewBadRequest(fmt.Sprintf("build %s is in an error state. %s", build.Name, buildutil.NoBuildLogsMessage))
	}
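	// At this point the build has started (or already finished), so its pod should exist
	// and we can go find its logs.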

	// The container should be the default build container, so setting it to blank
	buildPodName := buildapi.GetBuildPodName(build)

	// if we can't at least get the build pod, we're not going to get very far, so
	// error out now.
	obj, err := r.PodGetter.Get(ctx, buildPodName, &metav1.GetOptions{})
	if err != nil {
		return nil, errors.NewBadRequest(err.Error())
	}

	// check for old style builds with a single container/no initcontainers
	// and handle them w/ the old logging code.
	buildPod := obj.(*kapi.Pod)
	if len(buildPod.Spec.InitContainers) == 0 {
		logOpts := buildapi.BuildToPodLogOptions(buildLogOpts)
		location, transport, err := pod.LogLocation(r.PodGetter, r.ConnectionInfo, ctx, buildPodName, logOpts)
		if err != nil {
			if errors.IsNotFound(err) {
				return nil, errors.NewNotFound(kapi.Resource("pod"), buildPodName)
			}
			return nil, errors.NewBadRequest(err.Error())
		}
		return &genericrest.LocationStreamer{
			Location:        location,
			Transport:       transport,
			ContentType:     "text/plain",
			Flush:           buildLogOpts.Follow,
			ResponseChecker: genericrest.NewGenericHttpResponseChecker(kapi.Resource("pod"), buildPodName),
		}, nil
	}

	// new style builds w/ init containers from here out.
	// we'll funnel all the initcontainer+main container logs into this single stream
	reader, writer := io.Pipe()
	pipeStreamer := PipeStreamer{
		In:          writer,
		Out:         reader,
		Flush:       buildLogOpts.Follow,
		ContentType: "text/plain",
	}
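	// PipeStreamer is defined elsewhere in this package; judging from its use here, it streams
	// whatever the goroutine below writes to In back to the client as "text/plain", flushing
	// after each write when following.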

	// background thread will poll the init containers until they are running/terminated
	// and then stream the logs from them into the pipe, one by one, before streaming
	// the primary container logs into the pipe. Any errors that occur will result
	// in a premature return and aborted log stream.
	go func() {
		defer pipeStreamer.In.Close()

		// containers that we've successfully streamed the logs for and don't need
		// to worry about it anymore.
		doneWithContainer := map[string]bool{}
		// check all the init containers for logs at least once.
		waitForInitContainers := true
		// once we see an init container that failed, stop iterating the containers
		// because no additional init containers will run.
		initFailed := false
		// sleep in between rounds of iterating the containers unless we successfully
		// streamed something in which case it makes sense to immediately proceed to
		// checking if another init container is ready (or we're done with all initcontainers)
		sleep := true

		// If we are following the logs, keep iterating through the init containers
		// until they have all run, unless we see one of them fail.
		// If we aren't following the logs, we will run through all the init containers exactly once.
		for waitForInitContainers {
			select {
			case <-ctx.Done():
				glog.V(4).Infof("timed out while iterating on build init containers for build pod %s/%s", build.Namespace, buildPodName)
				return
			default:
			}
			glog.V(4).Infof("iterating through build init containers for build pod %s/%s", build.Namespace, buildPodName)

			// assume we are not going to need to iterate again until proven otherwise
			waitForInitContainers = false

			// Get the latest version of the pod so we can check init container statuses
			obj, err = r.PodGetter.Get(ctx, buildPodName, &metav1.GetOptions{})
			if err != nil {
				s := fmt.Sprintf("error retrieving build pod %s/%s : %v", build.Namespace, buildPodName, err.Error())
				// we're sending the error message as the log output so the user at least sees some indication of why
				// they didn't get the logs they expected.
				pipeStreamer.In.Write([]byte(s))
				return
			}
			buildPod = obj.(*kapi.Pod)

			// Walk all the initcontainers in order and dump/stream their logs. The initcontainers
			// are defined in order of execution, so that's the order we'll dump their logs in.
			for _, status := range buildPod.Status.InitContainerStatuses {
				glog.V(4).Infof("processing build pod: %s/%s container: %s in state %#v", build.Namespace, buildPodName, status.Name, status.State)
				if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 {
					initFailed = true
					// if we see a failed init container, don't continue waiting for additional init containers
					// as they will never run, but we do need to dump/stream the logs for this container so
					// we won't break out of the loop here, we just won't re-enter the loop.
					waitForInitContainers = false

					// if we've already dealt with the logs for this container we are done.
					// We might have streamed the logs for it last time around, but we wouldn't see that it
					// terminated with a failure until now.
					if doneWithContainer[status.Name] {
						break
					}
				}
				// if we've already dumped the logs for this init container, ignore it.
				if doneWithContainer[status.Name] {
					continue
				}
				// ignore containers in a waiting state (they have no logs yet), but
				// flag that we need to keep iterating while we wait for it to run.
				if status.State.Waiting != nil {
					waitForInitContainers = true
					continue
				}

				// get the container logstream for this init container
				containerLogOpts := buildapi.BuildToPodLogOptions(buildLogOpts)
				containerLogOpts.Container = status.Name
				// never "follow" logs for terminated containers, it causes latency in streaming the result
				// and there's no point since the log is complete already.
				if status.State.Terminated != nil {
					containerLogOpts.Follow = false
				}

				if err := r.pipeLogs(ctx, build.Namespace, buildPodName, containerLogOpts, pipeStreamer.In); err != nil {
					glog.Errorf("error: failed to stream logs for build pod: %s/%s container: %s, due to: %v", build.Namespace, buildPodName, status.Name, err)
					return
				}

				// if we successfully streamed anything, don't wait before the next iteration
				// of init container checking/streaming.
				sleep = false

				// we are done with this container, we can ignore on future iterations.
				doneWithContainer[status.Name] = true

				// no point in processing more init containers once we've seen one that failed,
				// no additional init containers will run.
				if initFailed {
					break
				}
			} // loop over all the initcontainers

			// if we're not in log follow mode, don't keep waiting on container logs once
			// we've iterated all the init containers once.
			if !buildLogOpts.Follow {
				break
			}

			// don't iterate too quickly waiting for the next init container to run unless
			// we did some log streaming during this iteration.
			if sleep {
				time.Sleep(time.Second)
			}

			// loop over the pod until we've seen all the initcontainers enter the running state and
			// streamed their logs, or seen a failed initcontainer, or we weren't in follow mode.
		}

		// done handling init container logs, get the main container logs, unless
		// an init container failed, in which case there will be no main container logs and we're done.
		if !initFailed {
			// Wait for the main container to be running, this can take a second after the initcontainers
			// finish so we have to poll.
			err := wait.PollImmediate(time.Second, 10*time.Minute, func() (bool, error) {
				obj, err = r.PodGetter.Get(ctx, buildPodName, &metav1.GetOptions{})
				if err != nil {
					s := fmt.Sprintf("error while getting build logs, could not retrieve build pod %s/%s : %v", build.Namespace, buildPodName, err.Error())
					pipeStreamer.In.Write([]byte(s))
					return false, err
				}
				buildPod = obj.(*kapi.Pod)

				// we can get logs from a pod in any state other than pending.
				if buildPod.Status.Phase != kapi.PodPending {
					return true, nil
				}
				return false, nil
			})
			if err != nil {
				glog.Errorf("error: failed to stream logs for build pod: %s/%s due to: %v", build.Namespace, buildPodName, err)
				return
			}

			containerLogOpts := buildapi.BuildToPodLogOptions(buildLogOpts)
			containerLogOpts.Container = ""
			// never follow logs for terminated pods, it just causes latency in streaming the result.
			if buildPod.Status.Phase == kapi.PodFailed || buildPod.Status.Phase == kapi.PodSucceeded {
				containerLogOpts.Follow = false
			}
			if err := r.pipeLogs(ctx, build.Namespace, buildPodName, containerLogOpts, pipeStreamer.In); err != nil {
				glog.Errorf("error: failed to stream logs for build pod: %s/%s due to: %v", build.Namespace, buildPodName, err)
				return
			}
		}
	}()
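
	// Return the streamer immediately; the goroutine above keeps writing log data into the
	// pipe until every relevant container's logs have been streamed or an error occurs.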
	return &pipeStreamer, nil
}

// NewGetOptions returns a new options object for build logs. The false/"" return values
// indicate that no part of the request path is injected into the options; they are decoded
// from query parameters only.
func (r *REST) NewGetOptions() (runtime.Object, bool, string) {
	return &buildapi.BuildLogOptions{}, false, ""
}

// New creates an empty BuildLog resource
func (r *REST) New() runtime.Object {
	return &buildapi.BuildLog{}
}

// pipeLogs retrieves the logs for a particular container and streams them into the provided writer.
func (r *REST) pipeLogs(ctx apirequest.Context, namespace, buildPodName string, containerLogOpts *kapi.PodLogOptions, writer io.Writer) error {
	glog.V(4).Infof("pulling build pod logs for %s/%s, container %s", namespace, buildPodName, containerLogOpts.Container)
	location, transport, err := pod.LogLocation(r.PodGetter, r.ConnectionInfo, ctx, buildPodName, containerLogOpts)
	if err != nil {
		s := fmt.Sprintf("error retrieving logs for build pod: %s/%s container: %s, %v", namespace, buildPodName, containerLogOpts.Container, err.Error())
		_, err2 := writer.Write([]byte(s))
		if err2 != nil {
			glog.Errorf("error: could not write build log error %q to stream due to: %v", s, err2)
		}
		return err
	}
	locationStreamer := genericrest.LocationStreamer{
		Location:        location,
		Transport:       transport,
		ContentType:     "text/plain",
		Flush:           true,
		ResponseChecker: genericrest.NewGenericHttpResponseChecker(kapi.Resource("pod"), buildPodName),
	}
	out, _, _, err := locationStreamer.InputStream("", "")
	if err != nil {
		s := fmt.Sprintf("error streaming logs from build pod: %s/%s container: %s, %v", namespace, buildPodName, containerLogOpts.Container, err.Error())
		_, err2 := writer.Write([]byte(s))
		if err2 != nil {
			glog.Errorf("error: could not write build log error %q to stream due to: %v", s, err2)
		}
		return err
	}
	if out == nil {
		glog.Warningf("warning: logs for build pod: %s/%s container: %s returned a nil stream", namespace, buildPodName, containerLogOpts.Container)
		return nil
	}
	glog.V(4).Infof("retrieved logs for build pod: %s/%s container: %s", namespace, buildPodName, containerLogOpts.Container)

	// dump all container logs from the log stream into a single output stream that we'll send back to the client.
	_, err = io.Copy(writer, out)
	return err
}