Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kubelet post node status to master #5265

Merged
merged 1 commit into from
Mar 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
nodeResources := &api.NodeResources{}

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
nodeController.Run(5*time.Second, true, true)
nodeController.Run(5*time.Second, true, false)

// Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewCMServer() *CMServer {
NodeMilliCPU: 1000,
NodeMemory: resource.MustParse("3Gi"),
SyncNodeList: true,
SyncNodeStatus: true,
SyncNodeStatus: false,
KubeletConfig: client.KubeletConfig{
Port: ports.KubeletPort,
EnableHttps: false,
Expand Down
21 changes: 14 additions & 7 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type KubeletServer struct {
SyncFrequency time.Duration
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
StatusUpdateFrequency time.Duration
ManifestURL string
EnableServer bool
Address util.IP
Expand Down Expand Up @@ -82,12 +83,13 @@ type KubeletServer struct {
// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
return &KubeletServer{
SyncFrequency: 10 * time.Second,
FileCheckFrequency: 20 * time.Second,
HTTPCheckFrequency: 20 * time.Second,
EnableServer: true,
Address: util.IP(net.ParseIP("127.0.0.1")),
Port: ports.KubeletPort,
SyncFrequency: 10 * time.Second,
FileCheckFrequency: 20 * time.Second,
HTTPCheckFrequency: 20 * time.Second,
StatusUpdateFrequency: 20 * time.Second,
EnableServer: true,
Address: util.IP(net.ParseIP("127.0.0.1")),
Port: ports.KubeletPort,
PodInfraContainerImage: kubelet.PodInfraContainerImage,
RootDirectory: defaultRootDir,
RegistryBurst: 10,
Expand All @@ -104,6 +106,7 @@ func NewKubeletServer() *KubeletServer {
func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files")
fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config")
fs.DurationVar(&s.StatusUpdateFrequency, "status_update_frequency", s.StatusUpdateFrequency, "Duration between posting node status to master")
fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data")
fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data")
fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest")
Expand Down Expand Up @@ -157,6 +160,7 @@ func (s *KubeletServer) Run(_ []string) error {
RootDirectory: s.RootDirectory,
ConfigFile: s.Config,
ManifestURL: s.ManifestURL,
StatusUpdateFrequency: s.StatusUpdateFrequency,
FileCheckFrequency: s.FileCheckFrequency,
HTTPCheckFrequency: s.HTTPCheckFrequency,
PodInfraContainerImage: s.PodInfraContainerImage,
Expand Down Expand Up @@ -250,6 +254,7 @@ func SimpleRunKubelet(client *client.Client,
Address: util.IP(net.ParseIP(address)),
EnableServer: true,
EnableDebuggingHandlers: true,
StatusUpdateFrequency: 3 * time.Second,
SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second,
MaxContainerCount: 5,
Expand Down Expand Up @@ -345,6 +350,7 @@ type KubeletConfig struct {
RootDirectory string
ConfigFile string
ManifestURL string
StatusUpdateFrequency time.Duration
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
Hostname string
Expand Down Expand Up @@ -408,7 +414,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.VolumePlugins,
kc.StreamingConnectionIdleTimeout,
kc.Recorder,
cadvisorInterface)
cadvisorInterface,
kc.StatusUpdateFrequency)

if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ type Node struct {
Status NodeStatus `json:"status,omitempty"`
}

// NodeList is a list of minions.
// NodeList is a list of nodes.
type NodeList struct {
TypeMeta `json:",inline"`
ListMeta `json:"metadata,omitempty"`
Expand Down
108 changes: 105 additions & 3 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
Expand All @@ -56,7 +57,7 @@ import (
)

const (
// taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
// Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
minShares = 2
sharesPerCPU = 1024
milliCPUToCPU = 1000
Expand All @@ -67,6 +68,14 @@ const (

// Max amount of time to wait for the Docker daemon to come up.
maxWaitForDocker = 5 * time.Minute

// Initial node status update frequency and incremental frequency, for faster cluster startup.
// The update frequency will be increameted linearly, until it reaches status_update_frequency.
initialNodeStatusUpdateFrequency = 100 * time.Millisecond
nodeStatusUpdateFrequencyInc = 500 * time.Millisecond

// The retry count for updating node status at each sync period.
nodeStatusUpdateRetry = 5
)

var (
Expand Down Expand Up @@ -109,7 +118,8 @@ func NewMainKubelet(
volumePlugins []volume.Plugin,
streamingConnectionIdleTimeout time.Duration,
recorder record.EventRecorder,
cadvisorInterface cadvisor.Interface) (*Kubelet, error) {
cadvisorInterface cadvisor.Interface,
statusUpdateFrequency time.Duration) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
Expand Down Expand Up @@ -159,6 +169,7 @@ func NewMainKubelet(
etcdClient: etcdClient,
kubeClient: kubeClient,
rootDirectory: rootDirectory,
statusUpdateFrequency: statusUpdateFrequency,
resyncInterval: resyncInterval,
podInfraContainerImage: podInfraContainerImage,
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
Expand Down Expand Up @@ -218,6 +229,7 @@ type Kubelet struct {
rootDirectory string
podInfraContainerImage string
podWorkers *podWorkers
statusUpdateFrequency time.Duration
resyncInterval time.Duration
sourcesReady SourcesReadyFn

Expand Down Expand Up @@ -520,9 +532,36 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}
go kl.syncNodeStatus()
kl.syncLoop(updates, kl)
}

// syncNodeStatus periodically synchronizes node status to master.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
return
}
for feq := initialNodeStatusUpdateFrequency; feq < kl.statusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
select {
case <-time.After(feq):
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
}
for {
select {
case <-time.After(kl.statusUpdateFrequency):
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
}
}

func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
binds := []string{}
for _, mount := range container.VolumeMounts {
Expand All @@ -538,6 +577,7 @@ func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap
}
return binds
}

func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
exposedPorts := map[docker.Port]struct{}{}
portBindings := map[docker.Port][]docker.PortBinding{}
Expand Down Expand Up @@ -1679,7 +1719,7 @@ func (kl *Kubelet) GetHostname() string {
return kl.hostname
}

// GetBoundPods returns all pods bound to the kubelet and their spec
// GetBoundPods returns all pods bound to the kubelet and their spec.
func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
Expand All @@ -1699,6 +1739,68 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
return nil, false
}

// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
err := kl.tryUpdateNodeStatus()
if err != nil {
glog.Errorf("error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("Update node status exceeds retry count")
}

// tryUpdateNodeStatus tries to update node status to master.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
return fmt.Errorf("error getting node %s: %v", kl.hostname, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %v", kl.hostname)
}

// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetMachineInfo()
if err != nil {
glog.Error("error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Spec.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
}

newCondition := api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFull,
Reason: fmt.Sprintf("kubelet is posting ready status"),
LastProbeTime: util.Now(),
}
updated := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {
node.Status.Conditions[i] = newCondition
updated = true
}
}
if !updated {
node.Status.Conditions = append(node.Status.Conditions, newCondition)
}

_, err = kl.kubeClient.Nodes().Update(node)
return err
}

// getPhase returns the phase of a pod given its container info.
func getPhase(spec *api.PodSpec, info api.PodInfo) api.PodPhase {
running := 0
Expand Down