Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions packages/core/realtime-js/src/RealtimeChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,68 @@
}
return this._on(type, filter, callback)
}
/**
* Sends a broadcast message explicitly via REST API.
*
* This method always uses the REST API endpoint regardless of WebSocket connection state.
* Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback.
*
* @param event The name of the broadcast event
* @param payload Payload to be sent (required)
* @param opts Options including timeout
* @returns Promise resolving to object with success status, and error details if failed
*/
async postSend(
event: string,
payload: any,
opts: { timeout?: number } = {}
): Promise<{ success: true } | { success: false; status: number; error: string }> {
if (!this.socket.accessTokenValue) {
return Promise.reject('Access token is required for postSend()')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the secret key is being used? Would we still need an access token?

}

if (payload === undefined || payload === null) {
return Promise.reject('Payload is required for postSend()')
}

const options = {
method: 'POST',
headers: {
Authorization: `Bearer ${this.socket.accessTokenValue}`,
apikey: this.socket.apiKey ? this.socket.apiKey : '',
'Content-Type': 'application/json',
},
body: JSON.stringify({
messages: [
{
topic: this.subTopic,
event,
payload: payload,
private: this.private,
},
],
}),
}

const response = await this._fetchWithTimeout(
this.broadcastEndpointURL,
options,
opts.timeout ?? this.timeout
)

if (response.status === 202) {
return { success: true }
}

let errorMessage = response.statusText
try {
const errorBody = await response.json()
errorMessage = errorBody.error || errorBody.message || errorMessage
} catch {}

return Promise.reject(new Error(errorMessage))

Check failure on line 497 in packages/core/realtime-js/src/RealtimeChannel.ts

View workflow job for this annotation

GitHub Actions / Run tests for Node.js 20

test/RealtimeChannel.messaging.test.ts > postSend > falls back to statusText when json parsing fails

Error: Service Unavailable ❯ RealtimeChannel.postSend src/RealtimeChannel.ts:497:27 ❯ test/RealtimeChannel.messaging.test.ts:725:20

Check failure on line 497 in packages/core/realtime-js/src/RealtimeChannel.ts

View workflow job for this annotation

GitHub Actions / Run tests for Node.js 20

test/RealtimeChannel.messaging.test.ts > postSend > uses statusText when error body has no error field

Error: Invalid request ❯ RealtimeChannel.postSend src/RealtimeChannel.ts:497:27 ❯ test/RealtimeChannel.messaging.test.ts:698:20

Check failure on line 497 in packages/core/realtime-js/src/RealtimeChannel.ts

View workflow job for this annotation

GitHub Actions / Run tests for Node.js 20

test/RealtimeChannel.messaging.test.ts > postSend > returns error object on non-202 status

Error: Server error ❯ RealtimeChannel.postSend src/RealtimeChannel.ts:497:27 ❯ test/RealtimeChannel.messaging.test.ts:581:20
}

