Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move events to a separate etcd instance #14823

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cluster/saltbase/salt/etcd/etcd.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"command": [
"/bin/sh",
"-c",
"/usr/local/bin/etcd --listen-peer-urls=http://127.0.0.1:{{ server_port }} --addr 127.0.0.1:{{ port }} --bind-addr 127.0.0.1:{{ port }} --data-dir /var/etcd/data{{ suffix }} 1>>/var/log/etcd{{ suffix }}.log 2>&1"
"/usr/local/bin/etcd --listen-peer-urls http://127.0.0.1:{{ server_port }} --addr 127.0.0.1:{{ port }} --bind-addr 127.0.0.1:{{ port }} --data-dir /var/etcd/data{{ suffix }} 1>>/var/log/etcd{{ suffix }}.log 2>&1"
],
"livenessProbe": {
"httpGet": {
Expand Down
3 changes: 2 additions & 1 deletion cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
{% endif -%}

{% set etcd_servers = "--etcd-servers=http://127.0.0.1:4001" -%}
{% set etcd_servers_overrides = "--etcd-servers-overrides=/events#http://127.0.0.1:4002" -%}

{% set service_cluster_ip_range = "" -%}
{% if pillar['service_cluster_ip_range'] is defined -%}
Expand Down Expand Up @@ -88,7 +89,7 @@
{% set runtime_config = "--runtime-config=" + grains.runtime_config -%}
{% endif -%}

{% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%}
{% set params = address + " " + etcd_servers + " " + etcd_servers_overrides + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%}
{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure-port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " " + proxy_ssh_options -%}

# test_args has to be kept at the end, so they'll overwrite any prior configuration
Expand Down
7 changes: 4 additions & 3 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
}
expEtcdStorage, err := master.NewEtcdStorage(etcdClient, latest.GroupOrDie("experimental").InterfacesFor, testapi.Experimental.GroupAndVersion(), etcdtest.PathPrefix())
storageVersions["experimental"] = testapi.Experimental.GroupAndVersion()

if err != nil {
glog.Fatalf("Unable to get etcd storage for experimental: %v", err)
}
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
storageDestinations.AddAPIGroup("experimental", expEtcdStorage)

// Master
host, port, err := net.SplitHostPort(strings.TrimLeft(apiServer.URL, "http://"))
Expand All @@ -166,8 +168,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string

// Create a master and install handlers into mux.
m := master.New(&master.Config{
DatabaseStorage: etcdStorage,
ExpDatabaseStorage: expEtcdStorage,
StorageDestinations: storageDestinations,
KubeletClient: fakeKubeletClient{},
EnableCoreControllers: true,
EnableLogsSupport: false,
Expand Down
58 changes: 52 additions & 6 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type APIServer struct {
AdmissionControlConfigFile string
EtcdServerList []string
EtcdConfigFile string
EtcdServersOverrides []string
EtcdPathPrefix string
CorsAllowedOriginList []string
AllowPrivileged bool
Expand Down Expand Up @@ -211,6 +212,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.")
fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.")
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
Expand Down Expand Up @@ -253,6 +255,8 @@ func (s *APIServer) verifyClusterIPFlags() {
}
}

type newEtcdFunc func(string, []string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)

func newEtcd(etcdConfigFile string, etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) {
if storageVersion == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
Expand Down Expand Up @@ -294,6 +298,45 @@ func generateStorageVersionMap(legacyVersion string, storageVersions string) map
return storageVersionMap
}

// parse the value of --etcd-servers-overrides and update given storageDestinations.
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, storageDestinations *master.StorageDestinations, newEtcdFn newEtcdFunc) {
if len(overrides) == 0 {
return
}
for _, override := range overrides {
tokens := strings.Split(override, "#")
if len(tokens) != 2 {
glog.Errorf("invalid value of etcd server overrides: %s", override)
continue
}

apiresource := strings.Split(tokens[0], "/")
if len(apiresource) != 2 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this always going to be valid if a server definition contains http:// ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, misread.

glog.Errorf("invalid resource definition: %s", tokens[0])
}
group := apiresource[0]
resource := apiresource[1]

apigroup, err := latest.Group(group)
if err != nil {
glog.Errorf("invalid api group %s: %v", group, err)
continue
}
if _, found := storageVersions[apigroup.Group]; !found {
glog.Errorf("Couldn't find the storage version for group %s", apigroup.Group)
continue
}

servers := strings.Split(tokens[1], ";")
etcdOverrideStorage, err := newEtcdFn("", servers, apigroup.InterfacesFor, storageVersions[apigroup.Group], prefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
}

storageDestinations.AddStorageOverride(group, resource, etcdOverrideStorage)
}
}

// Run runs the specified APIServer. This should never exit.
func (s *APIServer) Run(_ []string) error {
s.verifyClusterIPFlags()
Expand Down Expand Up @@ -369,6 +412,8 @@ func (s *APIServer) Run(_ []string) error {
return err
}

storageDestinations := master.NewStorageDestinations()

storageVersions := generateStorageVersionMap(s.DeprecatedStorageVersion, s.StorageVersions)
if _, found := storageVersions[legacyV1Group.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.Group, storageVersions)
Expand All @@ -377,8 +422,8 @@ func (s *APIServer) Run(_ []string) error {
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup("", etcdStorage)

var expEtcdStorage storage.Interface
if enableExp {
expGroup, err := latest.Group("experimental")
if err != nil {
Expand All @@ -387,12 +432,15 @@ func (s *APIServer) Run(_ []string) error {
if _, found := storageVersions[expGroup.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.Group, storageVersions)
}
expEtcdStorage, err = newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.Group], s.EtcdPathPrefix)
expEtcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.Group], s.EtcdPathPrefix)
if err != nil {
glog.Fatalf("Invalid experimental storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup("experimental", expEtcdStorage)
}

updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, &storageDestinations, newEtcd)

n := s.ServiceClusterIPRange

// Default to the private server key for service account token signing
Expand Down Expand Up @@ -460,10 +508,8 @@ func (s *APIServer) Run(_ []string) error {
}
}
config := &master.Config{
DatabaseStorage: etcdStorage,
ExpDatabaseStorage: expEtcdStorage,
StorageVersions: storageVersions,

StorageDestinations: storageDestinations,
StorageVersions: storageVersions,
EventTTL: s.EventTTL,
KubeletClient: kubeletClient,
ServiceClusterIPRange: &n,
Expand Down
56 changes: 56 additions & 0 deletions cmd/kube-apiserver/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ package app
import (
"reflect"
"regexp"
"strings"
"testing"

"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/storage"
)

func TestLongRunningRequestRegexp(t *testing.T) {
Expand Down Expand Up @@ -98,3 +103,54 @@ func TestGenerateStorageVersionMap(t *testing.T) {
}
}
}

