[BUG] Deadlock with streams on redis 7.2 #12290

Closed
Rafiot opened this issue Jun 9, 2023 · 7 comments · Fixed by #12301

Rafiot commented Jun 9, 2023

Describe the bug

The code below causes redis 7.2 to get completely stuck: 100% CPU, impossible to connect to the server, and the process has to be killed with kill -9. With nb_subscribers = 2 it works fine.

The same code works as expected with redis 7.0.

To reproduce

subscriber.py

#!/usr/bin/env python3

import time

from multiprocessing import Process

from redis import Redis


nb_subscribers = 3


def subscriber(user_id):
    r = Redis(unix_socket_path='cache.sock')
    try:
        r.xgroup_create(name='tasks_queue', groupname='test', mkstream=True)
    except Exception:
        print('group already exists')

    while True:
        new_stream = r.xreadgroup(
            groupname='test', consumername=f'testuser-{user_id}', streams={'tasks_queue': '>'},
            block=2000, count=1)
        if not new_stream:
            time.sleep(5)
            continue
        print(new_stream)


processes = []
for i in range(nb_subscribers):
    p = Process(target=subscriber, args=(i,))
    p.start()
    processes.append(p)

while processes:
    new_p = []
    for p in processes:
        if p.is_alive():
            new_p.append(p)
    processes = new_p
    time.sleep(5)

print('all processes dead')

feeder.py

#!/usr/bin/env python3

import time
import uuid

from multiprocessing import Process

from redis import Redis

nb_feeders = 1


def feeder():

    r = Redis(unix_socket_path='cache.sock')

    while True:
        fields = {'task_uuid': str(uuid.uuid4())}
        r.xadd(name='tasks_queue', fields=fields, id='*', maxlen=5000)
        time.sleep(.1)


processes = []
for _ in range(nb_feeders):
    p = Process(target=feeder)
    p.start()
    processes.append(p)

while processes:
    new_p = []
    for p in processes:
        if p.is_alive():
            new_p.append(p)
    processes = new_p
    time.sleep(5)

print('all processes dead')
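
Note: both scripts connect through a unix socket named cache.sock in the working directory, so they assume a redis server started with the unixsocket directive pointing there (for example redis-server --unixsocket ./cache.sock); adjust the path if your setup differs.
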
@enjoy-binbin
Collaborator

Thanks for the report, verified.

@enjoy-binbin
Collaborator

@ranshid looks like it was due to #11012: it keeps going, reprocess command -> blockForKeys -> reprocess command.
We need to find some way to break the loop. Below is a rough fix of mine, maybe you have a better view.

    /* Block if needed. */
    if (timeout != -1) {
        /* If we are not allowed to block the client, the only thing
         * we can do is treating it as a timeout (even with timeout 0). */
        if (c->flags & CLIENT_DENY_BLOCKING) {
            addReplyNullArray(c);
            goto cleanup;
        }
        /* We change the '$' to the current last ID for this stream. this is
         * Since later on when we unblock on arriving data - we would like to
         * re-process the command and in case '$' stays we will spin-block forever.
         */
        for (int id_idx = 0; id_idx < streams_count; id_idx++) {
            int arg_idx = id_idx + streams_arg + streams_count;
            if (strcmp(c->argv[arg_idx]->ptr,"$") == 0) {
                robj *argv_streamid = createObjectFromStreamID(&ids[id_idx]);
                rewriteClientCommandArgument(c, arg_idx, argv_streamid);
                decrRefCount(argv_streamid);
            }

+            /* In the '>' case, if we have blocked before and are now
+             * reprocessing the command, do not enter the block again,
+             * otherwise we will spin-block forever. */
+            if (c->bstate.timeout != 0 && strcmp(c->argv[arg_idx]->ptr, ">") == 0) {
+                goto cleanup;
+            }
        }
        blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, xreadgroup);
        goto cleanup;
    }

@oranagra FYI, this issue needs attention

@oranagra
Member

I think the right fix is to avoid an endless loop in handleClientsBlockedOnKey and handleClientsBlockedOnKeys.
It looks like there was some attempt in handleClientsBlockedOnKeys, but maybe not sufficiently good,
and using a similar trick in handleClientsBlockedOnKey is complicated:
stashing the list on the stack and iterating on it after creating a fresh one for future use is problematic, since the code keeps accessing the global list.
With the following ugly fix the problem is solved; what's left is to write a tcl test that reproduces it.

diff --git a/src/blocked.c b/src/blocked.c
index 1b3a804b1..af1d5c039 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -325,7 +325,7 @@ void handleClientsBlockedOnKeys(void) {
      * (i.e. not from call(), module context, etc.) */
     serverAssert(server.also_propagate.numops == 0);
 
-    while(listLength(server.ready_keys) != 0) {
+    if (listLength(server.ready_keys) != 0) {
         list *l;
 
         /* Point server.ready_keys to a fresh list and save the current one
@@ -563,8 +563,8 @@ static void handleClientsBlockedOnKey(readyList *rl) {
         listNode *ln;
         listIter li;
         listRewind(clients,&li);
-
-        while((ln = listNext(&li))) {
+        long count = listLength(clients);
+        while((ln = listNext(&li)) && count--) {
             client *receiver = listNodeValue(ln);
             robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NOEFFECTS);
             /* 1. In case new key was added/touched we need to verify it satisfy the

enjoy-binbin added a commit to enjoy-binbin/redis that referenced this issue Jun 12, 2023
For the XREADGROUP BLOCK > scenario, there is an endless loop.
Due to redis#11012, it keeps going: reprocess command -> blockForKeys -> reprocess command.

The right fix is to avoid an endless loop in handleClientsBlockedOnKey and handleClientsBlockedOnKeys.
It looks like there was some attempt in handleClientsBlockedOnKeys, but maybe not sufficiently good,
and using a similar trick in handleClientsBlockedOnKey is complicated:
stashing the list on the stack and iterating on it after creating a fresh one for future use
is problematic, since the code keeps accessing the global list.

The fix was proposed by oranagra.

Fixes redis#12290

Co-authored-by: Oran Agra <oran@redislabs.com>
@selfboot

I still have a question. If the feeder and subscriber client processes are killed, theoretically the server should be able to close these two clients' connections. Even in version 7.2 with the infinite loop bug, it shouldn't keep putting disconnected clients back into the queue, right?

In addition, I have another question. When I attach gdb to the server I can see the infinite loop in the while loop below, but I couldn't see at what point a client is added to li during the processing. Can you help explain this?

static void handleClientsBlockedOnKey(readyList *rl) {
        ...
        while((ln = listNext(&li))) {
            client *receiver = listNodeValue(ln);
            ...
        }
}

@selfboot

By the way, here are two screenshots of my troubleshooting process after reproducing the issue myself, including a CPU flame graph analyzed by eBPF.

[Screenshot: CPU flame graph captured with eBPF (20230613_bug_redis_deadlock_cpu)]

I added r.client_setname(f'subscriber_{user_id}') to the test script, and during GDB analysis it was observed that the receiver kept switching back and forth between the same two clients.
[Screenshot: GDB session showing the loop alternating between the two clients (client_repeat)]

@oranagra
Member

> I still have a question. If the feeder and subscriber client processes are killed, theoretically the server should be able to close these two clients' connections. Even in version 7.2 with the infinite loop bug, it shouldn't keep putting disconnected clients back into the queue, right?

Maybe it shouldn't (keep processing disconnected clients), but it does.
Redis doesn't attempt to write to (or read from) the socket while it processes a command.
It will only realize that the client dropped once it's done processing all the commands of the event loop and either writes a reply to the socket or attempts to read more data from it.

> In addition, I have another question. When I attach gdb to the server I can see the infinite loop in the while loop below, but I couldn't see at what point a client is added to li during the processing. Can you help explain this?

When we re-process the command (and realize it should be blocked again), it calls blockForKeys, which adds another element to that list.
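
To make that cycle concrete, here is a stand-alone sketch in plain C. It is not redis code; the list type and the reprocess_and_reblock / run_pass helpers are hypothetical and only model the shape of the problem: re-processing a blocked client makes it block again, which appends it to the very list being walked, while the bounded variant mirrors the count snapshot from the patch above.

/* Stand-alone sketch, NOT redis code; all names are hypothetical. */
#include <stdio.h>
#include <stdlib.h>

typedef struct node { int client_id; struct node *next; } node;
typedef struct { node *head, *tail; long len; } clist;

/* Append a "blocked client" to the tail of the list.
 * (Nodes are intentionally leaked; this is just a demo.) */
static void clist_append(clist *l, int client_id) {
    node *n = malloc(sizeof(*n));
    n->client_id = client_id;
    n->next = NULL;
    if (l->tail) l->tail->next = n; else l->head = n;
    l->tail = n;
    l->len++;
}

/* Stand-in for re-running a blocked XREADGROUP ... '>' that still finds no
 * entries for its consumer: the client simply blocks again, i.e. it is
 * appended back onto the list currently being walked (which is what the
 * blockForKeys call does in the real code path). */
static void reprocess_and_reblock(clist *blocked, int client_id) {
    clist_append(blocked, client_id);
}

/* One pass over the blocked clients. With bounded == 0 this mimics the 7.2
 * behaviour and would never terminate (capped at max_steps just so the demo
 * ends); with bounded == 1 it snapshots the list length first, like the
 * patch above, so only clients that were already waiting get visited. */
static int run_pass(clist *blocked, int bounded, int max_steps) {
    long count = blocked->len;
    int steps = 0;
    for (node *ln = blocked->head; ln && steps < max_steps; ln = ln->next) {
        if (bounded && count-- == 0) break;
        reprocess_and_reblock(blocked, ln->client_id);
        steps++;
    }
    return steps;
}

int main(void) {
    clist a = {NULL, NULL, 0}, b = {NULL, NULL, 0};
    for (int i = 1; i <= 3; i++) { clist_append(&a, i); clist_append(&b, i); }

    printf("unbounded pass over 3 clients: stopped only by the %d-step cap\n",
           run_pass(&a, 0, 10));
    printf("bounded pass over 3 clients:   %d steps, then the pass ends\n",
           run_pass(&b, 1, 10));
    return 0;
}

Compiled with any C compiler (cc sketch.c && ./a.out), the unbounded pass only stops because of the artificial 10-step cap, while the bounded pass stops after the three clients that were blocked when it started; the clients re-appended during the pass are simply left for a later ready-key notification.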

@selfboot

Thank you very much for your answer. I will continue studying the implementation.
