Skip to content

Commit

Permalink
GH-3655: Add automatically delete for Redis Locks
Browse files Browse the repository at this point in the history
Fixes #3655

* support automatically clean up cache
* RedisLockRegistry.capacity desc
  • Loading branch information
Meteorkor authored and artembilan committed Nov 10, 2021
1 parent 25100d1 commit e515132
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 the original author or authors.
* Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,9 +19,10 @@
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -68,6 +69,7 @@
* @author Konstantin Yakimov
* @author Artem Bilan
* @author Vedran Pavic
* @author Unseok Kim
*
* @since 4.0
*
Expand All @@ -78,6 +80,8 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl

private static final long DEFAULT_EXPIRE_AFTER = 60000L;

private static final int DEFAULT_CAPACITY = 1_000_000;

private static final String OBTAIN_LOCK_SCRIPT =
"local lockClientId = redis.call('GET', KEYS[1])\n" +
"if lockClientId == ARGV[1] then\n" +
Expand All @@ -90,7 +94,15 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl
"return false";


private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
private final Map<String, RedisLock> locks =
new LinkedHashMap<String, RedisLock>(16, 0.75F, true) {

@Override
protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
return size() > RedisLockRegistry.this.capacity;
}

};

private final String clientId = UUID.randomUUID().toString();

Expand All @@ -102,6 +114,8 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl

private final long expireAfter;

private int capacity = DEFAULT_CAPACITY;

/**
* An {@link ExecutorService} to call {@link StringRedisTemplate#delete} in
* the separate thread when the current one is interrupted.
Expand Down Expand Up @@ -152,21 +166,34 @@ public void setExecutor(Executor executor) {
this.executorExplicitlySet = true;
}

/**
* Set the capacity of cached locks.
* @param capacity The capacity of cached lock, (default 1_000_000).
* @since 5.5.6
*/
public void setCapacity(int capacity) {
this.capacity = capacity;
}

@Override
public Lock obtain(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);
String path = (String) lockKey;
return this.locks.computeIfAbsent(path, RedisLock::new);
synchronized (this.locks) {
return this.locks.computeIfAbsent(path, RedisLock::new);
}
}

