Skip to content

Commit

Permalink
RedissonRedLock implemented. #533
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jun 27, 2016
1 parent fe3f3df commit db4fba6
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 63 deletions.
4 changes: 3 additions & 1 deletion src/main/java/org/redisson/RedissonFairLock.java
Expand Up @@ -61,11 +61,13 @@ protected RedissonLockEntry getEntry(long threadId) {
return PUBSUB.getEntry(getEntryName() + ":" + threadId); return PUBSUB.getEntry(getEntryName() + ":" + threadId);
} }


@Override
protected Future<RedissonLockEntry> subscribe(long threadId) { protected Future<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName() + ":" + threadId, return PUBSUB.subscribe(getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager()); getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
} }


@Override
protected void unsubscribe(Future<RedissonLockEntry> future, long threadId) { protected void unsubscribe(Future<RedissonLockEntry> future, long threadId) {
PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId, PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager()); getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
Expand Down Expand Up @@ -212,7 +214,7 @@ public Condition newCondition() {
} }


@Override @Override
Future<Boolean> forceUnlockAsync() { public Future<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(); cancelExpirationRenewal();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads // remove stale threads
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/redisson/RedissonLock.java
Expand Up @@ -371,7 +371,8 @@ public void forceUnlock() {
get(forceUnlockAsync()); get(forceUnlockAsync());
} }