/**
* Sends a message into the channel.
*
Expand All @@ -454,6 +516,12 @@
opts: { [key: string]: any } = {}
): Promise<RealtimeChannelSendResponse> {
if (!this._canPush() && args.type === 'broadcast') {
console.warn(
'Realtime send() is automatically falling back to REST API. ' +
'This behavior will be deprecated in the future. ' +
'Please use postSend() explicitly for REST delivery.'
)

const { event, payload: endpoint_payload } = args
const authorization = this.socket.accessTokenValue
? `Bearer ${this.socket.accessTokenValue}`
Expand Down
258 changes: 258 additions & 0 deletions packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,261 @@ describe('send', () => {
})
})
})

describe('postSend', () => {
test('rejects when access token is not set', async () => {
const socket = new RealtimeClient(testSetup.url, {
params: { apikey: 'abc123' },
})
const channel = socket.channel('topic')

await expect(channel.postSend('test', { data: 'test' })).rejects.toBe(
'Access token is required for postSend()'
)
})

test('rejects when payload is not provided', async () => {
const socket = new RealtimeClient(testSetup.url, {
params: { apikey: 'abc123' },
accessToken: () => Promise.resolve('token123'),
})
await socket.setAuth()
const channel = socket.channel('topic')

await expect(channel.postSend('test', undefined as any)).rejects.toBe(
'Payload is required for postSend()'
)
})

test('returns success true on 202 status', async () => {
const mockResponse = {
status: 202,
statusText: 'Accepted',
headers: new Headers(),
body: null,
}
const fetchStub = vi.fn().mockResolvedValue(mockResponse)

const socket = new RealtimeClient(testSetup.url, {
fetch: fetchStub as unknown as typeof fetch,
timeout: defaultTimeout,
params: { apikey: 'def456' },
accessToken: () => Promise.resolve('access_token_456'),
})

await socket.setAuth()

const channel = socket.channel('topic', {
config: { private: true },
})

const result = await channel.postSend('test-explicit', {
data: 'explicit',
})

expect(result).toEqual({ success: true })

const expectedUrl = testSetup.url
.replace('/socket', '')
.replace('wss', 'https')
.concat('/api/broadcast')

expect(fetchStub).toHaveBeenCalledTimes(1)
const [url, options] = fetchStub.mock.calls[0]
expect(url).toBe(expectedUrl)
expect(options.method).toBe('POST')
expect(options.headers.Authorization).toBe('Bearer access_token_456')
expect(options.headers.apikey).toBe('def456')
expect(options.body).toBe(
'{"messages":[{"topic":"topic","event":"test-explicit","payload":{"data":"explicit"},"private":true}]}'
)
})

test('throws on timeout error', async () => {
const timeoutError = new Error('Request timeout')
timeoutError.name = 'AbortError'

const fetchStub = vi.fn().mockRejectedValue(timeoutError)

const socket = new RealtimeClient(testSetup.url, {
fetch: fetchStub as unknown as typeof fetch,
params: { apikey: 'abc123' },
accessToken: () => Promise.resolve('token123'),
})
await socket.setAuth()
const channel = socket.channel('topic')

await expect(channel.postSend('test', { data: 'test' })).rejects.toThrow('Request timeout')
})

test('returns error object on non-202 status', async () => {
const mockResponse = {
status: 500,
statusText: 'Internal Server Error',
headers: new Headers(),
body: null,
json: vi.fn().mockResolvedValue({ error: 'Server error' }),
}
const fetchStub = vi.fn().mockResolvedValue(mockResponse)

const socket = new RealtimeClient(testSetup.url, {
fetch: fetchStub as unknown as typeof fetch,
params: { apikey: 'abc123' },
accessToken: () => Promise.resolve('token123'),
})
await socket.setAuth()
const channel = socket.channel('topic')

const result = await channel.postSend('test', { data: 'test' })

expect(result).toEqual({
success: false,
status: 500,
error: 'Server error',
})
})

test('respects custom timeout option', async () => {
const mockResponse = {
status: 202,
headers: new Headers(),
body: null,
}
const fetchStub = vi.fn().mockResolvedValue(mockResponse)

const socket = new RealtimeClient(testSetup.url, {
fetch: fetchStub as unknown as typeof fetch,
timeout: 5000,
params: { apikey: 'abc123' },
accessToken: () => Promise.resolve('token123'),
})
await socket.setAuth()
const channel = socket.channel('topic')

const result = await channel.postSend('test', { data: 'test' }, { timeout: 3000 })

expect(result).toEqual({ success: true })
expect(fetchStub).toHaveBeenCalledTimes(1)
const [, options] = fetchStub.mock.calls[0]
expect(options.signal).toBeDefined()
expect(options.signal).toBeInstanceOf(AbortSignal)
})

test('uses default timeout when not specified', async () => {
const mockResponse = {
status: 202,
headers: new Headers(),
body: null,
}
const fetchStub = vi.fn().mockResolvedValue(mockResponse)

const socket = new RealtimeClient(testSetup.url, {
fetch: fetchStub as unknown as typeof fetch,
timeout: 5000,
params: { apikey: 'abc123' },
accessToken: () => Promise.resolve('token123'),
})
await socket.setAuth()
const channel = socket.channel('topic')

const result = await channel.postSend('test', { data: 'test' })

expect(result).toEqual({ success: true })
expect(fetchStub).toHaveBeenCalledTimes(1)
const [, options] = fetchStub.mock.calls[0]
expect(options.signal).toBeDefined()
})

test('allows non-empty payload', async () => {
const mockResponse = {
status: 202,
headers: new Headers(),
body: null,
}
const fetchStub = vi.fn().mockResolvedValue(mockResponse)

const socket = new RealtimeClient(testSetup.url, {
fetch: fetchStub as unknown as typeof fetch,
params: { apikey: 'abc123' },
accessToken: () => Promise.resolve('token123'),
})
await socket.setAuth()
const channel = socket.channel('topic')

const result = await channel.postSend('test-payload', { data: 'value' })

expect(result).toEqual({ success: true })
expect(fetchStub).toHaveBeenCalledTimes(1)

const [, options] = fetchStub.mock.calls[0]
const body = JSON.parse(options.body)
expect(body.messages[0].payload).toEqual({ data: 'value' })
})

test('rejects when payload is null', async () => {
const socket = new RealtimeClient(testSetup.url, {
params: { apikey: 'abc123' },
accessToken: () => Promise.resolve('token123'),
})
await socket.setAuth()
const channel = socket.channel('topic')

await expect(channel.postSend('test', null as any)).rejects.toBe(
'Payload is required for postSend()'
)
})

test('uses statusText when error body has no error field', async () => {
const mockResponse = {
status: 400,
statusText: 'Bad Request',
headers: new Headers(),
body: null,
json: vi.fn().mockResolvedValue({ message: 'Invalid request' }),
}
const fetchStub = vi.fn().mockResolvedValue(mockResponse)

const socket = new RealtimeClient(testSetup.url, {
fetch: fetchStub as unknown as typeof fetch,
params: { apikey: 'abc123' },
accessToken: () => Promise.resolve('token123'),
})
await socket.setAuth()
const channel = socket.channel('topic')

const result = await channel.postSend('test', { data: 'test' })

expect(result).toEqual({
success: false,
status: 400,
error: 'Invalid request',
})
})

test('falls back to statusText when json parsing fails', async () => {
const mockResponse = {
status: 503,
statusText: 'Service Unavailable',
headers: new Headers(),
body: null,
json: vi.fn().mockRejectedValue(new Error('Invalid JSON')),
}
const fetchStub = vi.fn().mockResolvedValue(mockResponse)

const socket = new RealtimeClient(testSetup.url, {
fetch: fetchStub as unknown as typeof fetch,
params: { apikey: 'abc123' },
accessToken: () => Promise.resolve('token123'),
})
await socket.setAuth()
const channel = socket.channel('topic')

const result = await channel.postSend('test', { data: 'test' })

expect(result).toEqual({
success: false,
status: 503,
error: 'Service Unavailable',
})
})
})
Loading