Skip to content

Commit

Permalink
Allow multiple routers to update route status
Browse files Browse the repository at this point in the history
Previously, if multiple differently-named routers attempted to update
their respective Ingress entries in a route status, one would go
through, while the others would receive conflict errors.

The last-touched cache, designed to prevent fighting between two
same-named routers (for example, on a rolling update), would get
the route added with the zero-time, meaning that the router would
not submit further updates for that route (and consequently that
router would never set the proper status on the route).

Now, conflicts no longer insert an entry into the last-touched cache.
Instead, the code now properly updates the last-touched time, which
prevents the fighting between instances of the same router while
tolerating conflicts between instances of different routers.

Additionally, cache entries now expire after a period of time
(the "contention period"), as originally intended.
  • Loading branch information
DirectXMan12 committed Mar 31, 2016
1 parent dbd0f56 commit d5cdfdb
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 73 deletions.
42 changes: 34 additions & 8 deletions pkg/router/controller/status.go
Expand Up @@ -97,10 +97,16 @@ func findOrCreateIngress(route *routeapi.Route, name string) (_ *routeapi.RouteI
// setIngressCondition records the condition on the ingress, returning true if the ingress was changed and
// false if no modification was made.
func setIngressCondition(ingress *routeapi.RouteIngress, condition routeapi.RouteIngressCondition) bool {
for _, existing := range ingress.Conditions {
for i, existing := range ingress.Conditions {
// ensures that the comparison is based on the actual value, not the time
existing.LastTransitionTime = condition.LastTransitionTime
if existing == condition {
// This will always be the case if we're receiving an update on the host
// value (or the like), since findOrCreateIngress sets that for us. We
// still need to set the last-touched time so that others can tell we've
// modified this Ingress value
now := nowFn()
ingress.Conditions[i].LastTransitionTime = &now
return false
}
}
Expand Down Expand Up @@ -148,6 +154,12 @@ func (a *StatusAdmitter) hasIngressBeenTouched(route *routeapi.Route, lastTouch
return false
}
old, ok := a.expected.Get(route.UID)
if ok && old.(time.Time).Before(nowFn().Add(-a.contentionInterval)) {
// throw out cache entries from before the contention interval, in case this is no longer valid
// (e.g. the previous updater no longer exists due to scale down)
ok = false
}

if !ok || old.(time.Time).Equal(lastTouch.Time) {
return false
}
Expand All @@ -157,7 +169,7 @@ func (a *StatusAdmitter) hasIngressBeenTouched(route *routeapi.Route, lastTouch
// recordIngressTouch tracks whether the ingress record updated succeeded and returns true if the admitter can
// continue. Conflict errors are treated as no error, but indicate the touch was not successful and the caller
// should retry.
func (a *StatusAdmitter) recordIngressTouch(route *routeapi.Route, touch *unversioned.Time, err error) (bool, error) {
func (a *StatusAdmitter) recordIngressTouch(route *routeapi.Route, touch *unversioned.Time, oldTouch *unversioned.Time, err error) (bool, error) {
switch {
case err == nil:
if touch != nil {
Expand All @@ -167,12 +179,15 @@ func (a *StatusAdmitter) recordIngressTouch(route *routeapi.Route, touch *unvers
// if the router can't write status updates, allow the route to go through
case errors.IsForbidden(err):
glog.Errorf("Unable to write router status - please ensure you reconcile your system policy or grant this router access to update route status: %v", err)
if touch != nil {
a.expected.Add(route.UID, touch.Time)
if oldTouch != nil {
// record oldTouch so that if the problem gets rectified in the future,
// we can proceed as normal
a.expected.Add(route.UID, oldTouch.Time)
}
return true, nil
case errors.IsConflict(err):
a.expected.Add(route.UID, time.Time{})
// just follow the normal process, and retry when we receive the update notification due to
// the other entity updating the route.
return false, nil
}
return false, err
Expand All @@ -193,7 +208,18 @@ func (a *StatusAdmitter) admitRoute(oc client.RoutesNamespacer, route *routeapi.
}
}

if a.hasIngressBeenTouched(route, ingressConditionTouched(ingress)) {
// this works by keeping a cache of what time we last touched the route.
// If the recorded last-touch time matches ours, then we were the ones to do the
// last update, and can continue forth. Additionally, if we have no entry in our
// cache, we continue forward anyways. Since replicas from a new deployment will
// have no entry, they will update the last-touch time, and therefore take "ownership"
// of updating the route. In the case of a new route being created during a rolling update,
// there will be a race to determine whether the old or new deployment gets to determine,
// but this will be corrected on the next event after contentionInterval time.

// keep lastTouch around
lastTouch := ingressConditionTouched(ingress)
if a.hasIngressBeenTouched(route, lastTouch) {
glog.V(4).Infof("admit: observed a route update from someone else: route %s/%s has been updated to an inconsistent value, doing nothing", route.Namespace, route.Name)
return true, nil
}
Expand All @@ -204,7 +230,7 @@ func (a *StatusAdmitter) admitRoute(oc client.RoutesNamespacer, route *routeapi.
})
glog.V(4).Infof("admit: admitting route by updating status: %s (%t): %s", route.Name, updated, route.Spec.Host)
_, err := oc.Routes(route.Namespace).UpdateStatus(route)
return a.recordIngressTouch(route, ingress.Conditions[0].LastTransitionTime, err)
return a.recordIngressTouch(route, ingress.Conditions[0].LastTransitionTime, lastTouch, err)
}

// RecordRouteRejection attempts to update the route status with a reason for a route being rejected.
Expand All @@ -226,7 +252,7 @@ func (a *StatusAdmitter) RecordRouteRejection(route *routeapi.Route, reason, mes
}

_, err := a.client.Routes(route.Namespace).UpdateStatus(route)
_, err = a.recordIngressTouch(route, ingress.Conditions[0].LastTransitionTime, err)
_, err = a.recordIngressTouch(route, ingress.Conditions[0].LastTransitionTime, lastTouch, err)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to write route rejection to the status: %v", err))
}
Expand Down
226 changes: 161 additions & 65 deletions pkg/router/controller/status_test.go
Expand Up @@ -68,6 +68,39 @@ func TestStatusNoOp(t *testing.T) {
}
}

// checkResult verifies the outcome of a single route-handling call made through a fake client and
// returns the route object that was submitted in the recorded status update.
//
// The test is failed immediately unless all of the following hold:
//   - err is nil;
//   - the fake client c has recorded exactly actionInd+1 actions, and the action at index actionInd
//     is an "update" of the "status" subresource of "routes";
//   - the submitted route has exactly ingressInd+1 ingress entries, and the entry at index
//     ingressInd has host targetHost;
//   - the first condition of that ingress has a LastTransitionTime equal to targetObjTime, status
//     ConditionTrue and an empty reason;
//   - admitter.expected has no entry for UID "uid1" when targetCachedTime is nil, or holds a value
//     deeply equal to *targetCachedTime otherwise.
//
// Every slice index and type assertion is guarded so that a mismatch is reported through t.Fatalf
// rather than surfacing as an index-out-of-range or type-assertion panic.
func checkResult(t *testing.T, err error, c *testclient.Fake, admitter *StatusAdmitter, targetHost string, targetObjTime unversioned.Time, targetCachedTime *time.Time, ingressInd int, actionInd int) *routeapi.Route {
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	actions := c.Actions()
	if len(actions) != actionInd+1 {
		t.Fatalf("unexpected actions: %#v", actions)
	}
	action := actions[actionInd]
	if action.GetVerb() != "update" || action.GetResource() != "routes" || action.GetSubresource() != "status" {
		t.Fatalf("unexpected action: %#v", action)
	}
	updateAction, isUpdate := action.(ktestclient.UpdateAction)
	if !isUpdate {
		t.Fatalf("unexpected action: %#v", action)
	}
	obj, isRoute := updateAction.GetObject().(*routeapi.Route)
	if !isRoute {
		t.Fatalf("unexpected object in action: %#v", updateAction.GetObject())
	}

	// Check the ingress count on its own first: indexing Ingress[ingressInd] to build the failure
	// message would panic when the slice is shorter than expected.
	ingresses := obj.Status.Ingress
	if len(ingresses) != ingressInd+1 {
		t.Fatalf("expected route reset: expected %d ingress entries / actual %d -- %#v", ingressInd+1, len(ingresses), obj)
	}
	ingress := ingresses[ingressInd]
	if ingress.Host != targetHost {
		t.Fatalf("expected route reset: expected %q / actual %q -- %#v", targetHost, ingress.Host, obj)
	}

	if len(ingress.Conditions) == 0 {
		t.Fatalf("%s: expected at least one condition: %#v", targetHost, ingress)
	}
	condition := ingress.Conditions[0]
	if condition.LastTransitionTime == nil || *condition.LastTransitionTime != targetObjTime || condition.Status != kapi.ConditionTrue || condition.Reason != "" {
		t.Fatalf("%s: unexpected condition: %#v", targetHost, condition)
	}

	cached, found := admitter.expected.Peek(types.UID("uid1"))
	if targetCachedTime == nil {
		// a nil target means the cache must not contain an entry for this route
		if found {
			t.Fatalf("expected empty time: %#v", cached)
		}
		return obj
	}
	if !found || !reflect.DeepEqual(cached, *targetCachedTime) {
		t.Fatalf("did not record last modification time: %#v %#v", admitter.expected, cached)
	}

	return obj
}

func TestStatusResetsHost(t *testing.T) {
now := unversioned.Now()
nowFn = func() unversioned.Time { return now }
Expand All @@ -94,27 +127,8 @@ func TestStatusResetsHost(t *testing.T) {
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(c.Actions()) != 1 {
t.Fatalf("unexpected actions: %#v", c.Actions())
}
action := c.Actions()[0]
if action.GetVerb() != "update" || action.GetResource() != "routes" || action.GetSubresource() != "status" {
t.Fatalf("unexpected action: %#v", action)
}
obj := c.Actions()[0].(ktestclient.UpdateAction).GetObject().(*routeapi.Route)
if len(obj.Status.Ingress) != 1 || obj.Status.Ingress[0].Host != "route1.test.local" {
t.Fatalf("expected route reset: %#v", obj)
}
condition := obj.Status.Ingress[0].Conditions[0]
if condition.LastTransitionTime == nil || *condition.LastTransitionTime != touched || condition.Status != kapi.ConditionTrue || condition.Reason != "" {
t.Fatalf("unexpected condition: %#v", condition)
}
if v, ok := admitter.expected.Peek(types.UID("uid1")); !ok || !reflect.DeepEqual(v, touched.Time) {
t.Fatalf("did not record last modification time: %#v %#v", admitter.expected, v)
}

checkResult(t, err, c, admitter, "route1.test.local", now, &now.Time, 0, 0)
}

func TestStatusAdmitsRouteOnForbidden(t *testing.T) {
Expand Down Expand Up @@ -143,27 +157,7 @@ func TestStatusAdmitsRouteOnForbidden(t *testing.T) {
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(c.Actions()) != 1 {
t.Fatalf("unexpected actions: %#v", c.Actions())
}
action := c.Actions()[0]
if action.GetVerb() != "update" || action.GetResource() != "routes" || action.GetSubresource() != "status" {
t.Fatalf("unexpected action: %#v", action)
}
obj := c.Actions()[0].(ktestclient.UpdateAction).GetObject().(*routeapi.Route)
if len(obj.Status.Ingress) != 1 || obj.Status.Ingress[0].Host != "route1.test.local" {
t.Fatalf("expected route reset: %#v", obj)
}
condition := obj.Status.Ingress[0].Conditions[0]
if condition.LastTransitionTime == nil || *condition.LastTransitionTime != touched || condition.Status != kapi.ConditionTrue || condition.Reason != "" {
t.Fatalf("unexpected condition: %#v", condition)
}
if v, ok := admitter.expected.Peek(types.UID("uid1")); !ok || !reflect.DeepEqual(v, touched.Time) {
t.Fatalf("did not record last modification time: %#v %#v", admitter.expected, v)
}
checkResult(t, err, c, admitter, "route1.test.local", now, &touched.Time, 0, 0)
}

func TestStatusBackoffOnConflict(t *testing.T) {
Expand Down Expand Up @@ -192,28 +186,7 @@ func TestStatusBackoffOnConflict(t *testing.T) {
},
},
})
if len(c.Actions()) != 1 {
t.Fatalf("unexpected actions: %#v", c.Actions())
}
action := c.Actions()[0]
if action.GetVerb() != "update" || action.GetResource() != "routes" || action.GetSubresource() != "status" {
t.Fatalf("unexpected action: %#v", action)
}
obj := c.Actions()[0].(ktestclient.UpdateAction).GetObject().(*routeapi.Route)
if len(obj.Status.Ingress) != 1 || obj.Status.Ingress[0].Host != "route1.test.local" {
t.Fatalf("expected route reset: %#v", obj)
}
condition := obj.Status.Ingress[0].Conditions[0]
if condition.LastTransitionTime == nil || *condition.LastTransitionTime != now || condition.Status != kapi.ConditionTrue || condition.Reason != "" {
t.Fatalf("unexpected condition: %#v", condition)
}

if err != nil {
t.Fatalf("unexpected error: %#v", err)
}
if v, ok := admitter.expected.Peek(types.UID("uid1")); !ok || !reflect.DeepEqual(v, time.Time{}) {
t.Fatalf("expected empty time: %#v", v)
}
checkResult(t, err, c, admitter, "route1.test.local", now, nil, 0, 0)
}

func TestStatusRecordRejection(t *testing.T) {
Expand Down Expand Up @@ -373,11 +346,134 @@ func TestStatusRecordRejectionConflict(t *testing.T) {
if condition.LastTransitionTime == nil || *condition.LastTransitionTime != now || condition.Status != kapi.ConditionFalse || condition.Reason != "Failed" || condition.Message != "generic error" {
t.Fatalf("unexpected condition: %#v", condition)
}
if v, ok := admitter.expected.Peek(types.UID("uid1")); !ok || !reflect.DeepEqual(v, time.Time{}) {
if v, ok := admitter.expected.Peek(types.UID("uid1")); ok {
t.Fatalf("expected empty time: %#v", v)
}
}

// TestStatusFightBetweenReplicas drives two admitters that share the router name "test" against the
// same route. The first admitter writes the initial status, the second (standing in for a replica
// of a new deployment) overwrites it one minute later, and the resulting object is then fed back to
// the first admitter, which is expected to issue no further status update.
func TestStatusFightBetweenReplicas(t *testing.T) {
	plugin := &fakePlugin{}

	// Step 1: the original replica populates the status of a route that has none yet.
	firstNow := unversioned.Now()
	nowFn = func() unversioned.Time { return firstNow }
	oldClient := testclient.NewSimpleFake(&routeapi.Route{})
	oldAdmitter := NewStatusAdmitter(plugin, oldClient, "test")
	initialRoute := &routeapi.Route{
		ObjectMeta: kapi.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
		Spec:       routeapi.RouteSpec{Host: "route1.test.local"},
		Status:     routeapi.RouteStatus{},
	}
	err := oldAdmitter.HandleRoute(watch.Added, initialRoute)

	afterOld := checkResult(t, err, oldClient, oldAdmitter, "route1.test.local", firstNow, &firstNow.Time, 0, 0)

	// Step 2: a replica from the new deployment sees the route one minute later, with a changed host.
	secondNow := unversioned.Time{Time: firstNow.Time.Add(time.Minute)}
	nowFn = func() unversioned.Time { return secondNow }
	newClient := testclient.NewSimpleFake(&routeapi.Route{})
	newAdmitter := NewStatusAdmitter(plugin, newClient, "test")
	afterOld.Spec.Host = "route1.test-new.local"
	err = newAdmitter.HandleRoute(watch.Added, afterOld)

	afterNew := checkResult(t, err, newClient, newAdmitter, "route1.test-new.local", secondNow, &secondNow.Time, 0, 0)

	// Step 3: the original replica receives the object written by the new replica. The clock is
	// again set to one minute after the first timestamp.
	thirdNow := unversioned.Time{Time: firstNow.Time.Add(time.Minute)}
	nowFn = func() unversioned.Time { return thirdNow }
	afterNew.Spec.Host = "route1.test.local"
	err = oldAdmitter.HandleRoute(watch.Modified, afterNew)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// The original replica must not have issued anything beyond its single update from step 1.
	if recorded := oldClient.Actions(); len(recorded) != 1 {
		t.Fatalf("unexpected actions: %#v", recorded)
	}
}

// TestStatusFightBetweenRouters runs a single admitter named "test2" twice against a route that
// already carries ingress entries for routers "test1" and "test2". The fake client is seeded with a
// conflict error followed by a route object. After the first attempt no last-touched entry may be
// cached for the route; after the second attempt the update must be recorded and the cache must
// hold the second timestamp.
func TestStatusFightBetweenRouters(t *testing.T) {
	plugin := &fakePlugin{}

	// notAdmitted builds an ingress entry holding a single RouteAdmitted=False condition.
	notAdmitted := func(host, routerName string, touched *unversioned.Time) routeapi.RouteIngress {
		return routeapi.RouteIngress{
			Host:       host,
			RouterName: routerName,
			Conditions: []routeapi.RouteIngressCondition{
				{
					Type:               routeapi.RouteAdmitted,
					Status:             kapi.ConditionFalse,
					LastTransitionTime: touched,
				},
			},
		}
	}
	// routeWith builds the route under test around the given ingress entries.
	routeWith := func(ingresses ...routeapi.RouteIngress) *routeapi.Route {
		return &routeapi.Route{
			ObjectMeta: kapi.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
			Spec:       routeapi.RouteSpec{Host: "route2.test-new.local"},
			Status:     routeapi.RouteStatus{Ingress: ingresses},
		}
	}

	// First attempt: the status update is expected to end in a conflict.
	firstNow := unversioned.Now()
	nowFn = func() unversioned.Time { return firstNow }
	firstTouched := unversioned.Time{Time: firstNow.Add(-time.Minute)}
	conflict := errors.NewConflict(kapi.Resource("Route"), "route1", nil).(*errors.StatusError)
	client := testclient.NewSimpleFake(&conflict.ErrStatus, &routeapi.Route{})
	admitter := NewStatusAdmitter(plugin, client, "test2")
	err := admitter.HandleRoute(watch.Added, routeWith(
		notAdmitted("route1.test.local", "test1", &firstTouched),
		notAdmitted("route1.test-new.local", "test2", &firstTouched),
	))

	// nil cached time: nothing may have been recorded for the route after the first attempt
	checkResult(t, err, client, admitter, "route2.test-new.local", firstNow, nil, 1, 0)

	// Second attempt: same admitter and same fake client, so the update is expected to show up as
	// the client's second recorded action.
	// NOTE(review): presumably the fake serves the seeded conflict only for the first update and
	// the seeded route afterwards -- confirm against the fake client implementation.
	secondNow := unversioned.Now()
	nowFn = func() unversioned.Time { return secondNow }
	secondTouched := unversioned.Time{Time: secondNow.Add(-time.Minute)}
	err = admitter.HandleRoute(watch.Added, routeWith(
		notAdmitted("route2.test.local", "test1", &secondTouched),
		notAdmitted("route1.test-new.local", "test2", &firstTouched),
	))

	checkResult(t, err, client, admitter, "route2.test-new.local", secondNow, &secondNow.Time, 1, 1)
}

func TestFindOrCreateIngress(t *testing.T) {
route := &routeapi.Route{
Status: routeapi.RouteStatus{
Expand Down

0 comments on commit d5cdfdb

Please sign in to comment.