Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/backend/src/__tests__/config-management.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ function makeEnvs(managementKey?: string): Envs {
return {
APP_CONFIG_PATH: '/data/config.json',
ENABLE_LOGS: false,
LOG_TO_FILE: false,
LOG_MAX_FILE_BYTES: 10_485_760,
LOG_MAX_FILES: 5,
ENVIRONMENT: 'test' as any,
HTTP_TIMEOUT_MS: 3000,
LOG_LEVEL: 'fatal',
Expand Down
3 changes: 3 additions & 0 deletions apps/backend/src/__tests__/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ const config = AppConfig.parse({
const envs: Envs = {
APP_CONFIG_PATH: '/data/config.json',
ENABLE_LOGS: false,
LOG_TO_FILE: false,
LOG_MAX_FILE_BYTES: 10_485_760,
LOG_MAX_FILES: 5,
ENVIRONMENT: 'test' as any,
HTTP_TIMEOUT_MS: 3000,
LOG_LEVEL: 'fatal',
Expand Down
11 changes: 11 additions & 0 deletions apps/backend/src/lib/envs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ export const Envs = z.object({
OTEL_SERVICE_NAME: z.string().default('jack-backend'),
NODE_ENV: z.string().optional(),
ENABLE_LOGS: z.stringbool().optional().default(true),
// Persist logs to a rotating NDJSON file the management API serves to the UI.
LOG_TO_FILE: z.stringbool().optional().default(true),
// Directory for the rotating log files. Defaults to a `logs/` dir next to the
// config file (the same persistent volume as the sqlite DB).
LOG_DIR: z.string().optional(),
// Rotate the active log file once it reaches this many bytes (default 10 MiB).
LOG_MAX_FILE_BYTES: z.coerce.number().int().positive().default(10_485_760),
// Keep this many rotated files (plus the active one); older ones are pruned.
LOG_MAX_FILES: z.coerce.number().int().min(0).default(5),
// Management API credential. When set, the management surface starts on its OWN
// port (MANAGEMENT_PORT) and every request must carry `X-Management-Key: <this>`.
// When unset, the management listener is not started at all.
Expand All @@ -28,6 +37,8 @@ export const Envs = z.object({
}).transform(vars => ({
...vars,
ENABLE_LOGS: vars.NODE_ENV !== 'test' && vars.ENABLE_LOGS,
// Never write log files during tests (they construct their own temp sinks).
LOG_TO_FILE: vars.NODE_ENV !== 'test' && vars.LOG_TO_FILE,
}))

export type Envs = z.infer<typeof Envs>
Expand Down
55 changes: 27 additions & 28 deletions apps/backend/src/logger.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import type { LogAttributes } from '@opentelemetry/api-logs'
import type { DestinationStream, StreamEntry } from 'pino'
import { createRequire } from 'node:module'
import process from 'node:process'
import { trace } from '@opentelemetry/api'
import { logs, SeverityNumber } from '@opentelemetry/api-logs'
import { levels, multistream, pino } from 'pino'
import { getAppEnvs, isOtelEnabled } from './lib/envs'
import { redactObject } from './lib/redact'
import { fileSink, logHub } from './modules/logging/log-store'

const envs = getAppEnvs()
const otelEnabled = isOtelEnabled(envs)
Expand Down Expand Up @@ -83,31 +86,30 @@ const otelLogStream = {
},
}

// With tracing on, fan out to stdout (raw JSON) and the OTel bridge via an
// in-process multistream — no worker thread, which `thread-stream` transports
// don't survive under Bun. With tracing off, keep the original path:
// pretty-printed in dev, plain synchronous stdout in production.
// Console destination. In production it's plain synchronous stdout (raw JSON); in
// dev it's pino-pretty used as an *in-process* stream (no worker thread, which
// `thread-stream` transports don't survive under Bun). pino-pretty is a dev-only
// dependency, so it's required lazily — production never hits this branch.
function consoleStream(): DestinationStream {
if (envs.ENVIRONMENT === 'production')
return process.stdout
const nodeRequire = createRequire(import.meta.url)
const pretty = nodeRequire('pino-pretty') as (opts: Record<string, unknown>) => DestinationStream
return pretty({ colorize: true, singleLine: true, messageKey: 'message', ignore: 'pid,hostname,severity' })
}

// Everything fans out through one in-process multistream: console, the rotating
// log file (persistence + the UI's backfill), the live SSE hub, and — when tracing
// is on — the OTel bridge. No worker threads anywhere.
function getLogger() {
if (!otelEnabled) {
return pino({
enabled: envs.ENABLE_LOGS,
level: envs.LOG_LEVEL,
messageKey: 'message',
formatters: logFormatters,
mixin: traceContextMixin,
transport: envs.ENVIRONMENT !== 'production'
? {
target: 'pino-pretty',
options: {
colorize: true,
singleLine: true,
messageKey: 'message',
ignore: 'pid,hostname,severity',
},
}
: undefined,
})
}
const streams: StreamEntry[] = [
{ stream: consoleStream(), level: envs.LOG_LEVEL },
]
if (otelEnabled)
streams.push({ stream: otelLogStream, level: envs.LOG_LEVEL })
if (fileSink)
streams.push({ stream: fileSink, level: envs.LOG_LEVEL })
streams.push({ stream: logHub, level: envs.LOG_LEVEL })

return pino(
{
Expand All @@ -117,10 +119,7 @@ function getLogger() {
formatters: logFormatters,
mixin: traceContextMixin,
},
multistream([
{ stream: process.stdout, level: envs.LOG_LEVEL },
{ stream: otelLogStream, level: envs.LOG_LEVEL },
]),
multistream(streams),
)
}

Expand Down
7 changes: 7 additions & 0 deletions apps/backend/src/management-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import { CatalogController } from './modules/catalog/catalog.controller'
import { getCatalogRouter } from './modules/catalog/catalog.router'
import { ConfigController } from './modules/config/config.controller'
import { getConfigRouter } from './modules/config/config.router'
import { logHub } from './modules/logging/log-store'
import { LogsController } from './modules/logging/logs.controller'
import { getLogsRouter } from './modules/logging/logs.router'
import { StatusController } from './modules/status/status.controller'
import { getStatusRouter } from './modules/status/status.router'

Expand Down Expand Up @@ -44,6 +47,10 @@ export function getManagementApp(params: {
const statusController = new StatusController(params.connectors, params.downloadsRepository)
app.route('/', getStatusRouter(statusController))

// Logs read from the shared LogHub singleton the logger writes to (same module
// instance), so this exposes the running process's own logs.
app.route('/logs', getLogsRouter(new LogsController(logHub)))

const tmdbClient = params.tmdbApiKey ? new TmdbClient(params.tmdbApiKey) : undefined
const catalogController = new CatalogController(params.connectors, tmdbClient, params.downloadsService)
app.route('/catalog', getCatalogRouter(catalogController))
Expand Down
154 changes: 154 additions & 0 deletions apps/backend/src/modules/logging/log-hub.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import { mkdtemp, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { afterEach, beforeEach, describe, expect, test } from 'bun:test'
import { LogHub } from './log-hub'

let dir: string
let path: string

beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), 'jack-loghub-'))
path = join(dir, 'jack.ndjson')
})

afterEach(async () => {
await rm(dir, { recursive: true, force: true })
})

function ndjson(records: object[]): string {
return `${records.map(r => JSON.stringify(r)).join('\n')}\n`
}

describe('LogHub live fan-out', () => {
test('delivers parsed records to subscribers', () => {
const hub = new LogHub(path)
const seen: unknown[] = []
hub.subscribe(r => seen.push(r))

hub.write(`${JSON.stringify({ level: 30, message: 'hi' })}\n`)

expect(seen).toEqual([{ level: 30, message: 'hi' }])
})

test('is a no-op with no subscribers and ignores malformed lines', () => {
const hub = new LogHub(path)
const seen: unknown[] = []
hub.write('not json') // no subscribers → nothing happens, no throw
const unsub = hub.subscribe(r => seen.push(r))
hub.write('still not json\n')

expect(seen).toEqual([])
unsub()
expect(hub.subscriberCount).toBe(0)
})

test('unsubscribe stops delivery', () => {
const hub = new LogHub(path)
const seen: unknown[] = []
const unsub = hub.subscribe(r => seen.push(r))
hub.write(`${JSON.stringify({ level: 30, message: 'a' })}\n`)
unsub()
hub.write(`${JSON.stringify({ level: 30, message: 'b' })}\n`)

expect(seen).toHaveLength(1)
})

test('a throwing subscriber does not break the others', () => {
const hub = new LogHub(path)
const seen: unknown[] = []
hub.subscribe(() => {
throw new Error('boom')
})
hub.subscribe(r => seen.push(r))

hub.write(`${JSON.stringify({ level: 40, message: 'x' })}\n`)

expect(seen).toHaveLength(1)
})
})

describe('LogHub backfill', () => {
test('returns the last N records oldest→newest', async () => {
await writeFile(path, ndjson([
{ level: 30, message: 'a' },
{ level: 30, message: 'b' },
{ level: 30, message: 'c' },
]))
const hub = new LogHub(path)

const out = await hub.backfill({ lines: 2 })

expect(out.map(r => r.message)).toEqual(['b', 'c'])
})

test('filters below the minimum level', async () => {
await writeFile(path, ndjson([
{ level: 30, message: 'info' },
{ level: 40, message: 'warn' },
{ level: 50, message: 'error' },
]))
const hub = new LogHub(path)

const out = await hub.backfill({ lines: 10, minLevel: 40 })

expect(out.map(r => r.message)).toEqual(['warn', 'error'])
})

test('reads across rotated files to fulfill the requested line count', async () => {
// Older history lives in .1; the active file holds the newest lines.
await writeFile(`${path}.1`, ndjson([
{ level: 30, message: 'a' },
{ level: 30, message: 'b' },
]))
await writeFile(path, ndjson([
{ level: 30, message: 'c' },
{ level: 30, message: 'd' },
]))
const hub = new LogHub(path)

const out = await hub.backfill({ lines: 3 })

expect(out.map(r => r.message)).toEqual(['b', 'c', 'd'])
})

test('does not read older files once enough recent lines are collected', async () => {
await writeFile(`${path}.1`, ndjson([{ level: 30, message: 'old' }]))
await writeFile(path, ndjson([
{ level: 30, message: 'x' },
{ level: 30, message: 'y' },
]))
const hub = new LogHub(path)

const out = await hub.backfill({ lines: 2 })

expect(out.map(r => r.message)).toEqual(['x', 'y'])
})

test('fails closed: a record without a numeric level is excluded under a level floor', async () => {
await writeFile(path, `${[
{ level: 30, message: 'info' },
{ message: 'no-level' },
{ level: 50, message: 'error' },
].map(r => JSON.stringify(r)).join('\n')}\n`)
const hub = new LogHub(path)

const out = await hub.backfill({ lines: 10, minLevel: 40 })

expect(out.map(r => r.message)).toEqual(['error'])
})

test('returns empty when the file does not exist', async () => {
const hub = new LogHub(join(dir, 'missing.ndjson'))
expect(await hub.backfill({ lines: 10 })).toEqual([])
})

test('skips malformed lines', async () => {
await writeFile(path, `${JSON.stringify({ level: 30, message: 'ok' })}\nGARBAGE\n`)
const hub = new LogHub(path)

const out = await hub.backfill({ lines: 10 })

expect(out.map(r => r.message)).toEqual(['ok'])
})
})
96 changes: 96 additions & 0 deletions apps/backend/src/modules/logging/log-hub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
export interface LogRecord {
time?: number
level?: number
severity?: string
message?: string
trace_id?: string
[key: string]: unknown
}

export type LogSubscriber = (record: LogRecord) => void

/**
* The live/history fan-out for logs. It doubles as a pino `multistream`
* destination: every serialized line is parsed and pushed to current SSE
* subscribers (live tail), while `backfill` reads the persisted rotating file for
* the initial "last N lines". Live delivery therefore never tails the file, so it
* is oblivious to rotation.
*/
export class LogHub {
private readonly subscribers = new Set<LogSubscriber>()

constructor(private readonly filePath: string) {}

/** pino `multistream` destination: parse each finished line and fan out live. */
write(line: string): void {
if (this.subscribers.size === 0)
return
let record: LogRecord
try {
record = JSON.parse(line) as LogRecord
}
catch {
return
}
for (const subscriber of this.subscribers) {
try {
subscriber(record)
}
catch {
// A failing subscriber must not break logging or the other subscribers.
}
}
}

subscribe(subscriber: LogSubscriber): () => void {
this.subscribers.add(subscriber)
return () => {
this.subscribers.delete(subscriber)
}
}

/** Live subscriber count — exposed for tests/metrics. */
get subscriberCount(): number {
return this.subscribers.size
}

/**
* The most recent `lines` records, oldest→newest, optionally keeping only those
* at or above `minLevel` (pino numeric level). Reads the active file and, if it
* doesn't yet hold enough lines (e.g. just after a rotation), walks back through
* the rotated siblings `.1`, `.2`, … so retained history isn't silently dropped.
* Persists across restarts because the files live on a persistent volume.
*/
async backfill({ lines, minLevel }: { lines: number, minLevel?: number }): Promise<LogRecord[]> {
// Fail closed: when a level floor is set, a record must carry a numeric level
// at or above it — a missing/malformed level is excluded, not let through.
const passesLevel = (record: LogRecord): boolean =>
minLevel == null || (typeof record.level === 'number' && record.level >= minLevel)

let collected: LogRecord[] = []
// i = 0 is the active file; i > 0 are the successively older rotated files.
// Each rotated file is entirely older than the previous, so prepend its records.
for (let i = 0; i < 1000 && collected.length < lines; i++) {
const path = i === 0 ? this.filePath : `${this.filePath}.${i}`
const file = Bun.file(path)
if (!(await file.exists()))
break
const records: LogRecord[] = []
for (const raw of (await file.text()).split('\n')) {
if (!raw)
continue
let record: LogRecord
try {
record = JSON.parse(raw) as LogRecord
}
catch {
continue
}
if (passesLevel(record))
records.push(record)
}
collected = records.concat(collected)
}
return collected.slice(-lines)
}
}
Loading