Skip to content

Commit

Permalink
Cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Jun 28, 2014
1 parent 72a6069 commit 678b8b3
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 50 deletions.
2 changes: 1 addition & 1 deletion multi.go
Expand Up @@ -17,7 +17,7 @@ func (c *Client) Multi() *Multi {
Client: &Client{
baseClient: &baseClient{
opt: c.opt,
connPool: newSingleConnPool(c.connPool, nil, true),
connPool: newSingleConnPool(c.connPool, true),
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion parser.go
Expand Up @@ -17,7 +17,7 @@ var (

//------------------------------------------------------------------------------

func appendCmd(buf []byte, args []string) []byte {
func appendArgs(buf []byte, args []string) []byte {
buf = append(buf, '*')
buf = strconv.AppendUint(buf, uint64(len(args)), 10)
buf = append(buf, '\r', '\n')
Expand Down
65 changes: 36 additions & 29 deletions pool.go
Expand Up @@ -162,7 +162,7 @@ func (p *connPool) Get() (*conn, bool, error) {
}

if p.conns.Len() < p.opt.PoolSize {
cn, err := p.dial()
cn, err := p.new()
if err != nil {
p.cond.L.Unlock()
return nil, false, err
Expand Down Expand Up @@ -277,60 +277,68 @@ func (p *connPool) Close() error {
type singleConnPool struct {
pool pool

l sync.RWMutex
cn *conn
cnMtx sync.Mutex
cn *conn

reusable bool

closed bool
}

func newSingleConnPool(pool pool, cn *conn, reusable bool) *singleConnPool {
func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
return &singleConnPool{
pool: pool,
cn: cn,
reusable: reusable,
}
}

func (p *singleConnPool) SetConn(cn *conn) {
p.cnMtx.Lock()
p.cn = cn
p.cnMtx.Unlock()
}

func (p *singleConnPool) Get() (*conn, bool, error) {
p.l.RLock()
defer p.cnMtx.Unlock()
p.cnMtx.Lock()

if p.closed {
p.l.RUnlock()
return nil, false, errClosed
}
if p.cn != nil {
p.l.RUnlock()
return p.cn, false, nil
}
p.l.RUnlock()

p.l.Lock()
cn, isNew, err := p.pool.Get()
if err != nil {
p.l.Unlock()
return nil, false, err
}
p.cn = cn
p.l.Unlock()
return cn, isNew, nil

return p.cn, isNew, nil
}

func (p *singleConnPool) Put(cn *conn) error {
p.l.Lock()
defer p.cnMtx.Unlock()
p.cnMtx.Lock()
if p.cn != cn {
panic("p.cn != cn")
}
if p.closed {
p.l.Unlock()
return errClosed
}
p.l.Unlock()
return nil
}

func (p *singleConnPool) put() error {
err := p.pool.Put(p.cn)
p.cn = nil
return err
}

func (p *singleConnPool) Remove(cn *conn) error {
defer p.l.Unlock()
p.l.Lock()
defer p.cnMtx.Unlock()
p.cnMtx.Lock()
if p.cn == nil {
panic("p.cn == nil")
}
Expand All @@ -350,48 +358,47 @@ func (p *singleConnPool) remove() error {
}

func (p *singleConnPool) Len() int {
defer p.l.Unlock()
p.l.Lock()
defer p.cnMtx.Unlock()
p.cnMtx.Lock()
if p.cn == nil {
return 0
}
return 1
}

func (p *singleConnPool) Size() int {
defer p.l.Unlock()
p.l.Lock()
defer p.cnMtx.Unlock()
p.cnMtx.Lock()
if p.cn == nil {
return 0
}
return 1
}

func (p *singleConnPool) Filter(f func(*conn) bool) {
p.l.Lock()
p.cnMtx.Lock()
if p.cn != nil {
if !f(p.cn) {
p.remove()
}
}
p.l.Unlock()
p.cnMtx.Unlock()
}

func (p *singleConnPool) Close() error {
defer p.l.Unlock()
p.l.Lock()
defer p.cnMtx.Unlock()
p.cnMtx.Lock()
if p.closed {
return nil
}
p.closed = true
var err error
if p.cn != nil {
if p.reusable {
err = p.pool.Put(p.cn)
err = p.put()
} else {
err = p.pool.Remove(p.cn)
err = p.remove()
}
}
p.cn = nil
return err
}
2 changes: 1 addition & 1 deletion pubsub.go
Expand Up @@ -14,7 +14,7 @@ func (c *Client) PubSub() *PubSub {
return &PubSub{
baseClient: &baseClient{
opt: c.opt,
connPool: newSingleConnPool(c.connPool, nil, false),
connPool: newSingleConnPool(c.connPool, false),
},
}
}
Expand Down
41 changes: 24 additions & 17 deletions redis.go
Expand Up @@ -14,9 +14,9 @@ type baseClient struct {
}

func (c *baseClient) writeCmd(cn *conn, cmds ...Cmder) error {
buf := make([]byte, 0, 1000)
buf := make([]byte, 0, 64)
for _, cmd := range cmds {
buf = appendCmd(buf, cmd.args())
buf = appendArgs(buf, cmd.args())
}

_, err := cn.Write(buf)
Expand All @@ -29,8 +29,8 @@ func (c *baseClient) conn() (*conn, error) {
return nil, err
}

if isNew && (c.opt.Password != "" || c.opt.DB > 0) {
if err = c.init(cn, c.opt.Password, c.opt.DB); err != nil {
if isNew {
if err := c.initConn(cn); err != nil {
c.removeConn(cn)
return nil, err
}
Expand All @@ -39,26 +39,31 @@ func (c *baseClient) conn() (*conn, error) {
return cn, nil
}

func (c *baseClient) init(cn *conn, password string, db int64) error {
// Client is not closed on purpose.
func (c *baseClient) initConn(cn *conn) error {
if c.opt.Password == "" || c.opt.DB == 0 {

This comment has been minimized.

Copy link
@bsm

bsm Jul 7, 2014

So this means that DB setting is ignored when no password is set.

This comment has been minimized.

Copy link
@vmihailenco

vmihailenco Jul 8, 2014

Author Collaborator

Oops.

return nil
}

pool := newSingleConnPool(c.connPool, false)
pool.SetConn(cn)

// Client is not closed because we want to reuse underlying connection.
client := &Client{
baseClient: &baseClient{
opt: c.opt,
connPool: newSingleConnPool(c.connPool, cn, false),
connPool: pool,
},
}

if password != "" {
auth := client.Auth(password)
if auth.Err() != nil {
return auth.Err()
if c.opt.Password != "" {
if err := client.Auth(c.opt.Password).Err(); err != nil {
return err
}
}

if db > 0 {
sel := client.Select(db)
if sel.Err() != nil {
return sel.Err()
if c.opt.DB > 0 {
if err := client.Select(c.opt.DB).Err(); err != nil {
return err
}
}

Expand Down Expand Up @@ -102,14 +107,16 @@ func (c *baseClient) run(cmd Cmder) {
return
}

cn.writeTimeout = c.opt.WriteTimeout
if timeout := cmd.writeTimeout(); timeout != nil {
cn.writeTimeout = *timeout
} else {
cn.writeTimeout = c.opt.WriteTimeout
}

cn.readTimeout = c.opt.ReadTimeout
if timeout := cmd.readTimeout(); timeout != nil {
cn.readTimeout = *timeout
} else {
cn.readTimeout = c.opt.ReadTimeout
}

if err := c.writeCmd(cn, cmd); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion sentinel.go
Expand Up @@ -94,7 +94,7 @@ func (c *sentinelClient) PubSub() *PubSub {
return &PubSub{
baseClient: &baseClient{
opt: c.opt,
connPool: newSingleConnPool(c.connPool, nil, false),
connPool: newSingleConnPool(c.connPool, false),
},
}
}
Expand Down

0 comments on commit 678b8b3

Please sign in to comment.