Skip to content

Commit

Permalink
add RedLock && release new version v1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
panjiangming committed Aug 3, 2023
1 parent 78c573c commit ca1b43f
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 34 deletions.
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ go get -u github.com/pjimming/rlock
- Reentrancy: Redis distributed locks can support the same client to acquire the same lock multiple times, avoiding deadlocks in nested calls.
- High performance: Redis is an in-memory database with high read and write performance, enabling fast locking and unlocking operations.
- Atomicity: The locking and unlocking operations of Redis distributed locks use atomic commands, which can ensure the atomicity of operations and avoid competition problems under concurrency.
- RedLock: In the implementation of RedLock, the contradiction between Consistency C and Availability A in CAP will be eased based on the majority principle, ensuring that when more than half of all Redis nodes under RedLock are available, the entire RedLock can be provided normally Serve.

## Quick Start

### RLock
```go
package test

Expand Down Expand Up @@ -100,6 +103,38 @@ func TestDelayExpire(t *testing.T) {
}
```

### RedLock
```go
package test

import (
"testing"
"time"

"github.com/pjimming/rlock"

"github.com/stretchr/testify/assert"
)

func TestRedLock(t *testing.T) {
redLock, err := rlock.NewRedLock([]rlock.RedisClientOptions{
{Addr: "127.0.0.1:7001", Password: ""},
{Addr: "127.0.0.1:7002", Password: ""},
{Addr: "127.0.0.1:7003", Password: ""},
{Addr: "127.0.0.1:7004", Password: ""},
{Addr: "127.0.0.1:7005", Password: ""},
}, "1234567_key", 30*time.Second)

if err != nil {
t.Log(err)
return
}

t.Log(redLock.TryLock())
redLock.UnLock()
}
```

## Lua Scripts
> Hint: Your redis should support lua script.
Expand Down Expand Up @@ -142,6 +177,7 @@ return -1
### DelayExpireLua
```lua
if (redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1) then
-- hold lock
redis.call('PEXPIRE', KEYS[1], tonumber(ARGV[2]))
return 1
end
Expand Down
50 changes: 43 additions & 7 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ go get -u github.com/pjimming/rlock
```

## 实现功能
- 互斥性:Redis分布式锁可以保证同一时刻只有一个客户端可以获得锁,实现线程之间的互斥。
- 安全性:Redis分布式锁采用原子操作,可以保证并发情况下锁的安全性,避免数据竞争、死锁等问题。
- 锁超时:为了避免某个客户端获取锁后失败而导致死锁,Redis分布式锁可以设置锁超时时间,超过超时时间会自动释放锁。
- 可重入性:Redis分布式锁可以支持同一个客户端多次获取同一个锁,避免嵌套调用时出现死锁。
- 高性能:Redis是一个内存数据库,具有很高的读写性能,可以实现快速的加锁和解锁操作。
- 原子性:Redis分布式锁的加锁和解锁操作使用原子命令,可以保证操作的原子性,避免并发下的竞争问题。
- 互斥性:Redis 分布式锁可以保证同一时刻只有一个客户端可以获得锁,实现线程之间的互斥。
- 安全性:Redis 分布式锁采用原子操作,可以保证并发情况下锁的安全性,避免数据竞争、死锁等问题。
- 锁超时:为了避免某个客户端获取锁后失败而导致死锁,Redis 分布式锁可以设置锁超时时间,超过超时时间会自动释放锁。
- 可重入性:Redis 分布式锁可以支持同一个客户端多次获取同一个锁,避免嵌套调用时出现死锁。
- 高性能:Redis 是一个内存数据库,具有很高的读写性能,可以实现快速的加锁和解锁操作。
- 原子性:Redis 分布式锁的加锁和解锁操作使用原子命令,可以保证操作的原子性,避免并发下的竞争问题。
- 红锁:在红锁 RedLock 实现中,会基于多数派准则进行 CAP 中一致性 C 和可用性 A 之间矛盾的缓和,保证在 RedLock 下所有 Redis 节点中达到半数以上节点可用时,整个红锁就能够正常提供服务。

## 快速开始

### RLock
```go
package test

Expand Down Expand Up @@ -100,6 +103,38 @@ func TestDelayExpire(t *testing.T) {
}
```

