Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup to client initialization in Kubelet #3270

Merged
merged 3 commits into from
Jan 7, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Session.vim

# Go test binaries
*.test
/hack/.test-cmd-auth

# Mercurial files
**/.hg
Expand Down
4 changes: 2 additions & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
minionController.Run(10 * time.Second)

// Kubelet (localhost)
standalone.SimpleRunKubelet(etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250)
standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
standalone.SimpleRunKubelet(etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251)
standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251)

return apiServer.URL
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ func main() {
glog.Info(err)
}

client, err := standalone.GetAPIServerClient(*authPath, apiServerList)
if err != nil && len(apiServerList) > 0 {
glog.Warningf("No API client: %v", err)
}

kcfg := standalone.KubeletConfig{
Address: address,
AuthPath: *authPath,
ApiServerList: apiServerList,
AllowPrivileged: *allowPrivileged,
HostnameOverride: *hostnameOverride,
RootDirectory: *rootDirectory,
Expand All @@ -125,6 +128,7 @@ func main() {
EnableServer: *enableServer,
EnableDebuggingHandlers: *enableDebuggingHandlers,
DockerClient: util.ConnectToDockerOrDie(*dockerEndpoint),
KubeClient: client,
EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile),
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string
standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)

dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint)
standalone.SimpleRunKubelet(etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250)
standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250)
}

func newApiClient(addr string, port int) *client.Client {
Expand Down
2 changes: 2 additions & 0 deletions hack/local-up-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ sudo "${GO_OUT}/kubelet" \
--etcd_servers="http://127.0.0.1:4001" \
--hostname_override="127.0.0.1" \
--address="127.0.0.1" \
--api_servers="${API_HOST}:${API_PORT}" \
--auth_path="${KUBE_ROOT}/hack/.test-cmd-auth" \
--port="$KUBELET_PORT" >"${KUBELET_LOG}" 2>&1 &
KUBELET_PID=$!

Expand Down
2 changes: 2 additions & 0 deletions hack/test-cmd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ kube::log::status "Starting kubelet"
--etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \
--hostname_override="127.0.0.1" \
--address="127.0.0.1" \
--api_servers="${API_HOST}:${API_PORT}" \
--auth_path="${KUBE_ROOT}/hack/.test-cmd-auth" \
--port="$KUBELET_PORT" 1>&2 &
KUBELET_PID=$!

Expand Down
28 changes: 18 additions & 10 deletions pkg/apiserver/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,21 @@ func (h *WatchHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request)
return h.selfLinker.SetSelfLink(obj, newURL.String())
}

