Skip to content

Commit

Permalink
RedissonLock switched to LongCodec
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 1, 2015
1 parent 316ee32 commit 1802228
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions src/main/java/org/redisson/RedissonLock.java
Expand Up @@ -23,6 +23,7 @@


import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.codec.JsonJacksonCodec; import org.redisson.codec.JsonJacksonCodec;
Expand Down Expand Up @@ -52,7 +53,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {


private final UUID id; private final UUID id;


private static final Integer unlockMessage = 0; private static final Long unlockMessage = 0L;


private static final ConcurrentMap<String, RedissonLockEntry> ENTRIES = PlatformDependent.newConcurrentHashMap(); private static final ConcurrentMap<String, RedissonLockEntry> ENTRIES = PlatformDependent.newConcurrentHashMap();


Expand Down Expand Up @@ -98,10 +99,10 @@ private Future<RedissonLockEntry> subscribe() {
return oldValue.getPromise(); return oldValue.getPromise();
} }


RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() { RedisPubSubListener<Long> listener = new BaseRedisPubSubListener<Long>() {


@Override @Override
public void onMessage(String channel, Integer message) { public void onMessage(String channel, Long message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) { if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
value.getLatch().release(); value.getLatch().release();
} }
Expand All @@ -119,7 +120,7 @@ public boolean onStatus(PubSubType type, String channel) {


}; };


commandExecutor.getConnectionManager().subscribe(commandExecutor.getConnectionManager().getCodec(), getChannelName(), listener); commandExecutor.getConnectionManager().subscribe(LongCodec.INSTANCE, getChannelName(), listener);
return newPromise; return newPromise;
} }
} }
Expand Down Expand Up @@ -241,7 +242,7 @@ private void stopRefreshTask() {
private Long tryLockInner(final long leaseTime, final TimeUnit unit) { private Long tryLockInner(final long leaseTime, final TimeUnit unit) {
internalLockLeaseTime = unit.toMillis(leaseTime); internalLockLeaseTime = unit.toMillis(leaseTime);


return commandExecutor.evalWrite(getName(), RedisCommands.EVAL_LONG, return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local v = redis.call('get', KEYS[1]); " + "local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " + "if (v == false) then " +
" redis.call('set', KEYS[1], cjson.encode({['o'] = ARGV[1], ['c'] = 1}), 'px', ARGV[2]); " + " redis.call('set', KEYS[1], cjson.encode({['o'] = ARGV[1], ['c'] = 1}), 'px', ARGV[2]); " +
Expand Down Expand Up @@ -317,7 +318,7 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {


@Override @Override
public void unlock() { public void unlock() {
Boolean opStatus = commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN_R2, Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R2,
"local v = redis.call('get', KEYS[1]); " + "local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " + "if (v == false) then " +
" redis.call('publish', ARGV[4], ARGV[2]); " + " redis.call('publish', ARGV[4], ARGV[2]); " +
Expand Down Expand Up @@ -360,7 +361,7 @@ public void forceUnlock() {


private Future<Boolean> forceUnlockAsync() { private Future<Boolean> forceUnlockAsync() {
stopRefreshTask(); stopRefreshTask();
return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
"redis.call('del', KEYS[1]); redis.call('publish', ARGV[2], ARGV[1]); return true", "redis.call('del', KEYS[1]); redis.call('publish', ARGV[2], ARGV[1]); return true",
Collections.<Object>singletonList(getName()), unlockMessage, getChannelName()); Collections.<Object>singletonList(getName()), unlockMessage, getChannelName());
} }
Expand All @@ -372,7 +373,7 @@ public boolean isLocked() {


@Override @Override
public boolean isHeldByCurrentThread() { public boolean isHeldByCurrentThread() {
Boolean opStatus = commandExecutor.evalRead(getName(), RedisCommands.EVAL_BOOLEAN, Boolean opStatus = commandExecutor.evalRead(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('get', KEYS[1]); " + "local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " + "if (v == false) then " +
" return false; " + " return false; " +
Expand All @@ -390,7 +391,7 @@ public boolean isHeldByCurrentThread() {


@Override @Override
public int getHoldCount() { public int getHoldCount() {
Long opStatus = commandExecutor.evalRead(getName(), RedisCommands.EVAL_LONG, Long opStatus = commandExecutor.evalRead(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local v = redis.call('get', KEYS[1]); " + "local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " + "if (v == false) then " +
" return 0; " + " return 0; " +
Expand Down

0 comments on commit 1802228

Please sign in to comment.