### 红锁
```go
package test

import (
"testing"
"time"

"github.com/pjimming/rlock"

"github.com/stretchr/testify/assert"
)

func TestRedLock(t *testing.T) {
redLock, err := rlock.NewRedLock([]rlock.RedisClientOptions{
{Addr: "127.0.0.1:7001", Password: ""},
{Addr: "127.0.0.1:7002", Password: ""},
{Addr: "127.0.0.1:7003", Password: ""},
{Addr: "127.0.0.1:7004", Password: ""},
{Addr: "127.0.0.1:7005", Password: ""},
}, "1234567_key", 30*time.Second)

if err != nil {
t.Log(err)
return
}

t.Log(redLock.TryLock())
redLock.UnLock()
}
```

## Lua 脚本
> Hint: Your redis should support lua script.
Expand Down Expand Up @@ -139,9 +174,10 @@ end
return -1
```

### 续命
### 续租
```lua
if (redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1) then
-- 持有锁
redis.call('PEXPIRE', KEYS[1], tonumber(ARGV[2]))
return 1
end
Expand Down
12 changes: 12 additions & 0 deletions constants/const.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package constants

import "time"

const (
// DefaultBlockWaitingTime default block wait time to acquire lock
DefaultBlockWaitingTime = 60 * time.Second
// DefaultExpireTime default redis key expire time
DefaultExpireTime = 30 * time.Second
// DefaultWatchdogSwitch watchdog switch default false
DefaultWatchdogSwitch = false
)