func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string) {
if s, err := labels.ParseSelector(query.Get("labels")); err != nil {
label = labels.Everything()
} else {
label = s
func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string, err error) {
s, perr := labels.ParseSelector(query.Get("labels"))
if perr != nil {
err = perr
return
}
if s, err := labels.ParseSelector(query.Get("fields")); err != nil {
field = labels.Everything()
} else {
field = s
label = s

s, perr = labels.ParseSelector(query.Get("fields"))
if perr != nil {
err = perr
return
}
field = s

resourceVersion = query.Get("resourceVersion")
return
}
Expand Down Expand Up @@ -95,7 +99,11 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
if watcher, ok := storage.(ResourceWatcher); ok {
label, field, resourceVersion := getWatchParams(req.URL.Query())
label, field, resourceVersion, err := getWatchParams(req.URL.Query())
if err != nil {
errorJSON(err, h.codec, w)
return
}
watching, err := watcher.Watch(ctx, label, field, resourceVersion)
if err != nil {
errorJSON(err, h.codec, w)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,8 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {

// GetKubeletContainerLogs returns logs from the container
// The second parameter of GetPodInfo and FindPodContainer methods represents pod UUID, which is allowed to be blank
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
// or all of them.
func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error {
_, err := kl.GetPodInfo(podFullName, "")
if err == dockertools.ErrNoContainersInPod {
Expand All @@ -1153,6 +1155,18 @@ func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) {
return kl.pods, nil
}

// GetPodFullName provides the first pod that matches namespace and name, or false
// if no such pod can be found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
for i := range kl.pods {
pod := &kl.pods[i]
if pod.Namespace == namespace && pod.Name == name {
return pod, true
}
}
return nil, false
}

// GetPodInfo returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) {
var manifest api.PodSpec
Expand Down
75 changes: 30 additions & 45 deletions pkg/kubelet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/golang/glog"
"github.com/google/cadvisor/info"
Expand Down Expand Up @@ -66,6 +65,7 @@ type HostInterface interface {
GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
GetMachineInfo() (*info.MachineInfo, error)
GetBoundPods() ([]api.BoundPod, error)
GetPodByName(namespace, name string) (*api.BoundPod, bool)
GetPodInfo(name, uuid string) (api.PodInfo, error)
RunInContainer(name, uuid, container string, cmd []string) ([]byte, error)
GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
Expand Down Expand Up @@ -146,13 +146,11 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
follow, _ := strconv.ParseBool(uriValues.Get("follow"))
tail := uriValues.Get("tail")

podFullName := GetPodFullName(&api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: podID,
Namespace: podNamespace,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}

fw := FlushWriter{writer: w}
if flusher, ok := fw.writer.(http.Flusher); ok {
Expand All @@ -162,7 +160,7 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
}
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
err = s.host.GetKubeletContainerLogs(podFullName, containerName, tail, follow, &fw, &fw)
err = s.host.GetKubeletContainerLogs(GetPodFullName(pod), containerName, tail, follow, &fw, &fw)
if err != nil {
s.error(w, err)
return
Expand Down Expand Up @@ -217,19 +215,12 @@ func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request, version
http.Error(w, "Missing 'podNamespace=' query entry.", http.StatusBadRequest)
return
}
// TODO: backwards compatibility with existing API, needs API change
podFullName := GetPodFullName(&api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: podID,
Namespace: podNamespace,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
info, err := s.host.GetPodInfo(podFullName, podUUID)
if err == dockertools.ErrNoContainersInPod {
http.Error(w, "api.BoundPod does not exist", http.StatusNotFound)
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
info, err := s.host.GetPodInfo(GetPodFullName(pod), podUUID)
if err != nil {
s.error(w, err)
return
Expand Down Expand Up @@ -293,15 +284,13 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
http.Error(w, "Unexpected path for command running", http.StatusBadRequest)
return
}
podFullName := GetPodFullName(&api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: podID,
Namespace: podNamespace,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
command := strings.Split(u.Query().Get("cmd"), " ")
data, err := s.host.RunInContainer(podFullName, uuid, container, command)
data, err := s.host.RunInContainer(GetPodFullName(pod), uuid, container, command)
if err != nil {
s.error(w, err)
return
Expand Down Expand Up @@ -344,24 +333,20 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
// TODO(monnand) Implement this
errors.New("pod level status currently unimplemented")
case 3:
// Backward compatibility without uuid information
podFullName := GetPodFullName(&api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: components[1],
Namespace: api.NamespaceDefault,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
stats, err = s.host.GetContainerInfo(podFullName, "", components[2], &query)
// Backward compatibility without uuid information, does not support namespace
pod, ok := s.host.GetPodByName(api.NamespaceDefault, components[1])
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
stats, err = s.host.GetContainerInfo(GetPodFullName(pod), "", components[2], &query)
case 5:
podFullName := GetPodFullName(&api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: components[2],
Namespace: components[1],
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
stats, err = s.host.GetContainerInfo(podFullName, components[3], components[4], &query)
pod, ok := s.host.GetPodByName(components[1], components[2])
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
stats, err = s.host.GetContainerInfo(GetPodFullName(pod), components[3], components[4], &query)
default:
http.Error(w, "unknown resource.", http.StatusNotFound)
return
Expand Down
19 changes: 18 additions & 1 deletion pkg/kubelet/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
)

type fakeKubelet struct {
podByNameFunc func(namespace, name string) (*api.BoundPod, bool)
infoFunc func(name string) (api.PodInfo, error)
containerInfoFunc func(podFullName, uid, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
Expand All @@ -43,6 +44,10 @@ type fakeKubelet struct {
containerLogsFunc func(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
}

func (fk *fakeKubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
return fk.podByNameFunc(namespace, name)
}

func (fk *fakeKubelet) GetPodInfo(name, uuid string) (api.PodInfo, error) {
return fk.infoFunc(name)
}
Expand Down Expand Up @@ -88,7 +93,19 @@ func newServerTest() *serverTestFramework {
updateChan: make(chan interface{}),
}
fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{}
fw.fakeKubelet = &fakeKubelet{
podByNameFunc: func(namespace, name string) (*api.BoundPod, bool) {
return &api.BoundPod{
ObjectMeta: api.ObjectMeta{
Namespace: namespace,
Name: name,
Annotations: map[string]string{
ConfigSourceAnnotationKey: "etcd",
},
},
}, true
},
}
server := NewServer(fw.fakeKubelet, true)
fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
Expand Down
53 changes: 8 additions & 45 deletions pkg/kubelet/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package kubelet

import (
"fmt"
"net/http"
"os"
"path"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
Expand Down Expand Up @@ -97,47 +95,12 @@ func SetupLogging() {
record.StartLogging(glog.Infof)
}

// TODO: move this into pkg/client
func getApiserverClient(authPath string, apiServerList util.StringList) (*client.Client, error) {
authInfo, err := clientauth.LoadFromFile(authPath)
if err != nil {
return nil, err
}
clientConfig, err := authInfo.MergeWithConfig(client.Config{})
if err != nil {
return nil, err
}
if len(apiServerList) < 1 {
return nil, fmt.Errorf("no apiservers specified.")
}
// TODO: adapt Kube client to support LB over several servers
if len(apiServerList) > 1 {
glog.Infof("Mulitple api servers specified. Picking first one")
}
clientConfig.Host = apiServerList[0]
if c, err := client.New(&clientConfig); err != nil {
return nil, err
} else {
return c, nil
}
}

func SetupEventSending(authPath string, apiServerList util.StringList) {
// Make an API client if possible.
if len(apiServerList) < 1 {
glog.Info("No api servers specified.")
} else {
if apiClient, err := getApiserverClient(authPath, apiServerList); err != nil {
glog.Errorf("Unable to make apiserver client: %v", err)
} else {
// Send events to APIserver if there is a client.
hostname := util.GetHostname("")
glog.Infof("Sending events to APIserver.")
record.StartRecording(apiClient.Events(""),
api.EventSource{
Component: "kubelet",
Host: hostname,
})
}
}
func SetupEventSending(client *client.Client) {
glog.Infof("Sending events to api server.")
hostname := util.GetHostname("")
record.StartRecording(client.Events(""),
api.EventSource{
Component: "kubelet",
Host: hostname,
})
}