Skip to content

Commit

Permalink
BlockingFairQueue implementation #695
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Nov 10, 2016
1 parent 452cdf3 commit 2b8d524
Show file tree
Hide file tree
Showing 7 changed files with 496 additions and 84 deletions.
12 changes: 12 additions & 0 deletions redisson/src/main/java/org/redisson/Redisson.java
Expand Up @@ -29,6 +29,7 @@
import org.redisson.api.RBinaryStream;
import org.redisson.api.RBitSet;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingFairQueue;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RBoundedBlockingQueue;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class Redisson implements RedissonClient {
RedissonReference.warmUp();
}

protected final QueueTransferService queueTransferService = new QueueTransferService();
protected final EvictionScheduler evictionScheduler;
protected final CommandExecutor commandExecutor;
protected final ConnectionManager connectionManager;
Expand Down Expand Up @@ -423,6 +425,16 @@ public <M> RPatternTopic<M> getPatternTopic(String pattern, Codec codec) {
return new RedissonPatternTopic<M>(codec, commandExecutor, pattern);
}

@Override
public <V> RBlockingFairQueue<V> getBlockingFairQueue(String name) {
return new RedissonBlockingFairQueue<V>(queueTransferService, commandExecutor, name, id);
}

@Override
public <V> RBlockingFairQueue<V> getBlockingFairQueue(String name, Codec codec) {
return new RedissonBlockingFairQueue<V>(queueTransferService, codec, commandExecutor, name, id);
}

@Override
public <V> RQueue<V> getQueue(String name) {
return new RedissonQueue<V>(commandExecutor, name);
Expand Down
340 changes: 340 additions & 0 deletions redisson/src/main/java/org/redisson/RedissonBlockingFairQueue.java
@@ -0,0 +1,340 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.redisson.api.RBlockingFairQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

/**
*
* @author Nikita Koksharov
*
*/
public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> implements RBlockingFairQueue<V> {

private final RedissonFairLock fairLock;
private final QueueTransferService queueTransferService;

protected RedissonBlockingFairQueue(QueueTransferService queueTransferService, CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
String lockName = prefixName("redisson_bfq_lock", name);
fairLock = new RedissonFairLock(commandExecutor, lockName, id);
this.queueTransferService = queueTransferService;
}

protected RedissonBlockingFairQueue(QueueTransferService queueTransferService, Codec codec, CommandExecutor commandExecutor, String name, UUID id) {
super(codec, commandExecutor, name);
String lockName = prefixName("redisson_bfq_lock", name);
fairLock = new RedissonFairLock(commandExecutor, lockName, id);
this.queueTransferService = queueTransferService;
}

@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), fairLock.getName(), fairLock.getThreadsQueueName(), fairLock.getTimeoutSetName());
}

@Override
public V take() throws InterruptedException {
fairLock.lockInterruptibly();
try {
return super.take();
} finally {
fairLock.unlock();
}
}

@Override
public RFuture<V> takeAsync() {
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Void> lockFuture = fairLock.lockAsync();
lockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

final RFuture<V> takeFuture = takeAsync();
takeFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

if (!takeFuture.isSuccess()) {
promise.tryFailure(takeFuture.cause());
return;
}

promise.trySuccess(takeFuture.getNow());
}
});
}
});
}
});

return promise;
}

@Override
public V poll() {
if (fairLock.tryLock()) {
try {
return super.poll();
} finally {
fairLock.unlock();
}
}
return null;
}

@Override
public RFuture<V> pollAsync() {
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync();
tryLockFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

if (future.getNow()) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync();
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

if (!pollFuture.isSuccess()) {
promise.tryFailure(pollFuture.cause());
return;
}

promise.trySuccess(pollFuture.getNow());
}
});
}
});
} else {
promise.trySuccess(null);
}
}
});

return promise;
}


@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
long startTime = System.currentTimeMillis();
if (fairLock.tryLock(timeout, unit)) {
try {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.poll(remainTime, TimeUnit.MILLISECONDS);
}
return null;
} finally {
fairLock.unlock();
}
}
return null;
}

@Override
public RFuture<V> pollAsync(final long timeout, final TimeUnit unit) {
final long startTime = System.currentTimeMillis();
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync(timeout, unit);
tryLockFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

if (future.getNow()) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollAsync(remainTime, TimeUnit.MILLISECONDS);
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

if (!pollFuture.isSuccess()) {
promise.tryFailure(pollFuture.cause());
return;
}

promise.trySuccess(pollFuture.getNow());
}
});
}
});
} else {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

promise.trySuccess(null);
}
});
}
} else {
promise.trySuccess(null);
}
}
});

return promise;
}

@Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
long startTime = System.currentTimeMillis();
if (fairLock.tryLock(timeout, unit)) {
try {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
return super.pollLastAndOfferFirstTo(queueName, remainTime, TimeUnit.MILLISECONDS);
}
return null;
} finally {
fairLock.unlock();
}
}
return null;
}

@Override
public RFuture<V> pollLastAndOfferFirstToAsync(final String queueName, final long timeout, final TimeUnit unit) {
final long startTime = System.currentTimeMillis();
final RPromise<V> promise = newPromise();
final long threadId = Thread.currentThread().getId();
RFuture<Boolean> tryLockFuture = fairLock.tryLockAsync(timeout, unit);
tryLockFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

if (future.getNow()) {
long spentTime = System.currentTimeMillis() - startTime;
long remainTime = unit.toMillis(timeout) - spentTime;
if (remainTime > 0) {
final RFuture<V> pollFuture = RedissonBlockingFairQueue.super.pollLastAndOfferFirstToAsync(queueName, remainTime, TimeUnit.MILLISECONDS);
pollFuture.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

if (!pollFuture.isSuccess()) {
promise.tryFailure(pollFuture.cause());
return;
}

promise.trySuccess(pollFuture.getNow());
}
});
}
});
} else {
RFuture<Void> unlockFuture = fairLock.unlockAsync(threadId);
unlockFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}

promise.trySuccess(null);
}
});
}
} else {
promise.trySuccess(null);
}
}
});

return promise;
}

public RDelayedQueue<V> getDealyedQueue() {
return null;
}

}

3 comments on commit 2b8d524

@smigfu
Copy link

@smigfu smigfu commented on 2b8d524 Nov 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the missing QueueTransferService? I get a compile error if syncing to main.

@mrniko
Copy link
Member

@mrniko mrniko commented on 2b8d524 Nov 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smigfu Sorry. I have added it

@smigfu
Copy link

@smigfu smigfu commented on 2b8d524 Nov 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks 👍

Please sign in to comment.