Finagle redis client subscribe command fails in 18.6.0 #704

@shakhovv

We tried to use the subscribe command against a Redis server that requires authentication and got this error: NOAUTH Authentication required.

Expected behavior

subscribe should work without errors in finagle-redis when the client has authenticated.

Actual behavior

My app can't create any subscription, and this exception appears in the log:

SEVERE: Exception propagated to the root monitor!
java.lang.IllegalArgumentException: Unexpected reply type: ErrorReply
	at com.twitter.finagle.redis.exp.SubscribeCommands$SubscriptionManager.onMessage(SubscribeClient.scala:242)
	at com.twitter.finagle.redis.exp.SubscribeDispatcher.$anonfun$loop$1(SubscribeDispatcher.scala:20)
	at com.twitter.finagle.redis.exp.SubscribeDispatcher.$anonfun$loop$1$adapted(SubscribeDispatcher.scala:18)
	at com.twitter.util.Promise$Monitored.apply(Promise.scala:202)
	at com.twitter.util.Promise$Monitored.apply(Promise.scala:198)
	at com.twitter.util.Promise$WaitQueue.com$twitter$util$Promise$WaitQueue$$run(Promise.scala:90)
	at com.twitter.util.Promise$WaitQueue$$anon$5.run(Promise.scala:85)
	at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:198)
	at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157)
	at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:274)
	at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:109)
	at com.twitter.util.Promise$WaitQueue.runInScheduler(Promise.scala:85)
	at com.twitter.util.Promise$WaitQueue.runInScheduler$(Promise.scala:84)
	at com.twitter.util.Promise$Transformer.runInScheduler(Promise.scala:215)
	at com.twitter.util.Promise.updateIfEmpty(Promise.scala:739)
	at com.twitter.util.Promise.update(Promise.scala:711)
	at com.twitter.util.Promise.setValue(Promise.scala:687)
	at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:122)
	at com.twitter.finagle.netty4.transport.ChannelTransport$$anon$2.channelRead(ChannelTransport.scala:188)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at com.twitter.finagle.netty4.codec.BufCodec$.channelRead(BufCodec.scala:65)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at com.twitter.finagle.util.BlockingTimeTrackingThreadFactory$$anon$1.run(BlockingTimeTrackingThreadFactory.scala:23)
	at java.lang.Thread.run(Thread.java:745)

Steps to reproduce the behavior

We use this environment:
finagle-redis: 18.6.0
redis: 4.0.9

Test:

  import com.twitter.util.Await
  import com.twitter.finagle.Redis
  import com.twitter.io.Buf

  "Subscription with auth" should "work correctly" in {

    // Authenticate the client before subscribing.
    val r = Redis.client.newRichClient("localhost:6379")
    Await.result(r.auth(Buf.Utf8("mypass")))

    // The subscription is where the NOAUTH ErrorReply surfaces.
    val subscriber = r.subscribe(Seq(Buf.Utf8("test"))) { case (_: Buf, message: Buf) =>
      println(Buf.Utf8.unapply(message).get)
    }

    Await.result(subscriber)
    Await.result(r.publish(Buf.Utf8("test"), Buf.Utf8("test")))
  }
