Skip to content
This repository has been archived by the owner on Apr 5, 2022. It is now read-only.

Commit

Permalink
Is it environment ?
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed May 22, 2015
1 parent 2c50c3b commit f4749cd
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 78 deletions.
Expand Up @@ -59,7 +59,8 @@
* result in 2 messages in the log, no matter how many dispatcher threads are used.
* <p/>
* You can modify what thread the outputStream subscriber, which does the send to the output channel,
* will use by explicitly calling <code>dispatchOn</code> or other switch (http://projectreactor.io/docs/reference/#streams-multithreading)
* will use by explicitly calling <code>dispatchOn</code> or other switch (http://projectreactor
* .io/docs/reference/#streams-multithreading)
* before returning the outputStream from your processor.
* <p/>
* Use {@link org.springframework.xd.reactor.MultipleBroadcasterMessageHandler} for concurrent execution on dispatcher
Expand All @@ -70,79 +71,81 @@
* @author Mark Pollack
* @author Stephane Maldini
*/
public class BroadcasterMessageHandler extends AbstractMessageProducingHandler implements DisposableBean {

protected final Log logger = LogFactory.getLog(getClass());

private final RingBufferProcessor<Object> stream;

@SuppressWarnings("rawtypes")
private final Processor reactorProcessor;

private final Class<?> inputType;

/**
* Construct a new BroadcasterMessageHandler given the reactor based Processor to delegate
* processing to.
*
* @param processor The stream based reactor processor
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public BroadcasterMessageHandler(Processor processor) {
Assert.notNull(processor, "processor cannot be null.");
this.reactorProcessor = processor;
Environment.initializeIfEmpty(); // This by default uses SynchronousDispatcher
Method method = ReflectionUtils.findMethod(this.reactorProcessor.getClass(), "process", Stream.class);
this.inputType = ResolvableType.forMethodParameter(method, 0).getNested(2).getRawClass();

//Stream with a RingBufferProcessor
this.stream = RingBufferProcessor.share("xd-reactor", 8192); //todo expose the backlog size in module conf

//user defined stream processing
Publisher<?> outputStream = processor.process(Streams.wrap(stream));

outputStream.subscribe(new DefaultSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Object outputObject) {
if (ClassUtils.isAssignable(Message.class, outputObject.getClass())) {
getOutputChannel().send((Message) outputObject);
} else {
getOutputChannel().send(MessageBuilder.withPayload(outputObject).build());
}
}

@Override
public void onError(Throwable throwable) {
//Simple log error handling
logger.error(throwable);
}

@Override
public void onComplete() {
//Send a message ?
}
});
}

@Override
protected void handleMessageInternal(Message<?> message) throws Exception {

if (inputType == null || ClassUtils.isAssignable(inputType, message.getClass())) {
stream.onNext(message);
} else {
//TODO handle type conversion of payload to input type if possible
stream.onNext(message.getPayload());
}
}

@Override
public void destroy() throws Exception {
stream.onComplete();
Environment.terminate();
}
public class BroadcasterMessageHandler extends AbstractMessageProducingHandler implements DisposableBean {

protected final Log logger = LogFactory.getLog(getClass());

private final RingBufferProcessor<Object> stream;

private final Environment environment;

@SuppressWarnings("rawtypes")
private final Processor reactorProcessor;

private final Class<?> inputType;

/**
* Construct a new BroadcasterMessageHandler given the reactor based Processor to delegate
* processing to.
*
* @param processor The stream based reactor processor
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public BroadcasterMessageHandler(Processor processor) {
Assert.notNull(processor, "processor cannot be null.");
this.reactorProcessor = processor;
environment = Environment.initializeIfEmpty(); // This by default uses SynchronousDispatcher
Method method = ReflectionUtils.findMethod(this.reactorProcessor.getClass(), "process", Stream.class);
this.inputType = ResolvableType.forMethodParameter(method, 0).getNested(2).getRawClass();

//Stream with a RingBufferProcessor
this.stream = RingBufferProcessor.share("xd-reactor", 8192); //todo expose the backlog size in module conf

//user defined stream processing
Publisher<?> outputStream = processor.process(Streams.wrap(stream));

outputStream.subscribe(new DefaultSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Object outputObject) {
if (ClassUtils.isAssignable(Message.class, outputObject.getClass())) {
getOutputChannel().send((Message) outputObject);
} else {
getOutputChannel().send(MessageBuilder.withPayload(outputObject).build());
}
}

@Override
public void onError(Throwable throwable) {
//Simple log error handling
logger.error(throwable);
}

@Override
public void onComplete() {
System.out.println("HELLO");
}
});
}

@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
System.out.println(Thread.currentThread() + " " + message);
if (inputType == null || ClassUtils.isAssignable(inputType, message.getClass())) {
stream.onNext(message);
} else {
//TODO handle type conversion of payload to input type if possible
stream.onNext(message.getPayload());
}
}

@Override
public void destroy() throws Exception {
stream.onComplete();
environment.shutdown();
}
}
Expand Up @@ -85,6 +85,8 @@ public class MultipleBroadcasterMessageHandler extends AbstractMessageProducingH

private final Expression partitionExpression;

private final Environment environment;

private EvaluationContext evaluationContext = new StandardEvaluationContext();

/**
Expand All @@ -103,7 +105,7 @@ public MultipleBroadcasterMessageHandler(Processor processor, String partitionEx
this.partitionExpression = spelExpressionParser.parseExpression(partitionExpression);

// This by default create no dispatcher but provides for Timer if buffer(1, TimeUnit.Seconds) or similar is used
Environment.initializeIfEmpty();
this.environment = Environment.initializeIfEmpty();

Method method = ReflectionUtils.findMethod(this.processor.getClass(), "process", Stream.class);
this.inputType = ResolvableType.forMethodParameter(method, 0).getNested(2).getRawClass();
Expand Down Expand Up @@ -176,10 +178,12 @@ public void onComplete() {
public void destroy() throws Exception {
Collection<RingBufferProcessor<Object>> toRemove =
new ArrayList<RingBufferProcessor<Object>>(reactiveProcessorMap.values());

for (RingBufferProcessor<Object> ringBufferProcessor : toRemove) {
ringBufferProcessor.onComplete();
}
Environment.terminate();

environment.shutdown();
}

@Override
Expand Down

0 comments on commit f4749cd

Please sign in to comment.