Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rkt: Add GetPodStatus(), GetPodStatusAndAPIPodStatus(). #18643

Merged
merged 2 commits into from
Dec 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
266 changes: 237 additions & 29 deletions pkg/kubelet/rkt/rkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"os/exec"
"path"
"sort"
"strconv"
"strings"
"syscall"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/credentialprovider"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
Expand Down Expand Up @@ -89,6 +91,7 @@ const (

defaultImageTag = "latest"
defaultRktAPIServiceAddr = "localhost:15441"
defaultNetworkName = "rkt.kubernetes.io"
)

// Runtime implements the Containerruntime for rkt. The implementation
Expand Down Expand Up @@ -680,7 +683,7 @@ func (r *Runtime) preparePod(pod *api.Pod, pullSecrets []api.Secret) (string, *k
if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork {
runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false --net=host %s", r.rktBinAbsPath, uuid)
} else {
runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false %s", r.rktBinAbsPath, uuid)
runPrepared = fmt.Sprintf("%s run-prepared --mds-register=false --net=%s %s", r.rktBinAbsPath, defaultNetworkName, uuid)
}

// TODO handle pod.Spec.HostPID
Expand Down Expand Up @@ -842,35 +845,22 @@ func (r *Runtime) convertRktPod(rktpod rktapi.Pod) (*kubecontainer.Pod, error) {
return nil, fmt.Errorf("couldn't parse pod creation timestamp: %v", err)
}

var state kubecontainer.ContainerState
switch rktpod.State {
case rktapi.PodState_POD_STATE_RUNNING:
state = kubecontainer.ContainerStateRunning
case rktapi.PodState_POD_STATE_ABORTED_PREPARE, rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING, rktapi.PodState_POD_STATE_GARBAGE:
state = kubecontainer.ContainerStateExited
default:
state = kubecontainer.ContainerStateUnknown
}

kubepod := &kubecontainer.Pod{
ID: types.UID(podUID),
Name: podName,
Namespace: podNamespace,
}
for _, app := range rktpod.Apps {
manifest := &appcschema.ImageManifest{}
err := json.Unmarshal(app.Image.Manifest, manifest)
if err != nil {
return nil, err
}
containerHashString, ok := manifest.Annotations.Get(k8sRktContainerHashAnno)

for i, app := range rktpod.Apps {
// The order of the apps is determined by the rkt pod manifest.
// TODO(yifan): Let the server to unmarshal the annotations? https://github.com/coreos/rkt/issues/1872
hashStr, ok := manifest.Apps[i].Annotations.Get(k8sRktContainerHashAnno)
if !ok {
return nil, fmt.Errorf("app is missing annotation %s", k8sRktContainerHashAnno)
return nil, fmt.Errorf("app %q is missing annotation %s", app.Name, k8sRktContainerHashAnno)
}
containerHash, err := strconv.ParseUint(containerHashString, 10, 64)
containerHash, err := strconv.ParseUint(hashStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("couldn't parse container's hash: %v", err)
return nil, fmt.Errorf("couldn't parse container's hash %q: %v", hashStr, err)
}

kubepod.Containers = append(kubepod.Containers, &kubecontainer.Container{
Expand All @@ -879,7 +869,7 @@ func (r *Runtime) convertRktPod(rktpod rktapi.Pod) (*kubecontainer.Pod, error) {
Image: app.Image.Name,
Hash: containerHash,
Created: podCreated,
State: state,
State: appStateToContainerState(app.State),
})
}

Expand Down Expand Up @@ -1521,16 +1511,234 @@ func (r *Runtime) RemoveImage(image kubecontainer.ImageSpec) error {
return nil
}

// appStateToContainerState converts rktapi.AppState to kubecontainer.ContainerState.
func appStateToContainerState(state rktapi.AppState) kubecontainer.ContainerState {
switch state {
case rktapi.AppState_APP_STATE_RUNNING:
return kubecontainer.ContainerStateRunning
case rktapi.AppState_APP_STATE_EXITED:
return kubecontainer.ContainerStateExited
}
return kubecontainer.ContainerStateUnknown
}

// TODO(yifan): Rename to getPodInfo when the old getPodInfo is removed.
func retrievePodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, creationTime time.Time, restartCount int, err error) {
var manifest appcschema.PodManifest
if err = json.Unmarshal(pod.Manifest, &manifest); err != nil {
return
}

creationTimeStr, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno)
if !ok {
err = fmt.Errorf("no creation timestamp in pod manifest")
return
}
unixSec, err := strconv.ParseInt(creationTimeStr, 10, 64)
if err != nil {
return
}

if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
restartCount, err = strconv.Atoi(countString)
if err != nil {
return
}
}

return &manifest, time.Unix(unixSec, 0), restartCount, nil
}

func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return nil, fmt.Errorf("Not implemented yet")
podStatus := &kubecontainer.PodStatus{
ID: uid,
Name: name,
Namespace: namespace,
}

// TODO(yifan): Use ListPods(detail=true) to avoid another InspectPod rpc here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The container runtime interface we have now is more docker-centric, so PLEG does GetPods() to detect changes and then inspect the pod for details. We should monitor the resource usage of rkt to see if this flow/interface work well with rkt for our scalability goals.

listReq := &rktapi.ListPodsRequest{
Filter: &rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
{
Key: k8sRktUIDAnno,
Value: string(uid),
},
},
},
}

listResp, err := r.apisvc.ListPods(context.Background(), listReq)
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}

var latestPod *rktapi.Pod
var latestRestartCount int = -1

