Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions scripts/builders/kafka-connect-monitor.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DOCKERFILE="./services/docker/Dockerfile.kafka_connect_monitor"
CONTEXT="../"
REPO="sjc.ocir.io/axbydjxa5zuh/kafka-connect-monitor"
SERVICES="kafka-connect-monitor"
22 changes: 22 additions & 0 deletions scripts/services/docker/Dockerfile.kafka_connect_monitor
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM node:20-alpine as builder

RUN apk add --no-cache python3 make g++

WORKDIR /usr/crowd/app
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate

COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./
RUN pnpm fetch

COPY ./services ./services
RUN pnpm i --frozen-lockfile

FROM node:20-bookworm-slim as runner

WORKDIR /usr/crowd/app
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate && apt update && apt install -y ca-certificates --no-install-recommends && rm -rf /var/lib/apt/lists/*

COPY --from=builder /usr/crowd/app/node_modules ./node_modules
COPY --from=builder /usr/crowd/app/services/base.tsconfig.json ./services/base.tsconfig.json
COPY --from=builder /usr/crowd/app/services/libs ./services/libs
COPY --from=builder /usr/crowd/app/services/apps/kafka_connect_monitor/ ./services/apps/kafka_connect_monitor
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
**/.git
**/node_modules
**/venv*
**/.webpack
**/.serverless
**/.env
**/.env.*
**/.idea
**/.vscode
**/dist
.vscode/
.github/
frontend/
scripts/
.flake8
*.md
Makefile
backend/
27 changes: 27 additions & 0 deletions services/apps/kafka_connect_monitor/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"name": "@crowd/kafka-connect-monitor",
"scripts": {
"start": "SERVICE=kafka-connect-monitor tsx src/main.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=kafka-connect-monitor LOG_LEVEL=trace tsx --inspect=0.0.0.0:9240 src/main.ts",
"start:debug": "SERVICE=kafka-connect-monitor LOG_LEVEL=trace tsx --inspect=0.0.0.0:9240 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
"lint": "npx eslint --ext .ts src --max-warnings=0",
"format": "npx prettier --write \"src/**/*.ts\"",
"format-check": "npx prettier --check .",
"tsc-check": "tsc --noEmit"
},
"dependencies": {
"@crowd/logging": "workspace:*",
"@types/express": "^4.17.17",
"@types/node": "^20.8.2",
"bunyan-middleware": "^1.0.2",
"express": "^4.18.2",
"prom-client": "^15.1.0",
"tsx": "^4.7.1",
"typescript": "^5.6.3"
},
"devDependencies": {
"nodemon": "^2.0.22"
}
}
53 changes: 53 additions & 0 deletions services/apps/kafka_connect_monitor/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import bunyanMiddleware from 'bunyan-middleware'
import express, { ErrorRequestHandler, Request, RequestHandler } from 'express'

import { Logger, getChildLogger, getServiceLogger } from '@crowd/logging'

import { installConnectorHealthRoutes } from './routes/health'

const log = getServiceLogger()
const PORT = 8085

setImmediate(async () => {
const app = express()

app.use('/health', async (req, res) => {
res.sendStatus(200)
})

app.use(express.json())
app.use(loggingMiddleware(log))

// Install routes
installConnectorHealthRoutes(app, log)

app.use(errorMiddleware())

app.listen(PORT, () => {
log.info(`Kafka Connect Monitor listening on port ${PORT}!`)
})
})

export const errorMiddleware = (): ErrorRequestHandler => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
return (err, req, res, _next) => {
const request = req as ApiRequest

request.log.error(err, 'Error occurred!')
res.status(500).send('Internal Server Error')
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Middleware Logger Property Mismatch

The loggingMiddleware is configured with propertyName: 'requestId' (line 49), which means bunyan-middleware attaches the logger to req.requestId, not req.log. However, the errorMiddleware tries to access request.log.error() on line 36, which will be undefined. This will cause a runtime error when an exception occurs. The propertyName should be 'log' or the error handler should use request.requestId.error().

Fix in Cursor Fix in Web

}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Middleware Order Causes Error Handling Failure

