From 62c1617fb985940ed067421f43d16d57ae94338c Mon Sep 17 00:00:00 2001 From: Roberto Bianchi Date: Fri, 25 Apr 2025 20:05:39 +0200 Subject: [PATCH 01/25] feat(ws): enable websocket log streaming Signed-off-by: Roberto Bianchi update Signed-off-by: Roberto Bianchi update Signed-off-by: Roberto Bianchi --- package-lock.json | 9 ++ package.json | 21 ++-- test.ts | 28 +++++ web/backend/openapi.json | 2 +- web/backend/plugins/websocket.ts | 6 + web/backend/routes/root.ts | 76 +++++++++--- web/backend/schemas/index.ts | 3 + web/frontend/src/api.ts | 9 +- web/frontend/src/client/backend-types.d.ts | 10 +- web/frontend/src/client/backend.openapi.json | 2 +- web/frontend/src/client/backend.ts | 30 ----- .../components/application-logs/AppLogs.tsx | 110 +++++++++++++----- 12 files changed, 207 insertions(+), 99 deletions(-) create mode 100644 test.ts create mode 100644 web/backend/plugins/websocket.ts diff --git a/package-lock.json b/package-lock.json index a217bea8..62e6e8eb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,6 +8,7 @@ "name": "@platformatic/watt-admin", "version": "0.2.0", "dependencies": { + "@fastify/websocket": "^11.0.2", "@inquirer/prompts": "^7.3.3", "@platformatic/composer": "^2.60.0", "@platformatic/control": "^2.60.0", @@ -16,6 +17,7 @@ "@platformatic/vite": "^2.60.0", "fastify": "^5.0.0", "proxyquire": "^2.1.3", + "react-use-websocket": "^4.13.0", "split2": "4.2.0", "wattpm": "^2.60.0" }, @@ -31,6 +33,7 @@ "@types/proxyquire": "1.3.31", "@types/react-dom": "^19.0.4", "@types/split2": "4.2.3", + "@types/ws": "^8.18.1", "@vitejs/plugin-react": "^4.3.3", "autoprefixer": "^10.4.20", "borp": "^0.19.0", @@ -12472,6 +12475,12 @@ "react-dom": ">=16.0.0" } }, + "node_modules/react-use-websocket": { + "version": "4.13.0", + "resolved": "https://registry.npmjs.org/react-use-websocket/-/react-use-websocket-4.13.0.tgz", + "integrity": "sha512-anMuVoV//g2N76Wxqvqjjo1X48r9Np3y1/gMl7arX84tAPXdy5R7sB5lO5hvCzQRYjqXwV8XMAiEBOUbyrZFrw==", + "license": "MIT" + }, "node_modules/read-cache": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/read-cache/-/read-cache-1.0.0.tgz", diff --git a/package.json b/package.json index 6647d362..3b5e9641 100644 --- a/package.json +++ b/package.json @@ -17,40 +17,43 @@ "lint:fix": "eslint . --fix" }, "dependencies": { + "@fastify/websocket": "^11.0.2", "@inquirer/prompts": "^7.3.3", "@platformatic/composer": "^2.60.0", + "@platformatic/control": "^2.60.0", "@platformatic/runtime": "^2.60.0", "@platformatic/service": "^2.60.0", - "@platformatic/control": "^2.60.0", "@platformatic/vite": "^2.60.0", + "fastify": "^5.0.0", "proxyquire": "^2.1.3", + "react-use-websocket": "^4.13.0", "split2": "4.2.0", - "fastify": "^5.0.0", "wattpm": "^2.60.0" }, "devDependencies": { "@fastify/type-provider-json-schema-to-ts": "^5.0.0", - "@types/proxyquire": "1.3.31", - "fastify-tsconfig": "^2.0.0", - "borp": "^0.19.0", "@inquirer/testing": "^2.1.45", - "@types/split2": "4.2.3", - "eslint": "^9.22.0", - "typescript": "^5.5.4", - "neostandard": "^0.12.1", "@platformatic/client-cli": "^2.58.0", "@platformatic/ui-components": "^0.15.2", "@types/d3": "^7.4.3", + "@types/proxyquire": "1.3.31", "@types/react-dom": "^19.0.4", + "@types/split2": "4.2.3", + "@types/ws": "^8.18.1", "@vitejs/plugin-react": "^4.3.3", "autoprefixer": "^10.4.20", + "borp": "^0.19.0", "d3": "~7.9.0", "dayjs": "^1.11.13", + "eslint": "^9.22.0", + "fastify-tsconfig": "^2.0.0", + "neostandard": "^0.12.1", "react": "^19.0.0", "react-dom": "^19.0.0", "react-router-dom": "^7.0.0", "source-map-support": "^0.5.21", "tailwindcss": "^3.4.15", + "typescript": "^5.5.4", "use-error-boundary": "^2.0.6", "vite": "^5.4.11", "vitest": "^3.0.8", diff --git a/test.ts b/test.ts new file mode 100644 index 00000000..a1bdc834 --- /dev/null +++ b/test.ts @@ -0,0 +1,28 @@ +// FIXME: remove this file, it's only used as test for websocket streams (command to run is `node --experimental-strip-types test.ts`) +import Fastify from 'fastify' +import fastifyWebsocket from '@fastify/websocket' + +const fastify = Fastify() + +await fastify.register(fastifyWebsocket) + +fastify.register(async function (fastify) { + fastify.get('/ws', { websocket: true }, (socket, req) => { + const interval = setInterval(() => { + const message = `Current time: ${new Date().toISOString()}` + socket.send(message) + }, 1000) + + socket.on('close', () => { + clearInterval(interval) + }) + }) +}) + +try { + await fastify.listen({ port: 3000 }) + console.log('Server listening on port 3000') +} catch (err) { + console.error('Error starting server:', err) + process.exit(1) +} diff --git a/web/backend/openapi.json b/web/backend/openapi.json index 13994a84..8bb9f5f8 100644 --- a/web/backend/openapi.json +++ b/web/backend/openapi.json @@ -722,7 +722,7 @@ } } }, - "/runtimes/{pid}/logs": { + "/ws": { "get": { "parameters": [ { diff --git a/web/backend/plugins/websocket.ts b/web/backend/plugins/websocket.ts new file mode 100644 index 00000000..62e97db5 --- /dev/null +++ b/web/backend/plugins/websocket.ts @@ -0,0 +1,6 @@ +import websocket from '@fastify/websocket' +import { FastifyInstance } from 'fastify' + +export default async function (fastify: FastifyInstance) { + await fastify.register(websocket) +} diff --git a/web/backend/routes/root.ts b/web/backend/routes/root.ts index 5646ba03..3294960f 100644 --- a/web/backend/routes/root.ts +++ b/web/backend/routes/root.ts @@ -1,8 +1,7 @@ import { FastifyInstance } from 'fastify' import { JsonSchemaToTsProvider } from '@fastify/type-provider-json-schema-to-ts' import { RuntimeApiClient } from '@platformatic/control' -import { getLogsFromReadable } from '../utils/log' -import { metricResponseSchema, SelectableRuntime, selectableRuntimeSchema } from '../schemas' +import { metricResponseSchema, PidParam, pidParamSchema, SelectableRuntime, selectableRuntimeSchema } from '../schemas' export default async function (fastify: FastifyInstance) { const typedFastify = fastify.withTypeProvider() @@ -42,7 +41,7 @@ export default async function (fastify: FastifyInstance) { typedFastify.get('/runtimes/:pid/health', { schema: { - params: { type: 'object', properties: { pid: { type: 'number' } }, required: ['pid'] }, + params: pidParamSchema, response: { 200: { type: 'object', @@ -71,10 +70,7 @@ export default async function (fastify: FastifyInstance) { }) typedFastify.get('/runtimes/:pid/metrics', { - schema: { - params: { type: 'object', properties: { pid: { type: 'number' } }, required: ['pid'] }, - response: { 200: metricResponseSchema } - }, + schema: { params: pidParamSchema, response: { 200: metricResponseSchema } }, }, async ({ params: { pid } }) => { return typedFastify.mappedMetrics[pid]?.aggregated || emptyMetrics }) @@ -114,7 +110,7 @@ export default async function (fastify: FastifyInstance) { typedFastify.get('/runtimes/:pid/services', { schema: { - params: { type: 'object', properties: { pid: { type: 'number' } }, required: ['pid'] }, + params: pidParamSchema, response: { 200: { type: 'object', @@ -202,12 +198,59 @@ export default async function (fastify: FastifyInstance) { return api.getRuntimeServices(request.params.pid) }) - typedFastify.get('/runtimes/:pid/logs', { - schema: { - params: { type: 'object', properties: { pid: { type: 'number' } }, required: ['pid'] } - } - }, async ({ params: { pid }, log }) => - getLogsFromReadable(await api.getRuntimeAllLogsStream(pid), log)) + // const logStream: Record = {} + // typedFastify.get<{ Params: PidParam }>('/runtimes/:pid/logs/ws', { + typedFastify.get<{ Params: PidParam }>('/ws', { + schema: { params: pidParamSchema }, + websocket: true + // }, async (socket, { params: { pid } }) => { + }, async (socket) => { + // try { + + // FIXME: remove this test logic (that is exactly the same as on `/test.ts`) + const interval = setInterval(() => { + const message = `Current time: ${new Date().toISOString()}` + socket.send(message) + }, 1000) + + socket.on('close', () => { + clearInterval(interval) + }) + + // socket.on('close', () => { + // logStream[pid].destroy() + // delete logStream[pid] + // }) + + // logStream[pid].on('data', (chunk) => { + // fastify.log.info({ chunk }, 'Incoming log stream data chunk') + // socket.send(chunk.toString()) + // }) + + // logStream[pid].on('error', (err) => { + // fastify.log.error({ err }, 'Error during log stream') + // if (socket.readyState === WebSocket.OPEN) { + // socket.send(JSON.stringify({ error: err.message })) + // socket.close() + // } + // delete logStream[pid] + // }) + + // logStream[pid].on('end', () => { + // if (socket.readyState === WebSocket.OPEN) { + // socket.close() + // delete logStream[pid] + // } + // }) + // } catch (err) { + // fastify.log.error({ err }, 'Fatal error caught on log stream') + // if (socket.readyState === WebSocket.OPEN) { + // socket.send(JSON.stringify({ error: 'Failed to get log stream' })) + // socket.close() + // delete logStream[pid] + // } + // } + }) typedFastify.get('/runtimes/:pid/openapi/:serviceId', { schema: { @@ -218,10 +261,7 @@ export default async function (fastify: FastifyInstance) { }) typedFastify.post('/runtimes/:pid/restart', { - schema: { - params: { type: 'object', properties: { pid: { type: 'number' } }, required: ['pid'] }, - body: { type: 'object' } - } + schema: { params: pidParamSchema, body: { type: 'object' } } }, async (request) => { try { await api.restartRuntime(request.params.pid) diff --git a/web/backend/schemas/index.ts b/web/backend/schemas/index.ts index a4b6cc6e..1c8b0fa5 100644 --- a/web/backend/schemas/index.ts +++ b/web/backend/schemas/index.ts @@ -67,6 +67,9 @@ export const metricResponseSchema = { } as const export type MetricsResponse = FromSchema +export const pidParamSchema = { type: 'object', additionalProperties: false, properties: { pid: { type: 'number' } }, required: ['pid'] } as const +export type PidParam = FromSchema + export const selectableRuntimeSchema = { type: 'object', additionalProperties: false, diff --git a/web/frontend/src/api.ts b/web/frontend/src/api.ts index f54d7744..592ae150 100644 --- a/web/frontend/src/api.ts +++ b/web/frontend/src/api.ts @@ -1,8 +1,7 @@ import { getRuntimes, getRuntimesPidHealth, getRuntimesPidMetrics, getRuntimesPidMetricsServiceId, getRuntimesPidMetricsServiceIdWorkerId, getRuntimesPidServices, postRuntimesPidRestart, setBaseUrl } from './client/backend' import { subtractSecondsFromDate } from './utilities/dates' -const host = '/api' -setBaseUrl(host) +setBaseUrl('/api') export const getApiApplication = async () => { const { body } = await getRuntimes({ query: { includeAdmin: false } }) @@ -34,12 +33,6 @@ export const getServices = async (pid: number) => { return body?.services } -export const getLogs = async (id: number) => { - const result = await fetch(`${host}/runtimes/${id}/logs`) - const data = await result.json() - return data -} - export const getServiceHealth = async (pid: number) => { const { body: { status } } = await getRuntimesPidHealth({ path: { pid } }) if (status === 'KO') { diff --git a/web/frontend/src/client/backend-types.d.ts b/web/frontend/src/client/backend-types.d.ts index 8e9c2de2..042cc3fe 100644 --- a/web/frontend/src/client/backend-types.d.ts +++ b/web/frontend/src/client/backend-types.d.ts @@ -85,15 +85,15 @@ export type GetRuntimesPidServicesResponseOK = { 'entrypoint': string; 'producti export type GetRuntimesPidServicesResponses = FullResponse -export type GetRuntimesPidLogsRequest = { +export type GetWsRequest = { path: { 'pid': number; } } -export type GetRuntimesPidLogsResponseOK = unknown -export type GetRuntimesPidLogsResponses = - FullResponse +export type GetWsResponseOK = unknown +export type GetWsResponses = + FullResponse export type GetRuntimesPidOpenapiServiceIdRequest = { path: { @@ -158,7 +158,7 @@ export interface Backend { * @param req - request parameters object * @returns the API response */ - getRuntimesPidLogs(req: GetRuntimesPidLogsRequest): Promise; + getWs(req: GetWsRequest): Promise; /** * @param req - request parameters object * @returns the API response diff --git a/web/frontend/src/client/backend.openapi.json b/web/frontend/src/client/backend.openapi.json index 13994a84..8bb9f5f8 100644 --- a/web/frontend/src/client/backend.openapi.json +++ b/web/frontend/src/client/backend.openapi.json @@ -722,7 +722,7 @@ } } }, - "/runtimes/{pid}/logs": { + "/ws": { "get": { "parameters": [ { diff --git a/web/frontend/src/client/backend.ts b/web/frontend/src/client/backend.ts index 80234014..2ad20232 100644 --- a/web/frontend/src/client/backend.ts +++ b/web/frontend/src/client/backend.ts @@ -220,35 +220,6 @@ const _getRuntimesPidServices = async (url: string, request: Types.GetRuntimesPi export const getRuntimesPidServices: Backend['getRuntimesPidServices'] = async (request: Types.GetRuntimesPidServicesRequest): Promise => { return await _getRuntimesPidServices(baseUrl, request) } -const _getRuntimesPidLogs = async (url: string, request: Types.GetRuntimesPidLogsRequest): Promise => { - const headers: HeadersInit = { - ...defaultHeaders - } - - const response = await fetch(`${url}/runtimes/${request.path['pid']}/logs`, { - headers, - ...defaultFetchParams - }) - - const textResponses = [200] - if (textResponses.includes(response.status)) { - return { - statusCode: response.status as 200, - headers: headersToJSON(response.headers), - body: await response.text() - } - } - const responseType = response.headers.get('content-type')?.startsWith('application/json') ? 'json' : 'text' - return { - statusCode: response.status as 200, - headers: headersToJSON(response.headers), - body: await response[responseType]() - } -} - -export const getRuntimesPidLogs: Backend['getRuntimesPidLogs'] = async (request: Types.GetRuntimesPidLogsRequest): Promise => { - return await _getRuntimesPidLogs(baseUrl, request) -} const _getRuntimesPidOpenapiServiceId = async (url: string, request: Types.GetRuntimesPidOpenapiServiceIdRequest): Promise => { const headers: HeadersInit = { ...defaultHeaders @@ -327,7 +298,6 @@ export default function build (url: string, options?: BuildOptions) { getRuntimesPidMetricsServiceId: _getRuntimesPidMetricsServiceId.bind(url, ...arguments), getRuntimesPidMetricsServiceIdWorkerId: _getRuntimesPidMetricsServiceIdWorkerId.bind(url, ...arguments), getRuntimesPidServices: _getRuntimesPidServices.bind(url, ...arguments), - getRuntimesPidLogs: _getRuntimesPidLogs.bind(url, ...arguments), getRuntimesPidOpenapiServiceId: _getRuntimesPidOpenapiServiceId.bind(url, ...arguments), postRuntimesPidRestart: _postRuntimesPidRestart.bind(url, ...arguments) } diff --git a/web/frontend/src/components/application-logs/AppLogs.tsx b/web/frontend/src/components/application-logs/AppLogs.tsx index 5b44b6df..f4ef7287 100644 --- a/web/frontend/src/components/application-logs/AppLogs.tsx +++ b/web/frontend/src/components/application-logs/AppLogs.tsx @@ -1,5 +1,5 @@ -import React, { useState, useEffect, useRef } from 'react' -import { useInterval } from '../../hooks/useInterval' +import React, { useState, useEffect, useRef, useCallback } from 'react' +import useWebSocket from 'react-use-websocket' import { RICH_BLACK, WHITE, TRANSPARENT, MARGIN_0, OPACITY_15 } from '@platformatic/ui-components/src/components/constants' import styles from './AppLogs.module.css' import typographyStyles from '../../styles/Typography.module.css' @@ -16,12 +16,10 @@ import { DIRECTION_STILL, DIRECTION_TAIL, STATUS_PAUSED_LOGS, - STATUS_RESUMED_LOGS, - REFRESH_INTERVAL_LOGS + STATUS_RESUMED_LOGS } from '../../ui-constants' import LogFilterSelector from './LogFilterSelector' import useAdminStore from '../../useAdminStore' -import { getLogs } from '../../api' interface AppLogsProps { filteredServices: string[]; @@ -50,6 +48,53 @@ const AppLogs: React.FC = ({ filteredServices }) => { const [statusPausedLogs, setStatusPausedLogs] = useState('') const [filteredLogsLengthAtPause, setFilteredLogsLengthAtPause] = useState(0) const [error, setError] = useState(undefined) + const [isPaused, setIsPaused] = useState(false) + + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:' + const wsUrl = `${protocol}//${window.location.host}/runtimes/${runtimePid}/logs/ws` + + const handleMessage = useCallback((event: MessageEvent) => { + console.log('the event', event) + try { + let logEntry: LogEntry + try { + logEntry = JSON.parse(event.data) + } catch (e) { + logEntry = { + level: 30, + time: new Date().toISOString(), + name: 'unknown', + msg: event.data + } + } + + if (!isPaused) { + setApplicationLogs(prevLogs => [...prevLogs, logEntry]) + } + } catch (err) { + console.error('Error processing log message:', err) + } + }, [isPaused]) + + useWebSocket(wsUrl, { + onOpen: () => { + console.log('WebSocket connected') + setLoading(false) + setError(undefined) + }, + onMessage: handleMessage, + onError: (event) => { + console.error('WebSocket error:', event) + setError(new Error('Failed to connect to log stream')) + setLoading(false) + }, + onClose: () => { + console.log('WebSocket disconnected') + }, + shouldReconnect: () => !!runtimePid, + reconnectAttempts: 10, + reconnectInterval: 3000 + }) useEffect(() => { if (logContentRef.current && scrollDirection === DIRECTION_TAIL && filteredLogs.length > 0) { @@ -71,13 +116,11 @@ const AppLogs: React.FC = ({ filteredServices }) => { if (statusPausedLogs) { switch (statusPausedLogs) { case STATUS_PAUSED_LOGS: - // callApiPauseLogs() - console.log('pause TODO') + setIsPaused(true) break case STATUS_RESUMED_LOGS: - console.log('resume TODO') - // callApiResumeLogs() + setIsPaused(false) break default: @@ -113,29 +156,18 @@ const AppLogs: React.FC = ({ filteredServices }) => { filteredServices ]) - const getData = async (): Promise => { - try { - if (runtimePid) { - const logs = await getLogs(runtimePid) - setApplicationLogs(logs) - setError(undefined) - } - } catch (error) { - setError(error) - } finally { - setLoading(false) - } - } - - useInterval(() => { getData() }, REFRESH_INTERVAL_LOGS) - useEffect(() => { getData() }, [runtimePid]) - useEffect(() => { if (scrollDirection !== DIRECTION_TAIL && filteredLogsLengthAtPause > 0 && filteredLogsLengthAtPause < filteredLogs.length) { setDisplayGoToBottom(true) } }, [scrollDirection, filteredLogs.length, filteredLogsLengthAtPause]) + useEffect(() => { + setApplicationLogs([]) + setFilteredLogs([]) + setLoading(true) + }, [runtimePid]) + function resumeScrolling (): void { setScrollDirection(DIRECTION_TAIL) setDisplayGoToBottom(false) @@ -145,7 +177,7 @@ const AppLogs: React.FC = ({ filteredServices }) => { function saveLogs (): void { let fileData = '' applicationLogs.forEach(log => { - fileData += `${log} + fileData += `${JSON.stringify(log, null, 2)} ` }) @@ -189,6 +221,12 @@ const AppLogs: React.FC = ({ filteredServices }) => { setLastScrollTop(st <= 0 ? 0 : st) } + // Clear logs button + function clearLogs (): void { + setApplicationLogs([]) + setFilteredLogs([]) + } + if (error) { return setError(undefined)} /> } @@ -236,6 +274,24 @@ const AppLogs: React.FC = ({ filteredServices }) => { selected={displayLog === RAW} textClass={typographyStyles.desktopButtonSmall} /> +