Skip to content

Commit

Permalink
Use circular queue for idle connections
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Dubsky authored and jackc committed Oct 28, 2022
1 parent 021588b commit 2c35738
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 24 deletions.
56 changes: 56 additions & 0 deletions internal/circ/queue.go
@@ -0,0 +1,56 @@
package circ

// Queue is a generic FIFO queue backed by a fixed-capacity ring buffer.
// The zero value is not usable; construct instances with NewQueue.
type Queue[T any] struct {
	arr   []T // backing storage; its length is the queue capacity
	begin int // ring index of the oldest element
	len   int // number of elements currently stored
}

// NewQueue creates a new queue that can hold up to capacity elements.
func NewQueue[T any](capacity int) *Queue[T] {
	// TODO: Stop preallocating the whole capacity once Go upstream
	// accepts this proposal: https://github.com/golang/go/issues/55978
	return &Queue[T]{arr: make([]T, capacity)}
}

// Cap reports the maximum number of elements the queue can hold.
func (q *Queue[T]) Cap() int { return len(q.arr) }

// Len reports the number of elements currently in the queue.
func (q *Queue[T]) Len() int { return q.len }

// end returns the ring index one past the newest element, i.e. the slot
// the next Enqueue writes to. It is only called while len < len(arr),
// so len(arr) is non-zero here and begin+len wraps at most once.
func (q *Queue[T]) end() int {
	return (q.begin + q.len) % len(q.arr)
}

// Enqueue appends elem at the tail of the queue. It panics when the
// queue is already full.
func (q *Queue[T]) Enqueue(elem T) {
	if q.len == len(q.arr) {
		panic("enqueue: queue is full")
	}

	q.arr[q.end()] = elem
	q.len++
}

// Dequeue removes and returns the element at the head of the queue. It
// panics when the queue is empty.
func (q *Queue[T]) Dequeue() T {
	if q.len < 1 {
		panic("dequeue: queue is empty")
	}

	head := q.arr[q.begin]

	// Zero the vacated slot so the queue does not keep pointers alive.
	var zero T
	q.arr[q.begin] = zero

	// len >= 1 here, hence len(arr) >= 1 and the modulo is safe.
	q.begin = (q.begin + 1) % len(q.arr)
	q.len--

	return head
}
148 changes: 148 additions & 0 deletions internal/circ/queue_test.go
@@ -0,0 +1,148 @@
package circ_test

import (
"fmt"
"io"
"testing"

"github.com/jackc/puddle/v2/internal/circ"
"github.com/stretchr/testify/require"
)

func TestQueue_EnqueueDequeue(t *testing.T) {
	r := require.New(t)

	const size = 10
	q := circ.NewQueue[int](size)
	r.Equal(size, q.Cap())

	// Fill the queue to capacity, checking the length as we go.
	for v := 0; v < size; v++ {
		q.Enqueue(v)
		r.Equal(v+1, q.Len())
	}

	// Enqueue into a full queue must panic and leave it untouched.
	r.Panics(func() { q.Enqueue(size) })
	r.Equal(size, q.Len())

	// Elements come back out in insertion order.
	for v := 0; v < size; v++ {
		r.Equal(v, q.Dequeue())
		r.Equal(size-v-1, q.Len())
	}

	// Dequeue from an empty queue must panic.
	r.Panics(func() { q.Dequeue() })
	r.Equal(0, q.Len())
}

func TestQueue_EnqueueDequeueOverflow(t *testing.T) {
	r := require.New(t)

	const size = 10
	q := circ.NewQueue[int](size)
	r.Equal(size, q.Cap())

	// Fill the queue completely with 0..9.
	for v := 0; v < size; v++ {
		q.Enqueue(v)
		r.Equal(v+1, q.Len())
	}

	// A full queue must reject further elements.
	r.Panics(func() { q.Enqueue(size) })
	r.Equal(size, q.Len())

	// Drain the first half (0..4)...
	for v := 0; v < size/2; v++ {
		r.Equal(v, q.Dequeue())
		r.Equal(size-v-1, q.Len())
	}

	// ...and refill with 10..14 so the write position wraps around
	// the end of the backing array.
	for v := size; v < size+size/2; v++ {
		q.Enqueue(v)
		r.Equal(v-size/2+1, q.Len())
	}

	// Everything comes back out in FIFO order: 5..14.
	for v := 0; v < size; v++ {
		r.Equal(v+size/2, q.Dequeue())
		r.Equal(size-v-1, q.Len())
	}

	r.Panics(func() { q.Dequeue() })
	r.Equal(0, q.Len())
}

// BenchmarkArrayAppend measures appending into a slice with
// preallocated capacity, as a baseline for the queue benchmarks.
func BenchmarkArrayAppend(b *testing.B) {
	xs := make([]int, 0, b.N)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		xs = append(xs, i)
	}

	// Sink every value so the compiler cannot eliminate the writes.
	b.StopTimer()
	for _, x := range xs {
		fmt.Fprintf(io.Discard, "%d\n", x)
	}
}

// BenchmarkArrayWrite measures indexed writes into a presized slice,
// as a baseline for the queue benchmarks.
func BenchmarkArrayWrite(b *testing.B) {
	xs := make([]int, b.N)

	b.ResetTimer()
	for i := range xs {
		xs[i] = i
	}

	// Sink every value so the compiler cannot eliminate the writes.
	b.StopTimer()
	for _, x := range xs {
		fmt.Fprintf(io.Discard, "%d\n", x)
	}
}

