-
-
Notifications
You must be signed in to change notification settings - Fork 4k
/
redislock.go
106 lines (90 loc) · 2.35 KB
/
redislock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package redis
import (
"context"
"math/rand"
"strconv"
"sync/atomic"
"time"
red "github.com/go-redis/redis/v8"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stringx"
)
const (
randomLen = 16
tolerance = 500 // milliseconds
millisPerSecond = 1000
lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
return "OK"
else
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end`
delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end`
)
// A RedisLock is a redis lock.
type RedisLock struct {
store *Redis
seconds uint32
key string
id string
}
func init() {
rand.Seed(time.Now().UnixNano())
}
// NewRedisLock returns a RedisLock.
func NewRedisLock(store *Redis, key string) *RedisLock {
return &RedisLock{
store: store,
key: key,
id: stringx.Randn(randomLen),
}
}
// Acquire acquires the lock.
func (rl *RedisLock) Acquire() (bool, error) {
return rl.AcquireCtx(context.Background())
}
// AcquireCtx acquires the lock with the given ctx.
func (rl *RedisLock) AcquireCtx(ctx context.Context) (bool, error) {
seconds := atomic.LoadUint32(&rl.seconds)
resp, err := rl.store.EvalCtx(ctx, lockCommand, []string{rl.key}, []string{
rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
})
if err == red.Nil {
return false, nil
} else if err != nil {
logx.Errorf("Error on acquiring lock for %s, %s", rl.key, err.Error())
return false, err
} else if resp == nil {
return false, nil
}
reply, ok := resp.(string)
if ok && reply == "OK" {
return true, nil
}
logx.Errorf("Unknown reply when acquiring lock for %s: %v", rl.key, resp)
return false, nil
}
// Release releases the lock.
func (rl *RedisLock) Release() (bool, error) {
return rl.ReleaseCtx(context.Background())
}
// ReleaseCtx releases the lock with the given ctx.
func (rl *RedisLock) ReleaseCtx(ctx context.Context) (bool, error) {
resp, err := rl.store.EvalCtx(ctx, delCommand, []string{rl.key}, []string{rl.id})
if err != nil {
return false, err
}
reply, ok := resp.(int64)
if !ok {
return false, nil
}
return reply == 1, nil
}
// SetExpire sets the expiration.
func (rl *RedisLock) SetExpire(seconds int) {
atomic.StoreUint32(&rl.seconds, uint32(seconds))
}