Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 76 additions & 81 deletions apps/server/src/diagnostics/TraceDiagnostics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
ServerTraceDiagnosticsSpanOccurrence,
ServerTraceDiagnosticsSpanSummary,
} from "@t3tools/contracts";
import { decodeJsonResult } from "@t3tools/shared/schemaJson";
import * as Context from "effect/Context";
import * as DateTime from "effect/DateTime";
import * as Effect from "effect/Effect";
Expand All @@ -17,22 +18,30 @@ import * as PlatformError from "effect/PlatformError";
import * as Result from "effect/Result";
import * as Schema from "effect/Schema";

interface TraceRecordLike {
readonly name?: unknown;
readonly traceId?: unknown;
readonly spanId?: unknown;
readonly startTimeUnixNano?: unknown;
readonly endTimeUnixNano?: unknown;
readonly durationMs?: unknown;
readonly exit?: unknown;
readonly events?: unknown;
}
const TraceEvent = Schema.Struct({
name: Schema.optional(Schema.String),
timeUnixNano: Schema.optional(Schema.String),
attributes: Schema.optional(Schema.Record(Schema.String, Schema.Unknown)),
});

interface TraceEventLike {
readonly name?: unknown;
readonly timeUnixNano?: unknown;
readonly attributes?: unknown;
}
const TraceExit = Schema.Struct({
_tag: Schema.String,
cause: Schema.optional(Schema.String),
});

const TraceRecord = Schema.Struct({
name: Schema.String,
traceId: Schema.String,
spanId: Schema.String,
startTimeUnixNano: Schema.optional(Schema.String),
endTimeUnixNano: Schema.String,
durationMs: Schema.Number,
exit: Schema.optional(TraceExit),
events: Schema.optional(Schema.Array(TraceEvent)),
Comment on lines +39 to +40

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium diagnostics/TraceDiagnostics.ts:39

The new TraceRecord schema strictly validates the optional exit and events fields. When a span line has valid core fields but a malformed optional payload (e.g. exit: { _tag: "Failure", cause: 123 } or events: [null]), decodeTraceRecord() fails and aggregateTraceDiagnostics() drops the entire span and increments parseErrorCount. Valid trace spans disappear from diagnostics when only optional metadata is bad. Consider making exit and events best-effort by wrapping them with Schema.optional(Schema.UndefinedOr(...)) or post-processing them so malformed optional data does not reject the whole record.

Suggested change
exit: Schema.optional(TraceExit),
events: Schema.optional(Schema.Array(TraceEvent)),
exit: Schema.optional(Schema.UndefinedOr(TraceExit)),
events: Schema.optional(Schema.UndefinedOr(Schema.Array(TraceEvent))),
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @apps/server/src/diagnostics/TraceDiagnostics.ts around lines 39-40:

The new `TraceRecord` schema strictly validates the optional `exit` and `events` fields. When a span line has valid core fields but a malformed optional payload (e.g. `exit: { _tag: "Failure", cause: 123 }` or `events: [null]`), `decodeTraceRecord()` fails and `aggregateTraceDiagnostics()` drops the entire span and increments `parseErrorCount`. Valid trace spans disappear from diagnostics when only optional metadata is bad. Consider making `exit` and `events` best-effort by wrapping them with `Schema.optional(Schema.UndefinedOr(...))` or post-processing them so malformed optional data does not reject the whole record.

});
type TraceRecord = typeof TraceRecord.Type;

const decodeTraceRecord = decodeJsonResult(TraceRecord);

export interface TraceDiagnosticsOptions {
readonly traceFilePath: string;
Expand Down Expand Up @@ -90,47 +99,25 @@ function toRotatedTracePaths(traceFilePath: string, maxFiles: number): ReadonlyA
return [...backups, traceFilePath];
}

function isRecordObject(value: unknown): value is TraceRecordLike {
return typeof value === "object" && value !== null;
}

function toStringValue(value: unknown): string | null {
return typeof value === "string" && value.trim().length > 0 ? value : null;
}

function toNumberValue(value: unknown): number | null {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}

