Skip to content

Commit

Permalink
RMapAsync interface added. #186
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 17, 2015
1 parent 0e0e1c1 commit 68a1c01
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 80 deletions.
85 changes: 50 additions & 35 deletions src/main/java/org/redisson/RedissonMap.java
Expand Up @@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -30,10 +29,11 @@

import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.Predicate;
Expand Down Expand Up @@ -61,8 +61,12 @@ protected RedissonMap(ConnectionManager connectionManager, String name) {

@Override
public int size() {
Long res = connectionManager.read(getName(), RedisCommands.HLEN, getName());
return res.intValue();
return connectionManager.get(sizeAsync());
}

@Override
public Future<Integer> sizeAsync() {
return connectionManager.readAsync(getName(), RedisCommands.HLEN, getName());
}

@Override
Expand All @@ -75,10 +79,27 @@ public boolean containsKey(Object key) {
return connectionManager.read(getName(), RedisCommands.HEXISTS, getName(), key);
}

@Override
public Future<Boolean> containsKeyAsync(Object key) {
return connectionManager.readAsync(getName(), RedisCommands.HEXISTS, getName(), key);
}

@Override
public boolean containsValue(Object value) {
Collection<V> list = values();
return list.contains(value);
return connectionManager.get(containsValueAsync(value));
}

@Override
public Future<Boolean> containsValueAsync(Object value) {
return connectionManager.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do "
+ "if ARGV[1] == s[i] then "
+ "return true "
+ "end "
+ "end;" +
"return false",
Collections.<Object>singletonList(getName()), value);
}

