DefaultEndpoint.QUEUE_SIZE becomes out of sync, preventing command queueing #764

@nivekastoreth

Observed Version(s): 5.0.3.RELEASE

Introduced in Version(s): 4.4.0.Final

  • Still exists in 5.0.3.RELEASE

Still visible in master? Unknown but likely

  • I've not tested the 5.x branch at all

Expected:
When the request queue size limit is hit, newly submitted commands are rejected immediately. Once the request queue drains, new commands are accepted again.

Actual:
I'm still determining exactly what is happening here. What I observe is that when a Redis instance performs a task that blocks the foreground thread for a substantial amount of time (seconds up to minutes; details on how to do this are below), the DefaultEndpoint can become wedged in a state where QUEUE_SIZE is stuck at a non-zero value. If this value is greater than clientOptions.getRequestQueueSize() - command, validateWrite will never again accept any write submitted to it.
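To make the wedged state concrete, here is a minimal sketch of a bounded request counter. It is a simplified, single-threaded model and not Lettuce's actual DefaultEndpoint code: the class BoundedQueueModel and its methods tryEnqueue and dequeue are hypothetical names, and only the comparison against the configured request queue size is taken from the description above.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of a bounded request queue; NOT the real DefaultEndpoint. */
final class BoundedQueueModel {
    // Stands in for clientOptions.getRequestQueueSize().
    private final int requestQueueSize;
    // Stands in for QUEUE_SIZE.
    private final AtomicInteger queueSize = new AtomicInteger();

    BoundedQueueModel(int requestQueueSize) {
        this.requestQueueSize = requestQueueSize;
    }

    /** Accepts the write only if the counter plus the new commands fits the limit. */
    boolean tryEnqueue(int commands) {
        if (queueSize.get() + commands > requestQueueSize) {
            return false; // rejected: the model's equivalent of a failed validateWrite
        }
        queueSize.addAndGet(commands);
        return true;
    }

    /** Has to run once per accepted command, whether it completed or failed. */
    void dequeue(int commands) {
        queueSize.addAndGet(-commands);
    }

    int queueSize() {
        return queueSize.get();
    }

    public static void main(String[] args) {
        BoundedQueueModel endpoint = new BoundedQueueModel(10);
        for (int i = 0; i < 10; i++) {
            endpoint.tryEnqueue(1);
        }
        // No dequeue calls here: this models decrements that were lost.
        System.out.println("accepted while counter is stuck: " + endpoint.tryEnqueue(1));
        endpoint.dequeue(10);
        System.out.println("accepted after counter is released: " + endpoint.tryEnqueue(1));
    }
}
```

In this model, every accepted command that never reaches dequeue permanently consumes one slot, so ten lost decrements with a limit of 10 are enough to reject all further writes even though nothing is actually pending.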


To Reproduce
Using the setup shown below, connect to Redis and verify that commands are processed correctly. Then submit a Redis save command, and while that save is running (that's why we use a large set, but there are other ways to replicate this), submit more than requestQueueSize requests:

sudo docker exec helix-redis redis-cli save &
for i in {1..100}; do cat payload.json| curl -v -H 'Content-Type: application/json' -d @- http://localhost:8000/redis-endpoint 2>&1; done

Performing the above, and then waiting for the save command to complete, results in the log file:

After the save operation has completed, submitting a single followup request results in the log file:


Speculation:
I believe the dequeue call is never made, due at least in part to the following error, but I haven't yet tracked down the exact flow that leads to this case:
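As an illustration of how a missed dequeue could leave the counter stuck, the sketch below contrasts a release hook that only runs on success with one that runs on every completion. This is an assumption about the general failure pattern, not the confirmed flow inside Lettuce: the class DequeueOnCompletion, both methods, and the use of CompletableFuture in place of a Netty promise are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of releasing a queue slot when a write completes. */
final class DequeueOnCompletion {

    /** Leaky variant: the slot is released only if the write succeeds. */
    static void writeLeaky(AtomicInteger queueSize, CompletableFuture<Void> write) {
        queueSize.incrementAndGet();
        write.thenRun(queueSize::decrementAndGet); // skipped when the write fails
    }

    /** Balanced variant: the slot is released on success and on failure. */
    static void writeBalanced(AtomicInteger queueSize, CompletableFuture<Void> write) {
        queueSize.incrementAndGet();
        write.whenComplete((ignored, error) -> queueSize.decrementAndGet());
    }

    public static void main(String[] args) {
        AtomicInteger leaky = new AtomicInteger();
        CompletableFuture<Void> failedWrite = new CompletableFuture<>();
        writeLeaky(leaky, failedWrite);
        failedWrite.completeExceptionally(new IllegalStateException("stack size exceeded"));
        System.out.println("leaky counter after failed write: " + leaky.get());

        AtomicInteger balanced = new AtomicInteger();
        CompletableFuture<Void> failedAgain = new CompletableFuture<>();
        writeBalanced(balanced, failedAgain);
        failedAgain.completeExceptionally(new IllegalStateException("stack size exceeded"));
        System.out.println("balanced counter after failed write: " + balanced.get());
    }
}
```

If the real code path resembles the leaky variant for writes rejected by the CommandHandler, each rejected write would leave QUEUE_SIZE one higher than it should be, which would match the observed behaviour.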

2018-04-20 14:43:53 UTC [lettuce-nioEventLoop-10-3] WARN  i.n.c.AbstractChannelHandlerContext - Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@4a01a122(failure: io.lettuce.core.RedisException: Internal stack size exceeded: 10. Commands are not accepted until the stack size drops.), unnotified cause: io.lettuce.core.RedisException: Internal stack size exceeded: 10. Commands are not accepted until the stack size drops.
	at io.lettuce.core.protocol.CommandHandler.validateWrite(CommandHandler.java:441)
	at io.lettuce.core.protocol.CommandHandler.addToStack(CommandHandler.java:410)
	at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:367)
	at io.lettuce.core.protocol.CommandHandler.write(CommandHandler.java:334)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
	at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081)
	at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128)
	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

Setup:
Redis:

-- sudo docker exec redis redis-cli --eval redis-gen.lua , "large_set" 50000000 0

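-- ARGV values arrive as strings, so convert the numeric ones explicitly
local set_name = #ARGV >= 1 and ARGV[1] or 'large_set'
local entry_count = #ARGV >= 2 and tonumber(ARGV[2]) or 50000000
local entry_start = #ARGV >= 3 and tonumber(ARGV[3]) or 0

local memory = {}

local base = '00000000-0000-0000-0000-'
local min = 100000000000

-- entry_start selects which block of entry_count members to generate
local start = entry_count * entry_start
local stop = start + entry_count - 1 -- inclusive bound, so exactly entry_count members
for user = start, stop, 1 do
  redis.call("sadd", set_name, base .. (min + user))
end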

memory[set_name] = redis.call("scard", set_name)
return cjson.encode(memory)

Client:

  import io.lettuce.core.{ClientOptions, RedisClient, RedisFuture, ScriptOutputType}
  import io.lettuce.core.ClientOptions.DisconnectedBehavior
  import io.lettuce.core.api.async.RedisAsyncCommands
  import io.lettuce.core.resource.DefaultClientResources

  val resources = DefaultClientResources.builder()
    .ioThreadPoolSize(11)
    .computationThreadPoolSize(11)
    .build()
  val client = RedisClient.create(resources)
  val options = ClientOptions.builder()
    .autoReconnect(true)
    .requestQueueSize(10) // low number here to make it easier to reproduce
    .pingBeforeActivateConnection(false)
    .cancelCommandsOnReconnectFailure(true)
    .disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS)
    .build()
  client.setOptions(options)
  val connection = client.connect(uri)

  // later on, submitting requests via
  // (uri, digest, keys and values are defined elsewhere in the application):
  val async: RedisAsyncCommands[String, String] = connection.async()
  val future: RedisFuture[String] = async.evalsha[String](digest.value, ScriptOutputType.VALUE, keys, values: _*)

P.S. This println also seems removable:
