Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions packages/config/local/cron-jobs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import {
clearCronJobsForTests,
closeCronJobDatabaseForTests,
createCronJob,
disableCronJob,
getCronJobById,
markCronJobCompleted,
markCronJobFailed,
markCronJobRunning,
patchCronJob,
reconcileInterruptedCronJobs,
} from "./cron-jobs";

Expand Down Expand Up @@ -202,3 +204,77 @@ describe("reconcileInterruptedCronJobs", () => {
expect(getCronJobById(job.id)?.lastRunStatus).toBe("failed");
});
});

describe("disableCronJob", () => {
  // Fields every job in this suite has in common; each case supplies its own
  // title (and, where needed, an explicit `enabled` flag).
  const sharedJobFields = {
    cronExpression: "*/5 * * * *",
    channelId: "C_TEST",
    messageText: "hi",
  };

  // Reads the persisted `enabled` flag back for the given job id.
  const isEnabled = (id: string) => getCronJobById(id)?.enabled;

  test("flips enabled to false and reports the transition", () => {
    const enabledJob = createCronJob({ title: "to-disable", ...sharedJobFields });
    expect(isEnabled(enabledJob.id)).toBe(true);

    const transitioned = disableCronJob(enabledJob.id);

    expect(transitioned).toBe(true);
    expect(isEnabled(enabledJob.id)).toBe(false);
  });

  test("returns false when the row is already disabled (idempotent)", () => {
    const disabledJob = createCronJob({
      title: "already-off",
      ...sharedJobFields,
      enabled: false,
    });

    const transitioned = disableCronJob(disabledJob.id);

    expect(transitioned).toBe(false);
    expect(isEnabled(disabledJob.id)).toBe(false);
  });

  test("returns false when the row does not exist", () => {
    const transitioned = disableCronJob("does-not-exist");
    expect(transitioned).toBe(false);
  });

  test("succeeds even when the row's channel was removed from local config", () => {
    // Motivating scenario for `disableCronJob`: the job still references a
    // channel that is no longer present in the local config. `patchCronJob`
    // re-validates the channel and therefore throws, while `disableCronJob`
    // must still be able to switch the row off.
    const staleJob = createCronJob({ title: "stale-channel", ...sharedJobFields });

    // Rewrite the on-disk config with a workspace that has no channels, then
    // drop the cached copy so the next lookup reads the new file.
    const configWithoutChannel = {
      user: {},
      workspaces: [
        {
          id: "ws-test",
          name: "Test Workspace",
          type: "slack",
          channelDetails: [], // C_TEST is gone
        },
      ],
    };
    fs.writeFileSync(ODE_CONFIG_FILE, JSON.stringify(configWithoutChannel));
    invalidateOdeConfigCache();

    // Sanity check: the patch path fails on the missing channel and leaves
    // the row enabled.
    const patchAttempt = () => patchCronJob(staleJob.id, { enabled: false });
    expect(patchAttempt).toThrow(/Channel not found/i);
    expect(isEnabled(staleJob.id)).toBe(true);

    // The dedicated disable path does not depend on the channel at all.
    expect(disableCronJob(staleJob.id)).toBe(true);
    expect(isEnabled(staleJob.id)).toBe(false);
  });
});
25 changes: 25 additions & 0 deletions packages/config/local/cron-jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,31 @@ export function patchCronJob(id: string, params: PatchCronJobParams): CronJobRec
return updateCronJob(id, merged);
}

/**
 * Switch a cron job off by writing `enabled = 0` straight to its row.
 *
 * No other column is validated or re-derived here. In particular, unlike
 * `patchCronJob`, the row's channel is NOT re-resolved through
 * `getChannelSnapshot`, so this keeps working after the destination channel
 * has been removed from the local config (bot kicked, workspace
 * re-onboarded, stale channel id, ...), which is the situation auto-disable
 * is meant for.
 *
 * The `AND enabled = 1` guard makes the update a no-op for rows that are
 * already disabled, so the return value reports a real transition.
 *
 * @param id - Identifier of the cron job row to disable.
 * @returns `true` only when a row went from enabled to disabled; `false`
 *   when the row was already disabled or does not exist. Callers can use
 *   this to skip duplicate log/notification work.
 */
