/
pipeline.go
120 lines (109 loc) · 2.54 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Tideland Go Library - Redis Client - Pipeline
//
// Copyright (C) 2009-2017 Frank Mueller / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.
package redis
//--------------------
// IMPORTS
//--------------------
import (
"strings"
"github.com/tideland/golib/errors"
"github.com/tideland/golib/identifier"
"github.com/tideland/golib/monitoring"
)
//--------------------
// PIPELINE
//--------------------
// Pipeline manages a Redis connection executing
// pipelined commands. Commands are sent with Do without
// reading their replies; Collect then receives one result
// set per successfully sent command.
type Pipeline struct {
// database is the database the pipeline has been created for;
// its pool provides the connection and its monitoring and
// logging settings are consulted when sending commands.
database *Database
// resp is the protocol connection currently pulled from the
// pool. It is nil while the pipeline holds no connection.
resp *resp
// counter is the number of commands successfully sent on resp
// since it has been pulled; Collect receives that many result sets.
counter int
}
// newPipeline creates a new pipeline instance for the passed database.
//
// It pulls a protocol connection from the pool of the database and then
// performs authentication and database selection on it. If pulling the
// connection fails the error is returned directly. If authentication or
// database selection fails the connection is killed through the pool and
// the error is returned. In every error case the returned pipeline is nil.
func newPipeline(db *Database) (*Pipeline, error) {
	ppl := &Pipeline{
		database: db,
	}
	if err := ppl.ensureProtocol(); err != nil {
		return nil, err
	}
	// Perform authentication and database selection. A connection
	// failing one of these steps is killed instead of being kept.
	if err := ppl.resp.authenticate(); err != nil {
		ppl.database.pool.kill(ppl.resp)
		return nil, err
	}
	if err := ppl.resp.selectDatabase(); err != nil {
		ppl.database.pool.kill(ppl.resp)
		return nil, err
	}
	return ppl, nil
}
// Do sends one Redis command over the connection of the pipeline.
// The reply is not read here; only the number of successfully sent
// commands is tracked so that Collect can receive their result sets.
//
// The command name is lower-cased before use. Commands containing
// "subscribe" are rejected with ErrUseSubscription. Any error of
// retrieving a connection or of sending the command is returned.
func (ppl *Pipeline) Do(cmd string, args ...interface{}) error {
	cmd = strings.ToLower(cmd)
	if strings.Contains(cmd, "subscribe") {
		return errors.New(ErrUseSubscription, errorMessages)
	}
	if err := ppl.ensureProtocol(); err != nil {
		return err
	}
	// Measuring is optional and covers the sending of the command.
	if ppl.database.monitoring {
		measuring := monitoring.BeginMeasuring(identifier.Identifier("redis", "command", cmd))
		defer measuring.EndMeasuring()
	}
	err := ppl.resp.sendCommand(cmd, args...)
	logCommand(cmd, args, err, ppl.database.logging)
	if err == nil {
		// Only successfully sent commands are counted.
		ppl.counter++
	}
	return err
}
// Collect collects the result sets of all commands sent with Do since
// the connection has been pulled and returns the connection back into
// the pool.
//
// The result sets are returned in the order they are received, one per
// counted command; without any counted command the returned slice is
// empty but not nil. If receiving a result set fails the connection is
// killed through the pool and nil together with the error is returned.
// In every case the pipeline drops its reference to the connection when
// Collect returns.
func (ppl *Pipeline) Collect() ([]*ResultSet, error) {
	// Drop the connection reference regardless of the outcome so that
	// the next call of ensureProtocol pulls a new one.
	defer func() {
		ppl.resp = nil
	}()
	if err := ppl.ensureProtocol(); err != nil {
		return nil, err
	}
	// The number of results is known in advance, so allocate once.
	results := make([]*ResultSet, 0, ppl.counter)
	for i := 0; i < ppl.counter; i++ {
		result, err := ppl.resp.receiveResultSet()
		if err != nil {
			ppl.database.pool.kill(ppl.resp)
			return nil, err
		}
		results = append(results, result)
	}
	ppl.database.pool.push(ppl.resp)
	return results, nil
}
// ensureProtocol makes sure the pipeline holds a protocol connection.
// If one is already set nothing happens. Otherwise a connection is
// pulled from the pool of the database with unforcedPull and the
// command counter is reset to zero. An error of pulling is returned
// and leaves the pipeline unchanged.
func (ppl *Pipeline) ensureProtocol() error {
	if ppl.resp != nil {
		return nil
	}
	pulled, err := ppl.database.pool.pull(unforcedPull)
	if err != nil {
		return err
	}
	ppl.resp, ppl.counter = pulled, 0
	return nil
}
// EOF