Skip to content

Commit

Permalink
failed RFairLock.tryLock attempt retains caller thread in fairLock qu…
Browse files Browse the repository at this point in the history
…eue #757
  • Loading branch information
Nikita committed Jan 30, 2017
1 parent f92cd4a commit 1b9ec19
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 14 deletions.
8 changes: 8 additions & 0 deletions redisson/src/main/java/org/redisson/RedissonFairLock.java
Expand Up @@ -72,6 +72,14 @@ protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
}

@Override
protected RFuture<Void> acquireFailedAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"redis.call('zrem', KEYS[2], ARGV[1]); " +
"redis.call('lrem', KEYS[1], 0, ARGV[1]); ",
Arrays.<Object>asList(getThreadsQueueName(), getTimeoutSetName()), getLockName(threadId));
}

@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
Expand Down
50 changes: 39 additions & 11 deletions redisson/src/main/java/org/redisson/RedissonLock.java
Expand Up @@ -110,19 +110,19 @@ public void lockInterruptibly() throws InterruptedException {

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
Long ttl = tryAcquire(leaseTime, unit);
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}

long threadId = Thread.currentThread().getId();
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);

try {
while (true) {
ttl = tryAcquire(leaseTime, unit);
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
Expand All @@ -141,8 +141,8 @@ public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedE
// get(lockAsync(leaseTime, unit));
}

private Long tryAcquire(long leaseTime, TimeUnit unit) {
return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId()));
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}

private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
Expand Down Expand Up @@ -258,20 +258,29 @@ <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, R
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

private void acquireFailed(long threadId) {
get(acquireFailedAsync(threadId));
}

protected RFuture<Void> acquireFailedAsync(long threadId) {
return newSucceededFuture(null);
}

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit);
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}

time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}

Expand All @@ -288,25 +297,28 @@ public void operationComplete(Future<RedissonLockEntry> future) throws Exception
}
});
}
acquireFailed(threadId);
return false;
}

try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}

while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit);
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}

time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}

Expand All @@ -320,6 +332,7 @@ public void operationComplete(Future<RedissonLockEntry> future) throws Exception

time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
Expand Down Expand Up @@ -636,7 +649,7 @@ public void operationComplete(Future<Long> future) throws Exception {
time.addAndGet(-elapsed);

if (time.get() <= 0) {
result.trySuccess(false);
trySuccessFalse(currentThreadId, result);
return;
}

Expand Down Expand Up @@ -667,19 +680,33 @@ public void operationComplete(Future<RedissonLockEntry> future) throws Exception
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
subscribeFuture.cancel(false);
result.trySuccess(false);
trySuccessFalse(currentThreadId, result);
}
}
}, time.get(), TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture);
}
}

});


return result;
}

private void trySuccessFalse(final long currentThreadId, final RPromise<Boolean> result) {
acquireFailedAsync(currentThreadId).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
result.trySuccess(false);
} else {
result.tryFailure(future.cause());
}
}
});
}

private void tryLockAsync(final AtomicLong time, final long leaseTime, final TimeUnit unit,
final RFuture<RedissonLockEntry> subscribeFuture, final RPromise<Boolean> result, final long currentThreadId) {
if (result.isDone()) {
Expand All @@ -689,7 +716,7 @@ private void tryLockAsync(final AtomicLong time, final long leaseTime, final Tim

if (time.get() <= 0) {
unsubscribe(subscribeFuture, currentThreadId);
result.trySuccess(false);
trySuccessFalse(currentThreadId, result);
return;
}

Expand Down Expand Up @@ -719,7 +746,7 @@ public void operationComplete(Future<Long> future) throws Exception {

if (time.get() <= 0) {
unsubscribe(subscribeFuture, currentThreadId);
result.trySuccess(false);
trySuccessFalse(currentThreadId, result);
return;
}

Expand Down Expand Up @@ -776,3 +803,4 @@ public void run(Timeout timeout) throws Exception {


}
;
61 changes: 58 additions & 3 deletions redisson/src/test/java/org/redisson/RedissonFairLockTest.java
Expand Up @@ -15,6 +15,63 @@

public class RedissonFairLockTest extends BaseConcurrentTest {

@Test
public void testTryLockNonDelayed() throws InterruptedException {
String LOCK_NAME = "SOME_LOCK";

Thread t1 = new Thread(() -> {
RLock fairLock = redisson.getFairLock(LOCK_NAME);
try {
if (fairLock.tryLock(0, TimeUnit.SECONDS)) {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
Assert.fail("Unable to acquire lock for some reason");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
fairLock.unlock();
}
});

Thread t2 = new Thread(() -> {
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
RLock fairLock = redisson.getFairLock(LOCK_NAME);
try {
if (fairLock.tryLock(200, TimeUnit.MILLISECONDS)) {
Assert.fail("Should not be inside second block");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
fairLock.unlock();
}
});

t1.start();
t2.start();

t1.join();
t2.join();

RLock fairLock = redisson.getFairLock(LOCK_NAME);
try {
if (!fairLock.tryLock(0, TimeUnit.SECONDS)) {
Assert.fail("Could not get unlocked lock " + LOCK_NAME);
}
} finally {
fairLock.unlock();
}
}

@Test
public void testTryLockWait() throws InterruptedException {
testSingleInstanceConcurrency(1, r -> {
Expand Down Expand Up @@ -58,14 +115,12 @@ public void testExpire() throws InterruptedException {
Thread t = new Thread() {
public void run() {
RLock lock1 = redisson.getFairLock("lock");
System.out.println("0");
lock1.lock();
System.out.println("1");

long spendTime = System.currentTimeMillis() - startTime;
System.out.println(spendTime);
Assert.assertTrue(spendTime < 2020);
lock1.unlock();
System.out.println("3");
};
};

Expand Down

0 comments on commit 1b9ec19

Please sign in to comment.