Skip to content

Commit

Permalink
MapOf: Generic Keys
Browse files Browse the repository at this point in the history
  • Loading branch information
iamcalledrob committed Aug 12, 2022
1 parent 9dba66e commit 12e0ac8
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 30 deletions.
2 changes: 1 addition & 1 deletion export_mapof_test.go
Expand Up @@ -3,6 +3,6 @@

package xsync

// CollectMapOfStats is a test-only export that wraps the map's internal
// stats() snapshot in the public MapStats type.
// (Diff: old string-keyed signature above, new generic-key signature below.)
func CollectMapOfStats[V any](m *MapOf[V]) MapStats {
func CollectMapOfStats[K, V any](m *MapOf[K, V]) MapStats {
return MapStats{m.stats()}
}
60 changes: 35 additions & 25 deletions mapof.go
Expand Up @@ -30,19 +30,29 @@ import (
// MapOf is a concurrent map. After this change, keys are no longer
// restricted to strings: any key type K is supported by hashing keys to
// uint64 with the user-supplied hasher fn (see NewTypedMapOf), while
// NewMapOf keeps the original string-keyed behaviour.
type MapOf[V any] struct {
type MapOf[K, V any] struct {
totalGrowths int64 // number of table growths; updated atomically, read by stats()
totalShrinks int64 // number of table shrinks; updated atomically, read by stats()
resizing int64 // resize in progress flag; updated atomically
resizeMu sync.Mutex // only used along with resizeCond
resizeCond sync.Cond // used to wake up resize waiters (concurrent modifications)
table unsafe.Pointer // *mapTable
hasher func(K) uint64 // maps a key to its hash; set once in NewTypedMapOf
}

// NewMapOf creates a new MapOf instance.
func NewMapOf[V any]() *MapOf[V] {
m := &MapOf[V]{}
// NewMapOf creates a new MapOf instance with string keys.
// It preserves the previous string-only API by delegating to
// NewTypedMapOf with the built-in maphash64 string hash.
func NewMapOf[V any]() *MapOf[string, V] {
return NewTypedMapOf[string, V](func(k string) uint64 {
return maphash64(k)
})
}

// NewTypedMapOf creates a new MapOf instance with arbitrarily typed keys.
// Keys are hashed to uint64 using the hasher fn.
func NewTypedMapOf[K, V any](hasher func(K) uint64) *MapOf[K, V] {
m := &MapOf[K, V]{}
m.resizeCond = *sync.NewCond(&m.resizeMu)
m.hasher = hasher
table := newMapTable(minMapTableLen)
atomic.StorePointer(&m.table, unsafe.Pointer(table))
return m
Expand All @@ -51,8 +61,8 @@ func NewMapOf[V any]() *MapOf[V] {
// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *MapOf[V]) Load(key string) (value V, ok bool) {
hash := maphash64(key)
func (m *MapOf[K, V]) Load(key K) (value V, ok bool) {
hash := m.hasher(key)
table := (*mapTable)(atomic.LoadPointer(&m.table))
bidx := bucketIdx(table, hash)
b := &table.buckets[bidx]
Expand All @@ -66,7 +76,7 @@ func (m *MapOf[V]) Load(key string) (value V, ok bool) {
vp := atomic.LoadPointer(&b.values[i])
kp := atomic.LoadPointer(&b.keys[i])
if kp != nil && vp != nil {
if key == derefKey(kp) {
if hash == m.hasher(derefTypedValue[K](kp)) {
if uintptr(vp) == uintptr(atomic.LoadPointer(&b.values[i])) {
// Atomic snapshot succeeded.
return derefTypedValue[V](vp), true
Expand All @@ -80,34 +90,34 @@ func (m *MapOf[V]) Load(key string) (value V, ok bool) {
}

// Store sets the value for a key.
func (m *MapOf[V]) Store(key string, value V) {
func (m *MapOf[K, V]) Store(key K, value V) {
// loadIfExists=false: unconditionally overwrite any existing value.
m.doStore(key, value, false)
}

// LoadOrStore returns the existing value for the key if present.
// Otherwise, it stores and returns the given value.
// The loaded result is true if the value was loaded, false if stored.
func (m *MapOf[V]) LoadOrStore(key string, value V) (actual V, loaded bool) {
func (m *MapOf[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
// loadIfExists=true: doStore leaves an existing entry untouched and
// returns it instead of storing.
return m.doStore(key, value, true)
}

// LoadAndStore returns the existing value for the key if present,
// while setting the new value for the key.
// Otherwise, it stores and returns the given value.
// The loaded result is true if the value was loaded, false otherwise.
func (m *MapOf[V]) LoadAndStore(key string, value V) (actual V, loaded bool) {
func (m *MapOf[K, V]) LoadAndStore(key K, value V) (actual V, loaded bool) {
// loadIfExists=false: the new value is always written; doStore still
// reports whether a previous value existed.
return m.doStore(key, value, false)
}

func (m *MapOf[V]) doStore(key string, value V, loadIfExists bool) (V, bool) {
func (m *MapOf[K, V]) doStore(key K, value V, loadIfExists bool) (V, bool) {
// Read-only path.
if loadIfExists {
if v, ok := m.Load(key); ok {
return v, true
}
}
// Write path.
hash := maphash64(key)
hash := m.hasher(key)
for {
var (
emptykp, emptyvp *unsafe.Pointer
Expand Down Expand Up @@ -140,7 +150,7 @@ func (m *MapOf[V]) doStore(key string, value V, loadIfExists bool) (V, bool) {
if !topHashMatch(hash, b.topHashes, i) {
continue
}
if key == derefKey(b.keys[i]) {
if hash == m.hasher(derefTypedValue[K](b.keys[i])) {
vp := b.values[i]
if loadIfExists {
b.mu.Unlock()
Expand Down Expand Up @@ -171,24 +181,24 @@ func (m *MapOf[V]) doStore(key string, value V, loadIfExists bool) (V, bool) {
}
}

// newerTableExists reports whether m.table has been swapped since the
// caller loaded the given table pointer, i.e. a resize replaced it and
// the caller must restart its operation on the fresh table.
func (m *MapOf[V]) newerTableExists(table *mapTable) bool {
func (m *MapOf[K, V]) newerTableExists(table *mapTable) bool {
curTablePtr := atomic.LoadPointer(&m.table)
return uintptr(curTablePtr) != uintptr(unsafe.Pointer(table))
}

// resizeInProgress reports whether a table resize is currently running;
// m.resizing is the atomically-updated in-progress flag (1 = resizing).
func (m *MapOf[V]) resizeInProgress() bool {
func (m *MapOf[K, V]) resizeInProgress() bool {
return atomic.LoadInt64(&m.resizing) == 1
}

// waitForResize blocks the caller until any in-progress resize finishes.
func (m *MapOf[V]) waitForResize() {
func (m *MapOf[K, V]) waitForResize() {
m.resizeMu.Lock()
// Standard condition-variable pattern: re-check the predicate after
// every wakeup, since Wait may return spuriously or late.
for m.resizeInProgress() {
m.resizeCond.Wait()
}
m.resizeMu.Unlock()
}

func (m *MapOf[V]) resize(table *mapTable, hint mapResizeHint) {
func (m *MapOf[K, V]) resize(table *mapTable, hint mapResizeHint) {
var shrinkThreshold int64
tableLen := len(table.buckets)
// Fast path for shrink attempts.
Expand Down Expand Up @@ -247,8 +257,8 @@ copy:
// LoadAndDelete deletes the value for a key, returning the previous
// value if any. The loaded result reports whether the key was
// present.
func (m *MapOf[V]) LoadAndDelete(key string) (value V, loaded bool) {
hash := maphash64(key)
func (m *MapOf[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
hash := m.hasher(key)
delete_attempt:
hintNonEmpty := 0
table := (*mapTable)(atomic.LoadPointer(&m.table))
Expand All @@ -271,7 +281,7 @@ delete_attempt:
if kp == nil || !topHashMatch(hash, b.topHashes, i) {
continue
}
if key == derefKey(kp) {
if hash == m.hasher(derefTypedValue[K](kp)) {
vp := b.values[i]
// Deletion case. First we update the value, then the key.
// This is important for atomic snapshot states.
Expand All @@ -297,7 +307,7 @@ delete_attempt:
}

// Delete deletes the value for a key.
func (m *MapOf[V]) Delete(key string) {
func (m *MapOf[K, V]) Delete(key K) {
// Implemented via LoadAndDelete, discarding the returned previous value.
m.LoadAndDelete(key)
}

Expand All @@ -313,14 +323,14 @@ func (m *MapOf[V]) Delete(key string) {
// It is safe to modify the map while iterating it. However, the
// concurrent modification rule apply, i.e. the changes may be not
// reflected in the subsequently iterated entries.
func (m *MapOf[V]) Range(f func(key string, value V) bool) {
func (m *MapOf[K, V]) Range(f func(key K, value V) bool) {
var bentries [entriesPerMapBucket]rangeEntry
tablep := atomic.LoadPointer(&m.table)
table := *(*mapTable)(tablep)
for i := range table.buckets {
n := copyRangeEntries(&table.buckets[i], &bentries)
for j := 0; j < n; j++ {
k := derefKey(bentries[j].key)
k := derefTypedValue[K](bentries[j].key)
v := derefTypedValue[V](bentries[j].value)
if !f(k, v) {
return
Expand All @@ -330,7 +340,7 @@ func (m *MapOf[V]) Range(f func(key string, value V) bool) {
}

// Size returns current size of the map.
// It sums the size counters of a snapshot of the current table, so the
// result may be stale under concurrent modification.
func (m *MapOf[V]) Size() int {
func (m *MapOf[K, V]) Size() int {
table := (*mapTable)(atomic.LoadPointer(&m.table))
return int(sumSize(table))
}
Expand All @@ -340,7 +350,7 @@ func derefTypedValue[V any](valuePtr unsafe.Pointer) (val V) {
}

// O(N) operation; use for debug purposes only
func (m *MapOf[V]) stats() mapStats {
func (m *MapOf[K, V]) stats() mapStats {
stats := mapStats{
TotalGrowths: atomic.LoadInt64(&m.totalGrowths),
TotalShrinks: atomic.LoadInt64(&m.totalShrinks),
Expand Down
8 changes: 4 additions & 4 deletions mapof_test.go
Expand Up @@ -330,7 +330,7 @@ func TestMapOfResize_CounterLenLimit(t *testing.T) {
}
}

func parallelSeqTypedStorer(t *testing.T, m *MapOf[int], storeEach, numIters, numEntries int, cdone chan bool) {
func parallelSeqTypedStorer(t *testing.T, m *MapOf[string, int], storeEach, numIters, numEntries int, cdone chan bool) {
for i := 0; i < numIters; i++ {
for j := 0; j < numEntries; j++ {
if storeEach == 0 || j%storeEach == 0 {
Expand Down Expand Up @@ -376,7 +376,7 @@ func TestMapOfParallelStores(t *testing.T) {
}
}

func parallelRandTypedStorer(t *testing.T, m *MapOf[int], numIters, numEntries int, cdone chan bool) {
func parallelRandTypedStorer(t *testing.T, m *MapOf[string, int], numIters, numEntries int, cdone chan bool) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < numIters; i++ {
j := r.Intn(numEntries)
Expand All @@ -389,7 +389,7 @@ func parallelRandTypedStorer(t *testing.T, m *MapOf[int], numIters, numEntries i
cdone <- true
}

func parallelRandTypedDeleter(t *testing.T, m *MapOf[int], numIters, numEntries int, cdone chan bool) {
func parallelRandTypedDeleter(t *testing.T, m *MapOf[string, int], numIters, numEntries int, cdone chan bool) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < numIters; i++ {
j := r.Intn(numEntries)
Expand All @@ -402,7 +402,7 @@ func parallelRandTypedDeleter(t *testing.T, m *MapOf[int], numIters, numEntries
cdone <- true
}

func parallelTypedLoader(t *testing.T, m *MapOf[int], numIters, numEntries int, cdone chan bool) {
func parallelTypedLoader(t *testing.T, m *MapOf[string, int], numIters, numEntries int, cdone chan bool) {
for i := 0; i < numIters; i++ {
for j := 0; j < numEntries; j++ {
// Due to atomic snapshots we must either see no entry, or a "<j>"/j pair.
Expand Down

0 comments on commit 12e0ac8

Please sign in to comment.