@Override
public void expireUnusedOlderThan(long age) {
long now = System.currentTimeMillis();
this.locks.entrySet()
.removeIf((entry) -> {
RedisLock lock = entry.getValue();
return now - lock.getLockedAt() > age && !lock.isAcquiredInThisProcess();
});
synchronized (this.locks) {
this.locks.entrySet()
.removeIf((entry) -> {
RedisLock lock = entry.getValue();
return now - lock.getLockedAt() > age && !lock.isAcquiredInThisProcess();
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,10 +24,13 @@

import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
Expand All @@ -51,6 +54,7 @@
* @author Konstantin Yakimov
* @author Artem Bilan
* @author Vedran Pavic
* @author Unseok Kim
*
* @since 4.0
*
Expand Down Expand Up @@ -435,9 +439,176 @@ public void testExpireNotChanged() throws Exception {
lock.unlock();
}

@Test
@RedisAvailable
public void concurrentObtainCapacityTest() throws InterruptedException {
final int KEY_CNT = 500;
final int CAPACITY_CNT = 179;
final int THREAD_CNT = 4;

final CountDownLatch countDownLatch = new CountDownLatch(THREAD_CNT);
final RedisConnectionFactory connectionFactory = getConnectionFactoryForTest();
final RedisLockRegistry registry = new RedisLockRegistry(connectionFactory, this.registryKey, 10000);
registry.setCapacity(CAPACITY_CNT);
final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_CNT);

for (int i = 0; i < KEY_CNT; i++) {
int finalI = i;
executorService.submit(() -> {
countDownLatch.countDown();
try {
countDownLatch.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
String keyId = "foo:" + finalI;
Lock obtain = registry.obtain(keyId);
obtain.lock();
obtain.unlock();
});
}
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);

//capacity limit test
assertThat(TestUtils.getPropertyValue(registry, "locks", Map.class).size()).isEqualTo(CAPACITY_CNT);


registry.expireUnusedOlderThan(-1000);
assertThat(TestUtils.getPropertyValue(registry, "locks", Map.class).size()).isEqualTo(0);
}

@Test
@RedisAvailable
public void concurrentObtainRemoveOrderTest() throws InterruptedException {
final int THREAD_CNT = 2;
final int DUMMY_LOCK_CNT = 3;

final int CAPACITY_CNT = THREAD_CNT;

final CountDownLatch countDownLatch = new CountDownLatch(THREAD_CNT);
final RedisConnectionFactory connectionFactory = getConnectionFactoryForTest();
final RedisLockRegistry registry = new RedisLockRegistry(connectionFactory, this.registryKey, 10000);
registry.setCapacity(CAPACITY_CNT);
final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_CNT);
final Queue<String> remainLockCheckQueue = new LinkedBlockingQueue<>();

//Removed due to capcity limit
for (int i = 0; i < DUMMY_LOCK_CNT; i++) {
Lock obtainLock0 = registry.obtain("foo:" + i);
obtainLock0.lock();
obtainLock0.unlock();
}

for (int i = DUMMY_LOCK_CNT; i < THREAD_CNT + DUMMY_LOCK_CNT; i++) {
int finalI = i;
executorService.submit(() -> {
countDownLatch.countDown();
try {
countDownLatch.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
String keyId = "foo:" + finalI;
remainLockCheckQueue.offer(keyId);
Lock obtain = registry.obtain(keyId);
obtain.lock();
obtain.unlock();
});
}

executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);

assertThat(getRedisLockRegistryLocks(registry)).containsKeys(
remainLockCheckQueue.toArray(new String[remainLockCheckQueue.size()]));
}

@Test
@RedisAvailable
public void concurrentObtainAccessRemoveOrderTest() throws InterruptedException {
final int THREAD_CNT = 2;
final int DUMMY_LOCK_CNT = 3;

final int CAPACITY_CNT = THREAD_CNT + 1;
final String REMAIN_DUMMY_LOCK_KEY = "foo:1";

final CountDownLatch countDownLatch = new CountDownLatch(THREAD_CNT);
final RedisConnectionFactory connectionFactory = getConnectionFactoryForTest();
final RedisLockRegistry registry = new RedisLockRegistry(connectionFactory, this.registryKey, 10000);
registry.setCapacity(CAPACITY_CNT);
final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_CNT);
final Queue<String> remainLockCheckQueue = new LinkedBlockingQueue<>();

//Removed due to capcity limit
for (int i = 0; i < DUMMY_LOCK_CNT; i++) {
Lock obtainLock0 = registry.obtain("foo:" + i);
obtainLock0.lock();
obtainLock0.unlock();
}

Lock obtainLock0 = registry.obtain(REMAIN_DUMMY_LOCK_KEY);
obtainLock0.lock();
obtainLock0.unlock();
remainLockCheckQueue.offer(REMAIN_DUMMY_LOCK_KEY);

for (int i = DUMMY_LOCK_CNT; i < THREAD_CNT + DUMMY_LOCK_CNT; i++) {
int finalI = i;
executorService.submit(() -> {
countDownLatch.countDown();
try {
countDownLatch.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
String keyId = "foo:" + finalI;
remainLockCheckQueue.offer(keyId);
Lock obtain = registry.obtain(keyId);
obtain.lock();
obtain.unlock();
});
}

executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);

assertThat(getRedisLockRegistryLocks(registry)).containsKeys(
remainLockCheckQueue.toArray(new String[remainLockCheckQueue.size()]));
}

@Test
@RedisAvailable
public void setCapacityTest() {
final int CAPACITY_CNT = 4;
final RedisConnectionFactory connectionFactory = getConnectionFactoryForTest();
final RedisLockRegistry registry = new RedisLockRegistry(connectionFactory, this.registryKey, 10000);
registry.setCapacity(CAPACITY_CNT);

registry.obtain("foo:1");
registry.obtain("foo:2");
registry.obtain("foo:3");

//capacity 4->3
registry.setCapacity(CAPACITY_CNT - 1);

registry.obtain("foo:4");

assertThat(TestUtils.getPropertyValue(registry, "locks", Map.class).size()).isEqualTo(3);
assertThat(getRedisLockRegistryLocks(registry)).containsKeys("foo:2", "foo:3", "foo:4");

//capacity 3->4
registry.setCapacity(CAPACITY_CNT);
registry.obtain("foo:5");
assertThat(TestUtils.getPropertyValue(registry, "locks", Map.class).size()).isEqualTo(4);
assertThat(getRedisLockRegistryLocks(registry)).containsKeys("foo:3", "foo:4", "foo:5");
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void ntestUlink() {
public void testUlink() {
RedisOperations ops = mock(RedisOperations.class);
Properties props = new Properties();
willReturn(props).given(ops).execute(any(RedisCallback.class));
Expand All @@ -464,4 +635,9 @@ private void waitForExpire(String key) throws Exception {
assertThat(n < 100).as(key + " key did not expire").isTrue();
}

@SuppressWarnings("unchecked")
private Map<String, Lock> getRedisLockRegistryLocks(RedisLockRegistry registry) {
return TestUtils.getPropertyValue(registry, "locks", Map.class);
}

}
3 changes: 3 additions & 0 deletions src/reference/asciidoc/redis.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -883,3 +883,6 @@ However, the resources protected by such a lock may have been compromised, so su
You should set the expiry at a large enough value to prevent this condition, but set it low enough that the lock can be recovered after a server failure in a reasonable amount of time.

Starting with version 5.0, the `RedisLockRegistry` implements `ExpirableLockRegistry`, which removes locks last acquired more than `age` ago and that are not currently locked.

String with version 5.5.6, the `RedisLockRegistry` is support automatically clean up cache for redisLocks in `RedisLockRegistry.locks` via `RedisLockRegistry.setCapacity()`.
See its JavaDocs for more information.

0 comments on commit e515132

Please sign in to comment.