Future<Boolean> forceUnlockAsync() { @Override
public Future<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(); cancelExpirationRenewal();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then " "if (redis.call('del', KEYS[1]) == 1) then "
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/redisson/RedissonReadLock.java
Expand Up @@ -115,7 +115,8 @@ public Condition newCondition() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


Future<Boolean> forceUnlockAsync() { @Override
public Future<Boolean> forceUnlockAsync() {
Future<Boolean> result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, Future<Boolean> result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hget', KEYS[1], 'mode') == 'read') then " + "if (redis.call('hget', KEYS[1], 'mode') == 'read') then " +
"redis.call('del', KEYS[1]); " + "redis.call('del', KEYS[1]); " +
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/redisson/RedissonWriteLock.java
Expand Up @@ -117,7 +117,8 @@ public Condition newCondition() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


Future<Boolean> forceUnlockAsync() { @Override
public Future<Boolean> forceUnlockAsync() {
Future<Boolean> result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, Future<Boolean> result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hget', KEYS[1], 'mode') == 'write') then " + "if (redis.call('hget', KEYS[1], 'mode') == 'write') then " +
"redis.call('del', KEYS[1]); " + "redis.call('del', KEYS[1]); " +
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/redisson/core/RLock.java
Expand Up @@ -114,6 +114,8 @@ public interface RLock extends Lock, RExpirable {
*/ */
int getHoldCount(); int getHoldCount();


Future<Boolean> forceUnlockAsync();

Future<Void> unlockAsync(); Future<Void> unlockAsync();


Future<Boolean> tryLockAsync(); Future<Boolean> tryLockAsync();
Expand Down
54 changes: 32 additions & 22 deletions src/main/java/org/redisson/core/RedissonMultiLock.java
Expand Up @@ -17,6 +17,7 @@


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
Expand All @@ -36,7 +37,7 @@
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;


/** /**
* Groups multiple independent locks and handles them as one lock. * Groups multiple independent locks and manages them as one lock.
* *
* @author Nikita Koksharov * @author Nikita Koksharov
* *
Expand Down Expand Up @@ -78,8 +79,9 @@ public void lock(long leaseTime, TimeUnit unit) {
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise(); Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise();


long currentThreadId = Thread.currentThread().getId(); long currentThreadId = Thread.currentThread().getId();
lock(promise, 0, leaseTime, unit, locks, currentThreadId); Queue<RLock> lockedLocks = new ConcurrentLinkedQueue<RLock>();
lock(promise, 0, leaseTime, unit, locks, currentThreadId, lockedLocks);


promise.sync(); promise.sync();
} }
Expand All @@ -89,19 +91,19 @@ public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null); lockInterruptibly(-1, null);
} }


private void lock(final Promise<Void> promise, final long waitTime, final long leaseTime, final TimeUnit unit, final List<RLock> locks, final long currentThreadId) throws InterruptedException { private void lock(final Promise<Void> promise, final long waitTime, final long leaseTime, final TimeUnit unit,
final List<RLock> locks, final long currentThreadId, final Queue<RLock> lockedLocks) throws InterruptedException {
final AtomicInteger tryLockRequestsAmount = new AtomicInteger(); final AtomicInteger tryLockRequestsAmount = new AtomicInteger();
final Map<Future<Boolean>, RLock> tryLockFutures = new HashMap<Future<Boolean>, RLock>(locks.size()); final Map<Future<Boolean>, RLock> tryLockFutures = new HashMap<Future<Boolean>, RLock>(locks.size());


FutureListener<Boolean> listener = new FutureListener<Boolean>() { FutureListener<Boolean> listener = new FutureListener<Boolean>() {


AtomicReference<RLock> lockedLockHolder = new AtomicReference<RLock>(); AtomicReference<RLock> lockedLockHolder = new AtomicReference<RLock>();
AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
Queue<RLock> lockedLocks = new ConcurrentLinkedQueue<RLock>();


@Override @Override
public void operationComplete(final Future<Boolean> future) throws Exception { public void operationComplete(final Future<Boolean> future) throws Exception {
if (!future.isSuccess()) { if (isLockFailed(future)) {
failed.compareAndSet(null, future.cause()); failed.compareAndSet(null, future.cause());
} }


Expand All @@ -116,7 +118,7 @@ public void operationComplete(final Future<Boolean> future) throws Exception {
} }


if (tryLockRequestsAmount.decrementAndGet() == 0) { if (tryLockRequestsAmount.decrementAndGet() == 0) {
if (lockedLockHolder.get() == null && failed.get() == null) { if (isAllLocksAcquired(lockedLockHolder, failed, lockedLocks)) {
promise.setSuccess(null); promise.setSuccess(null);
return; return;
} }
Expand All @@ -141,7 +143,8 @@ public void operationComplete(Future<Void> future) throws Exception {
} }


protected void tryLockAgain(final Promise<Void> promise, final long waitTime, final long leaseTime, protected void tryLockAgain(final Promise<Void> promise, final long waitTime, final long leaseTime,
final TimeUnit unit, final long currentThreadId, final Map<Future<Boolean>, RLock> tryLockFutures) { final TimeUnit unit, final long currentThreadId, final Map<Future<Boolean>, RLock> tryLockFutures) throws InterruptedException {
lockedLocks.clear();
if (failed.get() != null) { if (failed.get() != null) {
promise.setFailure(failed.get()); promise.setFailure(failed.get());
} else if (lockedLockHolder.get() != null) { } else if (lockedLockHolder.get() != null) {
Expand All @@ -154,20 +157,19 @@ public void operationComplete(Future<Void> future) throws Exception {
return; return;
} }


lockedLocks.add(lockedLock);
List<RLock> newLocks = new ArrayList<RLock>(tryLockFutures.values()); List<RLock> newLocks = new ArrayList<RLock>(tryLockFutures.values());
newLocks.remove(lockedLock); newLocks.remove(lockedLock);
lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId); lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId, lockedLocks);
} }
}); });
} else {
lock(promise, waitTime, leaseTime, unit, locks, currentThreadId, lockedLocks);
} }
} }
}; };


for (RLock lock : locks) { for (RLock lock : locks) {
if (lock.isHeldByCurrentThread()) {
continue;
}

tryLockRequestsAmount.incrementAndGet(); tryLockRequestsAmount.incrementAndGet();
Future<Boolean> future; Future<Boolean> future;
if (waitTime > 0 || leaseTime > 0) { if (waitTime > 0 || leaseTime > 0) {
Expand All @@ -185,31 +187,31 @@ public void operationComplete(Future<Void> future) throws Exception {


@Override @Override
public boolean tryLock() { public boolean tryLock() {
List<Future<Boolean>> tryLockFutures = new ArrayList<Future<Boolean>>(locks.size()); Map<RLock, Future<Boolean>> tryLockFutures = new HashMap<RLock, Future<Boolean>>(locks.size());
for (RLock lock : locks) { for (RLock lock : locks) {
tryLockFutures.add(lock.tryLockAsync()); tryLockFutures.put(lock, lock.tryLockAsync());
} }


return sync(tryLockFutures); return sync(tryLockFutures);
} }


private boolean sync(List<Future<Boolean>> tryLockFutures) { protected boolean sync(Map<RLock, Future<Boolean>> tryLockFutures) {
for (Future<Boolean> future : tryLockFutures) { for (Future<Boolean> future : tryLockFutures.values()) {
try { try {
if (!future.syncUninterruptibly().getNow()) { if (!future.syncUninterruptibly().getNow()) {
unlockInner(); unlockInner(tryLockFutures.keySet());
return false; return false;
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
unlockInner(); unlockInner(tryLockFutures.keySet());
throw e; throw e;
} }
} }


return true; return true;
} }


private void unlockInner() { protected void unlockInner(Collection<RLock> locks) {
List<Future<Void>> futures = new ArrayList<Future<Void>>(locks.size()); List<Future<Void>> futures = new ArrayList<Future<Void>>(locks.size());
for (RLock lock : locks) { for (RLock lock : locks) {
futures.add(lock.unlockAsync()); futures.add(lock.unlockAsync());
Expand All @@ -226,9 +228,9 @@ public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException
} }


public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
List<Future<Boolean>> tryLockFutures = new ArrayList<Future<Boolean>>(locks.size()); Map<RLock, Future<Boolean>> tryLockFutures = new HashMap<RLock, Future<Boolean>>(locks.size());
for (RLock lock : locks) { for (RLock lock : locks) {
tryLockFutures.add(lock.tryLockAsync(waitTime, leaseTime, unit)); tryLockFutures.put(lock, lock.tryLockAsync(waitTime, leaseTime, unit));
} }


return sync(tryLockFutures); return sync(tryLockFutures);
Expand All @@ -254,4 +256,12 @@ public Condition newCondition() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


protected boolean isLockFailed(Future<Boolean> future) {
return !future.isSuccess();
}

protected boolean isAllLocksAcquired(AtomicReference<RLock> lockedLockHolder, AtomicReference<Throwable> failed, Queue<RLock> lockedLocks) {
return lockedLockHolder.get() == null && failed.get() == null;
}

} }
100 changes: 100 additions & 0 deletions src/main/java/org/redisson/core/RedissonRedLock.java
@@ -0,0 +1,100 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.util.concurrent.Future;

/**
* RedLock locking algorithm implementation for multiple locks.
* It manages all locks as one.
*
* @see <a href="http://redis.io/topics/distlock">http://redis.io/topics/distlock</a>
*
* @author Nikita Koksharov
*
*/
public class RedissonRedLock extends RedissonMultiLock {

/**
* Creates instance with multiple {@link RLock} objects.
* Each RLock object could be created by own Redisson instance.
*
* @param locks
*/
public RedissonRedLock(RLock... locks) {
super(locks);
}

protected boolean sync(Map<RLock, Future<Boolean>> tryLockFutures) {
Queue<RLock> lockedLocks = new ConcurrentLinkedQueue<RLock>();
RuntimeException latestException = null;
for (Entry<RLock, Future<Boolean>> entry : tryLockFutures.entrySet()) {
try {
if (entry.getValue().syncUninterruptibly().getNow()) {
lockedLocks.add(entry.getKey());
}
} catch (RuntimeException e) {
latestException = e;
}
}

if (lockedLocks.size() < minLocksAmount(locks)) {
unlock();
lockedLocks.clear();
if (latestException != null) {
throw latestException;
}
return false;
}

return true;
}

public void unlock() {
List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(locks.size());

for (RLock lock : locks) {
futures.add(lock.forceUnlockAsync());
}

for (Future<Boolean> future : futures) {
future.awaitUninterruptibly();
}
}

protected int minLocksAmount(final List<RLock> locks) {
return locks.size()/2 + 1;
}

@Override
protected boolean isLockFailed(Future<Boolean> future) {
return false;
}

@Override
protected boolean isAllLocksAcquired(AtomicReference<RLock> lockedLockHolder, AtomicReference<Throwable> failed, Queue<RLock> lockedLocks) {
return (lockedLockHolder.get() == null && failed.get() == null) || lockedLocks.size() >= minLocksAmount(locks);
}

}

0 comments on commit db4fba6

Please sign in to comment.