Skip to content

Commit

Permalink
use LUA scripts for transactional operations
Browse files Browse the repository at this point in the history
  • Loading branch information
ako-ts committed Jul 3, 2015
1 parent 4e83f4e commit 98c21fb
Show file tree
Hide file tree
Showing 31 changed files with 704 additions and 566 deletions.
31 changes: 26 additions & 5 deletions src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java
Expand Up @@ -36,11 +36,7 @@

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -312,6 +308,31 @@ public <T> Future<T> eval(V script, ScriptOutputType type, List<K> keys, V... va
return dispatch(EVAL, output, args);
}

public <T> Future<T> evalR(V script, ScriptOutputType type, List<K> keys, List<?> values, List<?> rawValues) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(script.toString()).add(keys.size()).addKeys(keys);
for (Object value : values) {
args.addMapValue((V) value);
}
for (Object value : rawValues) {
if (value instanceof String) {
args.add((String) value);
} else if (value instanceof Integer) {
args.add((Integer) value);
} else if (value instanceof Long) {
args.add((Long) value);
} else if (value instanceof Double) {
args.add((Double) value);
} else if (value instanceof byte[]) {
args.add((byte[]) value);
} else {
throw new IllegalArgumentException("Unsupported raw value type: " + value.getClass());
}
}
CommandOutput<K, V, T> output = newScriptOutput(codec, type);
return dispatch(EVAL, output, args);
}

