Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't write replies if close the client ASAP #7202

Merged
merged 7 commits into from Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/acl.c
Expand Up @@ -297,7 +297,13 @@ void ACLFreeUserAndKillClients(user *u) {
* it in non authenticated mode. */
c->user = DefaultUser;
c->authenticated = 0;
freeClientAsync(c);
/* We will write replies to this client later, so we can't
* close it directly even if async. */
if (c == server.current_client) {
c->flags |= CLIENT_CLOSE_AFTER_COMMAND;
} else {
freeClientAsync(c);
}
}
}
ACLFreeUser(u);
Expand Down
8 changes: 7 additions & 1 deletion src/module.c
Expand Up @@ -5530,7 +5530,13 @@ void revokeClientAuthentication(client *c) {

c->user = DefaultUser;
c->authenticated = 0;
freeClientAsync(c);
/* We will write replies to this client later, so we can't close it
* directly even if async. */
if (c == server.current_client) {
c->flags |= CLIENT_CLOSE_AFTER_COMMAND;
} else {
freeClientAsync(c);
}
}

/* Cleanup all clients that have been authenticated with this module. This
Expand Down
14 changes: 14 additions & 0 deletions src/networking.c
Expand Up @@ -223,6 +223,9 @@ int prepareClientToWrite(client *c) {
* handler since there is no socket at all. */
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;

/* CLIENT REPLY OFF / SKIP handling: don't send replies. */
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

Expand Down Expand Up @@ -1436,6 +1439,9 @@ int handleClientsWithPendingWrites(void) {
* that may trigger write error or recreate handler. */
if (c->flags & CLIENT_PROTECTED) continue;

/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;

/* Try to write buffers to the client socket. */
if (writeToClient(c,0) == C_ERR) continue;

Expand Down Expand Up @@ -3114,6 +3120,14 @@ int handleClientsWithPendingWritesUsingThreads(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;

/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}

int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
Expand Down
7 changes: 7 additions & 0 deletions src/server.c
Expand Up @@ -3402,6 +3402,13 @@ void call(client *c, int flags) {
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;

/* After executing command, we will close the client after writing entire
* reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag. */
if (c->flags & CLIENT_CLOSE_AFTER_COMMAND) {
c->flags &= ~CLIENT_CLOSE_AFTER_COMMAND;
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}

/* When EVAL is called loading the AOF we don't want commands called
* from Lua to go into the slowlog or to populate statistics. */
if (server.loading && c->flags & CLIENT_LUA)
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Expand Up @@ -264,6 +264,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
about writes performed by myself.*/
#define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */
#define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */
#define CLIENT_CLOSE_AFTER_COMMAND (1ULL<<40) /* Close after executing commands
* and writing entire reply. */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down
17 changes: 17 additions & 0 deletions tests/unit/acl.tcl
Expand Up @@ -260,6 +260,23 @@ start_server {tags {"acl"}} {
catch {r ACL help xxx} e
assert_match "*Unknown subcommand or wrong number of arguments*" $e
}

test {Delete a user that the client doesn't use} {
r ACL setuser not_used on >passwd
assert {[r ACL deluser not_used] == 1}
# The client is not closed
assert {[r ping] eq {PONG}}
}

test {Delete a user that the client is using} {
r ACL setuser using on +acl >passwd
r AUTH using passwd
# The client will receive reply normally
assert {[r ACL deluser using] == 1}
# The client is closed
catch {[r ping]} e
assert_match "*I/O error*" $e
}
}

set server_path [tmpdir "server.acl"]
Expand Down
90 changes: 90 additions & 0 deletions tests/unit/obuf-limits.tcl
Expand Up @@ -70,4 +70,94 @@ start_server {tags {"obuf-limits"}} {
assert {$omem >= 100000 && $time_elapsed < 6}
$rd1 close
}

test {No response for single command if client output buffer hard limit is enforced} {
r config set client-output-buffer-limit {normal 100000 0 0}
# Total size of all items must be more than 100k
set item [string repeat "x" 1000]
for {set i 0} {$i < 150} {incr i} {
r lpush mylist $item
}
set orig_mem [s used_memory]
# Set client name and get all items
set rd [redis_deferring_client]
$rd client setname mybiglist
assert {[$rd read] eq "OK"}
$rd lrange mylist 0 -1
oranagra marked this conversation as resolved.
Show resolved Hide resolved
$rd flush
after 100

# Before we read reply, redis will close this client.
set clients [r client list]
assert_no_match "*name=mybiglist*" $clients
set cur_mem [s used_memory]
# 10k just is a deviation threshold
assert {$cur_mem < 10000 + $orig_mem}
oranagra marked this conversation as resolved.
Show resolved Hide resolved

# Read nothing
set fd [$rd channel]
assert_equal {} [read $fd]
}

test {No response for multi commands in pipeline if client output buffer limit is enforced} {
r config set client-output-buffer-limit {normal 100000 0 0}
set value [string repeat "x" 10000]
r set bigkey $value
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
$rd2 client setname multicommands
assert_equal "OK" [$rd2 read]
# Let redis sleep 2s firstly
$rd1 debug sleep 2
$rd1 flush
after 100

# Total size should be less than OS socket buffer, redis can
# execute all commands in this pipeline when it wakes up.
for {set i 0} {$i < 15} {incr i} {
$rd2 set $i $i
$rd2 get $i
$rd2 del $i
# One bigkey is 10k, total response size must be more than 100k
$rd2 get bigkey
}
$rd2 flush
after 100

# Reds must wake up if it can send reply
assert_equal "PONG" [r ping]
set clients [r client list]
assert_no_match "*name=multicommands*" $clients
set fd [$rd2 channel]
assert_equal {} [read $fd]
ShooterIT marked this conversation as resolved.
Show resolved Hide resolved
}

test {Execute transactions completely even if client output buffer limit is enforced} {
r config set client-output-buffer-limit {normal 100000 0 0}
# Total size of all items must be more than 100k
set item [string repeat "x" 1000]
for {set i 0} {$i < 150} {incr i} {
r lpush mylist2 $item
}

# Output buffer limit is enforced during executing transaction
r client setname transactionclient
r set k1 v1
r multi
r set k2 v2
r get k2
r lrange mylist2 0 -1
r set k3 v3
r del k1
catch {[r exec]} e
oranagra marked this conversation as resolved.
Show resolved Hide resolved
assert_match "*I/O error*" $e
reconnect
set clients [r client list]
assert_no_match "*name=transactionclient*" $clients

# Transactions should be executed completely
assert_equal {} [r get k1]
assert_equal "v2" [r get k2]
ShooterIT marked this conversation as resolved.
Show resolved Hide resolved
assert_equal "v3" [r get k3]
}
}