Skip to content
This repository has been archived by the owner on Apr 5, 2022. It is now read-only.

Commit

Permalink
Le bon test ?
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed May 25, 2015
1 parent 12d2892 commit 46e2b38
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 150 deletions.
Expand Up @@ -20,20 +20,19 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.ResolvableType;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import reactor.Environment;
import reactor.core.processor.RingBufferProcessor;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.action.support.DefaultSubscriber;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/**
* Adapts the item at a time delivery of a {@link org.springframework.messaging.MessageHandler}
Expand Down Expand Up @@ -75,27 +74,40 @@ public class BroadcasterMessageHandler<IN, OUT> extends AbstractMessageProducing

private final Environment environment;

@SuppressWarnings("rawtypes")
private final Processor<IN, OUT> reactorProcessor;
private Class<IN> inputType;

private final Class<?> inputType;
private final ResolvableType resolvableInputType;
@SuppressWarnings("unchecked")
static <IN> Class<IN> extractGeneric(Processor<IN, ?> processor) {
if (processor.getClass().getGenericInterfaces().length == 0) return null;

Type t = processor.getClass().getGenericInterfaces()[0];
if (ParameterizedType.class.isAssignableFrom(t.getClass())) {
ParameterizedType pt = (ParameterizedType) t;

if (pt.getActualTypeArguments().length == 0) return null;

t = pt.getActualTypeArguments()[0];
if (t instanceof ParameterizedType) {
return (Class<IN>) ((ParameterizedType) t).getRawType();
} else if (t instanceof Class) {
return (Class<IN>) t;
}
}
return null;
}

/**
* Construct a new BroadcasterMessageHandler given the reactor based Processor to delegate
* processing to.
*
* @param processor The stream based reactor processor
*/
@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressWarnings("unchecked")
public BroadcasterMessageHandler(Processor<IN, OUT> processor) {
Assert.notNull(processor, "processor cannot be null.");
this.reactorProcessor = processor;
environment = Environment.initializeIfEmpty(); // This by default uses SynchronousDispatcher

Method method = ReflectionUtils.findMethod(this.reactorProcessor.getClass(), "process", Stream.class);
resolvableInputType = ResolvableType.forMethodParameter(method, 0).getGeneric();
this.inputType = resolvableInputType == null ? null : resolvableInputType.getRawClass();
this.inputType = extractGeneric(processor);

//Stream with a RingBufferProcessor
this.stream = RingBufferProcessor.share("xd-reactor", 8192); //todo expose the backlog size in module conf
Expand All @@ -107,6 +119,9 @@ public BroadcasterMessageHandler(Processor<IN, OUT> processor) {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
if (logger.isDebugEnabled()) {
logger.debug("xd-reactor started [ " + BroadcasterMessageHandler.this + " ]");
}
}

@Override
Expand All @@ -121,12 +136,14 @@ public void onNext(Object outputObject) {
@Override
public void onError(Throwable throwable) {
//Simple log error handling
logger.error(throwable);
logger.error("", throwable);
}

@Override
public void onComplete() {
//ignore
if (logger.isDebugEnabled()) {
logger.debug("reactor-x completed [ " + BroadcasterMessageHandler.this + " ]");
}
}
});
}
Expand All @@ -136,17 +153,17 @@ public void onComplete() {
protected void handleMessageInternal(Message<?> message) throws Exception {

if (inputType == null || ClassUtils.isAssignable(inputType, message.getClass())) {
stream.onNext((IN)message);
stream.onNext((IN) message);
} else {
//TODO handle type conversion of payload to input type if possible

stream.onNext((IN)message.getPayload());
stream.onNext((IN) message.getPayload());
}
}

@Override
public void destroy() throws Exception {
stream.onComplete();
environment.shutdown();
stream.onComplete();
environment.shutdown();
}
}

0 comments on commit 46e2b38

Please sign in to comment.