Skip to content

Commit

Permalink
RSetAsync interface added. #186
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 17, 2015
1 parent 5f2185b commit ad5385a
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 46 deletions.
116 changes: 85 additions & 31 deletions src/main/java/org/redisson/RedissonSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RSet;
Expand All @@ -43,7 +46,12 @@ protected RedissonSet(ConnectionManager connectionManager, String name) {

@Override
public int size() {
return ((Long)connectionManager.read(getName(), RedisCommands.SCARD, getName())).intValue();
return connectionManager.get(sizeAsync());
}

@Override
public Future<Integer> sizeAsync() {
return connectionManager.readAsync(getName(), RedisCommands.SCARD, getName());
}

@Override
Expand All @@ -53,7 +61,12 @@ public boolean isEmpty() {

@Override
public boolean contains(Object o) {
return connectionManager.read(getName(), RedisCommands.SISMEMBER, getName(), o);
return connectionManager.get(containsAsync(o));
}

@Override
public Future<Boolean> containsAsync(Object o) {
return connectionManager.readAsync(getName(), RedisCommands.SISMEMBER, getName(), o);
}

private ListScanResult<V> scanIterator(long startPos) {
Expand Down Expand Up @@ -111,15 +124,20 @@ public void remove() {
};
}

@Override
public Future<Collection<V>> readAllAsync() {
return connectionManager.readAsync(getName(), RedisCommands.SMEMBERS, getName());
}

@Override
public Object[] toArray() {
List<Object> res = connectionManager.read(getName(), RedisCommands.SMEMBERS, getName());
List<Object> res = (List<Object>) connectionManager.get(readAllAsync());
return res.toArray();
}

@Override
public <T> T[] toArray(T[] a) {
List<Object> res = connectionManager.read(getName(), RedisCommands.SMEMBERS, getName());
List<Object> res = (List<Object>) connectionManager.get(readAllAsync());
return res.toArray(a);
}

Expand All @@ -134,8 +152,8 @@ public Future<Boolean> addAsync(V e) {
}

@Override
public Future<Boolean> removeAsync(V e) {
return connectionManager.writeAsync(getName(), RedisCommands.SREM_SINGLE, getName(), e);
public Future<Boolean> removeAsync(Object o) {
return connectionManager.writeAsync(getName(), RedisCommands.SREM_SINGLE, getName(), o);
}

@Override
Expand All @@ -145,49 +163,85 @@ public boolean remove(Object value) {

@Override
public boolean containsAll(Collection<?> c) {
for (Object object : c) {
if (!contains(object)) {
return false;
}
}
return true;
return connectionManager.get(containsAllAsync(c));
}

@Override
public boolean addAll(final Collection<? extends V> c) {
public Future<Boolean> containsAllAsync(Collection<?> c) {
return connectionManager.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('smembers', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == s[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}

@Override
public boolean addAll(Collection<? extends V> c) {
if (c.isEmpty()) {
return false;
}

return connectionManager.get(addAllAsync(c));
}

@Override
public Future<Boolean> addAllAsync(Collection<? extends V> c) {
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);
Long res = connectionManager.write(getName(), RedisCommands.SADD, args.toArray());
return res > 0;
return connectionManager.writeAsync(getName(), RedisCommands.SADD, args.toArray());
}

@Override
public boolean retainAll(Collection<?> c) {
List<V> toRemove = new ArrayList<V>();
for (V object : this) {
if (!c.contains(object)) {
toRemove.add(object);
}
}
return removeAll(toRemove);
return connectionManager.get(retainAllAsync(c));
}

@Override
public boolean removeAll(final Collection<?> c) {
if (c.isEmpty()) {
return false;
}
public Future<Boolean> retainAllAsync(Collection<?> c) {
return connectionManager.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local changed = false " +
"local s = redis.call('smembers', KEYS[1]) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
+ "local element = s[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('SREM', KEYS[1], element) "
+ "changed = true "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Collections.<Object>singletonList(getName()), c.toArray());
}

List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);
Long res = connectionManager.write(getName(), RedisCommands.SREM, args.toArray());
return res > 0;
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return connectionManager.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local v = false " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('srem', KEYS[1], ARGV[i]) == 1 "
+ "then v = true end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
}

@Override
public boolean removeAll(Collection<?> c) {
return connectionManager.get(removeAllAsync(c));
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/redisson/client/protocol/RedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Set;

import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
Expand All @@ -46,13 +47,13 @@ public interface RedisCommands {
RedisCommand<List<Object>> EXEC = new RedisCommand<List<Object>>("EXEC", new ObjectListReplayDecoder());

RedisCommand<Long> SREM = new RedisCommand<Long>("SREM", 2, ValueType.OBJECTS);
RedisCommand<Long> SADD = new RedisCommand<Long>("SADD", 2, ValueType.OBJECTS);
RedisCommand<Boolean> SADD = new RedisCommand<Boolean>("SADD", new BooleanAmountReplayConvertor(), 2, ValueType.OBJECTS);
RedisCommand<Boolean> SADD_SINGLE = new RedisCommand<Boolean>("SADD", new BooleanReplayConvertor(), 2);
RedisCommand<Boolean> SREM_SINGLE = new RedisCommand<Boolean>("SREM", new BooleanReplayConvertor(), 2);
RedisCommand<List<Object>> SMEMBERS = new RedisCommand<List<Object>>("SMEMBERS", new ObjectListReplayDecoder());
RedisCommand<ListScanResult<Object>> SSCAN = new RedisCommand<ListScanResult<Object>>("SSCAN", new NestedMultiDecoder(new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP);
RedisCommand<Boolean> SISMEMBER = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor(), 2);
RedisStrictCommand<Long> SCARD = new RedisStrictCommand<Long>("SCARD");
RedisStrictCommand<Integer> SCARD = new RedisStrictCommand<Integer>("SCARD", new IntegerReplayConvertor());

RedisCommand<Void> LSET = new RedisCommand<Void>("LSET", new VoidReplayConvertor(), 3);
RedisCommand<Object> LPOP = new RedisCommand<Object>("LPOP");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;

public class BooleanAmountReplayConvertor extends SingleConvertor<Boolean> {

@Override
public Boolean convert(Object obj) {
return (Long)obj > 0;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;

public class BooleanNumberReplayConvertor extends SingleConvertor<Boolean> {

@Override
public Boolean convert(Object obj) {
return (Long)obj != -1;
}


}
5 changes: 3 additions & 2 deletions src/main/java/org/redisson/core/RCollectionAsync.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.redisson.core;

import java.util.Collection;
import java.util.List;

import io.netty.util.concurrent.Future;

Expand All @@ -11,11 +10,13 @@ public interface RCollectionAsync<V> extends RExpirableAsync {

Future<Boolean> removeAllAsync(Collection<?> c);

Future<Boolean> containsAsync(Object o);

Future<Boolean> containsAllAsync(Collection<?> c);

Future<Boolean> removeAsync(Object o);

Future<List<V>> readAllAsync();
Future<Collection<V>> readAllAsync();

Future<Integer> sizeAsync();

Expand Down
8 changes: 1 addition & 7 deletions src/main/java/org/redisson/core/RSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package org.redisson.core;

import io.netty.util.concurrent.Future;

import java.util.Set;

/**
Expand All @@ -26,10 +24,6 @@
*
* @param <V> value
*/
public interface RSet<V> extends Set<V>, RExpirable {
public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V> {

Future<Boolean> addAsync(V value);

Future<Boolean> removeAsync(V value);

}
27 changes: 27 additions & 0 deletions src/main/java/org/redisson/core/RSetAsync.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;

/**
* Async set functions
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public interface RSetAsync<V> extends RCollectionAsync<V> {

}
12 changes: 8 additions & 4 deletions src/test/java/org/redisson/RedissonSetTest.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package org.redisson;

import io.netty.util.concurrent.Future;

import java.io.Serializable;
import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RSet;
import org.redisson.core.RSortedSet;

import io.netty.util.concurrent.Future;

public class RedissonSetTest extends BaseTest {

Expand Down Expand Up @@ -191,6 +194,7 @@ public void testContainsAll() {
set.add(i);
}

Assert.assertTrue(set.containsAll(Collections.emptyList()));
Assert.assertTrue(set.containsAll(Arrays.asList(30, 11)));
Assert.assertFalse(set.containsAll(Arrays.asList(30, 711, 11)));
}
Expand Down

0 comments on commit ad5385a

Please sign in to comment.