Skip to content

Commit

Permalink
add expire cache (#93)
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Liu <jasonliu747@gmail.com>
  • Loading branch information
jasonliu747 committed Apr 26, 2022
1 parent 61b3c8c commit 8ad6cf9
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 5 deletions.
128 changes: 128 additions & 0 deletions pkg/cache/expiration_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)

const (
defaultExpiration = 2 * time.Minute
defaultGCInterval = time.Minute
)

type item struct {
object interface{}
expirationTime time.Time
}

type Cache struct {
items map[string]item
defaultExpiration time.Duration
gcInterval time.Duration
gcStarted bool
mu sync.Mutex
}

func NewCacheDefault() *Cache {
return &Cache{
items: map[string]item{},
defaultExpiration: defaultExpiration,
gcInterval: defaultGCInterval,
}
}

func NewCache(expiration time.Duration, gcInterval time.Duration) *Cache {
cache := Cache{
items: map[string]item{},
defaultExpiration: expiration,
gcInterval: gcInterval,
}
if cache.defaultExpiration <= 0 {
cache.defaultExpiration = defaultExpiration
}
if cache.gcInterval <= time.Second {
cache.gcInterval = defaultGCInterval
}
return &cache
}

func (c *Cache) Run(stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
c.gcStarted = true
go wait.Until(func() {
c.gcExpiredCache()
}, c.gcInterval, stopCh)
return nil
}

func (c *Cache) gcExpiredCache() {
c.mu.Lock()
defer c.mu.Unlock()
gcTime := time.Now()
var gcKeys []string
for key, item := range c.items {
if gcTime.After(item.expirationTime) {
gcKeys = append(gcKeys, key)
}
}
for _, key := range gcKeys {
delete(c.items, key)
}
klog.V(5).Infof("gc resource update executor, current size %v", len(c.items))
}

func (c *Cache) Set(key string, value interface{}, expiration time.Duration) error {
return c.set(key, value, expiration)
}

func (c *Cache) SetDefault(key string, value interface{}) error {
return c.set(key, value, c.defaultExpiration)
}

func (c *Cache) set(key string, value interface{}, expiration time.Duration) error {
if !c.gcStarted {
return fmt.Errorf("cache GC is not started yet")
}
item := item{
object: value,
expirationTime: time.Now().Add(expiration),
}
c.mu.Lock()
defer c.mu.Unlock()
c.items[key] = item
return nil
}

func (c *Cache) Get(key string) (interface{}, bool) {
c.mu.Lock()
defer c.mu.Unlock()
item, ok := c.items[key]
if !ok {
return nil, false
}
if item.expirationTime.Before(time.Now()) {
return nil, false
}
return item.object, true
}
114 changes: 114 additions & 0 deletions pkg/cache/expiration_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func Test_Cache_Get(t *testing.T) {
cache := NewCacheDefault()
cache.gcStarted = true
cache.items = map[string]item{
"keyExpire": {object: "value1", expirationTime: time.Now().Add(-1 * time.Minute)},
"keyNotExpire": {object: "value2", expirationTime: time.Now().Add(1 * time.Minute)},
}
value, found := cache.Get("keyExpire")
assert.True(t, !found, "value not found", "keyExpire")
assert.Nil(t, value, "value must be nil", "keyExpire")

value, found = cache.Get("keyNotExpire")
assert.True(t, found, "value found", "keyNotExpire")
assert.Equal(t, "value2", value, "keyNotExpire")
}

func Test_Cache_Set(t *testing.T) {
cache := NewCacheDefault()
cache.gcStarted = true
value, found := cache.Get("key")
assert.True(t, !found, "value not found")
assert.Nil(t, value, "value must be nil")

_ = cache.SetDefault("key", "value")
value, found = cache.Get("key")
assert.True(t, found, "value found", "checkSetDefault")
assert.Equal(t, "value", value, "checkSetDefault")

_ = cache.Set("key", "value", -1*time.Minute)
value, found = cache.Get("key")
assert.True(t, !found, "value not found", "checkSet")
assert.Nil(t, value, "value must be nil", "checkSet")

}

func Test_gcExpiredCache(t *testing.T) {
tests := []struct {
name string
initItems map[string]item
cache *Cache
expectItemsAfterGC map[string]item
}{
{
name: "test_gcExpiredCache_NewCacheDefault",
initItems: map[string]item{
"keyNeedExpire": {object: "value1", expirationTime: time.Now().Add(-1 * time.Minute)},
"keyNotExpire": {object: "value2", expirationTime: time.Now().Add(time.Minute)},
},
cache: NewCacheDefault(),
expectItemsAfterGC: map[string]item{
"keyNotExpire": {object: "value2", expirationTime: time.Now().Add(time.Minute)},
},
},
{
name: "test_gcExpiredCache_NewCache",
initItems: map[string]item{
"keyNeedExpire": {object: "value1", expirationTime: time.Now().Add(-1 * time.Minute)},
"keyNotExpire": {object: "value2", expirationTime: time.Now().Add(time.Minute)},
},
cache: NewCache(time.Minute, time.Minute),
expectItemsAfterGC: map[string]item{
"keyNotExpire": {object: "value2", expirationTime: time.Now().Add(time.Minute)},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.cache.items = tt.initItems
tt.cache.gcStarted = true
tt.cache.gcExpiredCache()
got := tt.cache.items
assert.Equal(t, len(tt.expectItemsAfterGC), len(got), "checkLen")
checkValueEqual(t, tt.expectItemsAfterGC, got)
})
}
}

