diff --git a/pkg/api/graph/graph.go b/pkg/api/graph/graph.go index 6a74172e92b8..8d31fbdd1386 100644 --- a/pkg/api/graph/graph.go +++ b/pkg/api/graph/graph.go @@ -9,7 +9,7 @@ import ( ) type Node struct { - concrete.Node + graph.Node UniqueName } @@ -23,6 +23,10 @@ type uniqueNamer interface { UniqueName() string } +type NodeFinder interface { + Find(name UniqueName) graph.Node +} + // UniqueNodeInitializer is a graph that allows nodes with a unique name to be added without duplication. // If the node is newly added, true will be returned. type UniqueNodeInitializer interface { @@ -44,6 +48,7 @@ type MutableUniqueGraph interface { graph.Mutable MutableDirectedEdge UniqueNodeInitializer + NodeFinder } type Edge struct { @@ -294,6 +299,13 @@ func (g uniqueNamedGraph) FindOrCreate(name UniqueName, fn NodeInitializerFunc) return node, false } +func (g uniqueNamedGraph) Find(name UniqueName) graph.Node { + if node, ok := g.names[name]; ok { + return node + } + return nil +} + type typedGraph struct{} type stringer interface { diff --git a/pkg/api/graph/graph_test.go b/pkg/api/graph/graph_test.go index a38f75d70715..d2c90e4c5d8d 100644 --- a/pkg/api/graph/graph_test.go +++ b/pkg/api/graph/graph_test.go @@ -190,7 +190,8 @@ func TestGraph(t *testing.T) { } bc++ case *image.ImageStream: - if g.Kind(node) != ImageStreamGraphKind { + // TODO resolve this check for 2 kinds, since both have the same object type + if g.Kind(node) != ImageStreamGraphKind && g.Kind(node) != ImageStreamTagGraphKind { t.Fatalf("unexpected kind: %v", g.Kind(node)) } ir++ diff --git a/pkg/api/graph/types.go b/pkg/api/graph/types.go index 7f9114d53f54..52636097c271 100644 --- a/pkg/api/graph/types.go +++ b/pkg/api/graph/types.go @@ -18,12 +18,17 @@ import ( const ( UnknownGraphKind = iota - ImageStreamGraphKind + ImageStreamTagGraphKind DockerRepositoryGraphKind BuildConfigGraphKind DeploymentConfigGraphKind SourceRepositoryGraphKind ServiceGraphKind + ImageGraphKind + PodGraphKind + ImageStreamGraphKind + ReplicationControllerGraphKind + ImageLayerGraphKind ) const ( UnknownGraphEdgeKind = iota @@ -34,6 +39,9 @@ const ( BuildOutputGraphEdgeKind UsedInDeploymentGraphEdgeKind ExposedThroughServiceGraphEdgeKind + ReferencedImageGraphEdgeKind + WeakReferencedImageGraphEdgeKind + ReferencedImageLayerGraphKind ) type ServiceNode struct { @@ -117,7 +125,7 @@ func (n ImageStreamTagNode) String() string { } func (*ImageStreamTagNode) Kind() int { - return ImageStreamGraphKind + return ImageStreamTagGraphKind } type DockerImageRepositoryNode struct { @@ -157,6 +165,50 @@ func (SourceRepositoryNode) Kind() int { return SourceRepositoryGraphKind } +type ImageNode struct { + Node + Image *image.Image +} + +func (n ImageNode) Object() interface{} { + return n.Image +} + +func (n ImageNode) String() string { + return fmt.Sprintf("", n.Image.Name) +} + +func (*ImageNode) Kind() int { + return ImageGraphKind +} + +func Image(g MutableUniqueGraph, img *image.Image) graph.Node { + return EnsureUnique(g, + UniqueName(fmt.Sprintf("%d|%s", ImageGraphKind, img.Name)), + func(node Node) graph.Node { + return &ImageNode{node, img} + }, + ) +} + +func FindImage(g MutableUniqueGraph, imageName string) graph.Node { + return g.Find(UniqueName(fmt.Sprintf("%d|%s", ImageGraphKind, imageName))) +} + +type PodNode struct { + Node + Pod *kapi.Pod +} + +func Pod(g MutableUniqueGraph, pod *kapi.Pod) graph.Node { + return EnsureUnique(g, + UniqueName(fmt.Sprintf("%d|%s/%s", PodGraphKind, pod.Namespace, pod.Name)), + func(node Node) graph.Node { + return &PodNode{node, pod} + }, + ) +} + // Service adds the provided service to the graph if it does not already exist. It does not // link the service to covered nodes (that is a separate method). func Service(g MutableUniqueGraph, svc *kapi.Service) graph.Node { @@ -216,14 +268,14 @@ func SourceRepository(g MutableUniqueGraph, source build.BuildSource) (graph.Nod ), true } -// ImageStreamTag adds a graph node for the specific tag in an Image Repository if it +// ImageStreamTag adds a graph node for the specific tag in an Image Stream if it // does not already exist. func ImageStreamTag(g MutableUniqueGraph, namespace, name, tag string) graph.Node { if len(tag) == 0 { tag = image.DefaultImageTag } return EnsureUnique(g, - UniqueName(fmt.Sprintf("%d|%s/%s:%s", ImageStreamGraphKind, namespace, name, tag)), + UniqueName(fmt.Sprintf("%d|%s/%s:%s", ImageStreamTagGraphKind, namespace, name, tag)), func(node Node) graph.Node { return &ImageStreamTagNode{node, &image.ImageStream{ ObjectMeta: kapi.ObjectMeta{ @@ -444,3 +496,84 @@ func defaultNamespace(value, defaultValue string) string { } return value } + +type ImageStreamNode struct { + Node + *image.ImageStream +} + +func (n ImageStreamNode) Object() interface{} { + return n.ImageStream +} + +func (n ImageStreamNode) String() string { + return fmt.Sprintf("", n.Namespace, n.Name) +} + +func (*ImageStreamNode) Kind() int { + return ImageStreamGraphKind +} + +// ImageStream adds a graph node for the Image Stream if it does not already exist. +func ImageStream(g MutableUniqueGraph, stream *image.ImageStream) graph.Node { + return EnsureUnique(g, + UniqueName(fmt.Sprintf("%d|%s/%s", ImageStreamGraphKind, stream.Namespace, stream.Name)), + func(node Node) graph.Node { + return &ImageStreamNode{node, stream} + }, + ) +} + +type ReplicationControllerNode struct { + Node + *kapi.ReplicationController +} + +func (n ReplicationControllerNode) Object() interface{} { + return n.ReplicationController +} + +func (n ReplicationControllerNode) String() string { + return fmt.Sprintf("", n.Namespace, n.Name) +} + +func (*ReplicationControllerNode) Kind() int { + return ReplicationControllerGraphKind +} + +// ReplicationController adds a graph node for the ReplicationController if it does not already exist. +func ReplicationController(g MutableUniqueGraph, rc *kapi.ReplicationController) graph.Node { + return EnsureUnique(g, + UniqueName(fmt.Sprintf("%d|%s/%s", ReplicationControllerGraphKind, rc.Namespace, rc.Name)), + func(node Node) graph.Node { + return &ReplicationControllerNode{node, rc} + }, + ) +} + +type ImageLayerNode struct { + Node + Layer string +} + +func (n ImageLayerNode) Object() interface{} { + return n.Layer +} + +func (n ImageLayerNode) String() string { + return fmt.Sprintf("", n.Layer) +} + +func (*ImageLayerNode) Kind() int { + return ImageLayerGraphKind +} + +// ImageLayer adds a graph node for the layer if it does not already exist. +func ImageLayer(g MutableUniqueGraph, layer string) graph.Node { + return EnsureUnique(g, + UniqueName(fmt.Sprintf("%d|%s", ImageLayerGraphKind, layer)), + func(node Node) graph.Node { + return &ImageLayerNode{node, layer} + }, + ) +} diff --git a/pkg/client/fake_imagestreams.go b/pkg/client/fake_imagestreams.go index cf4fd9c0ddcb..879bf2c6a49e 100644 --- a/pkg/client/fake_imagestreams.go +++ b/pkg/client/fake_imagestreams.go @@ -28,13 +28,13 @@ func (c *FakeImageStreams) Get(name string) (*imageapi.ImageStream, error) { return obj.(*imageapi.ImageStream), err } -func (c *FakeImageStreams) Create(repo *imageapi.ImageStream) (*imageapi.ImageStream, error) { +func (c *FakeImageStreams) Create(stream *imageapi.ImageStream) (*imageapi.ImageStream, error) { obj, err := c.Fake.Invokes(FakeAction{Action: "create-imagestream"}, &imageapi.ImageStream{}) return obj.(*imageapi.ImageStream), err } -func (c *FakeImageStreams) Update(repo *imageapi.ImageStream) (*imageapi.ImageStream, error) { - obj, err := c.Fake.Invokes(FakeAction{Action: "update-imagestream"}, &imageapi.ImageStream{}) +func (c *FakeImageStreams) Update(stream *imageapi.ImageStream) (*imageapi.ImageStream, error) { + obj, err := c.Fake.Invokes(FakeAction{Action: "update-imagestream", Value: stream}, stream) return obj.(*imageapi.ImageStream), err } @@ -47,3 +47,8 @@ func (c *FakeImageStreams) Watch(label labels.Selector, field fields.Selector, r c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-imagestreams"}) return nil, nil } + +func (c *FakeImageStreams) UpdateStatus(stream *imageapi.ImageStream) (result *imageapi.ImageStream, err error) { + obj, err := c.Fake.Invokes(FakeAction{Action: "update-status-imagestream", Value: stream}, stream) + return obj.(*imageapi.ImageStream), err +} diff --git a/pkg/client/imagestreams.go b/pkg/client/imagestreams.go index 43ca3b04ab31..917af5451a14 100644 --- a/pkg/client/imagestreams.go +++ b/pkg/client/imagestreams.go @@ -21,6 +21,7 @@ type ImageStreamInterface interface { Update(stream *imageapi.ImageStream) (*imageapi.ImageStream, error) Delete(name string) error Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) + UpdateStatus(stream *imageapi.ImageStream) (*imageapi.ImageStream, error) } // ImageStreamNamespaceGetter exposes methods to get ImageStreams by Namespace @@ -100,3 +101,10 @@ func (c *imageStreams) Watch(label labels.Selector, field fields.Selector, resou FieldsSelectorParam(field). Watch() } + +// UpdateStatus updates the image stream's status. Returns the server's representation of the image stream, and an error, if it occurs. +func (c *imageStreams) UpdateStatus(stream *imageapi.ImageStream) (result *imageapi.ImageStream, err error) { + result = &imageapi.ImageStream{} + err = c.r.Put().Namespace(c.ns).Resource("imageStreams").Name(stream.Name).SubResource("status").Body(stream).Do().Into(result) + return +} diff --git a/pkg/cmd/admin/admin.go b/pkg/cmd/admin/admin.go index 352552cda1a4..9ed8fc672608 100644 --- a/pkg/cmd/admin/admin.go +++ b/pkg/cmd/admin/admin.go @@ -4,6 +4,7 @@ import ( "fmt" "io" + eximageprune "github.com/openshift/origin/pkg/cmd/experimental/imageprune" "github.com/spf13/cobra" "github.com/openshift/origin/pkg/cmd/cli/cmd" @@ -47,6 +48,7 @@ func NewCommandAdmin(name, fullName string, out io.Writer) *cobra.Command { cmds.AddCommand(exipfailover.NewCmdIPFailoverConfig(f, fullName, "ipfailover", out)) cmds.AddCommand(exrouter.NewCmdRouter(f, fullName, "router", out)) cmds.AddCommand(exregistry.NewCmdRegistry(f, fullName, "registry", out)) + cmds.AddCommand(eximageprune.NewCmdPruneImages(f, fullName, "prune-images", out)) cmds.AddCommand(buildchain.NewCmdBuildChain(f, fullName, "build-chain")) cmds.AddCommand(cmd.NewCmdConfig(fullName, "config")) diff --git a/pkg/cmd/dockerregistry/dockerregistry.go b/pkg/cmd/dockerregistry/dockerregistry.go index 1ade014009d3..a412c3f1ecfc 100644 --- a/pkg/cmd/dockerregistry/dockerregistry.go +++ b/pkg/cmd/dockerregistry/dockerregistry.go @@ -3,6 +3,7 @@ package dockerregistry import ( "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "io" "io/ioutil" @@ -18,24 +19,136 @@ import ( _ "github.com/docker/distribution/registry/storage/driver/s3" "github.com/docker/distribution/version" gorillahandlers "github.com/gorilla/handlers" + "github.com/gorilla/mux" _ "github.com/openshift/origin/pkg/dockerregistry/server" ) -type healthHandler struct { - delegate http.Handler +func newOpenShiftHandler(app *handlers.App) http.Handler { + router := mux.NewRouter() + router.HandleFunc("/healthz", health.StatusHandler) + // TODO add https scheme + router.HandleFunc("/admin/layers", deleteLayerFunc(app)).Methods("DELETE") + //router.HandleFunc("/admin/manifests", deleteManifestFunc(app)).Methods("DELETE") + // delegate to the registry if it's not 1 of the OpenShift routes + router.NotFoundHandler = app + + return router +} + +// DeleteLayersRequest is a mapping from layers to the image repositories that +// reference them. Below is a sample request: +// +// { +// "layer1": ["repo1", "repo2"], +// "layer2": ["repo1", "repo3"], +// ... +// } +type DeleteLayersRequest map[string][]string + +// AddLayer adds a layer to the request if it doesn't already exist. +func (r DeleteLayersRequest) AddLayer(layer string) { + if _, ok := r[layer]; !ok { + r[layer] = []string{} + } } -func newHealthHandler(delegate http.Handler) http.Handler { - return &healthHandler{delegate} +// AddStream adds an image stream reference to the layer. +func (r DeleteLayersRequest) AddStream(layer, stream string) { + r[layer] = append(r[layer], stream) } -func (h *healthHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if req.URL.Path == "/healthz" { - health.StatusHandler(w, req) - return +// deleteLayerFunc returns an http.HandlerFunc that is able to fully delete a +// layer from storage. +func deleteLayerFunc(app *handlers.App) http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + log.Infof("deleteLayerFunc invoked") + + //TODO verify auth + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + //TODO + log.Errorf("Error reading body: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + deletions := DeleteLayersRequest{} + err = json.Unmarshal(body, &deletions) + if err != nil { + //TODO + log.Errorf("Error unmarshaling body: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + adminService := app.Registry().AdminService() + errs := []error{} + for layer, repos := range deletions { + log.Infof("Deleting layer=%q, repos=%v", layer, repos) + layerErrs := adminService.DeleteLayer(layer, repos) + errs = append(errs, layerErrs...) + } + + log.Infof("errs=%v", errs) + + //TODO write response + w.WriteHeader(http.StatusOK) + } +} + +/* +type DeleteManifestsRequest map[string][]string + +func (r *DeleteManifestsRequest) AddManifest(revision string) { + if _, ok := r[revision]; !ok { + r[revision] = []string{} + } +} + +func (r *DeleteManifestsRequest) AddStream(revision, stream string) { + r[revision] = append(r[revision], stream) +} + +func deleteManifestsFunc(app *handlers.App) http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + + //TODO verify auth + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + //TODO + log.Errorf("Error reading body: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + deletions := DeleteManifestsRequest{} + err = json.Unmarshal(body, &deletions) + if err != nil { + //TODO + log.Errorf("Error unmarshaling body: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + adminService := app.Registry().AdminService() + errs := []error{} + for revision, repos := range deletions { + log.Infof("Deleting manifest revision=%q, repos=%v", revision, repos) + manifestErrs := adminService.DeleteManifest(revision, repos) + errs = append(errs, manifestErrs...) + } + + log.Infof("errs=%v", errs) + + //TODO write response + w.WriteHeader(http.StatusOK) } - h.delegate.ServeHTTP(w, req) } +*/ // Execute runs the Docker registry. func Execute(configFile io.Reader) { @@ -55,7 +168,7 @@ func Execute(configFile io.Reader) { ctx := context.Background() app := handlers.NewApp(ctx, *config) - handler := newHealthHandler(app) + handler := newOpenShiftHandler(app) handler = gorillahandlers.CombinedLoggingHandler(os.Stdout, handler) if config.HTTP.TLS.Certificate == "" { diff --git a/pkg/cmd/experimental/imageprune/imageprune.go b/pkg/cmd/experimental/imageprune/imageprune.go new file mode 100644 index 000000000000..b39a890c2757 --- /dev/null +++ b/pkg/cmd/experimental/imageprune/imageprune.go @@ -0,0 +1,115 @@ +package imageprune + +import ( + "fmt" + "io" + "net/http" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/golang/glog" + "github.com/openshift/origin/pkg/cmd/dockerregistry" + imageapi "github.com/openshift/origin/pkg/image/api" + "github.com/openshift/origin/pkg/image/prune" + "github.com/spf13/cobra" + + "github.com/openshift/origin/pkg/cmd/util/clientcmd" +) + +const longDesc = ` +` + +type registryURLs []string + +func (u *registryURLs) Type() string { + return "string" +} + +func (u *registryURLs) String() string { + return fmt.Sprintf("%v", *u) +} + +func (u *registryURLs) Set(value string) error { + *u = append(*u, value) + return nil +} + +type config struct { + DryRun bool + RegistryURLs registryURLs + MinimumResourcePruningAge int +} + +func NewCmdPruneImages(f *clientcmd.Factory, parentName, name string, out io.Writer) *cobra.Command { + cfg := &config{ + DryRun: true, + RegistryURLs: registryURLs{"docker-registry.default.local"}, + MinimumResourcePruningAge: 60, + } + + cmd := &cobra.Command{ + Use: name, + Short: "Prune images", + Long: fmt.Sprintf(longDesc, parentName, name), + + Run: func(cmd *cobra.Command, args []string) { + if len(args) > 0 { + glog.Fatalf("No arguments are allowed to this command") + } + + osClient, kClient, err := f.Clients() + if err != nil { + glog.Fatalf("Error getting client: %v", err) + } + + if registryService, err := kClient.Services(kapi.NamespaceDefault).Get("docker-registry"); err != nil { + glog.Errorf("Error getting docker-registry service: %v", err) + } else { + cfg.RegistryURLs = append(cfg.RegistryURLs, fmt.Sprintf("%s:%d", registryService.Spec.PortalIP, registryService.Spec.Ports[0].Port)) + } + + pruner, err := prune.NewImagePruner(cfg.RegistryURLs, cfg.MinimumResourcePruningAge, osClient, osClient, kClient, kClient) + if err != nil { + glog.Fatalf("Error creating image pruner: %v", err) + } + + var ( + imagePruneFunc prune.ImagePruneFunc + layerPruneFunc prune.LayerPruneFunc + ) + switch cfg.DryRun { + case false: + fmt.Fprintln(out, "Dry run *disabled* - images will be pruned and data will be deleted!") + imagePruneFunc = prune.DefaultImagePruneFunc(osClient.Images(), osClient) + layerPruneFunc = prune.DefaultLayerPruneFunc(http.DefaultClient) + default: + fmt.Fprintln(out, "Dry run enabled - no modifications will be made.") + imagePruneFunc = func(image *imageapi.Image, streams []*imageapi.ImageStream) []error { + fmt.Fprintf(out, "Pruning image %q\n", image.Name) + return []error{} + } + + layerPruneFunc = func(registryURL string, req dockerregistry.DeleteLayersRequest) []error { + fmt.Fprintf(out, "Pruning from registry %q\n", registryURL) + for layer, repos := range req { + fmt.Fprintf(out, "\tLayer %q\n", layer) + if len(repos) > 0 { + fmt.Fprint(out, "\tReferenced streams:\n") + } + for _, repo := range repos { + fmt.Fprintf(out, "\t\t%q\n", repo) + } + } + return []error{} + } + } + + pruner.Run(imagePruneFunc, layerPruneFunc) + }, + } + + cmd.Flags().BoolVar(&cfg.DryRun, "dry-run", cfg.DryRun, "Perform an image pruning dry-run, displaying what would be deleted but not actually deleting anything (default=true).") + cmd.Flags().Var(&cfg.RegistryURLs, "registry-urls", "TODO") + cmd.Flags().IntVar(&cfg.MinimumResourcePruningAge, "older-than", cfg.MinimumResourcePruningAge, "TODO") + + return cmd +} diff --git a/pkg/image/prune/imagepruner.go b/pkg/image/prune/imagepruner.go new file mode 100644 index 000000000000..34479de6ae39 --- /dev/null +++ b/pkg/image/prune/imagepruner.go @@ -0,0 +1,577 @@ +package prune + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "strings" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" + gonum "github.com/gonum/graph" + "github.com/openshift/origin/pkg/api/graph" + "github.com/openshift/origin/pkg/client" + "github.com/openshift/origin/pkg/cmd/dockerregistry" + imageapi "github.com/openshift/origin/pkg/image/api" +) + +// pruneAlgorithm contains the various settings to use when evaluating images +// and layers for pruning. +type pruneAlgorithm struct { + registryURLs []string + minimumAgeInMinutesToPrune int +} + +// externalImage returns true if the image belongs to an external Docker +// registry; i.e., a registry not controlled by OpenShift. +func (pa pruneAlgorithm) externalImage(image string) bool { + for _, url := range pa.registryURLs { + if strings.HasPrefix(image, url) { + return false + } + } + + return true +} + +// ImagePruneFunc is a function that is invoked for each image that is +// prunable, along with the list of image streams that reference it. +type ImagePruneFunc func(image *imageapi.Image, streams []*imageapi.ImageStream) []error + +// LayerPruneFunc is a function that is invoked for each registry, along with +// a DeleteLayersRequest that contains the layers that can be pruned and the +// image stream names that reference each layer. +type LayerPruneFunc func(registryURL string, req dockerregistry.DeleteLayersRequest) []error + +// ImagePruner knows how to prune images and layers. +type ImagePruner interface { + Run(imagePruneFunc ImagePruneFunc, layerPruneFunc LayerPruneFunc) +} + +// imagePruner implements ImagePruner. +type imagePruner struct { + g graph.Graph + algorithm pruneAlgorithm +} + +var _ ImagePruner = &imagePruner{} + +// NewImagePruner creates a new ImagePruner. +// +// registryURLs is a list of OpenShift registries. Only images with URLs +// belonging to this list are candidates for pruning. +// +// minimumAgeInMinutesToPrune is the minimum age, in minutes, that a resource +// must be in order for the image it references (or an image itself) to be a +// candidate for pruning. For example, if minimumAgeInMinutesToPrune is 60, and +// an ImageStream is only 59 minutes old, none of the images it references are +// eligible for pruning. +// +// images, streams, pods, and rcs are client interfaces for retrieving each +// respective resource type. +func NewImagePruner(registryURLs []string, minimumAgeInMinutesToPrune int, images client.ImagesInterfacer, streams client.ImageStreamsNamespacer, pods kclient.PodsNamespacer, rcs kclient.ReplicationControllersNamespacer) (ImagePruner, error) { + allImages, err := images.Images().List(labels.Everything(), fields.Everything()) + if err != nil { + return nil, fmt.Errorf("Error listing images: %v", err) + } + + allStreams, err := streams.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything()) + if err != nil { + return nil, fmt.Errorf("Error listing image streams: %v", err) + } + + allPods, err := pods.Pods(kapi.NamespaceAll).List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("Error listing pods: %v", err) + } + + allRCs, err := rcs.ReplicationControllers(kapi.NamespaceAll).List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("Error listing replication controllers: %v", err) + } + + return newImagePruner(registryURLs, minimumAgeInMinutesToPrune, allImages, allStreams, allPods, allRCs), nil +} + +// newImagePruner creates a new ImagePruner. +func newImagePruner(registryURLs []string, minimumAgeInMinutesToPrune int, images *imageapi.ImageList, streams *imageapi.ImageStreamList, pods *kapi.PodList, rcs *kapi.ReplicationControllerList) ImagePruner { + g := graph.New() + + glog.V(1).Infof("Creating image pruner with registryURLs=%v, minimumAgeInMinutesToPrune=%d", registryURLs, minimumAgeInMinutesToPrune) + algorithm := pruneAlgorithm{ + registryURLs: registryURLs, + minimumAgeInMinutesToPrune: minimumAgeInMinutesToPrune, + } + + addImagesToGraph(g, images, algorithm) + addImageStreamsToGraph(g, streams, algorithm) + addPodsToGraph(g, pods, algorithm) + addReplicationControllersToGraph(g, rcs, algorithm) + + return &imagePruner{ + g: g, + algorithm: algorithm, + } +} + +// addImagesToGraph adds all images to the graph that belong to one of the +// registries in the algorithm and are at least as old as the minimum age +// threshold as specified by the algorithm. It also adds all the images' layers +// to the graph. +func addImagesToGraph(g graph.Graph, images *imageapi.ImageList, algorithm pruneAlgorithm) { + for i := range images.Items { + image := &images.Items[i] + + if algorithm.externalImage(image.DockerImageReference) { + glog.V(4).Infof("Image %q belongs to an external registry - skipping", image.DockerImageReference) + continue + } + + age := util.Now().Sub(image.CreationTimestamp.Time) + ageInMinutes := int(age.Minutes()) + if ageInMinutes < algorithm.minimumAgeInMinutesToPrune { + glog.V(4).Infof("Image %q is younger than minimum pruning age, skipping (age=%d)", image.Name, ageInMinutes) + continue + } + + glog.V(4).Infof("Adding image %q to graph", image.Name) + imageNode := graph.Image(g, image) + + manifest := imageapi.DockerImageManifest{} + if err := json.Unmarshal([]byte(image.DockerImageManifest), &manifest); err != nil { + glog.Errorf("Unable to extract manifest from image: %v. This image's layers won't be pruned if the image is pruned now.", err) + } + + for _, layer := range manifest.FSLayers { + glog.V(4).Infof("Adding image layer %q to graph", layer.DockerBlobSum) + layerNode := graph.ImageLayer(g, layer.DockerBlobSum) + g.AddEdge(imageNode, layerNode, graph.ReferencedImageLayerGraphKind) + } + } +} + +// addImageStreamsToGraph adds all the streams to the graph. The current image +// associated with each of an image stream's tags is never eligible for +// pruning. Prior image revisions for a tag are candidates for pruning if the +// image stream's age is at least as old as the minimum threshold in algorithm. +// Otherwise, if the image stream is younger than the threshold, all image +// revisions for that stream are ineligible for pruning. +// +// addImageStreamsToGraph also adds references from each stream to all the +// layers it references (via each image a stream references). +func addImageStreamsToGraph(g graph.Graph, streams *imageapi.ImageStreamList, algorithm pruneAlgorithm) { + for i := range streams.Items { + stream := &streams.Items[i] + + // use a weak reference for old image revisions by default + oldImageRevisionReferenceKind := graph.WeakReferencedImageGraphEdgeKind + + age := util.Now().Sub(stream.CreationTimestamp.Time) + if int(age.Minutes()) < algorithm.minimumAgeInMinutesToPrune { + // stream's age is below threshold - use a strong reference for old image revisions instead + glog.V(4).Infof("Stream %s/%s is below age threshold - none of its images are eligible for pruning", stream.Namespace, stream.Name) + oldImageRevisionReferenceKind = graph.ReferencedImageGraphEdgeKind + } + + glog.V(4).Infof("Adding image stream %s/%s to graph", stream.Namespace, stream.Name) + isNode := graph.ImageStream(g, stream) + imageStreamNode := isNode.(*graph.ImageStreamNode) + + for tag, history := range stream.Status.Tags { + for i := range history.Items { + if algorithm.externalImage(history.Items[i].DockerImageReference) { + glog.V(4).Infof("Tag %q revision %d points to %s which is part of an external registry; skipping", tag, i, history.Items[i].DockerImageReference) + continue + } + + n := graph.FindImage(g, history.Items[i].Image) + if n == nil { + glog.V(1).Infof("Unable to find image %q in graph", history.Items[i].Image) + continue + } + imageNode := n.(*graph.ImageNode) + + var kind int + switch i { + case 0: + kind = graph.ReferencedImageGraphEdgeKind + default: + kind = oldImageRevisionReferenceKind + } + glog.V(4).Infof("Adding edge (kind=%d) from %q to %q", kind, imageStreamNode.UniqueName.UniqueName(), imageNode.UniqueName.UniqueName()) + g.AddEdge(imageStreamNode, imageNode, kind) + + glog.V(4).Infof("Adding stream->layer references") + // add stream -> layer references so we can prune them later + for _, s := range g.Successors(imageNode) { + if g.Kind(s) != graph.ImageLayerGraphKind { + continue + } + glog.V(4).Infof("Adding reference from stream %q to layer %q", stream.Name, s.(*graph.ImageLayerNode).Layer) + g.AddEdge(imageStreamNode, s, graph.ReferencedImageLayerGraphKind) + } + } + } + } +} + +// addPodsToGraph adds pods to the graph. +// +// A pod is only *excluded* from being added to the graph if its phase is not +// pending or running and it is at least as old as the minimum age threshold +// defined by algorithm. +// +// Edges are added to the graph from each pod to the images specified by that +// pod's list of containers, as long as the image belongs to one of the +// registries specified in algorithm. +func addPodsToGraph(g graph.Graph, pods *kapi.PodList, algorithm pruneAlgorithm) { + for i := range pods.Items { + pod := &pods.Items[i] + + if pod.Status.Phase != kapi.PodRunning && pod.Status.Phase != kapi.PodPending { + age := util.Now().Sub(pod.CreationTimestamp.Time) + if int(age.Minutes()) >= algorithm.minimumAgeInMinutesToPrune { + glog.V(4).Infof("Pod %s/%s is not running or pending and age is at least minimum pruning age - skipping", pod.Namespace, pod.Name) + // not pending or running, age is at least minimum pruning age, skip + continue + } + } + + glog.V(4).Infof("Adding pod %s/%s to graph", pod.Namespace, pod.Name) + podNode := graph.Pod(g, pod) + + addPodSpecToGraph(g, &pod.Spec, podNode, algorithm) + } +} + +// Edges are added to the graph from each predecessor (pod or replication +// controller) to the images specified by the pod spec's list of containers, as +// long as the image belongs to one of the registries specified in algorithm. +func addPodSpecToGraph(g graph.Graph, spec *kapi.PodSpec, predecessor gonum.Node, algorithm pruneAlgorithm) { + for j := range spec.Containers { + container := spec.Containers[j] + + glog.V(4).Infof("Examining container image %q", container.Image) + if algorithm.externalImage(container.Image) { + glog.V(4).Infof("Image belongs to an external registry - skipping") + continue + } + + ref, err := imageapi.ParseDockerImageReference(container.Image) + if err != nil { + glog.Errorf("Unable to parse docker image reference %q: %v", container.Image, err) + continue + } + + if len(ref.ID) == 0 { + glog.Errorf("Missing image ID") + continue + } + + imageNode := graph.FindImage(g, ref.ID) + if imageNode == nil { + glog.Errorf("Expected to find image %q in the graph, but it was missing", ref.ID) + continue + } + + glog.V(4).Infof("Adding edge from pod to image") + g.AddEdge(predecessor, imageNode, graph.ReferencedImageGraphEdgeKind) + } +} + +// addReplicationControllersToGraph adds replication controllers to the graph. +// +// Edges are added to the graph from each replication controller to the images +// specified by its pod spec's list of containers, as long as the image belongs +// to one of the registries specified in algorithm. +func addReplicationControllersToGraph(g graph.Graph, rcs *kapi.ReplicationControllerList, algorithm pruneAlgorithm) { + for i := range rcs.Items { + rc := &rcs.Items[i] + rcNode := graph.ReplicationController(g, rc) + addPodSpecToGraph(g, &rc.Spec.Template.Spec, rcNode, algorithm) + } +} + +// imageNodeSubgraph returns only nodes of type ImageNode. +func imageNodeSubgraph(nodes []gonum.Node) []*graph.ImageNode { + ret := []*graph.ImageNode{} + for i := range nodes { + if node, ok := nodes[i].(*graph.ImageNode); ok { + ret = append(ret, node) + } + } + return ret +} + +// edgeKind returns true if the edge from "from" to "to" is of the desired kind. +func edgeKind(g graph.Graph, from, to gonum.Node, desiredKind int) bool { + edge := g.EdgeBetween(from, to) + kind := g.EdgeKind(edge) + return kind == desiredKind +} + +// imageIsPrunable returns true iff the image node only has weak references +// from its predecessors to it. A weak reference to an image is a reference +// from an image stream to an image where the image is not the current image +// for a tag and the image stream is at least as old as the minimum pruning +// age. +func imageIsPrunable(g graph.Graph, imageNode *graph.ImageNode) bool { + onlyWeakReferences := true + + for _, n := range g.Predecessors(imageNode) { + glog.V(4).Infof("Examining predecessor %#v", n) + if !edgeKind(g, n, imageNode, graph.WeakReferencedImageGraphEdgeKind) { + glog.V(4).Infof("Strong reference detected") + onlyWeakReferences = false + break + } + } + + return onlyWeakReferences + +} + +// pruneImages invokes imagePruneFunc with each image that is prunable, along +// with the image streams that reference the image. After imagePruneFunc is +// invoked, the image node is removed from the graph, so that layers eligible +// for pruning may later be identified. +func pruneImages(g graph.Graph, imageNodes []*graph.ImageNode, imagePruneFunc ImagePruneFunc) { + for _, imageNode := range imageNodes { + glog.V(4).Infof("Examining image %q", imageNode.Image.Name) + + if !imageIsPrunable(g, imageNode) { + glog.V(4).Infof("Image has strong references - not pruning") + continue + } + + glog.V(4).Infof("Image has only weak references - pruning") + + streams := imageStreamPredecessors(g, imageNode) + if errs := imagePruneFunc(imageNode.Image, streams); len(errs) > 0 { + //TODO + glog.Errorf("Error pruning image %q: %v", imageNode.Image.Name, errs) + } + + // remove pruned image node from graph, for layer pruning later + g.RemoveNode(imageNode) + } +} + +// Run identifies images eligible for pruning, invoking imagePruneFunc for each +// image, and then it identifies layers eligible for pruning, invoking +// layerPruneFunc for each registry URL that has layers that can be pruned. +func (p *imagePruner) Run(imagePruneFunc ImagePruneFunc, layerPruneFunc LayerPruneFunc) { + allNodes := p.g.NodeList() + + imageNodes := imageNodeSubgraph(allNodes) + pruneImages(p.g, imageNodes, imagePruneFunc) + + layerNodes := layerNodeSubgraph(allNodes) + pruneLayers(p.g, layerNodes, layerPruneFunc) +} + +// layerNodeSubgraph returns the subset of nodes that are ImageLayerNodes. +func layerNodeSubgraph(nodes []gonum.Node) []*graph.ImageLayerNode { + ret := []*graph.ImageLayerNode{} + for i := range nodes { + if node, ok := nodes[i].(*graph.ImageLayerNode); ok { + ret = append(ret, node) + } + } + return ret +} + +// layerIsPrunable returns true if the layer is not referenced by any images. +func layerIsPrunable(g graph.Graph, layerNode *graph.ImageLayerNode) bool { + for _, predecessor := range g.Predecessors(layerNode) { + glog.V(4).Infof("Examining layer predecessor %#v", predecessor) + if g.Kind(predecessor) == graph.ImageGraphKind { + glog.V(4).Infof("Layer has an image predecessor") + return false + } + } + + return true +} + +// streamLayerReferences returns a list of ImageStreamNodes that reference a +// given ImageLayeNode. +func streamLayerReferences(g graph.Graph, layerNode *graph.ImageLayerNode) []*graph.ImageStreamNode { + ret := []*graph.ImageStreamNode{} + + for _, predecessor := range g.Predecessors(layerNode) { + if g.Kind(predecessor) != graph.ImageStreamGraphKind { + continue + } + + ret = append(ret, predecessor.(*graph.ImageStreamNode)) + } + + return ret +} + +// pruneLayers creates a mapping of registryURLs to +// dockerregistry.DeleteLayersRequest objects, invoking layerPruneFunc for each +// registryURL and request. +func pruneLayers(g graph.Graph, layerNodes []*graph.ImageLayerNode, layerPruneFunc LayerPruneFunc) { + registryDeletionRequests := map[string]dockerregistry.DeleteLayersRequest{} + + for _, layerNode := range layerNodes { + glog.V(4).Infof("Examining layer %q", layerNode.Layer) + + if !layerIsPrunable(g, layerNode) { + glog.V(4).Infof("Layer %q has image references - not pruning", layerNode.Layer) + continue + } + + // get streams that reference layer + streamNodes := streamLayerReferences(g, layerNode) + + // for each stream, get its registry + for _, streamNode := range streamNodes { + stream := streamNode.ImageStream + streamName := fmt.Sprintf("%s/%s", stream.Namespace, stream.Name) + glog.V(4).Infof("Layer has an image stream predecessor: %s", streamName) + + ref, err := imageapi.DockerImageReferenceForStream(stream) + if err != nil { + //TODO + glog.Errorf("Error constructing DockerImageReference for %q", streamName) + continue + } + + // update registry layer deletion request + glog.V(4).Infof("Looking for existing deletion request for registry %q", ref.Registry) + deletionRequest, ok := registryDeletionRequests[ref.Registry] + if !ok { + glog.V(4).Infof("Request not found - creating new one") + deletionRequest = dockerregistry.DeleteLayersRequest{} + registryDeletionRequests[ref.Registry] = deletionRequest + } + + /* + glog.V(4).Infof("Looking for existing entry for layer %q in deletion request", layerNode.Layer) + layerRepos, ok := deletionRequest[layerNode.Layer] + if !ok { + glog.V(4).Infof("Layer entry not found - creating new one") + layerRepos = []string{} + } + */ + deletionRequest.AddLayer(layerNode.Layer) + deletionRequest.AddStream(layerNode.Layer, streamName) + + /* + glog.V(4).Infof("Adding stream %q to request", streamName) + layerRepos = append(layerRepos, streamName) + deletionRequest[layerNode.Layer] = layerRepos + + glog.V(4).Infof("layerRepos now = %v", layerRepos) + */ + } + } + + for registryURL, req := range registryDeletionRequests { + glog.V(4).Infof("Invoking layerPruneFunc with registry=%q, req=%#v", registryURL, req) + layerPruneFunc(registryURL, req) + } +} + +// DefaultImagePruneFunc returns an ImagePruneFunc that deletes the image and +// removes it from each referencing ImageStream's status.tags. +func DefaultImagePruneFunc(images client.ImageInterface, streams client.ImageStreamsNamespacer) ImagePruneFunc { + return func(image *imageapi.Image, referencedStreams []*imageapi.ImageStream) []error { + result := []error{} + + glog.V(4).Infof("Deleting image %q", image.Name) + if err := images.Delete(image.Name); err != nil { + e := fmt.Errorf("Error deleting image: %v", err) + glog.Error(e) + result = append(result, e) + return result + } + + for _, stream := range referencedStreams { + glog.V(4).Infof("Checking if stream %s/%s has references to image in status.tags", stream.Namespace, stream.Name) + for tag, history := range stream.Status.Tags { + glog.V(4).Infof("Checking tag %q", tag) + newHistory := imageapi.TagEventList{Items: []imageapi.TagEvent{history.Items[0]}} + for i, tagEvent := range history.Items[1:] { + glog.V(4).Infof("Checking tag event %d with image %q", i+1, tagEvent.Image) + if tagEvent.Image != image.Name { + glog.V(4).Infof("Tag event doesn't match deleting image - keeping") + newHistory.Items = append(newHistory.Items, tagEvent) + } + } + stream.Status.Tags[tag] = newHistory + } + + glog.V(4).Infof("Updating image stream %s/%s", stream.Namespace, stream.Name) + glog.V(5).Infof("Updated stream: %#v", stream) + if _, err := streams.ImageStreams(stream.Namespace).UpdateStatus(stream); err != nil { + e := fmt.Errorf("Unable to update image stream status %s/%s: %v", stream.Namespace, stream.Name, err) + glog.Error(e) + result = append(result, e) + } + } + + return result + } +} + +// DefaultLayerPruneFunc returns a LayerPruneFunc that sends the +// DeleteLayersRequest to the Docker registry. +// +// The request URL is http://registryURL/admin/layers and it is a DELETE +// request. +// +// The body of the request is JSON, and it is a map[string][]string, with each +// key being a layer, and each value being a list of Docker image repository +// names referenced by the layer. +func DefaultLayerPruneFunc(registryClient *http.Client) LayerPruneFunc { + return func(registryURL string, deletions dockerregistry.DeleteLayersRequest) []error { + glog.V(4).Infof("Starting pruning of layers from %q, req %#v", registryURL, deletions) + body, err := json.Marshal(&deletions) + if err != nil { + glog.Errorf("Error marshaling request body: %v", err) + return []error{fmt.Errorf("Error creating request body: %v", err)} + } + + //TODO https + req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/admin/layers", registryURL), bytes.NewReader(body)) + if err != nil { + glog.Errorf("Error creating request: %v", err) + return []error{fmt.Errorf("Error creating request: %v", err)} + } + + glog.V(4).Infof("Sending request to registry") + resp, err := registryClient.Do(req) + if err != nil { + glog.Errorf("Error sending request: %v", err) + return []error{fmt.Errorf("Error sending request: %v", err)} + } + defer resp.Body.Close() + + //TODO read response + + return []error{} + } +} + +// imageStreamPredecessors returns a list of ImageStreams that are predecessors +// of imageNode. +func imageStreamPredecessors(g graph.Graph, imageNode *graph.ImageNode) []*imageapi.ImageStream { + streams := []*imageapi.ImageStream{} + + for _, n := range g.Predecessors(imageNode) { + if streamNode, ok := n.(*graph.ImageStreamNode); ok { + streams = append(streams, streamNode.ImageStream) + } + } + + return streams +} diff --git a/pkg/image/prune/imagepruner_test.go b/pkg/image/prune/imagepruner_test.go new file mode 100644 index 000000000000..d327dc93f792 --- /dev/null +++ b/pkg/image/prune/imagepruner_test.go @@ -0,0 +1,396 @@ +package prune + +import ( + "flag" + "fmt" + "reflect" + "testing" + "time" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/openshift/origin/pkg/client" + "github.com/openshift/origin/pkg/cmd/dockerregistry" + imageapi "github.com/openshift/origin/pkg/image/api" +) + +func imageList(images ...imageapi.Image) imageapi.ImageList { + return imageapi.ImageList{ + Items: images, + } +} + +func agedImage(id, ref string, ageInMinutes int64) imageapi.Image { + image := imageapi.Image{ + ObjectMeta: kapi.ObjectMeta{ + Name: id, + }, + DockerImageReference: ref, + } + + if ageInMinutes >= 0 { + image.CreationTimestamp = util.NewTime(util.Now().Add(time.Duration(-1*ageInMinutes) * time.Minute)) + } + + return image +} + +func image(id, ref string) imageapi.Image { + return agedImage(id, ref, -1) +} + +func podList(pods ...kapi.Pod) kapi.PodList { + return kapi.PodList{ + Items: pods, + } +} + +func pod(namespace, name string, phase kapi.PodPhase, containerImages ...string) kapi.Pod { + return agedPod(namespace, name, phase, -1, containerImages...) +} + +func agedPod(namespace, name string, phase kapi.PodPhase, ageInMinutes int64, containerImages ...string) kapi.Pod { + pod := kapi.Pod{ + ObjectMeta: kapi.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: podSpec(containerImages...), + Status: kapi.PodStatus{ + Phase: phase, + }, + } + + if ageInMinutes >= 0 { + pod.CreationTimestamp = util.NewTime(util.Now().Add(time.Duration(-1*ageInMinutes) * time.Minute)) + } + + return pod +} + +func podSpec(containerImages ...string) kapi.PodSpec { + spec := kapi.PodSpec{ + Containers: []kapi.Container{}, + } + for _, image := range containerImages { + container := kapi.Container{ + Image: image, + } + spec.Containers = append(spec.Containers, container) + } + return spec +} + +func streamList(streams ...imageapi.ImageStream) imageapi.ImageStreamList { + return imageapi.ImageStreamList{ + Items: streams, + } +} + +func stream(namespace, name string, tags map[string]imageapi.TagEventList) imageapi.ImageStream { + return agedStream(namespace, name, -1, tags) +} + +func agedStream(namespace, name string, ageInMinutes int64, tags map[string]imageapi.TagEventList) imageapi.ImageStream { + stream := imageapi.ImageStream{ + ObjectMeta: kapi.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Status: imageapi.ImageStreamStatus{ + Tags: tags, + }, + } + + if ageInMinutes >= 0 { + stream.CreationTimestamp = util.NewTime(util.Now().Add(time.Duration(-1*ageInMinutes) * time.Minute)) + } + + return stream +} + +func streamPtr(namespace, name string, tags map[string]imageapi.TagEventList) *imageapi.ImageStream { + s := stream(namespace, name, tags) + return &s +} + +func tags(list ...namedTagEventList) map[string]imageapi.TagEventList { + m := make(map[string]imageapi.TagEventList, len(list)) + for _, tag := range list { + m[tag.name] = tag.events + } + return m +} + +type namedTagEventList struct { + name string + events imageapi.TagEventList +} + +func tag(name string, events ...imageapi.TagEvent) namedTagEventList { + return namedTagEventList{ + name: name, + events: imageapi.TagEventList{ + Items: events, + }, + } +} + +func tagEvent(id, ref string) imageapi.TagEvent { + return imageapi.TagEvent{ + Image: id, + DockerImageReference: ref, + } +} + +func rcList(rcs ...kapi.ReplicationController) kapi.ReplicationControllerList { + return kapi.ReplicationControllerList{ + Items: rcs, + } +} + +func rc(namespace, name string, containerImages ...string) kapi.ReplicationController { + return kapi.ReplicationController{ + ObjectMeta: kapi.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: kapi.ReplicationControllerSpec{ + Template: &kapi.PodTemplateSpec{ + Spec: podSpec(containerImages...), + }, + }, + } +} + +var logLevel = flag.Int("loglevel", 0, "") + +func TestRun(t *testing.T) { + flag.Lookup("v").Value.Set(fmt.Sprint(*logLevel)) + registryURL := "registry" + + tests := map[string]struct { + registryURLs []string + images imageapi.ImageList + pods kapi.PodList + streams imageapi.ImageStreamList + rcs kapi.ReplicationControllerList + expectedDeletions []string + expectedUpdatedStreams []string + }{ + "1 pod - phase pending - don't prune": { + images: imageList(image("id", registryURL+"/foo/bar@id")), + pods: podList(pod("foo", "pod1", kapi.PodPending, registryURL+"/foo/bar@id")), + expectedDeletions: []string{}, + }, + "3 pods - last phase pending - don't prune": { + images: imageList(image("id", registryURL+"/foo/bar@id")), + pods: podList( + pod("foo", "pod1", kapi.PodSucceeded, registryURL+"/foo/bar@id"), + pod("foo", "pod2", kapi.PodSucceeded, registryURL+"/foo/bar@id"), + pod("foo", "pod3", kapi.PodPending, registryURL+"/foo/bar@id"), + ), + expectedDeletions: []string{}, + }, + "1 pod - phase running - don't prune": { + images: imageList(image("id", registryURL+"/foo/bar@id")), + pods: podList(pod("foo", "pod1", kapi.PodRunning, registryURL+"/foo/bar@id")), + expectedDeletions: []string{}, + }, + "3 pods - last phase running - don't prune": { + images: imageList(image("id", registryURL+"/foo/bar@id")), + pods: podList( + pod("foo", "pod1", kapi.PodSucceeded, registryURL+"/foo/bar@id"), + pod("foo", "pod2", kapi.PodSucceeded, registryURL+"/foo/bar@id"), + pod("foo", "pod3", kapi.PodRunning, registryURL+"/foo/bar@id"), + ), + expectedDeletions: []string{}, + }, + "pod phase succeeded - prune": { + images: imageList(image("id", registryURL+"/foo/bar@id")), + pods: podList(pod("foo", "pod1", kapi.PodSucceeded, registryURL+"/foo/bar@id")), + expectedDeletions: []string{"id"}, + }, + "pod phase succeeded, pod less than min pruning age - don't prune": { + images: imageList(image("id", registryURL+"/foo/bar@id")), + pods: podList(agedPod("foo", "pod1", kapi.PodSucceeded, 5, registryURL+"/foo/bar@id")), + expectedDeletions: []string{}, + }, + "pod phase succeeded, image less than min pruning age - don't prune": { + images: imageList(agedImage("id", registryURL+"/foo/bar@id", 5)), + pods: podList(pod("foo", "pod1", kapi.PodSucceeded, registryURL+"/foo/bar@id")), + expectedDeletions: []string{}, + }, + "pod phase failed - prune": { + images: imageList(image("id", registryURL+"/foo/bar@id")), + pods: podList( + pod("foo", "pod1", kapi.PodFailed, registryURL+"/foo/bar@id"), + pod("foo", "pod2", kapi.PodFailed, registryURL+"/foo/bar@id"), + pod("foo", "pod3", kapi.PodFailed, registryURL+"/foo/bar@id"), + ), + expectedDeletions: []string{"id"}, + }, + "pod phase unknown - prune": { + images: imageList(image("id", registryURL+"/foo/bar@id")), + pods: podList( + pod("foo", "pod1", kapi.PodUnknown, registryURL+"/foo/bar@id"), + pod("foo", "pod2", kapi.PodUnknown, registryURL+"/foo/bar@id"), + pod("foo", "pod3", kapi.PodUnknown, registryURL+"/foo/bar@id"), + ), + expectedDeletions: []string{"id"}, + }, + "referenced by rc - don't prune": { + images: imageList(image("id", registryURL+"/foo/bar@id")), + rcs: rcList(rc("foo", "rc1", registryURL+"/foo/bar@id")), + expectedDeletions: []string{}, + }, + "image stream - keep most recent image": { + images: imageList( + image("id", registryURL+"/foo/bar@id"), + image("id2", registryURL+"/foo/bar@id2"), + ), + streams: streamList( + stream("foo", "bar", tags( + tag("latest", + tagEvent("id", registryURL+"/foo/bar@id"), + tagEvent("id2", registryURL+"/foo/bar@id2"), + ), + )), + ), + expectedDeletions: []string{"id2"}, + expectedUpdatedStreams: []string{"foo/bar"}, + }, + "image stream age less than min pruning age - don't prune": { + images: imageList( + image("id", registryURL+"/foo/bar@id"), + image("id2", registryURL+"/foo/bar@id2"), + ), + streams: streamList( + agedStream("foo", "bar", 5, tags( + tag("latest", + tagEvent("id", registryURL+"/foo/bar@id"), + tagEvent("id2", registryURL+"/foo/bar@id2"), + ), + )), + ), + expectedDeletions: []string{}, + expectedUpdatedStreams: []string{}, + }, + "multiple resources pointing to image - don't prune": { + images: imageList( + image("id", registryURL+"/foo/bar@id"), + image("id2", registryURL+"/foo/bar@id2"), + ), + streams: streamList( + stream("foo", "bar", tags( + tag("latest", + tagEvent("id", registryURL+"/foo/bar@id"), + tagEvent("id2", registryURL+"/foo/bar@id2"), + ), + )), + ), + rcs: rcList(rc("foo", "rc1", registryURL+"/foo/bar@id2")), + pods: podList(pod("foo", "pod1", kapi.PodRunning, registryURL+"/foo/bar@id2")), + expectedDeletions: []string{}, + expectedUpdatedStreams: []string{}, + }, + } + + for name, test := range tests { + registryURLs := []string{registryURL} + if len(test.registryURLs) > 0 { + registryURLs = test.registryURLs + } + p := NewImagePruner(registryURLs, 60, &test.images, &test.streams, &test.pods, &test.rcs) + actualDeletions := util.NewStringSet() + actualUpdatedStreams := util.NewStringSet() + + imagePruneFunc := func(image *imageapi.Image, streams []*imageapi.ImageStream) []error { + actualDeletions.Insert(image.Name) + for _, stream := range streams { + actualUpdatedStreams.Insert(fmt.Sprintf("%s/%s", stream.Namespace, stream.Name)) + } + return []error{} + } + + layerPruneFunc := func(registryURL string, req dockerregistry.DeleteLayersRequest) []error { + return []error{} + } + + p.Run(imagePruneFunc, layerPruneFunc) + + expectedDeletions := util.NewStringSet(test.expectedDeletions...) + if !reflect.DeepEqual(expectedDeletions, actualDeletions) { + t.Errorf("%s: expected image deletions %q, got %q", name, expectedDeletions.List(), actualDeletions.List()) + } + + expectedUpdatedStreams := util.NewStringSet(test.expectedUpdatedStreams...) + if !reflect.DeepEqual(expectedUpdatedStreams, actualUpdatedStreams) { + t.Errorf("%s: expected stream updates %q, got %q", name, expectedUpdatedStreams.List(), actualUpdatedStreams.List()) + } + } +} + +func TestDefaultImagePruneFunc(t *testing.T) { + tests := map[string]struct { + referencedStreams []*imageapi.ImageStream + expectedUpdates []*imageapi.ImageStream + }{ + "no referenced streams": { + referencedStreams: []*imageapi.ImageStream{}, + expectedUpdates: []*imageapi.ImageStream{}, + }, + "1 tag, 1 image revision": { + referencedStreams: []*imageapi.ImageStream{ + streamPtr("foo", "bar", tags( + tag("latest", + tagEvent("id", "registry/foo/bar@id"), + ), + )), + }, + expectedUpdates: []*imageapi.ImageStream{}, + }, + "1 tag, multiple image revisions": { + referencedStreams: []*imageapi.ImageStream{ + streamPtr("foo", "bar", tags( + tag("latest", + tagEvent("id", "registry/foo/bar@id"), + tagEvent("id2", "registry/foo/bar@id2"), + ), + )), + }, + expectedUpdates: []*imageapi.ImageStream{ + streamPtr("foo", "bar", tags( + tag("latest", + tagEvent("id", "registry/foo/bar@id"), + ), + )), + }, + }, + } + + for name, test := range tests { + fakeClient := client.Fake{} + pruneFunc := DefaultImagePruneFunc(fakeClient.Images(), &fakeClient) + err := pruneFunc(&imageapi.Image{ObjectMeta: kapi.ObjectMeta{Name: "id2"}}, test.referencedStreams) + _ = err + + if len(fakeClient.Actions) < 1 { + t.Fatalf("%s: expected image deletion", name) + } + + if e, a := len(test.referencedStreams), len(fakeClient.Actions)-1; e != a { + t.Errorf("%s: expected %d stream updates, got %d", name, e, a) + } + + for i := range test.expectedUpdates { + if e, a := "update-status-imagestream", fakeClient.Actions[i+1].Action; e != a { + t.Errorf("%s: unexpected action %q", name, a) + } + updatedStream := fakeClient.Actions[i+1].Value.(*imageapi.ImageStream) + if e, a := test.expectedUpdates[i], updatedStream; !reflect.DeepEqual(e, a) { + t.Errorf("%s: unexpected updated stream: %s", name, util.ObjectDiff(e, a)) + } + } + } +} diff --git a/pkg/image/prune/summary.go b/pkg/image/prune/summary.go new file mode 100644 index 000000000000..f8fe0178c4f3 --- /dev/null +++ b/pkg/image/prune/summary.go @@ -0,0 +1,35 @@ +package prune + +import ( + "github.com/openshift/origin/pkg/cmd/dockerregistry" + imageapi "github.com/openshift/origin/pkg/image/api" +) + +type summarizingPruner struct { + delegate ImagePruner +} + +var _ ImagePruner = &summarizingPruner{} + +func NewSummarizingImagePruner(pruner ImagePruner) ImagePruner { + + return &summarizingPruner{} +} + +func (p *summarizingPruner) Run(baseImagePruneFunc ImagePruneFunc, baseLayerPruneFunc LayerPruneFunc) { + p.delegate.Run(p.imagePruneFunc(baseImagePruneFunc), p.layerPruneFunc(baseLayerPruneFunc)) +} + +func (p *summarizingPruner) imagePruneFunc(base ImagePruneFunc) ImagePruneFunc { + return func(image *imageapi.Image, streams []*imageapi.ImageStream) []error { + errs := base(image, streams) + return errs + } +} + +func (p *summarizingPruner) layerPruneFunc(base LayerPruneFunc) LayerPruneFunc { + return func(registryURL string, req dockerregistry.DeleteLayersRequest) []error { + errs := base(registryURL, req) + return errs + } +}