Skip to content

Commit

Permalink
Make 'pod' package to use unified checkpointManager
Browse files Browse the repository at this point in the history
Signed-off-by: vikaschoudhary16 <choudharyvikas16@gmail.com>
  • Loading branch information
vikaschoudhary16 committed Apr 16, 2018
1 parent d62bd9e commit cedbd93
Show file tree
Hide file tree
Showing 31 changed files with 175 additions and 124 deletions.
2 changes: 1 addition & 1 deletion hack/.golint_failures
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ pkg/kubelet/checkpointmanager/checksum
pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1
pkg/kubelet/client
pkg/kubelet/cm
pkg/kubelet/cm/devicemanager/checkpoint
pkg/kubelet/cm/util
pkg/kubelet/config
pkg/kubelet/configmap
Expand All @@ -184,7 +185,6 @@ pkg/kubelet/dockershim/network/hostport
pkg/kubelet/dockershim/network/hostport/testing
pkg/kubelet/dockershim/network/kubenet
pkg/kubelet/dockershim/network/testing
pkg/kubelet/dockershim/testing
pkg/kubelet/events
pkg/kubelet/images
pkg/kubelet/kuberuntime
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/certificate:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/configmap:go_default_library",
Expand Down
6 changes: 3 additions & 3 deletions pkg/kubelet/checkpoint/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/dchest/safefile:go_default_library",
"//vendor/github.com/ghodss/yaml:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
],
Expand All @@ -21,6 +20,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],
Expand Down
137 changes: 57 additions & 80 deletions pkg/kubelet/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@ limitations under the License.
package checkpoint

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"

"github.com/dchest/safefile"
"github.com/ghodss/yaml"

"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)

const (
Expand All @@ -39,54 +34,44 @@ const (
podPrefix = "Pod"
)

// Manager is the interface used to manage checkpoints
// which involves writing resources to disk to recover
// during restart or failure scenarios.
// https://github.com/kubernetes/community/pull/1241/files
type Manager interface {
// LoadPods will load checkpointed Pods from disk
LoadPods() ([]*v1.Pod, error)

// WritePod will serialize a Pod to disk
WritePod(pod *v1.Pod) error
type PodCheckpoint interface {
checkpointmanager.Checkpoint
GetPod() *v1.Pod
}

// Deletes the checkpoint of the given pod from disk
DeletePod(pod *v1.Pod) error
// Data to be stored as checkpoint
type Data struct {
Pod *v1.Pod
Checksum checksum.Checksum
}

var instance Manager
var mutex = &sync.Mutex{}
// NewPodCheckpoint returns new pod checkpoint
func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
return &Data{Pod: pod}
}

// fileCheckPointManager - is a checkpointer that writes contents to disk
// The type information of the resource objects are encoded in the name
type fileCheckPointManager struct {
path string
// MarshalCheckpoint returns marshalled data
func (cp *Data) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(*cp.Pod)
return json.Marshal(*cp)
}

// NewCheckpointManager will create a Manager that points to the following path
func NewCheckpointManager(path string) Manager {
// NOTE: This is a precaution; current implementation should not run
// multiple checkpoint managers.
mutex.Lock()
defer mutex.Unlock()
instance = &fileCheckPointManager{path: path}
return instance
// UnmarshalCheckpoint returns unmarshalled data
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}

// GetInstance will return the current Manager, there should be only one.
func GetInstance() Manager {
mutex.Lock()
defer mutex.Unlock()
return instance
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Pod)
}

