forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
430 lines (391 loc) · 17.5 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package server makes it easy to create a kubelet server for various contexts.
package server
import (
"fmt"
"math/rand"
"net"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/hyperkube"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
"github.com/spf13/pflag"
)
const defaultRootDir = "/var/lib/kubelet"
// KubeletServer encapsulates all of the parameters necessary for starting up
// a kubelet. These can either be set via command line or directly.
type KubeletServer struct {
Config string
SyncFrequency time.Duration
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
ManifestURL string
EnableServer bool
Address util.IP
Port uint
HostnameOverride string
PodInfraContainerImage string
DockerEndpoint string
EtcdServerList util.StringList
EtcdConfigFile string
RootDirectory string
AllowPrivileged bool
RegistryPullQPS float64
RegistryBurst int
RunOnce bool
EnableDebuggingHandlers bool
MinimumGCAge time.Duration
MaxContainerCount int
AuthPath string
CAdvisorPort uint
OOMScoreAdj int
APIServerList util.StringList
ClusterDomain string
MasterServiceNamespace string
ClusterDNS util.IP
ReallyCrashForTesting bool
}
// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
return &KubeletServer{
SyncFrequency: 10 * time.Second,
FileCheckFrequency: 20 * time.Second,
HTTPCheckFrequency: 20 * time.Second,
EnableServer: true,
Address: util.IP(net.ParseIP("127.0.0.1")),
Port: ports.KubeletPort,
PodInfraContainerImage: kubelet.PodInfraContainerImage,
RootDirectory: defaultRootDir,
RegistryBurst: 10,
EnableDebuggingHandlers: true,
MinimumGCAge: 1 * time.Minute,
MaxContainerCount: 5,
CAdvisorPort: 4194,
OOMScoreAdj: -900,
MasterServiceNamespace: api.NamespaceDefault,
}
}
// NewHyperkubeServer creates a new hyperkube Server object that includes the
// description and flags.
func NewHyperkubeServer() *hyperkube.Server {
s := NewKubeletServer()
hks := hyperkube.Server{
SimpleUsage: "kubelet",
Long: `The kubelet binary is responsible for maintaining a set of containers on a
particular node. It syncs data from a variety of sources including a
Kubernetes API server, an etcd cluster, HTTP endpoint or local file. It then
queries Docker to see what is currently running. It synchronizes the
configuration data, with the running set of containers by starting or stopping
Docker containers.`,
Run: func(_ *hyperkube.Server, args []string) error {
return s.Run(args)
},
}
s.AddFlags(hks.Flags())
return &hks
}
// AddFlags adds flags for a specific KubeletServer to the specified FlagSet
func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files")
fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config")
fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data")
fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data")
fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest")
fs.BoolVar(&s.EnableServer, "enable_server", s.EnableServer, "Enable the info server")
fs.Var(&s.Address, "address", "The IP address for the info server to serve on (set to 0.0.0.0 for all interfaces)")
fs.UintVar(&s.Port, "port", s.Port, "The port for the info server to serve on")
fs.StringVar(&s.HostnameOverride, "hostname_override", s.HostnameOverride, "If non-empty, will use this string as identification instead of the actual hostname.")
fs.StringVar(&s.PodInfraContainerImage, "pod_infra_container_image", s.PodInfraContainerImage, "The image whose network/ipc namespaces containers in each pod will use.")
fs.StringVar(&s.DockerEndpoint, "docker_endpoint", s.DockerEndpoint, "If non-empty, use this for the docker endpoint to communicate with")
fs.Var(&s.EtcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config")
fs.StringVar(&s.EtcdConfigFile, "etcd_config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd_servers")
fs.StringVar(&s.RootDirectory, "root_dir", s.RootDirectory, "Directory path for managing kubelet files (volume mounts,etc).")
fs.BoolVar(&s.AllowPrivileged, "allow_privileged", s.AllowPrivileged, "If true, allow containers to request privileged mode. [default=false]")
fs.Float64Var(&s.RegistryPullQPS, "registry_qps", s.RegistryPullQPS, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
fs.IntVar(&s.RegistryBurst, "registry_burst", s.RegistryBurst, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
fs.BoolVar(&s.RunOnce, "runonce", s.RunOnce, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers, --api_servers, and --enable-server")
fs.BoolVar(&s.EnableDebuggingHandlers, "enable_debugging_handlers", s.EnableDebuggingHandlers, "Enables server endpoints for log collection and local running of containers and commands")
fs.DurationVar(&s.MinimumGCAge, "minimum_container_ttl_duration", s.MinimumGCAge, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'")
fs.IntVar(&s.MaxContainerCount, "maximum_dead_containers_per_container", s.MaxContainerCount, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.")
fs.StringVar(&s.AuthPath, "auth_path", s.AuthPath, "Path to .kubernetes_auth file, specifying how to authenticate to API server.")
fs.UintVar(&s.CAdvisorPort, "cadvisor_port", s.CAdvisorPort, "The port of the localhost cAdvisor endpoint")
fs.IntVar(&s.OOMScoreAdj, "oom_score_adj", s.OOMScoreAdj, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]")
fs.Var(&s.APIServerList, "api_servers", "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.")
fs.StringVar(&s.ClusterDomain, "cluster_domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
fs.StringVar(&s.MasterServiceNamespace, "master_service_namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods")
fs.Var(&s.ClusterDNS, "cluster_dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
fs.BoolVar(&s.ReallyCrashForTesting, "really_crash_for_testing", s.ReallyCrashForTesting, "If true, crash with panics more often.")
}
// Run runs the specified KubeletServer. This should never exit.
func (s *KubeletServer) Run(_ []string) error {
util.ReallyCrash = s.ReallyCrashForTesting
rand.Seed(time.Now().UTC().UnixNano())
// Cluster creation scripts support both kubernetes versions that 1)
// support kublet watching apiserver for pods, and 2) ones that don't. So
// they can set both --etcd_servers and --api_servers. The current code
// will ignore the --etcd_servers flag, while older kubelet code will use
// the --etcd_servers flag for pods, and use --api_servers for event
// publising.
//
// TODO(erictune): convert all cloud provider scripts and Google Container Engine to
// use only --api_servers, then delete --etcd_servers flag and the resulting dead code.
if len(s.EtcdServerList) > 0 && len(s.APIServerList) > 0 {
glog.Infof("Both --etcd_servers and --api_servers are set. Not using etcd source.")
s.EtcdServerList = util.StringList{}
}
if err := util.ApplyOomScoreAdj(s.OOMScoreAdj); err != nil {
glog.Info(err)
}
client, err := s.createAPIServerClient()
if err != nil && len(s.APIServerList) > 0 {
glog.Warningf("No API client: %v", err)
}
credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)
kcfg := KubeletConfig{
Address: s.Address,
AllowPrivileged: s.AllowPrivileged,
HostnameOverride: s.HostnameOverride,
RootDirectory: s.RootDirectory,
ConfigFile: s.Config,
ManifestURL: s.ManifestURL,
FileCheckFrequency: s.FileCheckFrequency,
HTTPCheckFrequency: s.HTTPCheckFrequency,
PodInfraContainerImage: s.PodInfraContainerImage,
SyncFrequency: s.SyncFrequency,
RegistryPullQPS: s.RegistryPullQPS,
RegistryBurst: s.RegistryBurst,
MinimumGCAge: s.MinimumGCAge,
MaxContainerCount: s.MaxContainerCount,
ClusterDomain: s.ClusterDomain,
ClusterDNS: s.ClusterDNS,
Runonce: s.RunOnce,
Port: s.Port,
CAdvisorPort: s.CAdvisorPort,
EnableServer: s.EnableServer,
EnableDebuggingHandlers: s.EnableDebuggingHandlers,
DockerClient: util.ConnectToDockerOrDie(s.DockerEndpoint),
KubeClient: client,
EtcdClient: kubelet.EtcdClientOrDie(s.EtcdServerList, s.EtcdConfigFile),
MasterServiceNamespace: s.MasterServiceNamespace,
VolumePlugins: ProbeVolumePlugins(),
}
RunKubelet(&kcfg)
// runs forever
select {}
}
func (s *KubeletServer) setupRunOnce() {
if s.RunOnce {
// Don't use remote (etcd or apiserver) sources
if len(s.EtcdServerList) > 0 {
glog.Fatalf("invalid option: --runonce and --etcd_servers are mutually exclusive")
}
if len(s.APIServerList) > 0 {
glog.Fatalf("invalid option: --runonce and --api_servers are mutually exclusive")
}
if s.EnableServer {
glog.Infof("--runonce is set, disabling server")
s.EnableServer = false
}
}
}
// TODO: replace this with clientcmd
func (s *KubeletServer) createAPIServerClient() (*client.Client, error) {
authInfo, err := clientauth.LoadFromFile(s.AuthPath)
if err != nil {
glog.Warningf("Could not load kubernetes auth path: %v. Continuing with defaults.", err)
}
if authInfo == nil {
// authInfo didn't load correctly - continue with defaults.
authInfo = &clientauth.Info{}
}
clientConfig, err := authInfo.MergeWithConfig(client.Config{})
if err != nil {
return nil, err
}
if len(s.APIServerList) < 1 {
return nil, fmt.Errorf("no api servers specified")
}
// TODO: adapt Kube client to support LB over several servers
if len(s.APIServerList) > 1 {
glog.Infof("Multiple api servers specified. Picking first one")
}
clientConfig.Host = s.APIServerList[0]
c, err := client.New(&clientConfig)
if err != nil {
return nil, err
}
return c, nil
}
// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient.
// Under the hood it calls RunKubelet (below)
func SimpleRunKubelet(client *client.Client,
etcdClient tools.EtcdClient,
dockerClient dockertools.DockerInterface,
hostname, rootDir, manifestURL, address string,
port uint,
masterServiceNamespace string,
volumePlugins []volume.Plugin) {
kcfg := KubeletConfig{
KubeClient: client,
EtcdClient: etcdClient,
DockerClient: dockerClient,
HostnameOverride: hostname,
RootDirectory: rootDir,
ManifestURL: manifestURL,
PodInfraContainerImage: kubelet.PodInfraContainerImage,
Port: port,
Address: util.IP(net.ParseIP(address)),
EnableServer: true,
EnableDebuggingHandlers: true,
SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second,
MaxContainerCount: 5,
MasterServiceNamespace: masterServiceNamespace,
VolumePlugins: volumePlugins,
}
RunKubelet(&kcfg)
}
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig) {
kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride)
if kcfg.KubeClient != nil {
kubelet.SetupEventSending(kcfg.KubeClient, kcfg.Hostname)
} else {
glog.Infof("No api server defined - no events will be sent.")
}
kubelet.SetupLogging()
kubelet.SetupCapabilities(kcfg.AllowPrivileged)
credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)
podCfg := makePodSourceConfig(kcfg)
k, err := createAndInitKubelet(kcfg, podCfg)
if err != nil {
glog.Errorf("Failed to create kubelet: %s", err)
return
}
// process pods and exit.
if kcfg.Runonce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
glog.Errorf("--runonce failed: %v", err)
}
} else {
startKubelet(k, podCfg, kcfg)
}
}
func startKubelet(k *kubelet.Kubelet, podCfg *config.PodConfig, kc *KubeletConfig) {
// start the kubelet
go util.Forever(func() { k.Run(podCfg.Updates()) }, 0)
// start the kubelet server
if kc.EnableServer {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers)
}, 0)
}
}
func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
// define file config source
if kc.ConfigFile != "" {
glog.Infof("Adding manifest file: %v", kc.ConfigFile)
config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))
}
// define url config source
if kc.ManifestURL != "" {
glog.Infof("Adding manifest url: %v", kc.ManifestURL)
config.NewSourceURL(kc.ManifestURL, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
}
if kc.EtcdClient != nil {
glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster())
config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel(kubelet.EtcdSource))
}
if kc.KubeClient != nil {
glog.Infof("Watching apiserver")
config.NewSourceApiserver(kc.KubeClient, kc.Hostname, cfg.Channel(kubelet.ApiserverSource))
}
return cfg
}
// KubeletConfig is all of the parameters necessary for running a kubelet.
// TODO: This should probably be merged with KubeletServer. The extra object is a consequence of refactoring.
type KubeletConfig struct {
EtcdClient tools.EtcdClient
KubeClient *client.Client
DockerClient dockertools.DockerInterface
CAdvisorPort uint
Address util.IP
AllowPrivileged bool
HostnameOverride string
RootDirectory string
ConfigFile string
ManifestURL string
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
Hostname string
PodInfraContainerImage string
SyncFrequency time.Duration
RegistryPullQPS float64
RegistryBurst int
MinimumGCAge time.Duration
MaxContainerCount int
ClusterDomain string
ClusterDNS util.IP
EnableServer bool
EnableDebuggingHandlers bool
Port uint
Runonce bool
MasterServiceNamespace string
VolumePlugins []volume.Plugin
}
func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
k, err := kubelet.NewMainKubelet(
kc.Hostname,
kc.DockerClient,
kc.EtcdClient,
kc.KubeClient,
kc.RootDirectory,
kc.PodInfraContainerImage,
kc.SyncFrequency,
float32(kc.RegistryPullQPS),
kc.RegistryBurst,
kc.MinimumGCAge,
kc.MaxContainerCount,
pc.IsSourceSeen,
kc.ClusterDomain,
net.IP(kc.ClusterDNS),
kc.MasterServiceNamespace,
kc.VolumePlugins)
if err != nil {
return nil, err
}
k.BirthCry()
go k.GarbageCollectLoop()
go kubelet.MonitorCAdvisor(k, kc.CAdvisorPort)
return k, nil
}