
Commit

Merge pull request #4769 from planetscale/ss-prefilled-pool
pools: option to prefill
sougou committed Jun 17, 2019
2 parents 640588c + 61655c0 commit f25720a
Showing 17 changed files with 284 additions and 127 deletions.
81 changes: 64 additions & 17 deletions go/pools/resource_pool.go
@@ -21,6 +21,7 @@ package pools
import (
"errors"
"fmt"
"sync"
"time"

"golang.org/x/net/context"
@@ -36,6 +37,8 @@ var (

// ErrTimeout is returned if a resource get times out.
ErrTimeout = errors.New("resource pool timed out")

prefillTimeout = 30 * time.Second
)

// Factory is a function that can be used to create a resource.
@@ -77,9 +80,12 @@ type resourceWrapper struct {
// maxCap specifies the extent to which the pool can be resized
// in the future through the SetCapacity function.
// You cannot resize the pool beyond maxCap.
// If a resource is unused beyond idleTimeout, it's discarded.
// If a resource is unused beyond idleTimeout, it's replaced
// with a new one.
// An idleTimeout of 0 means that there is no timeout.
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration) *ResourcePool {
// A non-zero value of prefillParallelism causes the pool to be pre-filled.
// The value specifies how many resources can be opened in parallel.
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool {
if capacity <= 0 || maxCap <= 0 || capacity > maxCap {
panic(errors.New("invalid/out of range capacity"))
}
@@ -94,6 +100,35 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur
rp.resources <- resourceWrapper{}
}

ctx, cancel := context.WithTimeout(context.TODO(), prefillTimeout)
defer cancel()
if prefillParallelism != 0 {
sem := sync2.NewSemaphore(prefillParallelism, 0 /* timeout */)
var wg sync.WaitGroup
for i := 0; i < capacity; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = sem.Acquire()
defer sem.Release()

// If context has expired, give up.
select {
case <-ctx.Done():
return
default:
}

r, err := rp.Get(ctx)
if err != nil {
return
}
rp.Put(r)
}()
}
wg.Wait()
}

if idleTimeout != 0 {
rp.idleTimer = timer.NewTimer(idleTimeout / 10)
rp.idleTimer.Start(rp.closeIdleResources)
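
The prefill block above bounds factory concurrency with a sync2 semaphore plus a WaitGroup, and abandons any remaining opens once prefillTimeout expires. The following is a minimal standalone sketch of that pattern, assuming a buffered channel in place of the sync2 semaphore and a stand-in openResource function (both are illustrative substitutes, not part of this change):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// openResource stands in for the pool's factory call (rp.Get in the real code).
func openResource(id int) error {
    time.Sleep(10 * time.Millisecond) // simulate connection setup
    return nil
}

func prefill(capacity, parallelism int, timeout time.Duration) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    sem := make(chan struct{}, parallelism) // buffered channel as a counting semaphore
    var wg sync.WaitGroup
    for i := 0; i < capacity; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            sem <- struct{}{}        // acquire: at most `parallelism` opens in flight
            defer func() { <-sem }() // release

            // If the overall prefill deadline has passed, give up.
            select {
            case <-ctx.Done():
                return
            default:
            }
            if err := openResource(id); err != nil {
                return // a failed open is skipped, as in the pool code
            }
        }(i)
    }
    wg.Wait()
}

func main() {
    prefill(5, 2, 30*time.Second)
    fmt.Println("prefill done")
}

Failed opens are simply skipped, matching the pool's behavior of ignoring Get errors during prefill: the pool still starts, only with fewer warm resources.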
@@ -131,14 +166,16 @@ func (rp *ResourcePool) closeIdleResources() {
return
}

if wrapper.resource != nil && idleTimeout > 0 && time.Until(wrapper.timeUsed.Add(idleTimeout)) < 0 {
wrapper.resource.Close()
wrapper.resource = nil
rp.idleClosed.Add(1)
rp.active.Add(-1)
}
func() {
defer func() { rp.resources <- wrapper }()

if wrapper.resource != nil && idleTimeout > 0 && time.Until(wrapper.timeUsed.Add(idleTimeout)) < 0 {
wrapper.resource.Close()
rp.idleClosed.Add(1)
rp.reopenResource(&wrapper)
}
}()

rp.resources <- wrapper
}
}

@@ -153,10 +190,10 @@ func (rp *ResourcePool) Get(ctx context.Context) (resource Resource, err error)
span.Annotate("available", rp.available.Get())
span.Annotate("active", rp.active.Get())
defer span.Finish()
return rp.get(ctx, true)
return rp.get(ctx)
}

func (rp *ResourcePool) get(ctx context.Context, wait bool) (resource Resource, err error) {
func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error) {
// If ctx has already expired, avoid racing with rp's resource channel.
select {
case <-ctx.Done():
@@ -170,9 +207,6 @@ func (rp *ResourcePool) get(ctx context.Context, wait bool) (resource Resource,
select {
case wrapper, ok = <-rp.resources:
default:
if !wait {
return nil, nil
}
startTime := time.Now()
select {
case wrapper, ok = <-rp.resources:
@@ -204,13 +238,16 @@ func (rp *ResourcePool) get(ctx context.Context, wait bool) (resource Resource,
// Put will return a resource to the pool. For every successful Get,
// a corresponding Put is required. If you no longer need a resource,
// you will need to call Put(nil) instead of returning the closed resource.
// The will eventually cause a new resource to be created in its place.
// This will cause a new resource to be created in its place.
func (rp *ResourcePool) Put(resource Resource) {
var wrapper resourceWrapper
if resource != nil {
wrapper = resourceWrapper{resource, time.Now()}
wrapper = resourceWrapper{
resource: resource,
timeUsed: time.Now(),
}
} else {
rp.active.Add(-1)
rp.reopenResource(&wrapper)
}
select {
case rp.resources <- wrapper:
@@ -221,6 +258,16 @@ func (rp *ResourcePool) Put(resource Resource) {
rp.available.Add(1)
}

func (rp *ResourcePool) reopenResource(wrapper *resourceWrapper) {
if r, err := rp.factory(); err == nil {
wrapper.resource = r
wrapper.timeUsed = time.Now()
} else {
wrapper.resource = nil
rp.active.Add(-1)
}
}

// SetCapacity changes the capacity of the pool.
// You can use it to shrink or expand, but not beyond
// the max capacity. If the change requires the pool
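For callers, the visible change in this file is the extra prefillParallelism argument to NewResourcePool: passing 0 preserves the old behavior, while a positive value pre-opens resources at construction time with that many factory calls in flight at once. A usage sketch follows; the vitess.io/vitess/go/pools import path and the fakeConn type are assumptions made for illustration:

package main

import (
    "context"
    "log"
    "time"

    "vitess.io/vitess/go/pools"
)

// fakeConn is a stand-in resource; anything with a Close method satisfies pools.Resource.
type fakeConn struct{}

func (fakeConn) Close() {}

func main() {
    factory := func() (pools.Resource, error) { return fakeConn{}, nil }

    // capacity 5, maxCap 10, 1-minute idle timeout, prefill with up to 2 parallel opens.
    p := pools.NewResourcePool(factory, 5, 10, time.Minute, 2)
    defer p.Close()

    r, err := p.Get(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    p.Put(r) // every successful Get needs a matching Put
}

Note that Put(nil), used when the caller has closed the resource itself, now reopens a replacement through reopenResource instead of only decrementing the active count.
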
116 changes: 97 additions & 19 deletions go/pools/resource_pool_test.go
@@ -57,7 +57,7 @@ func TestOpen(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 6, 6, time.Second)
p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0)
p.SetCapacity(5)
var resources [10]Resource

@@ -122,9 +122,10 @@ func TestOpen(t *testing.T) {
t.Errorf("Unexpected error %v", err)
}
r.Close()
// A nil Put should cause the resource to be reopened.
p.Put(nil)
if count.Get() != 4 {
t.Errorf("Expecting 4, received %d", count.Get())
if count.Get() != 5 {
t.Errorf("Expecting 5, received %d", count.Get())
}
for i := 0; i < 5; i++ {
r, err := p.Get(ctx)
@@ -194,11 +195,44 @@ func TestOpen(t *testing.T) {
}
}

func TestPrefill(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1)
defer p.Close()
if p.Active() != 5 {
t.Errorf("p.Active(): %d, want 5", p.Active())
}
p = NewResourcePool(FailFactory, 5, 5, time.Second, 1)
defer p.Close()
if p.Active() != 0 {
t.Errorf("p.Active(): %d, want 0", p.Active())
}
}

func TestPrefillTimeout(t *testing.T) {
lastID.Set(0)
count.Set(0)
saveTimeout := prefillTimeout
prefillTimeout = 1 * time.Millisecond
defer func() { prefillTimeout = saveTimeout }()

start := time.Now()
p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1)
defer p.Close()
if elapsed := time.Since(start); elapsed > 20*time.Millisecond {
t.Errorf("elapsed: %v, should be around 10ms", elapsed)
}
if p.Active() != 0 {
t.Errorf("p.Active(): %d, want 0", p.Active())
}
}

func TestShrinking(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
var resources [10]Resource
// Leave one empty slot in the pool
for i := 0; i < 4; i++ {
@@ -337,7 +371,7 @@ func TestClosing(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
var resources [10]Resource
for i := 0; i < 5; i++ {
r, err := p.Get(ctx)
@@ -391,7 +425,7 @@ func TestIdleTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
defer p.Close()

r, err := p.Get(ctx)
@@ -414,10 +448,10 @@ func TestIdleTimeout(t *testing.T) {
if p.IdleClosed() != 0 {
t.Errorf("Expecting 0, received %d", p.IdleClosed())
}
time.Sleep(20 * time.Millisecond)
time.Sleep(15 * time.Millisecond)

if count.Get() != 0 {
t.Errorf("Expecting 0, received %d", count.Get())
if count.Get() != 1 {
t.Errorf("Expecting 1, received %d", count.Get())
}
if p.IdleClosed() != 1 {
t.Errorf("Expecting 1, received %d", p.IdleClosed())
@@ -438,7 +472,7 @@ func TestIdleTimeout(t *testing.T) {

// sleep to let the idle closer run while all resources are in use
// then make sure things are still as we expect
time.Sleep(20 * time.Millisecond)
time.Sleep(15 * time.Millisecond)
if lastID.Get() != 2 {
t.Errorf("Expecting 2, received %d", count.Get())
}
@@ -468,7 +502,7 @@ func TestIdleTimeout(t *testing.T) {
p.SetIdleTimeout(1000 * time.Millisecond)
p.Put(r)

time.Sleep(20 * time.Millisecond)
time.Sleep(15 * time.Millisecond)
if lastID.Get() != 2 {
t.Errorf("Expecting 2, received %d", count.Get())
}
@@ -479,24 +513,51 @@ func TestIdleTimeout(t *testing.T) {
t.Errorf("Expecting 1, received %d", p.IdleClosed())
}

// Get and Put to refresh timeUsed
r, err = p.Get(ctx)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
p.Put(r)
p.SetIdleTimeout(10 * time.Millisecond)
time.Sleep(20 * time.Millisecond)
if lastID.Get() != 2 {
t.Errorf("Expecting 2, received %d", count.Get())
time.Sleep(15 * time.Millisecond)
if lastID.Get() != 3 {
t.Errorf("Expecting 3, received %d", lastID.Get())
}
if count.Get() != 0 {
if count.Get() != 1 {
t.Errorf("Expecting 1, received %d", count.Get())
}
if p.IdleClosed() != 2 {
t.Errorf("Expecting 2, received %d", p.IdleClosed())
}
}

func TestIdleTimeoutCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
t.Fatal(err)
}
// Change the factory before putting back
// to prevent race with the idle closer, who will
// try to use it.
p.factory = FailFactory
p.Put(r)
time.Sleep(15 * time.Millisecond)
if p.Active() != 0 {
t.Errorf("p.Active(): %d, want 0", p.Active())
}
}

func TestCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(FailFactory, 5, 5, time.Second)
p := NewResourcePool(FailFactory, 5, 5, time.Second, 0)
defer p.Close()
if _, err := p.Get(ctx); err.Error() != "Failed" {
t.Errorf("Expecting Failed, received %v", err)
@@ -508,11 +569,28 @@ func TestCreateFail(t *testing.T) {
}
}

func TestCreateFailOnPut(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
defer p.Close()
_, err := p.Get(ctx)
if err != nil {
t.Fatal(err)
}
p.factory = FailFactory
p.Put(nil)
if p.Active() != 0 {
t.Errorf("p.Active(): %d, want 0", p.Active())
}
}

func TestSlowCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(SlowFailFactory, 2, 2, time.Second)
p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0)
defer p.Close()
ch := make(chan bool)
// The third Get should not wait indefinitely
@@ -534,7 +612,7 @@ func TestTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
@@ -553,7 +631,7 @@ func TestExpired(t *testing.T) {
func TestExpired(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
defer p.Close()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
r, err := p.Get(ctx)
4 changes: 2 additions & 2 deletions go/vt/dbconnpool/connection_pool.go
@@ -145,7 +145,7 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings
defer cp.mu.Unlock()
cp.info = info
cp.mysqlStats = mysqlStats
cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout)
cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0)
// Check if we need to resolve a hostname (The Host is not just an IP address).
if cp.resolutionFrequency > 0 && net.ParseIP(info.Host) == nil {
cp.hostIsNotIP = true
@@ -156,7 +156,7 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings
defer cp.wg.Done()
for {
select {
case _ = <-cp.ticker.C:
case <-cp.ticker.C:
cp.refreshdns()
case <-cp.stop:
return
