Skip to content

Commit

Permalink
Abort Kubernetes Ingress update if Kubernetes API call fails
Browse files Browse the repository at this point in the history
Currently if a Kubernetes API call fails we potentially remove a working service from Traefik. This changes it so if a Kubernetes API call fails we abort out of the ingress update and use the current working config. Github issue: #1240

Also added a test to cover when requested resources (services and endpoints) that the user has specified don’t exist.
  • Loading branch information
Regner Blok-Andersen committed Mar 16, 2017
1 parent c582ea5 commit fab3cb3
Show file tree
Hide file tree
Showing 13 changed files with 338 additions and 52 deletions.
3 changes: 3 additions & 0 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
defaultMesos.Endpoint = "http://127.0.0.1:5050"
defaultMesos.ExposedByDefault = true
defaultMesos.Constraints = types.Constraints{}
defaultMesos.RefreshSeconds = 30
defaultMesos.ZkDetectionTimeout = 30
defaultMesos.StateTimeoutSecond = 30

//default ECS
var defaultECS provider.ECS
Expand Down
14 changes: 10 additions & 4 deletions docs/basics.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,22 @@ For example:
sticky = true
```

Healthcheck URL can be configured with a relative URL for `healthcheck.URL`.
Interval between healthcheck can be configured by using `healthcheck.interval`
(default: 30s)
A health check can be configured in order to remove a backend from LB rotation
as long as it keeps returning HTTP status codes other than 200 OK to HTTP GET
requests periodically carried out by Traefik. The check is defined by a path
appended to the backend URL and an interval (given in a format understood by [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration)) specifying how
often the health check should be executed (the default being 30 seconds). Each
backend must respond to the health check within 5 seconds.

A recovering backend returning 200 OK responses again is being returned to the
LB rotation pool.

For example:
```toml
[backends]
[backends.backend1]
[backends.backend1.healthcheck]
URL = "/health"
path = "/health"
interval = "10s"
```

Expand Down
5 changes: 4 additions & 1 deletion docs/toml.md
Original file line number Diff line number Diff line change
Expand Up @@ -1005,12 +1005,14 @@ domain = "mesos.localhost"
# Zookeeper timeout (in seconds)
#
# Optional
# Default: 30
#
# ZkDetectionTimeout = 30

# Polling interval (in seconds)
#
# Optional
# Default: 30
#
# RefreshSeconds = 30

Expand All @@ -1023,8 +1025,9 @@ domain = "mesos.localhost"
# HTTP Timeout (in seconds)
#
# Optional
# Default: 30
#
# StateTimeoutSecond = "host"
# StateTimeoutSecond = "30"
```
## Kubernetes Ingress backend
Expand Down
6 changes: 3 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import:
- package: github.com/containous/flaeg
version: a731c034dda967333efce5f8d276aeff11f8ff87
- package: github.com/vulcand/oxy
version: 9920d3561dd2a20cf74c0fe48ec9d3d94414daa1
version: f88530866c561d24a6b5aac49f76d6351b788b9f
repo: https://github.com/containous/oxy.git
vcs: git
subpackages:
Expand Down
10 changes: 5 additions & 5 deletions healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func GetHealthCheck() *HealthCheck {

// BackendHealthCheck HealthCheck configuration for a backend
type BackendHealthCheck struct {
URL string
Path string
Interval time.Duration
DisabledURLs []*url.URL
lb loadBalancer
Expand Down Expand Up @@ -81,7 +81,7 @@ func (hc *HealthCheck) execute(ctx context.Context) {
enabledURLs := currentBackend.lb.Servers()
var newDisabledURLs []*url.URL
for _, url := range currentBackend.DisabledURLs {
if checkHealth(url, currentBackend.URL) {
if checkHealth(url, currentBackend.Path) {
log.Debugf("HealthCheck is up [%s]: Upsert in server list", url.String())
currentBackend.lb.UpsertServer(url, roundrobin.Weight(1))
} else {
Expand All @@ -91,7 +91,7 @@ func (hc *HealthCheck) execute(ctx context.Context) {
currentBackend.DisabledURLs = newDisabledURLs

for _, url := range enabledURLs {
if !checkHealth(url, currentBackend.URL) {
if !checkHealth(url, currentBackend.Path) {
log.Debugf("HealthCheck has failed [%s]: Remove from server list", url.String())
currentBackend.lb.RemoveServer(url)
currentBackend.DisabledURLs = append(currentBackend.DisabledURLs, url)
Expand All @@ -104,12 +104,12 @@ func (hc *HealthCheck) execute(ctx context.Context) {
}
}

func checkHealth(serverURL *url.URL, checkURL string) bool {
func checkHealth(serverURL *url.URL, path string) bool {
timeout := time.Duration(5 * time.Second)
client := http.Client{
Timeout: timeout,
}
resp, err := client.Get(serverURL.String() + checkURL)
resp, err := client.Get(serverURL.String() + path)
if err != nil || resp.StatusCode != 200 {
return false
}
Expand Down
26 changes: 7 additions & 19 deletions provider/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package k8s
import (
"time"

"github.com/containous/traefik/log"
"k8s.io/client-go/1.5/kubernetes"
"k8s.io/client-go/1.5/pkg/api"
"k8s.io/client-go/1.5/pkg/api/v1"
Expand Down Expand Up @@ -39,31 +40,18 @@ type clientImpl struct {
clientset *kubernetes.Clientset
}

// NewInClusterClient returns a new Kubernetes client that expect to run inside the cluster
func NewInClusterClient() (Client, error) {
// NewClient returns a new Kubernetes client
func NewClient(endpoint string) (Client, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
log.Warnf("Kubernetes in cluster config error, trying from out of cluster: %s", err)
config = &rest.Config{}
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

return &clientImpl{
clientset: clientset,
}, nil
}

// NewInClusterClientWithEndpoint is the same as NewInClusterClient but uses the provided endpoint URL
func NewInClusterClientWithEndpoint(endpoint string) (Client, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
if len(endpoint) > 0 {
config.Host = endpoint
}

config.Host = endpoint

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
Expand Down
31 changes: 17 additions & 14 deletions provider/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,10 @@ type Kubernetes struct {
lastConfiguration safe.Safe
}

func (provider *Kubernetes) newK8sClient() (k8s.Client, error) {
if provider.Endpoint != "" {
log.Infof("Creating in cluster Kubernetes client with endpoint %v", provider.Endpoint)
return k8s.NewInClusterClientWithEndpoint(provider.Endpoint)
}
log.Info("Creating in cluster Kubernetes client")
return k8s.NewInClusterClient()
}

// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
k8sClient, err := provider.newK8sClient()
k8sClient, err := k8s.NewClient(provider.Endpoint)
if err != nil {
return err
}
Expand Down Expand Up @@ -169,8 +160,13 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
}
}
service, exists, err := k8sClient.GetService(i.ObjectMeta.Namespace, pa.Backend.ServiceName)
if err != nil || !exists {
log.Warnf("Error retrieving service %s/%s: %v", i.ObjectMeta.Namespace, pa.Backend.ServiceName, err)
if err != nil {
log.Errorf("Error while retrieving service information from k8s API %s/%s: %v", service.ObjectMeta.Namespace, pa.Backend.ServiceName, err)
return nil, err
}

if !exists {
log.Errorf("Service not found for %s/%s", service.ObjectMeta.Namespace, pa.Backend.ServiceName)
delete(templateObjects.Frontends, r.Host+pa.Path)
continue
}
Expand All @@ -193,13 +189,20 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
if port.Port == 443 {
protocol = "https"
}

endpoints, exists, err := k8sClient.GetEndpoints(service.ObjectMeta.Namespace, service.ObjectMeta.Name)
if err != nil || !exists {
if err != nil {
log.Errorf("Error retrieving endpoints %s/%s: %v", service.ObjectMeta.Namespace, service.ObjectMeta.Name, err)
return nil, err
}

if !exists {
log.Errorf("Endpoints not found for %s/%s", service.ObjectMeta.Namespace, service.ObjectMeta.Name)
continue
}

if len(endpoints.Subsets) == 0 {
log.Warnf("Endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name)
log.Warnf("Service endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name)
templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{
URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(int(port.Port)),
Weight: 1,
Expand Down

0 comments on commit fab3cb3

Please sign in to comment.