Skip to content

Commit

Permalink
Support Monitor Command (#2830)
Browse files Browse the repository at this point in the history
* Add monitor command

* Add monitor commadn and tests

* insure goroutine shutdown

* fix data race

* linting

* change timeout explanation

---------

Co-authored-by: Chayim <chayim@users.noreply.github.com>
  • Loading branch information
ofekshenawa and chayim committed Dec 17, 2023
1 parent 631deaf commit 277e8b7
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 1 deletion.
83 changes: 83 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/redis/go-redis/v9/internal"
Expand Down Expand Up @@ -5381,3 +5382,85 @@ func (cmd *InfoCmd) Item(section, key string) string {
return cmd.val[section][key]
}
}

type MonitorStatus int

const (
monitorStatusIdle MonitorStatus = iota
monitorStatusStart
monitorStatusStop
)

type MonitorCmd struct {
baseCmd
ch chan string
status MonitorStatus
mu sync.Mutex
}

func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd {
return &MonitorCmd{
baseCmd: baseCmd{
ctx: ctx,
args: []interface{}{"monitor"},
},
ch: ch,
status: monitorStatusIdle,
mu: sync.Mutex{},
}
}

func (cmd *MonitorCmd) String() string {
return cmdString(cmd, nil)
}

func (cmd *MonitorCmd) readReply(rd *proto.Reader) error {
ctx, cancel := context.WithCancel(cmd.ctx)
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
err := cmd.readMonitor(rd, cancel)
if err != nil {
cmd.err = err
return
}
}
}
}(ctx)
return nil
}

func (cmd *MonitorCmd) readMonitor(rd *proto.Reader, cancel context.CancelFunc) error {
for {
cmd.mu.Lock()
st := cmd.status
cmd.mu.Unlock()
if pk, _ := rd.Peek(1); len(pk) != 0 && st == monitorStatusStart {
line, err := rd.ReadString()
if err != nil {
return err
}
cmd.ch <- line
}
if st == monitorStatusStop {
cancel()
break
}
}
return nil
}

func (cmd *MonitorCmd) Start() {
cmd.mu.Lock()
defer cmd.mu.Unlock()
cmd.status = monitorStatusStart
}

func (cmd *MonitorCmd) Stop() {
cmd.mu.Lock()
defer cmd.mu.Unlock()
cmd.status = monitorStatusStop
}
18 changes: 17 additions & 1 deletion commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ type Cmdable interface {
SlowLogGet(ctx context.Context, num int64) *SlowLogCmd
Time(ctx context.Context) *TimeCmd
DebugObject(ctx context.Context, key string) *StringCmd

MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd

ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd
Expand Down Expand Up @@ -700,3 +699,20 @@ func (c cmdable) ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *St
_ = c(ctx, cmd)
return cmd
}

/*
Monitor - represents a Redis MONITOR command, allowing the user to capture
and process all commands sent to a Redis server. This mimics the behavior of
MONITOR in the redis-cli.
Notes:
- Using MONITOR blocks the connection to the server for itself. It needs a dedicated connection
- The user should create a channel of type string
- This runs concurrently in the background. Trigger via the Start and Stop functions
See further: Redis MONITOR command: https://redis.io/commands/monitor
*/
func (c cmdable) Monitor(ctx context.Context, ch chan string) *MonitorCmd {
cmd := newMonitorCmd(ctx, ch)
_ = c(ctx, cmd)
return cmd
}
5 changes: 5 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ var (
redisAddr = ":" + redisPort
)

var (
rediStackPort = "6379"
rediStackAddr = ":" + rediStackPort
)

var (
sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}

Expand Down
48 changes: 48 additions & 0 deletions monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package redis_test

import (
"context"
"time"

. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"

"github.com/redis/go-redis/v9"
)

var _ = Describe("Monitor command", Label("monitor"), func() {
ctx := context.TODO()
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379"})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("should monitor", Label("monitor"), func() {
ress := make(chan string)
client1 := redis.NewClient(&redis.Options{Addr: rediStackAddr})
mn := client1.Monitor(ctx, ress)
mn.Start()
// Wait for the Redis server to be in monitoring mode.
time.Sleep(100 * time.Millisecond)
client.Set(ctx, "foo", "bar", 0)
client.Set(ctx, "bar", "baz", 0)
client.Set(ctx, "bap", 8, 0)
client.Get(ctx, "bap")
lst := []string{}
for i := 0; i < 5; i++ {
s := <-ress
lst = append(lst, s)
}
mn.Stop()
Expect(lst[0]).To(ContainSubstring("OK"))
Expect(lst[1]).To(ContainSubstring(`"set" "foo" "bar"`))
Expect(lst[2]).To(ContainSubstring(`"set" "bar" "baz"`))
Expect(lst[3]).To(ContainSubstring(`"set" "bap" "8"`))
})
})

0 comments on commit 277e8b7

Please sign in to comment.