Skip to content

Commit

Permalink
WIP: run kubelet in container
Browse files Browse the repository at this point in the history
  • Loading branch information
pmorie committed Apr 29, 2015
1 parent 4d2f7a3 commit b0eef85
Show file tree
Hide file tree
Showing 29 changed files with 481 additions and 107 deletions.
16 changes: 15 additions & 1 deletion cmd/kubelet/app/server.go
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"

"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
Expand Down Expand Up @@ -99,6 +100,7 @@ type KubeletServer struct {
CertDirectory string
NodeStatusUpdateFrequency time.Duration
ResourceContainer string
Containerized bool

// Flags intended for testing

Expand Down Expand Up @@ -206,6 +208,9 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
// Flags intended for testing, not recommended used in production environments.
fs.BoolVar(&s.ReallyCrashForTesting, "really-crash-for-testing", s.ReallyCrashForTesting, "If true, when panics occur crash. Intended for testing.")
fs.Float64Var(&s.ChaosChance, "chaos-chance", s.ChaosChance, "If > 0.0, introduce random client errors and latency. Intended for testing. [default=0.0]")

// HACK: are you containerized?
fs.BoolVar(&s.Containerized, "containerized", s.Containerized, "Experimental support for running kubelet in a container. Intended for testing.")
}

// Run runs the specified KubeletServer. This should never exit.
Expand Down Expand Up @@ -264,6 +269,11 @@ func (s *KubeletServer) Run(_ []string) error {
KeyFile: s.TLSPrivateKeyFile,
}

mounter := mount.New()
if s.Containerized {
mounter = &mount.NsenterMounter{}
}

kcfg := KubeletConfig{
Address: s.Address,
AllowPrivileged: s.AllowPrivileged,
Expand Down Expand Up @@ -301,6 +311,7 @@ func (s *KubeletServer) Run(_ []string) error {
Cloud: cloud,
NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency,
ResourceContainer: s.ResourceContainer,
Mounter: mounter,
}

RunKubelet(&kcfg, nil)
Expand Down Expand Up @@ -409,6 +420,7 @@ func SimpleKubelet(client *client.Client,
NodeStatusUpdateFrequency: 10 * time.Second,
ResourceContainer: "/kubelet",
OSInterface: osInterface,
Mounter: mount.New(),
}
return &kcfg
}
Expand Down Expand Up @@ -536,6 +548,7 @@ type KubeletConfig struct {
NodeStatusUpdateFrequency time.Duration
ResourceContainer string
OSInterface kubecontainer.OSInterface
Mounter mount.Interface
}

func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
Expand Down Expand Up @@ -580,7 +593,8 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.Cloud,
kc.NodeStatusUpdateFrequency,
kc.ResourceContainer,
kc.OSInterface)
kc.OSInterface,
kc.Mounter)

