pools: option to prefill #4769
Changes from all commits: ec49035, ba54ddc, 1bbe3cc, 9cebb40, 94a33b4, 61655c0
```diff
@@ -21,6 +21,7 @@ package pools

 import (
 	"errors"
 	"fmt"
+	"sync"
 	"time"

 	"golang.org/x/net/context"
```
```diff
@@ -36,6 +37,8 @@ var (

 	// ErrTimeout is returned if a resource get times out.
 	ErrTimeout = errors.New("resource pool timed out")
+
+	prefillTimeout = 30 * time.Second
 )

 // Factory is a function that can be used to create a resource.
```
```diff
@@ -77,9 +80,12 @@ type resourceWrapper struct {
 // maxCap specifies the extent to which the pool can be resized
 // in the future through the SetCapacity function.
 // You cannot resize the pool beyond maxCap.
-// If a resource is unused beyond idleTimeout, it's discarded.
+// If a resource is unused beyond idleTimeout, it's replaced
+// with a new one.
 // An idleTimeout of 0 means that there is no timeout.
-func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration) *ResourcePool {
+// A non-zero value of prefillParallelism causes the pool to be pre-filled.
+// The value specifies how many resources can be opened in parallel.
+func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool {
 	if capacity <= 0 || maxCap <= 0 || capacity > maxCap {
 		panic(errors.New("invalid/out of range capacity"))
 	}
```
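To make the new signature concrete, here is a minimal usage sketch. The `demoConn` type and the factory are hypothetical, and the `vitess.io/vitess/go/pools` import path is an assumption; only `NewResourcePool`'s parameters come from the diff above.

```go
package main

import (
	"time"

	"vitess.io/vitess/go/pools"
)

// demoConn is a hypothetical resource; it only needs Close to
// satisfy the pools.Resource interface.
type demoConn struct{}

func (demoConn) Close() {}

func main() {
	factory := func() (pools.Resource, error) { return demoConn{}, nil }

	// capacity 100, maxCap 100, 1-minute idle timeout. The final
	// argument enables prefill: the pool opens resources at startup,
	// at most 10 at a time.
	p := pools.NewResourcePool(factory, 100, 100, time.Minute, 10)
	defer p.Close()
}
```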
```diff
@@ -94,6 +100,35 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur
 		rp.resources <- resourceWrapper{}
 	}

+	ctx, cancel := context.WithTimeout(context.TODO(), prefillTimeout)
+	defer cancel()
+	if prefillParallelism != 0 {
+		sem := sync2.NewSemaphore(prefillParallelism, 0 /* timeout */)
+		var wg sync.WaitGroup
+		for i := 0; i < capacity; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				_ = sem.Acquire()
+				defer sem.Release()
+
+				// If context has expired, give up.
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+
+				r, err := rp.Get(ctx)
+				if err != nil {
+					return
+				}
+				rp.Put(r)
+			}()
+		}
+		wg.Wait()
```
> Is it always necessary to wait for all the prefill actions to complete here? I am somewhat reticent to add a ton of optionality here, but especially given the unbounded […]

> This is a good point. I'm not sure what the best approach would be. The problem with doing it asynchronously is that we'll get into trouble if that function hangs. Then we'll have to protect it from races with […] I'm thinking we should let people try this and observe failure modes. That may give us better clarity on the best way forward. I've added log messages in pool.go so we can collect some data about this.

> This is the one part of this change that still worries me a bit -- as written, this could potentially block startup forever, which seems like a bad idea. It would also be efficient to parallelize this with the other startup tasks. At the same time, if users really want to make sure the connection pool is prewarmed before serving queries... then that seems like the thing we should give them. All in all, though... I wonder if adding a bounded timeout of something like 30 seconds by default would be sufficient here?

> That seems better than waiting potentially forever.

> I've added a 30 second timeout for prefilling the pool. We'll see how that works out.
```diff
+	}
+
 	if idleTimeout != 0 {
 		rp.idleTimer = timer.NewTimer(idleTimeout / 10)
 		rp.idleTimer.Start(rp.closeIdleResources)
```
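The prefill logic above follows a common warm-up pattern: fan out one goroutine per resource, gate them with a semaphore, and have each worker give up once a deadline passes. A self-contained sketch of the same pattern, using a buffered channel in place of `sync2.Semaphore` (the `warm` function and its names are illustrative, not part of the PR):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// warm opens count resources with at most parallelism opens in
// flight, giving up once the deadline passes. A buffered channel
// stands in for the sync2.Semaphore used in the diff above.
func warm(count, parallelism int, timeout time.Duration, open func(context.Context) error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	sem := make(chan struct{}, parallelism)
	var wg sync.WaitGroup
	for i := 0; i < count; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire
			defer func() { <-sem }() // release

			// If the deadline has passed, give up without opening.
			select {
			case <-ctx.Done():
				return
			default:
			}
			_ = open(ctx)
		}()
	}
	// Like NewResourcePool above, this blocks until every worker
	// has either opened a resource or given up.
	wg.Wait()
}

func main() {
	warm(5, 2, time.Second, func(ctx context.Context) error {
		fmt.Println("opened one resource")
		return nil
	})
}
```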
```diff
@@ -131,14 +166,16 @@ func (rp *ResourcePool) closeIdleResources() {
 			return
 		}

-		if wrapper.resource != nil && idleTimeout > 0 && time.Until(wrapper.timeUsed.Add(idleTimeout)) < 0 {
-			wrapper.resource.Close()
-			wrapper.resource = nil
-			rp.idleClosed.Add(1)
-			rp.active.Add(-1)
-		}
+		func() {
```
> Was there some reason this needed to be wrapped in a function and not just included inline like it was before? I find the extra layers of abstraction and the defer of putting the wrapper back into the resource pool to be more confusing this way.

> It's necessary for panic-proofing. If […]

> Ah that explains it 👍
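The panic-proofing argument can be seen in a tiny self-contained example: the deferred send fires even when the body panics, so the pool slot can never be leaked. The `recover` here is only to keep the demo alive; the pool code itself does not recover.

```go
package main

import "fmt"

func main() {
	slots := make(chan int, 1)

	func() {
		// Registered first, so it runs last: the slot goes back on
		// the channel no matter how the body exits.
		defer func() { slots <- 1 }()
		// Contain the panic so the demo can keep running.
		defer func() { recover() }()
		panic("factory blew up while reopening a resource")
	}()

	fmt.Println(<-slots) // prints 1: the slot survived the panic
}
```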
```diff
+			defer func() { rp.resources <- wrapper }()
+
+			if wrapper.resource != nil && idleTimeout > 0 && time.Until(wrapper.timeUsed.Add(idleTimeout)) < 0 {
+				wrapper.resource.Close()
+				rp.idleClosed.Add(1)
+				rp.reopenResource(&wrapper)
```
> I wonder if a cleaner implementation that would obviate the need for the manually refactored […] This would require actually keeping the […]

> I personally prefer the current approach because I've always been uncomfortable with the complexity of […]

> OK -- what if instead we didn't reintroduce […] Then we use ErrTimeout to indicate that we've gone through all we need to, instead of the "stop early" case above.

> The main thing I don't like about this approach (both before and after your change) is that it duplicates a lot of the logic related to the stats and reopening the resource, etc. If we use the existing get() / put() interface then […]

> (I actually wish I had done that originally, tbh.)

> Another related point which just came up in an internal discussion -- it would be nice to have vitess implement a clean mysql shutdown flow for the mysql protocol connections. Currently our logs are filled up with things like: […] That's because Close() simply shuts down the underlying tcp socket. I think instead we could bound the clean shutdown handshake by a relatively short context deadline (say 1-2 seconds), after which we summarily close the socket anyway. That, to me, again argues for using the regular Get/Put interface for this.

> It turns out that we can't use […]

> Can we merge this in and do the refactor later if we still want it?

> I think it'd be worthwhile to change […]
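For the clean-shutdown idea floated above, here is a sketch of bounding the handshake with a short deadline before force-closing the socket. `sendQuit` stands in for a COM_QUIT-style exchange; none of these names are from the Vitess mysql package.

```go
package main

import (
	"context"
	"net"
	"time"
)

// closeConn attempts a polite shutdown handshake, but closes the
// underlying socket after at most two seconds either way.
func closeConn(conn net.Conn, sendQuit func(context.Context) error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	done := make(chan struct{})
	go func() {
		_ = sendQuit(ctx) // best-effort; an error just means we close harder
		close(done)
	}()

	select {
	case <-done:
		// Handshake finished in time.
	case <-ctx.Done():
		// Deadline passed; summarily close the socket anyway.
	}
	conn.Close()
}

func main() {
	c1, c2 := net.Pipe()
	defer c2.Close()
	closeConn(c1, func(ctx context.Context) error { return nil })
}
```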
```diff
+			}
+		}()

-		rp.resources <- wrapper
 	}
 }
```
```diff
@@ -153,10 +190,10 @@ func (rp *ResourcePool) Get(ctx context.Context) (resource Resource, err error)
 	span.Annotate("available", rp.available.Get())
 	span.Annotate("active", rp.active.Get())
 	defer span.Finish()
-	return rp.get(ctx, true)
+	return rp.get(ctx)
 }

-func (rp *ResourcePool) get(ctx context.Context, wait bool) (resource Resource, err error) {
+func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error) {
 	// If ctx has already expired, avoid racing with rp's resource channel.
 	select {
 	case <-ctx.Done():
```
```diff
@@ -170,9 +207,6 @@ func (rp *ResourcePool) get(ctx context.Context, wait bool) (resource Resource,
 	select {
 	case wrapper, ok = <-rp.resources:
 	default:
-		if !wait {
```
> Was this intended? Or is it not used anywhere?

> I answered my own question. It was always true.
```diff
-			return nil, nil
-		}
 		startTime := time.Now()
 		select {
 		case wrapper, ok = <-rp.resources:
```
```diff
@@ -204,13 +238,16 @@ func (rp *ResourcePool) get(ctx context.Context, wait bool) (resource Resource,
 // Put will return a resource to the pool. For every successful Get,
 // a corresponding Put is required. If you no longer need a resource,
 // you will need to call Put(nil) instead of returning the closed resource.
-// The will eventually cause a new resource to be created in its place.
+// This will cause a new resource to be created in its place.
 func (rp *ResourcePool) Put(resource Resource) {
 	var wrapper resourceWrapper
 	if resource != nil {
-		wrapper = resourceWrapper{resource, time.Now()}
+		wrapper = resourceWrapper{
+			resource: resource,
+			timeUsed: time.Now(),
+		}
 	} else {
-		rp.active.Add(-1)
+		rp.reopenResource(&wrapper)
 	}
 	select {
 	case rp.resources <- wrapper:
```
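The Get/Put contract in the doc comment, shown as a hedged usage sketch. The `conn` type, the `broken` check, and the import path are assumptions for illustration; the contract itself — Close a dead resource, then Put(nil) so the pool reopens a replacement — is from the comment above.

```go
package main

import (
	"time"

	"golang.org/x/net/context"

	"vitess.io/vitess/go/pools"
)

type conn struct{ broken bool }

func (conn) Close() {}

func main() {
	factory := func() (pools.Resource, error) { return conn{}, nil }
	p := pools.NewResourcePool(factory, 1, 1, time.Second, 0)
	defer p.Close()

	r, err := p.Get(context.Background())
	if err != nil {
		return
	}
	if r.(conn).broken {
		// Don't return a dead resource: Close it and Put(nil),
		// which makes the pool reopen a replacement in this slot.
		r.Close()
		p.Put(nil)
	} else {
		p.Put(r) // normal path: every successful Get needs a Put
	}
}
```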
```diff
@@ -221,6 +258,16 @@ func (rp *ResourcePool) Put(resource Resource) {
 	rp.available.Add(1)
 }

+func (rp *ResourcePool) reopenResource(wrapper *resourceWrapper) {
+	if r, err := rp.factory(); err == nil {
+		wrapper.resource = r
+		wrapper.timeUsed = time.Now()
+	} else {
+		wrapper.resource = nil
+		rp.active.Add(-1)
+	}
+}
+
 // SetCapacity changes the capacity of the pool.
 // You can use it to shrink or expand, but not beyond
 // the max capacity. If the change requires the pool
```
The corresponding test changes:

```diff
@@ -57,7 +57,7 @@ func TestOpen(t *testing.T) {
 	ctx := context.Background()
 	lastID.Set(0)
 	count.Set(0)
-	p := NewResourcePool(PoolFactory, 6, 6, time.Second)
+	p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0)
 	p.SetCapacity(5)
 	var resources [10]Resource
```
```diff
@@ -122,9 +122,10 @@ func TestOpen(t *testing.T) {
 		t.Errorf("Unexpected error %v", err)
 	}
 	r.Close()
+	// A nil Put should cause the resource to be reopened.
 	p.Put(nil)
-	if count.Get() != 4 {
-		t.Errorf("Expecting 4, received %d", count.Get())
+	if count.Get() != 5 {
+		t.Errorf("Expecting 5, received %d", count.Get())
 	}
 	for i := 0; i < 5; i++ {
 		r, err := p.Get(ctx)
```

> The error message below doesn't match up with this expectation...
```diff
@@ -194,11 +195,44 @@ func TestOpen(t *testing.T) {
 	}
 }

+func TestPrefill(t *testing.T) {
+	lastID.Set(0)
+	count.Set(0)
+	p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1)
+	defer p.Close()
+	if p.Active() != 5 {
+		t.Errorf("p.Active(): %d, want 5", p.Active())
+	}
+	p = NewResourcePool(FailFactory, 5, 5, time.Second, 1)
+	defer p.Close()
+	if p.Active() != 0 {
+		t.Errorf("p.Active(): %d, want 0", p.Active())
+	}
+}
+
+func TestPrefillTimeout(t *testing.T) {
+	lastID.Set(0)
+	count.Set(0)
+	saveTimeout := prefillTimeout
+	prefillTimeout = 1 * time.Millisecond
+	defer func() { prefillTimeout = saveTimeout }()
+
+	start := time.Now()
+	p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1)
+	defer p.Close()
+	if elapsed := time.Since(start); elapsed > 20*time.Millisecond {
+		t.Errorf("elapsed: %v, should be around 10ms", elapsed)
+	}
+	if p.Active() != 0 {
+		t.Errorf("p.Active(): %d, want 0", p.Active())
+	}
+}
+
 func TestShrinking(t *testing.T) {
 	ctx := context.Background()
 	lastID.Set(0)
 	count.Set(0)
-	p := NewResourcePool(PoolFactory, 5, 5, time.Second)
+	p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
 	var resources [10]Resource
 	// Leave one empty slot in the pool
 	for i := 0; i < 4; i++ {
```
```diff
@@ -337,7 +371,7 @@ func TestClosing(t *testing.T) {
 	ctx := context.Background()
 	lastID.Set(0)
 	count.Set(0)
-	p := NewResourcePool(PoolFactory, 5, 5, time.Second)
+	p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
 	var resources [10]Resource
 	for i := 0; i < 5; i++ {
 		r, err := p.Get(ctx)
```
```diff
@@ -391,7 +425,7 @@ func TestIdleTimeout(t *testing.T) {
 	ctx := context.Background()
 	lastID.Set(0)
 	count.Set(0)
-	p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond)
+	p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
 	defer p.Close()

 	r, err := p.Get(ctx)
```
```diff
@@ -414,10 +448,10 @@ func TestIdleTimeout(t *testing.T) {
 	if p.IdleClosed() != 0 {
 		t.Errorf("Expecting 0, received %d", p.IdleClosed())
 	}
-	time.Sleep(20 * time.Millisecond)
+	time.Sleep(15 * time.Millisecond)

-	if count.Get() != 0 {
-		t.Errorf("Expecting 0, received %d", count.Get())
+	if count.Get() != 1 {
+		t.Errorf("Expecting 1, received %d", count.Get())
 	}
 	if p.IdleClosed() != 1 {
 		t.Errorf("Expecting 1, received %d", p.IdleClosed())
```
```diff
@@ -438,7 +472,7 @@ func TestIdleTimeout(t *testing.T) {

 	// sleep to let the idle closer run while all resources are in use
 	// then make sure things are still as we expect
-	time.Sleep(20 * time.Millisecond)
+	time.Sleep(15 * time.Millisecond)
 	if lastID.Get() != 2 {
 		t.Errorf("Expecting 2, received %d", count.Get())
 	}
```
```diff
@@ -468,7 +502,7 @@ func TestIdleTimeout(t *testing.T) {
 	p.SetIdleTimeout(1000 * time.Millisecond)
 	p.Put(r)

-	time.Sleep(20 * time.Millisecond)
+	time.Sleep(15 * time.Millisecond)
 	if lastID.Get() != 2 {
 		t.Errorf("Expecting 2, received %d", count.Get())
 	}
```
```diff
@@ -479,24 +513,51 @@ func TestIdleTimeout(t *testing.T) {
 		t.Errorf("Expecting 1, received %d", p.IdleClosed())
 	}

+	// Get and Put to refresh timeUsed
+	r, err = p.Get(ctx)
+	if err != nil {
+		t.Errorf("Unexpected error %v", err)
+	}
+	p.Put(r)
 	p.SetIdleTimeout(10 * time.Millisecond)
-	time.Sleep(20 * time.Millisecond)
-	if lastID.Get() != 2 {
-		t.Errorf("Expecting 2, received %d", count.Get())
+	time.Sleep(15 * time.Millisecond)
+	if lastID.Get() != 3 {
+		t.Errorf("Expecting 3, received %d", lastID.Get())
 	}
-	if count.Get() != 0 {
+	if count.Get() != 1 {
 		t.Errorf("Expecting 1, received %d", count.Get())
 	}
 	if p.IdleClosed() != 2 {
 		t.Errorf("Expecting 2, received %d", p.IdleClosed())
 	}
 }

+func TestIdleTimeoutCreateFail(t *testing.T) {
+	ctx := context.Background()
+	lastID.Set(0)
+	count.Set(0)
+	p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
+	defer p.Close()
+	r, err := p.Get(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Change the factory before putting back
+	// to prevent race with the idle closer, who will
+	// try to use it.
+	p.factory = FailFactory
+	p.Put(r)
+	time.Sleep(15 * time.Millisecond)
+	if p.Active() != 0 {
+		t.Errorf("p.Active(): %d, want 0", p.Active())
+	}
+}
+
 func TestCreateFail(t *testing.T) {
 	ctx := context.Background()
 	lastID.Set(0)
 	count.Set(0)
-	p := NewResourcePool(FailFactory, 5, 5, time.Second)
+	p := NewResourcePool(FailFactory, 5, 5, time.Second, 0)
 	defer p.Close()
 	if _, err := p.Get(ctx); err.Error() != "Failed" {
 		t.Errorf("Expecting Failed, received %v", err)
```
```diff
@@ -508,11 +569,28 @@ func TestCreateFail(t *testing.T) {
 	}
 }

+func TestCreateFailOnPut(t *testing.T) {
+	ctx := context.Background()
+	lastID.Set(0)
+	count.Set(0)
+	p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
+	defer p.Close()
+	_, err := p.Get(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	p.factory = FailFactory
+	p.Put(nil)
+	if p.Active() != 0 {
+		t.Errorf("p.Active(): %d, want 0", p.Active())
+	}
+}
+
 func TestSlowCreateFail(t *testing.T) {
 	ctx := context.Background()
 	lastID.Set(0)
 	count.Set(0)
-	p := NewResourcePool(SlowFailFactory, 2, 2, time.Second)
+	p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0)
 	defer p.Close()
 	ch := make(chan bool)
 	// The third Get should not wait indefinitely
```
```diff
@@ -534,7 +612,7 @@ func TestTimeout(t *testing.T) {
 	ctx := context.Background()
 	lastID.Set(0)
 	count.Set(0)
-	p := NewResourcePool(PoolFactory, 1, 1, time.Second)
+	p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
 	defer p.Close()
 	r, err := p.Get(ctx)
 	if err != nil {
```
```diff
@@ -553,7 +631,7 @@
 func TestExpired(t *testing.T) {
 	lastID.Set(0)
 	count.Set(0)
-	p := NewResourcePool(PoolFactory, 1, 1, time.Second)
+	p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
 	defer p.Close()
 	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
 	r, err := p.Get(ctx)
```
> May not be enough based on our testing. On a cold start, opening 600 connections takes about ~38s.

> So maybe make this configurable?

> 🤷♂️ Yet another flag. I'll begrudgingly add it.

> I know this is getting to be a lot, but I just don't see how we can make a one-size-fits-all solution here.

> I'd say let's go with 30 seconds for now. If it actually becomes a problem in real life, then we can address it by making it configurable then.

> 🎉
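If the timeout ever does need to be configurable, the usual shape would be a duration flag feeding the package variable. This is purely a hypothetical sketch of that idea; the PR as merged keeps prefillTimeout as a fixed 30-second variable, and the flag name here is invented.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// Hypothetical flag; not something this PR adds.
var poolPrefillTimeout = flag.Duration("pool_prefill_timeout",
	30*time.Second, "max time to spend pre-filling resource pools at startup")

func main() {
	flag.Parse()
	fmt.Println("prefill timeout:", *poolPrefillTimeout)
}
```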