Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the async probing function to the prober library. #3986

Merged
merged 2 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
77 changes: 71 additions & 6 deletions pkg/network/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,96 @@ import (
"context"
"io/ioutil"
"net/http"
"sync"
"time"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/knative/serving/pkg/network"
)

// Do sends a single probe to given target, e.g. `http://revision.default.svc.cluster.local:81`.
// headerValue is the value for the `k-network-probe` header.
// Do returns the status code, response body, and the request error, if any.
func Do(ctx context.Context, target, headerValue string) (int, string, error) {
// Do returns whether the probe was successful or not, or there was an error probing.
func Do(ctx context.Context, target, headerValue string) (bool, error) {
req, err := http.NewRequest(http.MethodGet, target, nil)
if err != nil {
return 0, "", errors.Wrapf(err, "%s is not a valid URL", target)
return false, errors.Wrapf(err, "%s is not a valid URL", target)
}

req.Header.Set(http.CanonicalHeaderKey(network.ProbeHeaderName), headerValue)
req = req.WithContext(ctx)
resp, err := network.AutoTransport.RoundTrip(req)
if err != nil {
return 0, "", err
return false, errors.Wrapf(err, "error roundtripping %s", target)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, "", err
return false, errors.Wrap(err, "error reading body")
}
return resp.StatusCode, string(body), nil
return resp.StatusCode == http.StatusOK && string(body) == headerValue, nil
}

// Done is a callback that is executed when the async probe has finished.
// `arg` is given by the caller at the offering time, while `success` and `err`
// are the return values of the `Do` call.
type Done func(arg interface{}, success bool, err error)

// Manager manages async probes and makes sure we run concurrently only a single
// probe for the same key.
type Manager struct {
cb Done

// mu guards keys.
mu sync.Mutex
keys sets.String
}

// New creates a new Manager, that will invoke the given callback when
// async probing is finished.
func New(cb Done) *Manager {
return &Manager{
keys: sets.NewString(),
cb: cb,
}
}

// Offer executes asynchronous probe using `target` as the key.
// If a probe with the same key already exists, Offer will return false and the
// call is discarded. If the request is accepted, Offer returns true.
// Otherwise Offer starts a goroutine that periodically executes
// `Do`, until timeout is reached, the probe succeeds, or fails with an error.
// In the end the callback is invoked with the provided `arg` and probing results.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is assumed that the opaque arg is consistent for a given target and we will coalesce concurrent Offer invocations on target.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added to the Done definition.

func (m *Manager) Offer(ctx context.Context, target, headerValue string, arg interface{}, period, timeout time.Duration) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why "Offer"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we offer system to make a probe, since the system can reject doing it -- it's not Do :)

m.mu.Lock()
defer m.mu.Unlock()
if m.keys.Has(target) {
return false
}
m.keys.Insert(target)
m.doAsync(ctx, target, headerValue, arg, period, timeout)
return true
}

// doAsync starts a go routine that probes the target with given period.
func (m *Manager) doAsync(ctx context.Context, target, headerValue string, arg interface{}, period, timeout time.Duration) {
go func() {
defer func() {
m.mu.Lock()
defer m.mu.Unlock()
m.keys.Delete(target)
}()
var (
result bool
err error
)
err = wait.PollImmediate(period, timeout, func() (bool, error) {
result, err = Do(ctx, target, headerValue)
return result, err
})
m.cb(arg, result, err)
}()
vagababov marked this conversation as resolved.
Show resolved Hide resolved
}
195 changes: 170 additions & 25 deletions pkg/network/prober/prober_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/knative/serving/pkg/network"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -52,55 +54,198 @@ func TestDoServing(t *testing.T) {
tests := []struct {
name string
headerValue string
status int
body string
want bool
}{{
name: "ok",
headerValue: systemName,
status: http.StatusOK,
body: systemName,
want: true,
}, {
name: "wrong system",
headerValue: "bells-and-whistles",
status: http.StatusBadRequest,
body: unexpectedProbeMessage,
want: false,
}, {
name: "no header",
headerValue: "",
status: http.StatusNotFound,
body: "",
want: false,
}}
for _, test := range tests {
st, body, err := Do(context.Background(), ts.URL, test.headerValue)
if got, want := st, test.status; got != want {
t.Errorf("Status = %v, want: %v", got, want)
}
if got, want := body, test.body; got != want {
t.Errorf("Body = %q, want: %q", got, want)
}
if err != nil {
t.Errorf("Do returned error: %v", err)
}
t.Run(test.name, func(t *testing.T) {
got, err := Do(context.Background(), ts.URL, test.headerValue)
if want := test.want; got != want {
t.Errorf("Got = %v, want: %v", got, want)
}
if err != nil {
t.Errorf("Do returned error: %v", err)
}
})
}
}

