Skip to content

Commit

Permalink
Don't write replies if close the client ASAP (redis#7202)
Browse files Browse the repository at this point in the history
Before this commit, we would have continued to add replies to the reply buffer even if client
output buffer limit is reached, so the used memory would keep increasing over the configured limit.
What's more, we shouldn’t write any reply to the client if it is set 'CLIENT_CLOSE_ASAP' flag
because that doesn't conform to its definition and we will close all clients flagged with
'CLIENT_CLOSE_ASAP' in ‘beforeSleep’.

Because of code execution order, before this, we may firstly write to part of the replies to
the socket before disconnecting it, but in fact, we may can’t send the full replies to clients
since OS socket buffer is limited. But this unexpected behavior makes some commands work well,
for instance ACL DELUSER, if the client deletes the current user, we need to send reply to client
and close the connection, but before, we close the client firstly and write the reply to reply
buffer. secondly, we shouldn't do this despite the fact it works well in most cases.

We add a flag 'CLIENT_CLOSE_AFTER_COMMAND' to mark clients, this flag means we will close the
client after executing commands and send all entire replies, so that we can write replies to
reply buffer during executing commands, send replies to clients, and close them later.

We also fix some implicit problems. If client output buffer limit is enforced in 'multi/exec',
all commands will be executed completely in redis and clients will not read any reply instead of
partial replies. Even more, if the client executes 'ACL deluser' the using user in 'multi/exec',
it will not read the replies after 'ACL deluser' just like before executing 'client kill' itself
in 'multi/exec'.

We added some tests for output buffer limit breach during multi-exec and using a pipeline of
many small commands rather than one with big response.

Co-authored-by: Oran Agra <oran@redislabs.com>
  • Loading branch information
ShooterIT and oranagra committed Sep 24, 2020
1 parent b464afb commit 57709c4
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 2 deletions.
8 changes: 7 additions & 1 deletion src/acl.c
Expand Up @@ -297,7 +297,13 @@ void ACLFreeUserAndKillClients(user *u) {
* it in non authenticated mode. */
c->user = DefaultUser;
c->authenticated = 0;
freeClientAsync(c);
/* We will write replies to this client later, so we can't
* close it directly even if async. */
if (c == server.current_client) {
c->flags |= CLIENT_CLOSE_AFTER_COMMAND;
} else {
freeClientAsync(c);
}
}
}
ACLFreeUser(u);
Expand Down
8 changes: 7 additions & 1 deletion src/module.c
Expand Up @@ -5538,7 +5538,13 @@ void revokeClientAuthentication(client *c) {

c->user = DefaultUser;
c->authenticated = 0;
freeClientAsync(c);
/* We will write replies to this client later, so we can't close it
* directly even if async. */
if (c == server.current_client) {
c->flags |= CLIENT_CLOSE_AFTER_COMMAND;
} else {
freeClientAsync(c);
}
}

/* Cleanup all clients that have been authenticated with this module. This
Expand Down
14 changes: 14 additions & 0 deletions src/networking.c
Expand Up @@ -224,6 +224,9 @@ int prepareClientToWrite(client *c) {
* handler since there is no socket at all. */
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;

/* CLIENT REPLY OFF / SKIP handling: don't send replies. */
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

