Should ReactiveCommandDispatcher.java check subscription status? #323

Closed
vleushin opened this Issue Jul 27, 2016 · 15 comments

@vleushin

Hello, I'm back to upgrading libraries, and my issue #202 is probably still there. I tried it in a staging environment and got this exception:

2016-07-27 05:54:36.986 [WARN] [lettuce-nioEventLoop-35-1] [c.l.r.p.CommandHandler] - [/172.17.0.29:43046 -> redis.staging.mint.internal/172.31.54.220:6379] Unexpected exception during request: rx.exceptions.OnCompletedFailedException: CANCELLED
rx.exceptions.OnCompletedFailedException: CANCELLED
    at rx.observers.SafeSubscriber.onCompleted(SafeSubscriber.java:85)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
    at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onCompleted(OperatorOnErrorResumeNextViaFunction.java:101)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$3.onCompleted(AbstractCommand.java:1142)
    at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnRunHookApplication$1.onCompleted(AbstractCommand.java:1399)
    at com.netflix.hystrix.AbstractCommand$ExecutionHookApplication$1.onCompleted(AbstractCommand.java:1332)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:110)
    at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
    at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:98)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:662)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:574)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:283)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:110)
    at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
    at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:98)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:662)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:574)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:863)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onCompleted(OperatorOnErrorResumeNextViaFunction.java:101)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at com.lambdaworks.redis.ReactiveCommandDispatcher$ObservableCommand.complete(ReactiveCommandDispatcher.java:127)
    at com.lambdaworks.redis.protocol.CommandHandler.decode(CommandHandler.java:187)
    at com.lambdaworks.redis.protocol.CommandHandler.channelRead(CommandHandler.java:153)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1320)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:905)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:563)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:504)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:418)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:390)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.grpc.StatusRuntimeException: CANCELLED
    at io.grpc.Status.asRuntimeException(Status.java:536)
    at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:302)
    at rx.internal.util.ActionSubscriber.onCompleted(ActionSubscriber.java:49)
    at rx.observers.SafeSubscriber.onCompleted(SafeSubscriber.java:79)
    ... 64 more

Let's say it is ok.

Later I decided to check https://github.com/mp911de/lettuce/blob/master/src/main/java/com/lambdaworks/redis/ReactiveCommandDispatcher.java

        @Override
        @SuppressWarnings("unchecked")
        public void complete() {
            if (completed) {
                return;
            }

            super.complete();

            if (getOutput() != null) {
                Object result = getOutput().get();
                if (!(getOutput() instanceof StreamingOutput<?>) && result != null) {

                    if (dissolve && result instanceof Collection) {
                        Collection<T> collection = (Collection<T>) result;
                        for (T t : collection) {
                            subscriber.onNext(t);
                        }
                    } else {
                        subscriber.onNext((T) result);
                    }
                }

                if (getOutput().hasError()) {
                    subscriber.onError(new RedisCommandExecutionException(getOutput().getError()));
                    completed = true;
                    return;
                }
            }

            completed = true;
            subscriber.onCompleted();
        }

And I noticed that you do not check the subscription status (isUnsubscribed). So I think I understand the problem now: I'm doing some logic in my command and sending some requests to Redis, the command gets cancelled because of a timeout or for some other reason, lettuce gets the results and notifies the unsubscribed subscriber about them (onNext, onCompleted, onError), and gets an exception saying that the command was cancelled.

So the question is: should we check the subscription status before notifying the subscriber about a result or not? As far as I know we should, but I have to double-check the Rx docs to be sure.
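
To make the question concrete, here is a minimal sketch of a guarded completion. It does not use RxJava or lettuce: `SketchSubscriber`, `RecordingSubscriber` and `GuardedCompletion` are hypothetical stand-ins invented for this example, and only the `isUnsubscribed` / `onNext` / `onCompleted` method names are borrowed from the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the small part of rx.Subscriber used here; NOT the RxJava type.
interface SketchSubscriber<T> {
    boolean isUnsubscribed();
    void onNext(T item);
    void onCompleted();
}

// Records what it receives so the effect of the guard is observable.
class RecordingSubscriber implements SketchSubscriber<String> {
    final List<String> events = new ArrayList<>();
    boolean unsubscribed;

    public boolean isUnsubscribed() { return unsubscribed; }
    public void onNext(String item) { events.add("next:" + item); }
    public void onCompleted() { events.add("completed"); }
}

class GuardedCompletion {
    // Emits the result and the completion signal only while the subscriber is still subscribed.
    static <T> void complete(SketchSubscriber<T> subscriber, T result) {
        if (subscriber.isUnsubscribed()) {
            return; // cancelled (for example by a timeout): nobody is listening anymore
        }
        if (result != null) {
            subscriber.onNext(result);
        }
        if (!subscriber.isUnsubscribed()) { // the subscriber may unsubscribe inside onNext
            subscriber.onCompleted();
        }
    }

    public static void main(String[] args) {
        RecordingSubscriber active = new RecordingSubscriber();
        complete(active, "valueA");
        System.out.println(active.events); // [next:valueA, completed]

        RecordingSubscriber cancelled = new RecordingSubscriber();
        cancelled.unsubscribed = true;
        complete(cancelled, "valueA");
        System.out.println(cancelled.events); // []
    }
}
```

With a guard like this, a cancelled subscriber never sees the late result, so it has no chance to throw from onCompleted.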

@mp911de mp911de added the bug label Jul 27, 2016
@mp911de
Owner
mp911de commented Jul 27, 2016

Hi @vleushin,
Thanks for the bug report. The examples in the docs check isUnsubscribed() before doing any processing. They also skip the call to onCompleted() if the subscriber is no longer subscribed.

@vleushin

I also recall that Observable.create is not recommended for simple observable wrappers because you have to do the unsubscribe check manually (which you can forget), while Observable.fromCallable is recommended because it does it for you (it uses SingleDelayedProducer inside, which checks isUnsubscribed).

Glad to help. Second question: how bad is it for lettuce to get exceptions there? Is it just a message and stack trace in the logs, or will it lead to state corruption somewhere?

@mp911de
Owner
mp911de commented Jul 27, 2016 edited

You can also unsubscribe during emission of items, so having those safeguards in place isn't a bad idea.

Lettuce should not get into an invalid state. Data streaming using onNext() (emitting items as they are received) is protected. Exceptions result in just a stack trace warning. There's no one else to notify.

Calls to onCompleted() throw the exception into the netty layer but nothing breaks. You'll see an exception message and that's it.

@mp911de mp911de added a commit that referenced this issue Jul 27, 2016
@mp911de Check for isUnsubscribed() before calling subscriber methods #323
Perform a check to isUnsubscribed() before emitting items or calling completion methods
ac00bd9
@mp911de mp911de added a commit that referenced this issue Jul 27, 2016
@mp911de Check for isUnsubscribed() before calling subscriber methods #323
Perform a check to isUnsubscribed() before emitting items or calling completion methods
9cc31a9
@mp911de
Owner
mp911de commented Jul 27, 2016 edited

I added the check to isUnsubscribed. There are however two issues:

  1. If no subscriber is subscribed, then the completion event would (should) not hit the subscriber/operator throwing OnCompletedFailedException.
  2. The exception bounces back to the ReactiveCommandDispatcher. Some frameworks let the exception surface somewhere and then just log it. Other frameworks call onError(…), which might swallow the exception. In any case, there's no way to deal with the exception, because no one seems to want to handle it (the subscriber can't handle it, and the ReactiveCommandDispatcher has no way of handling it other than to suppress or log it). So the only sane way is to add an onError... operator so you can decide on your own what to do with the exception.
@mp911de
Owner
mp911de commented Jul 27, 2016

@vleushin I had a chat with the RxJava guys. Throwing exceptions in onCompleted is a protocol violation and that should be fixed upstream. Using flatMap or doOnCompleted to execute work is the right approach.

@vleushin

This is interesting. Thanks! I think I must tell that to the gRPC guys. On the other hand, they are throwing it in a case where you should not call it (when it is unsubscribed).

@mp911de mp911de added this to the Lettuce 4.2.2 milestone Aug 3, 2016
@mp911de
Owner
mp911de commented Aug 3, 2016

I added the isUnsubscribed check to observable completion, so this ticket is solved now. If you still run into the issue, let's have a chat together with the other parties to figure out what's going on and what it takes to fix it.

@mp911de mp911de closed this Aug 3, 2016
@vleushin
vleushin commented Aug 3, 2016

Thanks. Meanwhile, I tried the new library setup in production -- and I got the same problem: completely wrong results. Say user 1 does get A, get B, get C, and user 2 does get X, get Y, get Z, two completely different sequences of requests, and somehow, magically, user 1 ends up with X Z B and user 2 with A Y C or something similar.

I think I understand how to reproduce it on my local machine and I will be debugging this tomorrow. I'll share my findings.

@mp911de
Owner
mp911de commented Aug 4, 2016

Thanks a lot for your feedback. I'd also like to understand the issue and get it fixed.

@vleushin
vleushin commented Aug 4, 2016 edited

@mp911de I finally managed to pinpoint it. It is definitely because of that exception in onCompleted! It triggers something.

Here is a simple test to reproduce the problem:

    @Test
    public void testWrongValue() throws ExecutionException, InterruptedException {
        final RedisClient client = RedisClient.create("redis://localhost/0");
        final StatefulRedisConnection<String, String> connect = client.connect();
        final RedisReactiveCommands<String, String> commands = connect.reactive();

        Observable.concat(
                commands.set("keyA", "valueA"),
                commands.set("keyB", "valueB"))
                .lastOrDefault(null)
                .toBlocking()
                .toFuture()
                .get();

        commands.get("keyA").subscribe(createSubscriberWithExceptionOnComplete());

        // bonus: comment this line and it will work fine
        commands.get("keyA").subscribe(createSubscriberWithExceptionOnComplete());

        Thread.sleep(100);

        final String valueB = commands.get("keyB")
                .toBlocking()
                .toFuture()
                .get();

        System.out.println(valueB); // prints valueA
    }

    private static Subscriber<String> createSubscriberWithExceptionOnComplete() {
        return new Subscriber<String>() {
            @Override
            public void onCompleted() {
                throw new RuntimeException("throwing something");
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String s) {
            }
        };
    }
@vleushin
vleushin commented Aug 4, 2016 edited

I inspected CommandHandler, and I've found this code:

    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {

        while (!queue.isEmpty()) {

            RedisCommand<K, V, ?> command = queue.peek();
            if (debugEnabled) {
                logger.debug("{} Queue contains: {} commands", logPrefix(), queue.size());
            }

            WithLatency withLatency = null;

            if (clientResources.commandLatencyCollector().isEnabled()) {
                RedisCommand<K, V, ?> unwrappedCommand = CommandWrapper.unwrap(command);
                if (unwrappedCommand instanceof WithLatency) {
                    withLatency = (WithLatency) unwrappedCommand;
                    if (withLatency.getFirstResponse() == -1) {
                        withLatency.firstResponse(nanoTime());
                    }
                }
            }

            if (!rsm.decode(buffer, command, command.getOutput())) {
                return;
            }

            command = queue.poll();
            recordLatency(withLatency, command.getType());

            command.complete();

            if (buffer != null && buffer.refCnt() != 0) {
                buffer.discardReadBytes();
            }
        }
    }

And I think that because of the exception in command.complete() something is not discarded from the buffer and commands get shifted. That's my wild guess; I think you understand this code much better.

One more observation that led me to think commands are getting shifted:

        final String firstValueB = commands.get("keyB")
                .toBlocking()
                .toFuture()
                .get();
        final String secondValueB = commands.get("keyB")
                .toBlocking()
                .toFuture()
                .get();

        System.out.println(firstValueB); // prints valueA
        System.out.println(secondValueB); // prints valueB
@vleushin
vleushin commented Aug 10, 2016 edited

@mp911de What do you think? I tried wrapping command.complete() in a try/catch block and it has worked fine so far. I'm using this version in production at the moment.

@mp911de
Owner
mp911de commented Aug 10, 2016

Thanks for digging into the issue. I need to get up to speed. I was travelling last week and got sick, but I'll get back to you.

@mp911de
Owner
mp911de commented Aug 10, 2016 edited

I took a look at the issue. Your fix will work, but that's not the root cause. The buffer maintains a reader index so additional incoming bytes can resume from the position where decoding finished the last time bytes were received. That code is safe. Another issue causes the problem: the exception is propagated to the exception handler, and exceptionCaught inspects the queue and completes the next command in the queue with the exception. At that time the causing command is no longer in the queue, so an innocent command is killed with the exception and the whole connection gets out of sync.
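
The desynchronisation can be replayed with a toy model. This is only a sketch under simplifying assumptions: `ToyCommand`, `ToyHandler` and `DesyncDemo` are hypothetical names that loosely mirror the CommandHandler flow, replies are plain strings instead of bytes, and the model assumes that the reply belonging to the killed command simply stays in the buffer for the next command to read.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a pipelined connection; all names are made up for this sketch.
class ToyCommand {
    final String name;
    final boolean subscriberThrows; // models a subscriber that throws in onCompleted
    String result;
    Throwable error;

    ToyCommand(String name, boolean subscriberThrows) {
        this.name = name;
        this.subscriberThrows = subscriberThrows;
    }

    void complete(String reply) {
        result = reply;
        if (subscriberThrows) {
            throw new RuntimeException("thrown from onCompleted of " + name);
        }
    }
}

class ToyHandler {
    final Deque<ToyCommand> queue = new ArrayDeque<>();
    final Deque<String> buffer = new ArrayDeque<>(); // replies not yet decoded

    ToyCommand send(String name, boolean subscriberThrows) {
        ToyCommand command = new ToyCommand(name, subscriberThrows);
        queue.add(command);
        return command;
    }

    // Replies arrive in the order the commands were sent.
    void channelRead(String... replies) {
        for (String reply : replies) {
            buffer.add(reply);
        }
        try {
            decode();
        } catch (RuntimeException e) {
            exceptionCaught(e);
        }
    }

    private void decode() {
        while (!queue.isEmpty() && !buffer.isEmpty()) {
            ToyCommand command = queue.poll(); // removed before completion
            command.complete(buffer.poll());   // may throw, unguarded
        }
    }

    // Fails whatever command is at the head of the queue, which is no longer
    // the one that caused the exception. Its reply is left in the buffer.
    private void exceptionCaught(Throwable cause) {
        ToyCommand head = queue.poll();
        if (head != null) {
            head.error = cause;
        }
    }
}

class DesyncDemo {
    // Replays the failing test: two GETs with throwing subscribers, then two plain GETs.
    static String[] run() {
        ToyHandler handler = new ToyHandler();
        handler.send("GET keyA #1", true);
        handler.send("GET keyA #2", true);
        handler.channelRead("valueA", "valueA");

        ToyCommand firstB = handler.send("GET keyB #1", false);
        handler.channelRead("valueB");
        ToyCommand secondB = handler.send("GET keyB #2", false);
        handler.channelRead("valueB");
        return new String[] { firstB.result, secondB.result };
    }

    public static void main(String[] args) {
        String[] results = run();
        System.out.println(results[0]); // valueA
        System.out.println(results[1]); // valueB
    }
}
```

In this model every later command receives the reply of its predecessor, which matches the shifted values reported earlier in the thread.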

I already have a fix in mind: I'll switch the completion/queue poll order so I can keep the global exception handler, and I'll remove the command from the queue once it has completed, either successfully or exceptionally.

I will apply your suggested fix.
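
A rough sketch of the guarded variant, assuming a simplified decode loop: `GuardedCommand`, `GuardedDecoder` and `GuardedDemo` are hypothetical names for illustration, not the lettuce classes, and the `swallowed` counter stands in for whatever logging a real handler would do.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical stand-in for a queued command; not the lettuce RedisCommand type.
class GuardedCommand {
    final Consumer<String> onComplete; // user callback, may throw
    String result;

    GuardedCommand(Consumer<String> onComplete) {
        this.onComplete = onComplete;
    }

    void complete(String reply) {
        result = reply;
        onComplete.accept(reply);
    }
}

class GuardedDecoder {
    final Deque<GuardedCommand> queue = new ArrayDeque<>();
    int swallowed; // exceptions caught during completion (a real handler would log them)

    GuardedCommand send(Consumer<String> onComplete) {
        GuardedCommand command = new GuardedCommand(onComplete);
        queue.add(command);
        return command;
    }

    void decode(Deque<String> replies) {
        while (!queue.isEmpty() && !replies.isEmpty()) {
            GuardedCommand command = queue.poll(); // remove first, then complete
            try {
                command.complete(replies.poll());
            } catch (RuntimeException e) {
                swallowed++; // the failure stays local; the loop keeps decoding
            }
        }
    }
}

class GuardedDemo {
    // Two commands whose callbacks throw, followed by a well-behaved GET keyB.
    static String run(GuardedDecoder decoder) {
        Consumer<String> throwing = reply -> {
            throw new RuntimeException("thrown from onCompleted");
        };
        decoder.send(throwing);
        decoder.send(throwing);
        GuardedCommand getB = decoder.send(reply -> { });
        decoder.decode(new ArrayDeque<>(Arrays.asList("valueA", "valueA", "valueB")));
        return getB.result;
    }

    public static void main(String[] args) {
        GuardedDecoder decoder = new GuardedDecoder();
        System.out.println(run(decoder));      // valueB
        System.out.println(decoder.swallowed); // 2
    }
}
```

Because the exception never leaves the loop, each reply is still paired with the command that requested it.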

@mp911de mp911de added a commit that referenced this issue Aug 10, 2016
@mp911de Guard command completion against exceptions #331
Command completion is now guarded by try/catch to prevent the global exception handler to kick in. Commands must be removed from the queue prior to completion to prevent duplicate submission on ping-before-reconnect.

See also #323
724b9da
@mp911de mp911de added a commit that referenced this issue Aug 10, 2016
@mp911de Guard command completion against exceptions #331
Command completion is now guarded by try/catch to prevent the global exception handler to kick in. Commands must be removed from the queue prior to completion to prevent duplicate submission on ping-before-reconnect.

See also #323
9d3bba7
@vleushin

I'm very glad we figured that out! Thanks!
