Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conflict propagation in Mempool & tests fixes #84

Merged
merged 11 commits into from
May 15, 2023
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ require (
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
Expand Down
11 changes: 11 additions & 0 deletions pkg/core/promise/bool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package promise

type Bool struct {
*Value[bool]
}

func (b *Bool) Unset() {}

func (b *Bool) Set() {}

func (b *Bool) Toggle() {}
48 changes: 48 additions & 0 deletions pkg/core/promise/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package promise

import (
"sync"
)

// Callback is a wrapper for a callback function that is extended by an ID and a mutex.
type Callback[T any] struct {
// ID is the unique identifier of the callback.
ID UniqueID

// Invoke is the callback function that is invoked when the callback is triggered.
Invoke T

// lastUpdate is the last update that was applied to the callback.
lastUpdate UniqueID

// mutex is the mutex that is used to ensure that the callback is not triggered concurrently.
mutex sync.Mutex
}

// NewCallback is the constructor for the Callback type.
func NewCallback[T any](id UniqueID, invoke T) *Callback[T] {
return &Callback[T]{
ID: id,
Invoke: invoke,
}
}

// Lock locks the callback for the given update and returns true if the callback was locked successfully.
func (c *Callback[T]) Lock(updateID UniqueID) bool {
c.mutex.Lock()

if updateID != 0 && updateID == c.lastUpdate {
c.mutex.Unlock()

return false
}

c.lastUpdate = updateID

return true
}

// Unlock unlocks the callback.
func (c *Callback[T]) Unlock() {
c.mutex.Unlock()
}
13 changes: 6 additions & 7 deletions pkg/core/promise/callback_id.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package promise

import "sync/atomic"

// CallbackID is an identifier for a callback.
type CallbackID = uint64
type UniqueID uint64

func (u *UniqueID) Next() UniqueID {
*u++

// NewCallbackID creates a new unique callback ID.
func NewCallbackID() CallbackID {
return atomic.AddUint64(&uniqueCallbackIDCounter, 1)
return *u
}

// uniqueCallbackIDCounter is used to generate unique callback IDs.
var uniqueCallbackIDCounter CallbackID
var uniqueCallbackIDCounter UniqueID

Check failure on line 13 in pkg/core/promise/callback_id.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] pkg/core/promise/callback_id.go#L13

var `uniqueCallbackIDCounter` is unused (unused)
Raw output
pkg/core/promise/callback_id.go:13:5: var `uniqueCallbackIDCounter` is unused (unused)
var uniqueCallbackIDCounter UniqueID
    ^
28 changes: 15 additions & 13 deletions pkg/core/promise/promise.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
// Promise is a promise that can be resolved or rejected.
type Promise[T any] struct {
// successCallbacks are called when the promise is resolved successfully.
successCallbacks *orderedmap.OrderedMap[CallbackID, func(T)]
successCallbacks *orderedmap.OrderedMap[UniqueID, func(T)]

// errorCallbacks are called when the promise is rejected.
errorCallbacks *orderedmap.OrderedMap[CallbackID, func(error)]
errorCallbacks *orderedmap.OrderedMap[UniqueID, func(error)]

// completeCallbacks are called when the promise is resolved or rejected.
completeCallbacks *orderedmap.OrderedMap[CallbackID, func()]
completeCallbacks *orderedmap.OrderedMap[UniqueID, func()]

callbackIDs UniqueID

// result is the result of the promise.
result T
Expand All @@ -33,9 +35,9 @@ type Promise[T any] struct {
// New creates a new promise.
func New[T any](optResolver ...func(p *Promise[T])) *Promise[T] {
p := &Promise[T]{
successCallbacks: orderedmap.New[CallbackID, func(T)](),
errorCallbacks: orderedmap.New[CallbackID, func(error)](),
completeCallbacks: orderedmap.New[CallbackID, func()](),
successCallbacks: orderedmap.New[UniqueID, func(T)](),
errorCallbacks: orderedmap.New[UniqueID, func(error)](),
completeCallbacks: orderedmap.New[UniqueID, func()](),
}

if len(optResolver) > 0 {
Expand All @@ -54,12 +56,12 @@ func (p *Promise[T]) Resolve(result T) *Promise[T] {
return p
}

p.successCallbacks.ForEach(func(key CallbackID, callback func(T)) bool {
p.successCallbacks.ForEach(func(key UniqueID, callback func(T)) bool {
callback(result)
return true
})

p.completeCallbacks.ForEach(func(key CallbackID, callback func()) bool {
p.completeCallbacks.ForEach(func(key UniqueID, callback func()) bool {
callback()
return true
})
Expand All @@ -82,12 +84,12 @@ func (p *Promise[T]) Reject(err error) *Promise[T] {
return p
}

p.errorCallbacks.ForEach(func(key CallbackID, callback func(error)) bool {
p.errorCallbacks.ForEach(func(key UniqueID, callback func(error)) bool {
callback(err)
return true
})

p.completeCallbacks.ForEach(func(key CallbackID, callback func()) bool {
p.completeCallbacks.ForEach(func(key UniqueID, callback func()) bool {
callback()
return true
})
Expand All @@ -114,7 +116,7 @@ func (p *Promise[T]) OnSuccess(callback func(result T)) (cancel func()) {
return func() {}
}

callbackID := NewCallbackID()
callbackID := p.callbackIDs.Next()
p.successCallbacks.Set(callbackID, callback)

return func() {
Expand All @@ -140,7 +142,7 @@ func (p *Promise[T]) OnError(callback func(err error)) func() {
return func() {}
}

callbackID := NewCallbackID()
callbackID := p.callbackIDs.Next()
p.errorCallbacks.Set(callbackID, callback)

return func() {
Expand Down Expand Up @@ -173,7 +175,7 @@ func (p *Promise[T]) OnComplete(callback func()) func() {
return func() {}
}

callbackID := NewCallbackID()
callbackID := p.callbackIDs.Next()
p.completeCallbacks.Set(callbackID, callback)

return func() {
Expand Down
176 changes: 176 additions & 0 deletions pkg/core/promise/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package promise

import (
"sync"

"github.com/iotaledger/hive.go/ds/advancedset"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/lo"
)

// Set is a wrapper for an AdvancedSet that is extended by the ability to register callbacks that are
// triggered when the value changes.
type Set[T comparable] struct {
// value is the current value of the set.
value *advancedset.AdvancedSet[T]

// updateCallbacks are the registered callbacks that are triggered when the value changes.
updateCallbacks *shrinkingmap.ShrinkingMap[UniqueID, *Callback[func(*advancedset.AdvancedSet[T], *SetMutations[T])]]

// uniqueUpdateID is the unique ID that is used to identify an update.
uniqueUpdateID UniqueID

// uniqueCallbackID is the unique ID that is used to identify a callback.
uniqueCallbackID UniqueID

// mutex is the mutex that is used to synchronize the access to the value.
mutex sync.RWMutex

// applyMutex is an additional mutex that is used to ensure that the application order of mutations is ensured.
applyMutex sync.Mutex
}

// NewSet is the constructor for the Set type.
func NewSet[T comparable]() *Set[T] {
return &Set[T]{
value: advancedset.New[T](),
updateCallbacks: shrinkingmap.New[UniqueID, *Callback[func(*advancedset.AdvancedSet[T], *SetMutations[T])]](),
}
}

// Get returns the current value of the set.
func (s *Set[T]) Get() *advancedset.AdvancedSet[T] {
s.mutex.RLock()
defer s.mutex.RUnlock()

return s.value
}

// Set sets the given value as the new value of the set.
func (s *Set[T]) Set(value *advancedset.AdvancedSet[T]) (appliedMutations *SetMutations[T]) {
s.applyMutex.Lock()
defer s.applyMutex.Unlock()

appliedMutations, updateID, callbacksToTrigger := s.set(value)
for _, callback := range callbacksToTrigger {
if callback.Lock(updateID) {
callback.Invoke(value, appliedMutations)
callback.Unlock()
}
}

return appliedMutations
}

// Apply applies the given SetMutations to the set.
func (s *Set[T]) Apply(mutations *SetMutations[T]) (updatedSet *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T]) {
s.applyMutex.Lock()
defer s.applyMutex.Unlock()

updatedSet, appliedMutations, updateID, callbacksToTrigger := s.applyMutations(mutations)
for _, callback := range callbacksToTrigger {
if callback.Lock(updateID) {
callback.Invoke(updatedSet, appliedMutations)
callback.Unlock()
}
}

return updatedSet, appliedMutations
}

// OnUpdate registers the given callback to be triggered when the value of the set changes.
func (s *Set[T]) OnUpdate(callback func(updatedSet *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T])) (unsubscribe func()) {
s.mutex.Lock()

currentValue := s.value

newCallback := NewCallback[func(*advancedset.AdvancedSet[T], *SetMutations[T])](s.uniqueCallbackID.Next(), callback)
s.updateCallbacks.Set(newCallback.ID, newCallback)

// we intertwine the mutexes to ensure that the callback is guaranteed to be triggered with the current value from
// here first even if the value is updated in parallel.
newCallback.Lock(s.uniqueUpdateID)
defer newCallback.Unlock()

s.mutex.Unlock()

if !currentValue.IsEmpty() {
newCallback.Invoke(currentValue, NewSetMutations(WithAddedElements(currentValue)))
}

return func() {
s.updateCallbacks.Delete(newCallback.ID)
}
}

// Add adds the given elements to the set and returns the updated set and the applied mutations.
func (s *Set[T]) Add(elements *advancedset.AdvancedSet[T]) (updatedSet *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T]) {
return s.Apply(NewSetMutations(WithAddedElements(elements)))
}

// Remove removes the given elements from the set and returns the updated set and the applied mutations.
func (s *Set[T]) Remove(elements *advancedset.AdvancedSet[T]) (updatedSet *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T]) {
return s.Apply(NewSetMutations(WithRemovedElements(elements)))
}

// Has returns true if the set contains the given element.
func (s *Set[T]) Has(element T) bool {
s.mutex.RLock()
defer s.mutex.RUnlock()

return s.value.Has(element)
}

// InheritFrom registers the given sets to inherit their mutations to the set.
func (s *Set[T]) InheritFrom(sources ...*Set[T]) (unsubscribe func()) {
unsubscribeCallbacks := make([]func(), len(sources))

for i, source := range sources {
unsubscribeCallbacks[i] = source.OnUpdate(func(_ *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T]) {
if !appliedMutations.IsEmpty() {
s.Apply(appliedMutations)
}
})
}

return lo.Batch(unsubscribeCallbacks...)
}

func (s *Set[T]) set(value *advancedset.AdvancedSet[T]) (appliedMutations *SetMutations[T], triggerID UniqueID, callbacksToTrigger []*Callback[func(*advancedset.AdvancedSet[T], *SetMutations[T])]) {
s.mutex.Lock()
defer s.mutex.Unlock()

appliedMutations = NewSetMutations[T](WithRemovedElements(s.value), WithAddedElements(value))
s.value = value

return appliedMutations, s.uniqueUpdateID.Next(), s.updateCallbacks.Values()
}

// applyMutations applies the given mutations to the set.
func (s *Set[T]) applyMutations(mutations *SetMutations[T]) (updatedSet *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T], triggerID UniqueID, callbacksToTrigger []*Callback[func(*advancedset.AdvancedSet[T], *SetMutations[T])]) {
s.mutex.Lock()
defer s.mutex.Unlock()

updatedSet = s.value.Clone()
appliedMutations = NewSetMutations[T]()

mutations.RemovedElements.ForEach(func(element T) error {

Check failure on line 157 in pkg/core/promise/set.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] pkg/core/promise/set.go#L157

Error return value of `mutations.RemovedElements.ForEach` is not checked (errcheck)
Raw output
pkg/core/promise/set.go:157:35: Error return value of `mutations.RemovedElements.ForEach` is not checked (errcheck)
	mutations.RemovedElements.ForEach(func(element T) error {
	                                 ^
if updatedSet.Delete(element) {
appliedMutations.RemovedElements.Add(element)
}

return nil
})

mutations.AddedElements.ForEach(func(element T) error {

Check failure on line 165 in pkg/core/promise/set.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] pkg/core/promise/set.go#L165

Error return value of `mutations.AddedElements.ForEach` is not checked (errcheck)
Raw output
pkg/core/promise/set.go:165:33: Error return value of `mutations.AddedElements.ForEach` is not checked (errcheck)
	mutations.AddedElements.ForEach(func(element T) error {
	                               ^
if updatedSet.Add(element) && !appliedMutations.RemovedElements.Delete(element) {
appliedMutations.AddedElements.Add(element)
}

return nil
})

s.value = updatedSet

return updatedSet, appliedMutations, s.uniqueUpdateID.Next(), s.updateCallbacks.Values()
}
39 changes: 39 additions & 0 deletions pkg/core/promise/set_mutations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package promise

import (
"github.com/iotaledger/hive.go/ds/advancedset"
"github.com/iotaledger/hive.go/runtime/options"
)

type SetMutations[T comparable] struct {
RemovedElements *advancedset.AdvancedSet[T]
AddedElements *advancedset.AdvancedSet[T]
}

func NewSetMutations[T comparable](opts ...options.Option[SetMutations[T]]) *SetMutations[T] {
return options.Apply(new(SetMutations[T]), opts, func(s *SetMutations[T]) {
if s.RemovedElements == nil {
s.RemovedElements = advancedset.New[T]()
}

if s.AddedElements == nil {
s.AddedElements = advancedset.New[T]()
}
})
}

func (s *SetMutations[T]) IsEmpty() bool {
return s.RemovedElements.IsEmpty() && s.AddedElements.IsEmpty()
}

func WithAddedElements[T comparable](elements *advancedset.AdvancedSet[T]) options.Option[SetMutations[T]] {
return func(args *SetMutations[T]) {
args.AddedElements = elements
}
}

func WithRemovedElements[T comparable](elements *advancedset.AdvancedSet[T]) options.Option[SetMutations[T]] {
return func(args *SetMutations[T]) {
args.RemovedElements = elements
}
}