Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated delta FIFO doc #91435

Merged
merged 1 commit into from Jun 3, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 30 additions & 15 deletions staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
Expand Up @@ -41,7 +41,7 @@ import (
// affects error retrying.
// NOTE: It is possible to misuse this and cause a race when using an
// external known object source.
// Whether there is a potential race depends on how the comsumer
// Whether there is a potential race depends on how the consumer
// modifies knownObjects. In Pop(), process function is called under
// lock, so it is safe to update data structures in it that need to be
// in sync with the queue (e.g. knownObjects).
Expand Down Expand Up @@ -99,7 +99,7 @@ type DeltaFIFOOptions struct {
EmitDeltaTypeReplaced bool
}

// NewDeltaFIFOWithOptions returns a Store which can be used process changes to
// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Queue is indeed a Store. But I think DeltaFIFO is modeled as a Queue, as enforced in code.

_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sorry I thought I deleted that comment before I sent the review but I'm failing at github today apparently!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You did! I read your comment from email.:P

// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
Expand Down Expand Up @@ -144,7 +144,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas.
// will always return an object of type Deltas. List() returns
// the newest objects currently in the FIFO.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the newest objects currently in the FIFO" is kind of vague. More precise wording might be something like "the newest object from each accumulator" or "... each accumulator in the FIFO".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that's better wording.

//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key. The objects in
Expand All @@ -160,14 +161,16 @@ type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond

// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
// `items` maps keys to Deltas.
// `queue` maintains FIFO order of keys for consumption in Pop().
// We maintain the property that keys in the `items` and `queue` are
// strictly 1:1 mapping, and that all Deltas in `items` should have
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(a) It might be worth more explicitly emphasizing that a given key appears at most once in the queue. The code enforces this and relies on this, except as follows.

(b) In DeltaFIFO::queueActionLocked, it is contemplated that len(newDeltas) == 0 and in this case the code asserts that is OK to remove the key from f.items but not f.queue. There are also some other places that read these things and are coded as if a key can be in f.queue but not f.items. In fact it is never OK to for a key to appear in f.queue but not f.items.

Copy link
Contributor Author

@jqmichael jqmichael Jun 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keys in the items and queue are strictly 1:1 mapping

Yeah, since items is a map, keys are always unique. So the 1:1 mapping in queue also implies the keys in queue are unique. That could be more explicitly called out as well.

// at least one Delta.
items map[string]Deltas
queue []string

// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
// or Delete/Add/Update/AddIfNotPresent was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
Expand All @@ -180,7 +183,6 @@ type DeltaFIFO struct {
// Replace(), and Resync()
knownObjects KeyListerGetter

// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
Expand Down Expand Up @@ -225,7 +227,7 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
}

// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
// or the first batch of items inserted by Replace() has been popped.
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
Expand Down Expand Up @@ -282,6 +284,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
}
}

// exist in items and/or KnownObjects
return f.queueActionLocked(Deleted, obj)
}

Expand Down Expand Up @@ -332,6 +335,11 @@ func dedupDeltas(deltas Deltas) Deltas {
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
// `a` and `b` are duplicates. Only keep the one returned from isDup().
// TODO: This extra array allocation and copy seems unnecessary if
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing proposed by this TODO is potentially risky depending on who might have taken a pointer. I'd omit or include a disclaimer / rephrase to "might be worth profiling and investigating if it is safe to optimize"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added disclaimer. I had no intention to make premature optimization unless profiling reveals this is a bottleneck for memory allocation.

// all we do to dedup is compare the new delta with the last element
// in `items`, which could be done by mutating `items` directly.
// Might be worth profiling and investigating if it is safe to optimize.
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
Expand Down Expand Up @@ -456,10 +464,12 @@ func (f *DeltaFIFO) IsClosed() bool {
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// process function is called under lock, so it is safe to update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
// process should avoid expensive I/O operation so that other queue operations, i.e.
// Add() and Get(), won't be blocked for too long.
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
Expand Down Expand Up @@ -520,6 +530,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
action = Replaced
}

// Add Sync/Replaced action for each new item.
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
Expand All @@ -538,6 +549,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
Expand Down Expand Up @@ -649,7 +663,8 @@ type KeyLister interface {

// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
// GetByKey returns the value associated with the key, or sets exists=false.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe name the return parameters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Done.

GetByKey(key string) (value interface{}, exists bool, err error)
}

// DeltaType is the type of a change (addition, deletion, etc)
Expand Down Expand Up @@ -712,10 +727,10 @@ func copyDeltas(d Deltas) Deltas {
return d2
}

// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
// an object was deleted but the watch deletion event was missed. In this
// case we don't know the final "resting" state of the object, so there's
// a chance the included `Obj` is stale.
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
// was deleted but the watch deletion event was missed while disconnected from
// apiserver. In this case we don't know the final "resting" state of the object, so
// there's a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct {
Key string
Obj interface{}
Expand Down