Skip to content

Commit

Permalink
pool: fix race condition at GetNextConnection()
Browse files Browse the repository at this point in the history
The `r.current` value can be changed by concurrent threads because
the change happens under read-lock. We could use the atomic counter
for a current connection number to avoid the race condition.

Closes #309
  • Loading branch information
oleg-jukovec committed Jun 28, 2023
1 parent 44db92b commit 3e77a2f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
### Fixed

- Flaky decimal/TestSelect (#300)
- Race condition at roundRobinStrategy.GetNextConnection() (#309)

## [1.12.0] - 2023-06-07

Expand Down
29 changes: 29 additions & 0 deletions pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2231,6 +2231,35 @@ func TestDo(t *testing.T) {
require.NotNilf(t, resp, "response is nil after Ping")
}

func TestDo_concurrent(t *testing.T) {
roles := []bool{true, true, false, true, false}

err := test_helpers.SetClusterRO(servers, connOpts, roles)
require.Nilf(t, err, "fail to set roles for cluster")

connPool, err := pool.Connect(servers, connOpts)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

defer connPool.Close()

req := tarantool.NewPingRequest()
const concurrency = 100
var wg sync.WaitGroup
wg.Add(concurrency)

for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()

_, err := connPool.Do(req, pool.ANY).Get()
assert.Nil(t, err)
}()
}

wg.Wait()
}

func TestNewPrepared(t *testing.T) {
test_helpers.SkipIfSQLUnsupported(t)

Expand Down
14 changes: 7 additions & 7 deletions pool/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool

import (
"sync"
"sync/atomic"

"github.com/tarantool/go-tarantool/v2"
)
Expand All @@ -10,8 +11,8 @@ type roundRobinStrategy struct {
conns []*tarantool.Connection
indexByAddr map[string]uint
mutex sync.RWMutex
size uint
current uint
size uint64
current uint64
}

func newRoundRobinStrategy(size int) *roundRobinStrategy {
Expand Down Expand Up @@ -98,13 +99,12 @@ func (r *roundRobinStrategy) AddConn(addr string, conn *tarantool.Connection) {
r.conns[idx] = conn
} else {
r.conns = append(r.conns, conn)
r.indexByAddr[addr] = r.size
r.indexByAddr[addr] = uint(r.size)
r.size += 1
}
}

func (r *roundRobinStrategy) nextIndex() uint {
ret := r.current % r.size
r.current++
return ret
func (r *roundRobinStrategy) nextIndex() uint64 {
next := atomic.AddUint64(&r.current, 1)
return (next - 1) % r.size
}

0 comments on commit 3e77a2f

Please sign in to comment.