Skip to content

Commit

Permalink
Refactored pool. Struct fields are a little more verbose so it's easi…
Browse files Browse the repository at this point in the history
…er to handle. Purge function will exclude errored connections from the pool.
  • Loading branch information
jpiontek committed Jul 20, 2016
1 parent fc889e2 commit 9be1d28
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 19 deletions.
25 changes: 16 additions & 9 deletions pool.go
Expand Up @@ -18,8 +18,8 @@ type Pool struct {

// PooledConnection represents a shared and reusable connection.
type PooledConnection struct {
p *Pool
c *Client
Pool *Pool
Client *Client
}

type idleConnection struct {
Expand Down Expand Up @@ -47,7 +47,7 @@ func (p *Pool) Get() (*PooledConnection, error) {
p.idle = append(p.idle[:0], p.idle[1:]...)
p.active++
p.mu.Unlock()
pc := &PooledConnection{p: p, c: conn.pc.c}
pc := &PooledConnection{Pool: p, Client: conn.pc.Client}
return pc, nil

}
Expand All @@ -69,7 +69,7 @@ func (p *Pool) Get() (*PooledConnection, error) {
return nil, err
}

pc := &PooledConnection{p: p, c: dc}
pc := &PooledConnection{Pool: p, Client: dc}
return pc, nil
}

Expand Down Expand Up @@ -98,8 +98,16 @@ func (p *Pool) purge() {
var valid []*idleConnection
now := time.Now()
for _, v := range p.idle {
// If the client has an error then exclude it from the pool
if v.pc.Client.Errored {
continue
}

if v.t.Add(timeout).After(now) {
valid = append(valid, v)
} else {
// Force underlying connection closed
v.pc.Client.Close()
}
}
p.idle = valid
Expand All @@ -126,10 +134,9 @@ func (p *Pool) first() *idleConnection {
// Close signals that the caller is finished with the connection and should be
// returned to the pool for future use.
func (pc *PooledConnection) Close() {
pc.p.mu.Lock()
defer pc.p.mu.Unlock()

pc.p.put(pc)
pc.p.release()
pc.Pool.mu.Lock()
defer pc.Pool.mu.Unlock()

pc.Pool.put(pc)
pc.Pool.release()
}
57 changes: 47 additions & 10 deletions pool_test.go
Expand Up @@ -9,9 +9,9 @@ func TestPurge(t *testing.T) {
n := time.Now()

// invalid has timedout and should be cleaned up
invalid := &idleConnection{t: n.Add(-30 * time.Second)}
invalid := &idleConnection{t: n.Add(-30 * time.Second), pc: &PooledConnection{Client: &Client{}}}
// valid has not yet timed out and should remain in the idle pool
valid := &idleConnection{t: n.Add(30 * time.Second)}
valid := &idleConnection{t: n.Add(30 * time.Second), pc: &PooledConnection{Client: &Client{}}}

// Pool has a 30 second timeout and an idle connection slice containing both
// the invalid and valid idle connections
Expand All @@ -33,9 +33,42 @@ func TestPurge(t *testing.T) {

}

func TestPurgeErrorClosedConnection(t *testing.T) {
n := time.Now()

p := &Pool{IdleTimeout: time.Second * 30}

valid := &idleConnection{t: n.Add(30 * time.Second), pc: &PooledConnection{Client: &Client{}}}

client := &Client{}

closed := &idleConnection{t: n.Add(30 * time.Second), pc: &PooledConnection{Pool: p, Client: client}}

idle := []*idleConnection{valid, closed}

p.idle = idle

// Simulate error
closed.pc.Client.Errored = true

if len(p.idle) != 2 {
t.Errorf("Expected 2 idle connections, got %d", len(p.idle))
}

p.purge()

if len(p.idle) != 1 {
t.Errorf("Expected 1 idle connection after purge, got %d", len(p.idle))
}

if p.idle[0] != valid {
t.Error("Expected valid connection to remain in pool")
}
}

func TestPooledConnectionClose(t *testing.T) {
pool := &Pool{}
pc := &PooledConnection{p: pool}
pc := &PooledConnection{Pool: pool}

if len(pool.idle) != 0 {
t.Errorf("Expected 0 idle connection, got %d", len(pool.idle))
Expand All @@ -62,9 +95,9 @@ func TestFirst(t *testing.T) {
n := time.Now()
pool := &Pool{MaxActive: 1, IdleTimeout: 30 * time.Second}
idled := []*idleConnection{
&idleConnection{pc: &PooledConnection{p: pool}, t: n.Add(-45 * time.Second)}, // expired
&idleConnection{pc: &PooledConnection{p: pool}, t: n.Add(-45 * time.Second)}, // expired
&idleConnection{pc: &PooledConnection{p: pool}, t: n}, // valid
&idleConnection{t: n.Add(-45 * time.Second), pc: &PooledConnection{Pool: pool, Client: &Client{}}}, // expired
&idleConnection{t: n.Add(-45 * time.Second), pc: &PooledConnection{Pool: pool, Client: &Client{}}}, // expired
&idleConnection{pc: &PooledConnection{Pool: pool, Client: &Client{}}}, // valid
}
pool.idle = idled

Expand All @@ -91,9 +124,13 @@ func TestFirst(t *testing.T) {

func TestGetAndDial(t *testing.T) {
n := time.Now()
invalid := &idleConnection{t: n.Add(-30 * time.Second)}

pool := &Pool{IdleTimeout: time.Second * 30, idle: []*idleConnection{invalid}}
pool := &Pool{IdleTimeout: time.Second * 30}

invalid := &idleConnection{t: n.Add(-30 * time.Second), pc: &PooledConnection{Pool: pool, Client: &Client{}}}

idle := []*idleConnection{invalid}
pool.idle = idle

client := &Client{}
pool.Dial = func() (*Client, error) {
Expand All @@ -118,7 +155,7 @@ func TestGetAndDial(t *testing.T) {
t.Errorf("Expected 0 idle connections, got %d", len(pool.idle))
}

if conn.c != client {
if conn.Client != client {
t.Error("Expected correct client to be returned")
}

Expand All @@ -144,7 +181,7 @@ func TestGetAndDial(t *testing.T) {
t.Error(err)
}

if conn.c != client {
if conn.Client != client {
t.Error("Expected the same connection to be reused")
}

Expand Down

0 comments on commit 9be1d28

Please sign in to comment.