func TestBlackHole(t *testing.T) {
st, body, err := Do(context.Background(), "http://gone.fishing.svc.custer.local:8080", systemName)
if got, want := st, 0; got != want {
t.Errorf("Status = %v, want: %v", got, want)
}
if got, want := body, ""; got != want {
t.Errorf("Body = %q, want: %q", got, want)
got, err := Do(context.Background(), "http://gone.fishing.svc.custer.local:8080", systemName)
if want := false; got != want {
t.Errorf("Got = %v, want: %v", got, want)
}
if err == nil {
t.Error("Do did not return an error")
}
}

func TestBadURL(t *testing.T) {
_, _, err := Do(context.Background(), ":foo", systemName)
_, err := Do(context.Background(), ":foo", systemName)
if err == nil {
t.Error("Do did not return an error")
}
t.Logf("For the curious the error was: %v", err)
}

func TestDoAsync(t *testing.T) {
// This replicates the TestDo.
ts := httptest.NewServer(http.HandlerFunc(probeServeFunc))
defer ts.Close()

wch := make(chan interface{})
defer close(wch)
tests := []struct {
name string
headerValue string
cb Done
}{{
name: "ok",
headerValue: systemName,
cb: func(arg interface{}, ret bool, err error) {
defer func() {
wch <- 42
}()
if got, want := arg.(string), "ok"; got != want {
t.Errorf("arg = %s, want: %s", got, want)
}
if !ret {
t.Error("result was false")
}
},
}, {
name: "wrong system",
headerValue: "bells-and-whistles",
cb: func(arg interface{}, ret bool, err error) {
defer func() {
wch <- 1984
}()
if ret {
t.Error("result was true")
}
},
}, {
name: "no header",
headerValue: "",
cb: func(arg interface{}, ret bool, err error) {
defer func() {
wch <- 2006
}()
if ret {
t.Error("result was true")
}
},
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
m := New(test.cb)
m.Offer(context.Background(), ts.URL, test.headerValue, test.name, 50*time.Millisecond, 2*time.Second)
<-wch
})
}
}

type thirdTimesTheCharmProber struct {
calls int
}

func (t *thirdTimesTheCharmProber) ServeHTTP(w http.ResponseWriter, r *http.Request) {
t.calls++
if t.calls < 3 {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(unexpectedProbeMessage))
return
}
w.Write([]byte(systemName))
}

func TestDoAsyncRepeat(t *testing.T) {
c := &thirdTimesTheCharmProber{}
ts := httptest.NewServer(c)
defer ts.Close()

wch := make(chan interface{})
defer close(wch)
cb := func(arg interface{}, done bool, err error) {
if !done {
t.Error("done was false")
}
if err != nil {
t.Errorf("Unexpected error = %v", err)
}
wch <- arg
}
m := New(cb)
m.Offer(context.Background(), ts.URL, systemName, 42, 50*time.Millisecond, 3*time.Second)
<-wch
if got, want := c.calls, 3; got != want {
t.Errorf("Probe invocation count = %d, want: %d", got, want)
}
}

func TestDoAsyncTimeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer ts.Close()

wch := make(chan interface{})
defer close(wch)

cb := func(arg interface{}, done bool, err error) {
if err != wait.ErrWaitTimeout {
t.Errorf("Unexpected error = %v", err)
}
wch <- arg
}
m := New(cb)
m.Offer(context.Background(), ts.URL, systemName, 2009, 10*time.Millisecond, 200*time.Millisecond)
<-wch
}

func TestAsyncMultiple(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(probeServeFunc))
defer ts.Close()

wch := make(chan interface{})
defer close(wch)
cb := func(arg interface{}, done bool, err error) {
<-wch
wch <- 2006
}
m := New(cb)
if !m.Offer(context.Background(), ts.URL, systemName, 1984, 100*time.Millisecond, 1*time.Second) {
t.Error("First call to offer returned false")
}
if m.Offer(context.Background(), ts.URL, systemName, 1982, 100*time.Millisecond, 1*time.Second) {
t.Error("Second call to offer returned true")
}
if got, want := m.len(), 1; got != want {
t.Errorf("Number of queued items = %d, want: %d", got, want)
}
// Make sure we terminate the first probe.
wch <- 2009
<-wch
// ಠ_ಠ gotta wait for the cb to end.
time.Sleep(300 * time.Millisecond)
if got, want := m.len(), 0; got != want {
t.Errorf("Number of queued items = %d, want: %d", got, want)
}
}

func (m *Manager) len() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.keys.Len()
}
5 changes: 2 additions & 3 deletions pkg/reconciler/autoscaling/kpa/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kpa
import (
"context"
"fmt"
"net/http"

"github.com/knative/pkg/apis"
"github.com/knative/pkg/apis/duck"
Expand Down Expand Up @@ -78,11 +77,11 @@ func activatorProbe(pa *pav1alpha1.PodAutoscaler) (bool, error) {
// Resolve the hostname and port to probe.
svc := network.GetServiceHostname(pa.Status.ServiceName, pa.Namespace)
port := networking.ServicePort(pa.Spec.ProtocolType)
st, body, err := prober.Do(context.Background(), fmt.Sprintf("http://%s:%d/", svc, port), activator.Name)
success, err := prober.Do(context.Background(), fmt.Sprintf("http://%s:%d/", svc, port), activator.Name)
if err != nil {
return false, err
}
return st == http.StatusOK && body == activator.Name, nil
return success, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just return prober.Do(...) now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

certainly. 🤦‍♂

}

// podScalableTypedInformerFactory returns a duck.InformerFactory that returns
Expand Down