function unixNanoToDateTime(value: unknown): DateTime.Utc | null {
const text = toStringValue(value);
if (!text) return null;
function unixNanoToDateTime(value: string): Option.Option<DateTime.Utc> {
try {
const millis = Number(BigInt(text) / 1_000_000n);
return Option.getOrNull(DateTime.make(millis));
const millis = Number(BigInt(value) / 1_000_000n);
return DateTime.make(millis);
} catch {
return null;
return Option.none();
}
}

function readExitTag(exit: unknown): string | null {
if (!isRecordObject(exit) || !("_tag" in exit)) return null;
return toStringValue(exit._tag);
function readExitTag(exit: TraceRecord["exit"]): string | null {
return exit?._tag ?? null;
}

function readExitCause(exit: unknown): string {
if (!isRecordObject(exit) || !("cause" in exit)) return "Failure";
return toStringValue(exit.cause)?.trim() ?? "Failure";
}

function isTraceEvent(value: unknown): value is TraceEventLike {
return typeof value === "object" && value !== null;
}

function readEventAttributes(event: TraceEventLike): Readonly<Record<string, unknown>> {
return typeof event.attributes === "object" && event.attributes !== null
? (event.attributes as Readonly<Record<string, unknown>>)
: {};
function readExitCause(exit: TraceRecord["exit"]): string {
return exit?.cause?.trim() || "Failure";
}

function makeEmptyDiagnostics(input: {
Expand Down Expand Up @@ -211,8 +198,8 @@ export function aggregateTraceDiagnostics(
let failureCount = 0;
let interruptionCount = 0;
let slowSpanCount = 0;
let firstSpanAt: DateTime.Utc | null = null;
let lastSpanAt: DateTime.Utc | null = null;
let firstSpanAt = Option.none<DateTime.Utc>();
let lastSpanAt = Option.none<DateTime.Utc>();

const spansByName = new Map<
string,
Expand All @@ -229,38 +216,44 @@ export function aggregateTraceDiagnostics(
for (const line of lines) {
if (line.trim().length === 0) continue;

let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
const parsedResult = decodeTraceRecord(line);
if (Result.isFailure(parsedResult)) {
parseErrorCount += 1;
continue;
}

if (!isRecordObject(parsed)) {
parseErrorCount += 1;
continue;
}

const name = toStringValue(parsed.name);
const traceId = toStringValue(parsed.traceId);
const spanId = toStringValue(parsed.spanId);
const durationMs = toNumberValue(parsed.durationMs);
const parsed = parsedResult.success;
const name = parsed.name;
const traceId = parsed.traceId;
const spanId = parsed.spanId;
const durationMs = parsed.durationMs;
const endedAt = unixNanoToDateTime(parsed.endTimeUnixNano);
const startedAt = unixNanoToDateTime(parsed.startTimeUnixNano);

if (!name || !traceId || !spanId || durationMs === null || !endedAt) {
const startedAt =
parsed.startTimeUnixNano === undefined
? Option.none<DateTime.Utc>()
: unixNanoToDateTime(parsed.startTimeUnixNano);

if (
name.trim().length === 0 ||
traceId.trim().length === 0 ||
spanId.trim().length === 0 ||
!Number.isFinite(durationMs) ||
Option.isNone(endedAt)
) {
parseErrorCount += 1;
continue;
}

recordCount += 1;
firstSpanAt =
startedAt && (firstSpanAt === null || DateTime.isLessThan(startedAt, firstSpanAt))
? startedAt
: firstSpanAt;
lastSpanAt =
lastSpanAt === null || DateTime.isGreaterThan(endedAt, lastSpanAt) ? endedAt : lastSpanAt;
if (
Option.isSome(startedAt) &&
(Option.isNone(firstSpanAt) || DateTime.isLessThan(startedAt.value, firstSpanAt.value))
) {
firstSpanAt = startedAt;
}
if (Option.isNone(lastSpanAt) || DateTime.isGreaterThan(endedAt.value, lastSpanAt.value)) {
lastSpanAt = endedAt;
}

const exitTag = readExitTag(parsed.exit);
const isFailure = exitTag === "Failure";
Expand All @@ -280,7 +273,7 @@ export function aggregateTraceDiagnostics(
if (isFailure) spanSummary.failureCount += 1;
spansByName.set(name, spanSummary);

const spanItem = { name, durationMs, endedAt, traceId, spanId };
const spanItem = { name, durationMs, endedAt: endedAt.value, traceId, spanId };
if (durationMs >= slowSpanThresholdMs) {
slowSpanCount += 1;
}
Expand All @@ -292,22 +285,21 @@ export function aggregateTraceDiagnostics(

const failureKey = `${name}\0${cause}`;
const existing = failuresByKey.get(failureKey);
const isLatestFailure = !existing || DateTime.isGreaterThan(endedAt, existing.lastSeenAt);
const isLatestFailure =
!existing || DateTime.isGreaterThan(endedAt.value, existing.lastSeenAt);
failuresByKey.set(failureKey, {
name,
cause,
count: (existing?.count ?? 0) + 1,
lastSeenAt: isLatestFailure ? endedAt : existing!.lastSeenAt,
lastSeenAt: isLatestFailure ? endedAt.value : existing!.lastSeenAt,
traceId: isLatestFailure ? traceId : existing!.traceId,
spanId: isLatestFailure ? spanId : existing!.spanId,
});
}

if (Array.isArray(parsed.events)) {
for (const rawEvent of parsed.events) {
if (!isTraceEvent(rawEvent)) continue;
const attributes = readEventAttributes(rawEvent);
const level = toStringValue(attributes["effect.logLevel"]);
if (parsed.events !== undefined) {
for (const event of parsed.events) {
const level = toStringValue(event.attributes?.["effect.logLevel"]);
if (!level) continue;

logLevelCounts[level] = (logLevelCounts[level] ?? 0) + 1;
Expand All @@ -321,8 +313,11 @@ export function aggregateTraceDiagnostics(
continue;
}

const seenAt = unixNanoToDateTime(rawEvent.timeUnixNano) ?? endedAt;
const message = toStringValue(rawEvent.name)?.trim() ?? "Log event";
const seenAt =
event.timeUnixNano === undefined
? endedAt.value
: Option.getOrElse(unixNanoToDateTime(event.timeUnixNano), () => endedAt.value);
const message = event.name?.trim() || "Log event";
latestWarningAndErrorLogs.push({
spanName: name,
level,
Expand Down Expand Up @@ -354,8 +349,8 @@ export function aggregateTraceDiagnostics(
readAt,
recordCount,
parseErrorCount,
firstSpanAt: Option.fromNullishOr(firstSpanAt),
lastSpanAt: Option.fromNullishOr(lastSpanAt),
firstSpanAt,
lastSpanAt,
failureCount,
interruptionCount,
slowSpanThresholdMs,
Expand Down
53 changes: 44 additions & 9 deletions packages/shared/src/httpReadiness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,23 @@ import * as Ref from "effect/Ref";
import * as Schedule from "effect/Schedule";
import { HttpClient, HttpClientRequest } from "effect/unstable/http";

export const DEFAULT_HTTP_READY_PROBE_TIMEOUT_MS = 1_000;
const DEFAULT_HTTP_READY_TIMEOUT = Duration.seconds(30);
const DEFAULT_HTTP_READY_INTERVAL = Duration.millis(100);
export const DEFAULT_HTTP_READY_PROBE_TIMEOUT = Duration.seconds(1);
export const DEFAULT_HTTP_READY_PROBE_TIMEOUT_MS = Duration.toMillis(
DEFAULT_HTTP_READY_PROBE_TIMEOUT,
);

function normalizeDuration(input: {
readonly duration: Duration.Input | undefined;
readonly durationMs: number | undefined;
readonly fallback: Duration.Duration;
}): Duration.Duration {
return Option.getOrElse(
Duration.fromInput(input.duration ?? input.durationMs ?? input.fallback),
() => input.fallback,
);
}

/**
* Normalizes an arbitrary readiness probe failure into a plain, structured value
Expand Down Expand Up @@ -39,9 +55,10 @@ export function describeReadinessCause(cause: unknown): unknown {
/**
* Generic HTTP readiness probe shared by the SSH tunnel and the desktop backend
* manager. Polls `baseUrl + path` until it returns a 2xx response or the overall
* `timeoutMs` elapses. Each individual probe is bounded by `probeTimeoutMs` so a
* `timeout` elapses. Each individual probe is bounded by `probeTimeout` so a
* single hung request cannot stall the retry loop, and the retry cadence is
* `intervalMs` bounded to roughly `timeoutMs / intervalMs` attempts.
* `interval` bounded to roughly `timeout / interval` attempts. Legacy `*Ms`
* options are still accepted at the boundary and are treated as milliseconds.
*
* The error type is left to the caller via `makeError`, so each consumer keeps
* its own tagged error. `makeError` is called at every failure site; callers can
Expand All @@ -53,6 +70,9 @@ export const waitForHttpReady = Effect.fn("shared.httpReadiness.waitForHttpReady
>(input: {
readonly baseUrl: string;
readonly path?: string;
readonly timeout?: Duration.Input;
readonly interval?: Duration.Input;
readonly probeTimeout?: Duration.Input;
readonly timeoutMs?: number;
readonly intervalMs?: number;
readonly probeTimeoutMs?: number;
Expand All @@ -63,10 +83,25 @@ export const waitForHttpReady = Effect.fn("shared.httpReadiness.waitForHttpReady
readonly cause: unknown;
}) => E;
}): Effect.fn.Return<void, E, HttpClient.HttpClient> {
const timeoutMs = input.timeoutMs ?? 30_000;
const intervalMs = input.intervalMs ?? 100;
const probeTimeoutMs = input.probeTimeoutMs ?? DEFAULT_HTTP_READY_PROBE_TIMEOUT_MS;
const retryPolicy = Schedule.spaced(Duration.millis(intervalMs)).pipe(
const timeout = normalizeDuration({
duration: input.timeout,
durationMs: input.timeoutMs,
fallback: DEFAULT_HTTP_READY_TIMEOUT,
});
const interval = normalizeDuration({
duration: input.interval,
durationMs: input.intervalMs,
fallback: DEFAULT_HTTP_READY_INTERVAL,
});
const probeTimeout = normalizeDuration({
duration: input.probeTimeout,
durationMs: input.probeTimeoutMs,
fallback: DEFAULT_HTTP_READY_PROBE_TIMEOUT,
});
const timeoutMs = Duration.toMillis(timeout);
const intervalMs = Duration.toMillis(interval);
const probeTimeoutMs = Duration.toMillis(probeTimeout);
const retryPolicy = Schedule.spaced(interval).pipe(
Schedule.take(Math.max(0, Math.ceil(timeoutMs / intervalMs))),
);
const requestUrl = new URL(input.path ?? "/", input.baseUrl).toString();
Expand Down Expand Up @@ -103,7 +138,7 @@ export const waitForHttpReady = Effect.fn("shared.httpReadiness.waitForHttpReady
Effect.gen(function* () {
attempt += 1;
const responseOption = yield* effect.pipe(
Effect.timeoutOption(Duration.millis(probeTimeoutMs)),
Effect.timeoutOption(probeTimeout),
Effect.mapError((cause) => fail(cause)),
);
return yield* Option.match(responseOption, {
Expand Down Expand Up @@ -133,7 +168,7 @@ export const waitForHttpReady = Effect.fn("shared.httpReadiness.waitForHttpReady

const result = yield* readinessClient.execute(HttpClientRequest.get(requestUrl)).pipe(
Effect.mapError((cause) => (isMadeError(cause) ? cause : fail(cause))),
Effect.timeoutOption(Duration.millis(timeoutMs)),
Effect.timeoutOption(timeout),
);

return yield* Option.match(result, {
Expand Down
6 changes: 3 additions & 3 deletions packages/ssh/src/tunnel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ describe("ssh tunnel scripts", () => {
Effect.result(
waitForHttpReady({
baseUrl: "http://127.0.0.1:41773/",
timeoutMs: 1_000,
intervalMs: 100,
probeTimeoutMs: 250,
timeout: Duration.seconds(1),
interval: Duration.millis(100),
probeTimeout: Duration.millis(250),
}),
),
);
Expand Down
7 changes: 7 additions & 0 deletions packages/ssh/src/tunnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { extractJsonObject, fromLenientJson } from "@t3tools/shared/schemaJson";
import { satisfiesSemverRange } from "@t3tools/shared/semver";
import * as Context from "effect/Context";
import * as Deferred from "effect/Deferred";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Exit from "effect/Exit";
import * as FileSystem from "effect/FileSystem";
Expand Down Expand Up @@ -843,6 +844,9 @@ const readRemoteServerLogTail = Effect.fn("ssh/tunnel.readRemoteServerLogTail")(

export const waitForHttpReady = (input: {
readonly baseUrl: string;
readonly timeout?: Duration.Input;
readonly interval?: Duration.Input;
readonly probeTimeout?: Duration.Input;
readonly timeoutMs?: number;
readonly intervalMs?: number;
readonly probeTimeoutMs?: number;
Expand All @@ -851,6 +855,9 @@ export const waitForHttpReady = (input: {
waitForHttpReadyShared({
baseUrl: input.baseUrl,
...(input.path === undefined ? {} : { path: input.path }),
...(input.timeout === undefined ? {} : { timeout: input.timeout }),
...(input.interval === undefined ? {} : { interval: input.interval }),
...(input.probeTimeout === undefined ? {} : { probeTimeout: input.probeTimeout }),
...(input.timeoutMs === undefined ? {} : { timeoutMs: input.timeoutMs }),
...(input.intervalMs === undefined ? {} : { intervalMs: input.intervalMs }),
probeTimeoutMs: input.probeTimeoutMs ?? SSH_READY_PROBE_TIMEOUT_MS,
Expand Down
Loading