Lock across item expiration in the ttl store. #21856

Merged: 2 commits, Feb 24, 2016
35 changes: 22 additions & 13 deletions pkg/client/cache/expiration_cache.go
@@ -17,24 +17,32 @@ limitations under the License.
package cache

import (
+	"sync"
	"time"

	"github.com/golang/glog"
	"k8s.io/kubernetes/pkg/util"
-	"k8s.io/kubernetes/pkg/util/runtime"
)

// ExpirationCache implements the store interface
//	1. All entries are automatically time stamped on insert
//		a. The key is computed based off the original item/keyFunc
//		b. The value inserted under that key is the timestamped item
//	2. Expiration happens lazily on read based on the expiration policy
+//		a. No item can be inserted into the store while we're expiring
+//		   *any* item in the cache.
//	3. Time-stamps are stripped off unexpired entries before return
+// Note that the ExpirationCache is inherently slower than a normal
+// threadSafeStore because it takes a write lock every time it checks if
+// an item has expired.
type ExpirationCache struct {
	cacheStorage     ThreadSafeStore
	keyFunc          KeyFunc
	clock            util.Clock
	expirationPolicy ExpirationPolicy
+	// expirationLock is a write lock used to guarantee that we don't clobber
+	// newly inserted objects because of a stale expiration timestamp comparison
+	expirationLock sync.Mutex
}

// ExpirationPolicy dictates when an object expires. Currently only abstracted out
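As context for the lazy-expiration semantics described in the ExpirationCache comment above, here is a minimal usage sketch against this package; the pod type, key function, and durations are illustrative, not from the PR:

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/client/cache"
)

type pod struct{ name string }

func main() {
	// Entries are time stamped on insert and expire lazily, on read,
	// once the TTL passes.
	store := cache.NewTTLStore(
		func(obj interface{}) (string, error) { return obj.(pod).name, nil },
		50*time.Millisecond,
	)
	store.Add(pod{name: "foo"})

	// Within the TTL the entry comes back with its timestamp stripped.
	obj, exists, _ := store.GetByKey("foo")
	fmt.Println(obj, exists) // {foo} true

	// After the TTL, the next read takes the expiration lock, deletes the
	// stale entry, and reports exists == false.
	time.Sleep(100 * time.Millisecond)
	obj, exists, _ = store.GetByKey("foo")
	fmt.Println(obj, exists) // <nil> false
}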
@@ -68,32 +76,27 @@ type timestampedEntry struct {
// getTimestampedEntry returns the timestampedEntry stored under the given key.
func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
	item, _ := c.cacheStorage.Get(key)
-	// TODO: Check the cast instead
	if tsEntry, ok := item.(*timestampedEntry); ok {
		return tsEntry, true
	}
	return nil, false
}

// getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't
-// already expired. It kicks-off a go routine to delete expired objects from
-// the store and sets exists=false.
+// already expired. It holds a write lock across deletion.
func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
+	// Prevent all inserts from the time we deem an item as "expired" to when we
+	// delete it, so an un-expired item doesn't sneak in under the same key, just
+	// before the Delete.
+	c.expirationLock.Lock()
+	defer c.expirationLock.Unlock()
	timestampedItem, exists := c.getTimestampedEntry(key)
	if !exists {
		return nil, false

Member:
Alternative solution: can you make a DeleteIfStillExpired function, and lock between that and the Add function? Then you don't need to take the write lock here in the access function, I think?

Contributor Author:
This is just simpler? I think this cache is usually used for a small number of items (RC expectations, assumed pods), so it won't turn into a performance bottleneck.

With DeleteIfStillExpired I'd have to either check expiration twice, or send everything through the DeleteIfStillExpired function.

(A sketch of this alternative appears after the hunk below.)

	}
	if c.expirationPolicy.IsExpired(timestampedItem) {
		glog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj)
-		// Since expiration happens lazily on read, don't hold up
-		// the reader trying to acquire a write lock for the delete.
-		// The next reader will retry the delete even if this one
-		// fails; as long as we only return un-expired entries a
-		// reader doesn't need to wait for the result of the delete.
-		go func() {
-			defer runtime.HandleCrash()
-			c.cacheStorage.Delete(key)
-		}()
+		c.cacheStorage.Delete(key)
		return nil, false
	}
	return timestampedItem.obj, true
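For reference, the DeleteIfStillExpired alternative floated in the review thread above might look roughly like the following. This is a hypothetical sketch, not code from this PR; no such function exists in the package:

// Hypothetical sketch of the reviewer's alternative: leave the read path
// unlocked, and take the lock only around the delete, re-checking
// expiration under the lock so a concurrent Add under the same key is
// never clobbered.
func (c *ExpirationCache) DeleteIfStillExpired(key string) {
	c.expirationLock.Lock()
	defer c.expirationLock.Unlock()
	if entry, ok := c.getTimestampedEntry(key); ok && c.expirationPolicy.IsExpired(entry) {
		c.cacheStorage.Delete(key)
	}
}

As the author replies, this re-checks expiration (once in the unlocked reader, once under the lock), and every deletion would have to flow through it; the PR instead takes the simpler route of locking the read path.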
@@ -141,6 +144,8 @@ func (c *ExpirationCache) ListKeys() []string {
// Add timestamps an item and inserts it into the cache, overwriting entries
// that might exist under the same key.
func (c *ExpirationCache) Add(obj interface{}) error {
+	c.expirationLock.Lock()
+	defer c.expirationLock.Unlock()
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
@@ -157,6 +162,8 @@ func (c *ExpirationCache) Update(obj interface{}) error {

// Delete removes an item from the cache.
func (c *ExpirationCache) Delete(obj interface{}) error {
+	c.expirationLock.Lock()
+	defer c.expirationLock.Unlock()
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
@@ -169,6 +176,8 @@ func (c *ExpirationCache) Delete(obj interface{}) error {
// before attempting the replace operation. The replace operation will
// delete the contents of the ExpirationCache `c`.
func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
+	c.expirationLock.Lock()
+	defer c.expirationLock.Unlock()
	items := map[string]interface{}{}
	ts := c.clock.Now()
	for _, item := range list {
1 change: 1 addition & 0 deletions pkg/client/cache/expiration_cache_fakes.go
@@ -28,6 +28,7 @@ type fakeThreadSafeMap struct {

func (c *fakeThreadSafeMap) Delete(key string) {
	if c.deletedKeys != nil {
+		c.ThreadSafeStore.Delete(key)
		c.deletedKeys <- key
	}
}
57 changes: 55 additions & 2 deletions pkg/client/cache/expiration_cache_test.go
@@ -28,7 +28,7 @@ import (

func TestTTLExpirationBasic(t *testing.T) {
	testObj := testStoreObject{id: "foo", val: "bar"}
-	deleteChan := make(chan string)
+	deleteChan := make(chan string, 1)
	ttlStore := NewFakeExpirationStore(
		testStoreKeyFunc, deleteChan,
		&FakeExpirationPolicy{
@@ -62,14 +62,67 @@ func TestTTLExpirationBasic(t *testing.T) {
	close(deleteChan)
}

+func TestReAddExpiredItem(t *testing.T) {
+	deleteChan := make(chan string, 1)

Contributor Author:
This is tricky to test. As is, the test will always pass, but it will fail with the old code plus a sleep in the goroutine that triggers the race. (A sketch of that race follows this test.)

+	exp := &FakeExpirationPolicy{
+		NeverExpire: sets.NewString(),
+		RetrieveKeyFunc: func(obj interface{}) (string, error) {
+			return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
+		},
+	}
+	ttlStore := NewFakeExpirationStore(
+		testStoreKeyFunc, deleteChan, exp, util.RealClock{})
+	testKey := "foo"
+	testObj := testStoreObject{id: testKey, val: "bar"}
+	err := ttlStore.Add(testObj)
+	if err != nil {
+		t.Errorf("Unable to add obj %#v", testObj)
+	}
+
+	// This get will expire the item.
+	item, exists, err := ttlStore.Get(testObj)
+	if err != nil {
+		t.Errorf("Failed to get from store, %v", err)
+	}
+	if exists || item != nil {
+		t.Errorf("Got unexpected item %#v", item)
+	}
+
+	key, _ := testStoreKeyFunc(testObj)
+	differentValue := "different_bar"
+	err = ttlStore.Add(
+		testStoreObject{id: testKey, val: differentValue})
+	if err != nil {
+		t.Errorf("Failed to add second value")
+	}
+
+	select {
+	case delKey := <-deleteChan:
+		if delKey != key {
+			t.Errorf("Unexpected delete for key %s", key)
+		}
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Errorf("Unexpected timeout waiting on delete")
+	}
+	exp.NeverExpire = sets.NewString(testKey)
+	item, exists, err = ttlStore.GetByKey(testKey)
+	if err != nil {
+		t.Errorf("Failed to get from store, %v", err)
+	}
+	if !exists || item == nil || item.(testStoreObject).val != differentValue {
+		t.Errorf("Got unexpected item %#v", item)
+	}
+	close(deleteChan)
+}
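To make the race the author describes concrete: in the pre-PR code the delete ran in a goroutine keyed only by name, so inserting a sleep there (illustrative only, not part of any commit) widens the window in which a fresh Add can land between the expiration check and the Delete:

// Illustrative only: the old delete path with an added sleep to expose
// the race TestReAddExpiredItem guards against.
go func() {
	defer runtime.HandleCrash()
	time.Sleep(time.Second)    // widen the race window
	c.cacheStorage.Delete(key) // now clobbers an entry re-Added under the same key
}()

With that sleep in place, the test's second Add lands before the delayed Delete, the Delete then removes the re-added item, and the final GetByKey fails to find differentValue.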

func TestTTLList(t *testing.T) {
	testObjs := []testStoreObject{
		{id: "foo", val: "bar"},
		{id: "foo1", val: "bar1"},
		{id: "foo2", val: "bar2"},
	}
	expireKeys := sets.NewString(testObjs[0].id, testObjs[2].id)
-	deleteChan := make(chan string)
+	deleteChan := make(chan string, len(testObjs))
+	defer close(deleteChan)

	ttlStore := NewFakeExpirationStore(
18 changes: 2 additions & 16 deletions pkg/controller/controller_utils.go
@@ -35,21 +35,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
)

const (
CreatedByAnnotation = "kubernetes.io/created-by"

// If a watch drops a delete event for a pod, it'll take this long
// before a dormant controller waiting for those packets is woken up anyway. It is
// specifically targeted at the case where some problem prevents an update
// of expectations, without it the controller could stay asleep forever. This should
// be set based on the expected latency of watch events.
//
// Currently a controller can service (create *and* observe the watch events for said
// creation) about 10-20 pods a second, so it takes about 1 min to service
// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
// latency/pod at the scale of 3000 pods over 100 nodes.
ExpectationsTimeout = 3 * time.Minute
)
const CreatedByAnnotation = "kubernetes.io/created-by"

var (
	KeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
@@ -221,7 +207,7 @@ func (e *ControlleeExpectations) GetExpectations() (int64, int64) {

// NewControllerExpectations returns a store for ControlleeExpectations.
func NewControllerExpectations() *ControllerExpectations {
-	return &ControllerExpectations{cache.NewTTLStore(ExpKeyFunc, ExpectationsTimeout)}
+	return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
}

// PodControlInterface is an interface that knows how to add or delete pods
4 changes: 2 additions & 2 deletions pkg/controller/deployment/deployment_controller.go
@@ -334,12 +334,12 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a ReplicaSet recreates a replica", obj, controller.ExpectationsTimeout)
+			glog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*api.Pod)
		if !ok {
-			glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before ReplicaSet recreates a replica", obj, controller.ExpectationsTimeout)
+			glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
			return
		}
	}
4 changes: 2 additions & 2 deletions pkg/controller/job/controller.go
@@ -224,12 +224,12 @@ func (jm *JobController) deletePod(obj interface{}) {
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a job recreates a pod", obj, controller.ExpectationsTimeout)
+			glog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*api.Pod)
		if !ok {
-			glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before job recreates a pod", obj, controller.ExpectationsTimeout)
+			glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
			return
		}
	}
4 changes: 2 additions & 2 deletions pkg/controller/replicaset/replica_set.go
@@ -283,12 +283,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a replica set recreates a replica", obj, controller.ExpectationsTimeout)
+			glog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*api.Pod)
		if !ok {
-			glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before replica set recreates a replica", obj, controller.ExpectationsTimeout)
+			glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
			return
		}
	}
4 changes: 2 additions & 2 deletions pkg/controller/replication/replication_controller.go
@@ -285,12 +285,12 @@ func (rm *ReplicationManager) deletePod(obj interface{}) {
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, controller.ExpectationsTimeout)
+			glog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*api.Pod)
		if !ok {
-			glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, controller.ExpectationsTimeout)
+			glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
			return
		}
	}