Skip to content

Commit

Permalink
Add the async probing function to the prober library. (#3986)
Browse files Browse the repository at this point in the history
* Async prober implementation

* review commentes addressed now are
  • Loading branch information
vagababov authored and knative-prow-robot committed May 5, 2019
1 parent 8bd492b commit 59dec74
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 44 deletions.
79 changes: 73 additions & 6 deletions pkg/network/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,98 @@ import (
"context"
"io/ioutil"
"net/http"
"sync"
"time"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/knative/serving/pkg/network"
)

// Do sends a single probe to given target, e.g. `http://revision.default.svc.cluster.local:81`.
// headerValue is the value for the `k-network-probe` header.
// Do returns the status code, response body, and the request error, if any.
func Do(ctx context.Context, target, headerValue string) (int, string, error) {
// Do returns whether the probe was successful or not, or there was an error probing.
func Do(ctx context.Context, target, headerValue string) (bool, error) {
req, err := http.NewRequest(http.MethodGet, target, nil)
if err != nil {
return 0, "", errors.Wrapf(err, "%s is not a valid URL", target)
return false, errors.Wrapf(err, "%s is not a valid URL", target)
}

req.Header.Set(http.CanonicalHeaderKey(network.ProbeHeaderName), headerValue)
req = req.WithContext(ctx)
resp, err := network.AutoTransport.RoundTrip(req)
if err != nil {
return 0, "", err
return false, errors.Wrapf(err, "error roundtripping %s", target)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, "", err
return false, errors.Wrap(err, "error reading body")
}
return resp.StatusCode, string(body), nil
return resp.StatusCode == http.StatusOK && string(body) == headerValue, nil
}

// Done is a callback that is executed when the async probe has finished.
// `arg` is given by the caller at the offering time, while `success` and `err`
// are the return values of the `Do` call.
// It is assumed that the opaque arg is consistent for a given target and
// we will coalesce concurrent Offer invocations on target.
type Done func(arg interface{}, success bool, err error)

// Manager manages async probes and makes sure we run concurrently only a single
// probe for the same key.
type Manager struct {
cb Done

// mu guards keys.
mu sync.Mutex
keys sets.String
}

// New creates a new Manager, that will invoke the given callback when
// async probing is finished.
func New(cb Done) *Manager {
return &Manager{
keys: sets.NewString(),
cb: cb,
}
}

// Offer executes asynchronous probe using `target` as the key.
// If a probe with the same key already exists, Offer will return false and the
// call is discarded. If the request is accepted, Offer returns true.
// Otherwise Offer starts a goroutine that periodically executes
// `Do`, until timeout is reached, the probe succeeds, or fails with an error.
// In the end the callback is invoked with the provided `arg` and probing results.
func (m *Manager) Offer(ctx context.Context, target, headerValue string, arg interface{}, period, timeout time.Duration) bool {
m.mu.Lock()
defer m.mu.Unlock()
if m.keys.Has(target) {
return false
}
m.keys.Insert(target)
m.doAsync(ctx, target, headerValue, arg, period, timeout)
return true
}

// doAsync starts a go routine that probes the target with given period.
func (m *Manager) doAsync(ctx context.Context, target, headerValue string, arg interface{}, period, timeout time.Duration) {
go func() {
defer func() {
m.mu.Lock()
defer m.mu.Unlock()
m.keys.Delete(target)
}()
var (
result bool
err error
)
err = wait.PollImmediate(period, timeout, func() (bool, error) {
result, err = Do(ctx, target, headerValue)
return result, err
})
m.cb(arg, result, err)
}()
}
195 changes: 170 additions & 25 deletions pkg/network/prober/prober_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/knative/serving/pkg/network"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -52,55 +54,198 @@ func TestDoServing(t *testing.T) {
tests := []struct {
name string
headerValue string
status int
body string
want bool
}{{
name: "ok",
headerValue: systemName,
status: http.StatusOK,
body: systemName,
want: true,
}, {
name: "wrong system",
headerValue: "bells-and-whistles",
status: http.StatusBadRequest,
body: unexpectedProbeMessage,
want: false,
}, {
name: "no header",
headerValue: "",
status: http.StatusNotFound,
body: "",
want: false,
}}
for _, test := range tests {
st, body, err := Do(context.Background(), ts.URL, test.headerValue)
if got, want := st, test.status; got != want {
t.Errorf("Status = %v, want: %v", got, want)
}
if got, want := body, test.body; got != want {
t.Errorf("Body = %q, want: %q", got, want)
}
if err != nil {
t.Errorf("Do returned error: %v", err)
}
t.Run(test.name, func(t *testing.T) {
got, err := Do(context.Background(), ts.URL, test.headerValue)
if want := test.want; got != want {
t.Errorf("Got = %v, want: %v", got, want)
}
if err != nil {
t.Errorf("Do returned error: %v", err)
}
})
}
}

func TestBlackHole(t *testing.T) {
st, body, err := Do(context.Background(), "http://gone.fishing.svc.custer.local:8080", systemName)
if got, want := st, 0; got != want {
t.Errorf("Status = %v, want: %v", got, want)
}
if got, want := body, ""; got != want {
t.Errorf("Body = %q, want: %q", got, want)
got, err := Do(context.Background(), "http://gone.fishing.svc.custer.local:8080", systemName)
if want := false; got != want {
t.Errorf("Got = %v, want: %v", got, want)
}
if err == nil {
t.Error("Do did not return an error")
}
}

func TestBadURL(t *testing.T) {
_, _, err := Do(context.Background(), ":foo", systemName)
_, err := Do(context.Background(), ":foo", systemName)
if err == nil {
t.Error("Do did not return an error")
}
t.Logf("For the curious the error was: %v", err)
}

func TestDoAsync(t *testing.T) {
// This replicates the TestDo.
ts := httptest.NewServer(http.HandlerFunc(probeServeFunc))
defer ts.Close()

wch := make(chan interface{})
defer close(wch)
tests := []struct {
name string
headerValue string
cb Done
}{{
name: "ok",
headerValue: systemName,
cb: func(arg interface{}, ret bool, err error) {
defer func() {
wch <- 42
}()
if got, want := arg.(string), "ok"; got != want {
t.Errorf("arg = %s, want: %s", got, want)
}
if !ret {
t.Error("result was false")
}
},
}, {
name: "wrong system",
headerValue: "bells-and-whistles",
cb: func(arg interface{}, ret bool, err error) {
defer func() {
wch <- 1984
}()
if ret {
t.Error("result was true")
}
},
}, {
name: "no header",
headerValue: "",
cb: func(arg interface{}, ret bool, err error) {
defer func() {
wch <- 2006
}()
if ret {
t.Error("result was true")
}
},
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
m := New(test.cb)
m.Offer(context.Background(), ts.URL, test.headerValue, test.name, 50*time.Millisecond, 2*time.Second)
<-wch
})
}
}

type thirdTimesTheCharmProber struct {
calls int
}

func (t *thirdTimesTheCharmProber) ServeHTTP(w http.ResponseWriter, r *http.Request) {
t.calls++
if t.calls < 3 {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(unexpectedProbeMessage))
return
}
w.Write([]byte(systemName))
}

