Skip to content

Commit

Permalink
RTopicReactive added. #210
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 1, 2015
1 parent 1802228 commit 1cc0b2f
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 57 deletions.
47 changes: 0 additions & 47 deletions src/main/java/org/redisson/CommandReactiveService.java
Expand Up @@ -19,17 +19,11 @@
import java.util.List;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.core.support.Exceptions;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.subscription.ReactiveSubscription;

/**
*
Expand All @@ -38,47 +32,6 @@
*/
public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor {

static class NettyFuturePublisher<T> extends Stream<T> {
private final Future<? extends T> that;

public NettyFuturePublisher(Future<? extends T> that) {
this.that = that;
}

@Override
public void subscribe(final Subscriber<? super T> subscriber) {
try {
subscriber.onSubscribe(new ReactiveSubscription<T>(this, subscriber) {

@Override
public void request(long elements) {
Action.checkRequest(elements);
if (isComplete()) return;

that.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
subscriber.onError(future.cause());
return;
}

if (future.getNow() != null) {
subscriber.onNext(future.getNow());
}
onComplete();
}
});
}
});
} catch (Throwable throwable) {
Exceptions.throwIfFatal(throwable);
subscriber.onError(throwable);
}
}

}

public CommandReactiveService(ConnectionManager connectionManager) {
super(connectionManager);
}
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/org/redisson/NettyFuturePublisher.java
@@ -0,0 +1,53 @@
package org.redisson;

import org.reactivestreams.Subscriber;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.core.support.Exceptions;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.subscription.ReactiveSubscription;

public class NettyFuturePublisher<T> extends Stream<T> {

private final Future<? extends T> that;

public NettyFuturePublisher(Future<? extends T> that) {
this.that = that;
}

@Override
public void subscribe(final Subscriber<? super T> subscriber) {
try {
subscriber.onSubscribe(new ReactiveSubscription<T>(this, subscriber) {

@Override
public void request(long elements) {
Action.checkRequest(elements);
if (isComplete()) return;

that.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
subscriber.onError(future.cause());
return;
}

if (future.getNow() != null) {
subscriber.onNext(future.getNow());
}
onComplete();
}
});
}
});
} catch (Throwable throwable) {
Exceptions.throwIfFatal(throwable);
subscriber.onError(throwable);
}
}


}
11 changes: 11 additions & 0 deletions src/main/java/org/redisson/RedissonReactive.java
Expand Up @@ -36,6 +36,7 @@
import org.redisson.core.RMapReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RSetReactive;
import org.redisson.core.RTopicReactive;

import io.netty.util.concurrent.Future;

Expand Down Expand Up @@ -157,6 +158,16 @@ public RLexSortedSetReactive getLexSortedSet(String name) {
return new RedissonLexSortedSetReactive(commandExecutor, name);
}

@Override
public <M> RTopicReactive<M> getTopic(String name) {
return new RedissonTopicReactive<M>(commandExecutor, name);
}

@Override
public <M> RTopicReactive<M> getTopic(String name, Codec codec) {
return new RedissonTopicReactive<M>(codec, commandExecutor, name);
}

@Override
public void shutdown() {
connectionManager.shutdown();
Expand Down
21 changes: 11 additions & 10 deletions src/main/java/org/redisson/RedissonReactiveClient.java
Expand Up @@ -27,6 +27,7 @@
import org.redisson.core.RMapReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RSetReactive;
import org.redisson.core.RTopicReactive;

public interface RedissonReactiveClient {

Expand Down Expand Up @@ -105,16 +106,16 @@ public interface RedissonReactiveClient {
*/
RLexSortedSetReactive getLexSortedSet(String name);

// /**
// * Returns topic instance by name.
// *
// * @param name of topic
// * @return
// */
// <M> RTopic<M> getTopic(String name);
//
// <M> RTopic<M> getTopic(String name, Codec codec);
//
/**
* Returns topic instance by name.
*
* @param name of topic
* @return
*/
<M> RTopicReactive<M> getTopic(String name);

<M> RTopicReactive<M> getTopic(String name, Codec codec);

// /**
// * Returns topic instance satisfies by pattern name.
// *
Expand Down
117 changes: 117 additions & 0 deletions src/main/java/org/redisson/RedissonTopicReactive.java
@@ -0,0 +1,117 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;

import java.util.Collections;
import java.util.List;

import org.reactivestreams.Publisher;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopicReactive;
import org.redisson.core.StatusListener;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

/**
* Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster.
*
* @author Nikita Koksharov
*
* @param <M> message
*/
public class RedissonTopicReactive<M> implements RTopicReactive<M> {

private final CommandReactiveExecutor commandExecutor;
private final String name;
private final Codec codec;

protected RedissonTopicReactive(CommandReactiveExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}

protected RedissonTopicReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
this.commandExecutor = commandExecutor;
this.name = name;
this.codec = codec;
}

@Override
public List<String> getChannelNames() {
return Collections.singletonList(name);
}

@Override
public Publisher<Long> publish(M message) {
return commandExecutor.writeObservable(name, codec, RedisCommands.PUBLISH, name, message);
}

@Override
public Publisher<Integer> addListener(StatusListener listener) {
return addListener(new PubSubStatusListener(listener, name));
};

@Override
public Publisher<Integer> addListener(MessageListener<M> listener) {
PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name);
return addListener(pubSubListener);
}

private Publisher<Integer> addListener(final RedisPubSubListener<M> pubSubListener) {
final Promise<Integer> promise = commandExecutor.getConnectionManager().newPromise();
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}

promise.setSuccess(System.identityHashCode(pubSubListener));
}
});
return new NettyFuturePublisher<Integer>(promise);
}


@Override
public void removeListener(int listenerId) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
return;
}
synchronized (entry) {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name);
}
return;
}
}

// listener has been re-attached
removeListener(listenerId);
}


}
46 changes: 46 additions & 0 deletions src/main/java/org/redisson/core/RTopicReactive.java
@@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;

import java.util.List;

import org.reactivestreams.Publisher;

/**
* Distributed topic. Messages are delivered to all message listeners across Redis cluster.
*
* @author Nikita Koksharov
*
* @param <M> the type of message object
*/
public interface RTopicReactive<M> {

List<String> getChannelNames();

/**
* Publish the message to all subscribers of this topic asynchronously
*
* @param message
* @return the <code>Future</code> object with number of clients that received the message
*/
Publisher<Long> publish(M message);

Publisher<Integer> addListener(StatusListener listener);

Publisher<Integer> addListener(MessageListener<M> listener);

void removeListener(int listenerId);
}

0 comments on commit 1cc0b2f

Please sign in to comment.