Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start health checks early. #1319

Merged
merged 1 commit into from
Apr 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
109 changes: 62 additions & 47 deletions healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ func GetHealthCheck() *HealthCheck {

// BackendHealthCheck HealthCheck configuration for a backend
type BackendHealthCheck struct {
URL string
Interval time.Duration
DisabledURLs []*url.URL
lb loadBalancer
URL string
Interval time.Duration
DisabledURLs []*url.URL
requestTimeout time.Duration
lb loadBalancer
}

var launch = false
Expand All @@ -46,12 +47,19 @@ type loadBalancer interface {
}

func newHealthCheck() *HealthCheck {
return &HealthCheck{make(map[string]*BackendHealthCheck), nil}
return &HealthCheck{
Backends: make(map[string]*BackendHealthCheck),
}
}

// NewBackendHealthCheck Instantiate a new BackendHealthCheck
func NewBackendHealthCheck(URL string, interval time.Duration, lb loadBalancer) *BackendHealthCheck {
return &BackendHealthCheck{URL, interval, nil, lb}
return &BackendHealthCheck{
URL: URL,
Interval: interval,
requestTimeout: 5 * time.Second,
lb: lb,
}
}

//SetBackendsConfiguration set backends configuration
Expand All @@ -62,56 +70,63 @@ func (hc *HealthCheck) SetBackendsConfiguration(parentCtx context.Context, backe
}
ctx, cancel := context.WithCancel(parentCtx)
hc.cancel = cancel
hc.execute(ctx)
}

func (hc *HealthCheck) execute(ctx context.Context) {
for backendID, backend := range hc.Backends {
currentBackend := backend
currentBackendID := backendID
currentBackend := backend
safe.Go(func() {
for {
ticker := time.NewTicker(currentBackend.Interval)
select {
case <-ctx.Done():
log.Debugf("Stopping all current Healthcheck goroutines")
return
case <-ticker.C:
log.Debugf("Refreshing Healthcheck for currentBackend %s ", currentBackendID)
enabledURLs := currentBackend.lb.Servers()
var newDisabledURLs []*url.URL
for _, url := range currentBackend.DisabledURLs {
if checkHealth(url, currentBackend.URL) {
log.Debugf("HealthCheck is up [%s]: Upsert in server list", url.String())
currentBackend.lb.UpsertServer(url, roundrobin.Weight(1))
} else {
newDisabledURLs = append(newDisabledURLs, url)
}
}
currentBackend.DisabledURLs = newDisabledURLs

for _, url := range enabledURLs {
if !checkHealth(url, currentBackend.URL) {
log.Debugf("HealthCheck has failed [%s]: Remove from server list", url.String())
currentBackend.lb.RemoveServer(url)
currentBackend.DisabledURLs = append(currentBackend.DisabledURLs, url)
}
}

}
}
hc.execute(ctx, currentBackendID, currentBackend)
})
}
}

func checkHealth(serverURL *url.URL, checkURL string) bool {
timeout := time.Duration(5 * time.Second)
func (hc *HealthCheck) execute(ctx context.Context, backendID string, backend *BackendHealthCheck) {
log.Debugf("Initial healthcheck for currentBackend %s ", backendID)
checkBackend(backend)
ticker := time.NewTicker(backend.Interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Debugf("Stopping all current Healthcheck goroutines")
return
case <-ticker.C:
log.Debugf("Refreshing healthcheck for currentBackend %s ", backendID)
checkBackend(backend)
}
}
}

func checkBackend(currentBackend *BackendHealthCheck) {
enabledURLs := currentBackend.lb.Servers()
var newDisabledURLs []*url.URL
for _, url := range currentBackend.DisabledURLs {
if checkHealth(url, currentBackend) {
log.Debugf("HealthCheck is up [%s]: Upsert in server list", url.String())
currentBackend.lb.UpsertServer(url, roundrobin.Weight(1))
} else {
log.Warnf("HealthCheck is still failing [%s]", url.String())
newDisabledURLs = append(newDisabledURLs, url)
}
}
currentBackend.DisabledURLs = newDisabledURLs

for _, url := range enabledURLs {
if !checkHealth(url, currentBackend) {
log.Warnf("HealthCheck has failed [%s]: Remove from server list", url.String())
currentBackend.lb.RemoveServer(url)
currentBackend.DisabledURLs = append(currentBackend.DisabledURLs, url)
}
}
}

func checkHealth(serverURL *url.URL, backend *BackendHealthCheck) bool {
client := http.Client{
Timeout: timeout,
Timeout: backend.requestTimeout,
}
resp, err := client.Get(serverURL.String() + checkURL)
if err != nil || resp.StatusCode != 200 {
return false
resp, err := client.Get(serverURL.String() + backend.URL)
if err == nil {
defer resp.Body.Close()
}
return true
return err == nil && resp.StatusCode == 200
}
198 changes: 198 additions & 0 deletions healthcheck/healthcheck_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package healthcheck

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
"time"

"github.com/vulcand/oxy/roundrobin"
)

