diff --git a/packages/core/realtime-js/src/RealtimeChannel.ts b/packages/core/realtime-js/src/RealtimeChannel.ts index 303bf7cd..7d9cc70f 100644 --- a/packages/core/realtime-js/src/RealtimeChannel.ts +++ b/packages/core/realtime-js/src/RealtimeChannel.ts @@ -435,6 +435,68 @@ export default class RealtimeChannel { } return this._on(type, filter, callback) } + /** + * Sends a broadcast message explicitly via REST API. + * + * This method always uses the REST API endpoint regardless of WebSocket connection state. + * Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback. + * + * @param event The name of the broadcast event + * @param payload Payload to be sent (required) + * @param opts Options including timeout + * @returns Promise resolving to object with success status, and error details if failed + */ + async httpSend( + event: string, + payload: any, + opts: { timeout?: number } = {} + ): Promise<{ success: true } | { success: false; status: number; error: string }> { + const authorization = this.socket.accessTokenValue + ? `Bearer ${this.socket.accessTokenValue}` + : '' + + if (payload === undefined || payload === null) { + return Promise.reject('Payload is required for httpSend()') + } + + const options = { + method: 'POST', + headers: { + Authorization: authorization, + apikey: this.socket.apiKey ? this.socket.apiKey : '', + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + messages: [ + { + topic: this.subTopic, + event, + payload: payload, + private: this.private, + }, + ], + }), + } + + const response = await this._fetchWithTimeout( + this.broadcastEndpointURL, + options, + opts.timeout ?? this.timeout + ) + + if (response.status === 202) { + return { success: true } + } + + let errorMessage = response.statusText + try { + const errorBody = await response.json() + errorMessage = errorBody.error || errorBody.message || errorMessage + } catch {} + + return Promise.reject(new Error(errorMessage)) + } + /** * Sends a message into the channel. * @@ -454,6 +516,12 @@ export default class RealtimeChannel { opts: { [key: string]: any } = {} ): Promise { if (!this._canPush() && args.type === 'broadcast') { + console.warn( + 'Realtime send() is automatically falling back to REST API. ' + + 'This behavior will be deprecated in the future. ' + + 'Please use httpSend() explicitly for REST delivery.' + ) + const { event, payload: endpoint_payload } = args const authorization = this.socket.accessTokenValue ? `Bearer ${this.socket.accessTokenValue}` diff --git a/packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts b/packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts index 2af90b0d..5b26f657 100644 --- a/packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts +++ b/packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts @@ -473,3 +473,214 @@ describe('send', () => { }) }) }) + +describe('httpSend', () => { + const createMockResponse = (status: number, statusText?: string, body?: any) => ({ + status, + statusText: statusText || 'OK', + headers: new Headers(), + body: null, + json: vi.fn().mockResolvedValue(body || {}), + }) + + const createSocket = (hasToken = false, fetchMock?: any) => { + const config: any = { + fetch: fetchMock, + params: { apikey: 'abc123' }, + } + if (hasToken) { + config.accessToken = () => Promise.resolve('token123') + } + return new RealtimeClient(testSetup.url, config) + } + + const testCases = [ + { + name: 'without access token', + hasToken: false, + expectedAuth: '', + }, + { + name: 'with access token', + hasToken: true, + expectedAuth: 'Bearer token123', + }, + ] + + testCases.forEach(({ name, hasToken, expectedAuth }) => { + describe(name, () => { + test('sends with correct Authorization header', async () => { + const mockResponse = createMockResponse(202) + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + const socket = createSocket(hasToken, fetchStub) + if (hasToken) await socket.setAuth() + const channel = socket.channel('topic') + + const result = await channel.httpSend('test', { data: 'test' }) + + expect(result).toEqual({ success: true }) + expect(fetchStub).toHaveBeenCalledTimes(1) + const [, options] = fetchStub.mock.calls[0] + expect(options.headers.Authorization).toBe(expectedAuth) + expect(options.headers.apikey).toBe('abc123') + }) + + test('rejects when payload is not provided', async () => { + const socket = createSocket(hasToken) + if (hasToken) await socket.setAuth() + const channel = socket.channel('topic') + + await expect(channel.httpSend('test', undefined as any)).rejects.toBe( + 'Payload is required for httpSend()' + ) + }) + + test('rejects when payload is null', async () => { + const socket = createSocket(hasToken) + if (hasToken) await socket.setAuth() + const channel = socket.channel('topic') + + await expect(channel.httpSend('test', null as any)).rejects.toBe( + 'Payload is required for httpSend()' + ) + }) + + test('handles timeout error', async () => { + const timeoutError = new Error('Request timeout') + timeoutError.name = 'AbortError' + const fetchStub = vi.fn().mockRejectedValue(timeoutError) + const socket = createSocket(hasToken, fetchStub) + if (hasToken) await socket.setAuth() + const channel = socket.channel('topic') + + await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow('Request timeout') + }) + + test('handles non-202 status', async () => { + const mockResponse = createMockResponse(500, 'Internal Server Error', { + error: 'Server error', + }) + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + const socket = createSocket(hasToken, fetchStub) + if (hasToken) await socket.setAuth() + const channel = socket.channel('topic') + + await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow('Server error') + }) + + test('respects custom timeout option', async () => { + const mockResponse = createMockResponse(202) + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + const socket = createSocket(hasToken, fetchStub) + if (hasToken) await socket.setAuth() + const channel = socket.channel('topic') + + const result = await channel.httpSend('test', { data: 'test' }, { timeout: 3000 }) + + expect(result).toEqual({ success: true }) + expect(fetchStub).toHaveBeenCalledTimes(1) + const [, options] = fetchStub.mock.calls[0] + expect(options.headers.Authorization).toBe(expectedAuth) + }) + + test('sends correct payload', async () => { + const mockResponse = createMockResponse(202) + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + const socket = createSocket(hasToken, fetchStub) + if (hasToken) await socket.setAuth() + const channel = socket.channel('topic') + + const result = await channel.httpSend('test-payload', { data: 'value' }) + + expect(result).toEqual({ success: true }) + expect(fetchStub).toHaveBeenCalledTimes(1) + const [, options] = fetchStub.mock.calls[0] + expect(options.headers.Authorization).toBe(expectedAuth) + expect(options.body).toBe( + '{"messages":[{"topic":"topic","event":"test-payload","payload":{"data":"value"},"private":false}]}' + ) + }) + }) + }) + + describe('with access token - additional scenarios', () => { + test('returns success true on 202 status with private channel', async () => { + const mockResponse = createMockResponse(202, 'Accepted') + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + const socket = createSocket(true, fetchStub) + await socket.setAuth() + const channel = socket.channel('topic', { config: { private: true } }) + + const result = await channel.httpSend('test-explicit', { data: 'explicit' }) + + expect(result).toEqual({ success: true }) + const expectedUrl = testSetup.url + .replace('/socket', '') + .replace('wss', 'https') + .concat('/api/broadcast') + expect(fetchStub).toHaveBeenCalledTimes(1) + const [url, options] = fetchStub.mock.calls[0] + expect(url).toBe(expectedUrl) + expect(options.method).toBe('POST') + expect(options.headers.Authorization).toBe('Bearer token123') + expect(options.headers.apikey).toBe('abc123') + expect(options.body).toBe( + '{"messages":[{"topic":"topic","event":"test-explicit","payload":{"data":"explicit"},"private":true}]}' + ) + }) + + test('uses default timeout when not specified', async () => { + const mockResponse = createMockResponse(202) + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + const socket = new RealtimeClient(testSetup.url, { + fetch: fetchStub, + timeout: 5000, + params: { apikey: 'abc123' }, + accessToken: () => Promise.resolve('token123'), + }) + await socket.setAuth() + const channel = socket.channel('topic') + + const result = await channel.httpSend('test', { data: 'test' }) + + expect(result).toEqual({ success: true }) + expect(fetchStub).toHaveBeenCalledTimes(1) + const [, options] = fetchStub.mock.calls[0] + expect(options.signal).toBeDefined() + }) + + test('uses statusText when error body has no error field', async () => { + const mockResponse = { + status: 400, + statusText: 'Bad Request', + headers: new Headers(), + body: null, + json: vi.fn().mockResolvedValue({ message: 'Invalid request' }), + } + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + const socket = createSocket(true, fetchStub) + await socket.setAuth() + const channel = socket.channel('topic') + + await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow('Invalid request') + }) + + test('falls back to statusText when json parsing fails', async () => { + const mockResponse = { + status: 503, + statusText: 'Service Unavailable', + headers: new Headers(), + body: null, + json: vi.fn().mockRejectedValue(new Error('Invalid JSON')), + } + const fetchStub = vi.fn().mockResolvedValue(mockResponse) + const socket = createSocket(true, fetchStub) + await socket.setAuth() + const channel = socket.channel('topic') + + await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow( + 'Service Unavailable' + ) + }) + }) +})