Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions packages/core/realtime-js/src/RealtimeChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,68 @@ export default class RealtimeChannel {
}
return this._on(type, filter, callback)
}
/**
* Sends a broadcast message explicitly via REST API.
*
* This method always uses the REST API endpoint regardless of WebSocket connection state.
* Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback.
*
* @param event The name of the broadcast event
* @param payload Payload to be sent (required)
* @param opts Options including timeout
* @returns Promise resolving to object with success status, and error details if failed
*/
async httpSend(
event: string,
payload: any,
opts: { timeout?: number } = {}
): Promise<{ success: true } | { success: false; status: number; error: string }> {
const authorization = this.socket.accessTokenValue
? `Bearer ${this.socket.accessTokenValue}`
: ''

if (payload === undefined || payload === null) {
return Promise.reject('Payload is required for httpSend()')
}

const options = {
method: 'POST',
headers: {
Authorization: authorization,
apikey: this.socket.apiKey ? this.socket.apiKey : '',
'Content-Type': 'application/json',
},
body: JSON.stringify({
messages: [
{
topic: this.subTopic,
event,
payload: payload,
private: this.private,
},
],
}),
}

const response = await this._fetchWithTimeout(
this.broadcastEndpointURL,
options,
opts.timeout ?? this.timeout
)

if (response.status === 202) {
return { success: true }
}

let errorMessage = response.statusText
try {
const errorBody = await response.json()
errorMessage = errorBody.error || errorBody.message || errorMessage
} catch {}

return Promise.reject(new Error(errorMessage))
}

