Skip to content

Commit 8b4fa6d

Browse files
yifei.huangvmihailenco
authored andcommitted
Add WrapProcessPipeline
1 parent 2c11cbf commit 8b4fa6d

File tree

8 files changed

+166
-101
lines changed

8 files changed

+166
-101
lines changed

cluster.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,10 @@ type ClusterClient struct {
445445
cmdsInfoOnce internal.Once
446446
cmdsInfo map[string]*CommandInfo
447447

448+
process func(Cmder) error
449+
processPipeline func([]Cmder) error
450+
processTxPipeline func([]Cmder) error
451+
448452
// Reports whether slots reloading is in progress.
449453
reloading uint32
450454
}
@@ -458,7 +462,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
458462
opt: opt,
459463
nodes: newClusterNodes(opt),
460464
}
461-
c.setProcessor(c.Process)
465+
466+
c.process = c.defaultProcess
467+
c.processPipeline = c.defaultProcessPipeline
468+
c.processTxPipeline = c.defaultProcessTxPipeline
469+
470+
c.cmdable.setProcessor(c.Process)
462471

463472
// Add initial nodes.
464473
for _, addr := range opt.Addrs {
@@ -628,7 +637,20 @@ func (c *ClusterClient) Close() error {
628637
return c.nodes.Close()
629638
}
630639

640+
func (c *ClusterClient) WrapProcess(
641+
fn func(oldProcess func(Cmder) error) func(Cmder) error,
642+
) {
643+
c.process = fn(c.process)
644+
}
645+
631646
func (c *ClusterClient) Process(cmd Cmder) error {
647+
if c.process != nil {
648+
return c.process(cmd)
649+
}
650+
return c.defaultProcess(cmd)
651+
}
652+
653+
func (c *ClusterClient) defaultProcess(cmd Cmder) error {
632654
state, err := c.state()
633655
if err != nil {
634656
cmd.setErr(err)
@@ -910,17 +932,23 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
910932

911933
func (c *ClusterClient) Pipeline() Pipeliner {
912934
pipe := Pipeline{
913-
exec: c.pipelineExec,
935+
exec: c.processPipeline,
914936
}
915-
pipe.setProcessor(pipe.Process)
937+
pipe.statefulCmdable.setProcessor(pipe.Process)
916938
return &pipe
917939
}
918940

919941
func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
920942
return c.Pipeline().Pipelined(fn)
921943
}
922944

923-
func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
945+
func (c *ClusterClient) WrapProcessPipeline(
946+
fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
947+
) {
948+
c.processPipeline = fn(c.processPipeline)
949+
}
950+
951+
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
924952
cmdsMap, err := c.mapCmdsByNode(cmds)
925953
if err != nil {
926954
setCmdsErr(cmds, err)
@@ -1064,17 +1092,17 @@ func (c *ClusterClient) checkMovedErr(
10641092
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
10651093
func (c *ClusterClient) TxPipeline() Pipeliner {
10661094
pipe := Pipeline{
1067-
exec: c.txPipelineExec,
1095+
exec: c.processTxPipeline,
10681096
}
1069-
pipe.setProcessor(pipe.Process)
1097+
pipe.statefulCmdable.setProcessor(pipe.Process)
10701098
return &pipe
10711099
}
10721100

10731101
func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
10741102
return c.TxPipeline().Pipelined(fn)
10751103
}
10761104

1077-
func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
1105+
func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
10781106
state, err := c.state()
10791107
if err != nil {
10801108
return err

example_instrumentation_test.go

Lines changed: 29 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,58 +2,47 @@ package redis_test
22

33
import (
44
"fmt"
5-
"sync/atomic"
6-
"time"
75

86
"github.com/go-redis/redis"
97
)
108

119
func Example_instrumentation() {
12-
ring := redis.NewRing(&redis.RingOptions{
13-
Addrs: map[string]string{
14-
"shard1": ":6379",
15-
},
10+
cl := redis.NewClient(&redis.Options{
11+
Addr: ":6379",
1612
})
17-
ring.ForEachShard(func(client *redis.Client) error {
18-
wrapRedisProcess(client)
19-
return nil
13+
cl.WrapProcess(func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
14+
return func(cmd redis.Cmder) error {
15+
fmt.Printf("starting processing: <%s>\n", cmd)
16+
err := old(cmd)
17+
fmt.Printf("finished processing: <%s>\n", cmd)
18+
return err
19+
}
2020
})
2121

22-
for {
23-
ring.Ping()
24-
}
22+
cl.Ping()
23+
// Output: starting processing: <ping: >
24+
// finished processing: <ping: PONG>
2525
}
2626

27-
func wrapRedisProcess(client *redis.Client) {
28-
const precision = time.Microsecond
29-
var count, avgDur uint32
30-
31-
go func() {
32-
for range time.Tick(3 * time.Second) {
33-
n := atomic.LoadUint32(&count)
34-
dur := time.Duration(atomic.LoadUint32(&avgDur)) * precision
35-
fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur)
36-
}
37-
}()
38-
39-
client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error {
40-
return func(cmd redis.Cmder) error {
41-
start := time.Now()
42-
err := oldProcess(cmd)
43-
dur := time.Since(start)
44-
45-
const decay = float64(1) / 100
46-
ms := float64(dur / precision)
47-
for {
48-
avg := atomic.LoadUint32(&avgDur)
49-
newAvg := uint32((1-decay)*float64(avg) + decay*ms)
50-
if atomic.CompareAndSwapUint32(&avgDur, avg, newAvg) {
51-
break
52-
}
53-
}
54-
atomic.AddUint32(&count, 1)
27+
func Example_Pipeline_instrumentation() {
28+
client := redis.NewClient(&redis.Options{
29+
Addr: ":6379",
30+
})
5531

32+
client.WrapProcessPipeline(func(old func([]redis.Cmder) error) func([]redis.Cmder) error {
33+
return func(cmds []redis.Cmder) error {
34+
fmt.Printf("pipeline starting processing: %v\n", cmds)
35+
err := old(cmds)
36+
fmt.Printf("pipeline finished processing: %v\n", cmds)
5637
return err
5738
}
5839
})
40+
41+
client.Pipelined(func(pipe redis.Pipeliner) error {
42+
pipe.Ping()
43+
pipe.Ping()
44+
return nil
45+
})
46+
// Output: pipeline starting processing: [ping: ping: ]
47+
// pipeline finished processing: [ping: PONG ping: PONG]
5948
}

redis.go

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ func SetLogger(logger *log.Logger) {
2222
internal.Logger = logger
2323
}
2424

25+
func (c *baseClient) init() {
26+
c.process = c.defaultProcess
27+
c.processPipeline = c.defaultProcessPipeline
28+
c.processTxPipeline = c.defaultProcessTxPipeline
29+
}
30+
2531
func (c *baseClient) String() string {
2632
return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
2733
}
@@ -85,7 +91,8 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
8591
connPool: pool.NewSingleConnPool(cn),
8692
},
8793
}
88-
conn.setProcessor(conn.Process)
94+
conn.baseClient.init()
95+
conn.statefulCmdable.setProcessor(conn.Process)
8996

9097
_, err := conn.Pipelined(func(pipe Pipeliner) error {
9198
if c.opt.Password != "" {
@@ -117,14 +124,11 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
117124
// an input and returns the new wrapper process func. createWrapper should
118125
// use call the old process func within the new process func.
119126
func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
120-
c.process = fn(c.defaultProcess)
127+
c.process = fn(c.process)
121128
}
122129

123130
func (c *baseClient) Process(cmd Cmder) error {
124-
if c.process != nil {
125-
return c.process(cmd)
126-
}
127-
return c.defaultProcess(cmd)
131+
return c.process(cmd)
128132
}
129133

130134
func (c *baseClient) defaultProcess(cmd Cmder) error {
@@ -198,35 +202,48 @@ func (c *baseClient) getAddr() string {
198202
return c.opt.Addr
199203
}
200204

205+
func (c *baseClient) WrapProcessPipeline(
206+
fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
207+
) {
208+
c.processPipeline = fn(c.processPipeline)
209+
c.processTxPipeline = fn(c.processTxPipeline)
210+
}
211+
212+
func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
213+
return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
214+
}
215+
216+
func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
217+
return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
218+
}
219+
201220
type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
202221

203-
func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
204-
return func(cmds []Cmder) error {
205-
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
206-
if attempt > 0 {
207-
time.Sleep(c.retryBackoff(attempt))
208-
}
222+
func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
223+
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
224+
if attempt > 0 {
225+
time.Sleep(c.retryBackoff(attempt))
226+
}
209227

210-
cn, _, err := c.getConn()
211-
if err != nil {
212-
setCmdsErr(cmds, err)
213-
return err
214-
}
228+
cn, _, err := c.getConn()
229+
if err != nil {
230+
setCmdsErr(cmds, err)
231+
return err
232+
}
215233

216-
canRetry, err := p(cn, cmds)
234+
canRetry, err := p(cn, cmds)
217235

218-
if err == nil || internal.IsRedisError(err) {
219-
_ = c.connPool.Put(cn)
220-
break
221-
}
222-
_ = c.connPool.Remove(cn)
236+
if err == nil || internal.IsRedisError(err) {
237+
_ = c.connPool.Put(cn)
238+
break
239+
}
240+
_ = c.connPool.Remove(cn)
223241

224-
if !canRetry || !internal.IsRetryableError(err, true) {
225-
break
226-
}
242+
if !canRetry || !internal.IsRetryableError(err, true) {
243+
break
227244
}
228-
return firstCmdsErr(cmds)
229245
}
246+
return firstCmdsErr(cmds)
230247
}
231248

232249
func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
@@ -324,14 +341,15 @@ type Client struct {
324341
}
325342

326343
func newClient(opt *Options, pool pool.Pooler) *Client {
327-
client := Client{
344+
c := Client{
328345
baseClient: baseClient{
329346
opt: opt,
330347
connPool: pool,
331348
},
332349
}
333-
client.setProcessor(client.Process)
334-
return &client
350+
c.baseClient.init()
351+
c.cmdable.setProcessor(c.Process)
352+
return &c
335353
}
336354

337355
// NewClient returns a client to the Redis Server specified by Options.
@@ -343,7 +361,7 @@ func NewClient(opt *Options) *Client {
343361
func (c *Client) copy() *Client {
344362
c2 := new(Client)
345363
*c2 = *c
346-
c2.setProcessor(c2.Process)
364+
c2.cmdable.setProcessor(c2.Process)
347365
return c2
348366
}
349367

@@ -366,9 +384,9 @@ func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
366384

367385
func (c *Client) Pipeline() Pipeliner {
368386
pipe := Pipeline{
369-
exec: c.pipelineExecer(c.pipelineProcessCmds),
387+
exec: c.processPipeline,
370388
}
371-
pipe.setProcessor(pipe.Process)
389+
pipe.statefulCmdable.setProcessor(pipe.Process)
372390
return &pipe
373391
}
374392

@@ -379,9 +397,9 @@ func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
379397
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
380398
func (c *Client) TxPipeline() Pipeliner {
381399
pipe := Pipeline{
382-
exec: c.pipelineExecer(c.txPipelineProcessCmds),
400+
exec: c.processTxPipeline,
383401
}
384-
pipe.setProcessor(pipe.Process)
402+
pipe.statefulCmdable.setProcessor(pipe.Process)
385403
return &pipe
386404
}
387405

@@ -430,9 +448,9 @@ func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
430448

431449
func (c *Conn) Pipeline() Pipeliner {
432450
pipe := Pipeline{
433-
exec: c.pipelineExecer(c.pipelineProcessCmds),
451+
exec: c.processPipeline,
434452
}
435-
pipe.setProcessor(pipe.Process)
453+
pipe.statefulCmdable.setProcessor(pipe.Process)
436454
return &pipe
437455
}
438456

@@ -443,8 +461,8 @@ func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
443461
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
444462
func (c *Conn) TxPipeline() Pipeliner {
445463
pipe := Pipeline{
446-
exec: c.pipelineExecer(c.txPipelineProcessCmds),
464+
exec: c.processTxPipeline,
447465
}
448-
pipe.setProcessor(pipe.Process)
466+
pipe.statefulCmdable.setProcessor(pipe.Process)
449467
return &pipe
450468
}

redis_context.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ type baseClient struct {
1212
connPool pool.Pooler
1313
opt *Options
1414

15-
process func(Cmder) error
15+
process func(Cmder) error
16+
processPipeline func([]Cmder) error
17+
processTxPipeline func([]Cmder) error
18+
1619
onClose func() error // hook called when client is closed
1720

1821
ctx context.Context

redis_no_context.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ type baseClient struct {
1010
connPool pool.Pooler
1111
opt *Options
1212

13-
process func(Cmder) error
13+
process func(Cmder) error
14+
processPipeline func([]Cmder) error
15+
processTxPipeline func([]Cmder) error
16+
1417
onClose func() error // hook called when client is closed
1518
}

0 commit comments

Comments
 (0)