Skip to content

Supabase async python client stops receiving realtime events #1034

@ritikd2

Description

@ritikd2

Bug report

  • I confirm this is a bug with Supabase, not with my own application.
  • I confirm I have searched the Docs, GitHub Discussions, and Discord.

Describe the bug

Supabase async client stops receiving real-time events after remaining idle for a few hours. As far as I can tell, this issue arises randomly after the last time the client gets realtime events correctly.

To Reproduce

Here's the code I am using:

from supabase import acreate_client
from supabase.client import ClientOptions
import asyncio
from gotrue import AsyncMemoryStorage

clients = {}
client_tasks = {}

# A global lock to prevent multiple tasks from re-creating the same client
CLIENT_CREATION_LOCK = asyncio.Lock()

async def setup_listeners(s_user):
    logger.info("[SL_001] Setting up listeners")
    await s_user.channel('schema-db-changes').on_postgres_changes("*", schema="public", callback=handle_event_wrapper).subscribe()
    logger.info("[SL_002] Listeners setup complete")

async def _create_new_supabase_user():
    """
    Actually creates a brand-new Supabase client, signs in, sets up listeners, etc.
    (Extracted out so we can call it from both get_supabase_user() and the listen loop.)
    """
    SUPABASE_URL = os.getenv('SUPABASE_URL')
    SUPABASE_KEY = os.getenv('SUPABASE_API_KEY')
    
    s_client = await acreate_client(
        SUPABASE_URL,
        SUPABASE_KEY,
        options=ClientOptions(
            postgrest_client_timeout=100000, 
            storage_client_timeout=100000, 
            schema="public", 
            realtime={"hb_interval": 30, "auto_reconnect": True},
            storage=AsyncMemoryStorage()
        )
    )
    
    await s_client.auth.sign_in_with_password({
        "email": os.getenv('SUPABASE_USER_EMAIL'),
        "password": os.getenv('SUPABASE_USER_PASSWORD')
    })
    
    await s_client.realtime.connect()

    # Import here to avoid circular dependency
    await setup_listeners(s_client)
    
    return s_client

async def _realtime_listen_loop(client_name: str, s_user):
    """
    This is the indefinite loop that calls s_user.realtime.listen()
    """
    logger.info(f"[_realtime_listen_loop] Starting for {client_name}")
    try:
        # This call blocks until the connection drops or error occurs
        await s_user.realtime.listen()
    except (
        websockets.exceptions.ConnectionClosedError,
        websockets.exceptions.ConnectionClosedOK,
        asyncio.IncompleteReadError,
        Exception
    ) as e:
        logger.error(f"[_realtime_listen_loop] Connection dropped: {e}")
    finally:
        # Cleanup here (the loop is done)
        logger.info(f"[_realtime_listen_loop] Shutting down for {client_name}")
        try:
            await s_user.auth.sign_out()
        except Exception as e2:
            logger.warning(f"[_realtime_listen_loop] Error on disconnect: {e2}")
        if client_tasks.get(client_name) is not None:
            logger.info(f'[_realtime_listen_loop] Deleting client task from dictionary since there was an error')
            del client_tasks[client_name]

async def get_supabase_user(client_name="supabase_user"):
    """
    Return a valid Supabase client. If none exists or if the current one is
    invalid, create a new one. Also ensure there's a background task
    listening for realtime changes.
    """
    async with CLIENT_CREATION_LOCK:
        # 1. Do we already have a client?
        s_client = clients.get(client_name)
        if s_client:
            # Check if it's still valid (e.g., session not expired)
            try:
                session_res = await s_client.auth.get_session()
                if session_res and session_res.access_token:
                    # If the background loop is missing or finished, restart it
                    loop_task = client_tasks.get(client_name)
                    if not loop_task or loop_task.done():
                        logger.info(f"[get_supabase_user] Re-starting loop for existing {client_name} client.")
                        loop_task = asyncio.create_task(_realtime_listen_loop(client_name, s_client))
                        client_tasks[client_name] = loop_task
                    
                    # Return the existing valid client
                    logger.debug(f"[get_supabase_user] Using existing {client_name} client.")
                    return s_client
                else:
                    logger.warning(f"[get_supabase_user] Session is None or invalid. Will re-create.")
            except Exception as e:
                logger.warning(f"[get_supabase_user] Existing client session invalid: {e}")
            
            # If we got here, the existing client is no good.
            # Cleanup references. We do NOT wait for the loop to cancel, 
            # because it might have died already. 
            if client_name in client_tasks:
                old_task = client_tasks[client_name]
                if not old_task.done():
                    # Cancel it
                    logger.info(f"[get_supabase_user] Cancelling old _realtime_listen_loop for {client_name}.")
                    old_task.cancel()
                    # (Optionally await it, but be mindful about locking/cycles)
                del client_tasks[client_name]

            if client_name in clients:
                logger.info(f"[get_supabase_user] Deleting old client for {client_name} since something failed and starting new.")
                del clients[client_name]

        # 2. If we reach here, we need to create a brand-new client
        logger.info(f"[get_supabase_user] Creating new client for {client_name}.")
        new_client = await _create_new_supabase_user()
        clients[client_name] = new_client

        # Start the background realtime listening in a task
        loop_task = asyncio.create_task(_realtime_listen_loop(client_name, new_client))
        client_tasks[client_name] = loop_task

        return new_client

Expected behavior

Expected: The client always receives realtime events properly.

The extra setup I've added in is to account for JWT expired / Refresh token not found errors that pop up occasionally. I have confirmed that the set up works after the JWT expired errors occur, it receives realtime events after the errors, but after a few hours of remaining idle it stops receiving events. There are no other errors that occur before it stops receiving events. The most recent case where it stopped working, the client was idle (received no new realtime events) for ~7 hours before I tested it again and it failed to receive the events.

To ensure the connection stays alive and client is refreshed, I have a task running which calls the supabase client created above every 15 seconds.

Screenshots

N/A

System information

  • Version of supabase-py: 2.11.0
  • Version of realtime: 2.1.0

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions