Skip to content

Commit

Permalink
feat: expose DedicatedClient directly via Dedicate()
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed May 19, 2022
1 parent 664514b commit c7a7bd6
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 2 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,27 @@ c.Dedicated(func(client client.DedicatedClient) error {
)
return nil
})

```

Or use Dedicate and invoke `cancel()` when finished to put the connection back to the pool.

``` golang
client, cancel := c.Dedicate()
defer cancel()

// watch keys first
client.Do(ctx, client.B().Watch().Key("k1", "k2").Build())
// perform read here
client.Do(ctx, client.B().Mget().Key("k1", "k2").Build())
// perform write with MULTI EXEC
client.DoMulti(
ctx,
client.B().Multi().Build(),
client.B().Set().Key("k1").Value("1").Build(),
client.B().Set().Key("k2").Value("2").Build(),
client.B().Exec().Build(),
)
```

However, occupying a connection is not good in terms of throughput. It is better to use Lua script to perform
Expand Down
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func (c *singleClient) Dedicated(fn func(DedicatedClient) error) (err error) {
return err
}

func (c *singleClient) Dedicate() (DedicatedClient, func()) {
wire := c.conn.Acquire()
return &dedicatedSingleClient{cmd: c.cmd, wire: wire}, func() { c.conn.Store(wire) }
}

func (c *singleClient) Close() {
atomic.StoreUint32(&c.stop, 1)
c.conn.Close()
Expand Down
49 changes: 49 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,55 @@ func TestSingleClient(t *testing.T) {
t.Fatalf("Dedicated desn't put back the wire")
}
})

t.Run("Dedicate Delegate", func(t *testing.T) {
w := &mockWire{
DoFn: func(cmd cmds.Completed) RedisResult {
return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)
},
DoMultiFn: func(cmd ...cmds.Completed) []RedisResult {
return []RedisResult{newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)}
},
ReceiveFn: func(ctx context.Context, subscribe cmds.Completed, fn func(message PubSubMessage)) error {
return ErrClosing
},
ErrorFn: func() error {
return ErrClosing
},
}
m.AcquireFn = func() wire {
return w
}
stored := false
m.StoreFn = func(ww wire) {
if ww != w {
t.Fatalf("received unexpected wire %v", ww)
}
stored = true
}
c, cancel := client.Dedicate()

if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
t.Fatalf("unexpected response %v %v", v, err)
}
if v := c.DoMulti(context.Background()); len(v) != 0 {
t.Fatalf("received unexpected response %v", v)
}
for _, resp := range c.DoMulti(context.Background(), c.B().Get().Key("a").Build()) {
if v, err := resp.ToString(); err != nil || v != "Delegate" {
t.Fatalf("unexpected response %v %v", v, err)
}
}
if err := c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
t.Fatalf("unexpected ret %v", err)
}

cancel()

if !stored {
t.Fatalf("Dedicated desn't put back the wire")
}
})
}

func TestSingleClientRetry(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ func (c *clusterClient) Dedicated(fn func(DedicatedClient) error) (err error) {
return err
}

func (c *clusterClient) Dedicate() (DedicatedClient, func()) {
dcc := &dedicatedClusterClient{cmd: c.cmd, client: c, slot: cmds.NoSlot}
return dcc, func() { dcc.release() }
}

func (c *clusterClient) Close() {
atomic.StoreUint32(&c.stop, 1)
c.mu.RLock()
Expand Down Expand Up @@ -385,6 +390,8 @@ func (c *dedicatedClusterClient) acquire() (wire wire, err error) {
}

func (c *dedicatedClusterClient) release() {
c.mu.Lock()
defer c.mu.Unlock()
if c.wire != nil {
c.conn.Store(c.wire)
}
Expand Down
55 changes: 53 additions & 2 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ func TestClusterClient(t *testing.T) {
return []RedisResult{newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)}
},
ReceiveFn: func(ctx context.Context, subscribe cmds.Completed, fn func(message PubSubMessage)) error {
return errors.New("delegated")
return ErrClosing
},
ErrorFn: func() error {
return ErrClosing
},
}
m.AcquireFn = func() wire {
Expand All @@ -308,7 +311,7 @@ func TestClusterClient(t *testing.T) {
t.Fatalf("unexpected response %v %v", v, err)
}
}
if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err == nil {
if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
t.Fatalf("unexpected ret %v", err)
}
return nil
Expand All @@ -319,6 +322,54 @@ func TestClusterClient(t *testing.T) {
t.Fatalf("Dedicated desn't put back the wire")
}
})

t.Run("Dedicated Delegate", func(t *testing.T) {
w := &mockWire{
DoFn: func(cmd cmds.Completed) RedisResult {
return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)
},
DoMultiFn: func(cmd ...cmds.Completed) []RedisResult {
return []RedisResult{newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)}
},
ReceiveFn: func(ctx context.Context, subscribe cmds.Completed, fn func(message PubSubMessage)) error {
return ErrClosing
},
ErrorFn: func() error {
return ErrClosing
},
}
m.AcquireFn = func() wire {
return w
}
stored := false
m.StoreFn = func(ww wire) {
if ww != w {
t.Fatalf("received unexpected wire %v", ww)
}
stored = true
}
c, cancel := client.Dedicate()
if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
t.Fatalf("unexpected response %v %v", v, err)
}
if v := c.DoMulti(context.Background()); len(v) != 0 {
t.Fatalf("received unexpected response %v", v)
}
for _, resp := range c.DoMulti(context.Background(), c.B().Get().Key("a").Build()) {
if v, err := resp.ToString(); err != nil || v != "Delegate" {
t.Fatalf("unexpected response %v %v", v, err)
}
}
if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
t.Fatalf("unexpected ret %v", err)
}

cancel()

if !stored {
t.Fatalf("Dedicated desn't put back the wire")
}
})
}

//gocyclo:ignore
Expand Down
8 changes: 8 additions & 0 deletions lua_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type client struct {
DoFn func(ctx context.Context, cmd cmds.Completed) (resp RedisResult)
DoCacheFn func(ctx context.Context, cmd cmds.Cacheable, ttl time.Duration) (resp RedisResult)
DedicatedFn func(fn func(DedicatedClient) error) (err error)
DedicateFn func() (DedicatedClient, func())
CloseFn func()
}

Expand Down Expand Up @@ -119,6 +120,13 @@ func (c *client) Dedicated(fn func(DedicatedClient) error) (err error) {
return nil
}

func (c *client) Dedicate() (DedicatedClient, func()) {
if c.DedicateFn != nil {
return c.DedicateFn()
}
return nil, nil
}

func (c *client) Close() {
if c.CloseFn != nil {
c.CloseFn()
Expand Down
4 changes: 4 additions & 0 deletions rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ type Client interface {
// is not good for performance.
Dedicated(fn func(DedicatedClient) error) (err error)

// Dedicate does the same as Dedicated, but it exposes DedicatedClient directly
// and requires user to invoke cancel() manually to put connection back to the pool.
Dedicate() (client DedicatedClient, cancel func())

// Close will make further calls to the client be rejected with ErrClosing,
// and Close will wait until all pending calls finished.
Close()
Expand Down
37 changes: 37 additions & 0 deletions rueidis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,43 @@ func ExampleClient_dedicatedCAS() {
})
}

func ExampleClient_dedicateCAS() {
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
panic(err)
}
defer client.Close()

c, cancel := client.Dedicate()
defer cancel()

ctx := context.Background()

// watch keys first
if err := c.Do(ctx, c.B().Watch().Key("k1", "k2").Build()).Error(); err != nil {
panic(err)
}
// perform read here
values, err := c.Do(ctx, c.B().Mget().Key("k1", "k2").Build()).ToArray()
if err != nil {
panic(err)
}
v1, _ := values[0].ToString()
v2, _ := values[1].ToString()
// perform write with MULTI EXEC
for _, resp := range c.DoMulti(
ctx,
c.B().Multi().Build(),
c.B().Set().Key("k1").Value(v1+"1").Build(),
c.B().Set().Key("k2").Value(v2+"2").Build(),
c.B().Exec().Build(),
) {
if err := resp.Error(); err != nil {
panic(err)
}
}
}

func ExampleNewClient_cluster() {
client, _ := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
Expand Down
5 changes: 5 additions & 0 deletions rueidisotel/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ func (o *otelclient) Dedicated(fn func(rueidis.DedicatedClient) error) (err erro
})
}

func (o *otelclient) Dedicate() (rueidis.DedicatedClient, func()) {
client, cancel := o.client.Dedicate()
return &dedicated{client: client, mAttrs: o.mAttrs, tAttrs: o.tAttrs}, cancel
}

func (o *otelclient) Receive(ctx context.Context, subscribe cmds.Completed, fn func(msg rueidis.PubSubMessage)) (err error) {
ctx, span := start(ctx, first(subscribe.Commands()), sum(subscribe.Commands()), o.tAttrs)
err = o.client.Receive(ctx, subscribe, fn)
Expand Down
29 changes: 29 additions & 0 deletions rueidisotel/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,35 @@ func TestWithClient(t *testing.T) {
return nil
})

c, cancel := client.Dedicate()
{
c.Do(ctx, c.B().Set().Key("key").Value("val").Build())
validateTrace(t, exp, "SET", codes.Ok)

c.DoMulti(
ctx,
c.B().Set().Key("key").Value("val").Build(),
c.B().Set().Key("key").Value("val").Build(),
c.B().Set().Key("key").Value("val").Build(),
c.B().Set().Key("key").Value("val").Build(),
c.B().Set().Key("key").Value("val").Build(),
c.B().Set().Key("ignored").Value("ignored").Build(),
)
validateTrace(t, exp, "SET SET SET SET SET", codes.Ok)

c.Do(ctx, cmds.NewCompleted([]string{"unknown", "command"}))
validateTrace(t, exp, "unknown", codes.Error)

c.DoMulti(ctx, cmds.NewCompleted([]string{"unknown", "command"}))
validateTrace(t, exp, "unknown", codes.Error)

ctx2, cancel := context.WithTimeout(ctx, time.Second/2)
c.Receive(ctx2, c.B().Subscribe().Channel("ch").Build(), func(msg rueidis.PubSubMessage) {})
cancel()
validateTrace(t, exp, "SUBSCRIBE", codes.Error)
}
cancel()

client.Do(ctx, cmds.NewCompleted([]string{"unknown", "command"}))
validateTrace(t, exp, "unknown", codes.Error)

Expand Down
6 changes: 6 additions & 0 deletions sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ func (c *sentinelClient) Dedicated(fn func(DedicatedClient) error) (err error) {
return err
}

func (c *sentinelClient) Dedicate() (DedicatedClient, func()) {
master := c.mConn.Load().(conn)
wire := master.Acquire()
return &dedicatedSingleClient{cmd: c.cmd, wire: wire}, func() { master.Store(wire) }
}

func (c *sentinelClient) Close() {
atomic.StoreUint32(&c.stop, 1)
c.mu.Lock()
Expand Down

0 comments on commit c7a7bd6

Please sign in to comment.