Skip to content

Commit

Permalink
adjust the code, and fix redis#1553, redis#1676
Browse files Browse the repository at this point in the history
Signed-off-by: monkey92t <golang@88.com>
  • Loading branch information
monkey92t committed Jul 30, 2021
1 parent 0965d20 commit 9d9e290
Show file tree
Hide file tree
Showing 13 changed files with 29 additions and 79 deletions.
1 change: 0 additions & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,6 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
_ = pipe.Process(ctx, cmd)
_, lastErr = pipe.Exec(ctx)
_ = pipe.Close()
ask = false
} else {
lastErr = node.Client.Process(ctx, cmd)
Expand Down
8 changes: 2 additions & 6 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,7 @@ var _ = Describe("ClusterClient", func() {
pipe = client.Pipeline().(*redis.Pipeline)
})

AfterEach(func() {
Expect(pipe.Close()).NotTo(HaveOccurred())
})
AfterEach(func() {})

assertPipeline()
})
Expand All @@ -527,9 +525,7 @@ var _ = Describe("ClusterClient", func() {
pipe = client.TxPipeline().(*redis.Pipeline)
})

AfterEach(func() {
Expect(pipe.Close()).NotTo(HaveOccurred())
})
AfterEach(func() {})

assertPipeline()
})
Expand Down
15 changes: 5 additions & 10 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2708,31 +2708,26 @@ func (cmd *GeoSearchLocationCmd) readReply(rd *proto.Reader) error {
return err
}
if cmd.opt.WithDist {
loc.Dist, err = rd.ReadFloatReply()
loc.Dist, err = rd.ReadFloat()
if err != nil {
return err
}
}
if cmd.opt.WithHash {
loc.GeoHash, err = rd.ReadIntReply()
loc.GeoHash, err = rd.ReadInt()
if err != nil {
return err
}
}
if cmd.opt.WithCoord {
nn, err := rd.ReadArrayLen()
if err != nil {
if err = rd.ReadFixedArrayLen(2); err != nil {
return err
}
if nn != 2 {
return fmt.Errorf("got %d coordinates, expected 2", nn)
}

loc.Longitude, err = rd.ReadFloatReply()
loc.Longitude, err = rd.ReadFloat()
if err != nil {
return err
}
loc.Latitude, err = rd.ReadFloatReply()
loc.Latitude, err = rd.ReadFloat()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"errors"
"time"

redis "github.com/go-redis/redis/v8"
"github.com/go-redis/redis/v8"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down
4 changes: 4 additions & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ type Cmdable interface {
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd
XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd

BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd
Expand Down Expand Up @@ -304,6 +305,8 @@ type Cmdable interface {
ClientList(ctx context.Context) *StringCmd
ClientPause(ctx context.Context, dur time.Duration) *BoolCmd
ClientID(ctx context.Context) *IntCmd
ClientUnblock(ctx context.Context, id int64) *IntCmd
ClientUnblockWithError(ctx context.Context, id int64) *IntCmd
ConfigGet(ctx context.Context, parameter string) *MapStringStringCmd
ConfigResetStat(ctx context.Context) *StatusCmd
ConfigSet(ctx context.Context, parameter, value string) *StatusCmd
Expand All @@ -320,6 +323,7 @@ type Cmdable interface {
ShutdownSave(ctx context.Context) *StatusCmd
ShutdownNoSave(ctx context.Context) *StatusCmd
SlaveOf(ctx context.Context, host, port string) *StatusCmd
SlowLogGet(ctx context.Context, num int64) *SlowLogCmd
Time(ctx context.Context) *TimeCmd
DebugObject(ctx context.Context, key string) *StringCmd
ReadOnly(ctx context.Context) *StatusCmd
Expand Down
10 changes: 5 additions & 5 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,21 @@ func (c *ClusterClient) SwapNodes(ctx context.Context, key string) error {
return nil
}

func (state *clusterState) IsConsistent(ctx context.Context) bool {
if len(state.Masters) < 3 {
func (c *clusterState) IsConsistent(ctx context.Context) bool {
if len(c.Masters) < 3 {
return false
}
for _, master := range state.Masters {
for _, master := range c.Masters {
s := master.Client.Info(ctx, "replication").Val()
if !strings.Contains(s, "role:master") {
return false
}
}

if len(state.Slaves) < 3 {
if len(c.Slaves) < 3 {
return false
}
for _, slave := range state.Slaves {
for _, slave := range c.Slaves {
s := slave.Client.Info(ctx, "replication").Val()
if !strings.Contains(s, "role:slave") {
return false
Expand Down
2 changes: 1 addition & 1 deletion internal/proto/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type writer interface {
io.Writer
io.ByteWriter
// io.StringWriter
// WriteString implement io.StringWriter.
WriteString(s string) (n int, err error)
}

Expand Down
5 changes: 4 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ func setupTCPConn(u *url.URL) (*Options, error) {
}

if u.Scheme == "rediss" {
o.TLSConfig = &tls.Config{ServerName: h}
o.TLSConfig = &tls.Config{
ServerName: h,
MinVersion: tls.VersionTLS12,
}
}

return o, nil
Expand Down
40 changes: 6 additions & 34 deletions pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package redis
import (
"context"
"sync"

"github.com/go-redis/redis/v8/internal/pool"
)

type pipelineExecer func(context.Context, []Cmder) error
Expand All @@ -26,8 +24,7 @@ type Pipeliner interface {
StatefulCmdable
Do(ctx context.Context, args ...interface{}) *Cmd
Process(ctx context.Context, cmd Cmder) error
Close() error
Discard() error
Discard()
Exec(ctx context.Context) ([]Cmder, error)
}

Expand All @@ -43,9 +40,8 @@ type Pipeline struct {
ctx context.Context
exec pipelineExecer

mu sync.Mutex
cmds []Cmder
closed bool
mu sync.Mutex
cmds []Cmder
}

func (c *Pipeline) init() {
Expand All @@ -67,29 +63,11 @@ func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
return nil
}

// Close closes the pipeline, releasing any open resources.
func (c *Pipeline) Close() error {
c.mu.Lock()
_ = c.discard()
c.closed = true
c.mu.Unlock()
return nil
}

// Discard resets the pipeline and discards queued commands.
func (c *Pipeline) Discard() error {
func (c *Pipeline) Discard() {
c.mu.Lock()
err := c.discard()
c.mu.Unlock()
return err
}

func (c *Pipeline) discard() error {
if c.closed {
return pool.ErrClosed
}
c.cmds = c.cmds[:0]
return nil
c.mu.Unlock()
}

// Exec executes all previously queued commands using one
Expand All @@ -101,10 +79,6 @@ func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
c.mu.Lock()
defer c.mu.Unlock()

if c.closed {
return nil, pool.ErrClosed
}

if len(c.cmds) == 0 {
return nil, nil
}
Expand All @@ -119,9 +93,7 @@ func (c *Pipeline) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]C
if err := fn(c); err != nil {
return nil, err
}
cmds, err := c.Exec(ctx)
_ = c.Close()
return cmds, err
return c.Exec(ctx)
}

func (c *Pipeline) Pipeline() Pipeliner {
Expand Down
1 change: 0 additions & 1 deletion pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ var _ = Describe("pool", func() {
Expect(cmds).To(HaveLen(1))
Expect(ping.Err()).NotTo(HaveOccurred())
Expect(ping.Val()).To(Equal("PONG"))
Expect(pipe.Close()).NotTo(HaveOccurred())
})

pool := client.Pool()
Expand Down
17 changes: 0 additions & 17 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,6 @@ var _ = Describe("Client", func() {
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
})

It("should close pipeline without closing the client", func() {
pipeline := client.Pipeline()
Expect(pipeline.Close()).NotTo(HaveOccurred())

pipeline.Ping(ctx)
_, err := pipeline.Exec(ctx)
Expect(err).To(MatchError("redis: client is closed"))

Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
})

It("should close pubsub when client is closed", func() {
pubsub := client.Subscribe(ctx)
Expect(client.Close()).NotTo(HaveOccurred())
Expand All @@ -157,12 +146,6 @@ var _ = Describe("Client", func() {
Expect(pubsub.Close()).NotTo(HaveOccurred())
})

It("should close pipeline when client is closed", func() {
pipeline := client.Pipeline()
Expect(client.Close()).NotTo(HaveOccurred())
Expect(pipeline.Close()).NotTo(HaveOccurred())
})

It("should select DB", func() {
db2 := redis.NewClient(&redis.Options{
Addr: redisAddr,
Expand Down
2 changes: 1 addition & 1 deletion ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (c *ringShards) Random() (*ringShard, error) {
return c.GetByKey(strconv.Itoa(rand.Int()))
}

// heartbeat monitors state of each shard in the ring.
// Heartbeat monitors state of each shard in the ring.
func (c *ringShards) Heartbeat(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
Expand Down
1 change: 0 additions & 1 deletion ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ var _ = Describe("Redis Ring", func() {
cmds, err := pipe.Exec(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(100))
Expect(pipe.Close()).NotTo(HaveOccurred())

for _, cmd := range cmds {
Expect(cmd.Err()).NotTo(HaveOccurred())
Expand Down

0 comments on commit 9d9e290

Please sign in to comment.