export function disableCronJob(id: string): boolean {
  const statement = getDatabase().query(`
UPDATE cron_jobs
SET
enabled = 0,
updated_at = ?
WHERE id = ?
AND enabled = 1
`);
  // Bound in order: the new `updated_at` timestamp, then the row id.
  const { changes } = statement.run(Date.now(), id);
  return changes > 0;
}

export function markCronJobTriggered(id: string, minuteStartMs: number): boolean {
const db = getDatabase();
const result = db.query(`
Expand Down
98 changes: 97 additions & 1 deletion packages/core/cron/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
} from "@/config";
import {
type CronJobRecord,
disableCronJob,
getCronJobById,
listEnabledCronJobs,
markCronJobCompleted,
Expand Down Expand Up @@ -37,6 +38,7 @@ import { buildSessionEnvironment, prepareSessionWorkspace } from "@/core/session
import { sendChannelMessage as sendDiscordChannelMessage } from "@/ims/discord/client";
import { sendChannelMessage as sendLarkChannelMessage } from "@/ims/lark/client";
import { sendChannelMessage as sendSlackChannelMessage } from "@/ims/slack/client";
import { isPermanentChannelError } from "@/shared/delivery/permanent-error";
import { log } from "@/utils";

const CRON_POLL_INTERVAL_MS = 15_000;
Expand Down Expand Up @@ -265,6 +267,57 @@ async function prepareCronSession(job: CronJobRecord, runId: string): Promise<{
return { session, sessionId, cwd, created };
}

/**
 * Auto-disable a cron job whose destination channel is permanently
 * unreachable (bot kicked, channel archived, token revoked, etc.). Without
 * this, every cron tick would re-attempt the same send and capture an
 * identical Sentry event for the lifetime of the daemon — see
 * `isPermanentChannelError` for the precise classification.
 *
 * Three bookkeeping steps run in order, each inside its own try/catch so a
 * failure in one cannot prevent the others:
 *   1. mark the agent_result detail (when one exists) as failed,
 *   2. flip the cron row to disabled,
 *   3. record the run as failed on the cron row.
 *
 * Best-effort: bookkeeping failures are logged and never shadow the
 * original delivery error. The function does NOT throw.
 *
 * The closing "Auto-disabled" warning is emitted only when `disableCronJob`
 * reports that the row really went from enabled to disabled. If the disable
 * call threw (already logged above) or changed nothing (row already disabled
 * or missing), claiming the job was auto-disabled would be misleading or a
 * duplicate.
 *
 * @param job - The cron job whose delivery failed.
 * @param error - The delivery error; its message is embedded in the summary.
 * @param agentResultDetailId - Detail row to mark failed, or `null` if none.
 */
async function disableCronJobForPermanentChannelError(
  job: CronJobRecord,
  error: unknown,
  agentResultDetailId: string | null,
): Promise<void> {
  const reason = error instanceof Error ? error.message : String(error);
  const summary = `auto-disabled: destination channel unreachable (${reason})`;

  if (agentResultDetailId) {
    try {
      failAgentResult({ detailId: agentResultDetailId, errorText: summary });
    } catch (failError) {
      log.warn("Failed to mark cron agent_result detail as failed during auto-disable", {
        detailId: agentResultDetailId,
        error: String(failError),
      });
    }
  }

  // Stays false when the disable call throws or when no row changed.
  let transitioned = false;
  try {
    transitioned = disableCronJob(job.id);
  } catch (disableError) {
    log.warn("Failed to disable cron job after permanent channel error", {
      cronJobId: job.id,
      error: String(disableError),
    });
  }

  // Always record the failed run, even when the row was not transitioned by
  // this call: the current run still ended without delivering its result.
  try {
    markCronJobFailed(job.id, summary);
  } catch (markError) {
    log.warn("Failed to mark auto-disabled cron job as failed", {
      cronJobId: job.id,
      error: String(markError),
    });
  }

  if (!transitioned) {
    return;
  }

  log.warn("Auto-disabled cron job: destination channel unreachable", {
    cronJobId: job.id,
    title: job.title,
    channelId: job.channelId,
    error: reason,
  });
}

async function runCronJob(job: CronJobRecord, minuteStartMs: number): Promise<void> {
const agent = createAgentAdapter();
const runId = getCronRunId(minuteStartMs);
Expand Down Expand Up @@ -350,7 +403,22 @@ async function runCronJob(job: CronJobRecord, minuteStartMs: number): Promise<vo
);
const finalText = buildFinalResponseText(responses) ?? "_Done_";

const realThreadId = await sendResultToChannel(job, finalText);
let realThreadId: string | undefined;
try {
realThreadId = await sendResultToChannel(job, finalText);
} catch (sendError) {
// The agent turn succeeded, but we can't deliver the result. If the
// channel is permanently unreachable (bot kicked, channel archived,
// token revoked), auto-disable the job so the scheduler stops
// generating identical Sentry events on every tick. Transient send
// failures fall through to the generic error handler below and the
// job stays enabled for the next tick.
if (isPermanentChannelError(sendError)) {
await disableCronJobForPermanentChannelError(job, sendError, agentResultDetailId);
return;
}
throw sendError;
}
if (realThreadId) {
seedCronChannelThreadSession({
platform: job.platform,
Expand Down Expand Up @@ -409,6 +477,34 @@ async function runCronJob(job: CronJobRecord, minuteStartMs: number): Promise<vo
const failureText = `*Cron job failed:* ${job.title}\n${message}`;
await sendResultToChannel(job, failureText);
} catch (notifyError) {
// If the failure-notification itself hits a permanent channel error,
// the destination is the real problem (not the agent turn). Auto-
// disable so we don't spam the same channel on every tick. We
// intentionally check the *notify* error here, not the original
// `error`, because the original could be a transient agent timeout
// while the channel is still healthy.
if (isPermanentChannelError(notifyError)) {
try {
disableCronJob(job.id);
markCronJobFailed(
job.id,
`auto-disabled: channel unreachable (${message})`,
);
log.warn("Auto-disabled cron job: destination channel unreachable", {
cronJobId: job.id,
title: job.title,
channelId: job.channelId,
originalError: message,
sendError: String(notifyError),
});
} catch (disableError) {
log.warn("Failed to auto-disable cron job after permanent channel error", {
cronJobId: job.id,
error: String(disableError),
});
}
return;
}
log.warn("Failed to send cron job failure notification", {
cronJobId: job.id,
error: String(notifyError),
Expand Down
31 changes: 30 additions & 1 deletion packages/ims/discord/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ async function maybeHandleLauncherCommand(params: {
}
async function resolveTextChannel(channelId: string, processorId?: string) {
const attempts: string[] = [];
// Collect Discord API error codes across every fetch attempt so the
// caller can detect permanent-access failures (Unknown Channel / Missing
// Access / etc.) even though we re-throw a generic wrapper Error below.
// discord.js surfaces these on `err.code` as numbers; we forward them on
// the wrapper as `discordErrorCodes` and also expose the first as `code`
// so `isPermanentChannelError` can pick it up via the `code` shape.
const discordErrorCodes: number[] = [];
const captureCode = (error: unknown) => {
if (typeof error === "object" && error !== null) {
const code = (error as { code?: unknown }).code;
if (typeof code === "number") discordErrorCodes.push(code);
}
};

if (processorId) {
const pinnedClient = discordClientByProcessorId.get(processorId);
if (pinnedClient) {
Expand All @@ -126,6 +140,7 @@ async function resolveTextChannel(channelId: string, processorId?: string) {
}
attempts.push(`bot=${pinnedClient.user?.id || "unknown"}: channel_not_text_or_missing`);
} catch (error) {
captureCode(error);
const errorMessage = error instanceof Error ? error.message : String(error);
attempts.push(`bot=${pinnedClient.user?.id || "unknown"}: ${errorMessage}`);
}
Expand All @@ -141,6 +156,7 @@ async function resolveTextChannel(channelId: string, processorId?: string) {
}
attempts.push(`bot=${client.user?.id || "unknown"}: channel_not_text_or_missing`);
} catch (error) {
captureCode(error);
const errorMessage = error instanceof Error ? error.message : String(error);
attempts.push(`bot=${client.user?.id || "unknown"}: ${errorMessage}`);
}
Expand All @@ -154,7 +170,20 @@ async function resolveTextChannel(channelId: string, processorId?: string) {
});
}

throw new Error(`Discord channel ${channelId} is not text-based or inaccessible`);
const wrapper = new Error(`Discord channel ${channelId} is not text-based or inaccessible`);
if (discordErrorCodes.length > 0) {
// Prefer the most informative code: prioritise "permanent" classes
// (10003 Unknown Channel, 50001 Missing Access, 50013 Missing Perms,
// 50007 Cannot DM) so isPermanentChannelError can disable the cron row
// even if some bots failed transiently.
const PERMANENT_PRIORITY = new Set([10003, 50001, 50013, 50007]);
const permanent = discordErrorCodes.find((c) => PERMANENT_PRIORITY.has(c));
(wrapper as Error & { code?: number; discordErrorCodes?: number[] }).code =
permanent ?? discordErrorCodes[0];
(wrapper as Error & { code?: number; discordErrorCodes?: number[] }).discordErrorCodes =
[...discordErrorCodes];
}
throw wrapper;
}

async function buildDiscordContext(
Expand Down
79 changes: 79 additions & 0 deletions packages/shared/delivery/permanent-error.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { describe, expect, test } from "bun:test";
import { isPermanentChannelError } from "./permanent-error";

describe("isPermanentChannelError", () => {
  // Asserts that each message, wrapped in a plain Error, gets the given verdict.
  const expectVerdictForMessages = (messages: readonly string[], verdict: boolean): void => {
    for (const message of messages) {
      expect(isPermanentChannelError(new Error(message))).toBe(verdict);
    }
  };

  // Builds an Error that carries extra own properties, like SDK errors do.
  const errorWith = (message: string, extras: Record<string, unknown>) =>
    Object.assign(new Error(message), extras);

  test("matches Slack channel_not_found in message", () => {
    expectVerdictForMessages(["An API error occurred: channel_not_found"], true);
  });

  test("matches Slack SDK error shape with data.error", () => {
    const sdkError = errorWith("slack_webapi_platform_error", {
      data: { error: "channel_not_found" },
    });
    expect(isPermanentChannelError(sdkError)).toBe(true);
  });

  test("matches Slack not_in_channel / is_archived / account_inactive", () => {
    expectVerdictForMessages(
      ["not_in_channel", "is_archived", "channel_is_archived", "account_inactive"],
      true,
    );
  });

  test("matches Slack auth-revoked errors", () => {
    expectVerdictForMessages(["token_revoked", "invalid_auth", "missing_scope"], true);
  });

  test("matches Discord-style numeric codes (Missing Access, Unknown Channel)", () => {
    const missingAccess = errorWith("Missing Access", { code: 50001 });
    expect(isPermanentChannelError(missingAccess)).toBe(true);

    const unknownChannel = errorWith("Unknown Channel", { code: 10003 });
    expect(isPermanentChannelError(unknownChannel)).toBe(true);
  });

  test("matches the Discord resolveTextChannel wrapper when it carries a DiscordAPIError code", () => {
    // This mimics the generic "... is not text-based or inaccessible" Error
    // with a numeric `code` attached. The code alone must be enough for the
    // helper to treat the error as permanent.
    const wrappedError = errorWith("Discord channel 123 is not text-based or inaccessible", {
      code: 10003,
    });
    expect(isPermanentChannelError(wrappedError)).toBe(true);
  });

  test("matches Lark chat_not_found", () => {
    expectVerdictForMessages(["chat_not_found: chat does not exist"], true);
  });

  test("ignores transient / retryable failures", () => {
    expectVerdictForMessages(
      ["status 429", "ECONNRESET", "status 500", "socket hang up", "rate_limited"],
      false,
    );
  });

  test("ignores nullish / empty inputs", () => {
    for (const input of [null, undefined, "", {}]) {
      expect(isPermanentChannelError(input)).toBe(false);
    }
  });

  test("ignores message_not_found (that's a delete/update race, not permanent)", () => {
    // `message_not_found` concerns a single message rather than access to the
    // channel, so this helper must keep reporting it as non-permanent.
    expectVerdictForMessages(["message_not_found"], false);
  });
});
Loading
Loading