/**
* Sends a message into the channel.
*
Expand All @@ -454,6 +516,12 @@ export default class RealtimeChannel {
opts: { [key: string]: any } = {}
): Promise<RealtimeChannelSendResponse> {
if (!this._canPush() && args.type === 'broadcast') {
console.warn(
'Realtime send() is automatically falling back to REST API. ' +
'This behavior will be deprecated in the future. ' +
'Please use httpSend() explicitly for REST delivery.'
)

const { event, payload: endpoint_payload } = args
const authorization = this.socket.accessTokenValue
? `Bearer ${this.socket.accessTokenValue}`
Expand Down
211 changes: 211 additions & 0 deletions packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,214 @@ describe('send', () => {
})
})
})

describe('httpSend', () => {
const createMockResponse = (status: number, statusText?: string, body?: any) => ({
status,
statusText: statusText || 'OK',
headers: new Headers(),
body: null,
json: vi.fn().mockResolvedValue(body || {}),
})

const createSocket = (hasToken = false, fetchMock?: any) => {
const config: any = {
fetch: fetchMock,
params: { apikey: 'abc123' },
}
if (hasToken) {
config.accessToken = () => Promise.resolve('token123')
}
return new RealtimeClient(testSetup.url, config)
}

const testCases = [
{
name: 'without access token',
hasToken: false,
expectedAuth: '',
},
{
name: 'with access token',
hasToken: true,
expectedAuth: 'Bearer token123',
},
]

testCases.forEach(({ name, hasToken, expectedAuth }) => {
describe(name, () => {
test('sends with correct Authorization header', async () => {
const mockResponse = createMockResponse(202)
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
const socket = createSocket(hasToken, fetchStub)
if (hasToken) await socket.setAuth()
const channel = socket.channel('topic')

const result = await channel.httpSend('test', { data: 'test' })

expect(result).toEqual({ success: true })
expect(fetchStub).toHaveBeenCalledTimes(1)
const [, options] = fetchStub.mock.calls[0]
expect(options.headers.Authorization).toBe(expectedAuth)
expect(options.headers.apikey).toBe('abc123')
})

test('rejects when payload is not provided', async () => {
const socket = createSocket(hasToken)
if (hasToken) await socket.setAuth()
const channel = socket.channel('topic')

await expect(channel.httpSend('test', undefined as any)).rejects.toBe(
'Payload is required for httpSend()'
)
})

test('rejects when payload is null', async () => {
const socket = createSocket(hasToken)
if (hasToken) await socket.setAuth()
const channel = socket.channel('topic')

await expect(channel.httpSend('test', null as any)).rejects.toBe(
'Payload is required for httpSend()'
)
})

test('handles timeout error', async () => {
const timeoutError = new Error('Request timeout')
timeoutError.name = 'AbortError'
const fetchStub = vi.fn().mockRejectedValue(timeoutError)
const socket = createSocket(hasToken, fetchStub)
if (hasToken) await socket.setAuth()
const channel = socket.channel('topic')

await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow('Request timeout')
})

test('handles non-202 status', async () => {
const mockResponse = createMockResponse(500, 'Internal Server Error', {
error: 'Server error',
})
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
const socket = createSocket(hasToken, fetchStub)
if (hasToken) await socket.setAuth()
const channel = socket.channel('topic')

await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow('Server error')
})

test('respects custom timeout option', async () => {
const mockResponse = createMockResponse(202)
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
const socket = createSocket(hasToken, fetchStub)
if (hasToken) await socket.setAuth()
const channel = socket.channel('topic')

const result = await channel.httpSend('test', { data: 'test' }, { timeout: 3000 })

expect(result).toEqual({ success: true })
expect(fetchStub).toHaveBeenCalledTimes(1)
const [, options] = fetchStub.mock.calls[0]
expect(options.headers.Authorization).toBe(expectedAuth)
})

test('sends correct payload', async () => {
const mockResponse = createMockResponse(202)
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
const socket = createSocket(hasToken, fetchStub)
if (hasToken) await socket.setAuth()
const channel = socket.channel('topic')

const result = await channel.httpSend('test-payload', { data: 'value' })

expect(result).toEqual({ success: true })
expect(fetchStub).toHaveBeenCalledTimes(1)
const [, options] = fetchStub.mock.calls[0]
expect(options.headers.Authorization).toBe(expectedAuth)
expect(options.body).toBe(
'{"messages":[{"topic":"topic","event":"test-payload","payload":{"data":"value"},"private":false}]}'
)
})
})
})

describe('with access token - additional scenarios', () => {
test('returns success true on 202 status with private channel', async () => {
const mockResponse = createMockResponse(202, 'Accepted')
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
const socket = createSocket(true, fetchStub)
await socket.setAuth()
const channel = socket.channel('topic', { config: { private: true } })

const result = await channel.httpSend('test-explicit', { data: 'explicit' })

expect(result).toEqual({ success: true })
const expectedUrl = testSetup.url
.replace('/socket', '')
.replace('wss', 'https')
.concat('/api/broadcast')
expect(fetchStub).toHaveBeenCalledTimes(1)
const [url, options] = fetchStub.mock.calls[0]
expect(url).toBe(expectedUrl)
expect(options.method).toBe('POST')
expect(options.headers.Authorization).toBe('Bearer token123')
expect(options.headers.apikey).toBe('abc123')
expect(options.body).toBe(
'{"messages":[{"topic":"topic","event":"test-explicit","payload":{"data":"explicit"},"private":true}]}'
)
})

test('uses default timeout when not specified', async () => {
const mockResponse = createMockResponse(202)
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
const socket = new RealtimeClient(testSetup.url, {
fetch: fetchStub,
timeout: 5000,
params: { apikey: 'abc123' },
accessToken: () => Promise.resolve('token123'),
})
await socket.setAuth()
const channel = socket.channel('topic')

const result = await channel.httpSend('test', { data: 'test' })

expect(result).toEqual({ success: true })
expect(fetchStub).toHaveBeenCalledTimes(1)
const [, options] = fetchStub.mock.calls[0]
expect(options.signal).toBeDefined()
})

test('uses statusText when error body has no error field', async () => {
const mockResponse = {
status: 400,
statusText: 'Bad Request',
headers: new Headers(),
body: null,
json: vi.fn().mockResolvedValue({ message: 'Invalid request' }),
}
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
const socket = createSocket(true, fetchStub)
await socket.setAuth()
const channel = socket.channel('topic')

await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow('Invalid request')
})

test('falls back to statusText when json parsing fails', async () => {
const mockResponse = {
status: 503,
statusText: 'Service Unavailable',
headers: new Headers(),
body: null,
json: vi.fn().mockRejectedValue(new Error('Invalid JSON')),
}
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
const socket = createSocket(true, fetchStub)
await socket.setAuth()
const channel = socket.channel('topic')

await expect(channel.httpSend('test', { data: 'test' })).rejects.toThrow(
'Service Unavailable'
)
})
})
})