Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kubelet/rkt: add container/image gc for rkt. #14686

Merged
merged 1 commit into from
Oct 7, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ func NewMainKubelet(
return nil, err
}
klet.containerRuntime = rktRuntime
klet.containerGC = rktRuntime
klet.imageManager = rkt.NewImageManager(rktRuntime)

// No Docker daemon to put in a container.
dockerDaemonContainer = ""
Expand Down
18 changes: 15 additions & 3 deletions pkg/kubelet/rkt/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,28 @@ limitations under the License.

package rkt

import "github.com/golang/glog"

// ImageManager manages and garbage collects the container images for rkt.
type ImageManager struct {
runtime *runtime
runtime *Runtime
}

func NewImageManager(r *runtime) *ImageManager {
func NewImageManager(r *Runtime) *ImageManager {
return &ImageManager{runtime: r}
}

// GarbageCollect collects the images. It is not implemented by rkt yet.
// GarbageCollect collects the images.
// TODO(yifan): Enforce ImageGCPolicy.
func (im *ImageManager) GarbageCollect() error {
if _, err := im.runtime.runCommand("image", "gc"); err != nil {
glog.Errorf("rkt: Failed to gc image: %v", err)
return err
}
return nil
}

// Start is a no-op for rkt as we don't need to mark unused images in kubelet.
func (im *ImageManager) Start() error {
return nil
}
70 changes: 36 additions & 34 deletions pkg/kubelet/rkt/rkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ const (
defaultImageTag = "latest"
)

// runtime implements the Containerruntime for rkt. The implementation
// Runtime implements the Containerruntime for rkt. The implementation
// uses systemd, so in order to run this runtime, systemd must be installed
// on the machine.
type runtime struct {
type Runtime struct {
systemd *dbus.Conn
// The absolute path to rkt binary.
rktBinAbsPath string
Expand All @@ -98,7 +98,7 @@ type runtime struct {
imagePuller kubecontainer.ImagePuller
}

var _ kubecontainer.Runtime = &runtime{}
var _ kubecontainer.Runtime = &Runtime{}

// TODO(yifan): Remove this when volumeManager is moved to separate package.
type volumeGetter interface {
Expand All @@ -113,7 +113,7 @@ func New(config *Config,
recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager,
prober prober.Prober,
volumeGetter volumeGetter) (kubecontainer.Runtime, error) {
volumeGetter volumeGetter) (*Runtime, error) {

systemdVersion, err := getSystemdVersion()
if err != nil {
Expand Down Expand Up @@ -142,7 +142,7 @@ func New(config *Config,
}
}

rkt := &runtime{
rkt := &Runtime{
systemd: systemd,
rktBinAbsPath: rktBinAbsPath,
config: config,
Expand Down Expand Up @@ -170,7 +170,7 @@ func New(config *Config,
return rkt, nil
}

func (r *runtime) buildCommand(args ...string) *exec.Cmd {
func (r *Runtime) buildCommand(args ...string) *exec.Cmd {
cmd := exec.Command(r.rktBinAbsPath)
cmd.Args = append(cmd.Args, r.config.buildGlobalOptions()...)
cmd.Args = append(cmd.Args, args...)
Expand All @@ -179,7 +179,7 @@ func (r *runtime) buildCommand(args ...string) *exec.Cmd {

// runCommand invokes rkt binary with arguments and returns the result
// from stdout in a list of strings. Each string in the list is a line.
func (r *runtime) runCommand(args ...string) ([]string, error) {
func (r *Runtime) runCommand(args ...string) ([]string, error) {
glog.V(4).Info("rkt: Run command:", args)

var stdout, stderr bytes.Buffer
Expand Down Expand Up @@ -394,7 +394,7 @@ func parseImageName(image string) (string, string) {

// getImageManifest invokes 'rkt image cat-manifest' to retrive the image manifest
// for the image.
func (r *runtime) getImageManifest(image string) (*appcschema.ImageManifest, error) {
func (r *Runtime) getImageManifest(image string) (*appcschema.ImageManifest, error) {
var manifest appcschema.ImageManifest

repoToPull, tag := parseImageName(image)
Expand All @@ -413,7 +413,7 @@ func (r *runtime) getImageManifest(image string) (*appcschema.ImageManifest, err
}

// makePodManifest transforms a kubelet pod spec to the rkt pod manifest.
func (r *runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appcschema.PodManifest, error) {
func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appcschema.PodManifest, error) {
var globalPortMappings []kubecontainer.PortMapping
manifest := appcschema.BlankPodManifest()

Expand Down Expand Up @@ -535,7 +535,7 @@ func serviceFilePath(serviceName string) string {
//
// On success, it will return a string that represents name of the unit file
// and the runtime pod.
func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *kubecontainer.Pod, error) {
func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *kubecontainer.Pod, error) {
// Generate the pod manifest from the pod spec.
manifest, err := r.makePodManifest(pod, pullSecrets)
if err != nil {
Expand Down Expand Up @@ -643,7 +643,7 @@ func (r *runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k

// generateEvents is a helper function that generates some container
// life cycle events for containers in a pod.
func (r *runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, failure error) {
func (r *Runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, failure error) {
// Set up container references.
for _, c := range runtimePod.Containers {
containerID := string(c.ID)
Expand Down Expand Up @@ -679,7 +679,7 @@ func (r *runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, f

// RunPod first creates the unit file for a pod, and then
// starts the unit over d-bus.
func (r *runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
glog.V(4).Infof("Rkt starts to run pod: name %q.", kubeletUtil.FormatPodName(pod))

name, runtimePod, prepareErr := r.preparePod(pod, pullSecrets)
Expand Down Expand Up @@ -722,7 +722,7 @@ func (r *runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
}

// readServiceFile reads the service file and constructs the runtime pod and the rkt info.
func (r *runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) {
func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) {
f, err := os.Open(serviceFilePath(serviceName))
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -769,7 +769,7 @@ func (r *runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktI
// Then it will use the result to construct a list of container runtime pods.
// If all is false, then only running pods will be returned, otherwise all pods will be
// returned.
func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
glog.V(4).Infof("Rkt getting pods")

units, err := r.systemd.ListUnits()
Expand All @@ -796,7 +796,7 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {

// KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
// TODO(yifan): Handle network plugin.
func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)

serviceName := makePodServiceFileName(runningPod.ID)
Expand All @@ -817,7 +817,7 @@ func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {

// getPodStatus reads the service file and invokes 'rkt status $UUID' to get the
// pod's status.
func (r *runtime) getPodStatus(serviceName string) (*api.PodStatus, error) {
func (r *Runtime) getPodStatus(serviceName string) (*api.PodStatus, error) {
var status api.PodStatus

// TODO(yifan): Get rkt uuid from the service file name.
Expand All @@ -842,7 +842,7 @@ func (r *runtime) getPodStatus(serviceName string) (*api.PodStatus, error) {
}

// GetPodStatus returns the status of the given pod.
func (r *runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
func (r *Runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
serviceName := makePodServiceFileName(pod.UID)
return r.getPodStatus(serviceName)
}
Expand All @@ -854,7 +854,7 @@ func (r *runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
// Example:
// rkt:0.3.2+git --> []int{0, 3, 2}.
//
func (r *runtime) Version() (kubecontainer.Version, error) {
func (r *Runtime) Version() (kubecontainer.Version, error) {
output, err := r.runCommand("version")
if err != nil {
return nil, err
Expand All @@ -878,7 +878,7 @@ func (r *runtime) Version() (kubecontainer.Version, error) {

// TODO(yifan): This is very racy, unefficient, and unsafe, we need to provide
// different namespaces. See: https://github.com/coreos/rkt/issues/836.
func (r *runtime) writeDockerAuthConfig(image string, credsSlice []docker.AuthConfiguration) error {
func (r *Runtime) writeDockerAuthConfig(image string, credsSlice []docker.AuthConfiguration) error {
if len(credsSlice) == 0 {
return nil
}
Expand Down Expand Up @@ -922,7 +922,7 @@ func (r *runtime) writeDockerAuthConfig(image string, credsSlice []docker.AuthCo
//
// http://issue.k8s.io/7203
//
func (r *runtime) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Secret) error {
func (r *Runtime) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Secret) error {
img := image.Image
// TODO(yifan): The credential operation is a copy from dockertools package,
// Need to resolve the code duplication.
Expand Down Expand Up @@ -951,7 +951,7 @@ func (r *runtime) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Sec
}

// TODO(yifan): Searching the image via 'rkt images' might not be the most efficient way.
func (r *runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
func (r *Runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
repoToPull, tag := parseImageName(image.Image)
// Example output of 'rkt image list --fields=name':
//
Expand Down Expand Up @@ -985,7 +985,7 @@ func (r *runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
}

// SyncPod syncs the running pod to match the specified desired pod.
func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
podFullName := kubeletUtil.FormatPodName(pod)

// Add references to all containers.
Expand Down Expand Up @@ -1062,7 +1062,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
// See https://github.com/coreos/rkt/blob/master/Documentation/commands.md#logging for more details.
//
// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
func (r *runtime) GetContainerLogs(pod *api.Pod, containerID string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error {
func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error {
id, err := parseContainerID(containerID)
if err != nil {
return err
Expand All @@ -1081,8 +1081,10 @@ func (r *runtime) GetContainerLogs(pod *api.Pod, containerID string, logOptions
return cmd.Run()
}

// GarbageCollect collects the pods/containers. TODO(yifan): Enforce the gc policy.
func (r *runtime) GarbageCollect() error {
// GarbageCollect collects the pods/containers.
// TODO(yifan): Enforce the gc policy, also, it would be better if we can
// just GC kubernetes pods.
func (r *Runtime) GarbageCollect() error {
if err := exec.Command("systemctl", "reset-failed").Run(); err != nil {
glog.Errorf("rkt: Failed to reset failed systemd services: %v", err)
}
Expand All @@ -1096,7 +1098,7 @@ func (r *runtime) GarbageCollect() error {
// Note: In rkt, the container ID is in the form of "UUID:appName", where
// appName is the container name.
// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
func (r *runtime) RunInContainer(containerID string, cmd []string) ([]byte, error) {
func (r *Runtime) RunInContainer(containerID string, cmd []string) ([]byte, error) {
glog.V(4).Infof("Rkt running in container.")

id, err := parseContainerID(containerID)
Expand All @@ -1110,14 +1112,14 @@ func (r *runtime) RunInContainer(containerID string, cmd []string) ([]byte, erro
return []byte(strings.Join(result, "\n")), err
}

func (r *runtime) AttachContainer(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (r *Runtime) AttachContainer(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
return fmt.Errorf("unimplemented")
}

// Note: In rkt, the container ID is in the form of "UUID:appName", where UUID is
// the rkt UUID, and appName is the container name.
// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
func (r *runtime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (r *Runtime) ExecInContainer(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
glog.V(4).Infof("Rkt execing in container.")

id, err := parseContainerID(containerID)
Expand Down Expand Up @@ -1169,7 +1171,7 @@ func (r *runtime) ExecInContainer(containerID string, cmd []string, stdin io.Rea
}

// findRktID returns the rkt uuid for the pod.
func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
func (r *Runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
serviceName := makePodServiceFileName(pod.ID)

f, err := os.Open(serviceFilePath(serviceName))
Expand Down Expand Up @@ -1205,7 +1207,7 @@ func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
//
// TODO(yifan): Merge with the same function in dockertools.
// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.")

rktID, err := r.findRktID(pod)
Expand Down Expand Up @@ -1265,7 +1267,7 @@ func isUUID(input string) bool {

// getPodInfo returns the pod info of a single pod according
// to the uuid.
func (r *runtime) getPodInfo(uuid string) (*podInfo, error) {
func (r *Runtime) getPodInfo(uuid string) (*podInfo, error) {
status, err := r.runCommand("status", uuid)
if err != nil {
return nil, err
Expand All @@ -1281,7 +1283,7 @@ func (r *runtime) getPodInfo(uuid string) (*podInfo, error) {
// TODO(yifan): Replace with 'rkt image cat-manifest'.
// imageName should be in the form of 'example.com/app:latest', which should matches
// the result of 'rkt image list'. If the version is empty, then 'latest' is assumed.
func (r *runtime) getImageByName(imageName string) (*kubecontainer.Image, error) {
func (r *Runtime) getImageByName(imageName string) (*kubecontainer.Image, error) {
// TODO(yifan): Print hash in 'rkt image cat-manifest'?
images, err := r.ListImages()
if err != nil {
Expand Down Expand Up @@ -1309,7 +1311,7 @@ func (r *runtime) getImageByName(imageName string) (*kubecontainer.Image, error)
}

// ListImages lists all the available appc images on the machine by invoking 'rkt image list'.
func (r *runtime) ListImages() ([]kubecontainer.Image, error) {
func (r *Runtime) ListImages() ([]kubecontainer.Image, error) {
// Example output of 'rkt image list --fields=key,name':
//
// KEY NAME
Expand Down Expand Up @@ -1355,7 +1357,7 @@ func parseImageInfo(input string) (*kubecontainer.Image, error) {

// RemoveImage removes an on-disk image using 'rkt image rm'.
// TODO(yifan): Use image ID to reference image.
func (r *runtime) RemoveImage(image kubecontainer.ImageSpec) error {
func (r *Runtime) RemoveImage(image kubecontainer.ImageSpec) error {
img, err := r.getImageByName(image.Image)
if err != nil {
return err
Expand Down