Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always pass Healthy dests to the throttler #5466

Merged
merged 1 commit into from Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 12 additions & 15 deletions pkg/activator/handler/handler_test.go
Expand Up @@ -100,9 +100,8 @@ func TestActivationHandler(t *testing.T) {
wantErr: nil,
endpointsInformer: endpointsInformer(endpoints(testNamespace, testRevName, 1000, networking.ServicePortNameHTTP1)),
destsUpdates: []*activatornet.RevisionDestsUpdate{{
Rev: types.NamespacedName{testNamespace, testRevName},
Dests: dests(1000),
ReadyAddressCount: 1000,
Rev: types.NamespacedName{testNamespace, testRevName},
Dests: dests(1000),
}},
reporterCalls: []reporterCall{{
Op: "ReportRequestCount",
Expand All @@ -124,9 +123,8 @@ func TestActivationHandler(t *testing.T) {
wantErr: nil,
endpointsInformer: endpointsInformer(endpoints(testNamespace, testRevName, 1000, networking.ServicePortNameHTTP1)),
destsUpdates: []*activatornet.RevisionDestsUpdate{{
Rev: types.NamespacedName{"fake-namespace", testRevName},
Dests: []string{},
ReadyAddressCount: 0,
Rev: types.NamespacedName{"fake-namespace", testRevName},
Dests: []string{},
}},
reporterCalls: nil,
tryTimeout: 100 * time.Millisecond,
Expand All @@ -139,9 +137,8 @@ func TestActivationHandler(t *testing.T) {
wantErr: errors.New("request error"),
endpointsInformer: endpointsInformer(endpoints(testNamespace, testRevName, 1000, networking.ServicePortNameHTTP1)),
destsUpdates: []*activatornet.RevisionDestsUpdate{{
Rev: types.NamespacedName{testNamespace, testRevName},
Dests: dests(1000),
ReadyAddressCount: 1000,
Rev: types.NamespacedName{testNamespace, testRevName},
Dests: dests(1000),
}},
reporterCalls: []reporterCall{{
Op: "ReportRequestCount",
Expand Down Expand Up @@ -306,9 +303,9 @@ func TestActivationHandlerProxyHeader(t *testing.T) {
go throttler.Run(updateCh)

updateCh <- &activatornet.RevisionDestsUpdate{
Rev: types.NamespacedName{namespace, revName},
ClusterIPDest: "129.0.0.1:1234",
ReadyAddressCount: breakerParams.InitialCapacity,
Rev: types.NamespacedName{namespace, revName},
ClusterIPDest: "129.0.0.1:1234",
Dests: dests(breakerParams.InitialCapacity),
}

handler := &activationHandler{
Expand Down Expand Up @@ -405,9 +402,9 @@ func TestActivationHandlerTraceSpans(t *testing.T) {
go throttler.Run(updateCh)

updateCh <- &activatornet.RevisionDestsUpdate{
Rev: types.NamespacedName{namespace, revName},
ClusterIPDest: "129.0.0.1:1234",
ReadyAddressCount: breakerParams.InitialCapacity,
Rev: types.NamespacedName{namespace, revName},
ClusterIPDest: "129.0.0.1:1234",
Dests: dests(breakerParams.InitialCapacity),
}

handler := &activationHandler{
Expand Down
22 changes: 7 additions & 15 deletions pkg/activator/net/revision_backends.go
Expand Up @@ -58,9 +58,6 @@ type RevisionDestsUpdate struct {
Rev types.NamespacedName
ClusterIPDest string
Dests []string

// Number of addresses marked ready in the private endpoint for this revision
ReadyAddressCount int
}

const (
Expand Down Expand Up @@ -196,7 +193,7 @@ func (rw *revisionWatcher) probePodIPs(dests []string) (sets.String, error) {
return hs, err
}

func (rw *revisionWatcher) sendUpdate(update *RevisionDestsUpdate) {
func (rw *revisionWatcher) sendUpdate(clusterIP string, dests []string) {
select {
case <-rw.doneCh:
// We're not closing updateCh because this would result in 1 close per revisionWatcher.
Expand All @@ -205,7 +202,7 @@ func (rw *revisionWatcher) sendUpdate(update *RevisionDestsUpdate) {
// TODO(greghaynes) find a way to explicitly close the channel. Potentially use channel per watcher.
return
default:
rw.updateCh <- update
rw.updateCh <- &RevisionDestsUpdate{Rev: rw.rev, ClusterIPDest: clusterIP, Dests: dests}
}
}

Expand All @@ -218,8 +215,8 @@ func (rw *revisionWatcher) checkDests(dests []string) {

rw.logger.Debug("ClusterIP is no longer healthy.")

// Send update that we are now inactive.
rw.sendUpdate(&RevisionDestsUpdate{Rev: rw.rev})
// Send update that we are now inactive (both params invalid).
rw.sendUpdate("", nil)
return
}

Expand All @@ -234,7 +231,7 @@ func (rw *revisionWatcher) checkDests(dests []string) {
if rw.clusterIPHealthy {
// cluster IP is healthy and we haven't scaled down, short circuit.
rw.logger.Debugf("ClusterIP %s already probed (backends: %d)", dest, len(dests))
rw.sendUpdate(&RevisionDestsUpdate{Rev: rw.rev, ClusterIPDest: dest, ReadyAddressCount: len(dests)})
rw.sendUpdate(dest, dests)
return
}

Expand All @@ -245,7 +242,7 @@ func (rw *revisionWatcher) checkDests(dests []string) {
rw.logger.Debugf("ClusterIP is successfully probed: %s (backends: %d)", dest, len(dests))
rw.clusterIPHealthy = true
rw.healthyPods = nil
rw.sendUpdate(&RevisionDestsUpdate{Rev: rw.rev, ClusterIPDest: dest, ReadyAddressCount: len(dests)})
rw.sendUpdate(dest, dests)
return
}

Expand All @@ -257,13 +254,8 @@ func (rw *revisionWatcher) checkDests(dests []string) {

rw.logger.Debugf("Done probing, got %d healthy pods", len(hs))
if !reflect.DeepEqual(rw.healthyPods, hs) {
destsUpdate := &RevisionDestsUpdate{
Rev: rw.rev,
Dests: hs.UnsortedList(),
ReadyAddressCount: len(dests),
}
rw.healthyPods = hs
rw.sendUpdate(destsUpdate)
rw.sendUpdate("" /*clusterIP not ready yet*/, hs.UnsortedList())
}
}

Expand Down
33 changes: 14 additions & 19 deletions pkg/activator/net/revision_backends_test.go
Expand Up @@ -115,7 +115,7 @@ func TestRevisionWatcher(t *testing.T) {
Port: 1234,
},
clusterIP: "129.0.0.1",
expectUpdates: []RevisionDestsUpdate{{Dests: []string{"128.0.0.1:1234"}, ReadyAddressCount: 1}},
expectUpdates: []RevisionDestsUpdate{{Dests: []string{"128.0.0.1:1234"}}},
probeResponses: []activatortest.FakeResponse{{
Err: errors.New("clusterIP transport error"),
}, {
Expand All @@ -132,7 +132,7 @@ func TestRevisionWatcher(t *testing.T) {
Port: 1234,
},
clusterIP: "129.0.0.1",
expectUpdates: []RevisionDestsUpdate{{Dests: []string{"128.0.0.1:1234"}, ReadyAddressCount: 1}},
expectUpdates: []RevisionDestsUpdate{{Dests: []string{"128.0.0.1:1234"}}},
probeHostResponses: map[string][]activatortest.FakeResponse{
"129.0.0.1:1234": {{
Err: errors.New("clusterIP transport error"),
Expand All @@ -152,7 +152,7 @@ func TestRevisionWatcher(t *testing.T) {
Port: 1234,
},
clusterIP: "129.0.0.1",
expectUpdates: []RevisionDestsUpdate{{ClusterIPDest: "129.0.0.1:1234", ReadyAddressCount: 1}},
expectUpdates: []RevisionDestsUpdate{{ClusterIPDest: "129.0.0.1:1234", Dests: []string{"128.0.0.1:1234"}}},
probeHostResponses: map[string][]activatortest.FakeResponse{
"129.0.0.1:1234": {{
Err: nil,
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestRevisionWatcher(t *testing.T) {
Port: 1234,
},
clusterIP: "129.0.0.1",
expectUpdates: []RevisionDestsUpdate{{Dests: []string{"128.0.0.1:1234"}, ReadyAddressCount: 1}},
expectUpdates: []RevisionDestsUpdate{{Dests: []string{"128.0.0.1:1234"}}},
probeResponses: []activatortest.FakeResponse{{
Err: errors.New("clusterIP transport error"),
}, {
Expand All @@ -229,7 +229,7 @@ func TestRevisionWatcher(t *testing.T) {
Port: 1234,
},
clusterIP: "129.0.0.1",
expectUpdates: []RevisionDestsUpdate{{Dests: []string{"128.0.0.1:1234", "128.0.0.2:1234"}, ReadyAddressCount: 2}},
expectUpdates: []RevisionDestsUpdate{{Dests: []string{"128.0.0.1:1234", "128.0.0.2:1234"}}},
probeResponses: []activatortest.FakeResponse{{
Err: errors.New("clusterIP transport error"),
}, {
Expand All @@ -245,7 +245,7 @@ func TestRevisionWatcher(t *testing.T) {
Port: 1234,
},
clusterIP: "129.0.0.1",
expectUpdates: []RevisionDestsUpdate{{Dests: []string{"128.0.0.2:1234"}, ReadyAddressCount: 2}},
expectUpdates: []RevisionDestsUpdate{{Dests: []string{"128.0.0.2:1234"}}},
probeHostResponses: map[string][]activatortest.FakeResponse{
"129.0.0.1:1234": {{
Err: errors.New("clusterIP transport error"),
Expand All @@ -268,8 +268,8 @@ func TestRevisionWatcher(t *testing.T) {
},
clusterIP: "129.0.0.1",
expectUpdates: []RevisionDestsUpdate{
{Dests: []string{"128.0.0.1:1234"}, ReadyAddressCount: 1},
{ClusterIPDest: "129.0.0.1:1234", ReadyAddressCount: 1},
{Dests: []string{"128.0.0.1:1234"}},
{Dests: []string{"128.0.0.1:1234"}, ClusterIPDest: "129.0.0.1:1234"},
},
probeHostResponses: map[string][]activatortest.FakeResponse{
"129.0.0.1:1234": {{
Expand Down Expand Up @@ -448,8 +448,7 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) {
},
expectDests: map[types.NamespacedName]RevisionDestsUpdate{
{Namespace: "test-namespace", Name: "test-revision"}: {
Dests: []string{"128.0.0.1:1234"},
ReadyAddressCount: 1,
Dests: []string{"128.0.0.1:1234"},
},
},
updateCnt: 1,
Expand Down Expand Up @@ -479,8 +478,7 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) {
},
expectDests: map[types.NamespacedName]RevisionDestsUpdate{
{Namespace: "test-namespace", Name: "test-revision"}: {
Dests: []string{"128.0.0.1:1234"},
ReadyAddressCount: 1,
Dests: []string{"128.0.0.1:1234"},
},
},
updateCnt: 1,
Expand All @@ -506,12 +504,10 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) {
},
expectDests: map[types.NamespacedName]RevisionDestsUpdate{
{Namespace: "test-namespace", Name: "test-revision1"}: {
Dests: []string{"128.0.0.1:1234"},
ReadyAddressCount: 1,
Dests: []string{"128.0.0.1:1234"},
},
{Namespace: "test-namespace", Name: "test-revision2"}: {
Dests: []string{"128.1.0.2:1235"},
ReadyAddressCount: 1,
Dests: []string{"128.1.0.2:1235"},
},
},
updateCnt: 2,
Expand Down Expand Up @@ -547,8 +543,8 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) {
},
expectDests: map[types.NamespacedName]RevisionDestsUpdate{
{Namespace: "test-namespace", Name: "test-revision"}: {
ClusterIPDest: "129.0.0.1:1234",
ReadyAddressCount: 1,
ClusterIPDest: "129.0.0.1:1234",
Dests: []string{"128.0.0.1:1234"},
},
},
updateCnt: 2,
Expand All @@ -573,7 +569,6 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) {
}},
},
expectDests: map[types.NamespacedName]RevisionDestsUpdate{},
updateCnt: 0,
}} {
t.Run(tc.name, func(t *testing.T) {
fakeRT := activatortest.FakeRoundTripper{
Expand Down
13 changes: 8 additions & 5 deletions pkg/activator/net/throttler.go
Expand Up @@ -232,9 +232,12 @@ func (rt *revisionThrottler) updateThrottleState(throttler *Throttler, backendCo
// This function will never be called in parallel but try can be called in parallel to this so we need
// to lock on updating concurrency / trackers
func (rt *revisionThrottler) handleUpdate(throttler *Throttler, update *RevisionDestsUpdate) {
rt.logger.Debugf("Handling update w/ %d ready and dests: %v", update.ReadyAddressCount, update.Dests)
if update.Dests != nil {
// Because this wont be called in parallel we can build a new podIPTrackers array before taking out a lock
rt.logger.Debugf("Handling update w/ %d ready and dests: %v", len(update.Dests), update.Dests)

// ClusterIP is not yet ready, so we want to send requests directly to the pods.
// NB: this will not be called in parallel, thus we can build a new podIPTrackers
// array before taking out a lock.
if update.ClusterIPDest == "" {
// Create a map for fast lookup of existing trackers
trackersMap := make(map[string]*podIPTracker, len(rt.podIPTrackers))
for _, tracker := range rt.podIPTrackers {
Expand All @@ -252,11 +255,11 @@ func (rt *revisionThrottler) handleUpdate(throttler *Throttler, update *Revision
trackers = append(trackers, tracker)
}

rt.updateThrottleState(throttler, update.ReadyAddressCount, trackers, "")
rt.updateThrottleState(throttler, len(update.Dests), trackers, "" /*clusterIP*/)
return
}

rt.updateThrottleState(throttler, update.ReadyAddressCount, nil, update.ClusterIPDest)
rt.updateThrottleState(throttler, len(update.Dests), nil /*trackers*/, update.ClusterIPDest)
}

// Throttler load balances requests to revisions based on capacity. When `Run` is called it listens for
Expand Down
45 changes: 20 additions & 25 deletions pkg/activator/net/throttler_test.go
Expand Up @@ -61,9 +61,8 @@ func TestThrottler(t *testing.T) {
revision(types.NamespacedName{"test-namespace", "test-revision"}, networking.ProtocolHTTP1),
},
initUpdates: []*RevisionDestsUpdate{{
Rev: types.NamespacedName{"test-namespace", "test-revision"},
Dests: []string{"128.0.0.1:1234"},
ReadyAddressCount: 1,
Rev: types.NamespacedName{"test-namespace", "test-revision"},
Dests: []string{"128.0.0.1:1234"},
}},
trys: []types.NamespacedName{
{Namespace: "test-namespace", Name: "test-revision"},
Expand All @@ -77,9 +76,9 @@ func TestThrottler(t *testing.T) {
revision(types.NamespacedName{"test-namespace", "test-revision"}, networking.ProtocolHTTP1),
},
initUpdates: []*RevisionDestsUpdate{{
Rev: types.NamespacedName{"test-namespace", "test-revision"},
ClusterIPDest: "129.0.0.1:1234",
ReadyAddressCount: 1,
Rev: types.NamespacedName{"test-namespace", "test-revision"},
ClusterIPDest: "129.0.0.1:1234",
Dests: []string{"128.0.0.1:1234"},
}},
trys: []types.NamespacedName{
{Namespace: "test-namespace", Name: "test-revision"},
Expand All @@ -93,9 +92,8 @@ func TestThrottler(t *testing.T) {
revision(types.NamespacedName{"test-namespace", "test-revision"}, networking.ProtocolHTTP1),
},
initUpdates: []*RevisionDestsUpdate{{
Rev: types.NamespacedName{"test-namespace", "test-revision"},
Dests: []string{"128.0.0.1:1234", "128.0.0.2:1234"},
ReadyAddressCount: 2,
Rev: types.NamespacedName{"test-namespace", "test-revision"},
Dests: []string{"128.0.0.1:1234", "128.0.0.2:1234"},
}},
trys: []types.NamespacedName{
{Namespace: "test-namespace", Name: "test-revision"},
Expand All @@ -106,18 +104,17 @@ func TestThrottler(t *testing.T) {
{Dest: "128.0.0.2:1234"},
},
}, {
name: "multiple clusterip requests after podip",
name: "multiple ClusterIP requests after PodIP",
revisions: []*v1alpha1.Revision{
revision(types.NamespacedName{"test-namespace", "test-revision"}, networking.ProtocolHTTP1),
},
initUpdates: []*RevisionDestsUpdate{{
Rev: types.NamespacedName{"test-namespace", "test-revision"},
Dests: []string{"128.0.0.1:1234", "128.0.0.2:1234"},
ReadyAddressCount: 2,
Rev: types.NamespacedName{"test-namespace", "test-revision"},
Dests: []string{"128.0.0.1:1234", "128.0.0.2:1234"},
}, {
Rev: types.NamespacedName{"test-namespace", "test-revision"},
ClusterIPDest: "129.0.0.1:1234",
ReadyAddressCount: 2,
Rev: types.NamespacedName{"test-namespace", "test-revision"},
ClusterIPDest: "129.0.0.1:1234",
Dests: []string{"128.0.0.1:1234", "128.0.0.2:1234"},
}},
trys: []types.NamespacedName{
{Namespace: "test-namespace", Name: "test-revision"},
Expand All @@ -133,9 +130,9 @@ func TestThrottler(t *testing.T) {
revision(types.NamespacedName{"test-namespace", "test-revision"}, networking.ProtocolHTTP1),
},
initUpdates: []*RevisionDestsUpdate{{
Rev: types.NamespacedName{"test-namespace", "test-revision"},
ClusterIPDest: "129.0.0.1:1234",
ReadyAddressCount: 1,
Rev: types.NamespacedName{"test-namespace", "test-revision"},
ClusterIPDest: "129.0.0.1:1234",
Dests: []string{"128.0.0.1:1234"},
}},
trys: []types.NamespacedName{
{Namespace: "test-namespace", Name: "test-revision"},
Expand All @@ -151,9 +148,8 @@ func TestThrottler(t *testing.T) {
revision(types.NamespacedName{"test-namespace", "test-revision"}, networking.ProtocolHTTP1),
},
initUpdates: []*RevisionDestsUpdate{{
Rev: types.NamespacedName{"test-namespace", "test-revision"},
Dests: []string{"128.0.0.1:1234"},
ReadyAddressCount: 1,
Rev: types.NamespacedName{"test-namespace", "test-revision"},
Dests: []string{"128.0.0.1:1234"},
}},
deletes: []types.NamespacedName{
{"test-namespace", "test-revision"},
Expand Down Expand Up @@ -260,9 +256,8 @@ func TestMultipleActivator(t *testing.T) {
revID := types.NamespacedName{"test-namespace", "test-revision"}
updateCh := make(chan *RevisionDestsUpdate, 10)
updateCh <- &RevisionDestsUpdate{
Rev: revID,
Dests: []string{"128.0.0.1:1234"},
ReadyAddressCount: 3,
Rev: revID,
Dests: []string{"128.0.0.1:1234", "128.0.0.2:1234", "128.0.0.23:1234"},
}
close(updateCh)
throttler.Run(updateCh)
Expand Down