Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configurable Kubelet net image for isolated networks #1546

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 17 additions & 15 deletions cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,22 @@ import (
const defaultRootDir = "/var/lib/kubelet"

var (
config = flag.String("config", "", "Path to the config file or directory of files")
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking config files for new data")
httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data")
manifestURL = flag.String("manifest_url", "", "URL for accessing the container manifest")
enableServer = flag.Bool("enable_server", true, "Enable the info server")
address = flag.String("address", "127.0.0.1", "The address for the info server to serve on (set to 0.0.0.0 or \"\" for all interfaces)")
port = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on")
hostnameOverride = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.")
dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with")
etcdServerList util.StringList
rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).")
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]")
registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
config = flag.String("config", "", "Path to the config file or directory of files")
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking config files for new data")
httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data")
manifestURL = flag.String("manifest_url", "", "URL for accessing the container manifest")
enableServer = flag.Bool("enable_server", true, "Enable the info server")
address = flag.String("address", "127.0.0.1", "The address for the info server to serve on (set to 0.0.0.0 or \"\" for all interfaces)")
port = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on")
hostnameOverride = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.")
networkContainerImage = flag.String("network_container_image", kubelet.NetworkContainerImage, "The image that network containers in each pod will use.")
dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with")
etcdServerList util.StringList
rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).")
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]")
registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
)

func init() {
Expand Down Expand Up @@ -159,6 +160,7 @@ func main() {
cadvisorClient,
etcdClient,
*rootDirectory,
*networkContainerImage,
*syncFrequency,
float32(*registryPullQPS),
*registryBurst)
Expand Down
15 changes: 13 additions & 2 deletions pkg/kubelet/dockertools/fake_docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (f *FakeDockerClient) CreateContainer(c docker.CreateContainerOptions) (*do
// This is not a very good fake. We'll just add this container's name to the list.
// Docker likes to add a '/', so copy that behavior.
name := "/" + c.Name
f.ContainerList = append(f.ContainerList, docker.APIContainers{ID: name, Names: []string{name}})
f.ContainerList = append(f.ContainerList, docker.APIContainers{ID: name, Names: []string{name}, Image: c.Config.Image})
return &docker.Container{ID: name}, nil
}

Expand Down Expand Up @@ -138,6 +138,7 @@ func (f *FakeDockerClient) InspectImage(name string) (*docker.Image, error) {
type FakeDockerPuller struct {
sync.Mutex

HasImages []string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StringSet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think eventually we'd test duplicates (check for double pulls with TTL?) so set might not be appropriate.

----- Original Message -----

@@ -138,6 +138,7 @@ func (f _FakeDockerClient) InspectImage(name string)
(_docker.Image, error) {
type FakeDockerPuller struct {
sync.Mutex

  • HasImages []string

StringSet?


Reply to this email directly or view it on GitHub:
https://github.com/GoogleCloudPlatform/kubernetes/pull/1546/files#r18361721

ImagesPulled []string

// Every pull will return the first error here, and then reslice
Expand All @@ -159,5 +160,15 @@ func (f *FakeDockerPuller) Pull(image string) (err error) {
}

func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) {
return true, nil
f.Lock()
defer f.Unlock()
if f.HasImages == nil {
return true, nil
}
for _, s := range f.HasImages {
if s == name {
return true, nil
}
}
return false, nil
}
59 changes: 35 additions & 24 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,33 +67,36 @@ func NewMainKubelet(
cc CadvisorInterface,
ec tools.EtcdClient,
rd string,
ni string,
ri time.Duration,
pullQPS float32,
pullBurst int) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
cadvisorClient: cc,
etcdClient: ec,
rootDirectory: rd,
resyncInterval: ri,
podWorkers: newPodWorkers(),
runner: dockertools.NewDockerContainerCommandRunner(),
httpClient: &http.Client{},
pullQPS: pullQPS,
pullBurst: pullBurst,
hostname: hn,
dockerClient: dc,
cadvisorClient: cc,
etcdClient: ec,
rootDirectory: rd,
resyncInterval: ri,
networkContainerImage: ni,
podWorkers: newPodWorkers(),
runner: dockertools.NewDockerContainerCommandRunner(),
httpClient: &http.Client{},
pullQPS: pullQPS,
pullBurst: pullBurst,
}
}

// NewIntegrationTestKubelet creates a new Kubelet for use in integration tests.
// TODO: add more integration tests, and expand parameter list as needed.
func NewIntegrationTestKubelet(hn string, dc dockertools.DockerInterface) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
dockerPuller: &dockertools.FakeDockerPuller{},
resyncInterval: 3 * time.Second,
podWorkers: newPodWorkers(),
hostname: hn,
dockerClient: dc,
dockerPuller: &dockertools.FakeDockerPuller{},
networkContainerImage: NetworkContainerImage,
resyncInterval: 3 * time.Second,
podWorkers: newPodWorkers(),
}
}

Expand All @@ -103,11 +106,12 @@ type httpGetInterface interface {

// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
dockerClient dockertools.DockerInterface
rootDirectory string
podWorkers podWorkers
resyncInterval time.Duration
hostname string
dockerClient dockertools.DockerInterface
rootDirectory string
networkContainerImage string
podWorkers podWorkers
resyncInterval time.Duration

// Optional, no events will be sent without it
etcdClient tools.EtcdClient
Expand Down Expand Up @@ -368,7 +372,7 @@ func (kl *Kubelet) killContainerByID(ID, name string) error {

const (
networkContainerName = "net"
networkContainerImage = "kubernetes/pause:latest"
NetworkContainerImage = "kubernetes/pause:latest"
)

// createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container.
Expand All @@ -381,12 +385,19 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error
}
container := &api.Container{
Name: networkContainerName,
Image: networkContainerImage,
Image: kl.networkContainerImage,
Ports: ports,
}
if err := kl.dockerPuller.Pull(networkContainerImage); err != nil {
// TODO: make this a TTL based pull (if image older than X policy, pull)
ok, err := kl.dockerPuller.IsImagePresent(container.Image)
if err != nil {
return "", err
}
if !ok {
if err := kl.dockerPuller.Pull(container.Image); err != nil {
return "", err
}
}
return kl.runContainer(pod, container, nil, "")
}

Expand Down
53 changes: 53 additions & 0 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -206,6 +207,7 @@ func matchString(t *testing.T, pattern, str string) bool {

func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.networkContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
err := kubelet.SyncPods([]Pod{
{
Expand All @@ -228,6 +230,57 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
"list", "list", "create", "start", "list", "inspect", "list", "create", "start"})

fakeDocker.Lock()

found := false
for _, c := range fakeDocker.ContainerList {
if c.Image == "custom_image_name" && strings.HasPrefix(c.Names[0], "/k8s_net") {
found = true
}
}
if !found {
t.Errorf("Custom net container not found: %v", fakeDocker.ContainerList)
}

if len(fakeDocker.Created) != 2 ||
!matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) ||
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.Unlock()
}

func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t)
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
puller.HasImages = []string{}
kubelet.networkContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
err := kubelet.SyncPods([]Pod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
},
},
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.drainWorkers()

verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "list", "inspect", "list", "create", "start"})

fakeDocker.Lock()

if !reflect.DeepEqual(puller.ImagesPulled, []string{"custom_image_name", ""}) {
t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled)
}

if len(fakeDocker.Created) != 2 ||
!matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) ||
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) {
Expand Down