Skip to content

Commit

Permalink
Merge 0cc028e into 9126ec4
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxuzhonghu committed Sep 15, 2017
2 parents 9126ec4 + 0cc028e commit 2db9c80
Showing 1 changed file with 67 additions and 65 deletions.
132 changes: 67 additions & 65 deletions core/pkg/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,22 +461,22 @@ func (ic *GenericController) syncIngress(key interface{}) error {
}
}

pcfg := ingress.Configuration{
cfg := ingress.Configuration{
Backends: upstreams,
Servers: servers,
TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
UDPEndpoints: ic.getStreamServices(ic.cfg.UDPConfigMapName, api.ProtocolUDP),
PassthroughBackends: passUpstreams,
}

if !ic.forceReload && ic.runningConfig != nil && ic.runningConfig.Equal(&pcfg) {
if !ic.forceReload && ic.runningConfig != nil && ic.runningConfig.Equal(&cfg) {
glog.V(3).Infof("skipping backend reload (no changes detected)")
return nil
}

glog.Infof("backend reload required")

err := ic.cfg.Backend.OnUpdate(pcfg)
err := ic.cfg.Backend.OnUpdate(cfg)
if err != nil {
incReloadErrorCount()
glog.Errorf("unexpected failure restarting the backend: \n%v", err)
Expand All @@ -487,7 +487,7 @@ func (ic *GenericController) syncIngress(key interface{}) error {
incReloadCount()
setSSLExpireTime(servers)

ic.runningConfig = &pcfg
ic.runningConfig = &cfg
ic.forceReload = false

return nil
Expand Down Expand Up @@ -913,25 +913,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
upstreams[defBackend] = newUpstream(defBackend)
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName)

// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
endpoint, err := ic.getServiceClusterEndpoint(svcKey, ing.Spec.Backend)
if err != nil {
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
} else {
upstreams[defBackend].Endpoints = []ingress.Endpoint{endpoint}
}
}

if len(upstreams[defBackend].Endpoints) == 0 {
endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", defBackend, err)
}
}

upstreams = ic.createUpstreamEndpoint(defBackend, svcKey, serviceUpstream, ing.Spec.Backend, upstreams, hz)
}

for _, rule := range ing.Spec.Rules {
Expand Down Expand Up @@ -963,25 +945,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing

svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)

// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
endpoint, err := ic.getServiceClusterEndpoint(svcKey, &path.Backend)
if err != nil {
glog.Errorf("failed to get service cluster endpoint for service %s: %v", svcKey, err)
} else {
upstreams[name].Endpoints = []ingress.Endpoint{endpoint}
}
}

if len(upstreams[name].Endpoints) == 0 {
endp, err := ic.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz)
if err != nil {
glog.Warningf("error obtaining service endpoints: %v", err)
continue
}
upstreams[name].Endpoints = endp
}
upstreams = ic.createUpstreamEndpoint(name, svcKey, serviceUpstream, &path.Backend, upstreams, hz)

s, exists, err := ic.svcLister.Store.GetByKey(svcKey)
if err != nil {
Expand Down Expand Up @@ -1022,11 +986,11 @@ func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *e

// serviceEndpoints returns the upstream servers (endpoints) associated
// to a service.
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
func (ic *GenericController) serviceEndpoints(
svcKey, backendPort string,
hz *healthcheck.Upstream) ([]ingress.Endpoint, error) {
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)

var upstreams []ingress.Endpoint
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
if err != nil {
return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
}
Expand Down Expand Up @@ -1068,25 +1032,55 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
return upstreams, nil
}

// createUpstreamEndpoint returns the upstream servers (endpoints) associated
// to a ingress.
func (ic *GenericController) createUpstreamEndpoint(
upstreamName, svcKey string,
serviceUpstream bool,
backend *extensions.IngressBackend,
upstreams map[string]*ingress.Backend,
hz *healthcheck.Upstream,
) map[string]*ingress.Backend {
// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
endpoint, err := ic.getServiceClusterEndpoint(svcKey, backend)
if err != nil {
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
} else {
upstreams[upstreamName].Endpoints = []ingress.Endpoint{endpoint}
}
}