The errorMiddleware tries to use req.log for error reporting. However, loggingMiddleware is placed after express.json(). If JSON parsing fails, req.log won't be initialized, leading to a TypeError when the error handler attempts to access req.log.error().

Additional Locations (1)

Fix in Cursor Fix in Web


export interface ApiRequest extends Request {
log: Logger
}

export const loggingMiddleware = (log: Logger): RequestHandler => {
return bunyanMiddleware({
headerName: 'x-request-id',
propertyName: 'requestId',
logName: `requestId`,
logger: getChildLogger('apiRequest', log),
level: 'trace',
})
}
114 changes: 114 additions & 0 deletions services/apps/kafka_connect_monitor/src/routes/health.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { Request, Response, Router } from 'express'
import { Gauge, Registry } from 'prom-client'

import { Logger } from '@crowd/logging'

const KAFKA_CONNECT_URL = 'http://localhost:8083'

// All possible Kafka Connect states
const CONNECTOR_STATES = ['RUNNING', 'FAILED', 'PAUSED', 'UNASSIGNED'] as const
const TASK_STATES = ['RUNNING', 'FAILED', 'PAUSED', 'UNASSIGNED'] as const

interface ConnectorTask {
id: number
state: string
worker_id: string
}

interface ConnectorStatus {
name: string
connector: {
state: string
worker_id: string
}
tasks: ConnectorTask[]
type: string
}

interface ConnectorsResponse {
[connectorName: string]: {
status: ConnectorStatus
}
}

export function installConnectorHealthRoutes(app: Router, log: Logger): void {
app.get('/connector-health', async (req: Request, res: Response) => {
try {
// Create a new registry for this request
const register = new Registry()

// Fetch connector statuses from Kafka Connect
const connectorsUrl = `${KAFKA_CONNECT_URL}/connectors?expand=status`
const response = await fetch(connectorsUrl)

if (!response.ok) {
log.error(
{ status: response.status, statusText: response.statusText },
'Failed to fetch connector status from Kafka Connect',
)
res.status(500).json({
error: 'Failed to fetch connector status from Kafka Connect',
status: response.status,
})
return
}

const data = (await response.json()) as ConnectorsResponse

// Create gauges for connector status (one-hot encoding per state)
const connectorStatusGauge = new Gauge({
name: 'connector_status',
help: 'Connector status (one-hot: 1 for active state, 0 otherwise)',
labelNames: ['connector', 'worker_id', 'state'],
registers: [register],
})

// Create gauges for task status (one-hot encoding per state)
const taskStatusGauge = new Gauge({
name: 'task_status',
help: 'Task status (one-hot: 1 for active state, 0 otherwise)',
labelNames: ['connector', 'task_id', 'worker_id', 'state'],
registers: [register],
})

// Process each connector
for (const connectorData of Object.values(data)) {
const status = connectorData.status

// Set connector status metric (one-hot: 1 for current state, 0 for all others)
for (const state of CONNECTOR_STATES) {
connectorStatusGauge.set(
{
connector: status.name,
worker_id: status.connector.worker_id,
state,
},
status.connector.state === state ? 1 : 0,
)
}

// Set task status metrics (one-hot: 1 for current state, 0 for all others)
for (const task of status.tasks) {
for (const state of TASK_STATES) {
taskStatusGauge.set(
{
connector: status.name,
task_id: task.id.toString(),
worker_id: task.worker_id,
state,
},
task.state === state ? 1 : 0,
)
}
}
}

// Return metrics in Prometheus format
res.set('Content-Type', register.contentType)
res.send(await register.metrics())
} catch (err) {
log.error(err, 'Error fetching connector health')
res.status(500).json({ error: 'Internal server error' })
}
})
}
4 changes: 4 additions & 0 deletions services/apps/kafka_connect_monitor/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"extends": "../../base.tsconfig.json",
"include": ["src/**/*"]
}
Loading