Description
I have created an application that receives audio every second from a websocket connection and I would like to process the audio in parallel. However, the consumers receive function seems single-threaded. What I mean is that even if I send audio every second the audio processing needs more than 2 seconds and this delays everything. I am using the AsyncWebsocketConsumer consumer class. In the code below I have simulated the audio processing with a 2 second sleep so that you can try i out. I am executing django with daphne (daphne -b 0.0.0.0 -p 8000 proiektua.asgi:application
)
consumers.py
import json
from urllib.parse import parse_qs
from channels.generic.websocket import AsyncWebsocketConsumer
from .models import Room, Message
from channels.db import database_sync_to_async
import io
import time
import asyncio
from asgiref.sync import sync_to_async
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f"chat_{self.room_name}"
query_params = parse_qs(self.scope["query_string"].decode())
self.role = query_params.get("role", [""])[0] # "sender" or "receiver"
if self.role == "sender":
print("SENDER")
elif self.role == "receiver":
print("RECEIVER")
# Join room group
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
# Accept the WebSocket connection
await self.accept()
self.aurreko_denb = time.time()
async def disconnect(self, close_code):
# Leave room group
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def receive(self, text_data= None, bytes_data= None):
if bytes_data:
print(f"Time between receive: {time.time() - self.aurreko_denb}")
self.aurreko_denb = time.time()
asyncio.create_task(self.process_audio_and_send2(bytes_data)
async def process_audio_and_send2(self, bytes_data):
""" Runs the audio processing without blocking receive """
transcript = await sync_to_async(self.process_audio)(bytes_data)
# Send the transcription back
await self.send(text_data=transcript)
def process_audio(self, audio_data):
""" Process audio synchronously (e.g., speech-to-text) """
import time # Simulate a delay
time.sleep(2) # Simulate a slow process (Replace with actual STT)
return "Transcribed text here"
Logs:
127.0.0.1:43476 - - [06/Feb/2025:09:34:51] "WSCONNECT /ws/chat/1/" - -
Time between receive: 1.0896313190460205
Time between receive: 2.002781391143799
Time between receive: 2.003610134124756
Time between receive: 2.0032858848571777
Time between receive: 2.0033938884735107
Time between receive: 2.0036416053771973
Time between receive: 2.0029494762420654
Time between receive: 2.003593921661377
Time between receive: 2.0041894912719727
127.0.0.1:43476 - - [06/Feb/2025:09:35:08] "WSDISCONNECT /ws/chat/1/" - -
I have tried with asyncio create_task but it doens't work how I want. I would appreciate any help, thanks in advance.