[rotor] delay proxy deregistration on disconnect (fixes #5792) (#5941)
This change fixes a pathology where an Envoy stuck in a config loop causes a large spike in API requests.

Inserting the delay itself is straightforward, and is done as a Registrar decorator. Since registration and deregistration are reference-counted, adding a delay to deregistration alone is sufficient: if another proxy registers during the delay, it gets the still-present proxy configuration, and when the delayed deregistration fires it simply decrements the registration count.
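
A minimal sketch of the decorator shape, as it might look inside the poller package (the Register signature, the use of time.AfterFunc, and the dropped error from the deferred call are illustrative assumptions, not the actual implementation):

    // Sketch only. Assumes imports of "time" and the service package that
    // defines ProxyRef, and a Registrar interface shaped like the calls
    // visible in this diff (Register/Deregister taking a ref and a callback).
    type delayedRegistrar struct {
        underlying Registrar
        delay      time.Duration
    }

    // NewDelayedRegistrar wraps a Registrar so that deregistrations take
    // effect only after the given delay; registrations pass through at once.
    func NewDelayedRegistrar(r Registrar, delay time.Duration) Registrar {
        return &delayedRegistrar{underlying: r, delay: delay}
    }

    func (d *delayedRegistrar) Register(ref service.ProxyRef, ifFirst func()) error {
        return d.underlying.Register(ref, ifFirst)
    }

    func (d *delayedRegistrar) Deregister(ref service.ProxyRef, ifLast func()) error {
        // Defer the real deregistration. If the proxy reconnects during the
        // delay, its Register bumps the refcount back up, so the deferred
        // call merely decrements the count and ifLast never runs. The
        // deferred call's error is dropped in this sketch.
        time.AfterFunc(d.delay, func() { d.underlying.Deregister(ref, ifLast) })
        return nil
    }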

Doing this uncovered another problem in the caching consumer: there may be multiple requests pending for a given proxy on a single stream. We were making no effort to track these, and we leaked them on stream disconnect.

So we introduce a `streamRefs` struct to track this. When a request is sent on a stream, we add a proxy ref for that stream ID. When a response is sent, we remove it. Most importantly, when the stream is closed, we remove all remaining refs for that stream ID, which ensures that everything is properly deregistered.
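
For illustration, the bookkeeping behaves like this (`ref` stands in for whatever service.ProxyRef a request's node resolves to, and `deregister` for the consumer's cleanup hook; both are placeholders):

    refs := newStreamRefs()
    refs.Add(1, ref)    // first request for this proxy on stream 1
    refs.Add(1, ref)    // second request for the same proxy, still pending
    refs.Remove(1, ref) // one response sent; one ref remains

    // Stream 1 disconnects: RemoveAll returns the ref once per outstanding
    // request, so every registration is balanced by a deregistration.
    for _, r := range refs.RemoveAll(1) {
        deregister(r)
    }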

Along the way, factor `onRequest` and `onResponse` out of `OnFetchRequest` and `OnFetchResponse`, so they can be called with the correct stream ID from `OnStreamRequest` and `OnStreamResponse`. Stream IDs increment from 1, so -1 is a safe sentinel value for the Fetch methods.
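
Condensed, the delegation looks like this (the full bodies are in the diff below):

    func (c registeringCachingConsumer) OnFetchRequest(req *v2.DiscoveryRequest) {
        c.onRequest(-1, req) // no stream; IDs start at 1, so -1 cannot collide
    }

    func (c registeringCachingConsumer) OnStreamRequest(streamID int64, req *v2.DiscoveryRequest) {
        c.streamRefs.Add(streamID, proxyRefFromNode(req.GetNode()))
        c.onRequest(streamID, req)
    }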

Also, add some debug logging, and beef up test coverage.

fbshipit-source-id: d8f2ba2
9len authored and tbnbot committed Oct 25, 2018
1 parent 77df51c commit a71a8c8
Showing 6 changed files with 689 additions and 41 deletions.
7 changes: 5 additions & 2 deletions updater_fromflags.go
@@ -37,7 +37,10 @@ import (
"github.com/turbinelabs/stats"
)

-const clientApp = apiclient.App("github.com/turbinelabs/rotor")
+const (
+    clientApp  = apiclient.App("github.com/turbinelabs/rotor")
+    deregDelay = 10 * time.Second
+)

// UpdaterFromFlags produces a fully-configured updater.Updater. It differs from
// updater.FromFlags in that it will also configure the API and potentially an
@@ -242,7 +245,7 @@ func (ff *updaterFromFlags) mkSvcAndXDS() (service.All, adapter.XDS, error) {

    statsClient.AddTags(stats.NewKVTag(stats.ProxyVersionTag, constants.TbnPublicVersion))

-    registrar := poller.NewRegistrar(svc)
+    registrar := poller.NewDelayedRegistrar(poller.NewRegistrar(svc), deregDelay)
    xds, err := ff.xdsFromFlags.Make(registrar)
    if err != nil {
        return nil, nil, err
151 changes: 140 additions & 11 deletions xds/adapter/caching_consumer.go
@@ -18,6 +18,7 @@ package adapter

import (
    "strings"
+    "sync"
    "time"

    "github.com/envoyproxy/go-control-plane/envoy/api/v2"
@@ -29,7 +30,7 @@ import (
    "github.com/turbinelabs/rotor/xds/poller"
)

-//go:generate $TBN_HOME/scripts/mockgen_internal.sh -type cachingConsumer -source $GOFILE -destination mock_$GOFILE -package $GOPACKAGE --write_package_comment=false
+//go:generate $TBN_HOME/scripts/mockgen_internal.sh -type cachingConsumer,streamRefsIface -source $GOFILE -destination mock_$GOFILE -package $GOPACKAGE --write_package_comment=false

// cachingConsumer implements poller.Consumer, consumes poller.Objects,
// receives callbacks from the go-control-plane server, and produces cache
@@ -59,18 +60,117 @@ func newCachingConsumer(
            newListenerAdapter(loggingCluster),
            provider,
        ),
        streamRefs: newStreamRefs(),
        getObjects: func(svc service.All, proxy api.Proxy) (*poller.Objects, error) {
            return poller.NewRemote(svc).Objects(proxy.ProxyKey)
        },
    }
}

type streamRefsIface interface {
    // Add the given proxy ref to the given stream
    Add(streamID int64, ref service.ProxyRef)
    // Remove the given proxy ref from the given stream
    Remove(streamID int64, ref service.ProxyRef)
    // RemoveAll removes all proxy refs, and returns a slice of refs to
    // deregister. A given ref will appear as many times in the slice as it was
    // added without being removed.
    RemoveAll(streamID int64) []service.ProxyRef
}

func newStreamRefs() streamRefsIface {
    return &streamRefs{
        refs: map[int64]map[string]*streamRefEntry{},
    }
}

// streamRefs tracks what streams are interested in which proxies.
type streamRefs struct {
    sync.Mutex
    refs map[int64]map[string]*streamRefEntry
}

type streamRefEntry struct {
    ref   service.ProxyRef
    count uint
}

func (r *streamRefs) Add(streamID int64, ref service.ProxyRef) {
    r.Lock()
    defer r.Unlock()
    k := ref.MapKey()

    // add the stream if necessary
    if r.refs[streamID] == nil {
        r.refs[streamID] = map[string]*streamRefEntry{}
    }

    // add the entry if necessary
    if r.refs[streamID][k] == nil {
        r.refs[streamID][k] = &streamRefEntry{ref: ref}
    }

    // increment the count
    r.refs[streamID][k].count++
}

func (r *streamRefs) Remove(streamID int64, ref service.ProxyRef) {
    r.Lock()
    defer r.Unlock()
    k := ref.MapKey()

    // nothing to do
    if r.refs[streamID] == nil {
        return
    }

    if r.refs[streamID][k] != nil {
        // decrement the count for this proxy ref
        r.refs[streamID][k].count--

        // remove the proxy ref entry from the stream if the count is zero
        if r.refs[streamID][k].count == 0 {
            delete(r.refs[streamID], k)
        }
    }

    // clean up the stream if no more entries
    if len(r.refs[streamID]) == 0 {
        delete(r.refs, streamID)
    }
}

func (r *streamRefs) RemoveAll(streamID int64) []service.ProxyRef {
    r.Lock()
    defer r.Unlock()

    var result []service.ProxyRef

    // nothing to do
    if r.refs[streamID] == nil {
        return result
    }

    // append each ref as many times as the value of the count
    for _, ref := range r.refs[streamID] {
        for i := uint(0); i < ref.count; i++ {
            result = append(result, ref.ref)
        }
    }

    // clean up the stream
    delete(r.refs, streamID)

    return result
}

// registeringCachingConsumer signals its desire to consume objects by
// registering with a poller.Registrar
type registeringCachingConsumer struct {
    cache      snapshotCache
    registrar  poller.Registrar
    adapt      snapshotAdapter
    streamRefs streamRefsIface
    getObjects func(service.All, api.Proxy) (*poller.Objects, error)
}

@@ -88,33 +188,47 @@ func (c registeringCachingConsumer) Consume(objs *poller.Objects) error {

// OnStreamOpen implements
// go-control-plane/pkg/server/Callbacks.OnStreamOpen
-func (c registeringCachingConsumer) OnStreamOpen(int64, string) {}
+func (c registeringCachingConsumer) OnStreamOpen(streamID int64, streamType string) {
+    console.Debug().Println("stream open: ", streamID, streamType)
+}

// OnStreamClosed implements
// go-control-plane/pkg/server/Callbacks.OnStreamClosed
-func (c registeringCachingConsumer) OnStreamClosed(int64) {}
+func (c registeringCachingConsumer) OnStreamClosed(streamID int64) {
+    for _, ref := range c.streamRefs.RemoveAll(streamID) {
+        c.deregister(ref)
+    }
+    console.Debug().Println("stream closed: ", streamID)
+}

// OnStreamRequest implements
// go-control-plane/pkg/server/Callbacks.OnStreamRequest
-func (c registeringCachingConsumer) OnStreamRequest(_ int64, req *v2.DiscoveryRequest) {
-    c.OnFetchRequest(req)
+func (c registeringCachingConsumer) OnStreamRequest(streamID int64, req *v2.DiscoveryRequest) {
+    c.streamRefs.Add(streamID, proxyRefFromNode(req.GetNode()))
+    c.onRequest(streamID, req)
}

// OnStreamResponse implements
// go-control-plane/pkg/server/Callbacks.OnStreamResponse
func (c registeringCachingConsumer) OnStreamResponse(
-    _ int64,
+    streamID int64,
    req *v2.DiscoveryRequest,
    resp *v2.DiscoveryResponse,
) {
-    c.OnFetchResponse(req, resp)
+    c.onResponse(streamID, req, resp)
+    c.streamRefs.Remove(streamID, proxyRefFromNode(req.GetNode()))
}

// OnFetchRequest implements
// go-control-plane/pkg/server/Callbacks.OnFetchRequest
func (c registeringCachingConsumer) OnFetchRequest(req *v2.DiscoveryRequest) {
    c.onRequest(-1, req)
}

func (c registeringCachingConsumer) onRequest(streamID int64, req *v2.DiscoveryRequest) {
    console.Debug().Printf(`
-----------
STREAM: %d
RECEIVED: %s
NODE: %s
CLUSTER: %s
@@ -123,6 +237,7 @@ func (c registeringCachingConsumer) OnFetchRequest(req *v2.DiscoveryRequest) {
NONCE: %s
VERSION: %s
`,
        streamID,
        req.GetTypeUrl(),
        req.GetNode().GetId(),
        req.GetNode().GetCluster(),
@@ -155,20 +270,34 @@ func (c registeringCachingConsumer) OnFetchRequest(req *v2.DiscoveryRequest) {
func (c registeringCachingConsumer) OnFetchResponse(
    req *v2.DiscoveryRequest,
    resp *v2.DiscoveryResponse,
) {
    c.onResponse(-1, req, resp)
}

func (c registeringCachingConsumer) onResponse(
    streamID int64,
    req *v2.DiscoveryRequest,
    resp *v2.DiscoveryResponse,
) {
    console.Debug().Printf(
-        "Responding with type: %s, version: %s, resources: %d",
+        "Responding (%d) with type: %s, version: %s, resources: %d",
+        streamID,
        resp.GetTypeUrl(),
        resp.GetVersionInfo(),
        len(resp.GetResources()),
    )

    pRef := proxyRefFromNode(req.GetNode())
    c.deregister(pRef)
}

func (c registeringCachingConsumer) deregister(pRef service.ProxyRef) {
    k := pRef.MapKey()
    ifLast := func() {
-        console.Debug().Println("Last deregistration of", pRef.MapKey())
-        c.cache.ClearSnapshot(pRef.MapKey())
+        console.Debug().Println("Last deregistration of", k)
+        c.cache.ClearSnapshot(k)
    }
    if err := c.registrar.Deregister(pRef, ifLast); err != nil {
-        console.Error().Printf("Error deregistering node(%s): %s", pRef.MapKey(), err)
+        console.Error().Printf("Error deregistering node(%s): %s", k, err)
    }
}