@Override
Expand Down Expand Up @@ -133,13 +154,22 @@ public void clear() {

@Override
public Set<K> keySet() {
List<K> keys = connectionManager.read(getName(), RedisCommands.HKEYS, getName());
return new HashSet<K>(keys);
return connectionManager.get(keySetAsync());
}

@Override
public Future<Set<K>> keySetAsync() {
return connectionManager.readAsync(getName(), RedisCommands.HKEYS, getName());
}

@Override
public Collection<V> values() {
return connectionManager.read(getName(), RedisCommands.HVALS, getName());
return connectionManager.get(valuesAsync());
}

@Override
public Future<Collection<V>> valuesAsync() {
return connectionManager.readAsync(getName(), RedisCommands.HVALS, getName());
}

@Override
Expand Down Expand Up @@ -232,7 +262,7 @@ public boolean fastPut(K key, V value) {
}

@Override
public Future<Long> fastRemoveAsync(final K ... keys) {
public Future<Long> fastRemoveAsync(K ... keys) {
if (keys == null || keys.length == 0) {
return connectionManager.getGroup().next().newSucceededFuture(0L);
}
Expand Down Expand Up @@ -335,31 +365,16 @@ public Map<K, V> filterEntries(Predicate<Map.Entry<K, V>> predicate) {
}

@Override
public V addAndGet(K key, V value) {
String res = connectionManager.write(getName(), StringCodec.INSTANCE,
RedisCommands.HINCRBYFLOAT, getName(), key, new BigDecimal(value.toString()).toPlainString());
public V addAndGet(K key, Number value) {
return connectionManager.get(addAndGetAsync(key, value));
}

if (value instanceof Long) {
Object obj = Long.parseLong(res);
return (V)obj;
}
if (value instanceof Integer) {
Object obj = Integer.parseInt(res);
return (V)obj;
}
if (value instanceof Float) {
Object obj = Float.parseFloat(res);
return (V)obj;
}
if (value instanceof Double) {
Object obj = Double.parseDouble(res);
return (V)obj;
}
if (value instanceof BigDecimal) {
Object obj = new BigDecimal(res);
return (V)obj;
}
throw new IllegalStateException("Wrong value type!");
@Override
public Future<V> addAndGetAsync(K key, Number value) {
return connectionManager.writeAsync(getName(), StringCodec.INSTANCE,
new RedisCommand<Object>("HINCRBYFLOAT", new NumberConvertor(value.getClass())),
getName(), key, new BigDecimal(value.toString()).toPlainString());
}


}
4 changes: 4 additions & 0 deletions src/main/java/org/redisson/client/protocol/RedisCommand.java
Expand Up @@ -126,6 +126,10 @@ public RedisCommand(String name, Convertor<R> convertor, int encodeParamIndex, L
this.inParamType = inParamTypes;
}

public RedisCommand(String name, Convertor<R> convertor) {
this(name, convertor, -1);
}

public RedisCommand(String name, Convertor<R> convertor, int encodeParamIndex) {
this(name, null, null, null, encodeParamIndex);
this.convertor = convertor;
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/org/redisson/client/protocol/RedisCommands.java
Expand Up @@ -17,6 +17,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
Expand All @@ -30,6 +31,7 @@
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapReplayDecoder;
Expand Down Expand Up @@ -110,8 +112,8 @@ public interface RedisCommands {
RedisCommand<Map<Object, Object>> HGETALL = new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP);
RedisCommand<List<Object>> HVALS = new RedisCommand<List<Object>>("HVALS", new ObjectListReplayDecoder(), ValueType.MAP_VALUE);
RedisCommand<Boolean> HEXISTS = new RedisCommand<Boolean>("HEXISTS", new BooleanReplayConvertor(), 2, ValueType.MAP_KEY);
RedisStrictCommand<Long> HLEN = new RedisStrictCommand<Long>("HLEN");
RedisCommand<List<Object>> HKEYS = new RedisCommand<List<Object>>("HKEYS", new ObjectListReplayDecoder(), ValueType.MAP_KEY);
RedisStrictCommand<Integer> HLEN = new RedisStrictCommand<Integer>("HLEN", new IntegerReplayConvertor());
RedisCommand<Set<Object>> HKEYS = new RedisCommand<Set<Object>>("HKEYS", new ObjectSetReplayDecoder(), ValueType.MAP_KEY);
RedisCommand<String> HMSET = new RedisCommand<String>("HMSET", new StringReplayDecoder(), 1, ValueType.MAP);
RedisCommand<List<Object>> HMGET = new RedisCommand<List<Object>>("HMGET", new ObjectListReplayDecoder(), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);
RedisCommand<Object> HGET = new RedisCommand<Object>("HGET", 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);
Expand Down
@@ -0,0 +1,40 @@
package org.redisson.client.protocol.convertor;

import java.math.BigDecimal;

public class NumberConvertor extends SingleConvertor<Object> {

private Class<?> resultClass;

public NumberConvertor(Class<?> resultClass) {
super();
this.resultClass = resultClass;
}

@Override
public Object convert(Object result) {
String res = (String) result;
if (resultClass.isAssignableFrom(Long.class)) {
Object obj = Long.parseLong(res);
return obj;
}
if (resultClass.isAssignableFrom(Integer.class)) {
Object obj = Integer.parseInt(res);
return obj;
}
if (resultClass.isAssignableFrom(Float.class)) {
Object obj = Float.parseFloat(res);
return obj;
}
if (resultClass.isAssignableFrom(Double.class)) {
Object obj = Double.parseDouble(res);
return obj;
}
if (resultClass.isAssignableFrom(BigDecimal.class)) {
Object obj = new BigDecimal(res);
return obj;
}
throw new IllegalStateException("Wrong value type!");
}

}
@@ -0,0 +1,41 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import io.netty.buffer.ByteBuf;

public class ObjectSetReplayDecoder implements MultiDecoder<Set<Object>> {

@Override
public Object decode(ByteBuf buf) {
throw new UnsupportedOperationException();
}

@Override
public Set<Object> decode(List<Object> parts) {
return new HashSet<Object>(parts);
}

@Override
public boolean isApplicable(int paramNum) {
return false;
}

}
45 changes: 2 additions & 43 deletions src/main/java/org/redisson/core/RMap.java
Expand Up @@ -15,8 +15,6 @@
*/
package org.redisson.core;

import io.netty.util.concurrent.Future;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -30,7 +28,7 @@
* @param <K> key
* @param <V> value
*/
public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable {
public interface RMap<K, V> extends ConcurrentMap<K, V>, RMapAsync<K, V> {

/**
* Atomically adds the given <code>delta</code> to the current value
Expand All @@ -42,7 +40,7 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable {
* @param delta the value to add
* @return the updated value
*/
V addAndGet(K key, V delta);
V addAndGet(K key, Number delta);

/**
* Gets a map slice contains the mappings with defined <code>keys</code>
Expand Down Expand Up @@ -100,31 +98,6 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable {
*/
long fastRemove(K ... keys);

/**
* Removes <code>keys</code> from map by one operation in async manner
*
* Works faster than <code>RMap.removeAsync</code> but not returning
* the value associated with <code>key</code>
*
* @param keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys
*/
Future<Long> fastRemoveAsync(K ... keys);

/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in async manner.
*
* Works faster than <code>RMap.putAsync</code> but not returning
* the previous value associated with <code>key</code>
*
* @param key
* @param value
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
Future<Boolean> fastPutAsync(K key, V value);

/**
* Associates the specified <code>value</code> with the specified <code>key</code>.
*
Expand All @@ -138,18 +111,4 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable {
*/
boolean fastPut(K key, V value);

Future<V> getAsync(K key);

Future<V> putAsync(K key, V value);

Future<V> removeAsync(K key);

Future<V> replaceAsync(K key, V value);

Future<Boolean> replaceAsync(K key, V oldValue, V newValue);

Future<Long> removeAsync(Object key, Object value);

Future<V> putIfAbsentAsync(K key, V value);

}

0 comments on commit 68a1c01

Please sign in to comment.