Skip to content

Commit

Permalink
Try fix complete -> close channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Mar 12, 2015
1 parent 3df5b94 commit fc835a4
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 586 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,5 @@
# Reactor 2.0.0 - CHANGELOG

# Reactor 1.1.0 - CHANGELOG

This is a non-exhaustive list of changes between Reactor 1.0 and 1.1:
Expand Down
304 changes: 0 additions & 304 deletions COMMITLOG.txt

This file was deleted.

8 changes: 5 additions & 3 deletions build.gradle
Expand Up @@ -135,9 +135,6 @@ configure(allprojects) { project ->

// dependencies that are common across all java projects
dependencies {
// Logging
compile "org.slf4j:slf4j-api:$slf4jVersion"

// JSR-305 annotations
optional "com.google.code.findbugs:jsr305:2.0.0"

Expand Down Expand Up @@ -259,6 +256,11 @@ project('reactor-core') {
// Reactive Streams
compile "org.reactivestreams:reactive-streams:1.0.0.RC3"

// Logging
compile "org.slf4j:slf4j-api:$slf4jVersion"

testCompile "org.reactivestreams:reactive-streams-tck:1.0.0.RC3"

// High-speed Messaging
optional "net.openhft:chronicle:$openHftChronicleVersion",
"net.openhft:lang:$openHftLangVersion"
Expand Down
Expand Up @@ -33,6 +33,7 @@
*/
public abstract class AbstractLifecycleDispatcher implements Dispatcher {

protected static final int DEFAULT_BUFFER_SIZE = 1024;

private final AtomicBoolean alive = new AtomicBoolean(true);
public final ClassLoader context = new ClassLoader(Thread.currentThread()
Expand Down
Expand Up @@ -16,14 +16,10 @@

package reactor.core.dispatch;

import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.dispatch.wait.WaitingMood;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.core.support.SpecificationExceptions;
import reactor.fn.Consumer;
import reactor.jarjar.com.lmax.disruptor.*;
import reactor.jarjar.com.lmax.disruptor.dsl.Disruptor;
Expand All @@ -41,8 +37,6 @@
*/
public final class RingBufferDispatcher extends SingleThreadDispatcher implements WaitingMood {

private static final int DEFAULT_BUFFER_SIZE = 1024;

private final Logger log = LoggerFactory.getLogger(getClass());
private final ExecutorService executor;
private final Disruptor<RingBufferTask> disruptor;
Expand Down Expand Up @@ -81,7 +75,7 @@ public RingBufferDispatcher(String name,
* Creates a new {@literal RingBufferDispatcher} with the given {@code name}. It will use a {@link RingBuffer} with
* {@code bufferSize} slots, configured with a producer type of {@link ProducerType#MULTI MULTI}
* and a {@link BlockingWaitStrategy blocking wait. A given @param uncaughtExceptionHandler} will catch anything not
* handled e.g. by the owning {@link reactor.bus.EventBus} or {@link reactor.rx.Stream}.
* handled e.g. by the owning {@code reactor.bus.EventBus} or {@code reactor.rx.Stream}.
*
* @param name The name of the dispatcher
* @param bufferSize The size to configure the ring buffer with
Expand Down Expand Up @@ -165,16 +159,6 @@ public void onEvent(RingBufferTask task, long sequence, boolean endOfBatch) thro
this.ringBuffer = disruptor.start();
}

//@Override
public <E> Processor<E, E> dispatch() {
if (alive()) {
disruptor.shutdown();
}

final SequenceBarrier barrier = ringBuffer.newBarrier();
return new RingBufferProcessor<>(barrier);
}

@Override
public boolean awaitAndShutdown(long timeout, TimeUnit timeUnit) {
boolean alive = alive();
Expand Down Expand Up @@ -267,266 +251,4 @@ public RingBufferTask setSequenceId(long sequenceId) {
}
}

private class RingBufferTaskEventHandler<E> implements EventHandler<RingBufferTask>, Consumer<E> {
private final Subscriber<? super E> sub;
private final RingBufferProcessor<E> owner;

Subscription s;

public RingBufferTaskEventHandler(RingBufferProcessor<E> owner, Subscriber<? super E> sub) {
this.sub = sub;
this.owner = owner;
}

@Override
public void accept(E e) {
try {
sub.onNext(e);
} catch (Throwable t) {
sub.onError(t);
}
}

@Override
public void onEvent(RingBufferTask ringBufferTask, long seq, boolean end) throws Exception {
try {
handleSubscriber(ringBufferTask, seq, end);
recurse();
} catch (Throwable e) {
sub.onError(e);
}
}

@SuppressWarnings("unchecked")
private void handleSubscriber(Task task, long seq, boolean end) {
if (task.eventConsumer == owner.COMPLETE_SENTINEL) {

if (s != null) {
s.cancel();
s = null;
}
sub.onComplete();

} else if (task.eventConsumer == owner.ERROR_SENTINEL) {
if (s != null) {
s.cancel();
s = null;
}
sub.onError((Throwable) task.data);
} else if (task.eventConsumer == owner) {
System.out.println(Thread.currentThread() + "-" + inContext() + " ] Task:" + task.data + " seq:" + seq + " " +
end);
sub.onNext((E) task.data);
}
}

void recurse() {
//Process any recursive tasks
if (tailRecurseSeq < 0) {
return;
}
int next = -1;
while (next < tailRecurseSeq) {
handleSubscriber(tailRecursionPile.get(++next), -1, false);
}

// clean up extra tasks
next = tailRecurseSeq;
int max = backlog * 2;
while (next >= max) {
tailRecursionPile.remove(next--);
}
tailRecursionPileSize = max;
tailRecurseSeq = -1;
}
}

private class RingBufferProcessor<E> implements Processor<E, E>, Consumer<E> {
private final SequenceBarrier barrier;
private final Sequence pending = new Sequence(0l);
private final Sequence batch = new Sequence(0l);
final Consumer COMPLETE_SENTINEL = new Consumer() {
@Override
public void accept(Object o) {
}
};

final Consumer<Throwable> ERROR_SENTINEL = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
}
};

Subscription s;

public RingBufferProcessor(SequenceBarrier barrier) {
this.barrier = barrier;
}

@Override
public void subscribe(final Subscriber<? super E> sub) {
final RingBufferTaskEventHandler<E> eventHandler = new RingBufferTaskEventHandler<>(this, sub);
final EventProcessor p = new BatchEventProcessor<RingBufferTask>(
ringBuffer,
barrier,
eventHandler
);


p.getSequence().set(barrier.getCursor());
ringBuffer.addGatingSequences(p.getSequence());
executor.execute(p);

Subscription subscription = new Subscription() {

@Override
@SuppressWarnings("unchecked")
public void request(long n) {
if (n <= 0l) {
sub.onError(SpecificationExceptions.spec_3_09_exception(n));
return;
}

if (pending.get() == Long.MAX_VALUE) {
return;
}

synchronized (this) {
if (pending.addAndGet(n) < 0l) pending.set(Long.MAX_VALUE);
}

if (s != null) {
if (n == Long.MAX_VALUE) {
s.request(n);
} else {
int toRequest = (int)Math.min(Integer.MAX_VALUE, Math.min(n, remainingSlots()));
if (toRequest > 0) {
s.request(toRequest);
if(batch.get() > 1l){
ringBuffer.next(toRequest);
}
}
}
}

}

@Override
public void cancel() {
try {
ringBuffer.removeGatingSequence(p.getSequence());
p.halt();
} finally {
s.cancel();
}
}
};

eventHandler.s = subscription;
sub.onSubscribe(subscription);
}

@Override
public void onSubscribe(final Subscription s) {
if (this.s != null) {
s.cancel();
return;
}
this.s = s;
}

void publishSignal(Object data, Consumer consumer) {
long batch = this.batch.incrementAndGet();
long demand = pending.get();
long seqId;

if (batch > 1l) {
seqId = (ringBuffer.getCursor() - demand) + batch;
} else if (demand == 0l || demand == Long.MAX_VALUE) {
seqId = ringBuffer.next();
} else {
int preallocate =
(int) Math.min(Integer.MAX_VALUE,
Math.min(
ringBuffer.remainingCapacity(),
Math.abs(batch - demand)
) + 1l
);

if (preallocate > 0l) {
seqId = ringBuffer.next(preallocate);
System.out.println(Thread.currentThread() + " is nexting " + preallocate + " with " + data + " on seq " +
seqId);
seqId = seqId - (preallocate - 1l);
} else {
seqId = ringBuffer.next();
}
}

System.out.println(Thread.currentThread() + " is marking " + seqId + " with " + data+"="+batch+"/"+demand+" : "+ringBuffer.getCursor()) ;
ringBuffer.get(seqId)
.setSequenceId(seqId)
.setData(data)
.setEventConsumer(consumer);

if (demand == Long.MAX_VALUE) {
ringBuffer.publish(seqId);
} else if (demand == batch) {
synchronized (this) {
if (this.pending.addAndGet(-demand) < 0l) this.pending.set(0l);
}
this.batch.set(0l);
ringBuffer.publish(seqId - (batch - 1l), seqId);
}
}

@Override
public void onNext(E o) {
if (!inContext()) {
publishSignal(o, this);

} else {
allocateRecursiveTask()
.setData(o)
.setEventConsumer(this);
}
}

@Override
public void accept(E e) {
//IGNORE
}

@Override
public void onError(Throwable t) {
try {
pending.set(0l);
publishSignal(t, ERROR_SENTINEL);
} finally {
if (s != null) {
s.cancel();
}
}
}

@Override
public void onComplete() {
try {
pending.set(0l);
publishSignal(null, COMPLETE_SENTINEL);
} finally {
if (s != null) {
s.cancel();
}
}
}

@Override
public String toString() {
return "RingBufferSubscriber{" +
", barrier=" + barrier.getCursor() +
", pending=" + pending +
'}';
}
}
}
Expand Up @@ -32,8 +32,10 @@
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.config.ServerSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.netty.NettyChannelStream;
Expand Down Expand Up @@ -156,6 +158,17 @@ public void operationComplete(ChannelFuture future) throws Exception {
return promise;
}

@Override
@SuppressWarnings("unchecked")
protected Consumer<Void> completeConsumer(ChannelStream<IN, OUT> ch) {
return new Consumer<Void>() {
@Override
public void accept(Void aVoid) {
((Channel)ch.delegate()).close();
}
};
}

@Override
@SuppressWarnings("unchecked")
public Promise<Boolean> shutdown() {
Expand Down

0 comments on commit fc835a4

Please sign in to comment.