Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBSize,ScriptLoad,ScriptFlush and ScriptExists should use hook #1811

Merged
merged 1 commit into from
Jul 5, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 56 additions & 46 deletions cluster_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,63 @@ import (

func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd {
cmd := NewIntCmd(ctx, "dbsize")
var size int64
err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
n, err := master.DBSize(ctx).Result()
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
var size int64
err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
n, err := master.DBSize(ctx).Result()
if err != nil {
return err
}
atomic.AddInt64(&size, n)
return nil
})
if err != nil {
return err
cmd.SetErr(err)
} else {
cmd.val = size
}
atomic.AddInt64(&size, n)
return nil
})
if err != nil {
cmd.SetErr(err)
return cmd
}
cmd.val = size
return cmd
}

func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCmd {
cmd := NewStringCmd(ctx, "script", "load", script)
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptLoad(ctx, script).Result()
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptLoad(ctx, script).Result()
if err != nil {
return err
}

mu.Lock()
if cmd.Val() == "" {
cmd.val = val
}
mu.Unlock()

return nil
})
if err != nil {
return err
}

mu.Lock()
if cmd.Val() == "" {
cmd.val = val
cmd.SetErr(err)
}
mu.Unlock()

return nil
})
if err != nil {
cmd.SetErr(err)
}

return cmd
}

func (c *ClusterClient) ScriptFlush(ctx context.Context) *StatusCmd {
cmd := NewStatusCmd(ctx, "script", "flush")
_ = c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
shard.ScriptFlush(ctx)

_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
return shard.ScriptFlush(ctx).Err()
})
if err != nil {
cmd.SetErr(err)
}
return nil
})

return cmd
}

Expand All @@ -74,26 +82,28 @@ func (c *ClusterClient) ScriptExists(ctx context.Context, hashes ...string) *Boo
result[i] = true
}

mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptExists(ctx, hashes...).Result()
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptExists(ctx, hashes...).Result()
if err != nil {
return err
}

mu.Lock()
for i, v := range val {
result[i] = result[i] && v
}
mu.Unlock()

return nil
})
if err != nil {
return err
}

mu.Lock()
for i, v := range val {
result[i] = result[i] && v
cmd.SetErr(err)
} else {
cmd.val = result
}
mu.Unlock()

return nil
})
if err != nil {
cmd.SetErr(err)
}

cmd.val = result

return cmd
}