func checkValueEqual(t *testing.T, expect, got map[string]item) {
assert.Equal(t, len(expect), len(got), "checkLen")
for key, item := range expect {
gotItem, ok := got[key]
if !ok {
assert.True(t, ok, "checkFound", key)
return
}
assert.Equal(t, item.object, gotItem.object, "checkValue", key)
}
}
4 changes: 1 addition & 3 deletions pkg/koordlet/resmanager/memory_evict.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,7 @@ func (m *MemoryEvictor) killAndEvictBEPods(node *corev1.Node, podMetrics []*metr
}
}

for _, pod := range killedPods {
m.resManager.evictPod(pod, node, evictPodByNodeMemoryUsage, message)
}
m.resManager.evictPodsIfNotEvicted(killedPods, node, evictPodByNodeMemoryUsage, message)

m.lastEvictTime = time.Now()
klog.Infof("killAndEvictBEPods completed, memoryNeedRelease(%v) memoryReleased(%v)", memoryNeedRelease, memoryNeedRelease)
Expand Down
31 changes: 29 additions & 2 deletions pkg/koordlet/resmanager/resmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ import (
"k8s.io/klog/v2"

slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
expireCache "github.com/koordinator-sh/koordinator/pkg/cache"
koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned"
slolisterv1alpha1 "github.com/koordinator-sh/koordinator/pkg/client/listers/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/features"
"github.com/koordinator-sh/koordinator/pkg/koordlet/audit"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
Expand All @@ -65,6 +67,7 @@ type resmanager struct {
schema *apiruntime.Scheme
statesInformer statesinformer.StatesInformer
metricCache metriccache.MetricCache
podsEvicted *expireCache.Cache
nodeSLOInformer cache.SharedIndexInformer
nodeSLOLister slolisterv1alpha1.NodeSLOLister
kubeClient clientset.Interface
Expand Down Expand Up @@ -181,6 +184,7 @@ func NewResManager(cfg *Config, schema *apiruntime.Scheme, kubeClient clientset.
schema: schema,
statesInformer: statesInformer,
metricCache: metricCache,
podsEvicted: expireCache.NewCacheDefault(),
nodeSLOInformer: informer,
nodeSLOLister: slolisterv1alpha1.NewNodeSLOLister(informer.GetIndexer()),
kubeClient: kubeClient,
Expand Down Expand Up @@ -239,6 +243,8 @@ func (r *resmanager) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
klog.Info("Starting resmanager")

r.podsEvicted.Run(stopCh)

klog.Infof("starting informer for NodeSLO")
go r.nodeSLOInformer.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, r.nodeSLOInformer.HasSynced) {
Expand Down Expand Up @@ -277,9 +283,27 @@ func (r *resmanager) hasSynced() bool {
return r.nodeSLO != nil && r.nodeSLO.Spec.ResourceUsedThresholdWithBE != nil
}

func (r *resmanager) evictPod(evictPod *corev1.Pod, node *corev1.Node, reason string, message string) {
podEvictMessage := fmt.Sprintf("evict Pod:%s, reason: %s, message: %v", evictPod.Name, reason, message)
func (r *resmanager) evictPodsIfNotEvicted(evictPods []*corev1.Pod, node *corev1.Node, reason string, message string) {
for _, evictPod := range evictPods {
r.evictPodIfNotEvicted(evictPod, node, reason, message)
}
}

func (r *resmanager) evictPodIfNotEvicted(evictPod *corev1.Pod, node *corev1.Node, reason string, message string) {
_, evicted := r.podsEvicted.Get(string(evictPod.UID))
if evicted {
klog.V(5).Infof("Pod has been evicted! podID: %v, evict reason: %s", evictPod.UID, reason)
return
}
success := r.evictPod(evictPod, node, reason, message)
if success {
_ = r.podsEvicted.SetDefault(string(evictPod.UID), evictPod.UID)
}
}

func (r *resmanager) evictPod(evictPod *corev1.Pod, node *corev1.Node, reason string, message string) bool {
podEvictMessage := fmt.Sprintf("evict Pod:%s, reason: %s, message: %v", evictPod.Name, reason, message)
_ = audit.V(0).Pod(evictPod.Namespace, evictPod.Name).Reason(reason).Message(message).Do()
podEvict := policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: evictPod.Name,
Expand All @@ -291,10 +315,13 @@ func (r *resmanager) evictPod(evictPod *corev1.Pod, node *corev1.Node, reason st
r.eventRecorder.Eventf(node, corev1.EventTypeWarning, evictPodSuccess, podEvictMessage)
metrics.RecordPodEviction(reason)
klog.Infof("evict pod %v/%v success, reason: %v", evictPod.Namespace, evictPod.Name, reason)
return true
} else if !errors.IsNotFound(err) {
r.eventRecorder.Eventf(node, corev1.EventTypeWarning, evictPodFail, podEvictMessage)
klog.Errorf("evict pod %v/%v failed, reason: %v, error: %v", evictPod.Namespace, evictPod.Name, reason, err)
return false
}
return true
}

// killContainers kills containers inside the pod
Expand Down

0 comments on commit 8ad6cf9

Please sign in to comment.