Skip to content

Does Django Channels enforce single-threaded per connection execution? #2133

Open
@jmurua14

Description

@jmurua14

I have created an application that receives audio every second from a websocket connection and I would like to process the audio in parallel. However, the consumers receive function seems single-threaded. What I mean is that even if I send audio every second the audio processing needs more than 2 seconds and this delays everything. I am using the AsyncWebsocketConsumer consumer class. In the code below I have simulated the audio processing with a 2 second sleep so that you can try i out. I am executing django with daphne (daphne -b 0.0.0.0 -p 8000 proiektua.asgi:application )

consumers.py

import json
from urllib.parse import parse_qs
from channels.generic.websocket import AsyncWebsocketConsumer
from .models import Room, Message
from channels.db import database_sync_to_async
import io
import time
import asyncio
from asgiref.sync import sync_to_async

class ChatConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f"chat_{self.room_name}"
        
        query_params = parse_qs(self.scope["query_string"].decode())
        self.role = query_params.get("role", [""])[0]  # "sender" or "receiver"
        if self.role == "sender":
            print("SENDER")
        elif self.role == "receiver":
            print("RECEIVER")

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        # Accept the WebSocket connection
        await self.accept()

        self.aurreko_denb = time.time()

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data= None, bytes_data= None):
        
        if bytes_data:
            print(f"Time between receive: {time.time() - self.aurreko_denb}")
            self.aurreko_denb = time.time()
            asyncio.create_task(self.process_audio_and_send2(bytes_data)   


    async def process_audio_and_send2(self, bytes_data):
        """ Runs the audio processing without blocking receive """
        transcript = await sync_to_async(self.process_audio)(bytes_data)
        
        # Send the transcription back
        await self.send(text_data=transcript)

    def process_audio(self, audio_data):
        """ Process audio synchronously (e.g., speech-to-text) """
        import time  # Simulate a delay
        time.sleep(2)  # Simulate a slow process (Replace with actual STT)
        return "Transcribed text here"

Logs:

127.0.0.1:43476 - - [06/Feb/2025:09:34:51] "WSCONNECT /ws/chat/1/" - -
Time between receive: 1.0896313190460205
Time between receive: 2.002781391143799
Time between receive: 2.003610134124756
Time between receive: 2.0032858848571777
Time between receive: 2.0033938884735107
Time between receive: 2.0036416053771973
Time between receive: 2.0029494762420654
Time between receive: 2.003593921661377
Time between receive: 2.0041894912719727
127.0.0.1:43476 - - [06/Feb/2025:09:35:08] "WSDISCONNECT /ws/chat/1/" - -

I have tried with asyncio create_task but it doens't work how I want. I would appreciate any help, thanks in advance.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions