diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 29a2841b25..5502a87566 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -907,6 +907,37 @@ importers:
         specifier: ^2.0.22
         version: 2.0.22
 
+  services/apps/kafka_connect_monitor:
+    dependencies:
+      '@crowd/logging':
+        specifier: workspace:*
+        version: link:../../libs/logging
+      '@types/express':
+        specifier: ^4.17.17
+        version: 4.17.21
+      '@types/node':
+        specifier: ^20.8.2
+        version: 20.12.7
+      bunyan-middleware:
+        specifier: ^1.0.2
+        version: 1.0.2
+      express:
+        specifier: ^4.18.2
+        version: 4.21.2
+      prom-client:
+        specifier: ^15.1.0
+        version: 15.1.3
+      tsx:
+        specifier: ^4.7.1
+        version: 4.7.3
+      typescript:
+        specifier: ^5.6.3
+        version: 5.6.3
+    devDependencies:
+      nodemon:
+        specifier: ^2.0.22
+        version: 2.0.22
+
   services/apps/members_enrichment_worker:
     dependencies:
       '@crowd/archetype-standard':
@@ -5269,6 +5300,9 @@ packages:
   bindings@1.5.0:
     resolution: {integrity: sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ==}
 
+  bintrees@1.0.2:
+    resolution: {integrity: sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==}
+
   bl@4.1.0:
     resolution: {integrity: sha512-1W07cM9gS6DcLperZfFSj+bWLtaPGSOHWhPiGzXmvVJbRLdG82sH/Kn8EtW1VqWVA54AKf2h5k5BbnIbwF3h6w==}
 
@@ -8444,6 +8478,10 @@ packages:
     resolution: {integrity: sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A==}
     engines: {node: '>= 0.6.0'}
 
+  prom-client@15.1.3:
+    resolution: {integrity: sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==}
+    engines: {node: ^16 || ^18 || >=20}
+
   proto-list@1.2.4:
     resolution: {integrity: sha512-vtK/94akxsTMhe0/cbfpR+syPuszcuwhqVjJq26CuNDgFGj682oRBXOP5MJpv2r7JtE8MsiepGIqvvOTBwn2vA==}
 
@@ -9190,6 +9228,9 @@ packages:
     resolution: {integrity: sha512-a20gEsvHnWe0ygBY8JbxoM4w3SJdhc7ZAuxkLqh+nvNQN2IOt0B5lLgM490X5Hl8FF0dl0tOf2ewFYAlIFgzVA==}
     engines: {node: '>=4.5'}
 
+  tdigest@0.1.2:
+    resolution: {integrity: sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==}
+
   teeny-request@7.2.0:
     resolution: {integrity: sha512-SyY0pek1zWsi0LRVAALem+avzMLc33MKW/JLLakdP4s9+D7+jHcy5x6P+h94g2QNZsAqQNfX5lsbd3WSeJXrrw==}
     engines: {node: '>=10'}
@@ -14598,6 +14639,8 @@ snapshots:
     dependencies:
       file-uri-to-path: 1.0.0
 
+  bintrees@1.0.2: {}
+
   bl@4.1.0:
     dependencies:
       buffer: 5.7.1
@@ -18141,6 +18184,11 @@ snapshots:
 
   process@0.11.10: {}
 
+  prom-client@15.1.3:
+    dependencies:
+      '@opentelemetry/api': 1.6.0
+      tdigest: 0.1.2
+
   proto-list@1.2.4: {}
 
   proto3-json-serializer@2.0.2:
@@ -19100,6 +19148,10 @@ snapshots:
       safe-buffer: 5.2.1
       yallist: 3.1.1
 
+  tdigest@0.1.2:
+    dependencies:
+      bintrees: 1.0.2
+
   teeny-request@7.2.0(encoding@0.1.13):
     dependencies:
       http-proxy-agent: 5.0.0
