From 8eedcbd3e070fe13e94cd75eb4010f0f82abf49a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Wed, 8 Oct 2025 18:26:20 +0100 Subject: [PATCH] fix(realtime): realtime explicit REST call --- .../core/realtime-js/src/RealtimeChannel.ts | 68 +++++ .../test/RealtimeChannel.messaging.test.ts | 258 ++++++++++++++++++ 2 files changed, 326 insertions(+) diff --git a/packages/core/realtime-js/src/RealtimeChannel.ts b/packages/core/realtime-js/src/RealtimeChannel.ts index 303bf7cda..9c687f340 100644 --- a/packages/core/realtime-js/src/RealtimeChannel.ts +++ b/packages/core/realtime-js/src/RealtimeChannel.ts @@ -435,6 +435,68 @@ export default class RealtimeChannel { } return this._on(type, filter, callback) } + /** + * Sends a broadcast message explicitly via REST API. + * + * This method always uses the REST API endpoint regardless of WebSocket connection state. + * Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback. + * + * @param event The name of the broadcast event + * @param payload Payload to be sent (required) + * @param opts Options including timeout + * @returns Promise resolving to object with success status, and error details if failed + */ + async postSend( + event: string, + payload: any, + opts: { timeout?: number } = {} + ): Promise<{ success: true } | { success: false; status: number; error: string }> { + if (!this.socket.accessTokenValue) { + return Promise.reject('Access token is required for postSend()') + } + + if (payload === undefined || payload === null) { + return Promise.reject('Payload is required for postSend()') + } + + const options = { + method: 'POST', + headers: { + Authorization: `Bearer ${this.socket.accessTokenValue}`, + apikey: this.socket.apiKey ? this.socket.apiKey : '', + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + messages: [ + { + topic: this.subTopic, + event, + payload: payload, + private: this.private, + }, + ], + }), + } + + const response = await this._fetchWithTimeout( + this.broadcastEndpointURL, + options, + opts.timeout ?? this.timeout + ) + + if (response.status === 202) { + return { success: true } + } + + let errorMessage = response.statusText + try { + const errorBody = await response.json() + errorMessage = errorBody.error || errorBody.message || errorMessage + } catch {} + + return Promise.reject(new Error(errorMessage)) + } + /** * Sends a message into the channel. * @@ -454,6 +516,12 @@ export default class RealtimeChannel { opts: { [key: string]: any } = {} ): Promise { if (!this._canPush() && args.type === 'broadcast') { + console.warn( + 'Realtime send() is automatically falling back to REST API. ' + + 'This behavior will be deprecated in the future. ' + + 'Please use postSend() explicitly for REST delivery.' + ) + const { event, payload: endpoint_payload } = args const authorization = this.socket.accessTokenValue ? `Bearer ${this.socket.accessTokenValue}` diff --git a/packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts b/packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts index 2af90b0d6..367a68f98 100644 --- a/packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts +++ b/packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts @@ -473,3 +473,261 @@ describe('send', () => { }) }) }) + +describe('postSend', () => { + test('rejects when access token is not set', async () => { + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: 'abc123' }, + }) + const channel = socket.channel('topic') + + await expect(channel.postSend('test', { data: 'test' })).rejects.toBe( + 'Access token is required for postSend()' + ) + }) + + test('rejects when payload is not provided', async () => { + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: 'abc123' }, + accessToken: () => Promise.resolve('token123'), + }) + await socket.setAuth() + const channel = socket.channel('topic') + + await expect(channel.postSend('test', undefined as any)).rejects.toBe( + 'Payload is required for postSend()' + ) + }) + + test('returns success true on 202 status', async () => { + const mockResponse = { + status: 202, + statusText: 'Accepted', + headers: new Headers(), + body: null, + } + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + + const socket = new RealtimeClient(testSetup.url, { + fetch: fetchStub as unknown as typeof fetch, + timeout: defaultTimeout, + params: { apikey: 'def456' }, + accessToken: () => Promise.resolve('access_token_456'), + }) + + await socket.setAuth() + + const channel = socket.channel('topic', { + config: { private: true }, + }) + + const result = await channel.postSend('test-explicit', { + data: 'explicit', + }) + + expect(result).toEqual({ success: true }) + + const expectedUrl = testSetup.url + .replace('/socket', '') + .replace('wss', 'https') + .concat('/api/broadcast') + + expect(fetchStub).toHaveBeenCalledTimes(1) + const [url, options] = fetchStub.mock.calls[0] + expect(url).toBe(expectedUrl) + expect(options.method).toBe('POST') + expect(options.headers.Authorization).toBe('Bearer access_token_456') + expect(options.headers.apikey).toBe('def456') + expect(options.body).toBe( + '{"messages":[{"topic":"topic","event":"test-explicit","payload":{"data":"explicit"},"private":true}]}' + ) + }) + + test('throws on timeout error', async () => { + const timeoutError = new Error('Request timeout') + timeoutError.name = 'AbortError' + + const fetchStub = vi.fn().mockRejectedValue(timeoutError) + + const socket = new RealtimeClient(testSetup.url, { + fetch: fetchStub as unknown as typeof fetch, + params: { apikey: 'abc123' }, + accessToken: () => Promise.resolve('token123'), + }) + await socket.setAuth() + const channel = socket.channel('topic') + + await expect(channel.postSend('test', { data: 'test' })).rejects.toThrow('Request timeout') + }) + + test('returns error object on non-202 status', async () => { + const mockResponse = { + status: 500, + statusText: 'Internal Server Error', + headers: new Headers(), + body: null, + json: vi.fn().mockResolvedValue({ error: 'Server error' }), + } + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + + const socket = new RealtimeClient(testSetup.url, { + fetch: fetchStub as unknown as typeof fetch, + params: { apikey: 'abc123' }, + accessToken: () => Promise.resolve('token123'), + }) + await socket.setAuth() + const channel = socket.channel('topic') + + const result = await channel.postSend('test', { data: 'test' }) + + expect(result).toEqual({ + success: false, + status: 500, + error: 'Server error', + }) + }) + + test('respects custom timeout option', async () => { + const mockResponse = { + status: 202, + headers: new Headers(), + body: null, + } + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + + const socket = new RealtimeClient(testSetup.url, { + fetch: fetchStub as unknown as typeof fetch, + timeout: 5000, + params: { apikey: 'abc123' }, + accessToken: () => Promise.resolve('token123'), + }) + await socket.setAuth() + const channel = socket.channel('topic') + + const result = await channel.postSend('test', { data: 'test' }, { timeout: 3000 }) + + expect(result).toEqual({ success: true }) + expect(fetchStub).toHaveBeenCalledTimes(1) + const [, options] = fetchStub.mock.calls[0] + expect(options.signal).toBeDefined() + expect(options.signal).toBeInstanceOf(AbortSignal) + }) + + test('uses default timeout when not specified', async () => { + const mockResponse = { + status: 202, + headers: new Headers(), + body: null, + } + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + + const socket = new RealtimeClient(testSetup.url, { + fetch: fetchStub as unknown as typeof fetch, + timeout: 5000, + params: { apikey: 'abc123' }, + accessToken: () => Promise.resolve('token123'), + }) + await socket.setAuth() + const channel = socket.channel('topic') + + const result = await channel.postSend('test', { data: 'test' }) + + expect(result).toEqual({ success: true }) + expect(fetchStub).toHaveBeenCalledTimes(1) + const [, options] = fetchStub.mock.calls[0] + expect(options.signal).toBeDefined() + }) + + test('allows non-empty payload', async () => { + const mockResponse = { + status: 202, + headers: new Headers(), + body: null, + } + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + + const socket = new RealtimeClient(testSetup.url, { + fetch: fetchStub as unknown as typeof fetch, + params: { apikey: 'abc123' }, + accessToken: () => Promise.resolve('token123'), + }) + await socket.setAuth() + const channel = socket.channel('topic') + + const result = await channel.postSend('test-payload', { data: 'value' }) + + expect(result).toEqual({ success: true }) + expect(fetchStub).toHaveBeenCalledTimes(1) + + const [, options] = fetchStub.mock.calls[0] + const body = JSON.parse(options.body) + expect(body.messages[0].payload).toEqual({ data: 'value' }) + }) + + test('rejects when payload is null', async () => { + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: 'abc123' }, + accessToken: () => Promise.resolve('token123'), + }) + await socket.setAuth() + const channel = socket.channel('topic') + + await expect(channel.postSend('test', null as any)).rejects.toBe( + 'Payload is required for postSend()' + ) + }) + + test('uses statusText when error body has no error field', async () => { + const mockResponse = { + status: 400, + statusText: 'Bad Request', + headers: new Headers(), + body: null, + json: vi.fn().mockResolvedValue({ message: 'Invalid request' }), + } + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + + const socket = new RealtimeClient(testSetup.url, { + fetch: fetchStub as unknown as typeof fetch, + params: { apikey: 'abc123' }, + accessToken: () => Promise.resolve('token123'), + }) + await socket.setAuth() + const channel = socket.channel('topic') + + const result = await channel.postSend('test', { data: 'test' }) + + expect(result).toEqual({ + success: false, + status: 400, + error: 'Invalid request', + }) + }) + + test('falls back to statusText when json parsing fails', async () => { + const mockResponse = { + status: 503, + statusText: 'Service Unavailable', + headers: new Headers(), + body: null, + json: vi.fn().mockRejectedValue(new Error('Invalid JSON')), + } + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + + const socket = new RealtimeClient(testSetup.url, { + fetch: fetchStub as unknown as typeof fetch, + params: { apikey: 'abc123' }, + accessToken: () => Promise.resolve('token123'), + }) + await socket.setAuth() + const channel = socket.channel('topic') + + const result = await channel.postSend('test', { data: 'test' }) + + expect(result).toEqual({ + success: false, + status: 503, + error: 'Service Unavailable', + }) + }) +})