Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CRI] beef up network teardown in StopPodSandbox #41434

Merged
merged 3 commits into from Feb 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/kubelet/dockershim/BUILD
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/dockershim/cm:go_default_library",
"//pkg/kubelet/dockershim/errors:go_default_library",
"//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/dockertools/securitycontext:go_default_library",
"//pkg/kubelet/leaky:go_default_library",
Expand Down Expand Up @@ -83,6 +84,7 @@ go_test(
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/dockershim/errors:go_default_library",
"//pkg/kubelet/dockershim/testing:go_default_library",
"//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/dockertools/securitycontext:go_default_library",
Expand Down Expand Up @@ -113,6 +115,7 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/kubelet/dockershim/cm:all-srcs",
"//pkg/kubelet/dockershim/errors:all-srcs",
"//pkg/kubelet/dockershim/remote:all-srcs",
"//pkg/kubelet/dockershim/testing:all-srcs",
],
Expand Down
9 changes: 8 additions & 1 deletion pkg/kubelet/dockershim/checkpoint_store.go
Expand Up @@ -23,6 +23,8 @@ import (
"path/filepath"
"regexp"
"strings"

"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
)

const (
Expand All @@ -40,6 +42,7 @@ type CheckpointStore interface {
// Write persists a checkpoint with key
Write(key string, data []byte) error
// Read retrieves a checkpoint with key
// Read must return CheckpointNotFoundError if checkpoint is not found
Read(key string) ([]byte, error)
// Delete deletes a checkpoint with key
// Delete must not return error if checkpoint does not exist
Expand Down Expand Up @@ -75,7 +78,11 @@ func (fstore *FileStore) Read(key string) ([]byte, error) {
if err := validateKey(key); err != nil {
return nil, err
}
return ioutil.ReadFile(fstore.getCheckpointPath(key))
bytes, err := ioutil.ReadFile(fstore.getCheckpointPath(key))
if os.IsNotExist(err) {
return bytes, errors.CheckpointNotFoundError
}
return bytes, err
}

func (fstore *FileStore) Delete(key string) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubelet/dockershim/checkpoint_store_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
)

func TestFileStore(t *testing.T) {
Expand Down Expand Up @@ -102,7 +103,7 @@ func TestFileStore(t *testing.T) {
err = store.Delete(c.key)
assert.NoError(t, err)
_, err = store.Read(c.key)
assert.Error(t, err)
assert.EqualValues(t, errors.CheckpointNotFoundError, err)
}

// Test deleting a non-existent checkpoint
Expand Down
7 changes: 3 additions & 4 deletions pkg/kubelet/dockershim/docker_checkpoint.go
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)

Expand All @@ -34,8 +35,6 @@ const (
schemaVersion = "v1"
)

var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.")

type Protocol string

// PortMapping is the port mapping configurations of a sandbox.
Expand Down Expand Up @@ -108,11 +107,11 @@ func (handler *PersistentCheckpointHandler) GetCheckpoint(podSandboxID string) (
err = json.Unmarshal(blob, &checkpoint)
if err != nil {
glog.Errorf("Failed to unmarshal checkpoint %q. Checkpoint content: %q. ErrMsg: %v", podSandboxID, string(blob), err)
return &checkpoint, CorruptCheckpointError
return &checkpoint, errors.CorruptCheckpointError
}
if checkpoint.CheckSum != calculateChecksum(checkpoint) {
glog.Errorf("Checksum of checkpoint %q is not valid", podSandboxID)
return &checkpoint, CorruptCheckpointError
return &checkpoint, errors.CorruptCheckpointError
}
return &checkpoint, nil
}
Expand Down
50 changes: 35 additions & 15 deletions pkg/kubelet/dockershim/docker_sandbox.go
Expand Up @@ -27,6 +27,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/types"
Expand Down Expand Up @@ -114,8 +115,10 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str
// after us?
func (ds *dockerService) StopPodSandbox(podSandboxID string) error {
var namespace, name string
var checkpointErr, statusErr error
needNetworkTearDown := false

// Try to retrieve sandbox information from docker daemon or sandbox checkpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the sandbox container no longer exists?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will try to retrieve sandbox checkpoint instead.

If neither the container nor the checkpoint exists (this actually happens when the kubelet's internal cache still has the sandbox but GC has already cleaned it up), then we only have the sandbox ID. We can still proceed with teardown using only the ID; there is no network namespace either, so the CNI bridge plugin will just do a best-effort cleanup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

status, statusErr := ds.PodSandboxStatus(podSandboxID)
if statusErr == nil {
nsOpts := status.GetLinux().GetNamespaces().GetOptions()
Expand All @@ -124,36 +127,53 @@ func (ds *dockerService) StopPodSandbox(podSandboxID string) error {
namespace = m.Namespace
name = m.Name
} else {
checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID)
if err != nil {
glog.Errorf("Failed to get checkpoint for sandbox %q: %v", podSandboxID, err)
return fmt.Errorf("failed to get sandbox status: %v", statusErr)
var checkpoint *PodSandboxCheckpoint
checkpoint, checkpointErr = ds.checkpointHandler.GetCheckpoint(podSandboxID)

// Proceed if both the sandbox container and the checkpoint could not be found. This means that the
// following actions will only have the sandbox ID and will not have the pod namespace and name.
// Return an error if any other, unexpected error is encountered.
if checkpointErr != nil {
if dockertools.IsContainerNotFoundError(statusErr) && checkpointErr == errors.CheckpointNotFoundError {
glog.Warningf("Both sandbox container and checkpoint for id %q could not be found. "+
"Proceed without further sandbox information.", podSandboxID)
} else {
return utilerrors.NewAggregate([]error{
fmt.Errorf("failed to get checkpoint for sandbox %q: %v", podSandboxID, checkpointErr),
fmt.Errorf("failed to get sandbox status: %v", statusErr)})
}
} else {
namespace = checkpoint.Namespace
name = checkpoint.Name
}
namespace = checkpoint.Namespace
name = checkpoint.Name

// Always trigger network plugin to tear down
needNetworkTearDown = true
}

// WARNING: The following operations make the following assumptions:
// 1. kubelet will retry on any error returned by StopPodSandbox.
// 2. tearing down network and stopping sandbox container can succeed in any sequence.
// This depends on the implementation detail of network plugin and proper error handling.
// For kubenet, if tearing down network failed and sandbox container is stopped, kubelet
// will retry. On retry, kubenet will not be able to retrieve network namespace of the sandbox
// since it is stopped. With an empty network namespace, the CNI bridge plugin will conduct
// best-effort cleanup and will not return an error.
errList := []error{}
if needNetworkTearDown {
cID := kubecontainer.BuildContainerID(runtimeName, podSandboxID)
if err := ds.networkPlugin.TearDownPod(namespace, name, cID); err != nil {
// TODO: Figure out a way to retry this error. We can't
// right now because the plugin throws errors when it doesn't find
// eth0, which might not exist for various reasons (setup failed,
// conf changed etc). In theory, it should teardown everything else
// so there's no need to retry.
glog.Errorf("Failed to teardown sandbox %q for pod %s/%s: %v", podSandboxID, namespace, name, err)
errList = append(errList, fmt.Errorf("failed to teardown sandbox %q for pod %s/%s: %v", podSandboxID, namespace, name, err))
}
}
if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil {
glog.Errorf("Failed to stop sandbox %q: %v", podSandboxID, err)
// Do not return error if the container does not exist
if !dockertools.IsContainerNotFoundError(err) {
return err
errList = append(errList, err)
}
}
return nil
return utilerrors.NewAggregate(errList)
// TODO: Stop all running containers in the sandbox.
}

Expand Down Expand Up @@ -355,7 +375,7 @@ func (ds *dockerService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]
if err != nil {
glog.Errorf("Failed to retrieve checkpoint for sandbox %q: %v", id, err)

if err == CorruptCheckpointError {
if err == errors.CorruptCheckpointError {
glog.V(2).Info("Removing corrupted checkpoint %q: %+v", id, *checkpoint)
ds.checkpointHandler.RemoveCheckpoint(id)
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/kubelet/dockershim/docker_service.go
Expand Up @@ -32,6 +32,7 @@ import (
kubecm "k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim/cm"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/cni"
Expand Down Expand Up @@ -292,8 +293,14 @@ func (ds *dockerService) GetNetNS(podSandboxID string) (string, error) {
func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.PortMapping, error) {
// TODO: get portmappings from docker labels for backward compatibility
checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID)
// Return empty portMappings if checkpoint is not found
if err != nil {
return nil, err
if err == errors.CheckpointNotFoundError {
glog.Warningf("Failed to retrieve checkpoint for sandbox %q: %v", err)
return nil, nil
} else {
return nil, err
}
}

portMappings := []*hostport.PortMapping{}
Expand Down
27 changes: 27 additions & 0 deletions pkg/kubelet/dockershim/errors/BUILD
@@ -0,0 +1,27 @@
package(default_visibility = ["//visibility:public"])

licenses(["notice"])

load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)

go_library(
name = "go_default_library",
srcs = ["errors.go"],
tags = ["automanaged"],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
22 changes: 22 additions & 0 deletions pkg/kubelet/dockershim/errors/errors.go
@@ -0,0 +1,22 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package errors

import "fmt"

// Sentinel errors shared by the dockershim checkpoint code. Callers compare
// against these values by identity (==), so each must remain a single
// package-level value.
var (
	// CorruptCheckpointError is returned when a stored checkpoint cannot be
	// unmarshalled or its checksum does not match its content.
	CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.")

	// CheckpointNotFoundError is returned by checkpoint stores when no
	// checkpoint exists for the requested key.
	CheckpointNotFoundError = fmt.Errorf("checkpoint is not found.")
)
1 change: 1 addition & 0 deletions pkg/kubelet/dockershim/testing/BUILD
Expand Up @@ -11,6 +11,7 @@ go_library(
name = "go_default_library",
srcs = ["util.go"],
tags = ["automanaged"],
deps = ["//pkg/kubelet/dockershim/errors:go_default_library"],
)

filegroup(
Expand Down
5 changes: 3 additions & 2 deletions pkg/kubelet/dockershim/testing/util.go
Expand Up @@ -17,8 +17,9 @@ limitations under the License.
package testing

import (
"fmt"
"sync"

"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
)

// MemStore is an implementation of CheckpointStore interface which stores checkpoint in memory.
Expand All @@ -43,7 +44,7 @@ func (mstore *MemStore) Read(key string) ([]byte, error) {
defer mstore.Unlock()
data, ok := mstore.mem[key]
if !ok {
return nil, fmt.Errorf("checkpoint %q could not be found", key)
return nil, errors.CheckpointNotFoundError
}
return data, nil
}
Expand Down