diff --git a/scripts/builders/kafka-connect-monitor.env b/scripts/builders/kafka-connect-monitor.env
new file mode 100644
index 0000000000..b30902a889
--- /dev/null
+++ b/scripts/builders/kafka-connect-monitor.env
@@ -0,0 +1,4 @@
+DOCKERFILE="./services/docker/Dockerfile.kafka_connect_monitor"
+CONTEXT="../"
+REPO="sjc.ocir.io/axbydjxa5zuh/kafka-connect-monitor"
+SERVICES="kafka-connect-monitor"
\ No newline at end of file
diff --git a/scripts/services/docker/Dockerfile.kafka_connect_monitor b/scripts/services/docker/Dockerfile.kafka_connect_monitor
new file mode 100644
index 0000000000..c932d5f028
--- /dev/null
+++ b/scripts/services/docker/Dockerfile.kafka_connect_monitor
@@ -0,0 +1,22 @@
+FROM node:20-alpine as builder
+
+RUN apk add --no-cache python3 make g++
+
+WORKDIR /usr/crowd/app
+RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate
+
+COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./
+RUN pnpm fetch
+
+COPY ./services ./services
+RUN pnpm i --frozen-lockfile
+
+FROM node:20-bookworm-slim as runner
+
+WORKDIR /usr/crowd/app
+RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate && apt update && apt install -y ca-certificates --no-install-recommends && rm -rf /var/lib/apt/lists/*
+
+COPY --from=builder /usr/crowd/app/node_modules ./node_modules
+COPY --from=builder /usr/crowd/app/services/base.tsconfig.json ./services/base.tsconfig.json
+COPY --from=builder /usr/crowd/app/services/libs ./services/libs
+COPY --from=builder /usr/crowd/app/services/apps/kafka_connect_monitor/ ./services/apps/kafka_connect_monitor
diff --git a/scripts/services/docker/Dockerfile.kafka_connect_monitor.dockerignore b/scripts/services/docker/Dockerfile.kafka_connect_monitor.dockerignore
new file mode 100644
index 0000000000..3d2791ea2a
--- /dev/null
+++ b/scripts/services/docker/Dockerfile.kafka_connect_monitor.dockerignore
@@ -0,0 +1,18 @@
+**/.git
+**/node_modules
+**/venv*
+**/.webpack
+**/.serverless
+**/.env
+**/.env.*
+**/.idea
+**/.vscode
+**/dist
+.vscode/
+.github/
+frontend/
+scripts/
+.flake8
+*.md
+Makefile
+backend/
\ No newline at end of file
diff --git a/services/apps/kafka_connect_monitor/package.json b/services/apps/kafka_connect_monitor/package.json
new file mode 100644
index 0000000000..617197f3f5
--- /dev/null
+++ b/services/apps/kafka_connect_monitor/package.json
@@ -0,0 +1,27 @@
+{
+  "name": "@crowd/kafka-connect-monitor",
+  "scripts": {
+    "start": "SERVICE=kafka-connect-monitor tsx src/main.ts",
+    "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=kafka-connect-monitor LOG_LEVEL=trace tsx --inspect=0.0.0.0:9240 src/main.ts",
+    "start:debug": "SERVICE=kafka-connect-monitor LOG_LEVEL=trace tsx --inspect=0.0.0.0:9240 src/main.ts",
+    "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
+    "dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
+    "lint": "npx eslint --ext .ts src --max-warnings=0",
+    "format": "npx prettier --write \"src/**/*.ts\"",
+    "format-check": "npx prettier --check .",
+    "tsc-check": "tsc --noEmit"
+  },
+  "dependencies": {
+    "@crowd/logging": "workspace:*",
+    "@types/express": "^4.17.17",
+    "@types/node": "^20.8.2",
+    "bunyan-middleware": "^1.0.2",
+    "express": "^4.18.2",
+    "prom-client": "^15.1.0",
+    "tsx": "^4.7.1",
+    "typescript": "^5.6.3"
+  },
+  "devDependencies": {
+    "nodemon": "^2.0.22"
+  }
+}
diff --git a/services/apps/kafka_connect_monitor/src/main.ts b/services/apps/kafka_connect_monitor/src/main.ts
new file mode 100644
--- /dev/null
+++ b/services/apps/kafka_connect_monitor/src/main.ts
@@ -0,0 +1,74 @@
+import bunyanMiddleware from 'bunyan-middleware'
+import express, { ErrorRequestHandler, Request, RequestHandler } from 'express'
+
+import { Logger, getChildLogger, getServiceLogger } from '@crowd/logging'
+
+import { installConnectorHealthRoutes } from './routes/health'
+
+const log = getServiceLogger()
+const PORT = 8085
+
+setImmediate(async () => {
+  const app = express()
+
+  // Registered first, so health checks skip body parsing and request logging
+  app.use('/health', async (req, res) => {
+    res.sendStatus(200)
+  })
+
+  app.use(express.json())
+  app.use(loggingMiddleware(log))
+
+  // Install routes
+  installConnectorHealthRoutes(app, log)
+
+  // Error handler goes last, after all routes
+  app.use(errorMiddleware())
+
+  app.listen(PORT, () => {
+    log.info(`Kafka Connect Monitor listening on port ${PORT}!`)
+  })
+})
+
+/**
+ * Builds the terminal Express error handler: logs the error and replies with a plain 500.
+ *
+ * @param fallbackLog logger used when the request has no `log` property; defaults to the
+ *   service logger
+ */
+export const errorMiddleware = (fallbackLog: Logger = log): ErrorRequestHandler => {
+  return (err, req, res, next) => {
+    // express.json() runs before loggingMiddleware, so a body-parse error gets here
+    // without req.log having been set; dereferencing it blindly would throw
+    const requestLog = (req as Partial<ApiRequest>).log ?? fallbackLog
+    requestLog.error(err, 'Error occurred!')
+
+    // The response already started: hand over to the default Express error handler
+    if (res.headersSent) {
+      next(err)
+      return
+    }
+
+    res.status(500).send('Internal Server Error')
+  }
+}
+
+/** Express request extended with the per-request `log` property. */
+export interface ApiRequest extends Request {
+  log: Logger
+}
+
+/**
+ * Creates the bunyan-middleware request handler, logging through an 'apiRequest' child of `log`.
+ *
+ * @param log parent logger
+ */
+export const loggingMiddleware = (log: Logger): RequestHandler => {
+  return bunyanMiddleware({
+    headerName: 'x-request-id',
+    propertyName: 'requestId',
+    logName: `requestId`,
+    logger: getChildLogger('apiRequest', log),
+    level: 'trace',
+  })
+}
diff --git a/services/apps/kafka_connect_monitor/src/routes/health.ts b/services/apps/kafka_connect_monitor/src/routes/health.ts
new file mode 100644
--- /dev/null
+++ b/services/apps/kafka_connect_monitor/src/routes/health.ts
@@ -0,0 +1,135 @@
+import { Request, Response, Router } from 'express'
+import { Gauge, Registry } from 'prom-client'
+
+import { Logger } from '@crowd/logging'
+
+const KAFKA_CONNECT_URL = 'http://localhost:8083'
+
+// Abort the status call after this long so a hung Kafka Connect cannot block the request
+const KAFKA_CONNECT_TIMEOUT_MS = 5000
+
+// States the Kafka Connect status API can report. STOPPED exists for connectors only:
+// the tasks of a stopped connector are shut down and no longer listed.
+const CONNECTOR_STATES = [
+  'RUNNING',
+  'FAILED',
+  'PAUSED',
+  'UNASSIGNED',
+  'RESTARTING',
+  'STOPPED',
+] as const
+const TASK_STATES = ['RUNNING', 'FAILED', 'PAUSED', 'UNASSIGNED', 'RESTARTING'] as const
+
+interface ConnectorTask {
+  id: number
+  state: string
+  worker_id: string
+}
+
+interface ConnectorStatus {
+  name: string
+  connector: {
+    state: string
+    worker_id: string
+  }
+  tasks: ConnectorTask[]
+  type: string
+}
+
+interface ConnectorsResponse {
+  [connectorName: string]: {
+    status: ConnectorStatus
+  }
+}
+
+/**
+ * Registers GET /connector-health, which queries Kafka Connect for every connector's status
+ * and renders it as Prometheus gauges (one series per state, 1 for the current state).
+ *
+ * @param app router to attach the route to
+ * @param log logger for upstream and unexpected failures
+ */
+export function installConnectorHealthRoutes(app: Router, log: Logger): void {
+  app.get('/connector-health', async (req: Request, res: Response) => {
+    try {
+      // Create a new registry for this request
+      const register = new Registry()
+
+      // Fetch connector statuses from Kafka Connect
+      const connectorsUrl = `${KAFKA_CONNECT_URL}/connectors?expand=status`
+      const response = await fetch(connectorsUrl, {
+        signal: AbortSignal.timeout(KAFKA_CONNECT_TIMEOUT_MS),
+      })
+
+      if (!response.ok) {
+        log.error(
+          { status: response.status, statusText: response.statusText },
+          'Failed to fetch connector status from Kafka Connect',
+        )
+        res.status(500).json({
+          error: 'Failed to fetch connector status from Kafka Connect',
+          status: response.status,
+        })
+        return
+      }
+
+      const data = (await response.json()) as ConnectorsResponse
+
+      // Create gauges for connector status (one-hot encoding per state)
+      const connectorStatusGauge = new Gauge({
+        name: 'connector_status',
+        help: 'Connector status (one-hot: 1 for active state, 0 otherwise)',
+        labelNames: ['connector', 'worker_id', 'state'],
+        registers: [register],
+      })
+
+      // Create gauges for task status (one-hot encoding per state)
+      const taskStatusGauge = new Gauge({
+        name: 'task_status',
+        help: 'Task status (one-hot: 1 for active state, 0 otherwise)',
+        labelNames: ['connector', 'task_id', 'worker_id', 'state'],
+        registers: [register],
+      })
+
+      // Process each connector
+      for (const connectorData of Object.values(data)) {
+        const status = connectorData.status
+
+        // Set connector status metric (one-hot: 1 for current state, 0 for all others)
+        for (const state of CONNECTOR_STATES) {
+          connectorStatusGauge.set(
+            {
+              connector: status.name,
+              worker_id: status.connector.worker_id,
+              state,
+            },
+            status.connector.state === state ? 1 : 0,
+          )
+        }
+
+        // Set task status metrics (one-hot: 1 for current state, 0 for all others)
+        for (const task of status.tasks) {
+          for (const state of TASK_STATES) {
+            taskStatusGauge.set(
+              {
+                connector: status.name,
+                task_id: task.id.toString(),
+                worker_id: task.worker_id,
+                state,
+              },
+              task.state === state ? 1 : 0,
+            )
+          }
+        }
+      }
+
+      // Return metrics in Prometheus format
+      res.set('Content-Type', register.contentType)
+      res.send(await register.metrics())
+    } catch (err) {
+      // Also reached when the Kafka Connect call times out or returns malformed JSON
+      log.error(err, 'Error fetching connector health')
+      res.status(500).json({ error: 'Internal server error' })
+    }
+  })
+}
diff --git a/services/apps/kafka_connect_monitor/tsconfig.json b/services/apps/kafka_connect_monitor/tsconfig.json
new file mode 100644
index 0000000000..bf7f183850
--- /dev/null
+++ b/services/apps/kafka_connect_monitor/tsconfig.json
@@ -0,0 +1,4 @@
+{
+  "extends": "../../base.tsconfig.json",
+  "include": ["src/**/*"]
+}