From 64aa0ffce9e22de02b63659221c5c17281ea3d33 Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Wed, 1 Jun 2022 09:29:27 -0700 Subject: [PATCH 1/9] break: refreshable/v2 with Generic type handling --- refreshable/go.mod | 2 +- refreshable/refreshable.go | 23 +- refreshable/refreshable_default.go | 42 +-- refreshable/refreshable_default_test.go | 32 +- refreshable/refreshable_types.go | 406 --------------------- refreshable/refreshable_validating.go | 54 ++- refreshable/refreshable_validating_test.go | 66 ++-- 7 files changed, 94 insertions(+), 531 deletions(-) delete mode 100644 refreshable/refreshable_types.go diff --git a/refreshable/go.mod b/refreshable/go.mod index 974bd9ba..e4a9cff0 100644 --- a/refreshable/go.mod +++ b/refreshable/go.mod @@ -1,4 +1,4 @@ -module github.com/palantir/pkg/refreshable +module github.com/palantir/pkg/refreshable/v2 go 1.18 diff --git a/refreshable/refreshable.go b/refreshable/refreshable.go index 1a6e7cb7..ea0d888f 100644 --- a/refreshable/refreshable.go +++ b/refreshable/refreshable.go @@ -4,14 +4,21 @@ package refreshable -type Refreshable interface { - // Current returns the most recent value of this Refreshable. - Current() interface{} +type Refreshable[T any] interface { + // Current returns the most recent value of this Supplier. + // If the value has not been initialized, returns T's zero value. + Current() T - // Subscribe subscribes to changes of this Refreshable. The provided function is called with the value of Current() - // whenever the value changes. - Subscribe(consumer func(interface{})) (unsubscribe func()) + // Subscribe subscribes to changes of this Supplier. + // The provided function is called with the value of Get() whenever the value changes. + Subscribe(consumer func(T)) (unsubscribe func()) +} - // Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable. - Map(func(interface{}) interface{}) Refreshable +// Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable. +func Map[T any, M any](t Refreshable[T], mapFn func(T) M) (m Refreshable[M], unsubscribe func()) { + out := New(mapFn(t.Current())) + unsubscribe = t.Subscribe(func(v T) { + out.Update(mapFn(v)) + }) + return out, unsubscribe } diff --git a/refreshable/refreshable_default.go b/refreshable/refreshable_default.go index 2ad8488c..1f18583e 100644 --- a/refreshable/refreshable_default.go +++ b/refreshable/refreshable_default.go @@ -5,54 +5,48 @@ package refreshable import ( - "fmt" "reflect" "sync" "sync/atomic" ) -type DefaultRefreshable struct { - typ reflect.Type +type DefaultRefreshable[T any] struct { current *atomic.Value sync.Mutex // protects subscribers - subscribers []*func(interface{}) + subscribers []*func(T) } -func NewDefaultRefreshable(val interface{}) *DefaultRefreshable { +func New[T any](val T) *DefaultRefreshable[T] { current := atomic.Value{} - current.Store(val) + current.Store(&val) - return &DefaultRefreshable{ + return &DefaultRefreshable[T]{ current: ¤t, - typ: reflect.TypeOf(val), } } -func (d *DefaultRefreshable) Update(val interface{}) error { +// Update changes the value of the Refreshable, then blocks while subscribers are executed. +func (d *DefaultRefreshable[T]) Update(val T) { d.Lock() defer d.Unlock() - if valType := reflect.TypeOf(val); valType != d.typ { - return fmt.Errorf("new refreshable value must be type %s: got %s", d.typ, valType) + if reflect.DeepEqual(d.Current(), val) { + return } - if reflect.DeepEqual(d.current.Load(), val) { - return nil - } - d.current.Store(val) + d.current.Store(&val) for _, sub := range d.subscribers { (*sub)(val) } - return nil } -func (d *DefaultRefreshable) Current() interface{} { - return d.current.Load() +func (d *DefaultRefreshable[T]) Current() T { + return *(d.current.Load().(*T)) } -func (d *DefaultRefreshable) Subscribe(consumer func(interface{})) (unsubscribe func()) { +func (d *DefaultRefreshable[T]) Subscribe(consumer func(T)) (unsubscribe func()) { d.Lock() defer d.Unlock() @@ -63,7 +57,7 @@ func (d *DefaultRefreshable) Subscribe(consumer func(interface{})) (unsubscribe } } -func (d *DefaultRefreshable) unsubscribe(consumerFnPtr *func(interface{})) { +func (d *DefaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) { d.Lock() defer d.Unlock() @@ -78,11 +72,3 @@ func (d *DefaultRefreshable) unsubscribe(consumerFnPtr *func(interface{})) { d.subscribers = append(d.subscribers[:matchIdx], d.subscribers[matchIdx+1:]...) } } - -func (d *DefaultRefreshable) Map(mapFn func(interface{}) interface{}) Refreshable { - newRefreshable := NewDefaultRefreshable(mapFn(d.Current())) - d.Subscribe(func(updatedVal interface{}) { - _ = newRefreshable.Update(mapFn(updatedVal)) - }) - return newRefreshable -} diff --git a/refreshable/refreshable_default_test.go b/refreshable/refreshable_default_test.go index 75be7915..bd0692d7 100644 --- a/refreshable/refreshable_default_test.go +++ b/refreshable/refreshable_default_test.go @@ -7,57 +7,51 @@ package refreshable_test import ( "testing" - "github.com/palantir/pkg/refreshable" + "github.com/palantir/pkg/refreshable/v2" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestDefaultRefreshable(t *testing.T) { type container struct{ Value string } v := &container{Value: "original"} - r := refreshable.NewDefaultRefreshable(v) + r := refreshable.New(v) assert.Equal(t, r.Current(), v) t.Run("Update", func(t *testing.T) { v2 := &container{Value: "updated"} - err := r.Update(v2) - require.NoError(t, err) + r.Update(v2) assert.Equal(t, r.Current(), v2) }) t.Run("Subscribe", func(t *testing.T) { var v1, v2 container - unsub1 := r.Subscribe(func(i interface{}) { - v1 = *(i.(*container)) + unsub1 := r.Subscribe(func(i *container) { + v1 = *i }) - _ = r.Subscribe(func(i interface{}) { - v2 = *(i.(*container)) + _ = r.Subscribe(func(i *container) { + v2 = *i }) assert.Equal(t, v1.Value, "") assert.Equal(t, v2.Value, "") - err := r.Update(&container{Value: "value"}) - require.NoError(t, err) + r.Update(&container{Value: "value"}) assert.Equal(t, v1.Value, "value") assert.Equal(t, v2.Value, "value") unsub1() - err = r.Update(&container{Value: "value2"}) - require.NoError(t, err) + r.Update(&container{Value: "value2"}) assert.Equal(t, v1.Value, "value", "should be unchanged after unsubscribing") assert.Equal(t, v2.Value, "value2", "should be updated after unsubscribing other") }) t.Run("Map", func(t *testing.T) { - err := r.Update(&container{Value: "value"}) - require.NoError(t, err) - m := r.Map(func(i interface{}) interface{} { - return len(i.(*container).Value) + r.Update(&container{Value: "value"}) + m, _ := refreshable.Map[*container, int](r, func(i *container) int { + return len(i.Value) }) assert.Equal(t, m.Current(), 5) - err = r.Update(&container{Value: "updated"}) - require.NoError(t, err) + r.Update(&container{Value: "updated"}) assert.Equal(t, m.Current(), 7) }) diff --git a/refreshable/refreshable_types.go b/refreshable/refreshable_types.go deleted file mode 100644 index e4a784af..00000000 --- a/refreshable/refreshable_types.go +++ /dev/null @@ -1,406 +0,0 @@ -// Copyright (c) 2021 Palantir Technologies. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package refreshable - -import ( - "time" -) - -type String interface { - Refreshable - CurrentString() string - MapString(func(string) interface{}) Refreshable - SubscribeToString(func(string)) (unsubscribe func()) -} - -type StringPtr interface { - Refreshable - CurrentStringPtr() *string - MapStringPtr(func(*string) interface{}) Refreshable - SubscribeToStringPtr(func(*string)) (unsubscribe func()) -} - -type StringSlice interface { - Refreshable - CurrentStringSlice() []string - MapStringSlice(func([]string) interface{}) Refreshable - SubscribeToStringSlice(func([]string)) (unsubscribe func()) -} - -type Int interface { - Refreshable - CurrentInt() int - MapInt(func(int) interface{}) Refreshable - SubscribeToInt(func(int)) (unsubscribe func()) -} - -type IntPtr interface { - Refreshable - CurrentIntPtr() *int - MapIntPtr(func(*int) interface{}) Refreshable - SubscribeToIntPtr(func(*int)) (unsubscribe func()) -} - -type Int64 interface { - Refreshable - CurrentInt64() int64 - MapInt64(func(int64) interface{}) Refreshable - SubscribeToInt64(func(int64)) (unsubscribe func()) -} - -type Int64Ptr interface { - Refreshable - CurrentInt64Ptr() *int64 - MapInt64Ptr(func(*int64) interface{}) Refreshable - SubscribeToInt64Ptr(func(*int64)) (unsubscribe func()) -} - -type Float64 interface { - Refreshable - CurrentFloat64() float64 - MapFloat64(func(float64) interface{}) Refreshable - SubscribeToFloat64(func(float64)) (unsubscribe func()) -} - -type Float64Ptr interface { - Refreshable - CurrentFloat64Ptr() *float64 - MapFloat64Ptr(func(*float64) interface{}) Refreshable - SubscribeToFloat64Ptr(func(*float64)) (unsubscribe func()) -} - -type Bool interface { - Refreshable - CurrentBool() bool - MapBool(func(bool) interface{}) Refreshable - SubscribeToBool(func(bool)) (unsubscribe func()) -} - -type BoolPtr interface { - Refreshable - CurrentBoolPtr() *bool - MapBoolPtr(func(*bool) interface{}) Refreshable - SubscribeToBoolPtr(func(*bool)) (unsubscribe func()) -} - -// Duration is a Refreshable that can return the current time.Duration. -type Duration interface { - Refreshable - CurrentDuration() time.Duration - MapDuration(func(time.Duration) interface{}) Refreshable - SubscribeToDuration(func(time.Duration)) (unsubscribe func()) -} - -type DurationPtr interface { - Refreshable - CurrentDurationPtr() *time.Duration - MapDurationPtr(func(*time.Duration) interface{}) Refreshable - SubscribeToDurationPtr(func(*time.Duration)) (unsubscribe func()) -} - -func NewBool(in Refreshable) Bool { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewBoolPtr(in Refreshable) BoolPtr { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewDuration(in Refreshable) Duration { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewDurationPtr(in Refreshable) DurationPtr { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewString(in Refreshable) String { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewStringPtr(in Refreshable) StringPtr { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewStringSlice(in Refreshable) StringSlice { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewInt(in Refreshable) Int { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewIntPtr(in Refreshable) IntPtr { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewInt64(in Refreshable) Int64 { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewInt64Ptr(in Refreshable) Int64Ptr { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewFloat64(in Refreshable) Float64 { - return refreshableTyped{ - Refreshable: in, - } -} - -func NewFloat64Ptr(in Refreshable) Float64Ptr { - return refreshableTyped{ - Refreshable: in, - } -} - -var ( - _ Bool = (*refreshableTyped)(nil) - _ BoolPtr = (*refreshableTyped)(nil) - _ Duration = (*refreshableTyped)(nil) - _ Int = (*refreshableTyped)(nil) - _ IntPtr = (*refreshableTyped)(nil) - _ Int64 = (*refreshableTyped)(nil) - _ Int64Ptr = (*refreshableTyped)(nil) - _ Float64 = (*refreshableTyped)(nil) - _ Float64Ptr = (*refreshableTyped)(nil) - _ String = (*refreshableTyped)(nil) - _ StringPtr = (*refreshableTyped)(nil) - _ StringSlice = (*refreshableTyped)(nil) -) - -type refreshableTyped struct { - Refreshable -} - -func (rt refreshableTyped) CurrentString() string { - return rt.Current().(string) -} - -func (rt refreshableTyped) MapString(mapFn func(string) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(string)) - }) -} - -func (rt refreshableTyped) SubscribeToString(subFn func(string)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(string)) - }) -} - -func (rt refreshableTyped) CurrentStringPtr() *string { - return rt.Current().(*string) -} - -func (rt refreshableTyped) MapStringPtr(mapFn func(*string) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(*string)) - }) -} - -func (rt refreshableTyped) SubscribeToStringPtr(subFn func(*string)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(*string)) - }) -} - -func (rt refreshableTyped) CurrentStringSlice() []string { - return rt.Current().([]string) -} - -func (rt refreshableTyped) MapStringSlice(mapFn func([]string) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.([]string)) - }) -} - -func (rt refreshableTyped) SubscribeToStringSlice(subFn func([]string)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.([]string)) - }) -} - -func (rt refreshableTyped) CurrentInt() int { - return rt.Current().(int) -} - -func (rt refreshableTyped) MapInt(mapFn func(int) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(int)) - }) -} - -func (rt refreshableTyped) SubscribeToInt(subFn func(int)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(int)) - }) -} - -func (rt refreshableTyped) CurrentIntPtr() *int { - return rt.Current().(*int) -} - -func (rt refreshableTyped) MapIntPtr(mapFn func(*int) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(*int)) - }) -} - -func (rt refreshableTyped) SubscribeToIntPtr(subFn func(*int)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(*int)) - }) -} - -func (rt refreshableTyped) CurrentInt64() int64 { - return rt.Current().(int64) -} - -func (rt refreshableTyped) MapInt64(mapFn func(int64) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(int64)) - }) -} - -func (rt refreshableTyped) SubscribeToInt64(subFn func(int64)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(int64)) - }) -} - -func (rt refreshableTyped) CurrentInt64Ptr() *int64 { - return rt.Current().(*int64) -} - -func (rt refreshableTyped) MapInt64Ptr(mapFn func(*int64) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(*int64)) - }) -} - -func (rt refreshableTyped) SubscribeToInt64Ptr(subFn func(*int64)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(*int64)) - }) -} - -func (rt refreshableTyped) CurrentFloat64() float64 { - return rt.Current().(float64) -} - -func (rt refreshableTyped) MapFloat64(mapFn func(float64) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(float64)) - }) -} - -func (rt refreshableTyped) SubscribeToFloat64(subFn func(float64)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(float64)) - }) -} - -func (rt refreshableTyped) CurrentFloat64Ptr() *float64 { - return rt.Current().(*float64) -} - -func (rt refreshableTyped) MapFloat64Ptr(mapFn func(*float64) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(*float64)) - }) -} - -func (rt refreshableTyped) SubscribeToFloat64Ptr(subFn func(*float64)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(*float64)) - }) -} - -func (rt refreshableTyped) CurrentBool() bool { - return rt.Current().(bool) -} - -func (rt refreshableTyped) MapBool(mapFn func(bool) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(bool)) - }) -} - -func (rt refreshableTyped) SubscribeToBool(subFn func(bool)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(bool)) - }) -} - -func (rt refreshableTyped) CurrentBoolPtr() *bool { - return rt.Current().(*bool) -} - -func (rt refreshableTyped) MapBoolPtr(mapFn func(*bool) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(*bool)) - }) -} - -func (rt refreshableTyped) SubscribeToBoolPtr(subFn func(*bool)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(*bool)) - }) -} - -func (rt refreshableTyped) CurrentDuration() time.Duration { - return rt.Current().(time.Duration) -} - -func (rt refreshableTyped) MapDuration(mapFn func(time.Duration) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(time.Duration)) - }) -} - -func (rt refreshableTyped) SubscribeToDuration(subFn func(time.Duration)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(time.Duration)) - }) -} - -func (rt refreshableTyped) CurrentDurationPtr() *time.Duration { - return rt.Current().(*time.Duration) -} - -func (rt refreshableTyped) MapDurationPtr(mapFn func(*time.Duration) interface{}) Refreshable { - return rt.Map(func(i interface{}) interface{} { - return mapFn(i.(*time.Duration)) - }) -} - -func (rt refreshableTyped) SubscribeToDurationPtr(subFn func(*time.Duration)) (unsubscribe func()) { - return rt.Subscribe(func(i interface{}) { - subFn(i.(*time.Duration)) - }) -} diff --git a/refreshable/refreshable_validating.go b/refreshable/refreshable_validating.go index d00bcf58..32ec9105 100644 --- a/refreshable/refreshable_validating.go +++ b/refreshable/refreshable_validating.go @@ -9,8 +9,8 @@ import ( "sync/atomic" ) -type ValidatingRefreshable struct { - Refreshable +type ValidatingRefreshable[T any] struct { + Refreshable[T] lastValidateErr *atomic.Value } @@ -19,70 +19,60 @@ type errorWrapper struct { err error } -func (v *ValidatingRefreshable) LastValidateErr() error { +func (v *ValidatingRefreshable[T]) LastValidateErr() error { return v.lastValidateErr.Load().(errorWrapper).err } // NewValidatingRefreshable returns a new Refreshable whose current value is the latest value that passes the provided // validatingFn successfully. This returns an error if the current value of the passed in Refreshable does not pass the // validatingFn or if the validatingFn or Refreshable are nil. -func NewValidatingRefreshable(origRefreshable Refreshable, validatingFn func(interface{}) error) (*ValidatingRefreshable, error) { - mappingFn := func(i interface{}) (interface{}, error) { +func NewValidatingRefreshable[T any](origRefreshable Refreshable[T], validatingFn func(T) error) (*ValidatingRefreshable[T], error) { + mappingFn := func(i T) (T, error) { if err := validatingFn(i); err != nil { - return nil, err + var zero T + return zero, err } - return nil, nil + return i, nil } - return newValidatingRefreshable(origRefreshable, mappingFn, false) + return newValidatingRefreshable(origRefreshable, mappingFn) } // NewMapValidatingRefreshable is similar to NewValidatingRefreshable but allows for the function to return a mapping/mutation // of the input object in addition to returning an error. The returned ValidatingRefreshable will contain the mapped value. // The mapped value must always be of the same type (but not necessarily that of the input type). -func NewMapValidatingRefreshable(origRefreshable Refreshable, mappingFn func(interface{}) (interface{}, error)) (*ValidatingRefreshable, error) { - return newValidatingRefreshable(origRefreshable, mappingFn, true) +func NewMapValidatingRefreshable[T any, M any](origRefreshable Refreshable[T], mappingFn func(T) (M, error)) (*ValidatingRefreshable[M], error) { + return newValidatingRefreshable(origRefreshable, mappingFn) } -func newValidatingRefreshable(origRefreshable Refreshable, validatingFn func(interface{}) (interface{}, error), storeMappedVal bool) (*ValidatingRefreshable, error) { - if validatingFn == nil { +func newValidatingRefreshable[T any, M any](origRefreshable Refreshable[T], mappingFn func(T) (M, error)) (*ValidatingRefreshable[M], error) { + if mappingFn == nil { return nil, errors.New("failed to create validating Refreshable because the validating function was nil") } - if origRefreshable == nil { return nil, errors.New("failed to create validating Refreshable because the passed in Refreshable was nil") } - var validatedRefreshable *DefaultRefreshable + var validatedRefreshable *DefaultRefreshable[M] currentVal := origRefreshable.Current() - mappedVal, err := validatingFn(currentVal) + mappedVal, err := mappingFn(currentVal) if err != nil { return nil, err } - if storeMappedVal { - validatedRefreshable = NewDefaultRefreshable(mappedVal) - } else { - validatedRefreshable = NewDefaultRefreshable(currentVal) - } + validatedRefreshable = New(mappedVal) var lastValidateErr atomic.Value lastValidateErr.Store(errorWrapper{}) - v := ValidatingRefreshable{ + v := ValidatingRefreshable[M]{ Refreshable: validatedRefreshable, lastValidateErr: &lastValidateErr, } - updateValueFn := func(i interface{}) { - mappedVal, err := validatingFn(i) - if err != nil { - v.lastValidateErr.Store(errorWrapper{err}) - return - } - if storeMappedVal { - err = validatedRefreshable.Update(mappedVal) - } else { - err = validatedRefreshable.Update(i) + updateValueFn := func(i T) { + mappedVal, err := mappingFn(i) + v.lastValidateErr.Store(errorWrapper{err}) + if err == nil { + validatedRefreshable.Update(mappedVal) } - v.lastValidateErr.Store(errorWrapper{err: err}) } origRefreshable.Subscribe(updateValueFn) diff --git a/refreshable/refreshable_validating_test.go b/refreshable/refreshable_validating_test.go index f7d1c384..e05b52d0 100644 --- a/refreshable/refreshable_validating_test.go +++ b/refreshable/refreshable_validating_test.go @@ -9,69 +9,65 @@ import ( "net/url" "testing" - "github.com/palantir/pkg/refreshable" + "github.com/palantir/pkg/refreshable/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestValidatingRefreshable(t *testing.T) { type container struct{ Value string } - r := refreshable.NewDefaultRefreshable(container{Value: "value"}) - vr, err := refreshable.NewValidatingRefreshable(r, func(i interface{}) error { - if len(i.(container).Value) == 0 { + r := refreshable.New(container{Value: "value"}) + vr, err := refreshable.NewValidatingRefreshable[container](r, func(i container) error { + if len(i.Value) == 0 { return errors.New("empty") } return nil }) require.NoError(t, err) require.NoError(t, vr.LastValidateErr()) - require.Equal(t, r.Current().(container).Value, "value") - require.Equal(t, vr.Current().(container).Value, "value") + require.Equal(t, r.Current().Value, "value") + require.Equal(t, vr.Current().Value, "value") // attempt bad update - err = r.Update(container{}) - require.NoError(t, err, "no err expected from default refreshable") - require.Equal(t, r.Current().(container).Value, "") + r.Update(container{}) + require.Equal(t, r.Current().Value, "") require.EqualError(t, vr.LastValidateErr(), "empty", "expected err from validating refreshable") - require.Equal(t, vr.Current().(container).Value, "value", "expected unchanged validating refreshable") + require.Equal(t, vr.Current().Value, "value", "expected unchanged validating refreshable") // attempt good update - require.NoError(t, r.Update(container{Value: "value2"})) + r.Update(container{Value: "value2"}) require.NoError(t, vr.LastValidateErr()) - require.Equal(t, "value2", vr.Current().(container).Value) - require.Equal(t, "value2", r.Current().(container).Value) + require.Equal(t, "value2", vr.Current().Value) + require.Equal(t, "value2", r.Current().Value) } func TestMapValidatingRefreshable(t *testing.T) { - r := refreshable.NewDefaultRefreshable("https://palantir.com:443") - vr, err := refreshable.NewMapValidatingRefreshable(r, func(i interface{}) (interface{}, error) { - return url.Parse(i.(string)) - }) + r := refreshable.New("https://palantir.com:443") + vr, err := refreshable.NewMapValidatingRefreshable[string, *url.URL](r, url.Parse) require.NoError(t, err) require.NoError(t, vr.LastValidateErr()) - require.Equal(t, r.Current().(string), "https://palantir.com:443") - require.Equal(t, vr.Current().(*url.URL).Hostname(), "palantir.com") + require.Equal(t, r.Current(), "https://palantir.com:443") + require.Equal(t, vr.Current().Hostname(), "palantir.com") // attempt bad update - err = r.Update(":::error.com") - require.NoError(t, err, "no err expected from default refreshable") - assert.Equal(t, r.Current().(string), ":::error.com") + r.Update(":::error.com") + assert.Equal(t, r.Current(), ":::error.com") require.EqualError(t, vr.LastValidateErr(), "parse \":::error.com\": missing protocol scheme", "expected err from validating refreshable") - assert.Equal(t, vr.Current().(*url.URL).Hostname(), "palantir.com", "expected unchanged validating refreshable") + assert.Equal(t, vr.Current().Hostname(), "palantir.com", "expected unchanged validating refreshable") // attempt good update - require.NoError(t, r.Update("https://example.com")) + r.Update("https://example.com") require.NoError(t, vr.LastValidateErr()) - require.Equal(t, r.Current().(string), "https://example.com") - require.Equal(t, vr.Current().(*url.URL).Hostname(), "example.com") + require.Equal(t, r.Current(), "https://example.com") + require.Equal(t, vr.Current().Hostname(), "example.com") } // TestValidatingRefreshable_SubscriptionRaceCondition tests that the ValidatingRefreshable stays current // if the underlying refreshable updates during the creation process. func TestValidatingRefreshable_SubscriptionRaceCondition(t *testing.T) { - r := &updateImmediatelyRefreshable{r: refreshable.NewDefaultRefreshable(1), newValue: 2} - vr, err := refreshable.NewValidatingRefreshable(r, func(i interface{}) error { return nil }) + r := &updateImmediatelyRefreshable{r: refreshable.New(1), newValue: 2} + vr, err := refreshable.NewValidatingRefreshable[int](r, func(i int) error { return nil }) require.NoError(t, err) // If this returns 1, it is likely because the VR contains a stale value assert.Equal(t, 2, vr.Current()) @@ -79,20 +75,16 @@ func TestValidatingRefreshable_SubscriptionRaceCondition(t *testing.T) { // updateImmediatelyRefreshable is a mock implementation which updates to newValue immediately when Current() is called type updateImmediatelyRefreshable struct { - r *refreshable.DefaultRefreshable - newValue interface{} + r *refreshable.DefaultRefreshable[int] + newValue int } -func (r *updateImmediatelyRefreshable) Current() interface{} { +func (r *updateImmediatelyRefreshable) Current() int { c := r.r.Current() - _ = r.r.Update(r.newValue) + r.r.Update(r.newValue) return c } -func (r *updateImmediatelyRefreshable) Subscribe(f func(interface{})) func() { +func (r *updateImmediatelyRefreshable) Subscribe(f func(int)) func() { return r.r.Subscribe(f) } - -func (r *updateImmediatelyRefreshable) Map(f func(interface{}) interface{}) refreshable.Refreshable { - return r.r.Map(f) -} From 3ac12b08aeaf772935da8ef0010f4f91e73355d2 Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Sat, 2 Jul 2022 16:20:58 -0700 Subject: [PATCH 2/9] wip --- refreshable/refreshable.go | 2 +- refreshable/refreshable_provider.go | 95 +++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 refreshable/refreshable_provider.go diff --git a/refreshable/refreshable.go b/refreshable/refreshable.go index ea0d888f..a6743d22 100644 --- a/refreshable/refreshable.go +++ b/refreshable/refreshable.go @@ -5,7 +5,7 @@ package refreshable type Refreshable[T any] interface { - // Current returns the most recent value of this Supplier. + // Current returns the most recent value of this Refreshable. // If the value has not been initialized, returns T's zero value. Current() T diff --git a/refreshable/refreshable_provider.go b/refreshable/refreshable_provider.go new file mode 100644 index 00000000..a3ccaaef --- /dev/null +++ b/refreshable/refreshable_provider.go @@ -0,0 +1,95 @@ +// Copyright (c) 2022 Palantir Technologies. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package refreshable + +import ( + "context" + "sync" + "time" +) + +type ProviderRefreshable[T any] interface { + Refreshable[T] + Start(ctx context.Context) + ReadyC() <-chan struct{} + ValuesC(buf int) (values <-chan T, cleanup func()) +} + +func NewTickerRefreshable[T any](interval time.Duration, getValue func(context.Context) (T, bool)) *TickerRefreshable[T] { + var zeroValue T + return &TickerRefreshable[T]{ + DefaultRefreshable: New[T](zeroValue), + ready: make(chan struct{}), + interval: interval, + getValue: getValue, + } +} + +type TickerRefreshable[T any] struct { + *DefaultRefreshable[T] + readyOnce sync.Once + ready chan struct{} + interval time.Duration + getValue func(ctx context.Context) (T, bool) +} + +// compile time interface check +var _ Refreshable[any] = (*TickerRefreshable[any])(nil) + +func (p *TickerRefreshable[T]) Ready() bool { + select { + case <-p.ReadyC(): + return true + default: + return false + } +} + +func (p *TickerRefreshable[T]) ReadyC() <-chan struct{} { + return p.ready +} + +func (p *TickerRefreshable[T]) Wait(ctx context.Context) (t T, ok bool) { + select { + case <-p.ReadyC(): + return p.Current(), true + case <-ctx.Done(): + return + } +} + +func (p *TickerRefreshable[T]) Start(ctx context.Context) { + go func() { + ticker := time.NewTicker(p.interval) + defer ticker.Stop() + for { + value, ok := p.getValue(ctx) + if ok { + p.Update(value) + p.readyOnce.Do(func() { + close(p.ready) + }) + } + + select { + case <-ticker.C: + continue + case <-ctx.Done(): + return + } + } + }() +} + +func (p *TickerRefreshable[T]) ValuesC(buf int) (values <-chan T, cleanup func()) { + ch := make(chan T, buf) + unsub := p.Subscribe(func(t T) { + ch <- t + }) + return ch, func() { + unsub() + close(ch) + } +} From 96db6475a395f0598d8c88669518a1ce9b3ab2ed Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Tue, 9 Aug 2022 17:07:55 -0700 Subject: [PATCH 3/9] more --- refreshable/refreshable.go | 144 ++++++++++++++++++++- refreshable/refreshable_default.go | 62 ++++----- refreshable/refreshable_provider.go | 95 -------------- refreshable/refreshable_validating_test.go | 2 +- 4 files changed, 165 insertions(+), 138 deletions(-) delete mode 100644 refreshable/refreshable_provider.go diff --git a/refreshable/refreshable.go b/refreshable/refreshable.go index a6743d22..e59da113 100644 --- a/refreshable/refreshable.go +++ b/refreshable/refreshable.go @@ -4,21 +4,155 @@ package refreshable +import ( + "context" + "time" +) + +// A Refreshable is a generic container type for a volatile underlying value. +// It supports atomic access and user-provided callback "subscriptions" on updates. type Refreshable[T any] interface { // Current returns the most recent value of this Refreshable. // If the value has not been initialized, returns T's zero value. Current() T - // Subscribe subscribes to changes of this Supplier. - // The provided function is called with the value of Get() whenever the value changes. - Subscribe(consumer func(T)) (unsubscribe func()) + // Subscribe calls the consumer function when Value updates until stop is closed. + // The consumer should be relatively fast: Updatable.Set blocks until all subscribers have returned. + // Updates considered no-ops by reflect.DeepEqual may be skipped. + Subscribe(consumer func(T)) UnsubscribeFunc +} + +// A Updatable is a Refreshable which supports setting the value with a user-provided value. +// When a utility returns a (non-Updatable) Refreshable, it implies that value updates are handled internally. +type Updatable[T any] interface { + Refreshable[T] + // Update updates the Refreshable with a new T. + // It blocks until all subscribers have completed. + Update(T) +} + +// A Validated is a Refreshable capable of rejecting updates according to validation logic. +// Its Current method returns the most recent value to pass validation. +type Validated[T any] interface { + Refreshable[T] + // Validation returns the result of the most recent validation. + // If the last value was valid, Validation returns the same value as Current and a nil error. + // If the last value was invalid, it and the validation error are returned. Current returns the newest valid value. + Validation() (T, error) +} + +// Ready extends Refreshable for asynchronous implementations which may not have a value when they are constructed. +// Callers should check that the Ready channel is closed before using the Current value. +type Ready[T any] interface { + Refreshable[T] + // ReadyC returns a channel which is closed after a value is successfully populated. + ReadyC() <-chan struct{} +} + +type UnsubscribeFunc func() + +func New[T any](val T) *DefaultRefreshable[T] { + d := new(DefaultRefreshable[T]) + d.current.Store(&val) + return d } // Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable. -func Map[T any, M any](t Refreshable[T], mapFn func(T) M) (m Refreshable[M], unsubscribe func()) { +func Map[T any, M any](t Refreshable[T], mapFn func(T) M) (Refreshable[M], UnsubscribeFunc) { out := New(mapFn(t.Current())) - unsubscribe = t.Subscribe(func(v T) { + unsubscribe := t.Subscribe(func(v T) { out.Update(mapFn(v)) }) return out, unsubscribe } + +func SubscribeWithCurrent[T any](r Refreshable[T], consumer func(T)) UnsubscribeFunc { + unsubscribe := r.Subscribe(consumer) + consumer(r.Current()) + return unsubscribe +} + +// UpdateFromChannel populates an Updatable with the values channel. +// If an element is already available, the returned Value is guaranteed to be populated. +// The channel should be closed when no longer used to avoid leaking resources. +func UpdateFromChannel[T any](in Updatable[T], values <-chan T) Ready[T] { + out := newReady(in) + select { + case initial, ok := <-values: + if !ok { + return out // channel already closed + } + out.Update(initial) + default: + } + + go func() { + for value := range values { + out.Update(value) + } + }() + + return out +} + +// UpdateFromTickerFunc returns a Refreshable populated by the result of the provider called each interval. +// If the providers bool return is false, the value is ignored. +func UpdateFromTickerFunc[T any](in Updatable[T], interval time.Duration, provider func() (T, bool)) (Ready[T], UnsubscribeFunc) { + out := newReady(in) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + if value, ok := provider(); ok { + out.Update(value) + } + select { + case <-ticker.C: + continue + case <-ctx.Done(): + return + } + } + }() + return out, UnsubscribeFunc(cancel) +} + +// Wait waits until the Ready has a current value or the context expires. +func Wait[T any](ctx context.Context, ready Ready[T]) (T, bool) { + select { + case <-ready.ReadyC(): + return ready.Current(), true + case <-ctx.Done(): + var zero T + return zero, false + } +} + +type ready[T any] struct { + in Updatable[T] + readyC <-chan struct{} + cancel context.CancelFunc +} + +func newReady[T any](in Updatable[T]) *ready[T] { + ctx, cancel := context.WithCancel(context.Background()) + return &ready[T]{in: in, readyC: ctx.Done(), cancel: cancel} +} + +func (r *ready[T]) Current() T { + return r.in.Current() +} + +func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc { + return r.in.Subscribe(consumer) +} + +func (r *ready[T]) ReadyC() <-chan struct{} { + return r.readyC +} + +func (r *ready[T]) Update(val T) { + r.cancel() + r.in.Update(val) +} diff --git a/refreshable/refreshable_default.go b/refreshable/refreshable_default.go index 1f18583e..eb534e98 100644 --- a/refreshable/refreshable_default.go +++ b/refreshable/refreshable_default.go @@ -11,32 +11,19 @@ import ( ) type DefaultRefreshable[T any] struct { - current *atomic.Value - - sync.Mutex // protects subscribers + mux sync.Mutex + current atomic.Value subscribers []*func(T) } -func New[T any](val T) *DefaultRefreshable[T] { - current := atomic.Value{} - current.Store(&val) - - return &DefaultRefreshable[T]{ - current: ¤t, - } -} - // Update changes the value of the Refreshable, then blocks while subscribers are executed. func (d *DefaultRefreshable[T]) Update(val T) { - d.Lock() - defer d.Unlock() - - if reflect.DeepEqual(d.Current(), val) { + d.mux.Lock() + defer d.mux.Unlock() + old := d.current.Swap(&val) + if reflect.DeepEqual(*(old.(*T)), val) { return } - - d.current.Store(&val) - for _, sub := range d.subscribers { (*sub)(val) } @@ -46,29 +33,30 @@ func (d *DefaultRefreshable[T]) Current() T { return *(d.current.Load().(*T)) } -func (d *DefaultRefreshable[T]) Subscribe(consumer func(T)) (unsubscribe func()) { - d.Lock() - defer d.Unlock() +func (d *DefaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { + d.mux.Lock() + defer d.mux.Unlock() consumerFnPtr := &consumer d.subscribers = append(d.subscribers, consumerFnPtr) - return func() { - d.unsubscribe(consumerFnPtr) - } + return d.unsubscribe(consumerFnPtr) } -func (d *DefaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) { - d.Lock() - defer d.Unlock() - - matchIdx := -1 - for idx, currSub := range d.subscribers { - if currSub == consumerFnPtr { - matchIdx = idx - break +func (d *DefaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeFunc { + return func() { + d.mux.Lock() + defer d.mux.Unlock() + + matchIdx := -1 + for idx, currSub := range d.subscribers { + if currSub == consumerFnPtr { + matchIdx = idx + break + } + } + if matchIdx != -1 { + d.subscribers = append(d.subscribers[:matchIdx], d.subscribers[matchIdx+1:]...) } } - if matchIdx != -1 { - d.subscribers = append(d.subscribers[:matchIdx], d.subscribers[matchIdx+1:]...) - } + } diff --git a/refreshable/refreshable_provider.go b/refreshable/refreshable_provider.go deleted file mode 100644 index a3ccaaef..00000000 --- a/refreshable/refreshable_provider.go +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright (c) 2022 Palantir Technologies. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package refreshable - -import ( - "context" - "sync" - "time" -) - -type ProviderRefreshable[T any] interface { - Refreshable[T] - Start(ctx context.Context) - ReadyC() <-chan struct{} - ValuesC(buf int) (values <-chan T, cleanup func()) -} - -func NewTickerRefreshable[T any](interval time.Duration, getValue func(context.Context) (T, bool)) *TickerRefreshable[T] { - var zeroValue T - return &TickerRefreshable[T]{ - DefaultRefreshable: New[T](zeroValue), - ready: make(chan struct{}), - interval: interval, - getValue: getValue, - } -} - -type TickerRefreshable[T any] struct { - *DefaultRefreshable[T] - readyOnce sync.Once - ready chan struct{} - interval time.Duration - getValue func(ctx context.Context) (T, bool) -} - -// compile time interface check -var _ Refreshable[any] = (*TickerRefreshable[any])(nil) - -func (p *TickerRefreshable[T]) Ready() bool { - select { - case <-p.ReadyC(): - return true - default: - return false - } -} - -func (p *TickerRefreshable[T]) ReadyC() <-chan struct{} { - return p.ready -} - -func (p *TickerRefreshable[T]) Wait(ctx context.Context) (t T, ok bool) { - select { - case <-p.ReadyC(): - return p.Current(), true - case <-ctx.Done(): - return - } -} - -func (p *TickerRefreshable[T]) Start(ctx context.Context) { - go func() { - ticker := time.NewTicker(p.interval) - defer ticker.Stop() - for { - value, ok := p.getValue(ctx) - if ok { - p.Update(value) - p.readyOnce.Do(func() { - close(p.ready) - }) - } - - select { - case <-ticker.C: - continue - case <-ctx.Done(): - return - } - } - }() -} - -func (p *TickerRefreshable[T]) ValuesC(buf int) (values <-chan T, cleanup func()) { - ch := make(chan T, buf) - unsub := p.Subscribe(func(t T) { - ch <- t - }) - return ch, func() { - unsub() - close(ch) - } -} diff --git a/refreshable/refreshable_validating_test.go b/refreshable/refreshable_validating_test.go index e05b52d0..eb5a9875 100644 --- a/refreshable/refreshable_validating_test.go +++ b/refreshable/refreshable_validating_test.go @@ -85,6 +85,6 @@ func (r *updateImmediatelyRefreshable) Current() int { return c } -func (r *updateImmediatelyRefreshable) Subscribe(f func(int)) func() { +func (r *updateImmediatelyRefreshable) Subscribe(f func(int)) refreshable.UnsubscribeFunc { return r.r.Subscribe(f) } From 615f9a934d5b6529e70caa77b085de61696dcfd4 Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Sun, 4 Sep 2022 13:46:09 -0700 Subject: [PATCH 4/9] updates --- refreshable/async.go | 100 ++++++++++++++++ refreshable/refreshable.go | 126 ++++----------------- refreshable/refreshable_default.go | 23 +++- refreshable/refreshable_default_test.go | 19 +++- refreshable/refreshable_validating.go | 106 +++++++---------- refreshable/refreshable_validating_test.go | 45 +++++--- 6 files changed, 228 insertions(+), 191 deletions(-) create mode 100644 refreshable/async.go diff --git a/refreshable/async.go b/refreshable/async.go new file mode 100644 index 00000000..7ca9a2f8 --- /dev/null +++ b/refreshable/async.go @@ -0,0 +1,100 @@ +// Copyright (c) 2022 Palantir Technologies. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package refreshable + +import ( + "context" + "time" +) + +type ready[T any] struct { + in Updatable[T] + readyC <-chan struct{} + cancel context.CancelFunc +} + +func newReady[T any](in Updatable[T]) *ready[T] { + ctx, cancel := context.WithCancel(context.Background()) + return &ready[T]{ + in: in, + readyC: ctx.Done(), + cancel: cancel, + } +} + +func (r *ready[T]) Current() T { + return r.in.Current() +} + +func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc { + return r.in.Subscribe(consumer) +} + +func (r *ready[T]) ReadyC() <-chan struct{} { + return r.readyC +} + +func (r *ready[T]) Update(val T) { + r.cancel() + r.in.Update(val) +} + +// NewFromChannel populates an Updatable with the values channel. +// If an element is already available, the returned Value is guaranteed to be populated. +// The channel should be closed when no longer used to avoid leaking resources. +func NewFromChannel[T any](values <-chan T) Ready[T] { + out := newReady(newZero[T]()) + select { + case initial, ok := <-values: + if !ok { + return out // channel already closed + } + out.Update(initial) + default: + } + go func() { + for value := range values { + out.Update(value) + } + }() + return out +} + +// NewFromTickerFunc returns a Ready Refreshable populated by the result of the provider called each interval. +// If the providers bool return is false, the value is ignored. +// The result's ReadyC channel is closed when a new value is populated. +func NewFromTickerFunc[T any](interval time.Duration, provider func() (T, bool)) (Ready[T], UnsubscribeFunc) { + out := newReady(newZero[T]()) + ctx, cancel := context.WithCancel(context.Background()) + values := make(chan T) + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + defer close(values) + for { + if value, ok := provider(); ok { + out.Update(value) + } + select { + case <-ticker.C: + continue + case <-ctx.Done(): + return + } + } + }() + return out, UnsubscribeFunc(cancel) +} + +// Wait waits until the Ready has a current value or the context expires. +func Wait[T any](ctx context.Context, ready Ready[T]) (T, bool) { + select { + case <-ready.ReadyC(): + return ready.Current(), true + case <-ctx.Done(): + var zero T + return zero, false + } +} diff --git a/refreshable/refreshable.go b/refreshable/refreshable.go index e59da113..eb5c3499 100644 --- a/refreshable/refreshable.go +++ b/refreshable/refreshable.go @@ -4,11 +4,6 @@ package refreshable -import ( - "context" - "time" -) - // A Refreshable is a generic container type for a volatile underlying value. // It supports atomic access and user-provided callback "subscriptions" on updates. type Refreshable[T any] interface { @@ -17,7 +12,8 @@ type Refreshable[T any] interface { Current() T // Subscribe calls the consumer function when Value updates until stop is closed. - // The consumer should be relatively fast: Updatable.Set blocks until all subscribers have returned. + // The consumer must be relatively fast: Updatable.Set blocks until all subscribers have returned. + // Expensive or error-prone responses to refreshed values should be asynchronous. // Updates considered no-ops by reflect.DeepEqual may be skipped. Subscribe(consumer func(T)) UnsubscribeFunc } @@ -37,7 +33,7 @@ type Validated[T any] interface { Refreshable[T] // Validation returns the result of the most recent validation. // If the last value was valid, Validation returns the same value as Current and a nil error. - // If the last value was invalid, it and the validation error are returned. Current returns the newest valid value. + // If the last value was invalid, it and the error are returned. Current returns the most recent valid value. Validation() (T, error) } @@ -49,110 +45,36 @@ type Ready[T any] interface { ReadyC() <-chan struct{} } +// UnsubscribeFunc removes a subscription from a refreshable's internal tracking and/or stops its update routine. +// It is safe to call multiple times. type UnsubscribeFunc func() -func New[T any](val T) *DefaultRefreshable[T] { - d := new(DefaultRefreshable[T]) - d.current.Store(&val) - return d +func New[T any](val T) Updatable[T] { + return newDefault(val) } // Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable. -func Map[T any, M any](t Refreshable[T], mapFn func(T) M) (Refreshable[M], UnsubscribeFunc) { - out := New(mapFn(t.Current())) - unsubscribe := t.Subscribe(func(v T) { +func Map[T any, M any](original Refreshable[T], mapFn func(T) M) (Refreshable[M], UnsubscribeFunc) { + out := New(mapFn(original.Current())) + stop := original.Subscribe(func(v T) { out.Update(mapFn(v)) }) - return out, unsubscribe -} - -func SubscribeWithCurrent[T any](r Refreshable[T], consumer func(T)) UnsubscribeFunc { - unsubscribe := r.Subscribe(consumer) - consumer(r.Current()) - return unsubscribe -} - -// UpdateFromChannel populates an Updatable with the values channel. -// If an element is already available, the returned Value is guaranteed to be populated. -// The channel should be closed when no longer used to avoid leaking resources. -func UpdateFromChannel[T any](in Updatable[T], values <-chan T) Ready[T] { - out := newReady(in) - select { - case initial, ok := <-values: - if !ok { - return out // channel already closed - } - out.Update(initial) - default: - } - - go func() { - for value := range values { - out.Update(value) - } - }() - - return out -} - -// UpdateFromTickerFunc returns a Refreshable populated by the result of the provider called each interval. -// If the providers bool return is false, the value is ignored. -func UpdateFromTickerFunc[T any](in Updatable[T], interval time.Duration, provider func() (T, bool)) (Ready[T], UnsubscribeFunc) { - out := newReady(in) - ctx, cancel := context.WithCancel(context.Background()) - go func() { - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - if value, ok := provider(); ok { - out.Update(value) - } - select { - case <-ticker.C: - continue - case <-ctx.Done(): - return - } - } - }() - return out, UnsubscribeFunc(cancel) -} - -// Wait waits until the Ready has a current value or the context expires. -func Wait[T any](ctx context.Context, ready Ready[T]) (T, bool) { - select { - case <-ready.ReadyC(): - return ready.Current(), true - case <-ctx.Done(): - var zero T - return zero, false - } -} - -type ready[T any] struct { - in Updatable[T] - readyC <-chan struct{} - cancel context.CancelFunc -} - -func newReady[T any](in Updatable[T]) *ready[T] { - ctx, cancel := context.WithCancel(context.Background()) - return &ready[T]{in: in, readyC: ctx.Done(), cancel: cancel} -} - -func (r *ready[T]) Current() T { - return r.in.Current() -} - -func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc { - return r.in.Subscribe(consumer) + out.Update(mapFn(original.Current())) + return out, stop } -func (r *ready[T]) ReadyC() <-chan struct{} { - return r.readyC +// MapWithError is similar to Validate but allows for the function to return a mapping/mutation +// of the input object in addition to returning an error. The returned validRefreshable will contain the mapped value. +// An error is returned if the current original value fails to map. +func MapWithError[T any, M any](original Refreshable[T], mapFn func(T) (M, error)) (Validated[M], UnsubscribeFunc, error) { + v, stop := newValidRefreshable(original, mapFn) + _, err := v.Validation() + return v, stop, err } -func (r *ready[T]) Update(val T) { - r.cancel() - r.in.Update(val) +// Validate returns a new Refreshable that returns the latest original value accepted by the validatingFn. +// If the upstream value results in an error, it is reported by Validation(). +// An error is returned if the current original value is invalid. +func Validate[T any](original Refreshable[T], validatingFn func(T) error) (Validated[T], UnsubscribeFunc, error) { + return MapWithError(original, identity(validatingFn)) } diff --git a/refreshable/refreshable_default.go b/refreshable/refreshable_default.go index eb534e98..fbaf831b 100644 --- a/refreshable/refreshable_default.go +++ b/refreshable/refreshable_default.go @@ -10,14 +10,27 @@ import ( "sync/atomic" ) -type DefaultRefreshable[T any] struct { +type defaultRefreshable[T any] struct { mux sync.Mutex current atomic.Value subscribers []*func(T) } +func newDefault[T any](val T) Updatable[T] { + d := new(defaultRefreshable[T]) + d.current.Store(&val) + return d +} + +func newZero[T any]() Updatable[T] { + d := new(defaultRefreshable[T]) + var zero T + d.current.Store(&zero) + return d +} + // Update changes the value of the Refreshable, then blocks while subscribers are executed. -func (d *DefaultRefreshable[T]) Update(val T) { +func (d *defaultRefreshable[T]) Update(val T) { d.mux.Lock() defer d.mux.Unlock() old := d.current.Swap(&val) @@ -29,11 +42,11 @@ func (d *DefaultRefreshable[T]) Update(val T) { } } -func (d *DefaultRefreshable[T]) Current() T { +func (d *defaultRefreshable[T]) Current() T { return *(d.current.Load().(*T)) } -func (d *DefaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { +func (d *defaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { d.mux.Lock() defer d.mux.Unlock() @@ -42,7 +55,7 @@ func (d *DefaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { return d.unsubscribe(consumerFnPtr) } -func (d *DefaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeFunc { +func (d *defaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeFunc { return func() { d.mux.Lock() defer d.mux.Unlock() diff --git a/refreshable/refreshable_default_test.go b/refreshable/refreshable_default_test.go index bd0692d7..3116dc06 100644 --- a/refreshable/refreshable_default_test.go +++ b/refreshable/refreshable_default_test.go @@ -12,7 +12,9 @@ import ( ) func TestDefaultRefreshable(t *testing.T) { - type container struct{ Value string } + type container struct { + Value string + } v := &container{Value: "original"} r := refreshable.New(v) @@ -46,13 +48,22 @@ func TestDefaultRefreshable(t *testing.T) { t.Run("Map", func(t *testing.T) { r.Update(&container{Value: "value"}) - m, _ := refreshable.Map[*container, int](r, func(i *container) int { + rLen, _ := refreshable.Map[*container, int](r, func(i *container) int { return len(i.Value) }) - assert.Equal(t, m.Current(), 5) + assert.Equal(t, 5, rLen.Current()) + + rLenUpdated := false + rLen.Subscribe(func(int) { rLenUpdated = true }) + // update to new value with same length and ensure the + // equality check prevented unnecessary subscriber updates. + r.Update(&container{Value: "VALUE"}) + assert.Equal(t, "VALUE", r.Current().Value) + assert.False(t, rLenUpdated) r.Update(&container{Value: "updated"}) - assert.Equal(t, m.Current(), 7) + assert.Equal(t, "updated", r.Current().Value) + assert.Equal(t, 7, rLen.Current()) }) } diff --git a/refreshable/refreshable_validating.go b/refreshable/refreshable_validating.go index 32ec9105..da96ed77 100644 --- a/refreshable/refreshable_validating.go +++ b/refreshable/refreshable_validating.go @@ -1,85 +1,57 @@ -// Copyright (c) 2021 Palantir Technologies. All rights reserved. +// Copyright (c) 2022 Palantir Technologies. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package refreshable -import ( - "errors" - "sync/atomic" -) - -type ValidatingRefreshable[T any] struct { - Refreshable[T] - lastValidateErr *atomic.Value +type validRefreshable[T any] struct { + r Updatable[validRefreshableContainer[T]] } -// this is needed to be able to store the absence of an error in an atomic.Value -type errorWrapper struct { - err error +type validRefreshableContainer[T any] struct { + validated T + unvalidated T + lastErr error } -func (v *ValidatingRefreshable[T]) LastValidateErr() error { - return v.lastValidateErr.Load().(errorWrapper).err -} +func (v *validRefreshable[T]) Current() T { return v.r.Current().validated } -// NewValidatingRefreshable returns a new Refreshable whose current value is the latest value that passes the provided -// validatingFn successfully. This returns an error if the current value of the passed in Refreshable does not pass the -// validatingFn or if the validatingFn or Refreshable are nil. -func NewValidatingRefreshable[T any](origRefreshable Refreshable[T], validatingFn func(T) error) (*ValidatingRefreshable[T], error) { - mappingFn := func(i T) (T, error) { - if err := validatingFn(i); err != nil { - var zero T - return zero, err - } - return i, nil - } - return newValidatingRefreshable(origRefreshable, mappingFn) +func (v *validRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { + return v.r.Subscribe(func(val validRefreshableContainer[T]) { + consumer(val.validated) + }) } -// NewMapValidatingRefreshable is similar to NewValidatingRefreshable but allows for the function to return a mapping/mutation -// of the input object in addition to returning an error. The returned ValidatingRefreshable will contain the mapped value. -// The mapped value must always be of the same type (but not necessarily that of the input type). -func NewMapValidatingRefreshable[T any, M any](origRefreshable Refreshable[T], mappingFn func(T) (M, error)) (*ValidatingRefreshable[M], error) { - return newValidatingRefreshable(origRefreshable, mappingFn) +// Validation returns the most recent upstream Refreshable and its validation result. +// If nil, the validRefreshable is up-to-date with its original. +func (v *validRefreshable[T]) Validation() (T, error) { + c := v.r.Current() + return c.unvalidated, c.lastErr } -func newValidatingRefreshable[T any, M any](origRefreshable Refreshable[T], mappingFn func(T) (M, error)) (*ValidatingRefreshable[M], error) { - if mappingFn == nil { - return nil, errors.New("failed to create validating Refreshable because the validating function was nil") - } - if origRefreshable == nil { - return nil, errors.New("failed to create validating Refreshable because the passed in Refreshable was nil") - } - - var validatedRefreshable *DefaultRefreshable[M] - currentVal := origRefreshable.Current() - mappedVal, err := mappingFn(currentVal) - if err != nil { - return nil, err - } - validatedRefreshable = New(mappedVal) - - var lastValidateErr atomic.Value - lastValidateErr.Store(errorWrapper{}) - v := ValidatingRefreshable[M]{ - Refreshable: validatedRefreshable, - lastValidateErr: &lastValidateErr, - } +func newValidRefreshable[T any, M any](original Refreshable[T], mappingFn func(T) (M, error)) (*validRefreshable[M], UnsubscribeFunc) { + valid := &validRefreshable[M]{r: New(validRefreshableContainer[M]{})} + stop := original.Subscribe(func(valueT T) { + updateValidRefreshable(valid, valueT, mappingFn) + }) + updateValidRefreshable(valid, original.Current(), mappingFn) + return valid, stop +} - updateValueFn := func(i T) { - mappedVal, err := mappingFn(i) - v.lastValidateErr.Store(errorWrapper{err}) - if err == nil { - validatedRefreshable.Update(mappedVal) - } +func updateValidRefreshable[T any, M any](valid *validRefreshable[M], value T, mapFn func(T) (M, error)) { + validated := valid.r.Current().validated + unvalidated, err := mapFn(value) + if err == nil { + validated = unvalidated } + valid.r.Update(validRefreshableContainer[M]{ + validated: validated, + unvalidated: unvalidated, + lastErr: err, + }) +} - origRefreshable.Subscribe(updateValueFn) - - // manually update value after performing subscription. This ensures that, if the current value changed between when - // it was fetched earlier in the function and when the subscription was performed, it is properly captured. - updateValueFn(origRefreshable.Current()) - - return &v, nil +// identity is a validating map function that returns its input argument type. +func identity[T any](validatingFn func(T) error) func(i T) (T, error) { + return func(i T) (T, error) { return i, validatingFn(i) } } diff --git a/refreshable/refreshable_validating_test.go b/refreshable/refreshable_validating_test.go index eb5a9875..221c0d9f 100644 --- a/refreshable/refreshable_validating_test.go +++ b/refreshable/refreshable_validating_test.go @@ -17,48 +17,56 @@ import ( func TestValidatingRefreshable(t *testing.T) { type container struct{ Value string } r := refreshable.New(container{Value: "value"}) - vr, err := refreshable.NewValidatingRefreshable[container](r, func(i container) error { + vr, _, err := refreshable.Validate[container](r, func(i container) error { if len(i.Value) == 0 { return errors.New("empty") } return nil }) require.NoError(t, err) - require.NoError(t, vr.LastValidateErr()) - require.Equal(t, r.Current().Value, "value") - require.Equal(t, vr.Current().Value, "value") + v, err := vr.Validation() + require.NoError(t, err) + require.Equal(t, "value", v.Value) + require.Equal(t, "value", r.Current().Value) + require.Equal(t, "value", vr.Current().Value) // attempt bad update r.Update(container{}) require.Equal(t, r.Current().Value, "") - - require.EqualError(t, vr.LastValidateErr(), "empty", "expected err from validating refreshable") + v, err = vr.Validation() + require.EqualError(t, err, "empty", "expected validation error") + require.Equal(t, "", v.Value, "expected invalid value from Validation") require.Equal(t, vr.Current().Value, "value", "expected unchanged validating refreshable") // attempt good update r.Update(container{Value: "value2"}) - require.NoError(t, vr.LastValidateErr()) + v, err = vr.Validation() + require.NoError(t, err) + require.Equal(t, "value2", v.Value) require.Equal(t, "value2", vr.Current().Value) require.Equal(t, "value2", r.Current().Value) } func TestMapValidatingRefreshable(t *testing.T) { r := refreshable.New("https://palantir.com:443") - vr, err := refreshable.NewMapValidatingRefreshable[string, *url.URL](r, url.Parse) + vr, _, err := refreshable.MapWithError[string, *url.URL](r, url.Parse) + require.NoError(t, err) + _, err = vr.Validation() require.NoError(t, err) - require.NoError(t, vr.LastValidateErr()) require.Equal(t, r.Current(), "https://palantir.com:443") require.Equal(t, vr.Current().Hostname(), "palantir.com") // attempt bad update r.Update(":::error.com") assert.Equal(t, r.Current(), ":::error.com") - require.EqualError(t, vr.LastValidateErr(), "parse \":::error.com\": missing protocol scheme", "expected err from validating refreshable") + _, err = vr.Validation() + require.EqualError(t, err, "parse \":::error.com\": missing protocol scheme", "expected err from validating refreshable") assert.Equal(t, vr.Current().Hostname(), "palantir.com", "expected unchanged validating refreshable") // attempt good update r.Update("https://example.com") - require.NoError(t, vr.LastValidateErr()) + _, err = vr.Validation() + require.NoError(t, err) require.Equal(t, r.Current(), "https://example.com") require.Equal(t, vr.Current().Hostname(), "example.com") } @@ -67,15 +75,26 @@ func TestMapValidatingRefreshable(t *testing.T) { // if the underlying refreshable updates during the creation process. func TestValidatingRefreshable_SubscriptionRaceCondition(t *testing.T) { r := &updateImmediatelyRefreshable{r: refreshable.New(1), newValue: 2} - vr, err := refreshable.NewValidatingRefreshable[int](r, func(i int) error { return nil }) + var seen1, seen2 bool + vr, _, err := refreshable.Validate[int](r, func(i int) error { + switch i { + case 1: + seen1 = true + case 2: + seen2 = true + } + return nil + }) require.NoError(t, err) + assert.True(t, seen1, "expected to process 1 value") + assert.True(t, seen2, "expected to process 2 value") // If this returns 1, it is likely because the VR contains a stale value assert.Equal(t, 2, vr.Current()) } // updateImmediatelyRefreshable is a mock implementation which updates to newValue immediately when Current() is called type updateImmediatelyRefreshable struct { - r *refreshable.DefaultRefreshable[int] + r refreshable.Updatable[int] newValue int } From 6fc9aa092e33a5c540288dad1b6a4a2b6e4df694 Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Sun, 4 Sep 2022 17:59:34 -0700 Subject: [PATCH 5/9] MapContext --- refreshable/refreshable.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/refreshable/refreshable.go b/refreshable/refreshable.go index eb5c3499..51fa3b12 100644 --- a/refreshable/refreshable.go +++ b/refreshable/refreshable.go @@ -4,6 +4,10 @@ package refreshable +import ( + "context" +) + // A Refreshable is a generic container type for a volatile underlying value. // It supports atomic access and user-provided callback "subscriptions" on updates. type Refreshable[T any] interface { @@ -63,6 +67,16 @@ func Map[T any, M any](original Refreshable[T], mapFn func(T) M) (Refreshable[M] return out, stop } +// MapContext is like Map but unsubscribes when the context is cancelled. +func MapContext[T any, M any](ctx context.Context, original Refreshable[T], mapFn func(T) M) Refreshable[M] { + out, stop := Map(original, mapFn) + go func() { + <-ctx.Done() + stop() + }() + return out +} + // MapWithError is similar to Validate but allows for the function to return a mapping/mutation // of the input object in addition to returning an error. The returned validRefreshable will contain the mapped value. // An error is returned if the current original value fails to map. From 477adc2a7dc22b31e46ded5f72764114c7b0e897 Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Wed, 2 Aug 2023 09:53:37 -0700 Subject: [PATCH 6/9] Mapped refreshables do not export Update --- refreshable/async.go | 4 ++-- refreshable/refreshable.go | 4 ++-- refreshable/refreshable_default.go | 16 ++++++++++++++-- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/refreshable/async.go b/refreshable/async.go index 7ca9a2f8..625ef306 100644 --- a/refreshable/async.go +++ b/refreshable/async.go @@ -45,7 +45,7 @@ func (r *ready[T]) Update(val T) { // If an element is already available, the returned Value is guaranteed to be populated. // The channel should be closed when no longer used to avoid leaking resources. func NewFromChannel[T any](values <-chan T) Ready[T] { - out := newReady(newZero[T]()) + out := newReady[T](newZero[T]()) select { case initial, ok := <-values: if !ok { @@ -66,7 +66,7 @@ func NewFromChannel[T any](values <-chan T) Ready[T] { // If the providers bool return is false, the value is ignored. // The result's ReadyC channel is closed when a new value is populated. func NewFromTickerFunc[T any](interval time.Duration, provider func() (T, bool)) (Ready[T], UnsubscribeFunc) { - out := newReady(newZero[T]()) + out := newReady[T](newZero[T]()) ctx, cancel := context.WithCancel(context.Background()) values := make(chan T) go func() { diff --git a/refreshable/refreshable.go b/refreshable/refreshable.go index 51fa3b12..e53dcf5f 100644 --- a/refreshable/refreshable.go +++ b/refreshable/refreshable.go @@ -59,12 +59,12 @@ func New[T any](val T) Updatable[T] { // Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable. func Map[T any, M any](original Refreshable[T], mapFn func(T) M) (Refreshable[M], UnsubscribeFunc) { - out := New(mapFn(original.Current())) + out := newDefault(mapFn(original.Current())) stop := original.Subscribe(func(v T) { out.Update(mapFn(v)) }) out.Update(mapFn(original.Current())) - return out, stop + return (*readOnlyRefreshable[M])(out), stop } // MapContext is like Map but unsubscribes when the context is cancelled. diff --git a/refreshable/refreshable_default.go b/refreshable/refreshable_default.go index fbaf831b..71e8b423 100644 --- a/refreshable/refreshable_default.go +++ b/refreshable/refreshable_default.go @@ -16,13 +16,13 @@ type defaultRefreshable[T any] struct { subscribers []*func(T) } -func newDefault[T any](val T) Updatable[T] { +func newDefault[T any](val T) *defaultRefreshable[T] { d := new(defaultRefreshable[T]) d.current.Store(&val) return d } -func newZero[T any]() Updatable[T] { +func newZero[T any]() *defaultRefreshable[T] { d := new(defaultRefreshable[T]) var zero T d.current.Store(&zero) @@ -73,3 +73,15 @@ func (d *defaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeF } } + +// readOnlyRefreshable aliases defaultRefreshable but hides the Update method so the type +// does not implement Updatable. +type readOnlyRefreshable[T any] defaultRefreshable[T] + +func (r *readOnlyRefreshable[T]) Current() T { + return (*defaultRefreshable[T])(r).Current() +} + +func (r *readOnlyRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { + return (*defaultRefreshable[T])(r).Subscribe(consumer) +} From 606674a9c227ad2f5cfea8d40a6a5d24bc180995 Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Wed, 2 Aug 2023 17:32:51 -0700 Subject: [PATCH 7/9] call subscribe function on registration --- refreshable/refreshable.go | 2 +- refreshable/refreshable_default.go | 9 +++--- refreshable/refreshable_default_test.go | 37 ++++++++++++---------- refreshable/refreshable_validating.go | 3 +- refreshable/refreshable_validating_test.go | 12 +++++-- 5 files changed, 36 insertions(+), 27 deletions(-) diff --git a/refreshable/refreshable.go b/refreshable/refreshable.go index e53dcf5f..41849fd9 100644 --- a/refreshable/refreshable.go +++ b/refreshable/refreshable.go @@ -19,6 +19,7 @@ type Refreshable[T any] interface { // The consumer must be relatively fast: Updatable.Set blocks until all subscribers have returned. // Expensive or error-prone responses to refreshed values should be asynchronous. // Updates considered no-ops by reflect.DeepEqual may be skipped. + // When called, consumer is executed with the Current value. Subscribe(consumer func(T)) UnsubscribeFunc } @@ -63,7 +64,6 @@ func Map[T any, M any](original Refreshable[T], mapFn func(T) M) (Refreshable[M] stop := original.Subscribe(func(v T) { out.Update(mapFn(v)) }) - out.Update(mapFn(original.Current())) return (*readOnlyRefreshable[M])(out), stop } diff --git a/refreshable/refreshable_default.go b/refreshable/refreshable_default.go index 71e8b423..22f9e8d0 100644 --- a/refreshable/refreshable_default.go +++ b/refreshable/refreshable_default.go @@ -52,6 +52,7 @@ func (d *defaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { consumerFnPtr := &consumer d.subscribers = append(d.subscribers, consumerFnPtr) + consumer(d.Current()) return d.unsubscribe(consumerFnPtr) } @@ -78,10 +79,10 @@ func (d *defaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeF // does not implement Updatable. type readOnlyRefreshable[T any] defaultRefreshable[T] -func (r *readOnlyRefreshable[T]) Current() T { - return (*defaultRefreshable[T])(r).Current() +func (d *readOnlyRefreshable[T]) Current() T { + return (*defaultRefreshable[T])(d).Current() } -func (r *readOnlyRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { - return (*defaultRefreshable[T])(r).Subscribe(consumer) +func (d *readOnlyRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc { + return (*defaultRefreshable[T])(d).Subscribe(consumer) } diff --git a/refreshable/refreshable_default_test.go b/refreshable/refreshable_default_test.go index 3116dc06..53a9328c 100644 --- a/refreshable/refreshable_default_test.go +++ b/refreshable/refreshable_default_test.go @@ -8,7 +8,7 @@ import ( "testing" "github.com/palantir/pkg/refreshable/v2" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestDefaultRefreshable(t *testing.T) { @@ -18,12 +18,12 @@ func TestDefaultRefreshable(t *testing.T) { v := &container{Value: "original"} r := refreshable.New(v) - assert.Equal(t, r.Current(), v) + require.Equal(t, r.Current(), v) t.Run("Update", func(t *testing.T) { v2 := &container{Value: "updated"} r.Update(v2) - assert.Equal(t, r.Current(), v2) + require.Equal(t, r.Current(), v2) }) t.Run("Subscribe", func(t *testing.T) { @@ -34,36 +34,39 @@ func TestDefaultRefreshable(t *testing.T) { _ = r.Subscribe(func(i *container) { v2 = *i }) - assert.Equal(t, v1.Value, "") - assert.Equal(t, v2.Value, "") + require.Equal(t, v1.Value, "updated") + require.Equal(t, v2.Value, "updated") r.Update(&container{Value: "value"}) - assert.Equal(t, v1.Value, "value") - assert.Equal(t, v2.Value, "value") + require.Equal(t, v1.Value, "value") + require.Equal(t, v2.Value, "value") unsub1() r.Update(&container{Value: "value2"}) - assert.Equal(t, v1.Value, "value", "should be unchanged after unsubscribing") - assert.Equal(t, v2.Value, "value2", "should be updated after unsubscribing other") + require.Equal(t, v1.Value, "value", "should be unchanged after unsubscribing") + require.Equal(t, v2.Value, "value2", "should be updated after unsubscribing other") }) t.Run("Map", func(t *testing.T) { r.Update(&container{Value: "value"}) - rLen, _ := refreshable.Map[*container, int](r, func(i *container) int { + rLen, stop := refreshable.Map[*container, int](r, func(i *container) int { return len(i.Value) }) - assert.Equal(t, 5, rLen.Current()) + defer stop() + require.Equal(t, 5, rLen.Current()) - rLenUpdated := false - rLen.Subscribe(func(int) { rLenUpdated = true }) + rLenUpdates := 0 + rLen.Subscribe(func(int) { rLenUpdates++ }) + require.Equal(t, 1, rLenUpdates) // update to new value with same length and ensure the // equality check prevented unnecessary subscriber updates. r.Update(&container{Value: "VALUE"}) - assert.Equal(t, "VALUE", r.Current().Value) - assert.False(t, rLenUpdated) + require.Equal(t, "VALUE", r.Current().Value) + require.Equal(t, 1, rLenUpdates) r.Update(&container{Value: "updated"}) - assert.Equal(t, "updated", r.Current().Value) - assert.Equal(t, 7, rLen.Current()) + require.Equal(t, "updated", r.Current().Value) + require.Equal(t, 7, rLen.Current()) + require.Equal(t, 2, rLenUpdates) }) } diff --git a/refreshable/refreshable_validating.go b/refreshable/refreshable_validating.go index da96ed77..b6fa8adf 100644 --- a/refreshable/refreshable_validating.go +++ b/refreshable/refreshable_validating.go @@ -30,11 +30,10 @@ func (v *validRefreshable[T]) Validation() (T, error) { } func newValidRefreshable[T any, M any](original Refreshable[T], mappingFn func(T) (M, error)) (*validRefreshable[M], UnsubscribeFunc) { - valid := &validRefreshable[M]{r: New(validRefreshableContainer[M]{})} + valid := &validRefreshable[M]{r: newDefault(validRefreshableContainer[M]{})} stop := original.Subscribe(func(valueT T) { updateValidRefreshable(valid, valueT, mappingFn) }) - updateValidRefreshable(valid, original.Current(), mappingFn) return valid, stop } diff --git a/refreshable/refreshable_validating_test.go b/refreshable/refreshable_validating_test.go index 221c0d9f..824f0144 100644 --- a/refreshable/refreshable_validating_test.go +++ b/refreshable/refreshable_validating_test.go @@ -8,6 +8,7 @@ import ( "errors" "net/url" "testing" + "time" "github.com/palantir/pkg/refreshable/v2" "github.com/stretchr/testify/assert" @@ -74,9 +75,11 @@ func TestMapValidatingRefreshable(t *testing.T) { // TestValidatingRefreshable_SubscriptionRaceCondition tests that the ValidatingRefreshable stays current // if the underlying refreshable updates during the creation process. func TestValidatingRefreshable_SubscriptionRaceCondition(t *testing.T) { - r := &updateImmediatelyRefreshable{r: refreshable.New(1), newValue: 2} + //r := &updateImmediatelyRefreshable{r: refreshable.New(1), newValue: 2} + r := refreshable.New(1) var seen1, seen2 bool vr, _, err := refreshable.Validate[int](r, func(i int) error { + go r.Update(2) switch i { case 1: seen1 = true @@ -86,10 +89,13 @@ func TestValidatingRefreshable_SubscriptionRaceCondition(t *testing.T) { return nil }) require.NoError(t, err) + // If this returns 1, it is likely because the VR contains a stale value + assert.Eventually(t, func() bool { + return vr.Current() == 2 + }, time.Second, time.Millisecond) + assert.True(t, seen1, "expected to process 1 value") assert.True(t, seen2, "expected to process 2 value") - // If this returns 1, it is likely because the VR contains a stale value - assert.Equal(t, 2, vr.Current()) } // updateImmediatelyRefreshable is a mock implementation which updates to newValue immediately when Current() is called From 037e8cd4578eb1aae1ae488b7e330970addc7e98 Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Mon, 7 Aug 2023 15:11:28 -0700 Subject: [PATCH 8/9] pass ctx to NewFromTickerFunc --- refreshable/async.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/refreshable/async.go b/refreshable/async.go index 625ef306..0168091f 100644 --- a/refreshable/async.go +++ b/refreshable/async.go @@ -37,8 +37,8 @@ func (r *ready[T]) ReadyC() <-chan struct{} { } func (r *ready[T]) Update(val T) { - r.cancel() r.in.Update(val) + r.cancel() } // NewFromChannel populates an Updatable with the values channel. @@ -65,16 +65,17 @@ func NewFromChannel[T any](values <-chan T) Ready[T] { // NewFromTickerFunc returns a Ready Refreshable populated by the result of the provider called each interval. // If the providers bool return is false, the value is ignored. // The result's ReadyC channel is closed when a new value is populated. -func NewFromTickerFunc[T any](interval time.Duration, provider func() (T, bool)) (Ready[T], UnsubscribeFunc) { +// The refreshable will stop updating when the provided context is cancelled or the returned UnsubscribeFunc func is called. +func NewFromTickerFunc[T any](ctx context.Context, interval time.Duration, provider func(ctx context.Context) (T, bool)) (Ready[T], UnsubscribeFunc) { out := newReady[T](newZero[T]()) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) values := make(chan T) go func() { ticker := time.NewTicker(interval) defer ticker.Stop() defer close(values) for { - if value, ok := provider(); ok { + if value, ok := provider(ctx); ok { out.Update(value) } select { From 41f56e1fbe5073aeb1412fc5f5ed75033ca94a5b Mon Sep 17 00:00:00 2001 From: Brad Moylan Date: Thu, 10 Aug 2023 08:33:31 -0700 Subject: [PATCH 9/9] comments --- refreshable/async.go | 70 ++++++++++++----------- refreshable/godel/config/check-plugin.yml | 5 -- refreshable/refreshable.go | 1 + 3 files changed, 37 insertions(+), 39 deletions(-) diff --git a/refreshable/async.go b/refreshable/async.go index 0168091f..4ccd6f75 100644 --- a/refreshable/async.go +++ b/refreshable/async.go @@ -9,43 +9,11 @@ import ( "time" ) -type ready[T any] struct { - in Updatable[T] - readyC <-chan struct{} - cancel context.CancelFunc -} - -func newReady[T any](in Updatable[T]) *ready[T] { - ctx, cancel := context.WithCancel(context.Background()) - return &ready[T]{ - in: in, - readyC: ctx.Done(), - cancel: cancel, - } -} - -func (r *ready[T]) Current() T { - return r.in.Current() -} - -func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc { - return r.in.Subscribe(consumer) -} - -func (r *ready[T]) ReadyC() <-chan struct{} { - return r.readyC -} - -func (r *ready[T]) Update(val T) { - r.in.Update(val) - r.cancel() -} - // NewFromChannel populates an Updatable with the values channel. // If an element is already available, the returned Value is guaranteed to be populated. // The channel should be closed when no longer used to avoid leaking resources. func NewFromChannel[T any](values <-chan T) Ready[T] { - out := newReady[T](newZero[T]()) + out := newReady[T]() select { case initial, ok := <-values: if !ok { @@ -67,7 +35,7 @@ func NewFromChannel[T any](values <-chan T) Ready[T] { // The result's ReadyC channel is closed when a new value is populated. // The refreshable will stop updating when the provided context is cancelled or the returned UnsubscribeFunc func is called. func NewFromTickerFunc[T any](ctx context.Context, interval time.Duration, provider func(ctx context.Context) (T, bool)) (Ready[T], UnsubscribeFunc) { - out := newReady[T](newZero[T]()) + out := newReady[T]() ctx, cancel := context.WithCancel(ctx) values := make(chan T) go func() { @@ -99,3 +67,37 @@ func Wait[T any](ctx context.Context, ready Ready[T]) (T, bool) { return zero, false } } + +// ready is an Updatable which exposes a channel that is closed when a value is first available. +// Current returns the zero value before Update is called, marking the value ready. +type ready[T any] struct { + in Updatable[T] + readyC <-chan struct{} + cancel context.CancelFunc +} + +func newReady[T any]() *ready[T] { + ctx, cancel := context.WithCancel(context.Background()) + return &ready[T]{ + in: newZero[T](), + readyC: ctx.Done(), + cancel: cancel, + } +} + +func (r *ready[T]) Current() T { + return r.in.Current() +} + +func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc { + return r.in.Subscribe(consumer) +} + +func (r *ready[T]) ReadyC() <-chan struct{} { + return r.readyC +} + +func (r *ready[T]) Update(val T) { + r.in.Update(val) + r.cancel() +} diff --git a/refreshable/godel/config/check-plugin.yml b/refreshable/godel/config/check-plugin.yml index aa1fc55b..e69de29b 100644 --- a/refreshable/godel/config/check-plugin.yml +++ b/refreshable/godel/config/check-plugin.yml @@ -1,5 +0,0 @@ -checks: - golint: - filters: - - value: "should have comment or be unexported" - - value: "or a comment on this block" diff --git a/refreshable/refreshable.go b/refreshable/refreshable.go index 41849fd9..f5c8fa3e 100644 --- a/refreshable/refreshable.go +++ b/refreshable/refreshable.go @@ -54,6 +54,7 @@ type Ready[T any] interface { // It is safe to call multiple times. type UnsubscribeFunc func() +// New returns a new Updatable that begins with the given value. func New[T any](val T) Updatable[T] { return newDefault(val) }