Skip to content

Commit

Permalink
RReadWriteLockReactive implemented #963
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 13, 2017
1 parent 690962a commit b27acc8
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 19 deletions.
7 changes: 7 additions & 0 deletions redisson/src/main/java/org/redisson/RedissonReactive.java
Expand Up @@ -40,6 +40,7 @@
import org.redisson.api.RMapReactive; import org.redisson.api.RMapReactive;
import org.redisson.api.RPatternTopicReactive; import org.redisson.api.RPatternTopicReactive;
import org.redisson.api.RQueueReactive; import org.redisson.api.RQueueReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive; import org.redisson.api.RScriptReactive;
import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetCacheReactive;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.redisson.reactive.RedissonMapReactive; import org.redisson.reactive.RedissonMapReactive;
import org.redisson.reactive.RedissonPatternTopicReactive; import org.redisson.reactive.RedissonPatternTopicReactive;
import org.redisson.reactive.RedissonQueueReactive; import org.redisson.reactive.RedissonQueueReactive;
import org.redisson.reactive.RedissonReadWriteLockReactive;
import org.redisson.reactive.RedissonScoredSortedSetReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive;
import org.redisson.reactive.RedissonScriptReactive; import org.redisson.reactive.RedissonScriptReactive;
import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetCacheReactive;
Expand Down Expand Up @@ -101,6 +103,11 @@ protected RedissonReactive(Config config) {
codecProvider = config.getCodecProvider(); codecProvider = config.getCodecProvider();
} }


@Override
public RReadWriteLockReactive getReadWriteLock(String name) {
return new RedissonReadWriteLockReactive(commandExecutor, name, id);
}

@Override @Override
public RLockReactive getLock(String name) { public RLockReactive getLock(String name) {
return new RedissonLockReactive(commandExecutor, name, id); return new RedissonLockReactive(commandExecutor, name, id);
Expand Down
12 changes: 5 additions & 7 deletions redisson/src/main/java/org/redisson/RedissonReadLock.java
Expand Up @@ -26,7 +26,7 @@
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.LockPubSub; import org.redisson.pubsub.LockPubSub;


import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
Expand All @@ -40,16 +40,13 @@
*/ */
public class RedissonReadLock extends RedissonLock implements RLock { public class RedissonReadLock extends RedissonLock implements RLock {


private final CommandExecutor commandExecutor; public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {

protected RedissonReadLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id); super(commandExecutor, name, id);
this.commandExecutor = commandExecutor;
} }


@Override @Override
String getChannelName() { String getChannelName() {
return "redisson_rwlock__{" + getName() + "}"; return prefixName("redisson_rwlock", getName());
} }


String getWriteLockName(long threadId) { String getWriteLockName(long threadId) {
Expand Down Expand Up @@ -159,7 +156,8 @@ public void operationComplete(Future<Boolean> future) throws Exception {


@Override @Override
public boolean isLocked() { public boolean isLocked() {
String res = commandExecutor.write(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode"); RFuture<String> future = commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode");
String res = get(future);
return "read".equals(res); return "read".equals(res);
} }


Expand Down
Expand Up @@ -20,7 +20,7 @@


import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock; import org.redisson.api.RReadWriteLock;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandAsyncExecutor;


/** /**
* A {@code ReadWriteLock} maintains a pair of associated {@link * A {@code ReadWriteLock} maintains a pair of associated {@link
Expand All @@ -38,11 +38,9 @@
public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock { public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {


private final UUID id; private final UUID id;
private final CommandExecutor commandExecutor;


RedissonReadWriteLock(CommandExecutor commandExecutor, String name, UUID id) { public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name); super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = id; this.id = id;
} }


Expand Down
12 changes: 5 additions & 7 deletions redisson/src/main/java/org/redisson/RedissonWriteLock.java
Expand Up @@ -26,7 +26,7 @@
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.LockPubSub; import org.redisson.pubsub.LockPubSub;


import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
Expand All @@ -40,16 +40,13 @@
*/ */
public class RedissonWriteLock extends RedissonLock implements RLock { public class RedissonWriteLock extends RedissonLock implements RLock {


private final CommandExecutor commandExecutor; protected RedissonWriteLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {

protected RedissonWriteLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id); super(commandExecutor, name, id);
this.commandExecutor = commandExecutor;
} }


@Override @Override
String getChannelName() { String getChannelName() {
return "redisson_rwlock__{" + getName() + "}"; return prefixName("redisson_rwlock", getName());
} }


@Override @Override
Expand Down Expand Up @@ -144,7 +141,8 @@ public void operationComplete(Future<Boolean> future) throws Exception {


@Override @Override
public boolean isLocked() { public boolean isLocked() {
String res = commandExecutor.write(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode"); RFuture<String> future = commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode");
String res = get(future);
return "write".equals(res); return "write".equals(res);
} }


Expand Down
@@ -0,0 +1,49 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;

import java.util.concurrent.locks.Lock;

/**
* A {@code ReadWriteLock} maintains a pair of associated {@link
* Lock locks}, one for read-only operations and one for writing.
* The {@link #readLock read lock} may be held simultaneously by
* multiple reader threads, so long as there are no writers. The
* {@link #writeLock write lock} is exclusive.
*
* Works in non-fair mode. Therefore order of read and write
* locking is unspecified.
*
* @author Nikita Koksharov
*
*/
public interface RReadWriteLockReactive extends RExpirableReactive {

/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
RLockReactive readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
RLockReactive writeLock();

}
Expand Up @@ -30,6 +30,14 @@
*/ */
public interface RedissonReactiveClient { public interface RedissonReactiveClient {


/**
* Returns readWriteLock instance by name.
*
* @param name - name of object
* @return Lock object
*/
RReadWriteLockReactive getReadWriteLock(String name);

/** /**
* Returns lock instance by name. * Returns lock instance by name.
* <p> * <p>
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLockAsync; import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive; import org.redisson.api.RLockReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;


import reactor.fn.Supplier; import reactor.fn.Supplier;
Expand All @@ -38,9 +39,13 @@ public class RedissonLockReactive extends RedissonExpirableReactive implements R


public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, UUID id) { public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, UUID id) {
super(connectionManager, name); super(connectionManager, name);
instance = new RedissonLock(connectionManager, name, id); instance = createLock(connectionManager, name, id);
} }


protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
return new RedissonLock(commandExecutor, name, id);
}

@Override @Override
public Publisher<Boolean> forceUnlock() { public Publisher<Boolean> forceUnlock() {
return reactive(new Supplier<RFuture<Boolean>>() { return reactive(new Supplier<RFuture<Boolean>>() {
Expand Down
@@ -0,0 +1,65 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;

import java.util.UUID;

import org.redisson.RedissonReadWriteLock;
import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor;

/**
*
* @author Nikita Koksharov
*
*/
public class RedissonReadWriteLockReactive extends RedissonExpirableReactive implements RReadWriteLockReactive {

private final RReadWriteLock instance;
private final UUID id;

public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.id = id;
this.instance = new RedissonReadWriteLock(commandExecutor, name, id);
}

@Override
public RLockReactive readLock() {
return new RedissonLockReactive(commandExecutor, getName(), id) {
@Override
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
return instance.readLock();
}
};
}

@Override
public RLockReactive writeLock() {
return new RedissonLockReactive(commandExecutor, getName(), id) {
@Override
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
return instance.writeLock();
}
};
}


}

0 comments on commit b27acc8

Please sign in to comment.