Skip to content

Commit

Permalink
Add func for pruning an image from a stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Andy Goldstein committed May 21, 2015
1 parent 2cbf0c9 commit 6b7a36a
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 126 deletions.
2 changes: 1 addition & 1 deletion pkg/api/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type Node struct {
graph.Node
concrete.Node
UniqueName
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/api/graph/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,16 +504,24 @@ func (*ImageStreamNode) Kind() int {
return ImageStreamGraphKind
}

func imageStreamName(stream *image.ImageStream) UniqueName {
return UniqueName(fmt.Sprintf("%d|%s", ImageStreamGraphKind, stream.Status.DockerImageRepository))
}

// ImageStream adds a graph node for the Image Stream if it does not already exist.
func ImageStream(g MutableUniqueGraph, stream *image.ImageStream) graph.Node {
return EnsureUnique(g,
UniqueName(fmt.Sprintf("%d|%s", ImageStreamGraphKind, stream.Status.DockerImageRepository)),
imageStreamName(stream),
func(node Node) graph.Node {
return &ImageStreamNode{node, stream}
},
)
}

func FindImageStream(g MutableUniqueGraph, stream *image.ImageStream) graph.Node {
return g.Find(imageStreamName(stream))
}

type ReplicationControllerNode struct {
Node
*kapi.ReplicationController
Expand Down
49 changes: 34 additions & 15 deletions pkg/cmd/admin/prune/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,42 +88,60 @@ func NewCmdPruneImages(f *clientcmd.Factory, parentName, name string, out io.Wri
w := tabwriter.NewWriter(out, 10, 4, 3, ' ', 0)
defer w.Flush()

var streams util.StringSet
printImageHeader := true
describingImagePruneFunc := func(image *imageapi.Image, streams []*imageapi.ImageStream) []error {
describingImagePruneFunc := func(image *imageapi.Image) error {
if printImageHeader {
printImageHeader = false
fmt.Fprintf(w, "IMAGE\tSTREAMS\n")
fmt.Fprintf(w, "IMAGE\tSTREAMS")
}
streamNames := util.NewStringSet()
for _, stream := range streams {
streamNames.Insert(fmt.Sprintf("%s/%s", stream.Namespace, stream.Name))

if streams.Len() > 0 {
fmt.Fprintf(w, strings.Join(streams.List(), ", "))
}
fmt.Fprintf(w, "%s\t%s\n", image.Name, strings.Join(streamNames.List(), ", "))

fmt.Fprintf(w, "\n%s\t", image.Name)
streams = util.NewStringSet()

return nil
}

describingImageStreamPruneFunc := func(stream *imageapi.ImageStream, image *imageapi.Image) (*imageapi.ImageStream, error) {
streams.Insert(stream.Status.DockerImageRepository)
return stream, nil
}

printLayerHeader := true
describingLayerPruneFunc := func(registryURL, repo, layer string) error {
if printLayerHeader {
printLayerHeader = false
fmt.Fprintf(w, "\nREGISTRY\tSTREAM\tLAYER\n")
// need to print the remaining streams for the last image
if streams.Len() > 0 {
fmt.Fprintf(w, strings.Join(streams.List(), ", "))
}
fmt.Fprintf(w, "\n\nREGISTRY\tSTREAM\tLAYER\n")
}
fmt.Fprintf(w, "%s\t%s\t%s\n", registryURL, repo, layer)
return nil
}

var (
imagePruneFunc prune.ImagePruneFunc
layerPruneFunc prune.LayerPruneFunc
blobPruneFunc prune.BlobPruneFunc
manifestPruneFunc prune.ManifestPruneFunc
imagePruneFunc prune.ImagePruneFunc
imageStreamPruneFunc prune.ImageStreamPruneFunc
layerPruneFunc prune.LayerPruneFunc
blobPruneFunc prune.BlobPruneFunc
manifestPruneFunc prune.ManifestPruneFunc
)

switch cfg.DryRun {
case false:
imagePruneFunc = func(image *imageapi.Image, referencedStreams []*imageapi.ImageStream) []error {
describingImagePruneFunc(image, referencedStreams)
return prune.DeletingImagePruneFunc(osClient.Images(), osClient)(image, referencedStreams)
imagePruneFunc = func(image *imageapi.Image) error {
describingImagePruneFunc(image)
return prune.DeletingImagePruneFunc(osClient.Images())(image)
}
imageStreamPruneFunc = func(stream *imageapi.ImageStream, image *imageapi.Image) (*imageapi.ImageStream, error) {
describingImageStreamPruneFunc(stream, image)
return prune.DeletingImageStreamPruneFunc(osClient)(stream, image)
}
layerPruneFunc = func(registryURL, repo, layer string) error {
describingLayerPruneFunc(registryURL, repo, layer)
Expand All @@ -134,6 +152,7 @@ func NewCmdPruneImages(f *clientcmd.Factory, parentName, name string, out io.Wri
default:
fmt.Fprintln(os.Stderr, "Dry run enabled - no modifications will be made.")
imagePruneFunc = describingImagePruneFunc
imageStreamPruneFunc = describingImageStreamPruneFunc
layerPruneFunc = describingLayerPruneFunc
blobPruneFunc = func(registryURL, blob string) error {
return nil
Expand All @@ -143,7 +162,7 @@ func NewCmdPruneImages(f *clientcmd.Factory, parentName, name string, out io.Wri
}
}

pruner.Run(imagePruneFunc, layerPruneFunc, blobPruneFunc, manifestPruneFunc)
pruner.Run(imagePruneFunc, imageStreamPruneFunc, layerPruneFunc, blobPruneFunc, manifestPruneFunc)
},
}

Expand Down
171 changes: 73 additions & 98 deletions pkg/image/prune/imagepruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
gonum "github.com/gonum/graph"
Expand All @@ -28,21 +27,17 @@ type pruneAlgorithm struct {
}

// ImagePruneFunc is a function that is invoked for each image that is
// prunable, along with the list of image streams that reference it.
type ImagePruneFunc func(image *imageapi.Image, streams []*imageapi.ImageStream) []error

// LayerPruneFunc is a function that is invoked for each registry, along with
// a DeleteLayersRequest that contains the layers that can be pruned and the
// image stream names that reference each layer.
//type LayerPruneFunc func(registryURL string, req server.DeleteLayersRequest) (requestError error, layerErrors map[string][]error)
// prunable.
type ImagePruneFunc func(image *imageapi.Image) error
type ImageStreamPruneFunc func(stream *imageapi.ImageStream, image *imageapi.Image) (*imageapi.ImageStream, error)
type LayerPruneFunc func(registryURL, repo, layer string) error
type BlobPruneFunc func(registryURL, blob string) error
type ManifestPruneFunc func(registryURL, repo, manifest string) error

// ImagePruner knows how to prune images and layers.
type ImagePruner interface {
// Run prunes images and layers.
Run(pruneImage ImagePruneFunc, pruneLayer LayerPruneFunc, pruneBlob BlobPruneFunc, pruneManifest ManifestPruneFunc)
Run(pruneImage ImagePruneFunc, pruneStream ImageStreamPruneFunc, pruneLayer LayerPruneFunc, pruneBlob BlobPruneFunc, pruneManifest ManifestPruneFunc)
}

// imagePruner implements ImagePruner.
Expand Down Expand Up @@ -428,7 +423,7 @@ func imageIsPrunable(g graph.Graph, imageNode *graph.ImageNode) bool {
// with the image streams that reference the image. After imagePruneFunc is
// invoked, the image node is removed from the graph, so that layers eligible
// for pruning may later be identified.
func pruneImages(g graph.Graph, imageNodes []*graph.ImageNode, pruneImage ImagePruneFunc, pruneManifest ManifestPruneFunc) {
func pruneImages(g graph.Graph, imageNodes []*graph.ImageNode, pruneImage ImagePruneFunc, pruneStream ImageStreamPruneFunc, pruneManifest ManifestPruneFunc) {
for _, imageNode := range imageNodes {
glog.V(4).Infof("Examining image %q", imageNode.Image.Name)

Expand All @@ -439,22 +434,34 @@ func pruneImages(g graph.Graph, imageNodes []*graph.ImageNode, pruneImage ImageP

glog.V(4).Infof("Image has only weak references - pruning")

streams := imageStreamPredecessors(g, imageNode)
if errs := pruneImage(imageNode.Image, streams); len(errs) > 0 {
glog.Errorf("Error pruning image %q: %v", imageNode.Image.Name, errs)
if err := pruneImage(imageNode.Image); err != nil {
glog.Errorf("Error pruning image %q: %v", imageNode.Image.Name, err)
}

for _, stream := range streams {
ref, err := imageapi.DockerImageReferenceForStream(stream)
repoName := fmt.Sprintf("%s/%s", ref.Namespace, ref.Name)
if err != nil {
glog.Errorf("Error constructing DockerImageReference for %q: %v", repoName, err)
continue
}
for _, n := range g.Predecessors(imageNode) {
if streamNode, ok := n.(*graph.ImageStreamNode); ok {
stream := streamNode.ImageStream
repoName := fmt.Sprintf("%s/%s", stream.Namespace, stream.Name)

glog.V(4).Infof("Pruning image from stream %s", repoName)
updatedStream, err := pruneStream(stream, imageNode.Image)
if err != nil {
glog.Errorf("Error pruning image from stream: %v", err)
continue
}

streamNode.ImageStream = updatedStream

ref, err := imageapi.DockerImageReferenceForStream(stream)
if err != nil {
glog.Errorf("Error constructing DockerImageReference for %q: %v", repoName, err)
continue
}

glog.V(4).Infof("Invoking pruneManifest for registry %q, repo %q, image %q", ref.Registry, repoName, imageNode.Image.Name)
if err := pruneManifest(ref.Registry, repoName, imageNode.Image.Name); err != nil {
glog.Errorf("Error pruning manifest for registry %q, repo %q, image %q: %v", ref.Registry, repoName, imageNode.Image.Name, err)
glog.V(4).Infof("Invoking pruneManifest for registry %q, repo %q, image %q", ref.Registry, repoName, imageNode.Image.Name)
if err := pruneManifest(ref.Registry, repoName, imageNode.Image.Name); err != nil {
glog.Errorf("Error pruning manifest for registry %q, repo %q, image %q: %v", ref.Registry, repoName, imageNode.Image.Name, err)
}
}
}

Expand All @@ -466,11 +473,11 @@ func pruneImages(g graph.Graph, imageNodes []*graph.ImageNode, pruneImage ImageP
// Run identifies images eligible for pruning, invoking imagePruneFunc for each
// image, and then it identifies layers eligible for pruning, invoking
// layerPruneFunc for each registry URL that has layers that can be pruned.
func (p *imagePruner) Run(pruneImage ImagePruneFunc, pruneLayer LayerPruneFunc, pruneBlob BlobPruneFunc, pruneManifest ManifestPruneFunc) {
func (p *imagePruner) Run(pruneImage ImagePruneFunc, pruneStream ImageStreamPruneFunc, pruneLayer LayerPruneFunc, pruneBlob BlobPruneFunc, pruneManifest ManifestPruneFunc) {
allNodes := p.g.NodeList()

imageNodes := imageNodeSubgraph(allNodes)
pruneImages(p.g, imageNodes, pruneImage, pruneManifest)
pruneImages(p.g, imageNodes, pruneImage, pruneStream, pruneManifest)

layerNodes := layerNodeSubgraph(allNodes)
pruneLayers(p.g, layerNodes, pruneLayer, pruneBlob)
Expand Down Expand Up @@ -563,72 +570,50 @@ func pruneLayers(g graph.Graph, layerNodes []*graph.ImageLayerNode, pruneLayer L

const retryCount = 2

// DeletingImagePruneFunc returns an ImagePruneFunc that deletes the image and
// removes it from each referencing ImageStream's status.tags.
func DeletingImagePruneFunc(images client.ImageInterface, streams client.ImageStreamsNamespacer) ImagePruneFunc {
return func(image *imageapi.Image, referencedStreams []*imageapi.ImageStream) []error {
result := []error{}

// DeletingImagePruneFunc returns an ImagePruneFunc that deletes the image.
func DeletingImagePruneFunc(images client.ImageInterface) ImagePruneFunc {
return func(image *imageapi.Image) error {
glog.V(4).Infof("Deleting image %q", image.Name)
if err := images.Delete(image.Name); err != nil {
e := fmt.Errorf("Error deleting image: %v", err)
glog.Error(e)
result = append(result, e)
return result
}

var updateStream func(*imageapi.Image, *imageapi.ImageStream, int) error
updateStream = func(image *imageapi.Image, stream *imageapi.ImageStream, retry int) error {
glog.V(4).Infof("Checking if stream %s/%s has references to image in status.tags", stream.Namespace, stream.Name)
for tag, history := range stream.Status.Tags {
glog.V(4).Infof("Checking tag %q", tag)
newHistory := imageapi.TagEventList{}
for i, tagEvent := range history.Items {
glog.V(4).Infof("Checking tag event %d with image %q", i, tagEvent.Image)
if tagEvent.Image != image.Name {
glog.V(4).Infof("Tag event doesn't match deleting image - keeping")
newHistory.Items = append(newHistory.Items, tagEvent)
}
}
stream.Status.Tags[tag] = newHistory
}
return e
}
return nil
}
}

glog.V(4).Infof("Updating image stream %s/%s", stream.Namespace, stream.Name)
glog.V(5).Infof("Updated stream: %#v", stream)
//TODO use the updated ImageStream that's returned and update the entry in the graph
//to hopefully avoid having to re-get the stream?
if _, err := streams.ImageStreams(stream.Namespace).UpdateStatus(stream); err != nil {
if errors.IsConflict(err) && retry > 0 {
if stream, err := streams.ImageStreams(stream.Namespace).Get(stream.Name); err == nil {
return updateStream(image, stream, retry-1)
}
func DeletingImageStreamPruneFunc(streams client.ImageStreamsNamespacer) ImageStreamPruneFunc {
return func(stream *imageapi.ImageStream, image *imageapi.Image) (*imageapi.ImageStream, error) {
glog.V(4).Infof("Checking if stream %s/%s has references to image in status.tags", stream.Namespace, stream.Name)
for tag, history := range stream.Status.Tags {
glog.V(4).Infof("Checking tag %q", tag)
newHistory := imageapi.TagEventList{}
for i, tagEvent := range history.Items {
glog.V(4).Infof("Checking tag event %d with image %q", i, tagEvent.Image)
if tagEvent.Image != image.Name {
glog.V(4).Infof("Tag event doesn't match deleting image - keeping")
newHistory.Items = append(newHistory.Items, tagEvent)
}
return err
}
return nil
stream.Status.Tags[tag] = newHistory
}

for _, stream := range referencedStreams {
if err := updateStream(image, stream, retryCount); err != nil {
e := fmt.Errorf("Unable to update image stream status %s/%s: %v", stream.Namespace, stream.Name, err)
glog.Error(e)
result = append(result, e)
}
glog.V(4).Infof("Updating image stream %s/%s", stream.Namespace, stream.Name)
glog.V(5).Infof("Updated stream: %#v", stream)
updatedStream, err := streams.ImageStreams(stream.Namespace).UpdateStatus(stream)
if err != nil {
return nil, err
}

return result
return updatedStream, nil
}
}

// DeletingLayerPruneFunc returns a LayerPruneFunc that sends the
// DeleteLayersRequest to the Docker registry.
//
// The request URL is http://registryURL/admin/layers and it is a DELETE
// request.
// DeletingLayerPruneFunc returns a LayerPruneFunc that uses registryClient to
// send a layer deletion request to the regsitry.
//
// The body of the request is JSON, and it is a map[string][]string, with each
// key being a layer, and each value being a list of Docker image repository
// names referenced by the layer.
// The request URL is http://registryURL/admin/<repo>/layers/<digest> and it is
// a DELETE request.
func DeletingLayerPruneFunc(registryClient *http.Client) LayerPruneFunc {
return func(registryURL, repoName, layer string) error {
glog.V(4).Infof("Pruning registry %q, repo %q, layer %q", registryURL, repoName, layer)
Expand All @@ -650,29 +635,17 @@ func DeletingLayerPruneFunc(registryClient *http.Client) LayerPruneFunc {

if resp.StatusCode != http.StatusNoContent {
glog.Errorf("Unexpected status code in response: %d", resp.StatusCode)
//TODO decode error response
//decoder := json.NewDecoder(resp.Body)
return fmt.Errorf("Unexpected status code %d in response", resp.StatusCode)
//TODO do a better job of decoding and reporting the errors?
decoder := json.NewDecoder(resp.Body)
response := make(map[string]interface{})
decoder.Decode(&response)
return fmt.Errorf("Unexpected status code %d in response: %#v", resp.StatusCode, response)
}

return nil
}
}

// imageStreamPredecessors returns a list of ImageStreams that are predecessors
// of imageNode.
func imageStreamPredecessors(g graph.Graph, imageNode *graph.ImageNode) []*imageapi.ImageStream {
streams := []*imageapi.ImageStream{}

for _, n := range g.Predecessors(imageNode) {
if streamNode, ok := n.(*graph.ImageStreamNode); ok {
streams = append(streams, streamNode.ImageStream)
}
}

return streams
}

func DeletingBlobPruneFunc(registryClient *http.Client) BlobPruneFunc {
return func(registryURL, blob string) error {
glog.V(4).Infof("Pruning registry %q, blob %q", registryURL, blob)
Expand All @@ -694,9 +667,11 @@ func DeletingBlobPruneFunc(registryClient *http.Client) BlobPruneFunc {

if resp.StatusCode != http.StatusNoContent {
glog.Errorf("Unexpected status code in response: %d", resp.StatusCode)
//TODO decode error response
//decoder := json.NewDecoder(resp.Body)
return fmt.Errorf("Unexpected status code %d in response", resp.StatusCode)
//TODO do a better job of decoding and reporting the errors?
decoder := json.NewDecoder(resp.Body)
response := make(map[string]interface{})
decoder.Decode(&response)
return fmt.Errorf("Unexpected status code %d in response: %#v", resp.StatusCode, response)
}

return nil
Expand Down Expand Up @@ -724,7 +699,7 @@ func DeletingManifestPruneFunc(registryClient *http.Client) ManifestPruneFunc {

if resp.StatusCode != http.StatusNoContent {
glog.Errorf("Unexpected status code in response: %d", resp.StatusCode)
//TODO decode error response
//TODO do a better job of decoding and reporting the errors?
decoder := json.NewDecoder(resp.Body)
response := make(map[string]interface{})
decoder.Decode(&response)
Expand Down
Loading

0 comments on commit 6b7a36a

Please sign in to comment.