forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrest.go
430 lines (381 loc) · 14.2 KB
/
rest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
package imagestreamimport
import (
"fmt"
"net/http"
"time"
"github.com/golang/glog"
gocontext "golang.org/x/net/context"
kapi "k8s.io/kubernetes/pkg/api"
kapierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/diff"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/validation/field"
"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/dockerregistry"
"github.com/openshift/origin/pkg/image/api"
"github.com/openshift/origin/pkg/image/importer"
"github.com/openshift/origin/pkg/image/registry/imagestream"
)
// ImporterFunc returns an instance of the importer that should be used per invocation.
// Create calls it once per request, handing it a retriever built from the configured
// transports together with the lazily loaded credentials.
type ImporterFunc func(r importer.RepositoryRetriever) importer.Interface
// ImporterDockerRegistryFunc returns an instance of a docker client that should be used per invocation of import.
// Either the function itself or the client it returns may be nil if no legacy import capability is required;
// Create checks both before placing the client on the request context.
type ImporterDockerRegistryFunc func() dockerregistry.Client
// REST implements the RESTStorage interface for ImageStreamImport
type REST struct {
	// importFn builds the importer that is run for a single Create call.
	importFn ImporterFunc
	// streams is used to look up the image stream targeted by an import.
	streams imagestream.Registry
	// internalStreams is used to create or update the image stream once the import finished.
	internalStreams rest.CreaterUpdater
	// images is used to persist the images that were imported.
	images rest.Creater
	// secrets supplies the secrets of an image stream; they are only fetched when credentials are needed.
	secrets client.ImageStreamSecretsNamespacer
	// transport and insecureTransport are handed to the importer context.
	transport http.RoundTripper
	insecureTransport http.RoundTripper
	// clientFn optionally supplies a docker registry client for legacy imports; it may be nil.
	clientFn ImporterDockerRegistryFunc
}
// NewREST builds the REST storage that handles importing images. clientFn may be left nil when
// v1 Docker Registry importing is not needed. The insecure transport is optional as well, and neither
// transport should carry client certs unless the entire cluster is meant to import using those certs.
func NewREST(importFn ImporterFunc, streams imagestream.Registry, internalStreams rest.CreaterUpdater,
	images rest.Creater, secrets client.ImageStreamSecretsNamespacer,
	transport, insecureTransport http.RoundTripper,
	clientFn ImporterDockerRegistryFunc,
) *REST {
	storage := new(REST)

	// import machinery
	storage.importFn = importFn
	storage.clientFn = clientFn
	storage.transport = transport
	storage.insecureTransport = insecureTransport

	// backing stores
	storage.streams = streams
	storage.internalStreams = internalStreams
	storage.images = images
	storage.secrets = secrets

	return storage
}
// New hands back an empty ImageStreamImport; it exists only so that REST satisfies RESTStorage.
func (r *REST) New() runtime.Object {
	return new(api.ImageStreamImport)
}
// Create performs the import described by an ImageStreamImport and returns the
// same object with its status filled in.
//
// obj must be an *api.ImageStreamImport and ctx must carry a namespace; a
// BadRequest error is returned otherwise. The importer built by importFn is
// then run against the request. When Spec.Import is false nothing is
// persisted: the import status is returned once manifests that were not
// requested have been cleared. Otherwise the image stream named after the
// import is created, or updated when it already exists, with the imported
// tags, and the resulting stream is returned in Status.Import.
//
// A ResourceVersion or UID on the submitted metadata acts as a precondition:
// if the stream does not exist the lookup error is returned, a differing
// ResourceVersion yields a Conflict and a differing UID yields NotFound.
// Any failure to persist an imported image aborts the whole request.
func (r *REST) Create(ctx kapi.Context, obj runtime.Object) (runtime.Object, error) {
	isi, ok := obj.(*api.ImageStreamImport)
	if !ok {
		return nil, kapierrors.NewBadRequest(fmt.Sprintf("obj is not an ImageStreamImport: %#v", obj))
	}

	// keep a copy of the metadata as it looked before BeforeCreate ran; it is
	// consulted further down for the ResourceVersion/UID preconditions
	inputMeta := isi.ObjectMeta

	if err := rest.BeforeCreate(Strategy, ctx, obj); err != nil {
		return nil, err
	}

	namespace, ok := kapi.NamespaceFrom(ctx)
	if !ok {
		return nil, kapierrors.NewBadRequest("a namespace must be specified to import images")
	}

	// both the factory and the client it returns are optional
	if r.clientFn != nil {
		if client := r.clientFn(); client != nil {
			ctx = kapi.WithValue(ctx, importer.ContextKeyV1RegistryClient, client)
		}
	}

	// only load secrets if we need them
	credentials := importer.NewLazyCredentialsForSecrets(func() ([]kapi.Secret, error) {
		secrets, err := r.secrets.ImageStreamSecrets(namespace).Secrets(isi.Name, kapi.ListOptions{})
		if err != nil {
			return nil, err
		}
		return secrets.Items, nil
	})
	importCtx := importer.NewContext(r.transport, r.insecureTransport).WithCredentials(credentials)
	imports := r.importFn(importCtx)
	if err := imports.Import(ctx.(gocontext.Context), isi); err != nil {
		return nil, kapierrors.NewInternalError(err)
	}

	// if we encountered an error loading credentials and any images could not be retrieved with an access
	// related error, modify the message.
	// TODO: set a status cause
	if err := credentials.Err(); err != nil {
		for i, image := range isi.Status.Images {
			switch image.Status.Reason {
			case unversioned.StatusReasonUnauthorized, unversioned.StatusReasonForbidden:
				isi.Status.Images[i].Status.Message = fmt.Sprintf("Unable to load secrets for this image: %v; (%s)", err, image.Status.Message)
			}
		}
		if repo := isi.Status.Repository; repo != nil {
			switch repo.Status.Reason {
			case unversioned.StatusReasonUnauthorized, unversioned.StatusReasonForbidden:
				repo.Status.Message = fmt.Sprintf("Unable to load secrets for this repository: %v; (%s)", err, repo.Status.Message)
			}
		}
	}

	// TODO: perform the transformation of the image stream and return it with the ISI if import is false
	// so that clients can see what the resulting object would look like.
	if !isi.Spec.Import {
		clearManifests(isi)
		return isi, nil
	}

	create := false
	stream, err := r.streams.GetImageStream(ctx, isi.Name)
	if err != nil {
		if !kapierrors.IsNotFound(err) {
			return nil, err
		}
		// consistency check, stream must exist
		if len(inputMeta.ResourceVersion) > 0 || len(inputMeta.UID) > 0 {
			return nil, err
		}
		create = true
		stream = &api.ImageStream{
			ObjectMeta: kapi.ObjectMeta{
				Name:       isi.Name,
				Namespace:  namespace,
				Generation: 0,
			},
		}
	} else {
		if len(inputMeta.ResourceVersion) > 0 && inputMeta.ResourceVersion != stream.ResourceVersion {
			glog.V(4).Infof("DEBUG: mismatch between requested ResourceVersion %s and located ResourceVersion %s", inputMeta.ResourceVersion, stream.ResourceVersion)
			return nil, kapierrors.NewConflict(api.Resource("imagestream"), inputMeta.Name, fmt.Errorf("the image stream was updated from %q to %q", inputMeta.ResourceVersion, stream.ResourceVersion))
		}
		if len(inputMeta.UID) > 0 && inputMeta.UID != stream.UID {
			glog.V(4).Infof("DEBUG: mismatch between requested UID %s and located UID %s", inputMeta.UID, stream.UID)
			return nil, kapierrors.NewNotFound(api.Resource("imagestream"), inputMeta.Name)
		}
	}

	if stream.Annotations == nil {
		stream.Annotations = make(map[string]string)
	}
	now := unversioned.Now()
	_, hasAnnotation := stream.Annotations[api.DockerImageRepositoryCheckAnnotation]
	nextGeneration := stream.Generation + 1

	// snapshot the stream so changes made by the import can be detected below
	original, err := kapi.Scheme.DeepCopy(stream)
	if err != nil {
		return nil, err
	}

	// walk the retrieved images, ensuring each one exists in etcd
	importedImages := make(map[string]error)
	updatedImages := make(map[string]*api.Image)

	// the repository status is a pointer the importer may leave unset, so it is
	// checked before being walked
	if spec := isi.Spec.Repository; spec != nil && isi.Status.Repository != nil {
		for i, status := range isi.Status.Repository.Images {
			if checkImportFailure(status, stream, status.Tag, nextGeneration, now) {
				continue
			}

			image := status.Image
			ref, err := api.ParseDockerImageReference(image.DockerImageReference)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to parse image reference during import: %v", err))
				continue
			}
			from, err := api.ParseDockerImageReference(spec.From.Name)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to parse from reference during import: %v", err))
				continue
			}

			tag := ref.Tag
			if len(status.Tag) > 0 {
				tag = status.Tag
			}
			// we've imported a set of tags, ensure spec tag will point to this for later imports
			from.ID, from.Tag = "", tag

			if updated, ok := r.importSuccessful(ctx, image, stream, tag, from.Exact(), nextGeneration, now, spec.ImportPolicy, importedImages, updatedImages); ok {
				isi.Status.Repository.Images[i].Image = updated
			}
		}
	}

	for i, spec := range isi.Spec.Images {
		if spec.To == nil {
			continue
		}
		// statuses are matched to specs by position; report a missing status as
		// an error instead of indexing out of range
		if i >= len(isi.Status.Images) {
			return nil, kapierrors.NewInternalError(fmt.Errorf("image import returned %d statuses for %d requested images", len(isi.Status.Images), len(isi.Spec.Images)))
		}
		tag := spec.To.Name

		// record a failure condition
		status := isi.Status.Images[i]
		if checkImportFailure(status, stream, tag, nextGeneration, now) {
			// ensure that we have a spec tag set
			ensureSpecTag(stream, tag, spec.From.Name, spec.ImportPolicy, false)
			continue
		}

		// record success
		image := status.Image
		if updated, ok := r.importSuccessful(ctx, image, stream, tag, spec.From.Name, nextGeneration, now, spec.ImportPolicy, importedImages, updatedImages); ok {
			isi.Status.Images[i].Image = updated
		}
	}

	// TODO: should we allow partial failure?
	for _, err := range importedImages {
		if err != nil {
			return nil, err
		}
	}

	clearManifests(isi)

	hasChanges := !kapi.Semantic.DeepEqual(original, stream)
	if create {
		stream.Annotations[api.DockerImageRepositoryCheckAnnotation] = now.UTC().Format(time.RFC3339)
		glog.V(4).Infof("create new stream: %#v", stream)
		obj, err = r.internalStreams.Create(ctx, stream)
	} else {
		if hasAnnotation && !hasChanges {
			// nothing to persist: hand back the untouched snapshot
			glog.V(4).Infof("stream did not change: %#v", stream)
			obj, err = original.(*api.ImageStream), nil
		} else {
			// the diff is only computed when it will actually be logged
			if glog.V(4) {
				glog.V(4).Infof("updated stream %s", diff.ObjectDiff(original, stream))
			}
			stream.Annotations[api.DockerImageRepositoryCheckAnnotation] = now.UTC().Format(time.RFC3339)
			obj, _, err = r.internalStreams.Update(ctx, stream)
		}
	}

	if err != nil {
		return nil, err
	}
	isi.Status.Import = obj.(*api.ImageStream)
	return isi, nil
}
// checkImportFailure reports whether the given import status describes a failed import.
// A status counts as successful only when it carries an image and its status is
// StatusSuccess; in that case false is returned and the stream is left alone.
// Otherwise an ImportSuccess=False condition is recorded on the tag (unless an
// equal condition is already present), the generation of a matching spec tag is
// reset to zero, and true is returned. When tag is empty it is derived from the
// status tag or, failing that, from the image's docker reference.
func checkImportFailure(status api.ImageImportStatus, stream *api.ImageStream, tag string, nextGeneration int64, now unversioned.Time) bool {
	succeeded := status.Image != nil && status.Status.Status == unversioned.StatusSuccess
	if succeeded {
		return false
	}

	failure := api.TagEventCondition{
		Type:               api.ImportSuccess,
		Status:             kapi.ConditionFalse,
		Message:            status.Status.Message,
		Reason:             string(status.Status.Reason),
		Generation:         nextGeneration,
		LastTransitionTime: now,
	}
	if len(failure.Message) == 0 {
		failure.Message = "unknown error prevented import"
	}

	// work out which tag the failure belongs to when the caller did not say
	if tag == "" {
		switch {
		case len(status.Tag) > 0:
			tag = status.Tag
		case status.Image != nil:
			if parsed, parseErr := api.ParseDockerImageReference(status.Image.DockerImageReference); parseErr == nil {
				tag = parsed.Tag
			}
		}
	}

	// an identical condition is already recorded, nothing more to do
	if api.HasTagCondition(stream, tag, failure) {
		return true
	}

	api.SetTagConditions(stream, tag, failure)
	if specTag, found := stream.Spec.Tags[tag]; found {
		specTag.Generation = new(int64)
		stream.Spec.Tags[tag] = specTag
	}
	return true
}
// ensureSpecTag makes sure the stream has a spec tag for the given name and returns it.
// A tag that is missing, or any tag when reset is true, is pointed at the DockerImage
// named by from, given a zero generation and the supplied import policy, and stored
// back on the stream. An existing tag is returned untouched when reset is false.
func ensureSpecTag(stream *api.ImageStream, tag, from string, importPolicy api.TagImportPolicy, reset bool) api.TagReference {
	if stream.Spec.Tags == nil {
		stream.Spec.Tags = make(map[string]api.TagReference)
	}

	current, exists := stream.Spec.Tags[tag]
	if reset || !exists {
		source := kapi.ObjectReference{
			Kind: "DockerImage",
			Name: from,
		}
		current.From = &source
		current.Generation = new(int64)
		current.ImportPolicy = importPolicy
		stream.Spec.Tags[tag] = current
	}
	return current
}
// importSuccessful records a successful import into an image stream, setting the spec tag, status tag or conditions, and ensuring
// the image is created in etcd. Images are cached so they are not created multiple times in a row (when multiple tags point to the
// same image), and a failure to persist the image will be summarized before we update the stream. If an image was imported by this
// operation, it *replaces* the imported image (from the remote repository) with the updated image.
//
// importedImages maps an image name to the outcome of the attempt to persist it (nil on success) and updatedImages holds the
// stored form of every image persisted so far; both are read and updated here so callers must pass the same maps for every tag
// of one import. The returned bool is true when a stored image is available and false when persisting failed, either now or on
// an earlier attempt for the same image name.
func (r *REST) importSuccessful(
	ctx kapi.Context,
	image *api.Image, stream *api.ImageStream, tag string, from string, nextGeneration int64, now unversioned.Time, importPolicy api.TagImportPolicy,
	importedImages map[string]error, updatedImages map[string]*api.Image,
) (*api.Image, bool) {
	pullSpec, _ := api.MostAccuratePullSpec(image.DockerImageReference, image.Name, "")
	tagEvent := api.TagEvent{
		Created:              now,
		DockerImageReference: pullSpec,
		Image:                image.Name,
		Generation:           nextGeneration,
	}

	if stream.Spec.Tags == nil {
		stream.Spec.Tags = make(map[string]api.TagReference)
	}

	// ensure the spec and status tag match the imported image
	changed := api.DifferentTagEvent(stream, tag, tagEvent)
	specTag, ok := stream.Spec.Tags[tag]
	if changed || !ok {
		specTag = ensureSpecTag(stream, tag, from, importPolicy, true)
		api.AddTagEventToImageStream(stream, tag, tagEvent)
	}
	// always reset the import policy
	specTag.ImportPolicy = importPolicy
	stream.Spec.Tags[tag] = specTag

	// import or reuse the image, and ensure tag conditions are set
	importErr, alreadyImported := importedImages[image.Name]
	if importErr != nil {
		api.SetTagConditions(stream, tag, newImportFailedCondition(importErr, nextGeneration, now))
	} else {
		// no recorded failure for this image (yet): clear any conditions on the tag
		api.SetTagConditions(stream, tag)
	}

	// create the image if it does not exist, otherwise cache the updated status from the store for use by other tags
	if alreadyImported {
		if updatedImage, ok := updatedImages[image.Name]; ok {
			return updatedImage, true
		}
		return nil, false
	}

	updated, err := r.images.Create(ctx, image)
	switch {
	case kapierrors.IsAlreadyExists(err):
		// the image is already stored; fill in its metadata locally and treat it as imported
		if metaErr := api.ImageWithMetadata(image); metaErr != nil {
			glog.V(4).Infof("Unable to update image metadata during image import when image already exists %q: %v", image.Name, metaErr)
		}
		updated = image
		fallthrough
	case err == nil:
		updatedImage := updated.(*api.Image)
		updatedImages[image.Name] = updatedImage
		importedImages[image.Name] = nil
		return updatedImage, true
	default:
		// remember the failure so other tags pointing at this image report it as well
		importedImages[image.Name] = err
	}
	return nil, false
}
// clearManifests unsets the manifest for each object that does not request it.
//
// Image statuses are matched to their spec by position. A status without a
// corresponding spec entry cannot have requested its manifest, so it is cleared
// as well rather than indexing past the end of the spec list. Repository images
// are only touched when a repository spec exists, does not set IncludeManifest,
// and a repository status is actually present.
func clearManifests(isi *api.ImageStreamImport) {
	for i := range isi.Status.Images {
		if i < len(isi.Spec.Images) && isi.Spec.Images[i].IncludeManifest {
			continue
		}
		if image := isi.Status.Images[i].Image; image != nil {
			image.DockerImageManifest = ""
		}
	}

	if isi.Spec.Repository == nil || isi.Spec.Repository.IncludeManifest {
		return
	}
	// the repository status is a pointer and may be unset even when the spec is not
	if isi.Status.Repository == nil {
		return
	}
	for i := range isi.Status.Repository.Images {
		if image := isi.Status.Repository.Images[i].Image; image != nil {
			image.DockerImageManifest = ""
		}
	}
}
// newImportFailedCondition builds the ImportSuccess=False tag condition for err at the
// given generation and time. The message defaults to err.Error(); when err also
// implements kapierrors.APIStatus, the reason and message of its status are used instead.
func newImportFailedCondition(err error, gen int64, now unversioned.Time) api.TagEventCondition {
	reason, message := "", err.Error()
	if apiStatus, isStatus := err.(kapierrors.APIStatus); isStatus {
		details := apiStatus.Status()
		reason, message = string(details.Reason), details.Message
	}
	return api.TagEventCondition{
		Type:               api.ImportSuccess,
		Status:             kapi.ConditionFalse,
		Reason:             reason,
		Message:            message,
		Generation:         gen,
		LastTransitionTime: now,
	}
}
// invalidStatus builds an "invalid" API error for the given kind and position out of
// the supplied field errors and returns the Status it carries.
func invalidStatus(kind, position string, errs ...*field.Error) unversioned.Status {
	invalid := kapierrors.NewInvalid(api.Kind(kind), position, errs)
	return invalid.(kapierrors.APIStatus).Status()
}