if len(upstreams[upstreamName].Endpoints) == 0 {
endps, err := ic.serviceEndpoints(svcKey, backend.ServicePort.String(), hz)
upstreams[upstreamName].Endpoints = append(upstreams[upstreamName].Endpoints, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", upstreamName, err)
}
}
return upstreams
}

// createServers initializes a map that contains information about the list of
// FDQN referenced by ingress rules and the common name field in the referenced
// SSL certificates. Each server is configured with location / using a default
// backend specified by the user or the one inside the ingress spec.
func (ic *GenericController) createServers(data []interface{},
upstreams map[string]*ingress.Backend) map[string]*ingress.Server {
servers := make(map[string]*ingress.Server)

bdef := ic.GetDefaultBackend()
func (ic *GenericController) createServers(
data []interface{},
upstreams map[string]*ingress.Backend,
) map[string]*ingress.Server {
defBackend := ic.GetDefaultBackend()
ngxProxy := proxy.Configuration{
BodySize: bdef.ProxyBodySize,
ConnectTimeout: bdef.ProxyConnectTimeout,
SendTimeout: bdef.ProxySendTimeout,
ReadTimeout: bdef.ProxyReadTimeout,
BufferSize: bdef.ProxyBufferSize,
CookieDomain: bdef.ProxyCookieDomain,
CookiePath: bdef.ProxyCookiePath,
NextUpstream: bdef.ProxyNextUpstream,
RequestBuffering: bdef.ProxyRequestBuffering,
BodySize: defBackend.ProxyBodySize,
ConnectTimeout: defBackend.ProxyConnectTimeout,
SendTimeout: defBackend.ProxySendTimeout,
ReadTimeout: defBackend.ProxyReadTimeout,
BufferSize: defBackend.ProxyBufferSize,
CookieDomain: defBackend.ProxyCookieDomain,
CookiePath: defBackend.ProxyCookiePath,
NextUpstream: defBackend.ProxyNextUpstream,
RequestBuffering: defBackend.ProxyRequestBuffering,
}

defaultPemFileName := fakeCertificatePath
Expand All @@ -1100,7 +1094,9 @@ func (ic *GenericController) createServers(data []interface{},
}

// initialize the default server
du := ic.getDefaultUpstream()
du := upstreams[defUpstreamName]
servers := make(map[string]*ingress.Server)

servers[defServerName] = &ingress.Server{
Hostname: defServerName,
SSLCertificate: defaultPemFileName,
Expand All @@ -1113,7 +1109,8 @@ func (ic *GenericController) createServers(data []interface{},
Proxy: ngxProxy,
Service: du.Service,
},
}}
},
}

// initialize all the servers
for _, ingIf := range data {
Expand All @@ -1124,11 +1121,13 @@ func (ic *GenericController) createServers(data []interface{},

// check if ssl passthrough is configured
sslpt := ic.annotations.SSLPassthrough(ing)
du := ic.getDefaultUpstream()
un := du.Name
if ing.Spec.Backend != nil {
// replace default backend
defUpstream := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort.String())
defUpstream := fmt.Sprintf("%v-%v-%v",
ing.GetNamespace(),
ing.Spec.Backend.ServiceName,
ing.Spec.Backend.ServicePort.String())
if backendUpstream, ok := upstreams[defUpstream]; ok {
un = backendUpstream.Name
}
Expand All @@ -1154,7 +1153,9 @@ func (ic *GenericController) createServers(data []interface{},
Proxy: ngxProxy,
Service: &api.Service{},
},
}, SSLPassthrough: sslpt}
},
SSLPassthrough: sslpt,
}
}
}

Expand Down Expand Up @@ -1242,7 +1243,8 @@ func (ic *GenericController) getEndpoints(
s *api.Service,
servicePort *api.ServicePort,
proto api.Protocol,
hz *healthcheck.Upstream) []ingress.Endpoint {
hz *healthcheck.Upstream,
) []ingress.Endpoint {

upsServers := []ingress.Endpoint{}

Expand Down

0 comments on commit 2db9c80

Please sign in to comment.