// Lua Scripts.
// We use lua scripts for redis to ensure atomicity.
const (
Expand Down Expand Up @@ -39,6 +50,7 @@ return -1

DelayExpireLua = `
if (redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1) then
-- hold lock
redis.call('PEXPIRE', KEYS[1], tonumber(ARGV[2]))
return 1
end
Expand Down
12 changes: 9 additions & 3 deletions option.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package rlock

import "time"

type RedisClientOptions struct {
Addr string // redis address
Password string // redis password
}

type lockOptions struct {
blockWaitingSecond int64 // blocking timeout time
expireSeconds int64 // key expire time
watchdogSwitch bool // watchdog on/off
blockWaitingTime time.Duration // blocking timeout time, default 60s.
expireTime time.Duration // key expire time, default 30s.
watchdogSwitch bool // watchdog on/off, default false.
}

type redLockOptions struct {
maxSingleNodeWaitTime time.Duration // max try lock wait time.
}
66 changes: 66 additions & 0 deletions redlock.go
Original file line number Diff line number Diff line change
@@ -1 +1,67 @@
package rlock

import (
"errors"
"time"

"github.com/pjimming/rlock/utils"
)

type RedLock struct {
locks []*RLock
redLockOptions
}

// NewRedLock new a RedLock from multi redis servers.
//
// It is required that the cumulative timeout threshold of all nodes is
// less than one-tenth of the distributed lock expiration time.
func NewRedLock(ops []RedisClientOptions, key string, expireTime time.Duration) (redLock *RedLock, err error) {
if key == "" {
key = utils.GenerateRandomString(10)
}

for _, op := range ops {
rlock := NewRLock(op, key)

if rlock != nil {
redLock.locks = append(redLock.locks, rlock.
SetToken(key+"_token").
SetWatchdogSwitch(true).
SetExpireTime(expireTime))
}
}

if len(redLock.locks) < 3 {
return nil, errors.New("new redlock fail, locks count less than 3")
}

redLock.maxSingleNodeWaitTime = expireTime / time.Duration(10*len(redLock.locks))

return
}

// TryLock try to acquire lock.
//
// If RedLock gets lock count greater than half of locks,
// it means acquire lock successfully.
func (l *RedLock) TryLock() bool {
successCnt := 0
for _, lock := range l.locks {
start := time.Now()
ttl := lock.TryLock()
cost := time.Since(start)
if ttl == int64(0) && cost <= l.maxSingleNodeWaitTime {
successCnt++
}
}

return successCnt >= (len(l.locks)>>1 + 1)
}

// UnLock release lock.
func (l *RedLock) UnLock() {
for _, lock := range l.locks {
lock.UnLock()
}
}
40 changes: 23 additions & 17 deletions rlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ type RLock struct {
runningDog int32 // watchdog running status
stopDog context.CancelFunc // stop watchdog

ctx context.Context
logger *logx.Logger // log
ctx context.Context // rlock context
logger *logx.Logger // log
}

// NewRLock new a redis lock.
// You can set params with set function.
//
// Default lockOptions:
//
// blockWaitingTime : 60 * time.Second
// expireTime : 30 * time.Second
// watchdogSwitch : false
func NewRLock(op RedisClientOptions, key string) (rLock *RLock) {
if key == "" {
key = utils.GenerateRandomString(10)
Expand Down Expand Up @@ -76,9 +82,9 @@ func NewRLock(op RedisClientOptions, key string) (rLock *RLock) {
token: utils.GenerateToken(),
client: rc,
lockOptions: lockOptions{
blockWaitingSecond: 60,
expireSeconds: 30,
watchdogSwitch: false,
blockWaitingTime: constants.DefaultBlockWaitingTime,
expireTime: constants.DefaultExpireTime,
watchdogSwitch: constants.DefaultWatchdogSwitch,
},
runningDog: 0,
stopDog: nil,
Expand Down Expand Up @@ -170,7 +176,7 @@ func (l *RLock) tryLock() (ttl interface{}, err error) {
}()

if ttl, err = l.client.
Eval(l.ctx, constants.LockLua, []string{l.key}, l.token, l.expireSeconds*1000).
Eval(l.ctx, constants.LockLua, []string{l.key}, l.token, l.expireTime/time.Millisecond).
Result(); err != nil {
l.logger.Errorf("try lock fail, error: %v", err)
return -1, err
Expand All @@ -182,7 +188,7 @@ func (l *RLock) tryLock() (ttl interface{}, err error) {
// will blocking poll to acquire lock until context timeout or blocking timeout.
// Returns: ttl and error. If ttl == 0 means acquire lock successfully.
func (l *RLock) span() (ttl interface{}, err error) {
timeoutCh := time.After(time.Duration(l.blockWaitingSecond) * time.Second)
timeoutCh := time.After(l.blockWaitingTime)
ticker := time.NewTicker(time.Duration(50) * time.Millisecond)
defer ticker.Stop()

Expand Down Expand Up @@ -226,7 +232,7 @@ func (l *RLock) releaseLock() (res interface{}, err error) {
}()

if res, err = l.client.
Eval(l.ctx, constants.UnLockLua, []string{l.key}, l.token, l.expireSeconds*1000).
Eval(l.ctx, constants.UnLockLua, []string{l.key}, l.token, l.expireTime/time.Millisecond).
Result(); err != nil {
l.logger.Errorf("release lock fail, error: %v", err)
return -1, err
Expand All @@ -237,7 +243,7 @@ func (l *RLock) releaseLock() (res interface{}, err error) {
// delayExpire try to delay lock expire time.
func (l *RLock) delayExpire() (res interface{}, err error) {
if res, err = l.client.
Eval(l.ctx, constants.DelayExpireLua, []string{l.key}, l.token, l.expireSeconds*1000).
Eval(l.ctx, constants.DelayExpireLua, []string{l.key}, l.token, l.expireTime/time.Millisecond).
Result(); err != nil {
l.logger.Errorf("delay expire fail, error: %v", err)
return -1, err
Expand Down Expand Up @@ -269,21 +275,21 @@ func (l *RLock) SetToken(token string) *RLock {
return l
}

func (l *RLock) BlockWaitingSecond() int64 {
return l.blockWaitingSecond
func (l *RLock) BlockWaitingSecond() time.Duration {
return l.blockWaitingTime
}

func (l *RLock) SetBlockWaitingSecond(blockWaitingSecond int64) *RLock {
l.blockWaitingSecond = blockWaitingSecond
func (l *RLock) SetBlockWaitingSecond(blockWaitingTime time.Duration) *RLock {
l.blockWaitingTime = blockWaitingTime
return l
}

func (l *RLock) ExpireSeconds() int64 {
return l.expireSeconds
func (l *RLock) ExpireTime() time.Duration {
return l.expireTime
}

func (l *RLock) SetExpireSeconds(expireSeconds int64) *RLock {
l.expireSeconds = expireSeconds
func (l *RLock) SetExpireTime(expireTime time.Duration) *RLock {
l.expireTime = expireTime
return l
}

Expand Down
Loading

0 comments on commit ca1b43f

Please sign in to comment.