Skip to content

Commit

Permalink
feat: improve error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed May 24, 2022
1 parent b35181e commit 2f0d7d2
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 30 deletions.
30 changes: 14 additions & 16 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type connFn func(dst string, opt *ClientOption) conn
type dialFn func(dst string, opt *ClientOption) (net.Conn, error)
type wireFn func() (wire, error)
type wireFn func() wire

type singleconnect struct {
w wire
Expand Down Expand Up @@ -43,19 +43,24 @@ type mux struct {
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
return newMux(dst, option, (*pipe)(nil), dead, func() (w wire, err error) {
dead := deadFn()
return newMux(dst, option, (*pipe)(nil), dead, func() (w wire) {
conn, err := dialFn(dst, option)
if err == nil {
w, err = newPipe(conn, option)
}
return w, err
if err != nil {
dead.error.Store(&errs{error: err})
w = dead
}
return w
})
}

func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn) *mux {
m := &mux{dst: dst, init: init, dead: dead, wireFn: wireFn}
m.wire.Store(init)
m.pool = newPool(option.BlockingPoolSize, dead, m._newPooledWire)
m.pool = newPool(option.BlockingPoolSize, dead, wireFn)
return m
}

Expand All @@ -65,18 +70,9 @@ func (m *mux) Override(cc conn) {
}
}

func (m *mux) _newPooledWire() wire {
if w, err := m.wireFn(); err == nil {
return w
}
return m.dead
}

func (m *mux) pipe() wire {
if w, err := m._pipe(); err == nil {
return w
}
return m.dead
w, _ := m._pipe()
return w
}

func (m *mux) _pipe() (w wire, err error) {
Expand All @@ -98,8 +94,10 @@ func (m *mux) _pipe() (w wire, err error) {
}

if w = m.wire.Load().(wire); w == m.init {
if w, err = m.wireFn(); err == nil {
if w = m.wireFn(); w != m.dead {
m.wire.Store(w)
} else {
err = w.Error()
}
}

Expand Down
8 changes: 4 additions & 4 deletions mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
func setupMux(wires []*mockWire) (conn *mux, checkClean func(t *testing.T)) {
var mu sync.Mutex
var count = -1
return newMux("", &ClientOption{}, (*mockWire)(nil), (*mockWire)(nil), func() (wire, error) {
return newMux("", &ClientOption{}, (*mockWire)(nil), (*mockWire)(nil), func() wire {
mu.Lock()
defer mu.Unlock()
count++
return wires[count], nil
return wires[count]
}), func(t *testing.T) {
if count != len(wires)-1 {
t.Fatalf("there is %d remaining unused wires", len(wires)-count-1)
Expand Down Expand Up @@ -104,10 +104,10 @@ func TestMuxIs(t *testing.T) {
func TestMuxDialSuppress(t *testing.T) {
var wires, waits, done int64
blocking := make(chan struct{})
m := newMux("", &ClientOption{}, (*mockWire)(nil), (*mockWire)(nil), func() (wire, error) {
m := newMux("", &ClientOption{}, (*mockWire)(nil), (*mockWire)(nil), func() wire {
atomic.AddInt64(&wires, 1)
<-blocking
return &mockWire{}, nil
return &mockWire{}
})
for i := 0; i < 1000; i++ {
go func() {
Expand Down
9 changes: 4 additions & 5 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (p *pipe) _background() {
)

// clean up cache and free pending calls
p.cache.FreeAndClose(RedisMessage{typ: '-', string: ErrClosing.Error()})
p.cache.FreeAndClose(RedisMessage{typ: '-', string: p.Error().Error()})
for atomic.LoadInt32(&p.waits) != 0 {
if ones[0], multi, ch = p.queue.NextWriteCmd(); ch != nil {
if multi == nil {
Expand Down Expand Up @@ -678,11 +678,10 @@ func (p *pipe) Close() {
atomic.AddInt32(&p.waits, -1)
}

var dead *pipe

func init() {
dead = &pipe{state: 3}
func deadFn() *pipe {
dead := &pipe{state: 3}
dead.error.Store(errClosing)
return dead
}

const (
Expand Down
10 changes: 5 additions & 5 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1276,19 +1276,19 @@ func TestPingOnConnError(t *testing.T) {

func TestDeadPipe(t *testing.T) {
ctx := context.Background()
if err := dead.Error(); err != ErrClosing {
if err := deadFn().Error(); err != ErrClosing {
t.Fatalf("unexpected err %v", err)
}
if err := dead.Do(ctx, cmds.NewCompleted(nil)).Error(); err != ErrClosing {
if err := deadFn().Do(ctx, cmds.NewCompleted(nil)).Error(); err != ErrClosing {
t.Fatalf("unexpected err %v", err)
}
if err := dead.DoMulti(ctx, cmds.NewCompleted(nil))[0].Error(); err != ErrClosing {
if err := deadFn().DoMulti(ctx, cmds.NewCompleted(nil))[0].Error(); err != ErrClosing {
t.Fatalf("unexpected err %v", err)
}
if err := dead.DoCache(ctx, cmds.Cacheable(cmds.NewCompleted(nil)), time.Second).Error(); err != ErrClosing {
if err := deadFn().DoCache(ctx, cmds.Cacheable(cmds.NewCompleted(nil)), time.Second).Error(); err != ErrClosing {
t.Fatalf("unexpected err %v", err)
}
if err := dead.Receive(ctx, cmds.NewCompleted(nil), func(message PubSubMessage) {}); err != ErrClosing {
if err := deadFn().Receive(ctx, cmds.NewCompleted(nil), func(message PubSubMessage) {}); err != ErrClosing {
t.Fatalf("unexpected err %v", err)
}
}
2 changes: 2 additions & 0 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
)

var dead = deadFn()

//gocyclo:ignore
func TestPool(t *testing.T) {
setup := func(size int) (*pool, *int32) {
Expand Down

0 comments on commit 2f0d7d2

Please sign in to comment.