Skip to content

Commit

Permalink
Add clock.TimeSource.AfterFunc
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Jul 12, 2023
1 parent 83faadf commit aefbdff
Show file tree
Hide file tree
Showing 13 changed files with 484 additions and 113 deletions.
6 changes: 3 additions & 3 deletions common/cache/cache.go
Expand Up @@ -27,7 +27,7 @@ package cache
import (
"time"

"github.com/jonboulle/clockwork"
"go.temporal.io/server/common/clock"
)

// A Cache is a generalized interface to a cache. See cache.LRU for a specific
Expand Down Expand Up @@ -69,8 +69,8 @@ type Options struct {
// Pin prevents in-use objects from getting evicted.
Pin bool

// Clock is an optional clock to use for time-skipping and testing. If this is nil, a real clock will be used.
Clock clockwork.Clock
// TimeSource is an optional clock to use for time-skipping and testing. If this is nil, a real clock will be used.
TimeSource clock.TimeSource
}

// SimpleOptions provides options that can be used to configure SimpleCache
Expand Down
40 changes: 20 additions & 20 deletions common/cache/lru.go
Expand Up @@ -30,7 +30,7 @@ import (
"sync"
"time"

"github.com/jonboulle/clockwork"
"go.temporal.io/server/common/clock"
)

