From f4f020f7195ea48460ae8c8d30db4240fa91e5c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Fri, 29 Aug 2025 12:36:15 +0100 Subject: [PATCH] fix: add hearthbeat callback --- src/RealtimeClient.ts | 28 +- test/RealtimeClient.heartbeatCallback.test.ts | 326 ++++++++++++++++++ test/RealtimeClient.lifecycle.test.ts | 22 ++ test/helpers/lifecycle.ts | 16 +- 4 files changed, 386 insertions(+), 6 deletions(-) create mode 100644 test/RealtimeClient.heartbeatCallback.test.ts diff --git a/src/RealtimeClient.ts b/src/RealtimeClient.ts index c976cfb9..e3759e60 100755 --- a/src/RealtimeClient.ts +++ b/src/RealtimeClient.ts @@ -81,6 +81,7 @@ export type RealtimeClientOptions = { transport?: WebSocketLikeConstructor timeout?: number heartbeatIntervalMs?: number + heartbeatCallback?: (status: HeartbeatStatus) => void logger?: Function encode?: Function decode?: Function @@ -158,6 +159,7 @@ export default class RealtimeClient { * @param options.params The optional params to pass when connecting. * @param options.headers Deprecated: headers cannot be set on websocket connections and this option will be removed in the future. * @param options.heartbeatIntervalMs The millisec interval to send a heartbeat message. + * @param options.heartbeatCallback The optional function to handle heartbeat status. * @param options.logger The optional function for specialized logging, ie: logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) } * @param options.logLevel Sets the log level for Realtime * @param options.encode The function to encode outgoing messages. Defaults to JSON: (payload, callback) => callback(JSON.stringify(payload)) @@ -421,7 +423,11 @@ export default class RealtimeClient { */ async sendHeartbeat() { if (!this.isConnected()) { - this.heartbeatCallback('disconnected') + try { + this.heartbeatCallback('disconnected') + } catch (e) { + this.log('error', 'error in heartbeat callback', e) + } return } @@ -432,7 +438,11 @@ export default class RealtimeClient { 'transport', 'heartbeat timeout. Attempting to re-establish connection' ) - this.heartbeatCallback('timeout') + try { + this.heartbeatCallback('timeout') + } catch (e) { + this.log('error', 'error in heartbeat callback', e) + } // Force reconnection after heartbeat timeout this._wasManualDisconnect = false @@ -454,7 +464,11 @@ export default class RealtimeClient { payload: {}, ref: this.pendingHeartbeatRef, }) - this.heartbeatCallback('sent') + try { + this.heartbeatCallback('sent') + } catch (e) { + this.log('error', 'error in heartbeat callback', e) + } this._setAuthSafely('heartbeat') } @@ -545,7 +559,11 @@ export default class RealtimeClient { this.decode(rawMessage.data, (msg: RealtimeMessage) => { // Handle heartbeat responses if (msg.topic === 'phoenix' && msg.event === 'phx_reply') { - this.heartbeatCallback(msg.payload.status === 'ok' ? 'ok' : 'error') + try { + this.heartbeatCallback(msg.payload.status === 'ok' ? 'ok' : 'error') + } catch (e) { + this.log('error', 'error in heartbeat callback', e) + } } // Handle pending heartbeat reference cleanup @@ -853,7 +871,7 @@ export default class RealtimeClient { options?.heartbeatIntervalMs ?? CONNECTION_TIMEOUTS.HEARTBEAT_INTERVAL this.worker = options?.worker ?? false this.accessToken = options?.accessToken ?? null - + this.heartbeatCallback = options?.heartbeatCallback ?? noop // Handle special cases if (options?.params) this.params = options.params if (options?.logger) this.logger = options.logger diff --git a/test/RealtimeClient.heartbeatCallback.test.ts b/test/RealtimeClient.heartbeatCallback.test.ts new file mode 100644 index 00000000..7d469a6d --- /dev/null +++ b/test/RealtimeClient.heartbeatCallback.test.ts @@ -0,0 +1,326 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest' +import { WebSocket as MockWebSocket } from 'mock-socket' +import RealtimeClient, { HeartbeatStatus } from '../src/RealtimeClient' +import { + setupRealtimeTest, + cleanupRealtimeTest, + TestSetup, +} from './helpers/setup' + +let testSetup: TestSetup + +beforeEach(() => (testSetup = setupRealtimeTest())) +afterEach(() => cleanupRealtimeTest(testSetup)) + +describe('heartbeatCallback option', () => { + test('should set default heartbeatCallback to noop when not provided', () => { + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + }) + + expect(typeof socket.heartbeatCallback).toBe('function') + expect(() => socket.heartbeatCallback('sent')).not.toThrow() + }) + + test('should set custom heartbeatCallback when provided in options', () => { + const mockCallback = vi.fn() + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: mockCallback, + }) + + expect(socket.heartbeatCallback).toBe(mockCallback) + }) + + test('should call heartbeatCallback with "sent" when heartbeat is sent', async () => { + const mockCallback = vi.fn() + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: mockCallback, + }) + + socket.connect() + + // Wait for connection to be established + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Mock the connection as connected + vi.spyOn(socket, 'isConnected').mockReturnValue(true) + + // Send heartbeat + await socket.sendHeartbeat() + + expect(mockCallback).toHaveBeenCalledWith('sent') + }) + + test('should call heartbeatCallback with "disconnected" when heartbeat sent while disconnected', async () => { + const mockCallback = vi.fn() + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: mockCallback, + }) + + // Ensure socket is disconnected + vi.spyOn(socket, 'isConnected').mockReturnValue(false) + + // Send heartbeat while disconnected + await socket.sendHeartbeat() + + expect(mockCallback).toHaveBeenCalledWith('disconnected') + }) + + test('should call heartbeatCallback with "timeout" when heartbeat times out', async () => { + const mockCallback = vi.fn() + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: mockCallback, + }) + + socket.connect() + + // Wait for connection to be established + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Mock the connection as connected + vi.spyOn(socket, 'isConnected').mockReturnValue(true) + + // Set a pending heartbeat to simulate timeout condition + socket.pendingHeartbeatRef = 'test-ref' + + // Send heartbeat - should trigger timeout + await socket.sendHeartbeat() + + expect(mockCallback).toHaveBeenCalledWith('timeout') + }) + + test('should call heartbeatCallback with "ok" when heartbeat response is successful', () => { + const mockCallback = vi.fn() + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: mockCallback, + }) + + socket.connect() + + // Mock the connection + const mockConn = { + onmessage: null as any, + readyState: MockWebSocket.OPEN, + } + socket.conn = mockConn as any + + // Simulate heartbeat response message + const heartbeatResponse = { + topic: 'phoenix', + event: 'phx_reply', + payload: { status: 'ok' }, + ref: '1', + } + + // Trigger message handling + socket['_onConnMessage']({ data: JSON.stringify(heartbeatResponse) }) + + expect(mockCallback).toHaveBeenCalledWith('ok') + }) + + test('should call heartbeatCallback with "error" when heartbeat response indicates error', () => { + const mockCallback = vi.fn() + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: mockCallback, + }) + + socket.connect() + + // Mock the connection + const mockConn = { + onmessage: null as any, + readyState: MockWebSocket.OPEN, + } + socket.conn = mockConn as any + + // Simulate heartbeat error response message + const heartbeatResponse = { + topic: 'phoenix', + event: 'phx_reply', + payload: { status: 'error' }, + ref: '1', + } + + // Trigger message handling + socket['_onConnMessage']({ data: JSON.stringify(heartbeatResponse) }) + + expect(mockCallback).toHaveBeenCalledWith('error') + }) + + test('should call heartbeatCallback multiple times for different heartbeat events', async () => { + const mockCallback = vi.fn() + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: mockCallback, + }) + + socket.connect() + + // Wait for connection to be established + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Mock the connection as connected + vi.spyOn(socket, 'isConnected').mockReturnValue(true) + + // Send heartbeat (should call with 'sent') + await socket.sendHeartbeat() + + // Mock the connection for message handling + const mockConn = { + onmessage: null as any, + readyState: MockWebSocket.OPEN, + } + socket.conn = mockConn as any + + // Simulate successful heartbeat response (should call with 'ok') + const heartbeatResponse = { + topic: 'phoenix', + event: 'phx_reply', + payload: { status: 'ok' }, + ref: '1', + } + + socket['_onConnMessage']({ data: JSON.stringify(heartbeatResponse) }) + + // Verify both calls were made + expect(mockCallback).toHaveBeenCalledTimes(2) + expect(mockCallback).toHaveBeenNthCalledWith(1, 'sent') + expect(mockCallback).toHaveBeenNthCalledWith(2, 'ok') + }) + + test('should handle heartbeatCallback errors gracefully in sendHeartbeat', async () => { + const errorCallback = vi.fn().mockImplementation(() => { + throw new Error('Callback error') + }) + + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: errorCallback, + }) + + socket.connect() + + // Wait for connection to be established + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Mock the connection as connected + vi.spyOn(socket, 'isConnected').mockReturnValue(true) + + // Mock the log method to verify error logging + const logSpy = vi.spyOn(socket, 'log') + + // Send heartbeat - should not throw despite callback error + await expect(socket.sendHeartbeat()).resolves.not.toThrow() + + // Callback should still be called + expect(errorCallback).toHaveBeenCalledWith('sent') + + // Error should be logged + expect(logSpy).toHaveBeenCalledWith( + 'error', + 'error in heartbeat callback', + expect.any(Error) + ) + }) + + test('should handle heartbeatCallback errors gracefully in message handling', () => { + const errorCallback = vi.fn().mockImplementation(() => { + throw new Error('Callback error') + }) + + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: errorCallback, + }) + + socket.connect() + + // Mock the connection + const mockConn = { + onmessage: null as any, + readyState: MockWebSocket.OPEN, + } + socket.conn = mockConn as any + + // Mock the log method to verify error logging + const logSpy = vi.spyOn(socket, 'log') + + // Simulate heartbeat response message - should not throw despite callback error + const heartbeatResponse = { + topic: 'phoenix', + event: 'phx_reply', + payload: { status: 'ok' }, + ref: '1', + } + + expect(() => { + socket['_onConnMessage']({ data: JSON.stringify(heartbeatResponse) }) + }).not.toThrow() + + // Callback should still be called + expect(errorCallback).toHaveBeenCalledWith('ok') + + // Error should be logged + expect(logSpy).toHaveBeenCalledWith( + 'error', + 'error in heartbeat callback', + expect.any(Error) + ) + }) + + test('should work with onHeartbeat method to update callback', () => { + const initialCallback = vi.fn() + const updatedCallback = vi.fn() + + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: initialCallback, + }) + + // Verify initial callback is set + expect(socket.heartbeatCallback).toBe(initialCallback) + + // Update callback using onHeartbeat method + socket.onHeartbeat(updatedCallback) + + // Verify callback is updated + expect(socket.heartbeatCallback).toBe(updatedCallback) + + // Call the callback + socket.heartbeatCallback('sent') + + // Verify updated callback is called, not initial + expect(updatedCallback).toHaveBeenCalledWith('sent') + expect(initialCallback).not.toHaveBeenCalled() + }) + + test('should handle all HeartbeatStatus values correctly', () => { + const mockCallback = vi.fn() + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: mockCallback, + }) + + const statuses: HeartbeatStatus[] = [ + 'sent', + 'ok', + 'error', + 'timeout', + 'disconnected', + ] + + // Test each status + statuses.forEach((status) => { + socket.heartbeatCallback(status) + expect(mockCallback).toHaveBeenCalledWith(status) + }) + + expect(mockCallback).toHaveBeenCalledTimes(statuses.length) + }) +}) diff --git a/test/RealtimeClient.lifecycle.test.ts b/test/RealtimeClient.lifecycle.test.ts index 113d7af2..664e9797 100644 --- a/test/RealtimeClient.lifecycle.test.ts +++ b/test/RealtimeClient.lifecycle.test.ts @@ -72,6 +72,28 @@ describe('constructor', () => { }) assert.equal(socket.transport, null) }) + + test('sets heartbeatCallback when provided in options', () => { + const mockCallback = () => {} + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + heartbeatCallback: mockCallback, + }) + + assert.equal(socket.heartbeatCallback, mockCallback) + }) + + test('defaults heartbeatCallback to noop when not provided', () => { + const socket = new RealtimeClient(testSetup.url, { + params: { apikey: '123456789' }, + }) + + // Should be a function (noop) + assert.equal(typeof socket.heartbeatCallback, 'function') + + // Should not throw when called + assert.doesNotThrow(() => socket.heartbeatCallback('sent')) + }) }) describe('connect with WebSocket', () => { diff --git a/test/helpers/lifecycle.ts b/test/helpers/lifecycle.ts index 52b99ea4..40ab828c 100644 --- a/test/helpers/lifecycle.ts +++ b/test/helpers/lifecycle.ts @@ -1,6 +1,5 @@ import { vi } from 'vitest' import RealtimeClient from '../../src/RealtimeClient' -import RealtimeChannel from '../../src/RealtimeChannel' /** * Data-driven fixtures for lifecycle testing scenarios @@ -112,6 +111,21 @@ export const fixtures = { heartbeatIntervalMs: 60000, }, }, + { + name: 'sets heartbeatCallback', + options: { + params: { apikey: '123456789' }, + heartbeatCallback: () => {}, + }, + expected: { + channelsLength: 0, + sendBufferLength: 0, + ref: 0, + transport: null, + timeout: 10000, + heartbeatIntervalMs: 25000, + }, + }, ], }