// loadPod will load Pod Checkpoint yaml file.
func (fcp *fileCheckPointManager) loadPod(file string) (*v1.Pod, error) {
return util.LoadPodFromFile(file)
func (cp *Data) GetPod() *v1.Pod {
return cp.Pod
}

// checkAnnotations will validate the checkpoint annotations exist on the Pod
func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool {
func checkAnnotations(pod *v1.Pod) bool {
if podAnnotations := pod.GetAnnotations(); podAnnotations != nil {
if podAnnotations[core.BootstrapCheckpointAnnotationKey] == "true" {
return true
Expand All @@ -95,57 +80,49 @@ func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool {
return false
}

// getPodPath returns the full qualified path for the pod checkpoint
func (fcp *fileCheckPointManager) getPodPath(pod *v1.Pod) string {
return fmt.Sprintf("%v/Pod%v%v.yaml", fcp.path, delimiter, pod.GetUID())
//getPodKey returns the full qualified path for the pod checkpoint
func getPodKey(pod *v1.Pod) string {
return fmt.Sprintf("Pod%v%v.yaml", delimiter, pod.GetUID())
}

// LoadPods Loads All Checkpoints from disk
func (fcp *fileCheckPointManager) LoadPods() ([]*v1.Pod, error) {
checkpoints := make([]*v1.Pod, 0)
files, err := ioutil.ReadDir(fcp.path)
func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
pods := make([]*v1.Pod, 0)

var err error
checkpointKeys := []string{}
checkpointKeys, err = cpm.ListCheckpoints()
if err != nil {
return nil, err
glog.Errorf("Failed to list checkpoints: %v", err)
}
for _, f := range files {
// get just the filename
_, fname := filepath.Split(f.Name())
// Get just the Resource from "Resource_Name"
fnfields := strings.Split(fname, delimiter)
switch fnfields[0] {
case podPrefix:
pod, err := fcp.loadPod(fmt.Sprintf("%s/%s", fcp.path, f.Name()))
if err != nil {
return nil, err
}
checkpoints = append(checkpoints, pod)
default:
glog.Warningf("Unsupported checkpoint file detected %v", f)

for _, key := range checkpointKeys {
checkpoint := NewPodCheckpoint(nil)
err := cpm.GetCheckpoint(key, checkpoint)
if err != nil {
glog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
continue
}
pods = append(pods, checkpoint.GetPod())
}
return checkpoints, nil
return pods, nil
}

// Writes a checkpoint to a file on disk if annotation is present
func (fcp *fileCheckPointManager) WritePod(pod *v1.Pod) error {
// WritePod a checkpoint to a file on disk if annotation is present
func WritePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error {
var err error
if fcp.checkAnnotations(pod) {
if blob, err := yaml.Marshal(pod); err == nil {
err = safefile.WriteFile(fcp.getPodPath(pod), blob, 0644)
}
if checkAnnotations(pod) {
data := NewPodCheckpoint(pod)
err = cpm.CreateCheckpoint(getPodKey(pod), data)
} else {
// This is to handle an edge where a pod update could remove
// an annotation and the checkpoint should then be removed.
err = fcp.DeletePod(pod)
err = cpm.RemoveCheckpoint(getPodKey(pod))
}
return err
}

// Deletes a checkpoint from disk if present
func (fcp *fileCheckPointManager) DeletePod(pod *v1.Pod) error {
podPath := fcp.getPodPath(pod)
if err := os.Remove(podPath); !os.IsNotExist(err) {
return err
}
return nil
// DeletePod deletes a checkpoint from disk if present
func DeletePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error {
return cpm.RemoveCheckpoint(getPodKey(pod))
}
12 changes: 8 additions & 4 deletions pkg/kubelet/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
)

// TestWriteLoadDeletePods validates all combinations of write, load, and delete
Expand Down Expand Up @@ -70,15 +71,18 @@ func TestWriteLoadDeletePods(t *testing.T) {
}
defer os.RemoveAll(dir)

cp := NewCheckpointManager(dir)
cpm, err := checkpointmanager.NewCheckpointManager(dir)
if err != nil {
t.Errorf("Failed to initialize checkpoint manager error=%v", err)
}
for _, p := range testPods {
// Write pods should always pass unless there is an fs error
if err := cp.WritePod(p.pod); err != nil {
if err := WritePod(cpm, p.pod); err != nil {
t.Errorf("Failed to Write Pod: %v", err)
}
}
// verify the correct written files are loaded from disk
pods, err := cp.LoadPods()
pods, err := LoadPods(cpm)
if err != nil {
t.Errorf("Failed to Load Pods: %v", err)
}
Expand All @@ -104,7 +108,7 @@ func TestWriteLoadDeletePods(t *testing.T) {
} else if lpod != nil {
t.Errorf("Got unexpected result for %v, should not have been loaded", pname)
}
err = cp.DeletePod(p.pod)
err = DeletePod(cpm, p.pod)
if err != nil {
t.Errorf("Failed to delete pod %v", pname)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/checkpointmanager/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ go_test(
srcs = ["checkpoint_manager_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//pkg/kubelet/checkpointmanager/testing:go_default_library",
"//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubelet/checkpointmanager/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
## DISCLAIMER
Sig-Node community has reached a general consensus, as a best practice, to
- Sig-Node community has reached a general consensus, as a best practice, to
avoid introducing any new checkpointing support. We reached this understanding
after struggling with some hard-to-debug issues in the production environments
caused by the checkpointing.
- Any changes to the checkpointed data structure would be considered incompatible and a component should add its own handling if it needs to ensure backward compatibility of reading old-format checkpoint files.

## Introduction
This folder contains a framework & primitives, Checkpointing Manager, which is
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubelet/checkpointmanager/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package checkpointmanager

import (
"fmt"
"sync"

"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
Expand Down Expand Up @@ -48,8 +49,10 @@ type CheckpointManager interface {
type impl struct {
path string
store utilstore.Store
mutex sync.Mutex
}

// NewCheckpointManager returns a new instance of a checkpoint manager
func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) {
fstore, err := utilstore.NewFileStore(checkpointDir, utilfs.DefaultFs{})
if err != nil {
Expand Down Expand Up @@ -88,6 +91,7 @@ func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint)
return err
}

// RemoveCheckpoint will not return error if checkpoint does not exist.
func (manager *impl) RemoveCheckpoint(checkpointKey string) error {
manager.mutex.Lock()
defer manager.mutex.Unlock()
Expand Down
7 changes: 3 additions & 4 deletions pkg/kubelet/checkpointmanager/checkpoint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ package checkpointmanager

import (
"encoding/json"
"hash/fnv"
"sort"
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
utilstore "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1"
)
Expand All @@ -50,7 +49,7 @@ type CheckpointDataV2 struct {
type protocol string

// portMapping is the port mapping configurations of a sandbox.
type portMapping struct {
type PortMapping struct {
// protocol of the port mapping.
Protocol *protocol
// Port number within the container.
Expand Down Expand Up @@ -153,7 +152,7 @@ func TestCheckpointManager(t *testing.T) {
port443 := int32(443)
proto := protocol("tcp")

portMappings := []*portMapping{
portMappings := []*PortMapping{
{
&proto,
&port80,
Expand Down
7 changes: 5 additions & 2 deletions pkg/kubelet/checkpointmanager/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@ package errors

import "fmt"

var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.")
var CheckpointNotFoundError = fmt.Errorf("checkpoint is not found.")
// ErrCorruptCheckpoint error is reported when checksum does not match
var ErrCorruptCheckpoint = fmt.Errorf("checkpoint is corrupted")

// ErrCheckpointNotFound is reported when checkpoint is not found for a given key
var ErrCheckpointNotFound = fmt.Errorf("checkpoint is not found")
Loading

0 comments on commit cedbd93

Please sign in to comment.