// Only use the latest pod to get the information, (which has the
// largest restart count).
for _, rktpod := range listResp.Pods {
inspectReq := &rktapi.InspectPodRequest{rktpod.Id}
inspectResp, err := r.apisvc.InspectPod(context.Background(), inspectReq)
if err != nil {
glog.Warningf("rkt: Couldn't inspect rkt pod, (uuid %q): %v", rktpod.Id, err)
continue
}

pod := inspectResp.Pod

manifest, creationTime, restartCount, err := retrievePodInfo(pod)
if err != nil {
glog.Warning("rkt: Couldn't get necessary info from the rkt pod, (uuid %q): %v", pod.Id, err)
continue
}

if restartCount > latestRestartCount {
latestPod = pod
latestRestartCount = restartCount
}

for i, app := range pod.Apps {
// The order of the apps is determined by the rkt pod manifest.
// TODO(yifan): Let the server to unmarshal the annotations?
hashStr, ok := manifest.Apps[i].Annotations.Get(k8sRktContainerHashAnno)
if !ok {
glog.Warningf("rkt: No container hash in pod manifest for rkt pod, (uuid %q, app %q)", pod.Id, app.Name)
continue
}

hashNum, err := strconv.ParseUint(hashStr, 10, 64)
if err != nil {
glog.Warningf("rkt: Invalid hash value %q for rkt pod, (uuid %q, app %q)", pod.Id, app.Name)
continue
}

cs := kubecontainer.ContainerStatus{
ID: buildContainerID(&containerID{uuid: pod.Id, appName: app.Name}),
Name: app.Name,
State: appStateToContainerState(app.State),
// TODO(yifan): Use the creation/start/finished timestamp when it's implemented.
CreatedAt: creationTime,
StartedAt: creationTime,
ExitCode: int(app.ExitCode),
Image: app.Image.Name,
ImageID: "rkt://" + app.Image.Id, // TODO(yifan): Add the prefix only in api.PodStatus.
Hash: hashNum,
// TODO(yifan): Note that now all apps share the same restart count, this might
// change once apps don't share the same lifecycle.
// See https://github.com/appc/spec/pull/547.
RestartCount: restartCount,
// TODO(yifan): Add reason and message field.
}

podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, &cs)

}
}

if latestPod != nil {
// Try to fill the IP info.
for _, n := range latestPod.Networks {
if n.Name == defaultNetworkName {
podStatus.IP = n.Ipv4
}
}
}

return podStatus, nil
}

func (r *Runtime) ConvertPodStatusToAPIPodStatus(_ *api.Pod, _ *kubecontainer.PodStatus) (*api.PodStatus, error) {
return nil, fmt.Errorf("Not implemented yet")
type sortByRestartCount []*kubecontainer.ContainerStatus

func (s sortByRestartCount) Len() int { return len(s) }
func (s sortByRestartCount) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sortByRestartCount) Less(i, j int) bool { return s[i].RestartCount > s[j].RestartCount }

// TODO(yifan): Delete this function when the logic is moved to kubelet.
func (r *Runtime) ConvertPodStatusToAPIPodStatus(pod *api.Pod, status *kubecontainer.PodStatus) (*api.PodStatus, error) {
apiPodStatus := &api.PodStatus{
// TODO(yifan): Add reason and message field.
PodIP: status.IP,
}

sort.Sort(sortByRestartCount(status.ContainerStatuses))

containerStatuses := make(map[string]*api.ContainerStatus)
for _, c := range status.ContainerStatuses {
var st api.ContainerState
switch c.State {
case kubecontainer.ContainerStateRunning:
st.Running = &api.ContainerStateRunning{
StartedAt: unversioned.NewTime(c.StartedAt),
}
case kubecontainer.ContainerStateExited:
if pod.Spec.RestartPolicy == api.RestartPolicyAlways ||
pod.Spec.RestartPolicy == api.RestartPolicyOnFailure && c.ExitCode != 0 {
// TODO(yifan): Add reason and message.
st.Waiting = &api.ContainerStateWaiting{}
break
}
st.Terminated = &api.ContainerStateTerminated{
ExitCode: c.ExitCode,
StartedAt: unversioned.NewTime(c.StartedAt),
// TODO(yifan): Add reason, message, finishedAt, signal.
ContainerID: c.ID.String(),
}
default:
// Unknown state.
// TODO(yifan): Add reason and message.
st.Waiting = &api.ContainerStateWaiting{}
}

status, ok := containerStatuses[c.Name]
if !ok {
containerStatuses[c.Name] = &api.ContainerStatus{
Name: c.Name,
Image: c.Image,
ImageID: c.ImageID,
ContainerID: c.ID.String(),
RestartCount: c.RestartCount,
State: st,
}
continue
}

// Found multiple container statuses, fill that as last termination state.
if status.LastTerminationState.Waiting == nil &&
status.LastTerminationState.Running == nil &&
status.LastTerminationState.Terminated == nil {
status.LastTerminationState = st
}
}

for _, c := range pod.Spec.Containers {
cs, ok := containerStatuses[c.Name]
if !ok {
cs = &api.ContainerStatus{
Name: c.Name,
Image: c.Image,
// TODO(yifan): Add reason and message.
State: api.ContainerState{Waiting: &api.ContainerStateWaiting{}},
}
}
apiPodStatus.ContainerStatuses = append(apiPodStatus.ContainerStatuses, *cs)
}

return apiPodStatus, nil
}

func (r *Runtime) GetPodStatusAndAPIPodStatus(pod *api.Pod) (*kubecontainer.PodStatus, *api.PodStatus, error) {
podStatus, err := r.GetAPIPodStatus(pod)
return nil, podStatus, err

// Get the pod status.
podStatus, err := r.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
return nil, nil, err
}
var apiPodStatus *api.PodStatus
apiPodStatus, err = r.ConvertPodStatusToAPIPodStatus(pod, podStatus)
return podStatus, apiPodStatus, err
}