forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
prune.go
304 lines (247 loc) · 9.09 KB
/
prune.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
package prune
import (
"fmt"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/openshift/origin/pkg/dockerregistry/server/client"
imageapi "github.com/openshift/origin/pkg/image/apis/image"
imageapiv1 "github.com/openshift/origin/pkg/image/apis/image/v1"
)
// Pruner defines a common set of operations for pruning registry storage.
// This file provides two implementations: RegistryPruner, which performs the
// deletions, and DryRunPruner, which only logs what would be deleted.
type Pruner interface {
// DeleteRepository removes the repository named reponame.
DeleteRepository(ctx context.Context, reponame string) error
// DeleteManifestLink removes the link to the manifest dgst from the
// repository reponame. svc is the manifest service of that repository.
DeleteManifestLink(ctx context.Context, svc distribution.ManifestService, reponame string, dgst digest.Digest) error
// DeleteBlob removes the blob identified by dgst.
DeleteBlob(ctx context.Context, dgst digest.Digest) error
}
// DryRunPruner is a Pruner that deletes nothing: each method only logs the
// object that would have been removed.
type DryRunPruner struct{}

// Compile-time check that *DryRunPruner satisfies Pruner.
var _ Pruner = (*DryRunPruner)(nil)
// DeleteRepository logs the repository that would be deleted and always
// returns nil; nothing is removed.
func (p *DryRunPruner) DeleteRepository(ctx context.Context, reponame string) error {
	context.GetLogger(ctx).Printf("Would delete repository: %s", reponame)
	return nil
}
// DeleteManifestLink logs the manifest link (reponame@dgst) that would be
// deleted and always returns nil; svc is not used.
func (p *DryRunPruner) DeleteManifestLink(ctx context.Context, svc distribution.ManifestService, reponame string, dgst digest.Digest) error {
	context.GetLogger(ctx).Printf("Would delete manifest link: %s@%s", reponame, dgst)
	return nil
}
// DeleteBlob logs the blob digest that would be deleted and always returns
// nil; nothing is removed.
func (p *DryRunPruner) DeleteBlob(ctx context.Context, dgst digest.Digest) error {
	context.GetLogger(ctx).Printf("Would delete blob: %s", dgst)
	return nil
}
// RegistryPruner is a Pruner that actually deletes objects.
type RegistryPruner struct {
	// StorageDriver is the storage backend handed to the vacuum that removes
	// repositories and blobs.
	StorageDriver driver.StorageDriver
}

