Running Scheduled Task in Websockets Or Scaling with multiple workers #1467

@suryanarayanan035

Scenario

Hello there,
Thanks for creating this cool and elegant library. I am building a chat application with it.
In my application, I want to ping each client every 10 seconds to verify that it is still connected, but I have not found a way to do so.

What I have tried

I tried to create an async function that sleeps for 10 seconds and then calls the websocket's ping method.
But this doesn't work for me: asyncio.sleep has to be awaited, and awaiting it blocks the code that follows the function call.
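For illustration, here is a minimal sketch of the difference between awaiting such a function directly and scheduling it with asyncio.create_task, which lets the sleep/ping loop run alongside the rest of the handler. It uses only the standard library. FakeConnection, ping_periodically, and the interval and work_time parameters are hypothetical names made up for this sketch, not part of the websockets API or of my server code.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a websocket connection; it only counts pings."""

    def __init__(self):
        self.pings = 0

    async def ping(self):
        self.pings += 1


async def ping_periodically(connection, interval):
    # Loops until cancelled. Each await hands control back to the event loop,
    # so other coroutines keep running while this one sleeps.
    while True:
        await asyncio.sleep(interval)
        await connection.ping()


async def handler(connection, interval=10.0, work_time=0.05):
    # Schedule the ping loop as a background task instead of awaiting it,
    # so the code below starts running right away.
    ping_task = asyncio.create_task(ping_periodically(connection, interval))
    try:
        # Stand-in for the "async for message in websocket" loop.
        await asyncio.sleep(work_time)
    finally:
        # Stop the ping loop when the handler is done with the connection.
        ping_task.cancel()
        try:
            await ping_task
        except asyncio.CancelledError:
            pass
    return connection.pings
```

Calling asyncio.run(handler(FakeConnection(), interval=0.01, work_time=0.1)) returns the number of pings sent while the handler was "working". Whether this pattern fits a real websockets handler is an assumption here, not something confirmed by the library's maintainers.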

Expectation

So I thought of moving the pinging to a separate worker: the main server would handle the connections and the worker would do the pinging. To do that, I would have to serialize/pickle the websocket object, which is not possible.

So I would like to know how to scale websockets across multiple workers, or how to run interval-based tasks without blocking the main process.

Here is my server code:

import json
import logging

import websockets


async def handler(websocket):
    user = save_user_data_on_connect(websocket)
    CURRENT_CONNECTIONS[user.get_id()] = websocket
    status, pair = pair_user(user)
    if status == 'paired':
        message = json.dumps(construct_message_dict('connected'))
        await CURRENT_CONNECTIONS[pair].send(message)
        await websocket.send(message)
        # This is where the logic for starting the scheduled job will go.
        logging.info(f"connected msg sent to user {websocket.id}")
    async for message in websocket:
        receiver = get_user_pair(str(websocket.id))
        try:
            print(f"Message Received: {message}")
            message = parse_message(message)
            message['type'] = 'user_sent'
            if receiver and 'content' in message:
                message = save_message_to_db(str(websocket.id), receiver, message)
                await CURRENT_CONNECTIONS[receiver].send(message)
        except websockets.exceptions.ConnectionClosed:
            cleanup_disconnected_connection(receiver)
            message = json.dumps(construct_message_dict('disconnected'))
            await CURRENT_CONNECTIONS[str(websocket.id)].send(message)
        except Exception as e:
            print(e)

Happy to provide more information if needed. Thanks in advance!
