-
Notifications
You must be signed in to change notification settings - Fork 0
/
JobLock.go
114 lines (97 loc) · 2.56 KB
/
JobLock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package worker
import (
"context"
"github.com/coreos/etcd/clientv3"
"github.com/staryjie/crontab/common"
)
// 分布式锁 TXN事物
type JobLock struct {
kv clientv3.KV // etcd kv API子集
lease clientv3.Lease // etcd lease API子集
jobName string // 任务名
cancelFunc context.CancelFunc // 终止租约自动续期的取消函数
leaseId clientv3.LeaseID // 租约ID
isLocked bool // 是否上锁成功
}
// 初始化一把锁
func InitJobLock(jobName string, kv clientv3.KV, lease clientv3.Lease) (jobLock *JobLock) {
jobLock = &JobLock{
jobName: jobName,
kv: kv,
lease: lease,
}
return
}
// 尝试上锁
func (jobLock *JobLock) TryLock() (err error) {
var (
leaseGrantResp *clientv3.LeaseGrantResponse
cancelCtx context.Context
cancelFunc context.CancelFunc
leaseId clientv3.LeaseID
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
txn clientv3.Txn
lockKey string
txnResp *clientv3.TxnResponse
)
// 1.创建租约(5秒)
if leaseGrantResp, err = jobLock.lease.Grant(context.TODO(), 5); err != nil {
return
}
// 用于取消自动续期的 cancelCtx, cancelFunc
cancelCtx, cancelFunc = context.WithCancel(context.TODO())
// 获取租约ID
leaseId = leaseGrantResp.ID
// 2.未执行完成,自动续租
if keepRespChan, err = jobLock.lease.KeepAlive(cancelCtx, leaseId); err != nil {
goto FAIL
}
// 3.处理续租自动应答
go func() {
var (
keepResp *clientv3.LeaseKeepAliveResponse
)
for {
select {
case keepResp = <-keepRespChan:
if keepResp == nil {
goto END
}
}
}
END:
}()
// 4.创建txn事务
txn = jobLock.kv.Txn(context.TODO())
// 锁路径
lockKey = common.JOB_LOCK_DIR
// 5.抢锁
txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
Then(clientv3.OpPut(lockKey, "locked", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet(lockKey))
// 提交事务
if txnResp, err = txn.Commit(); err != nil {
goto FAIL
}
// 6.抢锁成功返回,失败释放租约
if !txnResp.Succeeded { // 抢锁失败
err = common.ERR_LOCK_ALREADY_REQUIRED
goto FAIL
}
// 抢锁成功
jobLock.leaseId = leaseId
jobLock.cancelFunc = cancelFunc
jobLock.isLocked = true
return
FAIL:
cancelFunc() // 取消自动续期
jobLock.lease.Revoke(context.TODO(), leaseId) // 释放租约
return
}
// 释放锁
func (jobLock *JobLock) Unlock() {
if jobLock.isLocked {
jobLock.cancelFunc()
jobLock.lease.Revoke(context.TODO(), jobLock.leaseId) // 释放租约
}
}