Skip to content

Commit

Permalink
pool: add a new method Pool.DoInstance
Browse files Browse the repository at this point in the history
The method allows to execute a request on the target instance in
a pool.

Closes #376
  • Loading branch information
oleg-jukovec committed Feb 3, 2024
1 parent e765b0a commit ba5b6a6
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
check (#301)
- `GreetingDialer` type for creating a dialer, that fills `Greeting` of a
connection (#301)
- New method `Pool.DoInstance` to execute a request on a target instance in
a pool (#376).

### Changed

Expand Down
10 changes: 10 additions & 0 deletions pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,16 @@ func (p *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarantool.Fut
return conn.Do(req)
}

// DoInstance sends the request into a target instance and returns a future.
func (p *ConnectionPool) DoInstance(req tarantool.Request, name string) *tarantool.Future {
conn := p.anyPool.GetConnection(name)
if conn == nil {
return newErrorFuture(ErrNoHealthyInstance)
}

return conn.Do(req)
}

//
// private
//
Expand Down
70 changes: 70 additions & 0 deletions pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2540,6 +2540,76 @@ func TestDo_concurrent(t *testing.T) {
wg.Wait()
}

func TestDoInstance(t *testing.T) {
ctx, cancel := test_helpers.GetPoolConnectContext()
defer cancel()

connPool, err := pool.Connect(ctx, instances)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

defer connPool.Close()

req := tarantool.NewEvalRequest("return box.cfg.listen")
for _, server := range servers {
data, err := connPool.DoInstance(req, server).Get()
require.NoError(t, err)
assert.Equal(t, []interface{}{server}, data)
}
}

func TestDoInstance_not_found(t *testing.T) {
roles := []bool{true, true, false, true, false}

err := test_helpers.SetClusterRO(dialers, connOpts, roles)
require.Nilf(t, err, "fail to set roles for cluster")

ctx, cancel := test_helpers.GetPoolConnectContext()
defer cancel()

connPool, err := pool.Connect(ctx, []pool.Instance{})
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

defer connPool.Close()

data, err := connPool.DoInstance(tarantool.NewPingRequest(), "not_exist").Get()
assert.Nil(t, data)
require.ErrorIs(t, err, pool.ErrNoHealthyInstance)
}

func TestDoInstance_concurrent(t *testing.T) {
ctx, cancel := test_helpers.GetPoolConnectContext()
defer cancel()
connPool, err := pool.Connect(ctx, instances)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

defer connPool.Close()

eval := tarantool.NewEvalRequest("return box.cfg.listen")
ping := tarantool.NewPingRequest()
const concurrency = 100
var wg sync.WaitGroup
wg.Add(concurrency)

for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()

for _, server := range servers {
data, err := connPool.DoInstance(eval, server).Get()
require.NoError(t, err)
assert.Equal(t, []interface{}{server}, data)
}
_, err := connPool.DoInstance(ping, "not_exist").Get()
require.ErrorIs(t, err, pool.ErrNoHealthyInstance)
}()
}

wg.Wait()
}

func TestNewPrepared(t *testing.T) {
test_helpers.SkipIfSQLUnsupported(t)

Expand Down

0 comments on commit ba5b6a6

Please sign in to comment.