// BenchmarkEnqueue measures the cost of Queue.Enqueue on a queue sized
// exactly for b.N elements.
func BenchmarkEnqueue(b *testing.B) {
	queue := circ.NewQueue[int](b.N)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		queue.Enqueue(i)
	}

	// Drain and sink every value so the compiler cannot eliminate the
	// writes performed in the timed loop above.
	b.StopTimer()
	for i := 0; i < b.N; i++ {
		fmt.Fprintf(io.Discard, "%d\n", queue.Dequeue())
	}
}

// BenchmarkChanWrite measures sends into a buffered channel — channels
// are another way to represent a queue.
func BenchmarkChanWrite(b *testing.B) {
	ch := make(chan int, b.N)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ch <- i
	}

	// Drain and sink every value so the compiler cannot eliminate the
	// sends performed in the timed loop above.
	b.StopTimer()
	for i := 0; i < b.N; i++ {
		fmt.Fprintf(io.Discard, "%d\n", <-ch)
	}
}

// BenchmarkDequeue measures the cost of Queue.Dequeue from a prefilled
// queue of b.N elements.
func BenchmarkDequeue(b *testing.B) {
	q := circ.NewQueue[int](b.N)

	for i := 0; i < b.N; i++ {
		q.Enqueue(i)
	}

	out := make([]int, b.N)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		out[i] = q.Dequeue()
	}

	// Make sure that the Go compiler doesn't optimize writes above.
	// (Matches the sink used by the other benchmarks in this file;
	// without it the results stored in out are never observed.)
	b.StopTimer()
	for i := 0; i < b.N; i++ {
		fmt.Fprintf(io.Discard, "%d\n", out[i])
	}
}
47 changes: 23 additions & 24 deletions pool.go
Expand Up @@ -7,6 +7,7 @@ import (
"sync/atomic"
"time"

"github.com/jackc/puddle/v2/internal/circ"
"golang.org/x/sync/semaphore"
)

Expand Down Expand Up @@ -120,7 +121,7 @@ type Pool[T any] struct {
destructWG sync.WaitGroup

allResources []*Resource[T]
idleResources []*Resource[T]
idleResources *circ.Queue[*Resource[T]]

constructor Constructor[T]
destructor Destructor[T]
Expand Down Expand Up @@ -154,6 +155,7 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) {

return &Pool[T]{
acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
idleResources: circ.NewQueue[*Resource[T]](int(config.MaxSize)),
maxSize: config.MaxSize,
constructor: config.Constructor,
destructor: config.Destructor,
Expand All @@ -176,7 +178,8 @@ func (p *Pool[T]) Close() {
p.closed = true
p.cancelBaseAcquireCtx()

for _, res := range p.idleResources {
for p.idleResources.Len() > 0 {
res := p.idleResources.Dequeue()
p.allResources = removeResource(p.allResources, res)
go p.destructResourceValue(res.value)
}
Expand Down Expand Up @@ -280,15 +283,11 @@ func (p *Pool[T]) Stat() *Stat {
//
// WARNING: Caller of this method must hold the pool mutex!
func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
if len(p.idleResources) == 0 {
if p.idleResources.Len() == 0 {
return nil
}

res := p.idleResources[len(p.idleResources)-1]
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
p.idleResources = p.idleResources[:len(p.idleResources)-1]

return res
return p.idleResources.Dequeue()
}

// createNewResource creates a new resource and inserts it into list of pool
Expand Down Expand Up @@ -471,7 +470,7 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {

res.value = value
res.status = resourceStatusIdle
p.idleResources = append(p.idleResources, res)
p.idleResources.Enqueue(res)
}()

return nil, ErrNotAvailable
Expand Down Expand Up @@ -501,20 +500,21 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {

// Some resources from the maxSize limit do not have to exist (i.e. be
// idle).
if diff := cnt - len(p.idleResources); diff > 0 {
if diff := cnt - p.idleResources.Len(); diff > 0 {
p.acquireSem.Release(int64(diff))
cnt = len(p.idleResources)
cnt = p.idleResources.Len()
}

// Resources above cnt might be reserved by the semaphore.
for i := len(p.idleResources) - cnt; i < len(p.idleResources); i++ {
res := p.idleResources[i]
p.idleResources[i] = nil // Avoid memory leak.

// We are not guaranteed that idleResources are empty after this loop.
// But we are guaranteed that all resources remain in idleResources
// after this loop will (1) either be acquired soon (semaphore was
// already acquired for them) or (2) were released after start of this
// function.
for i := 0; i < cnt; i++ {
res := p.idleResources.Dequeue()
res.status = resourceStatusAcquired
resources = append(resources, res)
}
p.idleResources = p.idleResources[:len(p.idleResources)-cnt]

return resources
}
Expand Down Expand Up @@ -555,7 +555,7 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error {
return ErrClosedPool
}
p.allResources = append(p.allResources, res)
p.idleResources = append(p.idleResources, res)
p.idleResources.Enqueue(res)

return nil
}
Expand All @@ -571,12 +571,11 @@ func (p *Pool[T]) Reset() {

p.resetCount++

for i := range p.idleResources {
p.allResources = removeResource(p.allResources, p.idleResources[i])
go p.destructResourceValue(p.idleResources[i].value)
p.idleResources[i] = nil
for p.idleResources.Len() > 0 {
res := p.idleResources.Dequeue()
p.allResources = removeResource(p.allResources, res)
go p.destructResourceValue(res.value)
}
p.idleResources = p.idleResources[0:0]
}

// releaseAcquiredResource returns res to the the pool.
Expand All @@ -591,7 +590,7 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64)
} else {
res.lastUsedNano = lastUsedNano
res.status = resourceStatusIdle
p.idleResources = append(p.idleResources, res)
p.idleResources.Enqueue(res)
}
}

Expand Down

0 comments on commit 2c35738

Please sign in to comment.