Skip to content

Commit

Permalink
connpool: Allow time out during shutdown (#15979)
Browse files Browse the repository at this point in the history
Signed-off-by: Vicent Marti <vmg@strn.cat>
  • Loading branch information
vmg committed May 23, 2024
1 parent 0d8ca1b commit afbce6a
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 77 deletions.
101 changes: 79 additions & 22 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@ import (

var (
// ErrTimeout is returned if a connection get times out.
ErrTimeout = vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "resource pool timed out")
ErrTimeout = vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "connection pool timed out")

// ErrCtxTimeout is returned if a ctx is already expired by the time the connection pool is used
ErrCtxTimeout = vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "resource pool context already expired")
ErrCtxTimeout = vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "connection pool context already expired")

// ErrConnPoolClosed is returned when trying to get a connection from a closed conn pool
ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "connection pool is closed")

// PoolCloseTimeout is how long to wait for all connections to be returned to the pool during close
PoolCloseTimeout = 10 * time.Second
)

type Metrics struct {
Expand Down Expand Up @@ -119,8 +125,9 @@ type ConnPool[C Connection] struct {
capacity atomic.Int64

// workers is a waitgroup for all the currently running worker goroutines
workers sync.WaitGroup
close chan struct{}
workers sync.WaitGroup
close chan struct{}
capacityMu sync.Mutex

config struct {
// connect is the callback to create a new connection for the pool
Expand All @@ -142,6 +149,7 @@ type ConnPool[C Connection] struct {
}

Metrics Metrics
Name string
}

// NewPool creates a new connection pool with the given Config.
Expand Down Expand Up @@ -236,29 +244,60 @@ func (pool *ConnPool[C]) Open(connect Connector[C], refresh RefreshCheck) *ConnP

// Close shuts down the pool. No connections will be returned from ConnPool.Get after calling this,
// but calling ConnPool.Put is still allowed. This function will not return until all of the pool's
// connections have been returned.
// connections have been returned or the default PoolCloseTimeout has elapsed
func (pool *ConnPool[C]) Close() {
if pool.close == nil {
ctx, cancel := context.WithTimeout(context.Background(), PoolCloseTimeout)
defer cancel()

if err := pool.CloseWithContext(ctx); err != nil {
log.Errorf("failed to close pool %q: %v", pool.Name, err)
}
}

// CloseWithContext behaves like Close but allows passing in a Context to time out the
// pool closing operation
func (pool *ConnPool[C]) CloseWithContext(ctx context.Context) error {
pool.capacityMu.Lock()
defer pool.capacityMu.Unlock()

if pool.close == nil || pool.capacity.Load() == 0 {
// already closed
return
return nil
}

pool.SetCapacity(0)
// close all the connections in the pool; if we time out while waiting for
// users to return our connections, we still want to finish the shutdown
// for the pool
err := pool.setCapacity(ctx, 0)

close(pool.close)
pool.workers.Wait()
pool.close = nil
return err
}

func (pool *ConnPool[C]) reopen() {
pool.capacityMu.Lock()
defer pool.capacityMu.Unlock()

capacity := pool.capacity.Load()
if capacity == 0 {
return
}

pool.Close()
pool.open()
pool.SetCapacity(capacity)
ctx, cancel := context.WithTimeout(context.Background(), PoolCloseTimeout)
defer cancel()

// to re-open the connection pool, first set the capacity to 0 so we close
// all the existing connections, as they're now connected to a stale MySQL
// instance.
if err := pool.setCapacity(ctx, 0); err != nil {
log.Errorf("failed to reopen pool %q: %v", pool.Name, err)
}

// the second call to setCapacity cannot fail because it's only increasing the number
// of connections and doesn't need to shut down any
_ = pool.setCapacity(ctx, capacity)
}

// IsOpen returns whether the pool is open
Expand Down Expand Up @@ -322,7 +361,7 @@ func (pool *ConnPool[C]) Get(ctx context.Context, setting *Setting) (*Pooled[C],
return nil, ErrCtxTimeout
}
if pool.capacity.Load() == 0 {
return nil, ErrTimeout
return nil, ErrConnPoolClosed
}
if setting == nil {
return pool.get(ctx)
Expand Down Expand Up @@ -571,39 +610,55 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
// If the capacity is smaller than the number of connections that there are
// currently open, we'll close enough connections before returning, even if
// that means waiting for clients to return connections to the pool.
func (pool *ConnPool[C]) SetCapacity(newcap int64) {
// If the given context times out before we've managed to close enough connections
// an error will be returned.
func (pool *ConnPool[C]) SetCapacity(ctx context.Context, newcap int64) error {
pool.capacityMu.Lock()
defer pool.capacityMu.Unlock()
return pool.setCapacity(ctx, newcap)
}

// setCapacity is the internal implementation for SetCapacity; it must be called
// with pool.capacityMu being held
func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
if newcap < 0 {
panic("negative capacity")
}

oldcap := pool.capacity.Swap(newcap)
if oldcap == newcap {
return
return nil
}

backoff := 1 * time.Millisecond
const delay = 10 * time.Millisecond

// close connections until we're under capacity
for pool.active.Load() > newcap {
if err := ctx.Err(); err != nil {
return vterrors.Errorf(vtrpcpb.Code_ABORTED,
"timed out while waiting for connections to be returned to the pool (capacity=%d, active=%d, borrowed=%d)",
pool.capacity.Load(), pool.active.Load(), pool.borrowed.Load())
}
// if we're closing down the pool, make sure there's no clients waiting
// for connections because they won't be returned in the future
if newcap == 0 {
pool.wait.expire(true)
}

// try closing from connections which are currently idle in the stacks
conn := pool.getFromSettingsStack(nil)
if conn == nil {
conn, _ = pool.clean.Pop()
}
if conn == nil {
time.Sleep(backoff)
backoff += 1 * time.Millisecond
time.Sleep(delay)
continue
}
conn.Close()
pool.closedConn()
}

// if we're closing down the pool, wake up any blocked waiters because no connections
// are going to be returned in the future
if newcap == 0 {
pool.wait.expire(true)
}
return nil
}

func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
Expand Down Expand Up @@ -659,6 +714,8 @@ func (pool *ConnPool[C]) RegisterStats(stats *servenv.Exporter, name string) {
return
}

pool.Name = name

stats.NewGaugeFunc(name+"Capacity", "Tablet server conn pool capacity", func() int64 {
return pool.Capacity()
})
Expand Down
72 changes: 62 additions & 10 deletions go/pools/smartconnpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,15 @@ func TestOpen(t *testing.T) {
assert.EqualValues(t, 6, state.lastID.Load())

// SetCapacity
p.SetCapacity(3)
err = p.SetCapacity(ctx, 3)
require.NoError(t, err)
assert.EqualValues(t, 3, state.open.Load())
assert.EqualValues(t, 6, state.lastID.Load())
assert.EqualValues(t, 3, p.Capacity())
assert.EqualValues(t, 3, p.Available())

p.SetCapacity(6)
err = p.SetCapacity(ctx, 6)
require.NoError(t, err)
assert.EqualValues(t, 6, p.Capacity())
assert.EqualValues(t, 6, p.Available())

Expand Down Expand Up @@ -265,7 +267,9 @@ func TestShrinking(t *testing.T) {
}
done := make(chan bool)
go func() {
p.SetCapacity(3)
err := p.SetCapacity(ctx, 3)
require.NoError(t, err)

done <- true
}()
expected := map[string]any{
Expand Down Expand Up @@ -335,7 +339,8 @@ func TestShrinking(t *testing.T) {

// This will also wait
go func() {
p.SetCapacity(2)
err := p.SetCapacity(ctx, 2)
require.NoError(t, err)
done <- true
}()
time.Sleep(10 * time.Millisecond)
Expand All @@ -353,7 +358,8 @@ func TestShrinking(t *testing.T) {
assert.EqualValues(t, 2, state.open.Load())

// Test race condition of SetCapacity with itself
p.SetCapacity(3)
err = p.SetCapacity(ctx, 3)
require.NoError(t, err)
for i := 0; i < 3; i++ {
var r *Pooled[*TestConn]
var err error
Expand All @@ -375,9 +381,15 @@ func TestShrinking(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// This will wait till we Put
go p.SetCapacity(2)
go func() {
err := p.SetCapacity(ctx, 2)
require.NoError(t, err)
}()
time.Sleep(10 * time.Millisecond)
go p.SetCapacity(4)
go func() {
err := p.SetCapacity(ctx, 4)
require.NoError(t, err)
}()
time.Sleep(10 * time.Millisecond)

// This should not hang
Expand All @@ -387,7 +399,7 @@ func TestShrinking(t *testing.T) {
<-done

assert.Panics(t, func() {
p.SetCapacity(-1)
_ = p.SetCapacity(ctx, -1)
})

assert.EqualValues(t, 4, p.Capacity())
Expand Down Expand Up @@ -530,6 +542,46 @@ func TestReopen(t *testing.T) {
assert.EqualValues(t, 0, state.open.Load())
}

func TestUserClosing(t *testing.T) {
var state TestState

ctx := context.Background()
p := NewPool(&Config[*TestConn]{
Capacity: 5,
IdleTimeout: time.Second,
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)

var resources [5]*Pooled[*TestConn]
for i := 0; i < 5; i++ {
var err error
resources[i], err = p.Get(ctx, nil)
require.NoError(t, err)
}

for _, r := range resources[:4] {
r.Recycle()
}

ch := make(chan error)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := p.CloseWithContext(ctx)
ch <- err
close(ch)
}()

select {
case <-time.After(5 * time.Second):
t.Fatalf("Pool did not shutdown after 5s")
case err := <-ch:
require.Error(t, err)
t.Logf("Shutdown error: %v", err)
}
}

func TestIdleTimeout(t *testing.T) {
testTimeout := func(t *testing.T, setting *Setting) {
var state TestState
Expand Down Expand Up @@ -818,7 +870,7 @@ func TestTimeout(t *testing.T) {
newctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
_, err = p.Get(newctx, setting)
cancel()
assert.EqualError(t, err, "resource pool timed out")
assert.EqualError(t, err, "connection pool timed out")

}

Expand All @@ -842,7 +894,7 @@ func TestExpired(t *testing.T) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
_, err := p.Get(ctx, setting)
cancel()
require.EqualError(t, err, "resource pool context already expired")
require.EqualError(t, err, "connection pool context already expired")
}
}

Expand Down
6 changes: 4 additions & 2 deletions go/vt/vttablet/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,10 @@ func TestSidecarTables(t *testing.T) {
}

func TestConsolidation(t *testing.T) {
defer framework.Server.SetPoolSize(framework.Server.PoolSize())
framework.Server.SetPoolSize(1)
defer framework.Server.SetPoolSize(context.Background(), framework.Server.PoolSize())

err := framework.Server.SetPoolSize(context.Background(), 1)
require.NoError(t, err)

const tag = "Waits/Histograms/Consolidations/Count"

Expand Down
7 changes: 5 additions & 2 deletions go/vt/vttablet/endtoend/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package endtoend

import (
"context"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -98,11 +99,13 @@ func TestStreamConsolidation(t *testing.T) {

defaultPoolSize := framework.Server.StreamPoolSize()

framework.Server.SetStreamPoolSize(4)
err = framework.Server.SetStreamPoolSize(context.Background(), 4)
require.NoError(t, err)

framework.Server.SetStreamConsolidationBlocking(true)

defer func() {
framework.Server.SetStreamPoolSize(defaultPoolSize)
_ = framework.Server.SetStreamPoolSize(context.Background(), defaultPoolSize)
framework.Server.SetStreamConsolidationBlocking(false)
}()

Expand Down
6 changes: 0 additions & 6 deletions go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,9 @@ import (
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// ErrConnPoolClosed is returned when the connection pool is closed.
var ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "internal error: unexpected: conn pool is closed")

const (
getWithoutS = "GetWithoutSettings"
getWithS = "GetWithSettings"
Expand Down
Loading

0 comments on commit afbce6a

Please sign in to comment.