
Should ReactiveCommandDispatcher.java check subscription status? #323

Closed
vleushin opened this issue Jul 27, 2016 · 15 comments
Labels
type: bug A general bug

@vleushin

Hello, I'm back to upgrading libraries, and my issue #202 is probably still there. I tried it on the staging environment and got this exception:

2016-07-27 05:54:36.986 [WARN] [lettuce-nioEventLoop-35-1] [c.l.r.p.CommandHandler] - [/172.17.0.29:43046 -> redis.staging.mint.internal/172.31.54.220:6379] Unexpected exception during request: rx.exceptions.OnCompletedFailedException: CANCELLED
rx.exceptions.OnCompletedFailedException: CANCELLED
    at rx.observers.SafeSubscriber.onCompleted(SafeSubscriber.java:85)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
    at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onCompleted(OperatorOnErrorResumeNextViaFunction.java:101)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:54)
    at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$3.onCompleted(AbstractCommand.java:1142)
    at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnRunHookApplication$1.onCompleted(AbstractCommand.java:1399)
    at com.netflix.hystrix.AbstractCommand$ExecutionHookApplication$1.onCompleted(AbstractCommand.java:1332)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:110)
    at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
    at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:98)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:662)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:574)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:283)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:110)
    at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
    at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:98)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:662)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:574)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:863)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onCompleted(OperatorOnErrorResumeNextViaFunction.java:101)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at com.lambdaworks.redis.ReactiveCommandDispatcher$ObservableCommand.complete(ReactiveCommandDispatcher.java:127)
    at com.lambdaworks.redis.protocol.CommandHandler.decode(CommandHandler.java:187)
    at com.lambdaworks.redis.protocol.CommandHandler.channelRead(CommandHandler.java:153)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1320)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:905)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:563)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:504)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:418)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:390)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.grpc.StatusRuntimeException: CANCELLED
    at io.grpc.Status.asRuntimeException(Status.java:536)
    at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:302)
    at rx.internal.util.ActionSubscriber.onCompleted(ActionSubscriber.java:49)
    at rx.observers.SafeSubscriber.onCompleted(SafeSubscriber.java:79)
    ... 64 more

Let's say it is ok.

Later I decided to check https://github.com/mp911de/lettuce/blob/master/src/main/java/com/lambdaworks/redis/ReactiveCommandDispatcher.java

        @Override
        @SuppressWarnings("unchecked")
        public void complete() {
            if (completed) {
                return;
            }

            super.complete();

            if (getOutput() != null) {
                Object result = getOutput().get();
                if (!(getOutput() instanceof StreamingOutput<?>) && result != null) {

                    if (dissolve && result instanceof Collection) {
                        Collection<T> collection = (Collection<T>) result;
                        for (T t : collection) {
                            subscriber.onNext(t);
                        }
                    } else {
                        subscriber.onNext((T) result);
                    }
                }

                if (getOutput().hasError()) {
                    subscriber.onError(new RedisCommandExecutionException(getOutput().getError()));
                    completed = true;
                    return;
                }
            }

            completed = true;
            subscriber.onCompleted();
        }

And I noticed that you do not check the subscription status (isUnsubscribed). So I think I understand the problem now: my command runs some logic and sends some requests to Redis; the command gets cancelled because of a timeout or some other reason; lettuce then gets the results, notifies the already unsubscribed subscriber about them (onNext, onCompleted, onError), and gets an exception saying that the command was cancelled.

So the question is: should we check the subscription status before notifying the subscriber about the result? As far as I know we should, but I have to double-check the Rx docs to be sure.
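A minimal sketch of what such a check could look like when emitting a decoded result. So that it compiles without RxJava, it uses a hypothetical `SimpleSubscriber` interface as a stand-in for `rx.Subscriber`; `emit` and `RecordingSubscriber` are illustrative names, not lettuce code. The subscriber is consulted before every `onNext` and again before the terminal `onError`/`onCompleted`, because it may unsubscribe between items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GuardedEmission {

    // Hypothetical, simplified stand-in for rx.Subscriber (RxJava 1.x).
    // The real class also implements Subscription and handles backpressure.
    interface SimpleSubscriber<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
        boolean isUnsubscribed();
    }

    // Emits a decoded result, checking isUnsubscribed() before every onNext()
    // and before the terminal onError()/onCompleted() call.
    static <T> void emit(SimpleSubscriber<T> subscriber, List<T> items, Throwable error) {
        for (T item : items) {
            if (subscriber.isUnsubscribed()) {
                return; // subscriber is gone: drop the remaining items silently
            }
            subscriber.onNext(item);
        }
        if (subscriber.isUnsubscribed()) {
            return; // no terminal event for an unsubscribed subscriber
        }
        if (error != null) {
            subscriber.onError(error);
        } else {
            subscriber.onCompleted();
        }
    }

    // Test subscriber that records events and can unsubscribe itself after N items.
    static final class RecordingSubscriber implements SimpleSubscriber<String> {
        final List<String> events = new ArrayList<>();
        final int unsubscribeAfter; // -1 means "never unsubscribe on its own"
        boolean unsubscribed;

        RecordingSubscriber(int unsubscribeAfter) {
            this.unsubscribeAfter = unsubscribeAfter;
        }

        @Override public void onNext(String item) {
            events.add("next:" + item);
            if (events.size() == unsubscribeAfter) {
                unsubscribed = true;
            }
        }
        @Override public void onError(Throwable error) { events.add("error"); }
        @Override public void onCompleted() { events.add("completed"); }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    public static void main(String[] args) {
        RecordingSubscriber stays = new RecordingSubscriber(-1);
        emit(stays, Arrays.asList("a", "b"), null);
        System.out.println(stays.events); // [next:a, next:b, completed]

        RecordingSubscriber leaves = new RecordingSubscriber(1);
        emit(leaves, Arrays.asList("a", "b"), null);
        System.out.println(leaves.events); // [next:a]
    }
}
```

A subscriber that unsubscribes after the first item receives neither the second item nor `onCompleted()`, which is the situation a cancelled command ends up in.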

@mp911de mp911de added the type: bug A general bug label Jul 27, 2016
@mp911de
Collaborator

mp911de commented Jul 27, 2016

Hi @vleushin,
Thanks for the bug report. The examples in the docs check isUnsubscribed() before doing any processing, and they also skip the call to onCompleted() once the subscriber has unsubscribed.

@vleushin
Author

I also recall that Observable.create is not recommended for simple observable wrappers because you have to do the unsubscribe check manually (and can forget it), while Observable.fromCallable is recommended because it does it for you (it uses SingleDelayedProducer internally, which does check isUnsubscribed).
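The point about `fromCallable` can be sketched as a helper that owns the unsubscribe checks, so individual call sites only supply a `Callable` and cannot forget them. This is not RxJava's implementation (which, as noted, uses `SingleDelayedProducer`); `SimpleSubscriber`, `fromCallable`, and `Recorder` below are hypothetical stand-ins defined so the example runs without RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class FromCallableSketch {

    // Hypothetical, simplified stand-in for rx.Subscriber (RxJava 1.x).
    interface SimpleSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
        boolean isUnsubscribed();
    }

    // Illustrates the idea behind Observable.fromCallable: the isUnsubscribed()
    // checks live in one reusable helper instead of at every call site.
    static <T> void fromCallable(Callable<T> callable, SimpleSubscriber<T> subscriber) {
        T value;
        try {
            value = callable.call();
        } catch (Exception e) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(e);
            }
            return;
        }
        if (subscriber.isUnsubscribed()) {
            return; // e.g. cancelled by a timeout while the callable was running
        }
        subscriber.onNext(value);
        if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
        }
    }

    // Test subscriber that records every event it receives.
    static final class Recorder<T> implements SimpleSubscriber<T> {
        final List<String> events = new ArrayList<>();
        volatile boolean unsubscribed;

        @Override public void onNext(T value) { events.add("next:" + value); }
        @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
        @Override public void onCompleted() { events.add("completed"); }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    public static void main(String[] args) {
        Recorder<Integer> normal = new Recorder<>();
        fromCallable(() -> 42, normal);
        System.out.println(normal.events); // [next:42, completed]

        Recorder<Integer> cancelled = new Recorder<>();
        // Simulate a timeout that unsubscribes while the work is still running.
        fromCallable(() -> {
            cancelled.unsubscribed = true;
            return 42;
        }, cancelled);
        System.out.println(cancelled.events); // []
    }
}
```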

Glad to help. Second question: how bad is it for lettuce to get exceptions there? Is it just a message and a stack trace in the logs, or can it lead to state corruption somewhere?

@mp911de
Collaborator

mp911de commented Jul 27, 2016

A subscriber can also unsubscribe while items are being emitted, so having these safeguards in place isn't a bad idea.

Lettuce should not get into an invalid state. Data streaming via onNext() (emitting items as they are received) is protected. Exceptions result in just a stack trace warning; there's no one else to notify.

Calls to onCompleted() throw the exception into the netty layer but nothing breaks. You'll see an exception message and that's it.

mp911de added a commit that referenced this issue Jul 27, 2016
Perform a check to isUnsubscribed() before emitting items or calling completion methods
@mp911de
Collaborator

mp911de commented Jul 27, 2016

I added the isUnsubscribed check. There are, however, two issues:

  1. If no subscriber is subscribed, the completion event would (should) not reach the subscriber/operator that throws OnCompletedFailedException.
  2. The exception bounces back to the ReactiveCommandDispatcher. Some frameworks let the exception land somewhere and then just log it. Other frameworks call onError(…), which might swallow the exception. In any case, there's no way to deal with the exception, because no one seems to want to handle it (the subscriber can't handle it, and the ReactiveCommandDispatcher has no option other than to suppress or log it). So the only sane way is adding an onError... operator so you can decide on your own what to do with the exception.

@mp911de
Collaborator

mp911de commented Jul 27, 2016

@vleushin I had a chat with the RxJava guys. Throwing exceptions in onCompleted is a protocol violation, and that should be fixed upstream. Using flatMap or doOnCompleted to execute work is the right approach.
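A sketch of why moving the work into the chain helps: a `doOnCompleted`-style wrapper runs the side effect when upstream completes and, if the side effect throws, reports the failure downstream via `onError` instead of throwing it back into the emitting code (as the gRPC `onCompleted` call did in this issue). This roughly mirrors how RxJava 1.x treats a failing `doOnCompleted` action, but `SimpleObserver`, `doOnCompleted`, and `Recorder` below are hypothetical stand-ins, not RxJava's API.

```java
import java.util.ArrayList;
import java.util.List;

public class DoOnCompletedSketch {

    // Hypothetical, simplified stand-in for rx.Observer (RxJava 1.x).
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    // Wraps a downstream observer and runs `action` when upstream completes.
    // If the action throws, the failure goes downstream as onError() and the
    // caller of onCompleted() (the emitting code) does not see an exception.
    static <T> SimpleObserver<T> doOnCompleted(SimpleObserver<T> downstream, Runnable action) {
        return new SimpleObserver<T>() {
            @Override
            public void onNext(T value) {
                downstream.onNext(value);
            }

            @Override
            public void onError(Throwable error) {
                downstream.onError(error);
            }

            @Override
            public void onCompleted() {
                try {
                    action.run();
                } catch (RuntimeException e) {
                    downstream.onError(e); // report the failed work instead of rethrowing
                    return;
                }
                downstream.onCompleted();
            }
        };
    }

    // Test observer that records the events it receives.
    static final class Recorder implements SimpleObserver<String> {
        final List<String> events = new ArrayList<>();

        @Override public void onNext(String value) { events.add("next:" + value); }
        @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
        @Override public void onCompleted() { events.add("completed"); }
    }

    public static void main(String[] args) {
        Recorder sink = new Recorder();
        SimpleObserver<String> chain = doOnCompleted(sink, () -> {
            throw new IllegalStateException("work failed");
        });
        // Emitting side: both calls return normally even though the work failed.
        chain.onNext("valueA");
        chain.onCompleted();
        System.out.println(sink.events); // [next:valueA, error:work failed]
    }
}
```

With this shape, the emitter's `onCompleted()` call returns normally, and the failure is delivered to whoever subscribed, who can then decide what to do with it.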

@vleushin
Author

This is interesting, thanks! I think I should tell that to the gRPC guys. On the other hand, they only throw it in a case where you shouldn't be calling it anyway (when it is unsubscribed).

@mp911de mp911de added this to the Lettuce 4.2.2 milestone Aug 3, 2016
@mp911de
Collaborator

mp911de commented Aug 3, 2016

I added the isUnsubscribed check to observable completion, so this ticket is solved now. If you still run into the issue, let's get together with the other parties to figure out what's going on and what it takes to fix it.

@mp911de mp911de closed this as completed Aug 3, 2016
@vleushin
Author

vleushin commented Aug 3, 2016

Thanks. Meanwhile, I tried the new library setup in production and got the same problem: completely wrong results. Say user 1 does get A, get B, get C, and user 2 does get X, get Y, get Z, two completely different sequences of requests; somehow, magically, user 1 ends up with X, Z, B and user 2 with A, Y, C, or something similar.

I think I understand how to reproduce it on my local machine and I will be debugging this tomorrow. I'll share my findings.

@mp911de
Collaborator

mp911de commented Aug 4, 2016

Thanks a lot for your feedback. I'd also like to understand the issue and get it fixed.

@vleushin
Author

vleushin commented Aug 4, 2016

@mp911de I finally managed to pinpoint it. It is definitely because of that exception in onCompleted! It triggers something.

Here is a simple test to reproduce the problem:

    import java.util.concurrent.ExecutionException;

    import org.junit.Test;

    import com.lambdaworks.redis.RedisClient;
    import com.lambdaworks.redis.api.StatefulRedisConnection;
    import com.lambdaworks.redis.api.rx.RedisReactiveCommands;

    import rx.Observable;
    import rx.Subscriber;

    // Class name is illustrative; the test needs a Redis server on localhost:6379.
    public class WrongValueTest {

        @Test
        public void testWrongValue() throws ExecutionException, InterruptedException {
            final RedisClient client = RedisClient.create("redis://localhost/0");
            final StatefulRedisConnection<String, String> connect = client.connect();
            final RedisReactiveCommands<String, String> commands = connect.reactive();

            Observable.concat(
                    commands.set("keyA", "valueA"),
                    commands.set("keyB", "valueB"))
                    .lastOrDefault(null)
                    .toBlocking()
                    .toFuture()
                    .get();

            commands.get("keyA").subscribe(createSubscriberWithExceptionOnComplete());

            // bonus: comment out this line and it will work fine
            commands.get("keyA").subscribe(createSubscriberWithExceptionOnComplete());

            Thread.sleep(100);

            final String valueB = commands.get("keyB")
                    .toBlocking()
                    .toFuture()
                    .get();

            System.out.println(valueB); // prints valueA

            connect.close();
            client.shutdown();
        }

        // Subscriber that breaks the Rx contract by throwing from onCompleted()
        private static Subscriber<String> createSubscriberWithExceptionOnComplete() {
            return new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    throw new RuntimeException("throwing something");
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onNext(String s) {
                }
            };
        }
    }

@vleushin
Author

vleushin commented Aug 4, 2016

I inspected CommandHandler, and I've found this code:

    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {

        while (!queue.isEmpty()) {

            RedisCommand<K, V, ?> command = queue.peek();
            if (debugEnabled) {
                logger.debug("{} Queue contains: {} commands", logPrefix(), queue.size());
            }

            WithLatency withLatency = null;

            if (clientResources.commandLatencyCollector().isEnabled()) {
                RedisCommand<K, V, ?> unwrappedCommand = CommandWrapper.unwrap(command);
                if (unwrappedCommand instanceof WithLatency) {
                    withLatency = (WithLatency) unwrappedCommand;
                    if (withLatency.getFirstResponse() == -1) {
                        withLatency.firstResponse(nanoTime());
                    }
                }
            }

            if (!rsm.decode(buffer, command, command.getOutput())) {
                return;
            }

            command = queue.poll();
            recordLatency(withLatency, command.getType());

            command.complete();

            if (buffer != null && buffer.refCnt() != 0) {
                buffer.discardReadBytes();
            }
        }
    }

And I think that, because of the exception in command.complete(), something is not discarded from the buffer and the commands get shifted. That's a wild guess, though; you understand this code much better than I do.

One more observation that made me think the commands are getting shifted:

        final String firstValueB = commands.get("keyB")
                .toBlocking()
                .toFuture()
                .get();
        final String secondValueB = commands.get("keyB")
                .toBlocking()
                .toFuture()
                .get();

        System.out.println(firstValueB); // prints valueA
        System.out.println(secondValueB); // prints valueB

@vleushin
Author

vleushin commented Aug 10, 2016

@mp911de What do you think? I tried wrapping command.complete() in a try/catch block, and it has worked fine so far. I'm using this version in production at the moment.

@mp911de
Collaborator

mp911de commented Aug 10, 2016

Thanks for digging into the issue. I need to get up to speed. I was traveling last week and got sick, but I'll get back to you.

@mp911de
Collaborator

mp911de commented Aug 10, 2016

I took a look at the issue. Your fix will work, but it doesn't address the root cause. The buffer maintains a reader index, so when additional bytes arrive, decoding resumes from the position where it stopped the last time. That code is safe. The problem is something else: the exception is propagated to the exception handler, and exceptionCaught inspects the queue and completes the next command in the queue with the exception. At that point the command that caused the exception is no longer in the queue, so an innocent command is killed with the exception and the whole connection gets out of sync.

I already have a fix in mind: I'll switch the completion/queue-poll order so I can keep the global exception handler, and I'll remove the command from the queue once it has completed, either successfully or exceptionally.

I will apply your suggested fix.
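To make the failure mode concrete, here is a simplified, hypothetical model of the command queue in plain Java (no netty or lettuce classes; `Command`, `exceptionCaught`, `decodeUnguarded`, and `decodeGuarded` are illustrative names, not lettuce's code). It models decoded replies as a queue of strings and assumes the global handler removes and fails the command at the head of the queue. Without the try/catch, the exception from the first command's callback fails the second command, and the second command's reply ends up in `GET keyB`; with the try/catch, each command receives its own reply.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Consumer;

public class CompletionGuardDemo {

    // Hypothetical, simplified model of a queued Redis command; not lettuce's RedisCommand.
    static final class Command {
        final String name;
        final Consumer<String> callback; // user code; may throw like the bad onCompleted()
        String result;
        Throwable error;

        Command(String name, Consumer<String> callback) {
            this.name = name;
            this.callback = callback;
        }

        void complete(String reply) {
            result = reply;
            callback.accept(reply);
        }

        void completeExceptionally(Throwable cause) {
            error = cause;
        }
    }

    // Models the global exception handler. Assumption: it removes the command
    // at the head of the queue and fails it, although that command did nothing wrong.
    static void exceptionCaught(Deque<Command> queue, Throwable cause) {
        Command head = queue.poll();
        if (head != null) {
            head.completeExceptionally(cause);
        }
    }

    // Unguarded: an exception from complete() escapes to the global handler.
    static void decodeUnguarded(Deque<Command> queue, Deque<String> replies) {
        while (!queue.isEmpty() && !replies.isEmpty()) {
            Command command = queue.poll();
            try {
                command.complete(replies.poll());
            } catch (RuntimeException e) {
                exceptionCaught(queue, e); // stands in for netty's exception propagation
            }
        }
    }

    // Guarded: the failure stays with the command whose callback threw.
    static void decodeGuarded(Deque<Command> queue, Deque<String> replies) {
        while (!queue.isEmpty() && !replies.isEmpty()) {
            Command command = queue.poll();
            String reply = replies.poll();
            try {
                command.complete(reply);
            } catch (RuntimeException e) {
                System.err.println("Completion of " + command.name + " failed: " + e.getMessage());
            }
        }
    }

    // Loosely modeled on the reproduction above: GET keyA (throwing), GET keyA, GET keyB.
    static String replyForKeyB(boolean guarded) {
        Command first = new Command("GET keyA", reply -> {
            throw new RuntimeException("throwing something");
        });
        Command second = new Command("GET keyA", reply -> { });
        Command getB = new Command("GET keyB", reply -> { });
        Deque<Command> queue = new ArrayDeque<>(Arrays.asList(first, second, getB));
        Deque<String> replies = new ArrayDeque<>(Arrays.asList("valueA", "valueA", "valueB"));
        if (guarded) {
            decodeGuarded(queue, replies);
        } else {
            decodeUnguarded(queue, replies);
        }
        return getB.result;
    }

    public static void main(String[] args) {
        System.out.println(replyForKeyB(false)); // valueA (shifted)
        System.out.println(replyForKeyB(true));  // valueB
    }
}
```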

mp911de added a commit that referenced this issue Aug 10, 2016
Command completion is now guarded by try/catch to prevent the global exception handler from kicking in. Commands must be removed from the queue prior to completion to prevent duplicate submission on ping-before-reconnect.

See also #323
@vleushin
Author

I'm very glad we figured that out! Thanks!