Expand Down Expand Up @@ -1471,6 +1474,9 @@ int handleClientsWithPendingWrites(void) {
* that may trigger write error or recreate handler. */
if (c->flags & CLIENT_PROTECTED) continue;

/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;

/* Try to write buffers to the client socket. */
if (writeToClient(c,0) == C_ERR) continue;

Expand Down Expand Up @@ -3160,6 +3166,14 @@ int handleClientsWithPendingWritesUsingThreads(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;

/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}

int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
Expand Down
7 changes: 7 additions & 0 deletions src/server.c
Expand Up @@ -3427,6 +3427,13 @@ void call(client *c, int flags) {
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;

/* After executing command, we will close the client after writing entire
* reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag. */
if (c->flags & CLIENT_CLOSE_AFTER_COMMAND) {
c->flags &= ~CLIENT_CLOSE_AFTER_COMMAND;
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}

/* When EVAL is called loading the AOF we don't want commands called
* from Lua to go into the slowlog or to populate statistics. */
if (server.loading && c->flags & CLIENT_LUA)
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Expand Up @@ -265,6 +265,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
about writes performed by myself.*/
#define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */
#define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */
#define CLIENT_CLOSE_AFTER_COMMAND (1ULL<<40) /* Close after executing commands
* and writing entire reply. */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down
17 changes: 17 additions & 0 deletions tests/unit/acl.tcl
Expand Up @@ -260,6 +260,23 @@ start_server {tags {"acl"}} {
catch {r ACL help xxx} e
assert_match "*Unknown subcommand or wrong number of arguments*" $e
}

test {Delete a user that the client doesn't use} {
r ACL setuser not_used on >passwd
assert {[r ACL deluser not_used] == 1}
# The client is not closed
assert {[r ping] eq {PONG}}
}

test {Delete a user that the client is using} {
r ACL setuser using on +acl >passwd
r AUTH using passwd
# The client will receive reply normally
assert {[r ACL deluser using] == 1}
# The client is closed
catch {[r ping]} e
assert_match "*I/O error*" $e
}
}

set server_path [tmpdir "server.acl"]
Expand Down
90 changes: 90 additions & 0 deletions tests/unit/obuf-limits.tcl
Expand Up @@ -70,4 +70,94 @@ start_server {tags {"obuf-limits"}} {
assert {$omem >= 100000 && $time_elapsed < 6}
$rd1 close
}

test {No response for single command if client output buffer hard limit is enforced} {
r config set client-output-buffer-limit {normal 100000 0 0}
# Total size of all items must be more than 100k
set item [string repeat "x" 1000]
for {set i 0} {$i < 150} {incr i} {
r lpush mylist $item
}
set orig_mem [s used_memory]
# Set client name and get all items
set rd [redis_deferring_client]
$rd client setname mybiglist
assert {[$rd read] eq "OK"}
$rd lrange mylist 0 -1
$rd flush
after 100

# Before we read reply, redis will close this client.
set clients [r client list]
assert_no_match "*name=mybiglist*" $clients
set cur_mem [s used_memory]
# 10k just is a deviation threshold
assert {$cur_mem < 10000 + $orig_mem}

# Read nothing
set fd [$rd channel]
assert_equal {} [read $fd]
}

test {No response for multi commands in pipeline if client output buffer limit is enforced} {
r config set client-output-buffer-limit {normal 100000 0 0}
set value [string repeat "x" 10000]
r set bigkey $value
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
$rd2 client setname multicommands
assert_equal "OK" [$rd2 read]
# Let redis sleep 2s firstly
$rd1 debug sleep 2
$rd1 flush
after 100

# Total size should be less than OS socket buffer, redis can
# execute all commands in this pipeline when it wakes up.
for {set i 0} {$i < 15} {incr i} {
$rd2 set $i $i
$rd2 get $i
$rd2 del $i
# One bigkey is 10k, total response size must be more than 100k
$rd2 get bigkey
}
$rd2 flush
after 100

# Reds must wake up if it can send reply
assert_equal "PONG" [r ping]
set clients [r client list]
assert_no_match "*name=multicommands*" $clients
set fd [$rd2 channel]
assert_equal {} [read $fd]
}

test {Execute transactions completely even if client output buffer limit is enforced} {
r config set client-output-buffer-limit {normal 100000 0 0}
# Total size of all items must be more than 100k
set item [string repeat "x" 1000]
for {set i 0} {$i < 150} {incr i} {
r lpush mylist2 $item
}

# Output buffer limit is enforced during executing transaction
r client setname transactionclient
r set k1 v1
r multi
r set k2 v2
r get k2
r lrange mylist2 0 -1
r set k3 v3
r del k1
catch {[r exec]} e
assert_match "*I/O error*" $e
reconnect
set clients [r client list]
assert_no_match "*name=transactionclient*" $clients

# Transactions should be executed completely
assert_equal {} [r get k1]
assert_equal "v2" [r get k2]
assert_equal "v3" [r get k3]
}
}

0 comments on commit 57709c4

Please sign in to comment.