if err != nil {
return nil, nil, err
Expand Down
21 changes: 21 additions & 0 deletions contrib/docker-kubelet/Dockerfile
@@ -0,0 +1,21 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Experimental image for running the kubelet inside a container.
# Expects the kubelet, nsenter, and mt binaries to be present in the
# build context (see hack/docker-kubelet.sh for how it is run).
FROM centos
# COPY is preferred over ADD for local files: it has no implicit tar
# extraction or remote-URL behavior. Destinations are absolute to make
# it explicit that the binaries land at /, matching the CMD below.
COPY kubelet /kubelet
COPY nsenter /nsenter
COPY mt /mt
VOLUME /var/lib/docker
VOLUME /var/lib/kubelet
CMD [ "/kubelet" ]
28 changes: 28 additions & 0 deletions hack/docker-kubelet.sh
@@ -0,0 +1,28 @@
#!/bin/bash

# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Runs the containerized kubelet (kubernetes/kubelet image) against a
# local apiserver on 127.0.0.1:8080. The container needs host
# networking, privilege, and several host mounts so the kubelet can see
# the host's root filesystem, docker state, and its own working dir.

# Fail fast on command errors, unset variables, and pipeline failures.
set -o errexit
set -o nounset
set -o pipefail

docker run \
  --volume=/:/rootfs:ro \
  --volume=/var/run:/var/run:rw \
  --volume=/sys:/sys:ro \
  --volume=/var/lib/docker/:/var/lib/docker:ro \
  --volume=/var/lib/kubelet/:/var/lib/kubelet:rw \
  --net=host \
  --detach=true \
  --privileged=true \
  kubernetes/kubelet:latest \
  /kubelet --v=3 --chaos-chance="0.0" --hostname-override="127.0.0.1" --address="127.0.0.1" --api-servers="127.0.0.1:8080" --port="10250" --resource-container=""
28 changes: 17 additions & 11 deletions hack/local-up-cluster.sh
Expand Up @@ -46,6 +46,7 @@ API_CORS_ALLOWED_ORIGINS=${API_CORS_ALLOWED_ORIGINS:-"/127.0.0.1(:[0-9]+)?$,/loc
KUBELET_PORT=${KUBELET_PORT:-10250}
LOG_LEVEL=${LOG_LEVEL:-3}
CHAOS_CHANCE=${CHAOS_CHANCE:-0.0}
SKIP_KUBELET=${SKIP_KUBELET:-""}

# For the common local scenario, fail fast if server is already running.
# this can happen if you run local-up-cluster.sh twice and kill etcd in between.
Expand Down Expand Up @@ -100,7 +101,7 @@ cleanup()
echo "Cleaning up..."
[[ -n "${APISERVER_PID-}" ]] && sudo kill "${APISERVER_PID}"
[[ -n "${CTLRMGR_PID-}" ]] && sudo kill "${CTLRMGR_PID}"
[[ -n "${KUBELET_PID-}" ]] && sudo kill "${KUBELET_PID}"
[[ -n "${KUBELET_PID-}" && -z "${SKIP_KUBELET}" ]] && sudo kill "${KUBELET_PID}"
[[ -n "${PROXY_PID-}" ]] && sudo kill "${PROXY_PID}"
[[ -n "${SCHEDULER_PID-}" ]] && sudo kill "${SCHEDULER_PID}"

Expand Down Expand Up @@ -141,16 +142,21 @@ sudo -E "${GO_OUT}/kube-controller-manager" \
--master="${API_HOST}:${API_PORT}" >"${CTLRMGR_LOG}" 2>&1 &
CTLRMGR_PID=$!

# Start the kubelet unless SKIP_KUBELET is set (used when the kubelet is
# run out-of-band, e.g. containerized via hack/docker-kubelet.sh).
# NOTE(review): the scraped diff retained both the pre-change start
# sequence and this new conditional; only the new version is kept here.
if [[ -z "${SKIP_KUBELET}" ]]; then
  KUBELET_LOG=/tmp/kubelet.log
  sudo -E "${GO_OUT}/kubelet" \
    --v="${LOG_LEVEL}" \
    --chaos_chance="${CHAOS_CHANCE}" \
    --hostname_override="127.0.0.1" \
    --address="127.0.0.1" \
    --api_servers="${API_HOST}:${API_PORT}" \
    --auth_path="${KUBE_ROOT}/hack/.test-cmd-auth" \
    --port="${KUBELET_PORT}" >"${KUBELET_LOG}" 2>&1 &
  KUBELET_PID=$!
else
  echo "Skipping kubelet start because SKIP_KUBELET is set"
  KUBELET_LOG="(no kubelet log created)"
fi

PROXY_LOG=/tmp/kube-proxy.log
sudo -E "${GO_OUT}/kube-proxy" \
Expand Down
7 changes: 6 additions & 1 deletion pkg/kubelet/kubelet.go
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
utilErrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
Expand Down Expand Up @@ -123,7 +124,8 @@ func NewMainKubelet(
cloud cloudprovider.Interface,
nodeStatusUpdateFrequency time.Duration,
resourceContainer string,
osInterface kubecontainer.OSInterface) (*Kubelet, error) {
osInterface kubecontainer.OSInterface,
mounter mount.Interface) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
Expand Down Expand Up @@ -244,6 +246,7 @@ func NewMainKubelet(
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
resourceContainer: resourceContainer,
os: osInterface,
mounter: mounter,
}

klet.podManager = newBasicPodManager(klet.kubeClient)
Expand Down Expand Up @@ -395,6 +398,8 @@ type Kubelet struct {
resourceContainer string

os kubecontainer.OSInterface

mounter mount.Interface
}

// getRootDir returns the full path to the directory under which kubelet can
Expand Down
23 changes: 13 additions & 10 deletions pkg/kubelet/volumes.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog"
Expand Down Expand Up @@ -54,15 +55,15 @@ func (vh *volumeHost) GetKubeClient() client.Interface {
return vh.kubelet.kubeClient
}

func (vh *volumeHost) NewWrapperBuilder(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions) (volume.Builder, error) {
b, err := vh.kubelet.newVolumeBuilderFromPlugins(spec, podRef, opts)
func (vh *volumeHost) NewWrapperBuilder(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
b, err := vh.kubelet.newVolumeBuilderFromPlugins(spec, podRef, opts, mounter)
if err == nil && b == nil {
return nil, errUnsupportedVolumeType
}
return b, nil
}

func (vh *volumeHost) NewWrapperCleaner(spec *volume.Spec, podUID types.UID) (volume.Cleaner, error) {
func (vh *volumeHost) NewWrapperCleaner(spec *volume.Spec, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
plugin, err := vh.kubelet.volumePluginMgr.FindPluginBySpec(spec)
if err != nil {
return nil, err
Expand All @@ -71,14 +72,14 @@ func (vh *volumeHost) NewWrapperCleaner(spec *volume.Spec, podUID types.UID) (vo
// Not found but not an error
return nil, nil
}
c, err := plugin.NewCleaner(spec.Name, podUID)
c, err := plugin.NewCleaner(spec.Name, podUID, mounter)
if err == nil && c == nil {
return nil, errUnsupportedVolumeType
}
return c, nil
}

func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions) (volume.Builder, error) {
func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec)
if err != nil {
return nil, fmt.Errorf("can't use volume plugins for %s: %v", spew.Sprintf("%#v", *spec), err)
Expand All @@ -87,7 +88,7 @@ func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, podRef *api.Ob
// Not found but not an error
return nil, nil
}
builder, err := plugin.NewBuilder(spec, podRef, opts)
builder, err := plugin.NewBuilder(spec, podRef, opts, mounter)
if err != nil {
return nil, fmt.Errorf("failed to instantiate volume plugin for %s: %v", spew.Sprintf("%#v", *spec), err)
}
Expand All @@ -113,7 +114,8 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (volumeMap, error) {

// Try to use a plugin for this volume.
internal := volume.NewSpecFromVolume(volSpec)
builder, err := kl.newVolumeBuilderFromPlugins(internal, podRef, volume.VolumeOptions{rootContext})
// TODO: inject right version of mounter
builder, err := kl.newVolumeBuilderFromPlugins(internal, podRef, volume.VolumeOptions{rootContext}, kl.mounter)
if err != nil {
glog.Errorf("Could not create volume builder for pod %s: %v", pod.UID, err)
return nil, err
Expand Down Expand Up @@ -164,7 +166,8 @@ func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner {
// or volume objects.

// Try to use a plugin for this volume.
cleaner, err := kl.newVolumeCleanerFromPlugins(volumeKind, volumeName, podUID)
// TODO: inject right version of mounter
cleaner, err := kl.newVolumeCleanerFromPlugins(volumeKind, volumeName, podUID, kl.mounter)
if err != nil {
glog.Errorf("Could not create volume cleaner for %s: %v", volumeNameDir.Name(), err)
continue
Expand All @@ -180,7 +183,7 @@ func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner {
return currentVolumes
}

func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID types.UID) (volume.Cleaner, error) {
func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
plugName := util.UnescapeQualifiedNameForDisk(kind)
plugin, err := kl.volumePluginMgr.FindPluginByName(plugName)
if err != nil {
Expand All @@ -191,7 +194,7 @@ func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID
// Not found but not an error.
return nil, nil
}
cleaner, err := plugin.NewCleaner(name, podUID)
cleaner, err := plugin.NewCleaner(name, podUID, mounter)
if err != nil {
return nil, fmt.Errorf("failed to instantiate volume plugin for %s/%s: %v", podUID, kind, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/mount/mount.go
Expand Up @@ -47,7 +47,7 @@ type MountPoint struct {

// New returns a mount.Interface for the current system.
// It returns the direct-syscall implementation; callers that run the
// kubelet in a container use NsenterMounter instead (see cmd/kubelet).
func New() Interface {
	// NOTE(review): the scraped diff retained the stale
	// `return &Mounter{}` line alongside this one; the renamed
	// syscallMounter is the post-change implementation.
	return &syscallMounter{}
}

// GetMountRefs finds all other references to the device referenced
Expand Down
18 changes: 9 additions & 9 deletions pkg/util/mount/mount_linux.go
Expand Up @@ -35,32 +35,32 @@ const FlagBind = syscall.MS_BIND
const FlagReadOnly = syscall.MS_RDONLY

// Mounter implements mount.Interface for linux platform.
type Mounter struct{}
type syscallMounter struct{}

// Mount wraps syscall.Mount()
func (mounter *Mounter) Mount(source string, target string, fstype string, flags uintptr, data string) error {
func (*syscallMounter) Mount(source string, target string, fstype string, flags uintptr, data string) error {
glog.V(5).Infof("Mounting %s %s %s %d %s", source, target, fstype, flags, data)
return syscall.Mount(source, target, fstype, flags, data)
}

// Unmount wraps syscall.Unmount()
func (mounter *Mounter) Unmount(target string, flags int) error {
func (*syscallMounter) Unmount(target string, flags int) error {
return syscall.Unmount(target, flags)
}

// How many times to retry for a consistent read of /proc/mounts.
const maxListTries = 3

// List returns a list of all mounted filesystems.
func (*Mounter) List() ([]MountPoint, error) {
hash1, err := readProcMounts(nil)
func (*syscallMounter) List() ([]MountPoint, error) {
hash1, err := readProcMounts("/proc/mounts", nil)
if err != nil {
return nil, err
}

for i := 0; i < maxListTries; i++ {
mps := []MountPoint{}
hash2, err := readProcMounts(&mps)
hash2, err := readProcMounts("/proc/mounts", &mps)
if err != nil {
return nil, err
}
Expand All @@ -76,7 +76,7 @@ func (*Mounter) List() ([]MountPoint, error) {
// IsMountPoint determines if a directory is a mountpoint, by comparing the device for the
// directory with the device for it's parent. If they are the same, it's not a mountpoint,
// if they're different, it is.
func (mounter *Mounter) IsMountPoint(file string) (bool, error) {
func (*syscallMounter) IsMountPoint(file string) (bool, error) {
stat, err := os.Stat(file)
if err != nil {
return false, err
Expand All @@ -94,8 +94,8 @@ const expectedNumFieldsPerLine = 6

// readProcMounts reads /proc/mounts and produces a hash of the contents. If the out
// argument is not nil, this fills it with MountPoint structs.
func readProcMounts(out *[]MountPoint) (uint32, error) {
file, err := os.Open("/proc/mounts")
func readProcMounts(mountFilePath string, out *[]MountPoint) (uint32, error) {
file, err := os.Open(mountFilePath)
if err != nil {
return 0, err
}
Expand Down

0 comments on commit b0eef85

Please sign in to comment.