public <T> Future<T> evalsha(String digest, ScriptOutputType type, List<K> keys, V... values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(digest).add(keys.size()).addKeys(keys).addMapValues(values);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/lambdaworks/redis/ScriptOutputType.java
Expand Up @@ -10,6 +10,7 @@
* <li>{@link #INTEGER} 64-bit integer</li>
* <li>{@link #STATUS} status string</li>
* <li>{@link #VALUE} value</li>
* <li>{@link #MAPVALUE} typed value</li>
* <li>{@link #MULTI} of these types</li>.
* </ul>
*
Expand Down
Expand Up @@ -5,7 +5,9 @@
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

/**
* 64-bit integer output, may be null.
Expand All @@ -24,6 +26,6 @@ public void set(long integer) {

@Override
public void set(ByteBuffer bytes) {
output = null;
output = bytes == null ? null : new Long(decodeAscii(bytes));
}
}
Expand Up @@ -16,7 +16,7 @@ public MapScanOutput(RedisCodec<K, V> codec) {
@Override
public void set(ByteBuffer bytes) {
if (output.getPos() == null) {
output.setPos(((Number) codec.decodeValue(bytes)).longValue());
output.setPos(toLong(bytes));
} else {
if (counter % 2 == 0) {
output.addValue(codec.decodeMapValue(bytes));
Expand All @@ -27,4 +27,9 @@ public void set(ByteBuffer bytes) {
counter++;
}

private Long toLong(ByteBuffer bytes) {
return bytes == null ? null : new Long(new String(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit()));
}


}
Expand Up @@ -14,10 +14,14 @@ public ValueSetScanOutput(RedisCodec<K, V> codec) {
@Override
public void set(ByteBuffer bytes) {
if (output.getPos() == null) {
output.setPos(Long.valueOf(codec.decodeMapValue(bytes).toString()));
output.setPos(toLong(bytes));
} else {
output.addValue(codec.decodeMapValue(bytes));
}
}

private Long toLong(ByteBuffer bytes) {
return bytes == null ? null : new Long(new String(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit()));
}

}
20 changes: 20 additions & 0 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Expand Up @@ -3,8 +3,10 @@
package com.lambdaworks.redis.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.StringUtil;

import java.util.concurrent.BlockingQueue;

Expand Down Expand Up @@ -63,10 +65,28 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
ByteBuf buf = ctx.alloc().heapBuffer();
cmd.encode(buf);
// System.out.println("out: " + buf.toString(CharsetUtil.UTF_8));
// System.out.println("out: " + toHexString(buf));

ctx.write(buf, promise);
}

private String toHexString(ByteBuf buf) {
final StringBuilder builder = new StringBuilder(buf.readableBytes() * 2);
buf.forEachByte(new ByteBufProcessor() {
@Override
public boolean process(byte value) throws Exception {
char b = (char) value;
if ((b < ' ' && b != '\n' && b != '\r') || b > '~') {
builder.append("\\x").append(StringUtil.byteToHexStringPadded(value));
} else {
builder.append(b);
}
return true;
}
});
return builder.toString();
}

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
while (true) {
Command<K, V, ?> cmd = queue.peek();
Expand Down
96 changes: 29 additions & 67 deletions src/main/java/org/redisson/RedissonAtomicLong.java
Expand Up @@ -24,6 +24,11 @@

import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisConnection;
import org.redisson.core.RScript;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

/**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
Expand All @@ -49,26 +54,12 @@ protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {

@Override
public boolean compareAndSet(final long expect, final long update) {
return connectionManager.write(getName(), new SyncOperation<Object, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Object> conn) {
while (true) {
conn.watch(getName());

Long value = getLongSafe(conn);
if (value != expect) {
conn.unwatch();
return false;
}

conn.multi();
conn.set(getName(), update);
if (conn.exec().size() == 1) {
return true;
}
}
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return new RedissonScript(connectionManager).evalR(
"if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('set', KEYS[1], ARGV[2]); return true else return false end",
RScript.ReturnType.BOOLEAN,
keys, Collections.EMPTY_LIST, Arrays.asList(expect, update));
}

@Override
Expand All @@ -88,51 +79,22 @@ public long get() {

@Override
public long getAndAdd(final long delta) {
return connectionManager.write(getName(), new SyncOperation<Object, Long>() {
@Override
public Long execute(RedisConnection<Object, Object> conn) {
while (true) {
conn.watch(getName());

Long value = getLongSafe(conn);

conn.multi();
conn.set(getName(), value + delta);
if (conn.exec().size() == 1) {
return value;
}
}
}

});
}

private Long getLongSafe(RedisConnection<Object, Object> conn) {
Number n = (Number) conn.get(getName());
if (n != null) {
return n.longValue();
}
return 0L;
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return new RedissonScript(connectionManager).evalR(
"local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], v + ARGV[1]); return tonumber(v)",
RScript.ReturnType.INTEGER,
keys, Collections.EMPTY_LIST, Collections.singletonList(delta));
}

@Override
public long getAndSet(final long newValue) {
return connectionManager.write(getName(), new SyncOperation<Object, Long>() {
@Override
public Long execute(RedisConnection<Object, Object> conn) {
while (true) {
conn.watch(getName());

Long value = getLongSafe(conn);

conn.multi();
conn.set(getName(), newValue);
if (conn.exec().size() == 1) {
return value;
}
}
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
return new RedissonScript(connectionManager).evalR(
"local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], ARGV[1]); return tonumber(v)",
RScript.ReturnType.INTEGER,
keys, Collections.EMPTY_LIST, Collections.singletonList(newValue));
}

@Override
Expand All @@ -156,12 +118,12 @@ public long getAndDecrement() {

@Override
public void set(final long newValue) {
connectionManager.write(getName(), new ResultOperation<String, Object>() {
@Override
protected Future<String> execute(RedisAsyncConnection<Object, Object> async) {
return async.set(getName(), newValue);
}
});
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
new RedissonScript(connectionManager).evalR(
"redis.call('set', KEYS[1], ARGV[1])",
RScript.ReturnType.STATUS,
keys, Collections.EMPTY_LIST, Collections.singletonList(newValue));
}

public String toString() {
Expand Down
66 changes: 29 additions & 37 deletions src/main/java/org/redisson/RedissonBlockingQueue.java
Expand Up @@ -15,6 +15,7 @@
*/
package org.redisson;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -28,6 +29,7 @@
import org.redisson.core.RBlockingQueue;

import com.lambdaworks.redis.RedisConnection;
import org.redisson.core.RScript;

/**
* Offers blocking queue facilities through an intermediary
Expand Down Expand Up @@ -97,49 +99,39 @@ public int remainingCapacity() {

@Override
public int drainTo(Collection<? super V> c) {
List<V> list = connectionManager.write(getName(), new SyncOperation<V, List<V>>() {
@Override
public List<V> execute(RedisConnection<Object, V> conn) {
while (true) {
conn.watch(getName());
conn.multi();
conn.lrange(getName(), 0, -1);
conn.ltrim(getName(), 0, -1);
List<Object> res = conn.exec();
if (res.size() == 2) {
List<V> items = (List<V>) res.get(0);
return items;
}
}
}
});
if (c == null) {
throw new NullPointerException();
}
ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
List<V> list = new RedissonScript(connectionManager).eval(
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('ltrim', KEYS[1], -1, 0); " +
"return vals",
RScript.ReturnType.MAPVALUELIST,
keys);
c.addAll(list);
return list.size();
}

@Override
public int drainTo(Collection<? super V> c, final int maxElements) {
List<V> list = connectionManager.write(getName(), new SyncOperation<V, List<V>>() {
@Override
public List<V> execute(RedisConnection<Object, V> conn) {
while (true) {
conn.watch(getName());
Long len = Math.min(conn.llen(getName()), maxElements);
if (len == 0) {
conn.unwatch();
return Collections.emptyList();
}
conn.multi();
conn.lrange(getName(), 0, len - 1);
conn.ltrim(getName(), len, -1);
List<Object> res = conn.exec();
if (res.size() == 2) {
List<V> items = (List<V>) res.get(0);
return items;
}
}
}
});
if (maxElements <= 0) {
return 0;
}
if (c == null) {
throw new NullPointerException();
}

ArrayList<Object> keys = new ArrayList<Object>();
keys.add(getName());
List<V> list = new RedissonScript(connectionManager).evalR(
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
"redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +
"return vals",
RScript.ReturnType.MAPVALUELIST,
keys, Collections.emptyList(), Collections.singletonList(maxElements));
c.addAll(list);
return list.size();
}
Expand Down

0 comments on commit 98c21fb

Please sign in to comment.