func TestUpdateEtcdOverrides(t *testing.T) {
storageVersions := generateStorageVersionMap("", "v1,experimental/v1alpha1")

testCases := []struct {
apigroup string
resource string
servers []string
}{
{
apigroup: "",
resource: "resource",
servers: []string{"http://127.0.0.1:10000"},
},
{
apigroup: "",
resource: "resource",
servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"},
},
{
apigroup: "experimental",
resource: "resource",
servers: []string{"http://127.0.0.1:10000"},
},
}

for _, test := range testCases {
newEtcd := func(_ string, serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
}
return nil, nil
}
storageDestinations := master.NewStorageDestinations()
override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";")
updateEtcdOverrides([]string{override}, storageVersions, "", &storageDestinations, newEtcd)
apigroup, ok := storageDestinations.APIGroups[test.apigroup]
if !ok {
t.Errorf("apigroup: %s not created", test.apigroup)
continue
}
if apigroup.Overrides == nil {
t.Errorf("Overrides not created for: %s", test.apigroup)
continue
}
if _, ok := apigroup.Overrides[test.resource]; !ok {
t.Errorf("override not created for: %s", test.resource)
continue
}
}
}
1 change: 1 addition & 0 deletions hack/verify-flags/known-flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ etcd-config
etcd-prefix
etcd-server
etcd-servers
etcd-servers-overrides
event-burst
event-qps
event-ttl
Expand Down