func TestDoAsyncRepeat(t *testing.T) {
c := &thirdTimesTheCharmProber{}
ts := httptest.NewServer(c)
defer ts.Close()

wch := make(chan interface{})
defer close(wch)
cb := func(arg interface{}, done bool, err error) {
if !done {
t.Error("done was false")
}
if err != nil {
t.Errorf("Unexpected error = %v", err)
}
wch <- arg
}
m := New(cb)
m.Offer(context.Background(), ts.URL, systemName, 42, 50*time.Millisecond, 3*time.Second)
<-wch
if got, want := c.calls, 3; got != want {
t.Errorf("Probe invocation count = %d, want: %d", got, want)
}
}

func TestDoAsyncTimeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer ts.Close()

wch := make(chan interface{})
defer close(wch)

cb := func(arg interface{}, done bool, err error) {
if err != wait.ErrWaitTimeout {
t.Errorf("Unexpected error = %v", err)
}
wch <- arg
}
m := New(cb)
m.Offer(context.Background(), ts.URL, systemName, 2009, 10*time.Millisecond, 200*time.Millisecond)
<-wch
}

func TestAsyncMultiple(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(probeServeFunc))
defer ts.Close()

wch := make(chan interface{})
defer close(wch)
cb := func(arg interface{}, done bool, err error) {
<-wch
wch <- 2006
}
m := New(cb)
if !m.Offer(context.Background(), ts.URL, systemName, 1984, 100*time.Millisecond, 1*time.Second) {
t.Error("First call to offer returned false")
}
if m.Offer(context.Background(), ts.URL, systemName, 1982, 100*time.Millisecond, 1*time.Second) {
t.Error("Second call to offer returned true")
}
if got, want := m.len(), 1; got != want {
t.Errorf("Number of queued items = %d, want: %d", got, want)
}
// Make sure we terminate the first probe.
wch <- 2009
<-wch
// ಠ_ಠ gotta wait for the cb to end.
time.Sleep(300 * time.Millisecond)
if got, want := m.len(), 0; got != want {
t.Errorf("Number of queued items = %d, want: %d", got, want)
}
}

func (m *Manager) len() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.keys.Len()
}
7 changes: 1 addition & 6 deletions pkg/reconciler/autoscaling/kpa/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kpa
import (
"context"
"fmt"
"net/http"

"github.com/knative/pkg/apis"
"github.com/knative/pkg/apis/duck"
Expand Down Expand Up @@ -78,11 +77,7 @@ func activatorProbe(pa *pav1alpha1.PodAutoscaler) (bool, error) {
// Resolve the hostname and port to probe.
svc := network.GetServiceHostname(pa.Status.ServiceName, pa.Namespace)
port := networking.ServicePort(pa.Spec.ProtocolType)
st, body, err := prober.Do(context.Background(), fmt.Sprintf("http://%s:%d/", svc, port), activator.Name)
if err != nil {
return false, err
}
return st == http.StatusOK && body == activator.Name, nil
return prober.Do(context.Background(), fmt.Sprintf("http://%s:%d/", svc, port), activator.Name)
}

// podScalableTypedInformerFactory returns a duck.InformerFactory that returns
Expand Down

0 comments on commit 59dec74

Please sign in to comment.