From ffe56136d927324f446a418fcffc96b6f032a715 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 30 Sep 2025 16:27:09 -0300 Subject: [PATCH 1/6] feat: add support for broadcast replay configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added ReplayOption type and replay field to RealtimeChannelBroadcastConfig to support configuring broadcast replay with 'since' timestamp and optional 'limit' parameter. Added BroadcastMeta type and meta field to BroadcastPayload for receiving replay metadata including replayed status and message id. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/realtime/src/realtime/types.py | 14 +++++++++++++- supabase/.temp/cli-latest | 1 + 2 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 supabase/.temp/cli-latest diff --git a/src/realtime/src/realtime/types.py b/src/realtime/src/realtime/types.py index 8274e240..9ef5693d 100644 --- a/src/realtime/src/realtime/types.py +++ b/src/realtime/src/realtime/types.py @@ -107,9 +107,15 @@ class PostgresChangesPayload(TypedDict): ids: List[int] +class BroadcastMeta(TypedDict, total=False): + replayed: bool + id: str + + class BroadcastPayload(TypedDict): event: str payload: dict[str, Any] + meta: NotRequired[BroadcastMeta] @dataclass(frozen=True) @@ -172,9 +178,15 @@ def __init__(self, events: PresenceEvents): # TypedDicts -class RealtimeChannelBroadcastConfig(TypedDict): +class ReplayOption(TypedDict, total=False): + since: int + limit: int + + +class RealtimeChannelBroadcastConfig(TypedDict, total=False): ack: bool self: bool + replay: ReplayOption class RealtimeChannelPresenceConfig(TypedDict): diff --git a/supabase/.temp/cli-latest b/supabase/.temp/cli-latest new file mode 100644 index 00000000..2fdeb234 --- /dev/null +++ b/supabase/.temp/cli-latest @@ -0,0 +1 @@ +v2.45.5 \ No newline at end of file From b893f567aa688651cc08660ed85177c5cea875c6 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 30 Sep 2025 16:34:37 -0300 Subject: [PATCH 2/6] remove temp file --- supabase/.temp/cli-latest | 1 - 1 file changed, 1 deletion(-) delete mode 100644 supabase/.temp/cli-latest diff --git a/supabase/.temp/cli-latest b/supabase/.temp/cli-latest deleted file mode 100644 index 2fdeb234..00000000 --- a/supabase/.temp/cli-latest +++ /dev/null @@ -1 +0,0 @@ -v2.45.5 \ No newline at end of file From 953bd8b1672bd4a77106db056ecafd4718bf41c7 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 2 Oct 2025 05:48:01 -0300 Subject: [PATCH 3/6] test: add tests for broadcast replay --- src/realtime/tests/test_connection.py | 201 +++++++++++++++++++++++++- supabase/.temp/cli-latest | 1 + 2 files changed, 199 insertions(+), 3 deletions(-) create mode 100644 supabase/.temp/cli-latest diff --git a/src/realtime/tests/test_connection.py b/src/realtime/tests/test_connection.py index 77560983..db720513 100644 --- a/src/realtime/tests/test_connection.py +++ b/src/realtime/tests/test_connection.py @@ -9,6 +9,7 @@ from realtime import AsyncRealtimeChannel, AsyncRealtimeClient, RealtimeSubscribeStates from realtime.message import Message from realtime.types import DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_TIMEOUT, ChannelEvents +from websockets import broadcast load_dotenv() @@ -111,7 +112,7 @@ async def test_postgrest_changes(socket: AsyncRealtimeClient): await socket.set_auth(token) channel: AsyncRealtimeChannel = socket.channel("test-postgres-changes") - received_events = {"all": [], "insert": [], "update": [], "delete": []} + received_events: dict[str, list[dict]] = {"all": [], "insert": [], "update": [], "delete": []} def all_changes_callback(payload): print("all_changes_callback: ", payload) @@ -206,7 +207,7 @@ async def test_postgrest_changes_on_different_tables(socket: AsyncRealtimeClient await socket.set_auth(token) channel: AsyncRealtimeChannel = socket.channel("test-postgres-changes") - received_events = {"all": [], "insert": []} + received_events: dict[str, list[dict]] = {"all": [], "insert": []} def all_changes_callback(payload): print("all_changes_callback: ", payload) @@ -261,7 +262,7 @@ def insert_callback(payload): assert insert["data"]["record"]["id"] == created_todo_id assert insert["data"]["record"]["description"] == "Test todo" - assert insert["data"]["record"]["is_completed"] == False + assert insert["data"]["record"]["is_completed"] is False assert received_events["insert"] == [insert, message_insert] @@ -438,3 +439,197 @@ async def test_send_message_reconnection(socket: AsyncRealtimeClient): await socket.send(message) await socket.close() + + +@pytest.mark.asyncio +async def test_subscribe_to_private_channel_with_broadcast_replay(socket: AsyncRealtimeClient): + """Test that channel subscription sends correct payload with broadcast replay configuration.""" + import json + from unittest.mock import AsyncMock, patch + + # Mock the websocket connection + mock_ws = AsyncMock() + socket._ws_connection = mock_ws + + # Connect the socket (this will use our mock) + await socket.connect() + + # Calculate replay timestamp + ten_mins_ago = datetime.datetime.now() - datetime.timedelta(minutes=10) + ten_mins_ago_ms = int(ten_mins_ago.timestamp() * 1000) + + # Create channel with broadcast replay configuration + channel: AsyncRealtimeChannel = socket.channel( + "test-private-channel", + params={ + "config": { + "private": True, + "broadcast": { + "replay": { + "since": ten_mins_ago_ms, + "limit": 100 + } + } + } + } + ) + + # Mock the subscription callback to be called immediately + callback_called = False + def mock_callback(state, error): + nonlocal callback_called + callback_called = True + + # Subscribe to the channel + await channel.subscribe(mock_callback) + + # Verify that send was called with the correct payload + assert mock_ws.send.called, "WebSocket send should have been called" + + # Get the sent message + sent_message = mock_ws.send.call_args[0][0] + message_data = json.loads(sent_message) + + # Verify the message structure + assert message_data["topic"] == "realtime:test-private-channel" + assert message_data["event"] == "phx_join" + assert "ref" in message_data + assert "payload" in message_data + + # Verify the payload contains the correct broadcast replay configuration + payload = message_data["payload"] + assert "config" in payload + + config = payload["config"] + assert config["private"] is True + assert "broadcast" in config + + broadcast_config = config["broadcast"] + assert "replay" in broadcast_config + + replay_config = broadcast_config["replay"] + assert replay_config["since"] == ten_mins_ago_ms + assert replay_config["limit"] == 100 + + # Verify postgres_changes array is present (even if empty) + assert "postgres_changes" in config + assert isinstance(config["postgres_changes"], list) + + await socket.close() + + +@pytest.mark.asyncio +async def test_subscribe_to_channel_with_empty_replay_config(socket: AsyncRealtimeClient): + """Test that channel subscription handles empty replay configuration correctly.""" + import json + from unittest.mock import AsyncMock, patch + + # Mock the websocket connection + mock_ws = AsyncMock() + socket._ws_connection = mock_ws + + # Connect the socket + await socket.connect() + + # Create channel with empty replay configuration + channel: AsyncRealtimeChannel = socket.channel( + "test-empty-replay", + params={ + "config": { + "private": False, + "broadcast": { + "ack": True, + "self": False, + "replay": {} + } + } + } + ) + + # Mock the subscription callback + callback_called = False + def mock_callback(state, error): + nonlocal callback_called + callback_called = True + + # Subscribe to the channel + await channel.subscribe(mock_callback) + + # Verify that send was called + assert mock_ws.send.called, "WebSocket send should have been called" + + # Get the sent message + sent_message = mock_ws.send.call_args[0][0] + message_data = json.loads(sent_message) + + # Verify the payload structure + payload = message_data["payload"] + config = payload["config"] + + assert config["private"] is False + assert "broadcast" in config + + broadcast_config = config["broadcast"] + assert broadcast_config["ack"] is True + assert broadcast_config["self"] is False + assert broadcast_config["replay"] == {} + + await socket.close() + + +@pytest.mark.asyncio +async def test_subscribe_to_channel_without_replay_config(socket: AsyncRealtimeClient): + """Test that channel subscription works without replay configuration.""" + import json + from unittest.mock import AsyncMock, patch + + # Mock the websocket connection + mock_ws = AsyncMock() + socket._ws_connection = mock_ws + + # Connect the socket + await socket.connect() + + # Create channel without replay configuration + channel: AsyncRealtimeChannel = socket.channel( + "test-no-replay", + params={ + "config": { + "private": False, + "broadcast": { + "ack": True, + "self": True + } + } + } + ) + + # Mock the subscription callback + callback_called = False + def mock_callback(state, error): + nonlocal callback_called + callback_called = True + + # Subscribe to the channel + await channel.subscribe(mock_callback) + + # Verify that send was called + assert mock_ws.send.called, "WebSocket send should have been called" + + # Get the sent message + sent_message = mock_ws.send.call_args[0][0] + message_data = json.loads(sent_message) + + # Verify the payload structure + payload = message_data["payload"] + config = payload["config"] + + assert config["private"] is False + assert "broadcast" in config + + broadcast_config = config["broadcast"] + assert broadcast_config["ack"] is True + assert broadcast_config["self"] is True + assert "replay" not in broadcast_config + + await socket.close() diff --git a/supabase/.temp/cli-latest b/supabase/.temp/cli-latest new file mode 100644 index 00000000..c5299e67 --- /dev/null +++ b/supabase/.temp/cli-latest @@ -0,0 +1 @@ +v2.47.2 \ No newline at end of file From 9cad9ac02d482e9bac1257886551badf4c660139 Mon Sep 17 00:00:00 2001 From: Leonardo Santiago Date: Fri, 3 Oct 2025 13:19:26 -0300 Subject: [PATCH 4/6] chore: delete temp file --- supabase/.temp/cli-latest | 1 - 1 file changed, 1 deletion(-) delete mode 100644 supabase/.temp/cli-latest diff --git a/supabase/.temp/cli-latest b/supabase/.temp/cli-latest deleted file mode 100644 index c5299e67..00000000 --- a/supabase/.temp/cli-latest +++ /dev/null @@ -1 +0,0 @@ -v2.47.2 \ No newline at end of file From eefa301443d53c34ce69f7d8b410467b998fd691 Mon Sep 17 00:00:00 2001 From: Leonardo Santiago Date: Fri, 3 Oct 2025 13:19:44 -0300 Subject: [PATCH 5/6] fix: formatting --- src/realtime/tests/test_connection.py | 122 ++++++++++++-------------- 1 file changed, 56 insertions(+), 66 deletions(-) diff --git a/src/realtime/tests/test_connection.py b/src/realtime/tests/test_connection.py index 43c384ac..8e839b7e 100644 --- a/src/realtime/tests/test_connection.py +++ b/src/realtime/tests/test_connection.py @@ -6,6 +6,7 @@ import pytest from dotenv import load_dotenv from pydantic import BaseModel +from websockets import broadcast from realtime import ( AsyncRealtimeChannel, @@ -15,7 +16,6 @@ ) from realtime.message import Message from realtime.types import DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_TIMEOUT, ChannelEvents -from websockets import broadcast load_dotenv() @@ -492,138 +492,135 @@ async def test_send_message_reconnection(socket: AsyncRealtimeClient): @pytest.mark.asyncio -async def test_subscribe_to_private_channel_with_broadcast_replay(socket: AsyncRealtimeClient): +async def test_subscribe_to_private_channel_with_broadcast_replay( + socket: AsyncRealtimeClient, +): """Test that channel subscription sends correct payload with broadcast replay configuration.""" import json from unittest.mock import AsyncMock, patch - + # Mock the websocket connection mock_ws = AsyncMock() socket._ws_connection = mock_ws - + # Connect the socket (this will use our mock) await socket.connect() - + # Calculate replay timestamp ten_mins_ago = datetime.datetime.now() - datetime.timedelta(minutes=10) ten_mins_ago_ms = int(ten_mins_ago.timestamp() * 1000) - + # Create channel with broadcast replay configuration channel: AsyncRealtimeChannel = socket.channel( - "test-private-channel", + "test-private-channel", params={ "config": { - "private": True, - "broadcast": { - "replay": { - "since": ten_mins_ago_ms, - "limit": 100 - } - } + "private": True, + "broadcast": {"replay": {"since": ten_mins_ago_ms, "limit": 100}}, } - } + }, ) - + # Mock the subscription callback to be called immediately callback_called = False + def mock_callback(state, error): nonlocal callback_called callback_called = True - + # Subscribe to the channel await channel.subscribe(mock_callback) - + # Verify that send was called with the correct payload assert mock_ws.send.called, "WebSocket send should have been called" - + # Get the sent message sent_message = mock_ws.send.call_args[0][0] message_data = json.loads(sent_message) - + # Verify the message structure assert message_data["topic"] == "realtime:test-private-channel" assert message_data["event"] == "phx_join" assert "ref" in message_data assert "payload" in message_data - + # Verify the payload contains the correct broadcast replay configuration payload = message_data["payload"] assert "config" in payload - + config = payload["config"] assert config["private"] is True assert "broadcast" in config - + broadcast_config = config["broadcast"] assert "replay" in broadcast_config - + replay_config = broadcast_config["replay"] assert replay_config["since"] == ten_mins_ago_ms assert replay_config["limit"] == 100 - + # Verify postgres_changes array is present (even if empty) assert "postgres_changes" in config assert isinstance(config["postgres_changes"], list) - + await socket.close() @pytest.mark.asyncio -async def test_subscribe_to_channel_with_empty_replay_config(socket: AsyncRealtimeClient): +async def test_subscribe_to_channel_with_empty_replay_config( + socket: AsyncRealtimeClient, +): """Test that channel subscription handles empty replay configuration correctly.""" import json from unittest.mock import AsyncMock, patch - + # Mock the websocket connection mock_ws = AsyncMock() socket._ws_connection = mock_ws - + # Connect the socket await socket.connect() - + # Create channel with empty replay configuration channel: AsyncRealtimeChannel = socket.channel( - "test-empty-replay", + "test-empty-replay", params={ "config": { "private": False, - "broadcast": { - "ack": True, - "self": False, - "replay": {} - } + "broadcast": {"ack": True, "self": False, "replay": {}}, } - } + }, ) - + # Mock the subscription callback callback_called = False + def mock_callback(state, error): nonlocal callback_called callback_called = True - + # Subscribe to the channel await channel.subscribe(mock_callback) - + # Verify that send was called assert mock_ws.send.called, "WebSocket send should have been called" - + # Get the sent message sent_message = mock_ws.send.call_args[0][0] message_data = json.loads(sent_message) - + # Verify the payload structure payload = message_data["payload"] config = payload["config"] - + assert config["private"] is False assert "broadcast" in config - + broadcast_config = config["broadcast"] assert broadcast_config["ack"] is True assert broadcast_config["self"] is False assert broadcast_config["replay"] == {} - + await socket.close() @@ -632,54 +629,47 @@ async def test_subscribe_to_channel_without_replay_config(socket: AsyncRealtimeC """Test that channel subscription works without replay configuration.""" import json from unittest.mock import AsyncMock, patch - + # Mock the websocket connection mock_ws = AsyncMock() socket._ws_connection = mock_ws - + # Connect the socket await socket.connect() - + # Create channel without replay configuration channel: AsyncRealtimeChannel = socket.channel( - "test-no-replay", - params={ - "config": { - "private": False, - "broadcast": { - "ack": True, - "self": True - } - } - } + "test-no-replay", + params={"config": {"private": False, "broadcast": {"ack": True, "self": True}}}, ) - + # Mock the subscription callback callback_called = False + def mock_callback(state, error): nonlocal callback_called callback_called = True - + # Subscribe to the channel await channel.subscribe(mock_callback) - + # Verify that send was called assert mock_ws.send.called, "WebSocket send should have been called" - + # Get the sent message sent_message = mock_ws.send.call_args[0][0] message_data = json.loads(sent_message) - + # Verify the payload structure payload = message_data["payload"] config = payload["config"] - + assert config["private"] is False assert "broadcast" in config - + broadcast_config = config["broadcast"] assert broadcast_config["ack"] is True assert broadcast_config["self"] is True assert "replay" not in broadcast_config - + await socket.close() From fc8455a9a37c6bea8f2c439ed7098e4354c7fd8b Mon Sep 17 00:00:00 2001 From: Leonardo Santiago Date: Fri, 3 Oct 2025 13:21:24 -0300 Subject: [PATCH 6/6] fix: fix tests to add missing parameters --- src/realtime/tests/test_connection.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/realtime/tests/test_connection.py b/src/realtime/tests/test_connection.py index 8e839b7e..0f5aeb2a 100644 --- a/src/realtime/tests/test_connection.py +++ b/src/realtime/tests/test_connection.py @@ -517,6 +517,7 @@ async def test_subscribe_to_private_channel_with_broadcast_replay( "config": { "private": True, "broadcast": {"replay": {"since": ten_mins_ago_ms, "limit": 100}}, + "presence": {"enabled": True, "key": ""}, } }, ) @@ -588,6 +589,7 @@ async def test_subscribe_to_channel_with_empty_replay_config( "config": { "private": False, "broadcast": {"ack": True, "self": False, "replay": {}}, + "presence": {"enabled": True, "key": ""}, } }, ) @@ -640,7 +642,13 @@ async def test_subscribe_to_channel_without_replay_config(socket: AsyncRealtimeC # Create channel without replay configuration channel: AsyncRealtimeChannel = socket.channel( "test-no-replay", - params={"config": {"private": False, "broadcast": {"ack": True, "self": True}}}, + params={ + "config": { + "private": False, + "broadcast": {"ack": True, "self": True}, + "presence": {"enabled": True, "key": ""}, + } + }, ) # Mock the subscription callback