Bug: no event publish for psycopg and asyncpg channels backend #4110

@Harshal6927

Description

Events published through PsycoPgChannelsBackend or AsyncPgChannelsBackend are never delivered to subscribers.

Note: The example below works as expected with MemoryChannelsBackend.
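To make switching between the working and failing backends less error-prone than commenting lines in and out, the backend could be picked by name. This is only a sketch: `BACKENDS` and `make_backend` are hypothetical helpers, not part of Litestar, and the asyncpg variant assumes `asyncpg` has been added to the script's dependencies. The module paths follow the `litestar.channels.backends` layout used in the MCVE.

```python
import importlib

# Backend name -> (module path, class name).
# The memory and psycopg entries match the imports in the MCVE;
# the asyncpg entry additionally requires the asyncpg package.
BACKENDS = {
    "memory": ("litestar.channels.backends.memory", "MemoryChannelsBackend"),
    "psycopg": ("litestar.channels.backends.psycopg", "PsycoPgChannelsBackend"),
    "asyncpg": ("litestar.channels.backends.asyncpg", "AsyncPgChannelsBackend"),
}


def make_backend(name: str, dsn: str):
    """Hypothetical helper: build the channels backend selected by name."""
    if name not in BACKENDS:
        raise ValueError(f"unknown backend {name!r}, expected one of {sorted(BACKENDS)}")
    module_name, class_name = BACKENDS[name]
    # Import lazily so only the driver for the chosen backend has to be installed.
    backend_cls = getattr(importlib.import_module(module_name), class_name)
    # The in-memory backend needs no connection string.
    return backend_cls() if name == "memory" else backend_cls(dsn)
```

In the MCVE this would be used as `backend=make_backend("psycopg", "postgresql://myuser:mypassword@127.0.0.1:8931/mydb")`, with `"memory"` as the known-good comparison.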

URL to code causing the issue

No response

MCVE

# First script: Litestar server that publishes events

# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "litestar[standard]>=2.15.2",
#   "psycopg[binary,pool]>=3.2.6",
# ]
# ///

import time

import uvicorn
from litestar import Litestar, get
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend
# from litestar.channels.backends.psycopg import PsycoPgChannelsBackend

channels_plugin = ChannelsPlugin(
    # backend=PsycoPgChannelsBackend(
    #     "postgresql://myuser:mypassword@127.0.0.1:8931/mydb"
    # ),
    backend=MemoryChannelsBackend(),
    arbitrary_channels_allowed=True,
    create_ws_route_handlers=True,
    ws_handler_base_path="/ws",
)


@get("/")
async def index() -> str:
    return "Hello, world!"


@get("/create-event")
async def create_event() -> None:
    channels_plugin.publish(time.time_ns(), "test-event-channel")


app = Litestar(
    route_handlers=[index, create_event],
    plugins=[channels_plugin],
    debug=True,
)


if __name__ == "__main__":
    uvicorn.run(app=app, port=8621)



# Second script: WebSocket client that subscribes to the channel

# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "websockets",
# ]
# ///

import asyncio
import websockets
import json


async def listen_to_channel():
    uri = "ws://127.0.0.1:8621/ws/test-event-channel"

    print(f"Connecting to {uri}...")
    try:
        async with websockets.connect(uri) as websocket:
            print("Connection established! Waiting for messages...")

            while True:
                message = await websocket.recv()
                try:
                    data = json.loads(message)
                    print(f"Received: {data}")
                except json.JSONDecodeError:
                    print(f"Received: {message}")
    except websockets.exceptions.ConnectionClosedError as e:
        print(f"Connection closed unexpectedly: {e}")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    asyncio.run(listen_to_channel())

Steps to reproduce

  1. Replace MemoryChannelsBackend with PsycoPgChannelsBackend or AsyncPgChannelsBackend.
  2. Run the Litestar server and call the /create-event endpoint once to publish an event and create the channel.
  3. Run the second script to subscribe to the channel and listen for updates.
  4. Call the /create-event endpoint again to generate events. No messages reach the subscriber.
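The endpoint calls in steps 2 and 4 can be scripted so the reproduction does not depend on a browser or curl. The following is a minimal sketch using only the standard library; `create_event_url` and `trigger_events` are hypothetical helper names, and the host and port are taken from the `uvicorn.run` call and the WebSocket URI in the MCVE.

```python
import time
import urllib.request

# Host and port as used by the server script in the MCVE.
BASE_URL = "http://127.0.0.1:8621"


def create_event_url(base_url: str = BASE_URL) -> str:
    """Build the URL of the /create-event endpoint."""
    return f"{base_url.rstrip('/')}/create-event"


def trigger_events(count: int = 5, delay: float = 1.0, base_url: str = BASE_URL) -> list[int]:
    """Call /create-event `count` times and return the HTTP status of each call."""
    statuses = []
    for _ in range(count):
        with urllib.request.urlopen(create_event_url(base_url), timeout=5) as response:
            statuses.append(response.status)
        # Space the requests out so each event is easy to spot in the subscriber output.
        time.sleep(delay)
    return statuses
```

Calling `trigger_events()` while both the server and the subscriber script are running should print one "Received: ..." line per request on the subscriber side with MemoryChannelsBackend, and none with the affected backends.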

Screenshots

No response

Logs


Litestar Version

v2.15.2

Platform

  • Linux
  • Mac
  • Windows
  • Other (Please specify in the description above)

Metadata

Assignees

No one assigned

Labels

Bug 🐛 This is something that is not working as expected

Type

No type

Projects

Status

Triage

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests