-
Notifications
You must be signed in to change notification settings - Fork 15
/
sync.go
133 lines (116 loc) · 2.94 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package redis
import (
"sync"
"github.com/joomcode/errorx"
)
// Sync provides a convenient synchronous interface over an asynchronous Sender.
// Its Send* methods hand the request to S and block until the result is resolved.
type Sync struct {
// S is the underlying Sender that every request is forwarded to.
S Sender
}
// Do is a convenience method that builds a Request from the command name and
// its arguments and sends it with Send.
// The returned value is either the result or an error.
func (s Sync) Do(cmd string, args ...interface{}) interface{} {
	req := Request{cmd, args}
	return s.Send(req)
}
// Send passes the request to the underlying Sender and blocks until the
// request is resolved.
// The returned value is either the result or an error.
// When CollectTrace is set and the result is an errorx error, it is passed
// through errorx.EnsureStackTrace before being returned.
func (s Sync) Send(r Request) interface{} {
	var fut syncRes
	fut.Add(1)
	s.S.Send(r, &fut, 0)
	fut.Wait()

	if !CollectTrace {
		return fut.r
	}
	if err := AsErrorx(fut.r); err != nil {
		return errorx.EnsureStackTrace(err)
	}
	return fut.r
}
// SendMany sends several requests in "parallel" and returns a slice of results
// in the same order as the requests.
// Each result is either a value or an error. An empty request list yields nil
// without touching the underlying Sender.
// When CollectTrace is set, every errorx error in the results is passed
// through errorx.EnsureStackTrace.
func (s Sync) SendMany(reqs []Request) []interface{} {
	n := len(reqs)
	if n == 0 {
		return nil
	}

	// One slot and one WaitGroup count per request; Resolve fills slots by index.
	batch := syncBatch{r: make([]interface{}, n)}
	batch.Add(n)
	s.S.SendMany(reqs, &batch, 0)
	batch.Wait()

	if !CollectTrace {
		return batch.r
	}
	for i := range batch.r {
		if err := AsErrorx(batch.r[i]); err != nil {
			batch.r[i] = errorx.EnsureStackTrace(err)
		}
	}
	return batch.r
}
// SendTransaction sends several requests as a single MULTI+EXEC transaction.
// It returns the array of responses, and an error if the transaction fails.
// Since a Redis transaction is either fully executed or fully failed,
// all values are valid if err == nil.
// When CollectTrace is set, a non-nil error is passed through
// errorx.EnsureStackTrace before being returned.
func (s Sync) SendTransaction(reqs []Request) ([]interface{}, error) {
	var fut syncRes
	fut.Add(1)
	s.S.SendTransaction(reqs, &fut, 0)
	fut.Wait()

	results, err := TransactionResponse(fut.r)
	if err != nil && CollectTrace {
		err = errorx.EnsureStackTrace(err)
	}
	return results, err
}
// Scanner returns a synchronous iterator over redis keyspace/key.
// It wraps the scanner that the underlying Sender creates for opts.
func (s Sync) Scanner(opts ScanOpts) SyncIterator {
	scanner := s.S.Scanner(opts)
	return SyncIterator{s: scanner}
}
// syncRes is a single-result future: Resolve stores the value in r and
// releases one count of the embedded WaitGroup, so a caller that did Add(1)
// can block on Wait until the value has arrived.
type syncRes struct {
	r interface{}
	sync.WaitGroup
}

// Cancelled implements Future.Cancelled.
// It always returns nil.
func (f *syncRes) Cancelled() error {
	return nil
}

// Resolve implements Future.Resolve.
// The index argument is ignored because there is only one result slot.
func (f *syncRes) Resolve(res interface{}, _ uint64) {
	f.r = res
	f.WaitGroup.Done()
}
// syncBatch is a multi-result future: Resolve stores each value in r at the
// index it is given and releases one count of the embedded WaitGroup.
// r must be pre-sized by the caller so every index passed to Resolve is valid.
type syncBatch struct {
	r []interface{}
	sync.WaitGroup
}

// Cancelled implements Future.Cancelled.
// It always returns nil.
func (b *syncBatch) Cancelled() error {
	return nil
}

// Resolve implements Future.Resolve.
// It writes res into slot i of the result slice.
func (b *syncBatch) Resolve(res interface{}, i uint64) {
	b.r[i] = res
	b.WaitGroup.Done()
}
// SyncIterator is a synchronous iterator over a repeating *SCAN command.
// It wraps a Scanner and turns each iteration step into a blocking call.
type SyncIterator struct {
// s is the wrapped Scanner that Next delegates to.
s Scanner
}
// Next returns the next bunch of keys, or an error.
// The ScanEOF error signals regular completion of the iteration: it is
// returned when the scanner resolves with a nil result.
// When CollectTrace is set, a returned scan error is passed through
// errorx.EnsureStackTrace.
// A non-nil, non-error result is asserted to be []string; any other type
// panics.
func (s SyncIterator) Next() ([]string, error) {
	var res syncRes
	res.Add(1)
	s.s.Next(&res)
	res.Wait()

	if err := AsError(res.r); err != nil {
		if CollectTrace {
			// EnsureStackTrace accepts any error, so no type assertion is
			// needed; asserting *errorx.Error here would panic on a
			// foreign error type.
			err = errorx.EnsureStackTrace(err)
		}
		return nil, err
	}
	if res.r == nil {
		return nil, ScanEOF
	}
	return res.r.([]string), nil
}