Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dmage committed Aug 21, 2020
1 parent b7245a4 commit b5cd6e1
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 189 deletions.
236 changes: 153 additions & 83 deletions pkg/cli/admin/prune/imageprune/prune.go
Expand Up @@ -207,7 +207,7 @@ type Pruner interface {
manifestDeleter ManifestDeleter,
blobDeleter BlobDeleter,
imageDeleter ImageDeleter,
)
) []error
}

// pruner is an object that knows how to prune a data set
Expand Down Expand Up @@ -341,8 +341,10 @@ func (p *pruner) analyzeImageStreamsReferences(options PrunerOptions) kerrors.Ag
func (p *pruner) analyzeImageReference(referrer ReferencedBy, logPrefix string, imageReference string) error {
ref, err := reference.Parse(imageReference)
if err != nil {
klog.Warningf("%s: %sunable to parse image reference %q: %v - skipping", referrer, logPrefix, imageReference, err)
err = fmt.Errorf("%s: %sunable to parse image reference %q: %v", referrer, logPrefix, imageReference, err)
klog.Warningf("%s - skipping", err)
if !p.ignoreInvalidRefs {
return err
// FIXME FIXME FIXME
//return newErrBadReferenceToImage(container.Image, referrer, err.Error())
}
Expand Down Expand Up @@ -428,7 +430,7 @@ func (p *pruner) analyzeReferencesFromBuildStrategy(referrer ReferencedBy, strat
if err != nil {
klog.Warningf("%s: failed to parse ImageStreamImage name %q: %v", referrer, from.Name, err)
if !p.ignoreInvalidRefs {
return []error{err}
return []error{fmt.Errorf("%s: invalid ImageStreamImage reference %q: %v", referrer, from.Name, err)}
// FIXME return []error{newErrBadReferenceTo("ImageStreamImage", from.Name, referrer, err.Error())}
}
return nil
Expand All @@ -450,7 +452,7 @@ func (p *pruner) analyzeReferencesFromBuildStrategy(referrer ReferencedBy, strat
if err != nil {
klog.Warningf("%s: failed to parse ImageStreamTag name %q: %v", referrer, from.Name, err)
if !p.ignoreInvalidRefs {
return []error{err}
return []error{fmt.Errorf("%s: invalid ImageStreamTag reference %q: %v", referrer, from.Name, err)}
// FIXME return nil, newErrBadReferenceTo("ImageStreamTag", istagName, referrer, err.Error())
}
return nil
Expand Down Expand Up @@ -752,9 +754,10 @@ func (p *pruner) getGlobalReferenceCounts(images map[string]*imagev1.Image) (ref
return counts, nil
}

func (p *pruner) pruneImageStreamTag(is *imagev1.ImageStream, tagEventList imagev1.NamedTagEventList, counts referenceCounts, layerLinkDeleter LayerLinkDeleter) (imagev1.NamedTagEventList, int, []string) {
func (p *pruner) pruneImageStreamTag(is *imagev1.ImageStream, tagEventList imagev1.NamedTagEventList, counts referenceCounts, layerLinkDeleter LayerLinkDeleter) (imagev1.NamedTagEventList, int, []string, []error) {
filteredItems := tagEventList.Items[:0]
var manifestsToDelete []string
var errs []error
for rev, item := range tagEventList.Items {
if item.Created.After(p.algorithm.keepYoungerThan) {
klog.V(4).Infof("imagestream %s/%s: tag %s: revision %d: keeping %s because of --keep-younger-than", is.Namespace, is.Name, tagEventList.Tag, rev+1, item.Image)
Expand Down Expand Up @@ -787,15 +790,11 @@ func (p *pruner) pruneImageStreamTag(is *imagev1.ImageStream, tagEventList image
}

if p.algorithm.pruneOverSizeLimit {
// FIXME
klog.Fatal("FIXME pruneOverSizeLimit is not implemented")
/*
if exceedsLimits(stream, imageNode.Image, limits) {
kind = WeakReferencedImageEdgeKind
} else {
kind = ReferencedImageEdgeKind
}
*/
if !exceedsLimits(is, image, p.imageStreamLimits) {
klog.V(4).Infof("imagestream %s/%s: tag %s: revision %d: keeping %s because --prune-over-size-limit is used and image does not exceed limits", is.Namespace, is.Name, tagEventList.Tag, rev+1, item.Image)
filteredItems = append(filteredItems, item)
continue
}
} else {
if rev < p.algorithm.keepTagRevisions {
klog.V(4).Infof("imagestream %s/%s: tag %s: revision %d: keeping %s because of --keep-tag-revisions", is.Namespace, is.Name, tagEventList.Tag, rev+1, item.Image)
Expand All @@ -817,17 +816,22 @@ func (p *pruner) pruneImageStreamTag(is *imagev1.ImageStream, tagEventList image

klog.V(4).Infof("imagestream %s/%s: tag %s: revision %d: deleting repository links for %s...", is.Namespace, is.Name, tagEventList.Tag, rev+1, item.Image)

if counts.Manifests.Add(image.Name, -1) == 0 {
manifestsToDelete = append(manifestsToDelete, image.Name)
}
if p.algorithm.pruneRegistry {
if counts.Manifests.Add(image.Name, -1) == 0 {
manifestsToDelete = append(manifestsToDelete, image.Name)
}

imageBlobs, err := getImageBlobs(image)
if err != nil {
klog.Warningf("imagestream %s/%s: tag %s: image %s: %s", is.Namespace, is.Name, tagEventList.Tag, item.Image, err)
}
for _, blob := range imageBlobs {
if counts.Blobs.Add(blob, -1) == 0 {
layerLinkDeleter.DeleteLayerLink(fmt.Sprintf("%s/%s", is.Namespace, is.Name), blob)
imageBlobs, err := getImageBlobs(image)
if err != nil {
klog.Warningf("imagestream %s/%s: tag %s: image %s: %s", is.Namespace, is.Name, tagEventList.Tag, item.Image, err)
}
for _, blob := range imageBlobs {
if counts.Blobs.Add(blob, -1) == 0 {
err := layerLinkDeleter.DeleteLayerLink(fmt.Sprintf("%s/%s", is.Namespace, is.Name), blob)
if err != nil {
errs = append(errs, fmt.Errorf("failed to delete layer link %s: %v", blob, err))
}
}
}
}
}
Expand All @@ -836,10 +840,10 @@ func (p *pruner) pruneImageStreamTag(is *imagev1.ImageStream, tagEventList image

tagEventList.Items = filteredItems

return tagEventList, deletedRevisions, manifestsToDelete
return tagEventList, deletedRevisions, manifestsToDelete, errs
}

func (p *pruner) pruneImageStream(stream *imagev1.ImageStream, imageStreamDeleter ImageStreamDeleter, layerLinkDeleter LayerLinkDeleter, manifestDeleter ManifestDeleter) (*imagev1.ImageStream, error) {
func (p *pruner) pruneImageStream(stream *imagev1.ImageStream, imageStreamDeleter ImageStreamDeleter, layerLinkDeleter LayerLinkDeleter, manifestDeleter ManifestDeleter) (*imagev1.ImageStream, []error) {
klog.V(4).Infof("Examining ImageStream %s/%s", stream.Namespace, stream.Name)

if !p.algorithm.pruneOverSizeLimit && stream.CreationTimestamp.Time.After(p.algorithm.keepYoungerThan) {
Expand All @@ -848,6 +852,7 @@ func (p *pruner) pruneImageStream(stream *imagev1.ImageStream, imageStreamDelete
}

var manifestsToDelete []string
var errs []error
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
manifestsToDelete = nil

Expand All @@ -870,12 +875,22 @@ func (p *pruner) pruneImageStream(stream *imagev1.ImageStream, imageStreamDelete

deletedRevisions := 0
for i, tagEventList := range is.Status.Tags {
updatedTagEventList, tagDeletedRevisions, tagManifestsToDelete := p.pruneImageStreamTag(is, tagEventList, counts, layerLinkDeleter)
updatedTagEventList, tagDeletedRevisions, tagManifestsToDelete, tagErrs := p.pruneImageStreamTag(is, tagEventList, counts, layerLinkDeleter)
is.Status.Tags[i] = updatedTagEventList
deletedRevisions += tagDeletedRevisions
manifestsToDelete = append(manifestsToDelete, tagManifestsToDelete...)
errs = append(errs, tagErrs...)
}

// delete tags without items
tags := is.Status.Tags[:0]
for i := range is.Status.Tags {
if len(is.Status.Tags[i].Items) > 0 {
tags = append(tags, is.Status.Tags[i])
}
}
is.Status.Tags = tags

if deletedRevisions > 0 {
updatedStream, err := imageStreamDeleter.UpdateImageStream(is, deletedRevisions)
if kerrapi.IsNotFound(err) {
Expand All @@ -891,20 +906,22 @@ func (p *pruner) pruneImageStream(stream *imagev1.ImageStream, imageStreamDelete
return nil
})
if err != nil {
return stream, err
errs = append(errs, err)
return stream, errs
}

for _, digest := range manifestsToDelete {
err := manifestDeleter.DeleteManifest(fmt.Sprintf("%s/%s", stream.Namespace, stream.Name), digest)
if err != nil {
return stream, err
if p.algorithm.pruneRegistry {
for _, digest := range manifestsToDelete {
err := manifestDeleter.DeleteManifest(fmt.Sprintf("%s/%s", stream.Namespace, stream.Name), digest)
if err != nil {
errs = append(errs, fmt.Errorf("failed to delete manifest link %s: %v", digest, err))
}
}
}

return stream, nil
return stream, errs
}

func (p *pruner) pruneImage(image *imagev1.Image, usedImages map[string]bool, counts referenceCounts, blobDeleter BlobDeleter, imageDeleter ImageDeleter) error {
func (p *pruner) pruneImage(image *imagev1.Image, usedImages map[string]bool, counts referenceCounts, blobDeleter BlobDeleter, imageDeleter ImageDeleter) []error {
if usedImages[image.Name] {
klog.V(4).Infof("image %s: keeping because it is used by imagestreams", image.Name)
return nil
Expand All @@ -922,41 +939,61 @@ func (p *pruner) pruneImage(image *imagev1.Image, usedImages map[string]bool, co
return nil
}

imageBlobs, err := getImageBlobs(image)
if err != nil {
return err
}
for _, blob := range imageBlobs {
if counts.Blobs.Add(blob, -1) == 0 {
err := blobDeleter.DeleteBlob(blob)
var errs []error
var blobDeleted bool

if p.algorithm.pruneRegistry {
imageBlobs, err := getImageBlobs(image)
if err != nil {
return []error{err}
}

for _, blob := range imageBlobs {
if counts.Blobs.Add(blob, -1) == 0 {
err := blobDeleter.DeleteBlob(blob)
if err != nil {
errs = append(errs, fmt.Errorf("failed to delete blob %s: %v", blob, err))
} else {
blobDeleted = true
}
}
}
if counts.Blobs.Add(image.Name, -1) == 0 {
err := blobDeleter.DeleteBlob(image.Name)
if err != nil {
return err
errs = append(errs, fmt.Errorf("failed to delete manifest blob %s: %v", image.Name, err))
} else {
blobDeleted = true
}
}
} else {
blobDeleted = true
}
if counts.Blobs.Add(image.Name, -1) == 0 {
err := blobDeleter.DeleteBlob(image.Name)
if err != nil {
return err

if blobDeleted {
if err := imageDeleter.DeleteImage(image); err != nil {
errs = append(errs, fmt.Errorf("failed to delete image %s: %v", image.Name, err))
}
}

return imageDeleter.DeleteImage(image)
return errs
}

func (p *pruner) pruneImageStreams(
streamPruner ImageStreamDeleter,
layerLinkDeleter LayerLinkDeleter,
manifestDeleter ManifestDeleter,
) {
) []error {
var keys []string
for k := range p.imageStreams {
keys = append(keys, k)
}

workQueue := make(chan string)
var wg sync.WaitGroup
var mu sync.Mutex
workQueue := make(chan string)
errorsCh := make(chan error)

for i := 0; i < p.numWorkers; i++ {
wg.Add(1)
go func() {
Expand All @@ -966,10 +1003,7 @@ func (p *pruner) pruneImageStreams(
stream := p.imageStreams[k]
mu.Unlock()

updatedStream, err := p.pruneImageStream(stream, streamPruner, layerLinkDeleter, manifestDeleter)
if err != nil {
klog.Fatalf("imagestream %s/%s: %v", stream.Namespace, stream.Name, err)
}
updatedStream, errs := p.pruneImageStream(stream, streamPruner, layerLinkDeleter, manifestDeleter)

if updatedStream == nil {
mu.Lock()
Expand All @@ -980,36 +1014,34 @@ func (p *pruner) pruneImageStreams(
p.imageStreams[k] = updatedStream
mu.Unlock()
}

for _, err := range errs {
errorsCh <- fmt.Errorf("imagestream %s/%s: %v", stream.Namespace, stream.Name, err)
}
}
}()
}
for _, k := range keys {
workQueue <- k

go func() {
for _, k := range keys {
workQueue <- k
}
close(workQueue)
wg.Wait()
close(errorsCh)
}()

var errs []error
for err := range errorsCh {
errs = append(errs, err)
}
close(workQueue)
wg.Wait()
return errs
}

// Prune prunes the objects like this:
// 1. it calculates the prunable images and builds a queue
// - the queue does not ever grow, it only shrinks (newly created images are not added)
// 2. it untags the prunable images from image streams
// 3. it spawns workers
// 4. it turns each prunable image into a job for the workers and makes sure they are busy
// 5. it terminates the workers once the queue is empty and reports results
func (p *pruner) Prune(
streamPruner ImageStreamDeleter,
layerLinkDeleter LayerLinkDeleter,
manifestDeleter ManifestDeleter,
func (p *pruner) pruneImages(
blobDeleter BlobDeleter,
imageDeleter ImageDeleter,
) {
// Stage 1: delete history from image streams

p.pruneImageStreams(streamPruner, layerLinkDeleter, manifestDeleter)

// Stage 2: delete images

) []error {
usedImages := map[string]bool{}
for _, stream := range p.imageStreams {
for _, tag := range stream.Status.Tags {
Expand All @@ -1026,24 +1058,62 @@ func (p *pruner) Prune(

var wg sync.WaitGroup
imagesToDelete := make(chan *imagev1.Image)
errorsCh := make(chan error)

for i := 0; i < p.numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for image := range imagesToDelete {
err := p.pruneImage(image, usedImages, counts, blobDeleter, imageDeleter)
if err != nil {
klog.Fatalf("failed to delete image %s: %v", image.Name, err)
errs := p.pruneImage(image, usedImages, counts, blobDeleter, imageDeleter)
for _, err := range errs {
errorsCh <- err
}
}
}()
}

for _, image := range p.images {
imagesToDelete <- image
go func() {
for _, image := range p.images {
imagesToDelete <- image
}
close(imagesToDelete)
wg.Wait()
close(errorsCh)
}()

var errs []error
for err = range errorsCh {
errs = append(errs, err)
}
return errs
}

// Prune prunes the objects like this:
// 1. it calculates the prunable images and builds a queue
// - the queue does not ever grow, it only shrinks (newly created images are not added)
// 2. it untags the prunable images from image streams
// 3. it spawns workers
// 4. it turns each prunable image into a job for the workers and makes sure they are busy
// 5. it terminates the workers once the queue is empty and reports results
func (p *pruner) Prune(
streamPruner ImageStreamDeleter,
layerLinkDeleter LayerLinkDeleter,
manifestDeleter ManifestDeleter,
blobDeleter BlobDeleter,
imageDeleter ImageDeleter,
) []error {
var errs []error

// Stage 1: delete history from image streams
errs = append(errs, p.pruneImageStreams(streamPruner, layerLinkDeleter, manifestDeleter)...)

// Stage 2: delete images
if p.algorithm.namespace == "" {
errs = append(errs, p.pruneImages(blobDeleter, imageDeleter)...)
}
close(imagesToDelete)
wg.Wait()

return errs
}

// imageDeleter removes an image from OpenShift.
Expand Down

0 comments on commit b5cd6e1

Please sign in to comment.