var (
Expand All @@ -41,13 +41,13 @@ var (
// lru is a concurrent fixed size cache that evicts elements in lru order
type (
lru struct {
mut sync.Mutex
byAccess *list.List
byKey map[interface{}]*list.Element
maxSize int
ttl time.Duration
pin bool
clock clockwork.Clock
mut sync.Mutex
byAccess *list.List
byKey map[interface{}]*list.Element
maxSize int
ttl time.Duration
pin bool
timeSource clock.TimeSource
}

iteratorImpl struct {
Expand Down Expand Up @@ -112,7 +112,7 @@ func (c *lru) Iterator() Iterator {
c.mut.Lock()
iterator := &iteratorImpl{
lru: c,
createTime: c.clock.Now().UTC(),
createTime: c.timeSource.Now().UTC(),
nextItem: c.byAccess.Front(),
}
iterator.prepareNext()
Expand All @@ -136,18 +136,18 @@ func New(maxSize int, opts *Options) Cache {
if opts == nil {
opts = &Options{}
}
clock := opts.Clock
if clock == nil {
clock = clockwork.NewRealClock()
timeSource := opts.TimeSource
if timeSource == nil {
timeSource = clock.NewRealTimeSource()
}

return &lru{
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
maxSize: maxSize,
pin: opts.Pin,
clock: clock,
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
maxSize: maxSize,
pin: opts.Pin,
timeSource: timeSource,
}
}

Expand Down Expand Up @@ -180,7 +180,7 @@ func (c *lru) Get(key interface{}) interface{} {

entry := element.Value.(*entryImpl)

if c.isEntryExpired(entry, c.clock.Now().UTC()) {
if c.isEntryExpired(entry, c.timeSource.Now().UTC()) {
// Entry has expired
c.deleteInternal(element)
return nil
Expand Down Expand Up @@ -297,7 +297,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
}

if c.ttl != 0 {
entry.createTime = c.clock.Now().UTC()
entry.createTime = c.timeSource.Now().UTC()
}

if len(c.byKey) >= c.maxSize {
Expand Down
48 changes: 24 additions & 24 deletions common/cache/lru_test.go
Expand Up @@ -29,8 +29,8 @@ import (
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"go.temporal.io/server/common/clock"
)

type keyType struct {
Expand Down Expand Up @@ -100,14 +100,14 @@ func TestGenerics(t *testing.T) {
func TestLRUWithTTL(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
timeSource := clock.NewEventTimeSource()
cache := New(5, &Options{
TTL: time.Millisecond * 100,
Clock: clock,
TTL: time.Millisecond * 100,
TimeSource: timeSource,
})
cache.Put("A", "foo")
assert.Equal(t, "foo", cache.Get("A"))
clock.Advance(time.Millisecond * 300)
timeSource.Advance(time.Millisecond * 300)
assert.Nil(t, cache.Get("A"))
assert.Equal(t, 0, cache.Size())
}
Expand Down Expand Up @@ -168,32 +168,32 @@ func TestLRUCacheConcurrentAccess(t *testing.T) {
func TestTTL(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
timeSource := clock.NewEventTimeSource()
cache := New(5, &Options{
TTL: time.Millisecond * 50,
Clock: clock,
TTL: time.Millisecond * 50,
TimeSource: timeSource,
})

cache.Put("A", t)
assert.Equal(t, t, cache.Get("A"))
clock.Advance(time.Millisecond * 100)
timeSource.Advance(time.Millisecond * 100)
assert.Nil(t, cache.Get("A"))
}

func TestTTLWithPin(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
timeSource := clock.NewEventTimeSource()
cache := New(5, &Options{
TTL: time.Millisecond * 50,
Pin: true,
Clock: clock,
TTL: time.Millisecond * 50,
Pin: true,
TimeSource: timeSource,
})

_, err := cache.PutIfNotExist("A", t)
assert.NoError(t, err)
assert.Equal(t, t, cache.Get("A"))
clock.Advance(time.Millisecond * 100)
timeSource.Advance(time.Millisecond * 100)
assert.Equal(t, t, cache.Get("A"))
// release 3 time since put if not exist also increase the counter
cache.Release("A")
Expand All @@ -205,11 +205,11 @@ func TestTTLWithPin(t *testing.T) {
func TestMaxSizeWithPin_MidItem(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
timeSource := clock.NewEventTimeSource()
cache := New(2, &Options{
TTL: time.Millisecond * 50,
Pin: true,
Clock: clock,
TTL: time.Millisecond * 50,
Pin: true,
TimeSource: timeSource,
})

_, err := cache.PutIfNotExist("A", t)
Expand All @@ -235,7 +235,7 @@ func TestMaxSizeWithPin_MidItem(t *testing.T) {
cache.Release("A") // A's ref count is 0
cache.Release("C") // C's ref count is 0

clock.Advance(time.Millisecond * 100)
timeSource.Advance(time.Millisecond * 100)
assert.Nil(t, cache.Get("A"))
assert.Nil(t, cache.Get("B"))
assert.Nil(t, cache.Get("C"))
Expand All @@ -244,11 +244,11 @@ func TestMaxSizeWithPin_MidItem(t *testing.T) {
func TestMaxSizeWithPin_LastItem(t *testing.T) {
t.Parallel()

clock := clockwork.NewFakeClock()
timeSource := clock.NewEventTimeSource()
cache := New(2, &Options{
TTL: time.Millisecond * 50,
Pin: true,
Clock: clock,
TTL: time.Millisecond * 50,
Pin: true,
TimeSource: timeSource,
})

_, err := cache.PutIfNotExist("A", t)
Expand All @@ -274,7 +274,7 @@ func TestMaxSizeWithPin_LastItem(t *testing.T) {
cache.Release("B") // B's ref count is 0
cache.Release("C") // C's ref count is 0

clock.Advance(time.Millisecond * 100)
timeSource.Advance(time.Millisecond * 100)
assert.Nil(t, cache.Get("A"))
assert.Nil(t, cache.Get("B"))
assert.Nil(t, cache.Get("C"))
Expand Down
159 changes: 159 additions & 0 deletions common/clock/event_time_source.go
@@ -0,0 +1,159 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package clock

import (
"sync"
"time"
)

type (
// EventTimeSource is a timex.TimeSource. Unlike other fake timeSource implementations, the methods of this
// timeSource are synchronous, so when you call Advance or Update, all triggered timers from AfterFunc will fire
// before the method returns, in the same goroutine.
EventTimeSource struct {
mu sync.RWMutex
now time.Time
timers []*fakeTimer
}

// fakeTimer is a fake implementation of [timex.Timer].
fakeTimer struct {
// need a link to the parent timeSource for synchronization
timeSource *EventTimeSource
// deadline for when the timer should fire
deadline time.Time
// callback to call when the timer fires
callback func()
// done is true if the timer has fired or been stopped
done bool
// index of the timer in the parent timeSource
index int
}
)

// NewEventTimeSource returns a EventTimeSource with the current time set to Unix zero: 1970-01-01 00:00:00 +0000 UTC.
func NewEventTimeSource() *EventTimeSource {
return &EventTimeSource{
now: time.Unix(0, 0),
}
}

// Now return the current time.
func (ts *EventTimeSource) Now() time.Time {
ts.mu.RLock()
defer ts.mu.RUnlock()

return ts.now
}

// AfterFunc return a timer that will fire after the specified duration. It is important to note that the timeSource is
// locked while the callback is called. This means that you must be cautious about calling any methods on the timeSource
// from within the callback. Doing so will probably result in a deadlock. To avoid this, you may want to wrap all such
// calls in a goroutine.
func (ts *EventTimeSource) AfterFunc(d time.Duration, f func()) Timer {
ts.mu.Lock()
defer ts.mu.Unlock()

t := &fakeTimer{timeSource: ts, deadline: ts.now.Add(d), callback: f}
t.index = len(ts.timers)
ts.timers = append(ts.timers, t)

return t
}

// Update the fake current time. It returns the timeSource so that you can chain calls like this:
// timeSource := NewEventTimeSource().Update(time.Now())
func (ts *EventTimeSource) Update(now time.Time) *EventTimeSource {
ts.mu.Lock()
defer ts.mu.Unlock()

ts.now = now
ts.fireTimers()
return ts
}

// Advance the timer by the specified duration.
func (ts *EventTimeSource) Advance(d time.Duration) {
ts.mu.Lock()
defer ts.mu.Unlock()

ts.now = ts.now.Add(d)
ts.fireTimers()
}

// update the fake current time without locking.

// fireTimers fires all timers that are ready.
func (ts *EventTimeSource) fireTimers() {
n := 0
for _, t := range ts.timers {
if t.deadline.After(ts.now) {
ts.timers[n] = t
t.index = n
n++
} else {
t.callback()
t.done = true
}
}
ts.timers = ts.timers[:n]
}

// Reset the timer to fire after the specified duration. Returns true if the timer was active.
func (t *fakeTimer) Reset(d time.Duration) bool {
t.timeSource.mu.Lock()
defer t.timeSource.mu.Unlock()

if t.done {
return false
}

t.deadline = t.timeSource.now.Add(d)
t.timeSource.fireTimers()
return true
}

// Stop the timer. Returns true if the timer was active.
func (t *fakeTimer) Stop() bool {
t.timeSource.mu.Lock()
defer t.timeSource.mu.Unlock()

if t.done {
return false
}

i := t.index
timers := t.timeSource.timers

timers[i] = timers[len(timers)-1] // swap with last timer
timers[i].index = i // update index of swapped timer
timers = timers[:len(timers)-1] // shrink list

t.timeSource.timers = timers
t.done = true // ensure that the timer is not reused

return true
}

0 comments on commit aefbdff

Please sign in to comment.