// Compile-time check that *RegistryPruner satisfies Pruner.
var _ Pruner = (*RegistryPruner)(nil)
// DeleteRepository removes the directory of the repository reponame from the
// storage. A failure is returned wrapped with the repository name.
func (p *RegistryPruner) DeleteRepository(ctx context.Context, reponame string) error {
	v := storage.NewVacuum(ctx, p.StorageDriver)

	// No logging here: RemoveRepository generates its own message with
	// loglevel=info.
	err := v.RemoveRepository(reponame)
	if err == nil {
		return nil
	}
	return fmt.Errorf("unable to remove the repository %s: %v", reponame, err)
}
// DeleteManifestLink removes the link to the manifest dgst from the storage
// by calling svc.Delete. The deletion is logged first; a failure is returned
// wrapped with reponame@dgst.
func (p *RegistryPruner) DeleteManifestLink(ctx context.Context, svc distribution.ManifestService, reponame string, dgst digest.Digest) error {
	context.GetLogger(ctx).Printf("Deleting manifest link: %s@%s", reponame, dgst)

	err := svc.Delete(ctx, dgst)
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to delete the manifest link %s@%s: %v", reponame, dgst, err)
}
// DeleteBlob removes the blob dgst from the storage. A failure is returned
// wrapped with the digest.
func (p *RegistryPruner) DeleteBlob(ctx context.Context, dgst digest.Digest) error {
	v := storage.NewVacuum(ctx, p.StorageDriver)

	// No logging here: RemoveBlob generates its own message with
	// loglevel=info.
	err := v.RemoveBlob(string(dgst))
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to delete the blob %s: %v", dgst, err)
}
// garbageCollector holds objects for later deletion. If the object is replaced,
// then the previous one will be deleted.
//
// At most one repository and one manifest link are pending at any time;
// Collect deletes whatever is pending.
type garbageCollector struct {
// Pruner performs the deletions.
Pruner Pruner
// Ctx is the context passed to every Pruner call.
Ctx context.Context
// repoName is the repository awaiting deletion; empty when none is pending.
repoName string
// manifestService, manifestRepo and manifestLink together describe the
// manifest link awaiting deletion; manifestLink is empty when none is pending.
manifestService distribution.ManifestService
manifestRepo string
manifestLink digest.Digest
}
// AddRepository schedules the repository repoName for deletion. Anything
// already pending is deleted first; if that fails, the error is returned and
// repoName is not recorded.
func (gc *garbageCollector) AddRepository(repoName string) error {
	// The slot may be occupied, so flush it before storing the new entry.
	err := gc.Collect()
	if err == nil {
		gc.repoName = repoName
	}
	return err
}
// AddManifestLink schedules the manifest link repoName@dgst for deletion
// through svc. Anything already pending is deleted first; if that fails, the
// error is returned and the new link is not recorded.
func (gc *garbageCollector) AddManifestLink(svc distribution.ManifestService, repoName string, dgst digest.Digest) error {
	// The slot may be occupied, so flush it before storing the new entry.
	err := gc.Collect()
	if err == nil {
		gc.manifestService, gc.manifestRepo, gc.manifestLink = svc, repoName, dgst
	}
	return err
}
// Collect deletes the pending manifest link, then the pending repository, if
// any. A slot is cleared only after its deletion succeeds, so a failed entry
// stays pending. The first error stops processing and is returned.
func (gc *garbageCollector) Collect() error {
	if gc.manifestLink != "" {
		err := gc.Pruner.DeleteManifestLink(gc.Ctx, gc.manifestService, gc.manifestRepo, gc.manifestLink)
		if err != nil {
			return err
		}
		gc.manifestLink = ""
	}

	if gc.repoName != "" {
		err := gc.Pruner.DeleteRepository(gc.Ctx, gc.repoName)
		if err != nil {
			return err
		}
		gc.repoName = ""
	}

	return nil
}
// imageStreamHasManifestDigest reports whether any tag event recorded in the
// status of the image stream is refers to the image with the digest dgst.
func imageStreamHasManifestDigest(is *imageapiv1.ImageStream, dgst digest.Digest) bool {
	want := string(dgst)
	for _, history := range is.Status.Tags {
		for _, event := range history.Items {
			if event.Image != want {
				continue
			}
			return true
		}
	}
	return false
}
// Summary is cumulative information about what was pruned.
type Summary struct {
// Blobs is the number of unused blobs that Prune counted as pruned.
Blobs int
// DiskSpace is the sum of the sizes of those blobs, as reported by the
// registry's blob statter (presumably in bytes; confirm against
// distribution.Descriptor).
DiskSpace int64
}
// Prune removes blobs which are not used by Images in OpenShift.
//
// It works in three phases:
//
//  1. All images are listed through registryClient and the digests that must
//     be kept are recorded: the image name (its manifest), the config ID for
//     schema 2 manifests, and every layer.
//  2. Repositories of the registry are enumerated. A repository whose image
//     stream is not found is removed entirely. Otherwise every manifest link
//     that is not in use, or is not referenced by the image stream, is
//     removed. These deletions are deferred through a garbageCollector.
//  3. Blobs are enumerated and every blob that is not in use is deleted with
//     pruner.DeleteBlob and accounted for in the returned Summary.
//
// If the repository enumeration reports a driver.PathNotFoundError, a warning
// is logged and Prune returns a nil error without processing blobs.
//
// On error, the Summary will contain what was deleted so far. A blob is
// counted only after pruner.DeleteBlob has succeeded for it.
//
// TODO(dmage): remove layer links to a blob if the blob is removed or it doesn't belong to the ImageStream.
// TODO(dmage): keep young blobs (docker/distribution#2297).
func Prune(ctx context.Context, registry distribution.Namespace, registryClient client.RegistryClient, pruner Pruner) (Summary, error) {
	logger := context.GetLogger(ctx)

	repositoryEnumerator, ok := registry.(distribution.RepositoryEnumerator)
	if !ok {
		return Summary{}, fmt.Errorf("unable to convert Namespace to RepositoryEnumerator")
	}

	oc, err := registryClient.Client()
	if err != nil {
		return Summary{}, fmt.Errorf("error getting clients: %v", err)
	}

	imageList, err := oc.Images().List(metav1.ListOptions{})
	if err != nil {
		return Summary{}, fmt.Errorf("error listing images: %v", err)
	}

	// inuse maps a digest that must be kept to the reference of an image that
	// uses it; the value is only used for log messages.
	inuse := make(map[string]string)
	for _, image := range imageList.Items {
		// Keep the manifest.
		inuse[image.Name] = image.DockerImageReference

		if err := imageapiv1.ImageWithMetadata(&image); err != nil {
			return Summary{}, fmt.Errorf("error getting image metadata: %v", err)
		}

		// Keep the config for a schema 2 manifest.
		if image.DockerImageManifestMediaType == schema2.MediaTypeManifest {
			meta, ok := image.DockerImageMetadata.Object.(*imageapi.DockerImage)
			if ok {
				inuse[meta.ID] = image.DockerImageReference
			}
		}

		// Keep image layers.
		for _, layer := range image.DockerImageLayers {
			inuse[layer.Name] = image.DockerImageReference
		}
	}

	var stats Summary

	// The Enumerate calls a Stat() on each file or directory in the tree before calling our handler.
	// Therefore, we can not delete subdirectories from the handler. On some types of storage (S3),
	// this can lead to an error in the Enumerate.
	// We wait for the completion of our handler and perform deferred deletion of objects.
	gc := &garbageCollector{
		Ctx:    ctx,
		Pruner: pruner,
	}

	err = repositoryEnumerator.Enumerate(ctx, func(repoName string) error {
		logger.Debugln("Processing repository", repoName)

		named, err := reference.WithName(repoName)
		if err != nil {
			return fmt.Errorf("failed to parse the repo name %s: %v", repoName, err)
		}

		ref, err := imageapi.ParseDockerImageReference(repoName)
		if err != nil {
			return fmt.Errorf("failed to parse the image reference %s: %v", repoName, err)
		}

		is, err := oc.ImageStreams(ref.Namespace).Get(ref.Name, metav1.GetOptions{})
		if kerrors.IsNotFound(err) {
			logger.Printf("The image stream %s/%s is not found, will remove the whole repository", ref.Namespace, ref.Name)
			return gc.AddRepository(repoName)
		} else if err != nil {
			return fmt.Errorf("failed to get the image stream %s: %v", repoName, err)
		}

		repository, err := registry.Repository(ctx, named)
		if err != nil {
			return err
		}

		manifestService, err := repository.Manifests(ctx)
		if err != nil {
			return err
		}

		manifestEnumerator, ok := manifestService.(distribution.ManifestEnumerator)
		if !ok {
			return fmt.Errorf("unable to convert ManifestService into ManifestEnumerator")
		}

		err = manifestEnumerator.Enumerate(ctx, func(dgst digest.Digest) error {
			// A link is kept only when the manifest belongs to a known image
			// AND the image stream still references it.
			if _, ok := inuse[string(dgst)]; ok && imageStreamHasManifestDigest(is, dgst) {
				logger.Debugf("Keeping the manifest link %s@%s", repoName, dgst)
				return nil
			}

			return gc.AddManifestLink(manifestService, repoName, dgst)
		})
		if e, ok := err.(driver.PathNotFoundError); ok {
			logger.Printf("Skipped manifest link pruning for the repository %s: %v", repoName, e)
		} else if err != nil {
			return fmt.Errorf("failed to prune manifest links in the repository %s: %v", repoName, err)
		}

		return nil
	})
	if e, ok := err.(driver.PathNotFoundError); ok {
		logger.Warnf("No repositories found: %v", e)
		return stats, nil
	} else if err != nil {
		return stats, err
	}

	// Delete whatever the last handler invocation left pending.
	if err := gc.Collect(); err != nil {
		return stats, err
	}

	logger.Debugln("Processing blobs")
	blobStatter := registry.BlobStatter()
	err = registry.Blobs().Enumerate(ctx, func(dgst digest.Digest) error {
		if imageReference, ok := inuse[string(dgst)]; ok {
			logger.Debugf("Keeping the blob %s (it belongs to the image %s)", dgst, imageReference)
			return nil
		}

		// Stat before deleting: the size is not available afterwards.
		desc, err := blobStatter.Stat(ctx, dgst)
		if err != nil {
			return err
		}

		if err := pruner.DeleteBlob(ctx, dgst); err != nil {
			return err
		}

		// Account for the blob only once it has been deleted, so that on
		// error the summary reflects what was removed so far.
		stats.Blobs++
		stats.DiskSpace += desc.Size

		return nil
	})
	return stats, err
}