Skip to content

Commit

Permalink
RDequeAsync interface added. #186
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 20, 2015
1 parent 6afd7cd commit 27be4c6
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 21 deletions.
118 changes: 101 additions & 17 deletions src/main/java/org/redisson/RedissonDeque.java
Expand Up @@ -16,13 +16,19 @@
package org.redisson;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.decoder.ListFirstObjectDecoder;
import org.redisson.core.RDeque;

import io.netty.util.concurrent.Future;

/**
* Distributed and concurrent implementation of {@link java.util.Queue}
*
Expand All @@ -32,20 +38,37 @@
*/
public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {

private static final RedisCommand<Void> LPUSH_VOID = new RedisCommand<Void>("LPUSH", new VoidReplayConvertor());
private static final RedisCommand<Boolean> LPUSH_BOOLEAN = new RedisCommand<Boolean>("LPUSH", new TrueReplayConvertor());
private static final RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS);
private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder());


protected RedissonDeque(ConnectionManager connectionManager, String name) {
super(connectionManager, name);
}

@Override
public void addFirst(V e) {
connectionManager.write(getName(), RedisCommands.LPUSH, getName(), e);
connectionManager.get(addFirstAsync(e));
}

@Override
public Future<Void> addFirstAsync(V e) {
return connectionManager.writeAsync(getName(), LPUSH_VOID, getName(), e);
}

@Override
public void addLast(V e) {
connectionManager.write(getName(), RedisCommands.RPUSH, getName(), e);
connectionManager.get(addLastAsync(e));
}

@Override
public Future<Void> addLastAsync(V e) {
return connectionManager.writeAsync(getName(), RPUSH_VOID, getName(), e);
}


@Override
public Iterator<V> descendingIterator() {
return new Iterator<V>() {
Expand Down Expand Up @@ -82,74 +105,135 @@ public void remove() {
};
}

@Override
public Future<V> getLastAsync() {
return connectionManager.readAsync(getName(), LRANGE_SINGLE, getName(), -1, -1);
}

@Override
public V getLast() {
List<V> list = connectionManager.read(getName(), RedisCommands.LRANGE, getName(), -1, -1);
if (list.isEmpty()) {
V result = connectionManager.get(getLastAsync());
if (result == null) {
throw new NoSuchElementException();
}
return list.get(0);
return result;
}

@Override
public boolean offerFirst(V e) {
connectionManager.write(getName(), RedisCommands.LPUSH, getName(), e);
return true;
return connectionManager.get(offerFirstAsync(e));
}

@Override
public Future<Boolean> offerFirstAsync(V e) {
return connectionManager.writeAsync(getName(), LPUSH_BOOLEAN, getName(), e);
}

@Override
public Future<Boolean> offerLastAsync(V e) {
return offerAsync(e);
}

@Override
public boolean offerLast(V e) {
return offer(e);
return connectionManager.get(offerLastAsync(e));
}

@Override
public Future<V> peekFirstAsync() {
return getAsync(0);
}

@Override
public V peekFirst() {
return peek();
return connectionManager.get(peekFirstAsync());
}

@Override
public Future<V> peekLastAsync() {
return getLastAsync();
}

@Override
public V peekLast() {
List<V> list = connectionManager.read(getName(), RedisCommands.LRANGE, getName(), -1, -1);
if (list.isEmpty()) {
return null;
}
return list.get(0);
return connectionManager.get(getLastAsync());
}

@Override
public Future<V> pollFirstAsync() {
return pollAsync();
}

@Override
public V pollFirst() {
return poll();
}

@Override
public Future<V> pollLastAsync() {
return connectionManager.writeAsync(getName(), RedisCommands.RPOP, getName());
}


@Override
public V pollLast() {
return connectionManager.write(getName(), RedisCommands.RPOP, getName());
return connectionManager.get(pollLastAsync());
}

@Override
public Future<V> popAsync() {
return pollAsync();
}

@Override
public V pop() {
return removeFirst();
}

@Override
public Future<Void> pushAsync(V e) {
return addFirstAsync(e);
}

@Override
public void push(V e) {
addFirst(e);
}

@Override
public Future<Boolean> removeFirstOccurrenceAsync(Object o) {
return removeAsync(o, 1);
}

@Override
public boolean removeFirstOccurrence(Object o) {
return remove(o, 1);
}

@Override
public Future<V> removeFirstAsync() {
return pollAsync();
}

@Override
public Future<V> removeLastAsync() {
return connectionManager.writeAsync(getName(), RedisCommands.RPOP, getName());
}

@Override
public V removeLast() {
V value = connectionManager.write(getName(), RedisCommands.RPOP, getName());
V value = connectionManager.get(removeLastAsync());
if (value == null) {
throw new NoSuchElementException();
}
return value;
}

@Override
public Future<Boolean> removeLastOccurrenceAsync(Object o) {
return removeAsync(o, -1);
}

@Override
public boolean removeLastOccurrence(Object o) {
return remove(o, -1);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/RedissonList.java
Expand Up @@ -276,7 +276,7 @@ public V get(int index) {
return getValue(index);
}

private V getValue(int index) {
V getValue(int index) {
return connectionManager.get(getAsync(index));
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/RedissonQueue.java
Expand Up @@ -47,7 +47,7 @@ public Future<Boolean> offerAsync(V e) {
}

public V getFirst() {
V value = connectionManager.read(getName(), RedisCommands.LINDEX, getName(), 0);
V value = getValue(0);
if (value == null) {
throw new NoSuchElementException();
}
Expand Down
@@ -0,0 +1,45 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection.decoder;

import java.util.List;

import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;

import io.netty.buffer.ByteBuf;

public class ListFirstObjectDecoder implements MultiDecoder<Object> {

@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}

@Override
public Object decode(List<Object> parts, State state) {
if (!parts.isEmpty()) {
return parts.get(0);
}
return null;
}

@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}

}
3 changes: 2 additions & 1 deletion src/main/java/org/redisson/core/RDeque.java
Expand Up @@ -24,6 +24,7 @@
*
* @param <V> the type of elements held in this collection
*/
public interface RDeque<V> extends Deque<V>, RQueue<V> {
public interface RDeque<V> extends Deque<V>, RQueue<V>, RDequeAsync<V> {


}
59 changes: 59 additions & 0 deletions src/main/java/org/redisson/core/RDequeAsync.java
@@ -0,0 +1,59 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;

import io.netty.util.concurrent.Future;

/**
* {@link java.util.Deque} backed by Redis
*
* @author Nikita Koksharov
*
* @param <V> the type of elements held in this collection
*/
public interface RDequeAsync<V> extends RQueueAsync<V> {

Future<Boolean> removeLastOccurrenceAsync(Object o);

Future<V> removeLastAsync();

Future<V> removeFirstAsync();

Future<Boolean> removeFirstOccurrenceAsync(Object o);

Future<Void> pushAsync(V e);

Future<V> popAsync();

Future<V> pollLastAsync();

Future<V> pollFirstAsync();

Future<V> peekLastAsync();

Future<V> peekFirstAsync();

Future<Boolean> offerLastAsync(V e);

Future<V> getLastAsync();

Future<Void> addLastAsync(V e);

Future<Void> addFirstAsync(V e);

Future<Boolean> offerFirstAsync(V e);

}

0 comments on commit 27be4c6

Please sign in to comment.