WIP: NodeResourceTopology: add cache impl
Implement the optional cache.
Depends on the reserve plugin to be enabled.
Proper docs TBD.

Integration tests will be added in a follow-up commit.

Signed-off-by: Francesco Romani <fromani@redhat.com>
ffromani committed Jun 21, 2022
1 parent 32b46f7 commit 74e4b1b
Showing 12 changed files with 1,616 additions and 37 deletions.
2 changes: 2 additions & 0 deletions apis/config/types.go
@@ -146,6 +146,8 @@ type NodeResourceTopologyMatchArgs struct {

// ScoringStrategy a scoring model that determine how the plugin will score the nodes.
ScoringStrategy ScoringStrategy
// If > 0, enables the caching facilities of the reserve plugin, which must be enabled for the cache to work.
CacheResyncPeriodSeconds int64
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
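For orientation, here is a minimal sketch of how a plugin might consume the new argument: zero keeps the current passthrough behaviour, while a positive value selects the resyncing cache and drives the periodic Resync() defined later in cache.go. The helper name, the PassthroughCache field, the apiconfig import alias and the stop-channel wiring are assumptions for illustration only, not part of this commit.

// Illustrative sketch (not part of this commit); assumes it lives alongside
// cache.go in pkg/noderesourcetopology and that args comes from apis/config.
func setupTopologyCache(args *apiconfig.NodeResourceTopologyMatchArgs, lister listerv1alpha1.NodeResourceTopologyLister, indexer IndexGetter, stopCh <-chan struct{}) (Cache, error) {
    if args.CacheResyncPeriodSeconds <= 0 {
        // caching disabled: keep answering straight from the lister
        return PassthroughCache{lister: lister}, nil // field name assumed; not shown in this diff
    }
    nrtCache, err := newNRTCache(lister, indexer)
    if err != nil {
        return nil, err
    }
    resyncPeriod := time.Duration(args.CacheResyncPeriodSeconds) * time.Second
    go func() {
        ticker := time.NewTicker(resyncPeriod)
        defer ticker.Stop()
        for {
            select {
            case <-stopCh:
                return
            case <-ticker.C:
                nrtCache.Resync()
            }
        }
    }()
    return nrtCache, nil
}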
2 changes: 2 additions & 0 deletions go.mod
@@ -6,6 +6,7 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/google/go-cmp v0.5.5
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.0.12
github.com/k8stopologyawareschedwg/podfingerprint v0.0.3
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/paypal/load-watcher v0.2.2
github.com/spf13/pflag v1.0.5
@@ -30,6 +31,7 @@
require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/beorn7/perks v1.0.1 // indirect
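The new podfingerprint dependency backs the resync gating implemented later in cache.go: Resync() only refreshes a node when the podset fingerprint published in its NodeResourceTopology matches the pods the scheduler currently knows about on that node. The actual helpers (podFingerprintForNodeTopology, checkPodFingerprintForNode) ship in other files of this commit and are not shown here; the sketch below only illustrates the shape of that check, with the index name and computePodFingerprint as hypothetical stand-ins.

// Hypothetical sketch of the fingerprint gate used by Resync(); the real
// helpers live in other files of this commit and may differ.
func checkPodFingerprintForNodeSketch(indexer IndexGetter, nodeName, pfpExpected string) error {
    // look up the pods currently assigned to the node; the index name is assumed
    objs, err := indexer.ByIndex("nodeName", nodeName)
    if err != nil {
        return err
    }
    pods := make([]*corev1.Pod, 0, len(objs))
    for _, obj := range objs {
        pod, ok := obj.(*corev1.Pod)
        if !ok {
            return fmt.Errorf("unexpected object in index: %T", obj)
        }
        pods = append(pods, pod)
    }
    // computePodFingerprint stands in for the podfingerprint-based computation
    // (namespace/name pairs folded into a stable signature)
    if pfpComputed := computePodFingerprint(pods); pfpComputed != pfpExpected {
        return fmt.Errorf("node %q: expected %q got %q: %w", nodeName, pfpExpected, pfpComputed, podfingerprint.ErrSignatureMismatch)
    }
    return nil
}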
4 changes: 4 additions & 0 deletions go.sum
@@ -75,6 +75,8 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI=
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
@@ -423,6 +425,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.0.12 h1:NhXbOjO1st8hIcVpegr3zw/AGG12vs3z//tCDDcfPpE=
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.0.12/go.mod h1:AkACMQGiTgCt0lQw3m7TTU8PLH9lYKNK5e9DqFf5VuM=
github.com/k8stopologyawareschedwg/podfingerprint v0.0.3 h1:UPq+pw4iElwAjYbNekLUyy8NeqFxTQAOO4IkQmV3bwo=
github.com/k8stopologyawareschedwg/podfingerprint v0.0.3/go.mod h1:C23pM15t06dXg/OihGlqBvnYzLr+MXDXJ7zMfbNAyXI=
github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
165 changes: 160 additions & 5 deletions pkg/noderesourcetopology/cache.go
@@ -17,18 +17,24 @@ limitations under the License.
package noderesourcetopology

import (
"errors"
"fmt"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

topologyv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
listerv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/listers/topology/v1alpha1"
"github.com/k8stopologyawareschedwg/podfingerprint"
)

type Cache interface {
GetByNode(nodeName string) *topologyv1alpha1.NodeResourceTopology
MarkNodeDiscarded(nodeName string)
- ReserveNodeResources(nodeName string, pod *corev1.Pod) error
- ReleaseNodeResources(nodeName string, pod *corev1.Pod) error
+ ReserveNodeResources(nodeName string, pod *corev1.Pod)
+ ReleaseNodeResources(nodeName string, pod *corev1.Pod)
}

type PassthroughCache struct {
@@ -45,6 +51,155 @@ func (pt PassthroughCache) GetByNode(nodeName string) *topologyv1alpha1.NodeReso
return nrt
}

- func (pt PassthroughCache) MarkNodeDiscarded(nodeName string) {}
- func (pt PassthroughCache) ReserveNodeResources(nodeName string, pod *corev1.Pod) error { return nil }
- func (pt PassthroughCache) ReleaseNodeResources(nodeName string, pod *corev1.Pod) error { return nil }
+ func (pt PassthroughCache) MarkNodeDiscarded(nodeName string) {}
+ func (pt PassthroughCache) ReserveNodeResources(nodeName string, pod *corev1.Pod) {}
+ func (pt PassthroughCache) ReleaseNodeResources(nodeName string, pod *corev1.Pod) {}

// IndexGetter exists only to make testing trivial; a cache.Indexer works just fine
type IndexGetter interface {
ByIndex(indexName, indexedValue string) ([]interface{}, error)
}

type NRTCache struct {
lock sync.RWMutex
nrts nrtStore
assumedResources map[string]resourceStore
nodeDiscarded counter
lister listerv1alpha1.NodeResourceTopologyLister
indexer IndexGetter
}

func newNRTCache(lister listerv1alpha1.NodeResourceTopologyLister, indexer IndexGetter) (*NRTCache, error) {
if lister == nil || indexer == nil {
return nil, fmt.Errorf("NRT cache received nil references")
}

nrtObjs, err := lister.List(labels.Nothing())
if err != nil {
return nil, err
}
return &NRTCache{
nrts: newNrtStore(nrtObjs),
assumedResources: make(map[string]resourceStore),
nodeDiscarded: make(counter),
lister: lister,
indexer: indexer,
}, nil
}

// DirtyNodeNames returns the nodes which have assumed resources and were
// discarded by the filter, hence are worth a resync attempt.
func (cc *NRTCache) DirtyNodeNames() []string {
cc.lock.RLock()
defer cc.lock.RUnlock()
var nodes []string
for node := range cc.assumedResources {
if !cc.nodeDiscarded.IsSet(node) {
// no one asked about this node, let it be
continue
}
nodes = append(nodes, node)
}
return nodes
}

func (cc *NRTCache) MarkNodeDiscarded(nodeName string) {
cc.lock.Lock()
defer cc.lock.Unlock()
val := cc.nodeDiscarded.Incr(nodeName)
klog.V(6).InfoS("discarded", "node", nodeName, "count", val)
}

// GetByNode returns the cached NodeResourceTopology for nodeName, adjusted
// with the resources currently assumed (reserved) on that node.
func (cc *NRTCache) GetByNode(nodeName string) *topologyv1alpha1.NodeResourceTopology {
cc.lock.RLock()
defer cc.lock.RUnlock()
nrt := cc.nrts.GetByNode(nodeName)
if nrt == nil {
return nil
}
nodeAssumedResources := cc.assumedResources[nodeName]
nodeAssumedResources.UpdateNRT(nrt)
return nrt
}

func (cc *NRTCache) ReserveNodeResources(nodeName string, pod *corev1.Pod) {
cc.lock.Lock()
defer cc.lock.Unlock()
nodeAssumedResources, ok := cc.assumedResources[nodeName]
if !ok {
nodeAssumedResources = make(resourceStore)
cc.assumedResources[nodeName] = nodeAssumedResources
}

nodeAssumedResources.AddPod(pod)

cc.nodeDiscarded.Delete(nodeName)
klog.V(6).InfoS("reset discard counter", "node", nodeName)
}

func (cc *NRTCache) ReleaseNodeResources(nodeName string, pod *corev1.Pod) {
cc.lock.Lock()
defer cc.lock.Unlock()
nodeAssumedResources, ok := cc.assumedResources[nodeName]
if !ok {
// this should not happen, so we're vocal about it;
// we don't return an error because there is not much we can do to recover anyway
klog.V(3).InfoS("no resources tracked", "node", nodeName)
}
nodeAssumedResources.DeletePod(pod)
}

// Resync refreshes the cached NodeResourceTopology of the dirty nodes, but only
// for nodes whose published podset fingerprint matches the pods currently known
// to be running there.
func (cc *NRTCache) Resync() {
klog.V(6).Infof("resyncing NodeTopology cache")

nodeNames := cc.DirtyNodeNames()

var nrtUpdates []*topologyv1alpha1.NodeResourceTopology
for _, nodeName := range nodeNames {
nrtCandidate, err := cc.lister.Get(nodeName)
if err != nil || nrtCandidate == nil {
klog.V(3).InfoS("missing NodeTopology", "node", nodeName)
continue
}

pfpExpected := podFingerprintForNodeTopology(nrtCandidate)
if pfpExpected == "" {
klog.V(3).InfoS("missing NodeTopology podset fingerprint data", "node", nodeName)
continue
}

klog.V(6).InfoS("trying to resync NodeTopology", "node", nodeName, "fingerprint", pfpExpected)

err = checkPodFingerprintForNode(cc.indexer, nodeName, pfpExpected)
if errors.Is(err, podfingerprint.ErrSignatureMismatch) {
// can happen, not critical
klog.V(6).InfoS("NodeTopology podset fingerprint mismatch", "node", nodeName)
continue
}
if err != nil {
// should never happen, let's be vocal
klog.V(3).ErrorS(err, "checking NodeTopology podset fingerprint", "node", nodeName)
continue
}

nrtUpdates = append(nrtUpdates, nrtCandidate)
}

cc.FlushNodes(nrtUpdates...)

klog.V(6).Infof("resynced NodeTopology cache")
}

// FlushNodes replaces the cached NodeResourceTopology objects and clears the
// assumed resources and discard counters for the given nodes.
func (cc *NRTCache) FlushNodes(nrts ...*topologyv1alpha1.NodeResourceTopology) {
cc.lock.Lock()
defer cc.lock.Unlock()
for _, nrt := range nrts {
klog.V(6).InfoS("flushing", "node", nrt.Name)
cc.nrts.Update(nrt)
delete(cc.assumedResources, nrt.Name)
cc.nodeDiscarded.Delete(nrt.Name)
}
}

// to be used only in tests
func (cc *NRTCache) Store() *nrtStore {
return &cc.nrts
}
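
Taken together, the Cache interface maps onto the scheduler framework extension points: GetByNode serves Filter/Score, MarkNodeDiscarded records nodes rejected while using cached data so a later Resync() knows they are worth refreshing, and ReserveNodeResources/ReleaseNodeResources are meant to be driven from Reserve/Unreserve, which is why the commit message requires the Reserve plugin to be enabled. Below is a minimal sketch of that wiring, assuming the plugin struct is TopologyMatch with an nrtCache field; only the framework hook signatures are standard, the rest is illustrative.

// Illustrative sketch (not part of this commit): driving the cache from the
// framework Reserve/Unreserve hooks. Receiver and field names are assumptions.
func (tm *TopologyMatch) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
    // track the pod's requests as assumed on the node until a
    // fingerprint-confirmed resync flushes them
    tm.nrtCache.ReserveNodeResources(nodeName, pod)
    return nil
}

func (tm *TopologyMatch) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) {
    // binding failed or was aborted: give the assumed resources back
    tm.nrtCache.ReleaseNodeResources(nodeName, pod)
}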
