Skip to content

Commit

Permalink
Merge pull request #4 from k1LoW/multi
Browse files Browse the repository at this point in the history
Support multiple keys
  • Loading branch information
k1LoW committed Jan 22, 2024
2 parents 4c4559d + c93bcba commit 1a2f4c9
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 6 deletions.
58 changes: 52 additions & 6 deletions concgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package concgroup

import (
"context"
"sort"
"sync"

"golang.org/x/sync/errgroup"
Expand All @@ -21,7 +22,7 @@ func WithContext(ctx context.Context) (*Group, context.Context) {
return &Group{eg: eg}, ctx
}

// Go calls the given function in a new goroutine like errgroup.Group.
// Go calls the given function in a new goroutine like errgroup.Group with key.
func (g *Group) Go(key string, f func() error) {
g.mu.Lock()
defer g.mu.Unlock()
Expand All @@ -38,7 +39,31 @@ func (g *Group) Go(key string, f func() error) {
})
}

// TryGo calls the given function only when the number of active goroutines is currently below the configured limit like errgroup.Group.
// GoMulti calls the given function in a new goroutine like errgroup.Group with multiple key locks.
func (g *Group) GoMulti(keys []string, f func() error) {
g.mu.Lock()
defer g.mu.Unlock()
g.init()
sort.Strings(keys)
var mus []*sync.Mutex
for _, key := range keys {
mu, ok := g.locks[key]
if !ok {
mu = &sync.Mutex{}
g.locks[key] = mu
}
mus = append(mus, mu)
}
g.eg.Go(func() error {
for _, mu := range mus {
mu.Lock()
defer mu.Unlock()
}
return f()
})
}

// TryGo calls the given function only when the number of active goroutines is currently below the configured limit like errgroup.Group with key.
func (g *Group) TryGo(key string, f func() error) bool {
g.mu.Lock()
defer g.mu.Unlock()
Expand All @@ -48,14 +73,35 @@ func (g *Group) TryGo(key string, f func() error) bool {
mu = &sync.Mutex{}
g.locks[key] = mu
}
if !g.eg.TryGo(func() error {
return g.eg.TryGo(func() error {
mu.Lock()
defer mu.Unlock()
return f()
}) {
return false
})
}

// TryGoMulti calls the given function only when the number of active goroutines is currently below the configured limit like errgroup.Group with multiple key locks.
func (g *Group) TryGoMulti(keys []string, f func() error) bool {
g.mu.Lock()
defer g.mu.Unlock()
g.init()
sort.Strings(keys)
var mus []*sync.Mutex
for _, key := range keys {
mu, ok := g.locks[key]
if !ok {
mu = &sync.Mutex{}
g.locks[key] = mu
}
mus = append(mus, mu)
}
return true
return g.eg.TryGo(func() error {
for _, mu := range mus {
mu.Lock()
defer mu.Unlock()
}
return f()
})
}

// SetLimit limits the number of active goroutines in this group to at most n like errgroup.Group.
Expand Down
94 changes: 94 additions & 0 deletions concgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,97 @@ func TestConcurrencyGroupWithTryGo(t *testing.T) {
t.Error("Failed to skip by TryGo")
}
}

func TestConcurrencyGroupMulti(t *testing.T) {
t.Parallel()
cg := new(concgroup.Group)
mu := sync.Mutex{}
for i := 0; i < 10; i++ {
keys := []string{"samegroup", fmt.Sprintf("group-%d", i)}
cg.GoMulti(keys, func() error {
if !mu.TryLock() {
return errors.New("violate group concurrency")
}
defer mu.Unlock()
time.Sleep(50 * time.Millisecond)
return nil
})
}
if err := cg.Wait(); err != nil {
t.Error(err)
}
}

func TestConcurrencyGroupWithTryGoMulti(t *testing.T) {
t.Parallel()
const loop = 10
cg := new(concgroup.Group)
cg.SetLimit(1)
mu := sync.Mutex{}
call := 0
for i := 0; i < loop; i++ {
keys := []string{"samegroup", fmt.Sprintf("group-%d", i)}
cg.TryGoMulti(keys, func() error {
if !mu.TryLock() {
return errors.New("violate group concurrency")
}
call++
defer mu.Unlock()
time.Sleep(50 * time.Millisecond)
return nil
})
}
if err := cg.Wait(); err != nil {
t.Error(err)
}
if call == loop {
t.Error("Failed to skip by TryGoMulti")
}
}

func TestConcurrencyGroupMultiAvoidDeadlock(t *testing.T) {
keys := []string{"A0", "B0", "C0"}
var otherKeysA, otherKeysB []string
for i := 1; i < 1000; i++ {
otherKeysA = append(otherKeysA, fmt.Sprintf("A%d", i))
otherKeysB = append(otherKeysB, fmt.Sprintf("B%d", i))
}
keys = append(keys, otherKeysA...)
keys = append(keys, otherKeysB...)

tests := []struct {
name string
a []string
b []string
}{
{"Same keys", []string{"A0"}, []string{"A0"}},
{"Other keys", []string{"A0"}, []string{"B0"}},
{"Order of keys in which deadlock is likely to occur in a and b", append(append([]string{"A0"}, otherKeysA...), "B0"), append(append([]string{"B0"}, otherKeysA...), "A0")},
{"Order of keys in which deadlock is likely to occur in a and b", append(append([]string{"A0"}, otherKeysA...), "C0"), append(append([]string{"B0"}, otherKeysB...), "C0")},
{"Order of keys in which deadlock is likely to occur in a and b", append(append(append([]string{"A0"}, otherKeysA...), otherKeysB...), "C0"), append(append([]string{"B0"}, otherKeysB...), "C0")},
}
for _, tt := range tests {
mu := sync.Mutex{}
cg := new(concgroup.Group)
mu.Lock()
// Lock all keys
cg.GoMulti(keys, func() error {
mu.Lock()
defer mu.Unlock()
return nil
})
// Waiging to lock tt.a
cg.GoMulti(tt.a, func() error {
return nil
})
// Waiging to lock tt.b
cg.GoMulti(tt.b, func() error {
return nil
})
// Unlock all keys
mu.Unlock()
if err := cg.Wait(); err != nil {
t.Fatal(err)
}
}
}

0 comments on commit 1a2f4c9

Please sign in to comment.