-
Notifications
You must be signed in to change notification settings - Fork 106
分布式锁
位于
CategoryServiceImpl
两个关键点:
set nx ex
原子加锁。lua
脚本原子解锁。
- Redis 实现分布式锁的命令
setnx
(Redis 原生命令)、set ex.. nx
- Redis 实现分布式锁的关键:原子添加、原子删除(Java API)
// 对应 Redis 的 set ex nx 原子命令
Boolean lockResult = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
public Map<String, List<Catelog2VO>> getCatalogJsonFromDBWithRedisLock() throws InterruptedException {
// 1 Redis 占位
String uuid = UUID.randomUUID().toString();
Boolean lockResult = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
if (lockResult) {
// 2 加锁成功 执行业务
Map<String, List<Catelog2VO>> dataFromDB;
try {
dataFromDB = getDataFromDB();
} finally {
// 原子操作删除锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Collections.singletonList("lock"), uuid);
}
} else {
// 3 加锁失败 睡眠 100ms 后重试
Thread.sleep(100);
return getCatalogJsonFromDBWithRedisLock();
}
}
官方GitHub Wiki
Redisson 是 Redis 中的 Redlock(Distributed locks with Redis)的 Java 实现。
摘:
“This page is an attempt to provide a more canonical algorithm to implement distributed locks with Redis. We propose an algorithm, called Redlock, which implements a DLM which we believe to be safer than the vanilla single instance approach. We hope that the community will analyze it, provide feedback, and use it as a starting point for the implementations or more complex or alternative designs.”
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) Redisson 提供了使用 Redis 的最简单和最便捷的方法。Redisson 的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
8. 分布式锁和同步器
- 导入
maven
依赖 - 配置Bean
@Configuration
public class MyRedissonConfig {
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() throws IOException {
// 默认连接地址 127.0.0.1:6379
// RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
- 无缝接合
J.U.C
包下的锁
本项目中的使用
public Map<String, List<Catelog2VO>> getCatalogJsonFromDBWithRedissonLock() {
RLock lock = redisson.getLock("catalogJson-lock");
lock.lock();
Map<String, List<Catelog2VO>> dataFromDB;
try {
dataFromDB = getDataFromDB();
} finally {
lock.unlock();
}
return dataFromDB;
}
Redisson 解决了两大问题
- 锁的自动续期,如果业务执行时间很长,运行期间会自动给锁续上新的 30s。不用担心业务时间过长导致锁被删除。
- 只要加锁的业务运行结束,便不会自动续期,即使不手动解锁,锁也会在 30s 后自动解除。
Redisson 采用阻塞式等待,可以有效避免死锁。
RedissonLock.lock()
@Override
public void lock() {
try {
// 默认的不传时间,第一个参数为 -1
// 也可以自己传入续约时间,但是Redisson将不会再继续执行续约工作
// 注意看下面 tryAcquireAsync() 方法
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
重载的另一个lock(long leaseTime, TimeUnit unit, boolean interruptibly)
- 如果手动指定超时时间:看门狗将会失效,就发送
Lua
脚本给Redis
执行,进行占锁,默认超时时间为手动的指定时间; - 如果未指定超时时间:就使用默认值 30 * 1000
- 默认值:``commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
- 只要占锁成功,就会启动一个定时任务
TimerTask
用于设置新的过期时间,这个新的过期时间名为internalLockLeaseTime
,刚好就是👆的默认值。 - 续约时间:
internalLockLeaseTime / 3
,即 10s。
- 默认值:``commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
最佳实战:即使看门狗有自动续约的好处,但是实战中仍然推荐使用手动设置续约时间,比如 30s,然后手动解锁。
如果一个业务时间执行时间超过 30s,那这样的业务就不应该持续存在,最后在一定的业务时间后进行手动释放。
重点关注下面的 while(true)
循环,使用tryAcquire(xx, xx, xx)
方法尝试获取锁,直到获取锁为止。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
// ********注意这里********
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
// ********注意这里********
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
// ********注意这里********
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
scheduleExpirationRenewal(threadId)
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// ********注意这里********
renewExpiration();
}
}
renewExpiration()
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// ********注意这里********
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// ********注意👆,自动续约时间,看门狗时间的 1/3 ********
ee.setTimeout(task);
}
renewExpirationAsync()
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
分布式电商系统,用来学习分布式相关理论知识与技术的试验平台,欢迎补充与PR。