Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor kubelet, standalone k8s and integration test to all use the same code. #2689

Merged
merged 1 commit into from
Dec 2, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 4 additions & 20 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net"
"net/http"
"net/http/httptest"
"os"
"reflect"
"runtime"
"strconv"
Expand All @@ -40,12 +39,11 @@ import (
minionControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/standalone"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
Expand All @@ -56,6 +54,7 @@ import (
)

const testRootDir = "/tmp/kubelet"
const testRootDir2 = "/tmp/kubelet2"

var (
fakeDocker1, fakeDocker2 dockertools.FakeDockerClient
Expand Down Expand Up @@ -187,26 +186,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
minionController.Run(10 * time.Second)

// Kubelet (localhost)
os.MkdirAll(testRootDir, 0750)
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, cfg1.Channel("etcd"))
config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url"))
myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], testRootDir, &fakeDocker1)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), net.ParseIP("127.0.0.1"), 10250, true)
}, 0)

standalone.SimpleRunKubelet(etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, cfg2.Channel("etcd"))
otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], testRootDir, &fakeDocker2)
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), net.ParseIP("127.0.0.1"), 10251, true)
}, 0)
standalone.SimpleRunKubelet(etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251)

return apiServer.URL
}
Expand Down
237 changes: 37 additions & 200 deletions cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,15 @@ import (
"flag"
"math/rand"
"net"
"net/http"
"os"
"os/exec"
"path"
"strings"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/standalone"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
cadvisor "github.com/google/cadvisor/client"
)

const defaultRootDir = "/var/lib/kubelet"
Expand Down Expand Up @@ -82,52 +69,15 @@ func init() {
flag.Var(&apiServerList, "api_servers", "List of Kubernetes API servers to publish events to. (ip:port), comma separated.")
}

func getDockerEndpoint() string {
var endpoint string
if len(*dockerEndpoint) > 0 {
endpoint = *dockerEndpoint
} else if len(os.Getenv("DOCKER_HOST")) > 0 {
endpoint = os.Getenv("DOCKER_HOST")
} else {
endpoint = "unix:///var/run/docker.sock"
}
glog.Infof("Connecting to docker on %s", endpoint)

return endpoint
}

func getHostname() string {
hostname := []byte(*hostnameOverride)
if string(hostname) == "" {
// Note: We use exec here instead of os.Hostname() because we
// want the FQDN, and this is the easiest way to get it.
fqdn, err := exec.Command("hostname", "-f").Output()
if err != nil {
glog.Fatalf("Couldn't determine hostname: %v", err)
func setupRunOnce() {
if *runonce {
if len(etcdServerList) > 0 {
glog.Fatalf("invalid option: --runonce and --etcd_servers are mutually exclusive")
}
if *enableServer {
glog.Infof("--runonce is set, disabling server")
*enableServer = false
}
hostname = fqdn
}
return strings.TrimSpace(string(hostname))
}

func getApiserverClient() (*client.Client, error) {
authInfo, err := clientauth.LoadFromFile(*authPath)
if err != nil {
return nil, err
}
clientConfig, err := authInfo.MergeWithConfig(client.Config{})
if err != nil {
return nil, err
}
// TODO: adapt Kube client to support LB over several servers
if len(apiServerList) > 1 {
glog.Infof("Mulitple api servers specified. Picking first one")
}
clientConfig.Host = apiServerList[0]
if c, err := client.New(&clientConfig); err != nil {
return nil, err
} else {
return c, nil
}
}

Expand All @@ -139,147 +89,34 @@ func main() {

verflag.PrintAndExitIfRequested()

if *runonce {
exclusiveFlag := "invalid option: --runonce and %s are mutually exclusive"
if len(etcdServerList) > 0 {
glog.Fatalf(exclusiveFlag, "--etcd_servers")
}
if *enableServer {
glog.Infof("--runonce is set, disabling server")
*enableServer = false
}
}

etcd.SetLogger(util.NewLogger("etcd "))

// Make an API client if possible.
if len(apiServerList) < 1 {
glog.Info("No api servers specified.")
} else {
if apiClient, err := getApiserverClient(); err != nil {
glog.Errorf("Unable to make apiserver client: %v", err)
} else {
// Send events to APIserver if there is a client.
glog.Infof("Sending events to APIserver.")
record.StartRecording(apiClient.Events(""), "kubelet")
}
}

// Log the events locally too.
record.StartLogging(glog.Infof)

capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: *allowPrivileged,
})

dockerClient, err := docker.NewClient(getDockerEndpoint())
if err != nil {
glog.Fatal("Couldn't connect to docker.")
}

hostname := getHostname()

if *rootDirectory == "" {
glog.Fatal("Invalid root directory path.")
}
*rootDirectory = path.Clean(*rootDirectory)
if err := os.MkdirAll(*rootDirectory, 0750); err != nil {
glog.Fatalf("Error creating root directory: %v", err)
}

// source of all configuration
cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates)

// define file config source
if *config != "" {
kconfig.NewSourceFile(*config, *fileCheckFrequency, cfg.Channel("file"))
}

// define url config source
if *manifestURL != "" {
kconfig.NewSourceURL(*manifestURL, *httpCheckFrequency, cfg.Channel("http"))
}

// define etcd config source and initialize etcd client
var etcdClient *etcd.Client
if len(etcdServerList) > 0 {
etcdClient = etcd.NewClient(etcdServerList)
} else if *etcdConfigFile != "" {
var err error
etcdClient, err = etcd.NewClientFromFile(*etcdConfigFile)
if err != nil {
glog.Fatalf("Error with etcd config file: %v", err)
}
}

if etcdClient != nil {
glog.Infof("Watching for etcd configs at %v", etcdClient.GetCluster())
kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, cfg.Channel("etcd"))
}

// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations

k := kubelet.NewMainKubelet(
getHostname(),
dockerClient,
etcdClient,
*rootDirectory,
*networkContainerImage,
*syncFrequency,
float32(*registryPullQPS),
*registryBurst,
*minimumGCAge,
*maxContainerCount)

k.BirthCry()

go func() {
util.Forever(func() {
err := k.GarbageCollectContainers()
if err != nil {
glog.Errorf("Garbage collect failed: %v", err)
}
}, time.Minute*1)
}()

go func() {
defer util.HandleCrash()
// TODO: Monitor this connection, reconnect if needed?
glog.V(1).Infof("Trying to create cadvisor client.")
cadvisorClient, err := cadvisor.NewClient("http://127.0.0.1:4194")
if err != nil {
glog.Errorf("Error on creating cadvisor client: %v", err)
return
}
glog.V(1).Infof("Successfully created cadvisor client.")
k.SetCadvisorClient(cadvisorClient)
}()

// TODO: These should probably become more plugin-ish: register a factory func
// in each checker's init(), iterate those here.
health.AddHealthChecker(health.NewExecHealthChecker(k))
health.AddHealthChecker(health.NewHTTPHealthChecker(&http.Client{}))
health.AddHealthChecker(&health.TCPHealthChecker{})

// process pods and exit.
if *runonce {
if _, err := k.RunOnce(cfg.Updates()); err != nil {
glog.Fatalf("--runonce failed: %v", err)
}
return
}

// start the kubelet
go util.Forever(func() { k.Run(cfg.Updates()) }, 0)

// start the kubelet server
if *enableServer {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), net.IP(address), *port, *enableDebuggingHandlers)
}, 0)
}

setupRunOnce()

kcfg := standalone.KubeletConfig{
Address: address,
AuthPath: *authPath,
ApiServerList: apiServerList,
AllowPrivileged: *allowPrivileged,
HostnameOverride: *hostnameOverride,
RootDirectory: *rootDirectory,
ConfigFile: *config,
ManifestURL: *manifestURL,
FileCheckFrequency: *fileCheckFrequency,
HttpCheckFrequency: *httpCheckFrequency,
NetworkContainerImage: *networkContainerImage,
SyncFrequency: *syncFrequency,
RegistryPullQPS: *registryPullQPS,
RegistryBurst: *registryBurst,
MinimumGCAge: *minimumGCAge,
MaxContainerCount: *maxContainerCount,
Runonce: *runonce,
Port: *port,
EnableServer: *enableServer,
EnableDebuggingHandlers: *enableDebuggingHandlers,
DockerClient: kubelet.ConnectToDockerOrDie(*dockerEndpoint),
EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile),
}

standalone.RunKubelet(&kcfg)
// runs forever
select {}
}
5 changes: 4 additions & 1 deletion cmd/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/standalone"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
Expand All @@ -50,7 +51,9 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string
standalone.RunApiServer(cl, etcdClient, addr, port)
standalone.RunScheduler(cl)
standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this command should offer you some choice as to which components to run?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, eventually. One thing at a time ;)

standalone.RunKubelet(etcdClient, machineList[0], *dockerEndpoint)

dockerClient := kubelet.ConnectToDockerOrDie(*dockerEndpoint)
standalone.SimpleRunKubelet(etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250)
}

func newApiClient(addr string, port int) *client.Client {
Expand Down