From 27be4c6e0f1e904fb71c272b61d542cd50d4d645 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 20 Jul 2015 14:24:17 +0300 Subject: [PATCH] RDequeAsync interface added. #186 --- src/main/java/org/redisson/RedissonDeque.java | 118 +++++++++++++++--- src/main/java/org/redisson/RedissonList.java | 2 +- src/main/java/org/redisson/RedissonQueue.java | 2 +- .../decoder/ListFirstObjectDecoder.java | 45 +++++++ src/main/java/org/redisson/core/RDeque.java | 3 +- .../java/org/redisson/core/RDequeAsync.java | 59 +++++++++ .../java/org/redisson/RedissonDequeTest.java | 62 ++++++++- 7 files changed, 270 insertions(+), 21 deletions(-) create mode 100644 src/main/java/org/redisson/connection/decoder/ListFirstObjectDecoder.java create mode 100644 src/main/java/org/redisson/core/RDequeAsync.java diff --git a/src/main/java/org/redisson/RedissonDeque.java b/src/main/java/org/redisson/RedissonDeque.java index 56c106d32ab..097c088ceee 100644 --- a/src/main/java/org/redisson/RedissonDeque.java +++ b/src/main/java/org/redisson/RedissonDeque.java @@ -16,13 +16,19 @@ package org.redisson; import java.util.Iterator; -import java.util.List; import java.util.NoSuchElementException; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.TrueReplayConvertor; +import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.decoder.ListFirstObjectDecoder; import org.redisson.core.RDeque; +import io.netty.util.concurrent.Future; + /** * Distributed and concurrent implementation of {@link java.util.Queue} * @@ -32,20 +38,37 @@ */ public class RedissonDeque extends RedissonQueue implements RDeque { + private static final RedisCommand LPUSH_VOID = new RedisCommand("LPUSH", new VoidReplayConvertor()); + private static final RedisCommand LPUSH_BOOLEAN = new RedisCommand("LPUSH", new TrueReplayConvertor()); + private static final RedisCommand RPUSH_VOID = new RedisCommand("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS); + private static final RedisCommand LRANGE_SINGLE = new RedisCommand("LRANGE", new ListFirstObjectDecoder()); + + protected RedissonDeque(ConnectionManager connectionManager, String name) { super(connectionManager, name); } @Override public void addFirst(V e) { - connectionManager.write(getName(), RedisCommands.LPUSH, getName(), e); + connectionManager.get(addFirstAsync(e)); + } + + @Override + public Future addFirstAsync(V e) { + return connectionManager.writeAsync(getName(), LPUSH_VOID, getName(), e); } @Override public void addLast(V e) { - connectionManager.write(getName(), RedisCommands.RPUSH, getName(), e); + connectionManager.get(addLastAsync(e)); } + @Override + public Future addLastAsync(V e) { + return connectionManager.writeAsync(getName(), RPUSH_VOID, getName(), e); + } + + @Override public Iterator descendingIterator() { return new Iterator() { @@ -82,38 +105,63 @@ public void remove() { }; } + @Override + public Future getLastAsync() { + return connectionManager.readAsync(getName(), LRANGE_SINGLE, getName(), -1, -1); + } + @Override public V getLast() { - List list = connectionManager.read(getName(), RedisCommands.LRANGE, getName(), -1, -1); - if (list.isEmpty()) { + V result = connectionManager.get(getLastAsync()); + if (result == null) { throw new NoSuchElementException(); } - return list.get(0); + return result; } @Override public boolean offerFirst(V e) { - connectionManager.write(getName(), RedisCommands.LPUSH, getName(), e); - return true; + return connectionManager.get(offerFirstAsync(e)); + } + + @Override + public Future offerFirstAsync(V e) { + return connectionManager.writeAsync(getName(), LPUSH_BOOLEAN, getName(), e); + } + + @Override + public Future offerLastAsync(V e) { + return offerAsync(e); } @Override public boolean offerLast(V e) { - return offer(e); + return connectionManager.get(offerLastAsync(e)); + } + + @Override + public Future peekFirstAsync() { + return getAsync(0); } @Override public V peekFirst() { - return peek(); + return connectionManager.get(peekFirstAsync()); + } + + @Override + public Future peekLastAsync() { + return getLastAsync(); } @Override public V peekLast() { - List list = connectionManager.read(getName(), RedisCommands.LRANGE, getName(), -1, -1); - if (list.isEmpty()) { - return null; - } - return list.get(0); + return connectionManager.get(getLastAsync()); + } + + @Override + public Future pollFirstAsync() { + return pollAsync(); } @Override @@ -121,9 +169,20 @@ public V pollFirst() { return poll(); } + @Override + public Future pollLastAsync() { + return connectionManager.writeAsync(getName(), RedisCommands.RPOP, getName()); + } + + @Override public V pollLast() { - return connectionManager.write(getName(), RedisCommands.RPOP, getName()); + return connectionManager.get(pollLastAsync()); + } + + @Override + public Future popAsync() { + return pollAsync(); } @Override @@ -131,25 +190,50 @@ public V pop() { return removeFirst(); } + @Override + public Future pushAsync(V e) { + return addFirstAsync(e); + } + @Override public void push(V e) { addFirst(e); } + @Override + public Future removeFirstOccurrenceAsync(Object o) { + return removeAsync(o, 1); + } + @Override public boolean removeFirstOccurrence(Object o) { return remove(o, 1); } + @Override + public Future removeFirstAsync() { + return pollAsync(); + } + + @Override + public Future removeLastAsync() { + return connectionManager.writeAsync(getName(), RedisCommands.RPOP, getName()); + } + @Override public V removeLast() { - V value = connectionManager.write(getName(), RedisCommands.RPOP, getName()); + V value = connectionManager.get(removeLastAsync()); if (value == null) { throw new NoSuchElementException(); } return value; } + @Override + public Future removeLastOccurrenceAsync(Object o) { + return removeAsync(o, -1); + } + @Override public boolean removeLastOccurrence(Object o) { return remove(o, -1); diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index 5ab6894f296..d63301df87a 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -276,7 +276,7 @@ public V get(int index) { return getValue(index); } - private V getValue(int index) { + V getValue(int index) { return connectionManager.get(getAsync(index)); } diff --git a/src/main/java/org/redisson/RedissonQueue.java b/src/main/java/org/redisson/RedissonQueue.java index bdbf94bee4c..0df9cdaf27e 100644 --- a/src/main/java/org/redisson/RedissonQueue.java +++ b/src/main/java/org/redisson/RedissonQueue.java @@ -47,7 +47,7 @@ public Future offerAsync(V e) { } public V getFirst() { - V value = connectionManager.read(getName(), RedisCommands.LINDEX, getName(), 0); + V value = getValue(0); if (value == null) { throw new NoSuchElementException(); } diff --git a/src/main/java/org/redisson/connection/decoder/ListFirstObjectDecoder.java b/src/main/java/org/redisson/connection/decoder/ListFirstObjectDecoder.java new file mode 100644 index 00000000000..96ce789ff95 --- /dev/null +++ b/src/main/java/org/redisson/connection/decoder/ListFirstObjectDecoder.java @@ -0,0 +1,45 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.connection.decoder; + +import java.util.List; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.decoder.MultiDecoder; + +import io.netty.buffer.ByteBuf; + +public class ListFirstObjectDecoder implements MultiDecoder { + + @Override + public Object decode(ByteBuf buf, State state) { + throw new UnsupportedOperationException(); + } + + @Override + public Object decode(List parts, State state) { + if (!parts.isEmpty()) { + return parts.get(0); + } + return null; + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return false; + } + +} diff --git a/src/main/java/org/redisson/core/RDeque.java b/src/main/java/org/redisson/core/RDeque.java index ceefbc2b04f..96a3d707498 100644 --- a/src/main/java/org/redisson/core/RDeque.java +++ b/src/main/java/org/redisson/core/RDeque.java @@ -24,6 +24,7 @@ * * @param the type of elements held in this collection */ -public interface RDeque extends Deque, RQueue { +public interface RDeque extends Deque, RQueue, RDequeAsync { + } diff --git a/src/main/java/org/redisson/core/RDequeAsync.java b/src/main/java/org/redisson/core/RDequeAsync.java new file mode 100644 index 00000000000..46ee2cf650d --- /dev/null +++ b/src/main/java/org/redisson/core/RDequeAsync.java @@ -0,0 +1,59 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import io.netty.util.concurrent.Future; + +/** + * {@link java.util.Deque} backed by Redis + * + * @author Nikita Koksharov + * + * @param the type of elements held in this collection + */ +public interface RDequeAsync extends RQueueAsync { + + Future removeLastOccurrenceAsync(Object o); + + Future removeLastAsync(); + + Future removeFirstAsync(); + + Future removeFirstOccurrenceAsync(Object o); + + Future pushAsync(V e); + + Future popAsync(); + + Future pollLastAsync(); + + Future pollFirstAsync(); + + Future peekLastAsync(); + + Future peekFirstAsync(); + + Future offerLastAsync(V e); + + Future getLastAsync(); + + Future addLastAsync(V e); + + Future addFirstAsync(V e); + + Future offerFirstAsync(V e); + +} diff --git a/src/test/java/org/redisson/RedissonDequeTest.java b/src/test/java/org/redisson/RedissonDequeTest.java index b02cbb34e1e..9ccb56362d2 100644 --- a/src/test/java/org/redisson/RedissonDequeTest.java +++ b/src/test/java/org/redisson/RedissonDequeTest.java @@ -13,6 +13,66 @@ public class RedissonDequeTest extends BaseTest { + @Test + public void testRemoveLastOccurrence() { + RDeque queue1 = redisson.getDeque("deque1"); + queue1.addFirst(3); + queue1.addFirst(1); + queue1.addFirst(2); + queue1.addFirst(3); + + queue1.removeLastOccurrence(3); + + MatcherAssert.assertThat(queue1, Matchers.containsInAnyOrder(3, 2, 1)); + } + + @Test + public void testRemoveFirstOccurrence() { + RDeque queue1 = redisson.getDeque("deque1"); + queue1.addFirst(3); + queue1.addFirst(1); + queue1.addFirst(2); + queue1.addFirst(3); + + queue1.removeFirstOccurrence(3); + + MatcherAssert.assertThat(queue1, Matchers.containsInAnyOrder(2, 1, 3)); + } + + @Test + public void testRemoveLast() { + RDeque queue1 = redisson.getDeque("deque1"); + queue1.addFirst(1); + queue1.addFirst(2); + queue1.addFirst(3); + + Assert.assertEquals(1, (int)queue1.removeLast()); + Assert.assertEquals(2, (int)queue1.removeLast()); + Assert.assertEquals(3, (int)queue1.removeLast()); + } + + @Test + public void testRemoveFirst() { + RDeque queue1 = redisson.getDeque("deque1"); + queue1.addFirst(1); + queue1.addFirst(2); + queue1.addFirst(3); + + Assert.assertEquals(3, (int)queue1.removeFirst()); + Assert.assertEquals(2, (int)queue1.removeFirst()); + Assert.assertEquals(1, (int)queue1.removeFirst()); + } + + @Test + public void testPeek() { + RDeque queue1 = redisson.getDeque("deque1"); + Assert.assertNull(queue1.peekFirst()); + Assert.assertNull(queue1.peekLast()); + queue1.addFirst(2); + Assert.assertEquals(2, (int)queue1.peekFirst()); + Assert.assertEquals(2, (int)queue1.peekLast()); + } + @Test public void testPollLastAndOfferFirstTo() { RDeque queue1 = redisson.getDeque("deque1"); @@ -28,7 +88,7 @@ public void testPollLastAndOfferFirstTo() { queue1.pollLastAndOfferFirstTo(queue2); MatcherAssert.assertThat(queue2, Matchers.contains(3, 4, 5, 6)); } - + @Test public void testAddFirstOrigin() { Deque queue = new ArrayDeque();