AsyncConsumer runs tasks sequentially (should be in parallel?) #1924

Open
primal100 opened this issue Oct 5, 2022 · 15 comments · May be fixed by #1933

Comments

@primal100

primal100 commented Oct 5, 2022

The goal with async programming should be to run things in parallel where possible. However, the AsyncConsumer consumes messages and runs tasks sequentially. The key is this code in utils.py:

        while True:
            # Wait for any of them to complete
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            # Find the completed one(s), yield results, and replace them
            for i, task in enumerate(tasks):
                if task.done():
                    result = task.result()
                    await dispatch(result)
                    tasks[i] = asyncio.ensure_future(consumer_callables[i]())

So the loop is forced to wait for the handler to complete before getting the next message.

Using asyncio.create_task(dispatch(result)) instead of await dispatch(result) here would ensure the tasks run in parallel (it's a little more complicated than that, as the tasks need to be tracked to report exceptions and avoid warnings). I have a subclass of AsyncConsumer in my own app which runs tasks in parallel and sees a speedup, so I could submit a PR based on that.
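A rough sketch of that change inside await_many_dispatch (illustrative only; a real patch would also need to decide how handler exceptions propagate rather than just logging them):

import asyncio
import logging

async def await_many_dispatch(consumer_callables, dispatch):
    # One pending receive task per callable, as in the current code.
    tasks = [asyncio.ensure_future(cc()) for cc in consumer_callables]
    dispatch_tasks = set()

    def _on_dispatch_done(task):
        # Drop the strong reference once done, and observe the exception
        # so asyncio doesn't warn that it was never retrieved.
        dispatch_tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            logging.error("Dispatch task failed", exc_info=task.exception())

    try:
        while True:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for i, task in enumerate(tasks):
                if task.done():
                    result = task.result()
                    # Schedule the handler instead of awaiting it, so the
                    # loop goes straight back to receiving messages.
                    dispatch_task = asyncio.create_task(dispatch(result))
                    dispatch_tasks.add(dispatch_task)
                    dispatch_task.add_done_callback(_on_dispatch_done)
                    tasks[i] = asyncio.ensure_future(consumer_callables[i]())
    finally:
        for task in tasks:
            task.cancel()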

A better solution would be to use the new asyncio.TaskGroup coming in Python 3.11. There seems to be a backport here:

https://pypi.org/project/taskgroup/

There are other libraries implementing something similar to TaskGroup, or a simpler version could be implemented for Channels consumers to use.
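For comparison, a TaskGroup-based version of the same loop makes most of that bookkeeping disappear (a sketch, assuming Python 3.11 or the backport; a failing handler cancels the whole group and surfaces as an ExceptionGroup):

import asyncio

async def await_many_dispatch(consumer_callables, dispatch):
    async with asyncio.TaskGroup() as tg:
        tasks = [asyncio.ensure_future(cc()) for cc in consumer_callables]
        try:
            while True:
                await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                for i, task in enumerate(tasks):
                    if task.done():
                        # The group keeps a reference to each handler task
                        # and surfaces its exceptions automatically.
                        tg.create_task(dispatch(task.result()))
                        tasks[i] = asyncio.ensure_future(consumer_callables[i]())
        finally:
            for task in tasks:
                task.cancel()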

What do you think?

  • What you expected to happen vs. what actually happened:
    I expect that the async consumer can receive messages and process the resulting actions in parallel. Receive message, create task, receive message. Instead the consumer receives messages, creates a task, waits for the task to complete, then receives another message.
  • How you're running Channels (runserver? daphne/runworker? Nginx/Apache in front?):
    Runworker
@LucidDan

LucidDan commented Oct 9, 2022

I think this wouldn't be safe for ASGI events, right?
e.g. HTTP expects to process ASGI messages for a connection in order (e.g. receiving the HTTP request body, if it is chunked).
If you create tasks for each message, isn't there a chance of things being processed out of order? I realise in your case you are using it in runworker, but await_many_dispatch() gets passed receive() for both layers and ASGI event messages, so a general implementation would need to make sure that isn't a problem.

I also wonder how this would perform with some types of workload, e.g. a light processing load in dispatch() with a large number of layer events. I have at least one app in mind that I've built where it is at times handling 1000s of messages per second and each dispatch() call takes <1 second. Pretty sure adding a task for each message would slow things down, or at least use much more memory and eventually slow things down via Python garbage collection.

@carltongibson
Member

Yes. Some profiling would be worthwhile to make sure we don't introduce an unnecessary performance regression.

@primal100
Author

primal100 commented Oct 9, 2022

Thanks for your feedback.

I think this wouldn't be safe for ASGI events, right? e.g. HTTP expects to process ASGI messages for a connection in order (e.g. receiving the HTTP request body, if it is chunked). If you create tasks for each message, isn't there a chance of things being processed out of order? I realise in your case you are using it in runworker, but await_many_dispatch() gets passed receive() for both layers and ASGI event messages, so a general implementation would need to make sure that isn't a problem.

I can't imagine any modern network-based protocol depending on packets arriving in order. With WebSockets, for example, there is no expectation that messages are processed in order. Network delays can cause packets to arrive out of order, so it's expected that the layer above has some way of dealing with that (for example, by using JSON-RPC with an id parameter to match requests and responses). I would have thought ASGI would define a way to rebuild a chunked HTTP request body without depending on it arriving in order. But I admit I am mostly used to working with Channels consumers in a worker context rather than ASGI and HTTP, so if someone with detailed knowledge of the protocol thinks we need to keep the existing behaviour, fair enough; but I think there could be a bool variable controlling whether the requests are handled one-by-one or concurrently. Or perhaps a separate class.

I also wonder how this would perform in some types of work load - eg light processing load in dispatch() with large number of layer events. I have at least one app in mind that I've built where it is at times handling 1000s of messages per second and each dispatch() call is <1 second. Pretty sure adding a task to each message would slow things down, or at least would use much more memory and eventually slow things down via python garbage collection.

It really depends on whether your dispatch calls make I/O requests or run no-GIL C extension code. If your dispatch is purely CPU-bound Python code, then you should be using the SyncConsumer. The current AsyncConsumer already creates one future for every dispatch call (utils.py:51), and a task is just a subclass of a future, basically the same thing with a different API, so it shouldn't slow things down compared to the current implementation. If your app is pure-Python CPU-bound code running in the same process, then you are just adding asyncio overhead for nothing. The Channels docs make this clear: you should only use AsyncConsumers for tasks running in parallel.

"We recommend that you write SyncConsumers by default, and only use AsyncConsumers in cases where you know you are doing something that would be improved by async handling (long-running tasks that could be done in parallel) and you are only using async-native libraries."

If your dispatch runs I/O-bound or no-GIL C extension tasks, then I guarantee the app you've built would see a performance boost from concurrency. Yes, asyncio is designed to run 1000s of tasks concurrently; it's not designed for idle waiting on coroutines.

I will admit that when the current implementation is used for HTTP, each connection has its own consumer instance running tasks, so you have concurrency across connections. Still, concurrency within a connection would be nice to have, and of course really important for a worker.

If you are worried about creating 1000s of tasks, or if the protocol requires tasks to be performed in order, then I think you should be using the SyncConsumer. There is no point in using asyncio "in name only", where you use the nice async/await API but don't get the benefits of it, only the added overhead of an event loop. It's a feature of asynchronous programming that things are done out of order, and any modern app or protocol should be able to deal with that. But if there are legacy protocols that don't, they should use the SyncConsumer. I acknowledge that if some developers are already using the AsyncConsumer and are sure they need tasks done in order, you wouldn't want to break backward compatibility; a separate AsyncConcurrentConsumer class may be needed in that case. But the docs have always made clear that the AsyncConsumer should only be used for tasks that can be run in parallel (and hence finish out of order).

This is even more relevant with the Django ORM now supporting async queries as of version 4.1. The current AsyncConsumer would wait for one ORM request to complete before moving on to the next one, instead of doing them concurrently. It defeats the purpose of asynchronous programming.
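For illustration, in a handler like this (model and handler names hypothetical), sequential dispatch serializes the queries across messages, while concurrent dispatch would overlap them:

from channels.consumer import AsyncConsumer
from myapp.models import Article  # hypothetical model

class ArticleConsumer(AsyncConsumer):
    async def fetch_article(self, message):
        # Under the current sequential dispatch, a second "fetch.article"
        # message waits here until this ORM query (Django 4.1+) returns.
        article = await Article.objects.aget(pk=message["pk"])
        await self.send({"type": "article.result", "title": article.title})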

Anyway, I'll work on an exploratory PR using task groups, with profiling, and we can take it from there. It will be interesting, @LucidDan, to hear your feedback then on how it works with your existing app.

@carltongibson
Member

Hey @primal100 — I've been digging into this a little bit — It would be good to see your PR (even if draft) if you have something you can share? Thanks.

@primal100 primal100 linked a pull request Oct 14, 2022 that will close this issue
@carltongibson carltongibson linked a pull request Oct 15, 2022 that will close this issue
@carltongibson
Member

e.g. HTTP expects to process ASGI messages for a connection in order (e.g. receiving the HTTP request body, if it is chunked).

I think this is handled by AsyncHttpConsumer.http_request waiting for the whole body before handing off to the handle() implementation:

async def http_request(self, message):
    """
    Async entrypoint - concatenates body fragments and hands off control
    to ``self.handle`` when the body has been completely received.
    """
    if "body" in message:
        self.body.append(message["body"])
    if not message.get("more_body"):
        try:
            await self.handle(b"".join(self.body))
        finally:
            await self.disconnect()
        raise StopConsumer()

@LucidDan

e.g. HTTP expects to process ASGI messages for a connection in order (e.g. receiving the HTTP request body, if it is chunked).

I think this is handled by AsyncHttpConsumer.http_request waiting for the whole body before handing off to the handle() implementation.

This might be a case of me not thinking async enough - I think this is only a potential issue if tasks created by create_task can execute out of order (i.e. does the event loop guarantee that tasks are started in the order they are created?)

I was thinking of the scenario of http_request receiving chunks out of order, but if asyncio maintains ordering of tasks it's not possible for that to happen, I guess?
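For what it's worth, a quick standalone check (nothing Channels-specific) suggests tasks do start in creation order on CPython, since create_task schedules them FIFO via call_soon, though they can still interleave, and so finish out of order, at any await:

import asyncio

async def worker(n):
    print(f"task {n} started")
    await asyncio.sleep(0)  # yield to the event loop
    print(f"task {n} resumed")

async def main():
    await asyncio.gather(*[asyncio.create_task(worker(n)) for n in range(3)])

asyncio.run(main())
# task 0 started, task 1 started, task 2 started,
# then task 0 resumed, task 1 resumed, task 2 resumed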

I'll try to get some performance testing in today with (and without) the PR. I've been wanting to set up some tests to compare v3 and v4 anyway, so it'll serve both purposes.

@donghyeon

I encountered the same issue, and after investigating, I've found that an AsyncConsumer handles coroutines sequentially within a single instance. Any await expression in a handler blocks that instance, meaning subsequent coroutines in the same instance are executed one after another, not concurrently. This blocking only affects the specific instance; other instances remain unblocked.

The following code demonstrates the two distinct asynchronous behaviors:

  1. Sequential (AsyncConsumer's default): Coroutines within a single consumer instance run sequentially.
  2. Concurrent (using asyncio.create_task): Enables concurrent execution of coroutines within a single consumer instance.
import asyncio
import json
import random
from channels.generic.websocket import AsyncWebsocketConsumer

class RandomSleepEchoConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()

    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json["message"]

        # Option 1: Concurrent Execution (using asyncio.create_task)
        # This DOES NOT block subsequent message handling.
        asyncio.create_task(self.random_sleep_and_echo({"message": message}))
        
        # # Option 2: Sequential Execution (by awaiting directly)
        # # This DOES block subsequent message handling in this instance.
        # await self.random_sleep_and_echo({"message": message})

        # # Option 3: Sequential Execution (by sending a message through channel layer)
        # # (Django Channels recommended approach)
        # # This also BLOCKS subsequent message handling in this instance.
        # await self.channel_layer.send(self.channel_name, {
        #     "type": "random.sleep.and.echo",
        #     "message": message,
        # })

    async def random_sleep_and_echo(self, event):
        sleep_time = random.randint(1, 5)
        print(f"Channel {self.channel_name[-6:]} - Sleeping for {sleep_time} seconds...")
        await asyncio.sleep(sleep_time)
        print(f"Channel {self.channel_name[-6:]} - Awake after {sleep_time} seconds...")
        await self.send(text_data=json.dumps({"message": event["message"]}))

This behavior can be surprising and lead to unexpected blocking. To improve clarity and the developer experience, I propose the following:

  1. Documentation Enhancement: The documentation should explicitly state that coroutines within a single AsyncConsumer instance run sequentially by default. A clear example, similar to the one above, demonstrating how to achieve concurrency using asyncio.create_task is crucial. The current documentation could be misinterpreted to imply concurrent execution.

  2. (Optional) Feature Request: Built-in Concurrency Control: Consider providing a built-in mechanism to manage concurrency within an AsyncConsumer. This could be a new "concurrent mode," perhaps controlled by a decorator or a class-level attribute. Sequential mode would remain the default for backward compatibility. While this offers more granular control, it's a more substantial change.

These changes would prevent unexpected blocking and improve the developer experience with Django Channels' asynchronous capabilities.

@carltongibson
Member

Hi @donghyeon — 

Yes. Incoming ASGI events are dispatched serially, so if you want to do long-running work, you need to move that into a separate task, exactly as you've shown. I'm very happy to review a docs change there.

On 2, take a look at #1933. This proposes adjusting await_many_dispatch to allow concurrent handling of ASGI events. I'm not 100% sure what to make of it, but people experimenting and feeding back would be helpful. (I think any approach would need some kind of class attribute on AsyncConsumer to allow opting in to the new behaviour.)

@donghyeon

Thanks for the quick response, @carltongibson!

As a potential workaround, I've created a simple mixin that leverages asyncio.TaskGroup (requires Python 3.11+). This approach feels quite intuitive and has been working well so far. The primary trade-off is that the AsyncConsumer's __call__ method is offloaded to a background task, which means we lose some direct control over its execution.

import asyncio

from channels.consumer import AsyncConsumer
from channels.exceptions import StopConsumer
from channels.generic.websocket import AsyncWebsocketConsumer


class ConcurrentMixin:
    async def __call__(self, scope, receive, send):
        try:
            async with asyncio.TaskGroup() as self.tg:
                # Wrap super().__call__ in a task to ensure it's cancelled
                # along with other tasks in the TaskGroup if any of them raise an exception.
                # This maintains consistent exception handling behavior with the original AsyncConsumer.
                self.tg.create_task(super().__call__(scope, receive, send))
        except* StopConsumer:
            pass

    async def dispatch(self, message):
        # Create a new task for each incoming message.
        # This ensures that messages are handled concurrently.
        self.tg.create_task(super().dispatch(message))

# Extend classes using Mixin
class ConcurrentAsyncConsumer(ConcurrentMixin, AsyncConsumer):
    pass

class ConcurrentAsyncWebsocketConsumer(ConcurrentMixin, AsyncWebsocketConsumer):
    pass

Regarding exception handling, the TaskGroup simplifies things. When any task within the group raises an exception (including StopConsumer), the TaskGroup automatically cancels all other tasks within it. This includes the task running super().__call__, which in turn triggers the cancellation of consumer_callables (receive and channel_receive) within Django Channels' await_many_dispatch function. Therefore, I believe the overall exception handling behavior remains consistent with the original design.

My main concern is the Python 3.11+ requirement (Django 5.x versions need to support Python 3.10). Additionally, using asyncio.TaskGroup's exception handling would require using except* and ExceptionGroup. ExceptionGroup allows handling multiple exceptions raised concurrently within the TaskGroup, which could introduce incompatibilities with existing codebases that aren't prepared for this.
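A minimal standalone example of that incompatibility:

import asyncio

async def fail():
    raise ValueError("boom")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fail())
            tg.create_task(fail())
    # A plain `except ValueError:` would not match here: the TaskGroup
    # raises an ExceptionGroup wrapping the failures (Python 3.11+).
    except* ValueError as eg:
        print(f"caught {len(eg.exceptions)} ValueError(s)")

asyncio.run(main())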

@carltongibson
Member

@donghyeon That's a really interesting example! (Let me ponder it)

We still need to support Django 4.2 as well (for now at least), so Python 3.9+.

@donghyeon

donghyeon commented Feb 23, 2025

I've also implemented both decorator and class-attribute-based solutions for enabling concurrent message handling.

Key Considerations:

  • Decorator and Mixin Approaches: These approaches leverage asyncio.TaskGroup to manage concurrency, effectively wrapping the core logic of AsyncConsumer without directly modifying the library's source code. This makes them safer and potentially suitable for inclusion in the Django Channels documentation as recommended workarounds.
  • Class Attribute Approach: This approach offers good clarity and aligns well with Django's design principles. However, incorporating it directly into the base AsyncConsumer class would constitute a breaking change, as it alters the default behavior slightly.

Usage Examples:

from channels.generic.websocket import AsyncWebsocketConsumer

@concurrent_mode
class DecoratedConcurrentConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        # Coroutines will be handled concurrently.
        await self.send(text_data=f"Processed: {text_data}")


class AttributedConcurrentConsumer(AsyncWebsocketConsumer):
    concurrent_mode = True  # Enable concurrency

    async def receive(self, text_data):
        # Coroutines will be handled concurrently.
        await self.send(text_data=f"Processed: {text_data}")


class SequentialConsumer(AsyncWebsocketConsumer):
    # concurrent_mode is False by default (inherited from AsyncConsumer)

    async def receive(self, text_data):
        # Coroutines will be handled sequentially.
        await self.send(text_data=f"Processed: {text_data}")

Implementation
1. Decorator

import asyncio
import functools

from channels.consumer import AsyncConsumer
from channels.exceptions import StopConsumer


def concurrent_mode(cls):
    """
    Decorator to enable concurrent message handling within an AsyncConsumer.

    This decorator modifies the __call__ and dispatch methods of the
    consumer class to use an asyncio.TaskGroup for concurrent execution.
    """
    if not issubclass(cls, AsyncConsumer):
        raise TypeError("The @concurrent_mode decorator can only be applied to AsyncConsumer subclasses.")

    original_call = cls.__call__
    original_dispatch = cls.dispatch

    @functools.wraps(original_call)
    async def wrapped_call(self, scope, receive, send):
        try:
            async with asyncio.TaskGroup() as self.tg:
                # Run the original __call__ in a task to ensure proper
                # exception handling and cancellation.
                self.tg.create_task(original_call(self, scope, receive, send))
        except* StopConsumer:
            # The TaskGroup handles cancellation automatically.
            pass

    @functools.wraps(original_dispatch)
    async def wrapped_dispatch(self, message):
        # Create a task for each dispatched message for concurrency.
        self.tg.create_task(original_dispatch(self, message))

    cls.__call__ = wrapped_call
    cls.dispatch = wrapped_dispatch
    return cls

2. Class attribute

import asyncio
import functools

from asgiref.sync import async_to_sync

from channels import DEFAULT_CHANNEL_LAYER
from channels.consumer import get_handler_name
from channels.db import aclose_old_connections
from channels.exceptions import StopConsumer
from channels.layers import get_channel_layer
from channels.utils import await_many_dispatch


class AsyncConsumer:
    """
    Base consumer class with concurrency control via a class attribute.
    """

    _sync = False
    channel_layer_alias = DEFAULT_CHANNEL_LAYER

    # Controls concurrency.  False (default) is sequential; True is concurrent.
    concurrent_mode = False

    async def __call__(self, scope, receive, send):
        """
        Dispatches incoming messages to type-based handlers asynchronously.
        """
        self.scope = scope

        # Initialize channel layer
        self.channel_layer = get_channel_layer(self.channel_layer_alias)
        if self.channel_layer is not None:
            self.channel_name = await self.channel_layer.new_channel()
            self.channel_receive = functools.partial(
                self.channel_layer.receive, self.channel_name
            )
        # Store send function
        if self._sync:
            self.base_send = async_to_sync(send)
        else:
            self.base_send = send
        # Pass messages in from channel layer or client to dispatch method
        consumer_callables = [receive]
        if self.channel_layer is not None:
            consumer_callables.append(self.channel_receive)
        try:
            if self.concurrent_mode:
                async with asyncio.TaskGroup() as self.tg:
                    # Run await_many_dispatch in a task for consistent exception handling.
                    self.tg.create_task(await_many_dispatch(consumer_callables, self.dispatch))
            else:
                await await_many_dispatch(consumer_callables, self.dispatch)
        except* StopConsumer:  # Use except* for TaskGroup compatibility
            # Exit cleanly
            pass
    
    async def dispatch(self, message):
        """
        Works out what to do with a message.
        """
        handler = getattr(self, get_handler_name(message), None)
        if handler:
            await aclose_old_connections()
            if self.concurrent_mode:
                # Create a task for each handler for concurrency.
                self.tg.create_task(handler(message))
            else:
                # await each handler for sequential behavior (default).
                await handler(message)
            
        else:
            raise ValueError("No handler for message type %s" % message["type"])

    async def send(self, message):
        """
        Overrideable/callable-by-subclasses send method.
        """
        await self.base_send(message)

    @classmethod
    def as_asgi(cls, **initkwargs):
        """
        Return an ASGI v3 single callable that instantiates a consumer instance
        per scope. Similar in purpose to Django's as_view().

        initkwargs will be used to instantiate the consumer instance.
        """

        async def app(scope, receive, send):
            consumer = cls(**initkwargs)
            return await consumer(scope, receive, send)

        app.consumer_class = cls
        app.consumer_initkwargs = initkwargs

        # take name and docstring from class
        functools.update_wrapper(app, cls, updated=())
        return app

@donghyeon

I’ve updated the mixin implementation with a few changes:

  1. To ensure compatibility with Django 4.2 LTS, I switched to manually managing concurrent tasks using asyncio.Task and asyncio.Queue (available in Python 3.7+), instead of relying on asyncio.TaskGroup (which requires Python 3.11+).
  2. I added a concurrent_mode class attribute to the mixin. This approach keeps the existing AsyncConsumer class and await_many_dispatch function unchanged, so no modifications to the core implementation are needed.

Implementation

import asyncio

from channels.consumer import AsyncConsumer
from channels.exceptions import StopConsumer
from channels.generic.websocket import AsyncWebsocketConsumer


# Requires Python 3.11+ (using asyncio.TaskGroup)
class ConcurrentMixin:
    # Controls concurrency.  False (default) is sequential; True is concurrent.
    concurrent_mode = False

    async def __call__(self, scope, receive, send):
        if self.concurrent_mode:
            try:
                async with asyncio.TaskGroup() as self.tg:
                    self.tg.create_task(super().__call__(scope, receive, send))
            except* StopConsumer:
                pass
        else:
            await super().__call__(scope, receive, send)
    
    async def dispatch(self, message):
        if self.concurrent_mode:
            self.tg.create_task(super().dispatch(message))
        else:
            await super().dispatch(message)


# Requires Python 3.7+ (using asyncio.create_task)
class LegacyConcurrentMixin:
    # Controls concurrency.  False (default) is sequential; True is concurrent.
    concurrent_mode = False

    async def __call__(self, scope, receive, send):
        if self.concurrent_mode:
            self._concurrent_tasks = set()
            self._exceptions = asyncio.Queue()

            # Run the original __call__ in a task to ensure proper
            # exception handling and cancellation.
            call_task = asyncio.create_task(super().__call__(scope, receive, send))
            self._add_task_with_exception_handler(call_task)

            try:
                exception = await self._exceptions.get()
                self._cancel_concurrent_tasks()
                raise exception
            except StopConsumer:
                # Exit cleanly
                pass
            finally:
                # Should we await other self._exceptions and raise them?
                # Not implemented for now.
                pass
        else:
            await super().__call__(scope, receive, send)
    
    async def dispatch(self, message):
        if self.concurrent_mode:
            dispatched_task = asyncio.create_task(super().dispatch(message))
            self._add_task_with_exception_handler(dispatched_task)
        else:
            await super().dispatch(message)
        
    def _add_task_with_exception_handler(self, task: asyncio.Task):
        self._concurrent_tasks.add(task)
        def _task_done_callback(task: asyncio.Task):
            self._concurrent_tasks.discard(task)
            try:
                exception = task.exception()
                if exception is not None:
                    self._exceptions.put_nowait(exception)
            except asyncio.CancelledError:
                pass
        task.add_done_callback(_task_done_callback)

    def _cancel_concurrent_tasks(self):
        for task in self._concurrent_tasks:
            if not task.done():
                task.cancel()


# Extend classes using Mixin
class ConcurrentAsyncConsumer(ConcurrentMixin, AsyncConsumer):
    pass

class ConcurrentAsyncWebsocketConsumer(ConcurrentMixin, AsyncWebsocketConsumer):
    pass
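
Opting in is then a per-consumer class attribute, e.g. (handler name hypothetical):

class JobConsumer(LegacyConcurrentMixin, AsyncConsumer):
    concurrent_mode = True  # opt in; the default stays sequential

    async def run_job(self, message):
        ...  # handlers may now overlap within this one instance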

@carltongibson
Member

@donghyeon Nice! The next step would be to pull that into a PR with test cases demonstrating the new behaviour.

@donghyeon

@carltongibson Thank you for your input, and apologies for my delayed response—I’ve been unwell and undergoing treatment recently. This is my first contribution to such a mature open source community, so I'll need some time to get up to speed with the proper process. In the meantime, I’d like to discuss a design question for enabling concurrent_mode in AsyncConsumer.

Current Consumer Implementation Context

Django Channels currently distinguishes between synchronous and asynchronous behavior by design. Specifically, the implementation separates them into distinct classes—AsyncConsumer and SyncConsumer—to maintain clear operational differences. In practice, SyncConsumer extends AsyncConsumer and uses an internal class attribute (self._sync) to enforce synchronous behavior, a setting that developers don't directly adjust.

Design Options for Enabling Concurrent Mode

1. Concurrent Prefix for Class Extension:

This approach would introduce new classes with a “Concurrent” prefix (e.g., ConcurrentAsyncConsumer) to clearly signal concurrent behavior. The advantage is its explicitness, but it could lead to very verbose class names and require extending every generic async class, which might unnecessarily bloat the API.

2. Activation via Class Attributes:

Allowing developers to enable concurrent behavior by setting a class attribute (e.g., concurrent_mode = True) avoids adding more classes. It’s a more streamlined solution, but it risks obscuring the intended behavior since the class name no longer reflects that the consumer supports concurrency.

Seeking Expert Advice: Concurrent Mode as the Default?

According to the current official Django Channels documentation,

When should you use AsyncConsumer and when should you use SyncConsumer? … so they’re only useful if you’re also calling async code (for example, using HTTPX to fetch 20 pages in parallel)

developers are led to expect that AsyncConsumer supports parallel operations. Yet, in practice, it processes messages sequentially, which does not align with that expectation. One potential solution is to make concurrent behavior the default, with sequential processing available as an opt-in option—possibly through setting a class attribute self.concurrent_mode=False or directly using SyncConsumer—so that those who need it can still choose sequential handling. With this in mind, what is the best direction: should we maintain the current clear separation by extending a ConcurrentAsyncConsumer for every generic class, or should we shift toward a design where concurrent processing is the norm for AsyncConsumer (with sequential processing offered optionally)?

I’d appreciate your guidance on which approach best aligns with Django Channels’ design philosophy and developer expectations.

@carltongibson
Member

Hi @donghyeon — no problem! There's no rush here.

… developers are led to expect that AsyncConsumer supports parallel operations. Yet, in practice, it processes messages sequentially, which does not align with that expectation.

OK, so ASGI messages are handled sequentially, but the entire dispatch is asynchronous — using async def non-blocking functions. First step is to clarify that in the docs, and provide an example of moving long-running work into its own task, via create_task().
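Such an example could be as small as this (names illustrative, not proposed API):

import asyncio

from channels.generic.websocket import AsyncWebsocketConsumer

class UploadConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        # Hand the slow work to its own task and return to the dispatch
        # loop immediately; keep a reference so the task isn't
        # garbage-collected mid-flight.
        self._job = asyncio.create_task(self.process(text_data))

    async def process(self, text_data):
        ...  # long-running work goes here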

Then I would add a class attribute, with the default maintaining the current behaviour: concurrent_asgi_dispatch = False. (Happy to bike-shed about the exact name.)

Then we would document setting concurrent_asgi_dispatch = True with some example.
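That is, roughly (concurrent_asgi_dispatch being the proposed attribute above, not an existing Channels API):

from channels.generic.websocket import AsyncWebsocketConsumer

class FeedConsumer(AsyncWebsocketConsumer):
    concurrent_asgi_dispatch = True  # proposed opt-in to concurrent dispatch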

Why that way?: If I have a Channels application currently deployed, I may be leaning on the fact that ASGI messages are dispatched sequentially. If we were to switch that to concurrent dispatch as the default we risk introducing race-conditions into those already deployed applications. (This concern is why this issue has been sat here: I haven't had capacity to work it through properly, so it's waited for someone to pick it up.)

I don't think a whole new class is needed. A class attribute seems sufficient API to me. (Again, though, that's open to bike-shedding a bit.)

Make sense?