const healthCheckInterval = 100 * time.Millisecond

type testLoadBalancer struct {
// RWMutex needed due to parallel test execution: Both the system-under-test
// and the test assertions reference the counters.
*sync.RWMutex
numRemovedServers int
numUpsertedServers int
servers []*url.URL
}

func (lb *testLoadBalancer) RemoveServer(u *url.URL) error {
lb.Lock()
defer lb.Unlock()
lb.numRemovedServers++
lb.removeServer(u)
return nil
}

func (lb *testLoadBalancer) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error {
lb.Lock()
defer lb.Unlock()
lb.numUpsertedServers++
lb.servers = append(lb.servers, u)
return nil
}

func (lb *testLoadBalancer) Servers() []*url.URL {
return lb.servers
}

func (lb *testLoadBalancer) removeServer(u *url.URL) {
var i int
var serverURL *url.URL
for i, serverURL = range lb.servers {
if *serverURL == *u {
break
}
}

lb.servers = append(lb.servers[:i], lb.servers[i+1:]...)
}

type testHandler struct {
done func()
healthSequence []bool
}

func newTestServer(done func(), healthSequence []bool) *httptest.Server {
handler := &testHandler{
done: done,
healthSequence: healthSequence,
}
return httptest.NewServer(handler)
}

// ServeHTTP returns 200 or 503 HTTP response codes depending on whether the
// current request is marked as healthy or not.
// It calls the given 'done' function once all request health indicators have
// been depleted.
func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if len(th.healthSequence) == 0 {
panic("received unexpected request")
}

healthy := th.healthSequence[0]
if healthy {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}

th.healthSequence = th.healthSequence[1:]
if len(th.healthSequence) == 0 {
th.done()
}
}

func TestSetBackendsConfiguration(t *testing.T) {
tests := []struct {
desc string
startHealthy bool
healthSequence []bool
wantNumRemovedServers int
wantNumUpsertedServers int
}{
{
desc: "healthy server staying healthy",
startHealthy: true,
healthSequence: []bool{true},
wantNumRemovedServers: 0,
wantNumUpsertedServers: 0,
},
{
desc: "healthy server becoming sick",
startHealthy: true,
healthSequence: []bool{false},
wantNumRemovedServers: 1,
wantNumUpsertedServers: 0,
},
{
desc: "sick server becoming healthy",
startHealthy: false,
healthSequence: []bool{true},
wantNumRemovedServers: 0,
wantNumUpsertedServers: 1,
},
{
desc: "sick server staying sick",
startHealthy: false,
healthSequence: []bool{false},
wantNumRemovedServers: 0,
wantNumUpsertedServers: 0,
},
{
desc: "healthy server toggling to sick and back to healthy",
startHealthy: true,
healthSequence: []bool{false, true},
wantNumRemovedServers: 1,
wantNumUpsertedServers: 1,
},
}

for _, test := range tests {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
// The context is passed to the health check and canonically cancelled by
// the test server once all expected requests have been received.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := newTestServer(cancel, test.healthSequence)
defer ts.Close()

lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}}
backend := NewBackendHealthCheck("/path", healthCheckInterval, lb)
serverURL := MustParseURL(ts.URL)
if test.startHealthy {
lb.servers = append(lb.servers, serverURL)
} else {
backend.DisabledURLs = append(backend.DisabledURLs, serverURL)
}

healthCheck := HealthCheck{
Backends: make(map[string]*BackendHealthCheck),
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
healthCheck.execute(ctx, "id", backend)
wg.Done()
}()

// Make test timeout dependent on number of expected requests, health
// check interval, and a safety margin.
timeout := time.Duration(len(test.healthSequence)*int(healthCheckInterval) + 500)
select {
case <-time.After(timeout):
t.Fatal("test did not complete in time")
case <-ctx.Done():
wg.Wait()
}

lb.Lock()
defer lb.Unlock()
if lb.numRemovedServers != test.wantNumRemovedServers {
t.Errorf("got %d removed servers, wanted %d", lb.numRemovedServers, test.wantNumRemovedServers)
}

if lb.numUpsertedServers != test.wantNumUpsertedServers {
t.Errorf("got %d upserted servers, wanted %d", lb.numUpsertedServers, test.wantNumUpsertedServers)
}
})
}
}

func MustParseURL(rawurl string) *url.URL {
u, err := url.Parse(rawurl)
if err != nil {
panic(fmt.Sprintf("failed to parse URL '%s': %s", rawurl, err))
}
return u
}