[QUESTION] create_task doesn't make concurrent code. Asyncio queues with sequential operations. #3745

@Rtut654

Issue I am facing

  1. Hello. I have a question about concurrency. I read the article and implemented it the same way, so the parts that have to run in parallel are wrapped in context.application.create_task(). I'm testing with two devices sending messages at the same time. The first task, which is supposed to notify the user as soon as it is done, works sequentially: the second user is notified only after the first one has been processed. What is wrong here?
import asyncio
import os
from telegram import Update
from telegram.ext import (
    Application,
    ContextTypes,
    ConversationHandler,
    MessageHandler,
    filters,
)
async def process_audio(update, path):
    processed_text = f'this is processed text from {path}'
    await asyncio.sleep(5)
    await update.message.reply_text(
        "Your voice message was processed"
    )
    return processed_text
 
async def process_message(update, context, message):
    processed_message = f'this is processed message from {message}'
    await asyncio.sleep(5)
    await update.message.reply_text(
        "We processed your message, here is the reply."
    )
    return processed_message
 
async def get_audio_and_process(update):
    voice_file = await update.message.voice.get_file()
    await voice_file.download_to_drive('./example_audio.wav')
    await asyncio.sleep(3)
    duration = update.message.voice.duration
    result = await process_audio(update, './example_audio.wav')
    return result
 
async def process_conversation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    if update.message.voice:
        text_task = context.application.create_task(
            get_audio_and_process(update),
            update=update)
        text_task_result = await text_task
        message = text_task_result
    elif update.message.text:
        message = update.message.text
    else:
        await update.message.reply_text(
            "Please send text or voice message"
        )
        # nothing to process; without this return, `message` is unbound below
        return
    context.application.create_task(
        process_message(update, context, message),
        update=update)
 
 
application = Application.builder().token(bot_token).concurrent_updates(True).build()
conv_handler = MessageHandler(filters.VOICE | filters.TEXT & (~filters.COMMAND), process_conversation, block=True)
 
application.add_handler(conv_handler)
application.run_polling()
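
A minimal sketch of the pattern in question, using plain asyncio rather than python-telegram-bot. It assumes nothing about the library's internals: `handler`, `slow_step` and the 0.2 s delay are hypothetical stand-ins for the handler callback and the processing steps above. It shows that awaiting a task right after creating it suspends only the handler that awaits it. Whether two users are served in parallel then depends on whether the two handler invocations themselves overlap.

```python
import asyncio
import time

DELAY = 0.2  # hypothetical stand-in for the seconds of processing in the question


async def slow_step(user: str) -> str:
    # Simulates a slow, awaitable processing step.
    await asyncio.sleep(DELAY)
    return f"processed {user}"


async def handler(user: str, notified: list) -> None:
    # Same shape as the handler above: create a task, then await it at once.
    task = asyncio.create_task(slow_step(user))
    result = await task  # suspends this handler only, not the event loop
    notified.append(result)


async def run_sequentially(users):
    # Each handler call finishes before the next one starts.
    notified = []
    start = time.perf_counter()
    for user in users:
        await handler(user, notified)
    return notified, time.perf_counter() - start


async def run_concurrently(users):
    # Both handler calls are scheduled together and overlap while sleeping.
    notified = []
    start = time.perf_counter()
    await asyncio.gather(*(handler(user, notified) for user in users))
    return notified, time.perf_counter() - start


def compare(users=("user_a", "user_b")):
    sequential = asyncio.run(run_sequentially(users))
    concurrent = asyncio.run(run_concurrently(users))
    return sequential, concurrent
```

Calling `compare()` returns the notifications and elapsed time for both variants. The sequential run takes roughly twice `DELAY`, the concurrent run roughly one `DELAY`, even though both use the same `create_task` followed by `await`.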
  2. Eventually I want to do the same thing using asyncio queues. Here is a code sample. Is it possible to use async workers together with sequential code? That is, could I replace context.application.create_task in the first snippet with getting an item from the corresponding queue, calling a worker, putting the result into the next queue, and so on?
    The idea is to have three workers, each with its own queue, shared by all users, while keeping the sequential part of the code where I have to read/write a DB file. It is important that all tasks keep the same order for all users, i.e. the order in which they were called in the code.
async def worker1():
    while True:
        request = await users_queue.get()
        result = process_with_worker1(request)
        await worker2_queue.put(result)
        users_queue.task_done()

async def worker2():
    # similar stuff as w1
    ...

async def handle_text_message(update: Update, context):
    # here some sequential code like reading db
    message = update.message
    request = {"message": message.text, "chat_id": update.effective_chat.id}
    # put_nowait does not block; the unbounded queue accepts the item immediately
    users_queue.put_nowait(request)

loop = asyncio.get_event_loop()


users_queue = asyncio.Queue()
worker2_queue = asyncio.Queue()
worker3_queue = asyncio.Queue()

worker2_task = loop.create_task(worker2())
worker3_task = loop.create_task(worker3())


application = Application.builder().token(bot_token).concurrent_updates(True).build()
message_handler = MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text_message, block=True)
application.add_handler(message_handler)
application.run_polling()
loop.create_task(worker1())
loop.run_forever()
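
A minimal sketch of the three-worker idea in plain asyncio, under stated assumptions: the names `stage` and `run_pipeline` and the string-tagging "work" are hypothetical placeholders, and no python-telegram-bot objects are involved. Each stage has exactly one worker and one FIFO queue, so items leave the pipeline in the order they entered it. All workers are started from inside the running event loop.

```python
import asyncio


async def stage(name, inbox, outbox, log):
    # One worker per stage: takes items in FIFO order and forwards the result.
    while True:
        item = await inbox.get()
        await asyncio.sleep(0)  # placeholder for real awaitable work
        result = f"{item}>{name}"
        log.append(result)
        if outbox is not None:
            await outbox.put(result)
        # Mark the item done only after it has been handed to the next stage.
        inbox.task_done()


async def run_pipeline(requests):
    q1, q2, q3 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    log = []
    workers = [
        asyncio.create_task(stage("w1", q1, q2, log)),
        asyncio.create_task(stage("w2", q2, q3, log)),
        asyncio.create_task(stage("w3", q3, None, log)),
    ]
    for request in requests:
        q1.put_nowait(request)  # what a handler would do per incoming message
    # Wait until every item has passed through every stage.
    await q1.join()
    await q2.join()
    await q3.join()
    # The workers loop forever, so cancel them once the queues are drained.
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    # Keep only the entries that completed the final stage.
    return [entry for entry in log if entry.endswith(">w3")]
```

For example, `asyncio.run(run_pipeline(["a", "b", "c"]))` yields the three requests in their original order after all three stages. Adding a second worker to any stage would let items overtake each other, so ordering here relies on one worker per queue. Note that `application.run_polling()` blocks until the bot is stopped, so in the snippet above `loop.create_task(worker1())` is only reached after shutdown. In a bot, the workers would have to be started on the loop that `run_polling()` drives, for example from a `post_init` callback passed to the builder; that wiring is an untested assumption and is not shown here.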

P.S. I'm aware that the queues may be non-empty when the bot restarts. For now that is not a problem.

Operating System

Ubuntu 20.04

Version of Python, python-telegram-bot & dependencies

Python 3.8.10
python-telegram-bot==20.3
