Skip to content

Commit

Permalink
Merge pull request #84 from iotaledger/feat/mempool
Browse files Browse the repository at this point in the history
Conflict propagation in Mempool & tests fixes
  • Loading branch information
karimodm committed May 15, 2023
2 parents ea43e06 + 512af76 commit 09316b5
Show file tree
Hide file tree
Showing 19 changed files with 642 additions and 160 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ require (
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
Expand Down
11 changes: 11 additions & 0 deletions pkg/core/promise/bool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package promise

type Bool struct {
*Value[bool]
}

func (b *Bool) Unset() {}

func (b *Bool) Set() {}

func (b *Bool) Toggle() {}
48 changes: 48 additions & 0 deletions pkg/core/promise/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package promise

import (
"sync"
)

// Callback is a wrapper for a callback function that is extended by an ID and a mutex.
type Callback[T any] struct {
// ID is the unique identifier of the callback.
ID UniqueID

// Invoke is the callback function that is invoked when the callback is triggered.
Invoke T

// lastUpdate is the last update that was applied to the callback.
lastUpdate UniqueID

// mutex is the mutex that is used to ensure that the callback is not triggered concurrently.
mutex sync.Mutex
}

// NewCallback is the constructor for the Callback type.
func NewCallback[T any](id UniqueID, invoke T) *Callback[T] {
return &Callback[T]{
ID: id,
Invoke: invoke,
}
}

// Lock locks the callback for the given update and returns true if the callback was locked successfully.
func (c *Callback[T]) Lock(updateID UniqueID) bool {
c.mutex.Lock()

if updateID != 0 && updateID == c.lastUpdate {
c.mutex.Unlock()

return false
}

c.lastUpdate = updateID

return true
}

// Unlock unlocks the callback.
func (c *Callback[T]) Unlock() {
c.mutex.Unlock()
}
13 changes: 6 additions & 7 deletions pkg/core/promise/callback_id.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package promise

import "sync/atomic"

// CallbackID is an identifier for a callback.
type CallbackID = uint64
type UniqueID uint64

func (u *UniqueID) Next() UniqueID {
*u++

// NewCallbackID creates a new unique callback ID.
func NewCallbackID() CallbackID {
return atomic.AddUint64(&uniqueCallbackIDCounter, 1)
return *u
}

// uniqueCallbackIDCounter is used to generate unique callback IDs.
var uniqueCallbackIDCounter CallbackID
var uniqueCallbackIDCounter UniqueID
28 changes: 15 additions & 13 deletions pkg/core/promise/promise.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
// Promise is a promise that can be resolved or rejected.
type Promise[T any] struct {
// successCallbacks are called when the promise is resolved successfully.
successCallbacks *orderedmap.OrderedMap[CallbackID, func(T)]
successCallbacks *orderedmap.OrderedMap[UniqueID, func(T)]

// errorCallbacks are called when the promise is rejected.
errorCallbacks *orderedmap.OrderedMap[CallbackID, func(error)]
errorCallbacks *orderedmap.OrderedMap[UniqueID, func(error)]

// completeCallbacks are called when the promise is resolved or rejected.
completeCallbacks *orderedmap.OrderedMap[CallbackID, func()]
completeCallbacks *orderedmap.OrderedMap[UniqueID, func()]

callbackIDs UniqueID

// result is the result of the promise.
result T
Expand All @@ -33,9 +35,9 @@ type Promise[T any] struct {
// New creates a new promise.
func New[T any](optResolver ...func(p *Promise[T])) *Promise[T] {
p := &Promise[T]{
successCallbacks: orderedmap.New[CallbackID, func(T)](),
errorCallbacks: orderedmap.New[CallbackID, func(error)](),
completeCallbacks: orderedmap.New[CallbackID, func()](),
successCallbacks: orderedmap.New[UniqueID, func(T)](),
errorCallbacks: orderedmap.New[UniqueID, func(error)](),
completeCallbacks: orderedmap.New[UniqueID, func()](),
}

if len(optResolver) > 0 {
Expand All @@ -54,12 +56,12 @@ func (p *Promise[T]) Resolve(result T) *Promise[T] {
return p
}

p.successCallbacks.ForEach(func(key CallbackID, callback func(T)) bool {
p.successCallbacks.ForEach(func(key UniqueID, callback func(T)) bool {
callback(result)
return true
})

p.completeCallbacks.ForEach(func(key CallbackID, callback func()) bool {
p.completeCallbacks.ForEach(func(key UniqueID, callback func()) bool {
callback()
return true
})
Expand All @@ -82,12 +84,12 @@ func (p *Promise[T]) Reject(err error) *Promise[T] {
return p
}

p.errorCallbacks.ForEach(func(key CallbackID, callback func(error)) bool {
p.errorCallbacks.ForEach(func(key UniqueID, callback func(error)) bool {
callback(err)
return true
})

p.completeCallbacks.ForEach(func(key CallbackID, callback func()) bool {
p.completeCallbacks.ForEach(func(key UniqueID, callback func()) bool {
callback()
return true
})
Expand All @@ -114,7 +116,7 @@ func (p *Promise[T]) OnSuccess(callback func(result T)) (cancel func()) {
return func() {}
}

callbackID := NewCallbackID()
callbackID := p.callbackIDs.Next()
p.successCallbacks.Set(callbackID, callback)

return func() {
Expand All @@ -140,7 +142,7 @@ func (p *Promise[T]) OnError(callback func(err error)) func() {
return func() {}
}

callbackID := NewCallbackID()
callbackID := p.callbackIDs.Next()
p.errorCallbacks.Set(callbackID, callback)

return func() {
Expand Down Expand Up @@ -173,7 +175,7 @@ func (p *Promise[T]) OnComplete(callback func()) func() {
return func() {}
}

callbackID := NewCallbackID()
callbackID := p.callbackIDs.Next()
p.completeCallbacks.Set(callbackID, callback)

return func() {
Expand Down
176 changes: 176 additions & 0 deletions pkg/core/promise/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package promise

import (
"sync"

"github.com/iotaledger/hive.go/ds/advancedset"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/lo"
)

// Set is a wrapper for an AdvancedSet that is extended by the ability to register callbacks that are
// triggered when the value changes.
type Set[T comparable] struct {
// value is the current value of the set.
value *advancedset.AdvancedSet[T]

// updateCallbacks are the registered callbacks that are triggered when the value changes.
updateCallbacks *shrinkingmap.ShrinkingMap[UniqueID, *Callback[func(*advancedset.AdvancedSet[T], *SetMutations[T])]]

// uniqueUpdateID is the unique ID that is used to identify an update.
uniqueUpdateID UniqueID

// uniqueCallbackID is the unique ID that is used to identify a callback.
uniqueCallbackID UniqueID

// mutex is the mutex that is used to synchronize the access to the value.
mutex sync.RWMutex

// applyMutex is an additional mutex that is used to ensure that the application order of mutations is ensured.
applyMutex sync.Mutex
}

// NewSet is the constructor for the Set type.
func NewSet[T comparable]() *Set[T] {
return &Set[T]{
value: advancedset.New[T](),
updateCallbacks: shrinkingmap.New[UniqueID, *Callback[func(*advancedset.AdvancedSet[T], *SetMutations[T])]](),
}
}

// Get returns the current value of the set.
func (s *Set[T]) Get() *advancedset.AdvancedSet[T] {
s.mutex.RLock()
defer s.mutex.RUnlock()

return s.value
}

// Set sets the given value as the new value of the set.
func (s *Set[T]) Set(value *advancedset.AdvancedSet[T]) (appliedMutations *SetMutations[T]) {
s.applyMutex.Lock()
defer s.applyMutex.Unlock()

appliedMutations, updateID, callbacksToTrigger := s.set(value)
for _, callback := range callbacksToTrigger {
if callback.Lock(updateID) {
callback.Invoke(value, appliedMutations)
callback.Unlock()
}
}

return appliedMutations
}

// Apply applies the given SetMutations to the set.
func (s *Set[T]) Apply(mutations *SetMutations[T]) (updatedSet *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T]) {
s.applyMutex.Lock()
defer s.applyMutex.Unlock()

updatedSet, appliedMutations, updateID, callbacksToTrigger := s.applyMutations(mutations)
for _, callback := range callbacksToTrigger {
if callback.Lock(updateID) {
callback.Invoke(updatedSet, appliedMutations)
callback.Unlock()
}
}

return updatedSet, appliedMutations
}

// OnUpdate registers the given callback to be triggered when the value of the set changes.
func (s *Set[T]) OnUpdate(callback func(updatedSet *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T])) (unsubscribe func()) {
s.mutex.Lock()

currentValue := s.value

newCallback := NewCallback[func(*advancedset.AdvancedSet[T], *SetMutations[T])](s.uniqueCallbackID.Next(), callback)
s.updateCallbacks.Set(newCallback.ID, newCallback)

// we intertwine the mutexes to ensure that the callback is guaranteed to be triggered with the current value from
// here first even if the value is updated in parallel.
newCallback.Lock(s.uniqueUpdateID)
defer newCallback.Unlock()

s.mutex.Unlock()

if !currentValue.IsEmpty() {
newCallback.Invoke(currentValue, NewSetMutations(WithAddedElements(currentValue)))
}

return func() {
s.updateCallbacks.Delete(newCallback.ID)
}
}

// Add adds the given elements to the set and returns the updated set and the applied mutations.
func (s *Set[T]) Add(elements *advancedset.AdvancedSet[T]) (updatedSet *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T]) {
return s.Apply(NewSetMutations(WithAddedElements(elements)))
}

// Remove removes the given elements from the set and returns the updated set and the applied mutations.
func (s *Set[T]) Remove(elements *advancedset.AdvancedSet[T]) (updatedSet *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T]) {
return s.Apply(NewSetMutations(WithRemovedElements(elements)))
}

// Has returns true if the set contains the given element.
func (s *Set[T]) Has(element T) bool {
s.mutex.RLock()
defer s.mutex.RUnlock()

return s.value.Has(element)
}

// InheritFrom registers the given sets to inherit their mutations to the set.
func (s *Set[T]) InheritFrom(sources ...*Set[T]) (unsubscribe func()) {
unsubscribeCallbacks := make([]func(), len(sources))

for i, source := range sources {
unsubscribeCallbacks[i] = source.OnUpdate(func(_ *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T]) {
if !appliedMutations.IsEmpty() {
s.Apply(appliedMutations)
}
})
}

return lo.Batch(unsubscribeCallbacks...)
}

func (s *Set[T]) set(value *advancedset.AdvancedSet[T]) (appliedMutations *SetMutations[T], triggerID UniqueID, callbacksToTrigger []*Callback[func(*advancedset.AdvancedSet[T], *SetMutations[T])]) {
s.mutex.Lock()
defer s.mutex.Unlock()

appliedMutations = NewSetMutations[T](WithRemovedElements(s.value), WithAddedElements(value))
s.value = value

return appliedMutations, s.uniqueUpdateID.Next(), s.updateCallbacks.Values()
}

// applyMutations applies the given mutations to the set.
func (s *Set[T]) applyMutations(mutations *SetMutations[T]) (updatedSet *advancedset.AdvancedSet[T], appliedMutations *SetMutations[T], triggerID UniqueID, callbacksToTrigger []*Callback[func(*advancedset.AdvancedSet[T], *SetMutations[T])]) {
s.mutex.Lock()
defer s.mutex.Unlock()

updatedSet = s.value.Clone()
appliedMutations = NewSetMutations[T]()

mutations.RemovedElements.ForEach(func(element T) error {
if updatedSet.Delete(element) {
appliedMutations.RemovedElements.Add(element)
}

return nil
})

mutations.AddedElements.ForEach(func(element T) error {
if updatedSet.Add(element) && !appliedMutations.RemovedElements.Delete(element) {
appliedMutations.AddedElements.Add(element)
}

return nil
})

s.value = updatedSet

return updatedSet, appliedMutations, s.uniqueUpdateID.Next(), s.updateCallbacks.Values()
}
39 changes: 39 additions & 0 deletions pkg/core/promise/set_mutations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package promise

import (
"github.com/iotaledger/hive.go/ds/advancedset"
"github.com/iotaledger/hive.go/runtime/options"
)

type SetMutations[T comparable] struct {
RemovedElements *advancedset.AdvancedSet[T]
AddedElements *advancedset.AdvancedSet[T]
}

func NewSetMutations[T comparable](opts ...options.Option[SetMutations[T]]) *SetMutations[T] {
return options.Apply(new(SetMutations[T]), opts, func(s *SetMutations[T]) {
if s.RemovedElements == nil {
s.RemovedElements = advancedset.New[T]()
}

if s.AddedElements == nil {
s.AddedElements = advancedset.New[T]()
}
})
}

func (s *SetMutations[T]) IsEmpty() bool {
return s.RemovedElements.IsEmpty() && s.AddedElements.IsEmpty()
}

func WithAddedElements[T comparable](elements *advancedset.AdvancedSet[T]) options.Option[SetMutations[T]] {
return func(args *SetMutations[T]) {
args.AddedElements = elements
}
}

func WithRemovedElements[T comparable](elements *advancedset.AdvancedSet[T]) options.Option[SetMutations[T]] {
return func(args *SetMutations[T]) {
args.RemovedElements = elements
}
}

0 comments on commit 09316b5

Please sign in to comment.