-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline.go
119 lines (94 loc) · 2.54 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package redis
import (
"context"
"log"
)
// Pipeliner is a mechanism that implements the Redis pipelining technique.
// Pipelining speeds up processing considerably by packing operations into batches,
// sending them to Redis at once and reading the replies in a single step. See https://redis.io/topics/pipelining
type Pipeliner interface {
// Len returns the number of commands in the pipeline that have not yet been executed.
Len() int
// Send queues the command in the pipeline and returns the Result that Exec
// populates with the command's reply.
Send(commandName string, args ...any) *Result
// Discard drops all buffered commands that have not yet been executed.
Discard()
// Exec sends all the commands buffered in the pipeline to the Redis server
// and returns their results.
Exec(ctx context.Context) ([]*Result, error)
// Pipelined calls fn with the Pipeliner so that fn can queue commands, then
// executes them. If fn returns an error, nothing is executed and that error
// is returned.
Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]*Result, error)
}
// connInterface is the set of connection operations the pipeliner is written against.
// NOTE(review): the method set resembles a redigo-style connection with
// context-aware Do/Receive; the per-method notes below describe how the code
// visible in this file uses each method — confirm exact semantics against the
// concrete implementation.
type connInterface interface {
// Close is deferred by pipeliner.Exec, so the connection is closed when Exec returns.
Close() error
// Err is not called by the pipeliner code visible here; presumably it reports
// a connection-level error — verify.
Err() error
// Do runs a single command and returns its Result; Exec uses it to issue EXEC.
Do(ctx context.Context, commandName string, args ...any) *Result
// Send hands one command to the connection; Exec calls it for MULTI and for
// every buffered command.
Send(commandName string, args ...any) error
// Flush is called by Exec after the Send calls and before Receive when the
// pipeline is not transactional.
Flush() error
// Receive returns one reply; Exec calls it once per queued command after Flush.
Receive(ctx context.Context) (reply any, err error)
}
// pipeliner buffers commands in memory until Exec sends them over its connection.
type pipeliner struct {
// c is the connection Exec writes to and reads from; Exec closes it on return.
c connInterface
// cmdAndArgs holds the queued commands; each entry is the command name
// followed by its arguments, in the order Send was called.
cmdAndArgs [][]any
// results holds one Result per queued command, at the same index as the
// command in cmdAndArgs. Send returns these pointers to the caller.
results []*Result
// tx makes Exec wrap the queued commands in MULTI ... EXEC.
tx bool
}
// Len reports how many commands have been queued with Send and not yet taken
// by Exec or dropped by Discard.
//
// It uses a pointer receiver like every other pipeliner method, so a call
// does not copy the whole struct.
func (p *pipeliner) Len() int {
	return len(p.cmdAndArgs)
}
// Send queues commandName and its args for a later Exec and returns the Result
// placeholder reserved for that command. Nothing is written to the connection
// here; the placeholder is overwritten in place when Exec runs.
func (p *pipeliner) Send(commandName string, args ...any) *Result {
	// Store the command as one flat entry: name first, arguments after it.
	cmd := make([]any, 0, 1+len(args))
	cmd = append(cmd, commandName)
	cmd = append(cmd, args...)
	p.cmdAndArgs = append(p.cmdAndArgs, cmd)

	placeholder := &Result{}
	p.results = append(p.results, placeholder)
	return placeholder
}
// Discard forgets every queued command together with its Result placeholder,
// leaving the pipeline empty.
func (p *pipeliner) Discard() {
	p.cmdAndArgs, p.results = nil, nil
}
// init configures the standard library's default logger so that each line
// carries the time of day with microsecond resolution and the short
// file:line of the call site.
//
// NOTE(review): this changes the process-wide logger for every program that
// imports this package; consider whether a library should do that.
func init() {
	const flags = log.Lshortfile | log.Ltime | log.Lmicroseconds
	log.SetFlags(flags)
}
// Exec sends every queued command over the pipeliner's connection and fills
// in, in place, the Result that Send handed out for each of them.
//
// The queue is emptied before anything is sent, so the pipeliner holds no
// pending commands afterwards regardless of the outcome. The connection is
// closed when Exec returns, on every path; the error from Close is not
// reported.
//
// With tx set, the commands are wrapped in MULTI ... EXEC and the elements of
// the EXEC reply are copied into the results in order. Otherwise the
// connection is flushed and Receive is called once per queued command; the
// reply and error of each Receive are passed to NewResult for that command's
// Result rather than returned from Exec.
//
// It returns the results in the order the commands were queued, or a nil
// slice and an error if sending, flushing or EXEC fails, or if EXEC yields
// more replies than there are queued commands.
func (p *pipeliner) Exec(ctx context.Context) ([]*Result, error) {
	// Detach the queue first so the pipeliner is empty whatever happens below.
	cmdAndArgs := p.cmdAndArgs
	p.cmdAndArgs = nil
	results := p.results
	p.results = nil

	c := p.c
	defer c.Close()

	if p.tx {
		if err := c.Send("MULTI"); err != nil {
			return nil, err
		}
	}
	for _, arg := range cmdAndArgs {
		// pipeliner.Send always stores the command name as the first element.
		if err := c.Send(arg[0].(string), arg[1:]...); err != nil {
			return nil, err
		}
	}

	if !p.tx {
		if err := c.Flush(); err != nil {
			return nil, err
		}
		for i := range results {
			// Overwrite through the pointer so callers holding the Result
			// returned by Send observe the reply.
			*results[i] = *NewResult(c.Receive(ctx))
		}
		return results, nil
	}

	res := c.Do(ctx, "EXEC")
	if err := res.Err(); err != nil {
		return nil, err
	}
	values, err := res.Values()
	if err != nil {
		return nil, err
	}
	// Indexing results by the reply would panic if the server answered with
	// more elements than were queued; report that as an error instead.
	if len(values) > len(results) {
		return nil, errExecReplyOverflow
	}
	for i, val := range values {
		*results[i] = *NewResult(val, nil)
	}
	return results, nil
}

// pipelinerError is a constant error type, which lets Exec report a reply
// mismatch without pulling in an additional import.
type pipelinerError string

// Error implements the error interface.
func (e pipelinerError) Error() string { return string(e) }

// errExecReplyOverflow is returned by Exec when the EXEC reply has more
// elements than there are queued commands.
const errExecReplyOverflow = pipelinerError("redis: EXEC returned more replies than queued commands")
// Pipelined lets fn queue commands on this pipeliner and then runs them with
// Exec, returning Exec's results. If fn returns an error, that error is
// returned with nil results and Exec is not called.
//
// NOTE(review): on the fn-error path the queued commands stay buffered and the
// connection that Exec would have closed is left open — confirm the caller
// releases it.
func (p *pipeliner) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]*Result, error) {
	err := fn(p)
	if err != nil {
		return nil, err
	}
	results, err := p.Exec(ctx)
	return results, err
}